Coverage for src/manydataapi/parsers/ct1.py: 98%
153 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-02 08:38 +0200
1# -*- coding:utf-8 -*-
2"""
3@file
4@brief Parses format from a paying machine.
5"""
6import copy
7import datetime
8import os
9import pprint
def dummy_ct1():
    """
    Gives the location of a sample file which follows format ``CT1``.

    .. runpython::
        :showcode:

        from manydataapi.parsers.ct1 import dummy_ct1
        name = dummy_ct1()
        with open(name, "r") as f:
            for i, line in enumerate(f):
                print(i, [line])
                if i > 10:
                    break

    @return         path of file ``dummies/DDMMYYXX.map`` stored in the
                    folder of this module
    @raise          FileNotFoundError if that file does not exist
    """
    folder = os.path.dirname(__file__)
    location = os.path.join(folder, "dummies", "DDMMYYXX.map")
    if os.path.exists(location):
        return location
    raise FileNotFoundError(location)  # pragma: no cover
def read_ct1(file_or_str, encoding='ascii', as_df=True):
    """
    Parses a file or a string which follows a specific
    format called `CT1`.
    See function @see fn dummy_ct1 for an example.

    @param      file_or_str     name of an existing file or the content itself
    @param      encoding        encoding used to read the file,
                                unused when the content is given as a string
    @param      as_df           returns the results as a dataframe
    @return                     a dataframe with one row per item if *as_df*
                                is True, otherwise a list of dictionaries,
                                one per basket, with keys ``data`` (items)
                                and ``tva`` (tax lines)

    The content is processed line by line, the beginning of a line
    tells what it contains:

    * ``\\x02``: starts a new basket
    * ``H\\x1d``: description
    * ``L\\x1d``: one item
    * ``T\\x1d9\\x1d``: totals of the basket
    * ``T\\x1d``: one tax line
    * ``F\\x1d``: total, date and time
    * ``\\x04``: ends the basket, the totals are then verified

    Fields are separated by character ``\\x1d``.
    The function raises *RuntimeError* if a line appears outside a basket
    or if a basket starts before the previous one ended, it raises
    *ValueError* if a basket cannot be verified (mismatch between
    the totals, more than one manual item, ...).

    Meaning of the columns:

    * BASKET: basket id
    * CAT: item is a quantity or a piece
    * DATETIME: date and time
    * FCODE, FCODE1, FCODE2: ?
    * HT: price with no taxes
    * INFO0, INFO1, INFO2, INFO3, INFO4, INFOL2, INFOL2_1,
      INFOL2_2, INFOL2_3, INFOL2_4: ?
    * IT1, IT10, IT2, IT4, IT6, IT8, IT9: ?
    * ITCODE: item code, every item ending by X is an item
      automatically added by the parser to fix the total
    * ITMANUAL: manually change the total
    * ITNAME: item name
    * ITPRICE: price paid
    * ITQU: quantity (kg or number of pieces)
    * ITUNIT: price per unit
    * NAME: vendor's name
    * NB1, NB2: ?
    * NEG: some item have a negative price
    * PIECE: the quantity is a weight (False) or a number (True)
    * PLACE, STREET, ZIPCODE: location
    * TOTAL: total paid for the basket
    * TVA: tax for an item
    * TVAID: tax id
    * TVARATE: tax rate
    * ERROR: check this line later
    """
    if len(file_or_str) < 4000 and os.path.exists(file_or_str):
        with open(file_or_str, encoding=encoding) as f:
            content = f.read()
    else:
        # Not the name of an existing file: the argument is the content.
        content = file_or_str

    def _to_float(value):
        # Spaces are removed before the conversion.
        return float(value.replace(" ", ""))

    def _check_started(rec, index):
        # Every line but the first one of a basket needs an open basket.
        if rec is None:
            raise RuntimeError(
                "Wrong format at line {}".format(index + 1))

    def _post_process(rec):
        # Verifies the totals of a basket and adds the tax to the items.
        manual = [o for o in rec['data'] if o['ITMANUAL'] == '1']
        if len(manual) > 1:
            raise ValueError(  # pragma: no cover
                "More than one manual item.")
        is_manual = len(manual) == 1

        total = sum(obs['ITPRICE'] for obs in rec['data'])
        if is_manual:
            # The price of a manual item was set to zero while parsing,
            # a new item holds the difference with the expected total.
            diff = rec['TOTAL-'] - total
            new_obs = {'CAT': 2.0, 'ERROR': 0.0,
                       'ITCODE': '30002X',
                       'ITMANUAL': '2',
                       'ITPRICE': diff,
                       'ITQU': 1,
                       'ITUNIT': abs(diff),
                       'NEG': 1 if diff < 0 else 0,
                       'PIECE': True, 'TVAID': manual[0]['TVAID']}
            rec['data'].append(new_obs)
            total = sum(obs['ITPRICE'] for obs in rec['data'])

        rec['TOTAL'] = total
        if abs(rec['TOTAL-'] - rec['TOTAL']) >= 0.01:
            raise ValueError(  # pragma: no cover
                "Mismatch total' {} != {}".format(
                    rec['TOTAL'], rec['TOTAL-']))
        if abs(rec['TOTAL_'] - rec['TOTAL']) >= 0.01:
            raise ValueError(  # pragma: no cover
                "Mismatch total' {} != {}".format(
                    rec['TOTAL'], rec['TOTAL_']))
        # Both totals were only needed for the verification.
        del rec['TOTAL_']
        del rec['TOTAL-']

        tva_d = {t['TVAID']: t for t in rec['tva']}
        for item in rec['data']:
            # With a manual item, only the added item receives a tax.
            if is_manual and item['ITMANUAL'] != '2':
                continue
            item['TVARATE'] = tva_d[item['TVAID']]['RATE']
            item['TVA'] = item['ITPRICE'] * item['TVARATE'] / 100
        if not rec["data"]:
            raise ValueError("No record.")  # pragma: no cover

    # Names of the fields for every kind of line.
    names_h = ['NB1', 'NB2', 'NAME', 'PLACE', 'STREET', 'ZIPCODE',
               'INFOL2', 'INFOL2_1', 'INFOL2_2', 'INFOL2_3', 'INFOL2_4']
    names_l = ['ITCODE', 'ITNAME', 'IT1', 'IT2', 'TVAID', 'IT4',
               'ITUNIT', 'ITQU', 'CAT', 'ITPRICE',
               'IT6', 'NEG', 'IT8', 'IT9', 'IT10', 'IT11', 'IT12']
    float_l = {'ITUNIT', 'ITQU', 'ITPRICE', 'NEG', 'CAT'}
    names_t9 = ['HT', 'TVA', 'TOTAL_']
    names_t = ['TVAID', 'RATE', 'HT', 'VALUE', 'TOTAL']
    names_f = ['FCODE', 'TOTAL-', 'DATE', 'TIME', 'FCODE1', 'FCODE2']

    records = []
    record = None
    first_line = None
    content_ = content.split('\n')
    for i, line in enumerate(content_):
        line = line.strip('\r')
        if line.startswith("\x02"):
            # beginning of a basket
            if record is not None:
                raise RuntimeError(  # pragma: no cover
                    "Wrong format at line {}".format(i + 1))
            record = dict(data=[], tva=[])
            spl = line[1:].split("\x1d")
            for ii, info in enumerate(spl):
                record['INFO%d' % ii] = info
            first_line = i

        elif line.startswith('\x04'):
            # end of a basket
            _check_started(record, i)
            line = line.strip("\x04\x05")
            record['BASKET'] = line  # pylint: disable=E1137

            # verification
            if len(record['data']) > 0:  # pylint: disable=E1136
                try:
                    _post_process(record)
                except (KeyError, ValueError) as e:  # pragma: no cover
                    raise ValueError("Unable to process one record line {}-{}\n{}\n-\n{}".format(
                        first_line + 1, i + 1, pprint.pformat(record),
                        "\n".join(content_[first_line: i + 1]))) from e

            records.append(record)

            first_line = None
            record = None

        elif line.startswith('H\x1d'):
            # description
            _check_started(record, i)
            spl = line[2:].split("\x1d")
            for n, v in zip(names_h, spl):
                record[n] = v  # pylint: disable=E1137

        elif line.startswith('L\x1d'):
            # items
            _check_started(record, i)
            spl = line[2:].split("\x1d")
            obs = {'ITMANUAL': '0'}
            for n, v in zip(names_l, spl):
                obs[n] = _to_float(v) if n in float_l else v
            if obs['CAT'] == 2:
                obs['PIECE'] = True
                obs['ITQU'] = int(obs['ITQU'] * 1000)
            else:
                obs['PIECE'] = False
            if obs['NEG']:
                obs['ITUNIT'] *= -1
                obs['ITPRICE'] *= -1
            diff = abs(obs['ITQU'] * obs['ITUNIT'] - obs['ITPRICE'])
            add_obs = None
            if diff >= 0.01:  # 1 cent
                obs['ERROR'] = diff
                if obs['ITQU'] == 0 or obs['ITUNIT'] == 0:
                    # The price cannot be checked, it is set to zero,
                    # code 30002 marks a manual change of the total.
                    obs['ERROR'] = 0.
                    obs['ITPRICE'] = 0.
                    if obs['ITCODE'] == '30002':
                        obs['ITMANUAL'] = '1'
                    else:
                        obs['ITMANUAL'] = '?'
                elif diff >= 0.02:  # pragma: no cover
                    add_obs = obs.copy()
                    add_obs['ITCODE'] += 'X'
                    add_obs['ITPRICE'] = 0.
                    # diff is an absolute value, NEG is then always 0
                    add_obs['NEG'] = 1 if diff < 0 else 0
                    add_obs['ITUNIT'] = abs(diff)
                    add_obs['ITQU'] = 1
                    add_obs['PIECE'] = True
                    add_obs['CAT'] = 1
            record['data'].append(obs)  # pylint: disable=E1136
            if add_obs:
                record['data'].append(add_obs)  # pylint: disable=E1136

        elif line.startswith('T\x1d9\x1d'):
            # totals of the basket, tested before the other lines
            # starting with T which share the same prefix
            _check_started(record, i)
            spl = line[4:].split("\x1d")
            for n, v in zip(names_t9, spl):
                record[n] = _to_float(v)  # pylint: disable=E1137

        elif line.startswith('T\x1d'):
            # one tax line
            _check_started(record, i)
            spl = line[2:].split("\x1d")
            tva = {}
            for n, v in zip(names_t, spl):
                if n == 'TVAID':
                    tva[n] = v
                else:
                    try:
                        tva[n] = _to_float(v)
                    except ValueError:  # pragma: no cover
                        # the value is kept as a string if not a number
                        tva[n] = v
            record['tva'].append(tva)  # pylint: disable=E1136

        elif line.startswith('F\x1d'):
            # total, date and time
            _check_started(record, i)
            spl = line[2:].split("\x1d")
            vtime = None
            vdate = None
            for n, v in zip(names_f, spl):
                if n == 'TOTAL-':
                    record[n] = _to_float(v)  # pylint: disable=E1137
                elif n == "TIME":
                    vtime = v
                elif n == "DATE":
                    vdate = v
                else:
                    record[n] = v  # pylint: disable=E1137
            record["DATETIME"] = datetime.datetime.strptime(  # pylint: disable=E1137
                "{} {}".format(vdate, vtime), "%d.%m.%Y %H:%M:%S")

    if not as_df:
        return records

    new_records = []
    for record in records:
        rec = copy.deepcopy(record)
        del rec['tva']
        data = rec['data']
        del rec['data']
        for d in data:
            # NOTE(review): the fields of the basket overwrite the fields
            # of the item sharing the same name (TVA for example),
            # confirm this is expected.
            d.update(rec)
            new_records.append(d)
    # pandas is imported here, it is only needed to build a dataframe
    import pandas
    return pandas.DataFrame(new_records)