Coverage for src/manydataapi/parsers/ct1.py: 98%

153 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-02 08:38 +0200

1# -*- coding:utf-8 -*- 

2""" 

3@file 

4@brief Parses format from a paying machine. 

5""" 

6import copy 

7import datetime 

8import os 

9import pprint 

10 

11 

def dummy_ct1():
    """
    Gives the location of a sample file following format ``CT1``.
    The file is looked up in subfolder ``dummies`` next to this module.

    .. runpython::
        :showcode:

        from manydataapi.parsers.ct1 import dummy_ct1
        name = dummy_ct1()
        with open(name, "r") as f:
            for i, line in enumerate(f):
                print(i, [line])
                if i > 10:
                    break

    @return     full path of the sample file
    @raises     FileNotFoundError if the sample file does not exist
    """
    folder = os.path.dirname(__file__)
    sample = os.path.join(folder, "dummies", "DDMMYYXX.map")
    if os.path.exists(sample):
        return sample
    raise FileNotFoundError(sample)  # pragma: no cover

32 

33 

def read_ct1(file_or_str, encoding='ascii', as_df=True):
    """
    Parses a file or a string which follows a specific
    format called `CT1`.
    See function @see fn dummy_ct1 for an example.

    @param      file_or_str     file name or the content itself; the
                                argument is read as a file only if it is
                                shorter than 4000 characters and points
                                to an existing path, otherwise it is
                                parsed as the content
    @param      encoding        encoding used to read the file
                                (unused when the content is given)
    @param      as_df           returns the results as a dataframe
    @return                     dataframe with one row per item if *as_df*
                                is True, otherwise a list of dictionaries,
                                one per basket, with the items in key
                                ``'data'`` and the taxes in key ``'tva'``

    The function raises *RuntimeError* if the markers delimiting
    a basket are not consistent and *ValueError* if a basket cannot
    be post-processed (totals mismatch, missing field...).

    Meaning of the columns:

    * BASKET: basket id
    * CAT: item is a quantity or a piece
    * DATETIME: date and time
    * FCODE, FCODE1, FCODE2: ?
    * HT: price with no taxes
    * INFO0, INFO1, INFO2, INFO3, INFO4, INFOL2, INFOL2_1,
      INFOL2_2, INFOL2_3, INFOL2_4: ?
    * IT1, IT10, IT2, IT4, IT6, IT8, IT9: ?
    * ITCODE: item code, every item ending by X is an item
      automatically added by the parser to fix the total
    * ITMANUAL: manually change the total
    * ITNAME: item name
    * ITPRICE: price paid
    * ITQU: quantity (kg or number of pieces)
    * ITUNIT: price per unit
    * NAME: vendor's name
    * NB1, NB2: ?
    * NEG: some item have a negative price
    * PIECE: the quantity is a weight (False) or a number (True)
    * PLACE, STREET, ZIPCODE: location
    * TOTAL: total paid for the basket
    * TVA: tax for an item
    * TVAID: tax id
    * TVARATE: tax rate
    * ERROR: check this line later
    """
    if len(file_or_str) < 4000 and os.path.exists(file_or_str):
        with open(file_or_str, encoding=encoding) as f:
            content = f.read()
    else:
        # The argument is not an existing file: it is the content itself.
        content = file_or_str

    def _post_process(rec):
        # Completes one basket in place: adds a balancing item when the
        # total was manually changed, checks the totals, removes the
        # temporary keys 'TOTAL_' and 'TOTAL-', adds the tax to the items.
        manual = [o for o in rec['data'] if o['ITMANUAL'] == '1']
        if len(manual) > 1:
            raise ValueError(  # pragma: no cover
                "More than one manual item.")
        is_manual = len(manual) == 1

        total = sum(obs['ITPRICE'] for obs in rec['data'])
        if is_manual:
            # The added item covers the difference between the total
            # read on line F and the sum of the item prices.
            diff = rec['TOTAL-'] - total
            new_obs = {'CAT': 2.0, 'ERROR': 0.0,
                       'ITCODE': '30002X',
                       'ITMANUAL': '2',
                       'ITPRICE': diff,
                       'ITQU': 1,
                       'ITUNIT': abs(diff),
                       'NEG': 1 if diff < 0 else 0,
                       'PIECE': True, 'TVAID': manual[0]['TVAID']}
            rec['data'].append(new_obs)
            total = sum(obs['ITPRICE'] for obs in rec['data'])

        rec['TOTAL'] = total
        if abs(rec['TOTAL-'] - rec['TOTAL']) >= 0.01:
            raise ValueError(  # pragma: no cover
                "Mismatch total' {} != {}".format(
                    rec['TOTAL'], rec['TOTAL-']))
        if abs(rec['TOTAL_'] - rec['TOTAL']) >= 0.01:
            raise ValueError(  # pragma: no cover
                "Mismatch total' {} != {}".format(
                    rec['TOTAL'], rec['TOTAL_']))
        del rec['TOTAL_']
        del rec['TOTAL-']
        tva_d = {t['TVAID']: t for t in rec['tva']}
        if is_manual:
            # Only the balancing item (ITMANUAL == '2') receives a tax.
            for item in rec['data']:
                if item['ITMANUAL'] != '2':
                    continue
                tvaid = item['TVAID']
                item['TVARATE'] = tva_d[tvaid]['RATE']
                item['TVA'] = item['ITPRICE'] * item['TVARATE'] / 100
        else:
            for item in rec['data']:
                tvaid = item['TVAID']
                item['TVARATE'] = tva_d[tvaid]['RATE']
                item['TVA'] = item['ITPRICE'] * item['TVARATE'] / 100
        if not rec["data"]:
            raise ValueError("No record.")  # pragma: no cover

    # Field names of every kind of line, in the order they appear.
    header_names = ['NB1', 'NB2', 'NAME', 'PLACE', 'STREET', 'ZIPCODE',
                    'INFOL2', 'INFOL2_1', 'INFOL2_2', 'INFOL2_3', 'INFOL2_4']
    item_names = ['ITCODE', 'ITNAME', 'IT1', 'IT2', 'TVAID', 'IT4',
                  'ITUNIT', 'ITQU', 'CAT', 'ITPRICE',
                  'IT6', 'NEG', 'IT8', 'IT9', 'IT10', 'IT11', 'IT12']
    item_floats = {'ITUNIT', 'ITQU', 'ITPRICE', 'NEG', 'CAT'}
    total_names = ['HT', 'TVA', 'TOTAL_']
    tva_names = ['TVAID', 'RATE', 'HT', 'VALUE', 'TOTAL']
    footer_names = ['FCODE', 'TOTAL-', 'DATE', 'TIME', 'FCODE1', 'FCODE2']

    records = []
    record = None
    first_line = None
    content_ = content.split('\n')
    for i, line in enumerate(content_):
        line = line.strip('\r')
        if line.startswith("\x02"):
            # beginning of a basket
            if record is not None:
                raise RuntimeError(  # pragma: no cover
                    "Wrong format at line {}".format(i + 1))
            record = dict(data=[], tva=[])
            spl = line[1:].split("\x1d")
            for ii, info in enumerate(spl):
                record['INFO%d' % ii] = info
            first_line = i

        elif line.startswith('\x04'):
            # end of a basket
            if record is None:
                raise RuntimeError(  # pragma: no cover
                    "Wrong format at line {}".format(i + 1))
            line = line.strip("\x04\x05")
            record['BASKET'] = line  # pylint: disable=E1137

            # verification, baskets without any item are stored as is
            if record['data']:  # pylint: disable=E1136
                try:
                    _post_process(record)
                except (KeyError, ValueError) as e:  # pragma: no cover
                    raise ValueError("Unable to process one record line {}-{}\n{}\n-\n{}".format(
                        first_line + 1, i + 1, pprint.pformat(record),
                        "\n".join(content_[first_line: i + 1]))) from e

            records.append(record)  # pylint: disable=E1137

            first_line = None
            record = None

        elif line.startswith('H\x1d'):
            # description
            if record is None:
                raise RuntimeError(  # pragma: no cover
                    "Wrong format at line {}".format(i + 1))
            spl = line[2:].split("\x1d")
            for n, v in zip(header_names, spl):
                record[n] = v  # pylint: disable=E1137

        elif line.startswith('L\x1d'):
            # items
            if record is None:
                raise RuntimeError(  # pragma: no cover
                    "Wrong format at line {}".format(i + 1))
            spl = line[2:].split("\x1d")
            obs = {'ITMANUAL': '0'}
            for n, v in zip(item_names, spl):
                if n in item_floats:
                    obs[n] = float(v.replace(" ", ""))
                else:
                    obs[n] = v
            if obs['CAT'] == 2:
                # category 2 is a number of pieces, the quantity read
                # from the line is multiplied by 1000 to get an integer
                obs['PIECE'] = True
                obs['ITQU'] = int(obs['ITQU'] * 1000)
            else:
                obs['PIECE'] = False
            if obs['NEG']:
                obs['ITUNIT'] *= -1
                obs['ITPRICE'] *= -1
            diff = abs(obs['ITQU'] * obs['ITUNIT'] - obs['ITPRICE'])
            add_obs = None
            if diff >= 0.01:  # 1 cent
                obs['ERROR'] = diff
                if obs['ITQU'] == 0 or obs['ITUNIT'] == 0:
                    # the price cannot be explained by the quantity,
                    # it is dropped and the item is marked
                    obs['ERROR'] = 0.
                    obs['ITPRICE'] = 0.
                    if obs['ITCODE'] == '30002':
                        obs['ITMANUAL'] = '1'
                    else:
                        obs['ITMANUAL'] = '?'
                elif diff >= 0.02:  # pragma: no cover
                    # additional item whose code ends with X
                    # NOTE(review): diff is an absolute value,
                    # NEG is always 0 here - confirm it is intended
                    add_obs = obs.copy()
                    add_obs['ITCODE'] += 'X'
                    add_obs['ITPRICE'] = 0.
                    add_obs['NEG'] = 1 if diff < 0 else 0
                    add_obs['ITUNIT'] = abs(diff)
                    add_obs['ITQU'] = 1
                    add_obs['PIECE'] = True
                    add_obs['CAT'] = 1
            record['data'].append(obs)  # pylint: disable=E1136
            if add_obs:
                record['data'].append(add_obs)  # pylint: disable=E1136

        elif line.startswith('T\x1d9\x1d'):
            # totals of the basket, must be tested before prefix 'T\x1d'
            if record is None:
                raise RuntimeError(  # pragma: no cover
                    "Wrong format at line {}".format(i + 1))
            spl = line[4:].split("\x1d")
            for n, v in zip(total_names, spl):
                record[n] = float(v.replace(" ", ""))  # pylint: disable=E1137

        elif line.startswith('T\x1d'):
            # one tax
            if record is None:
                raise RuntimeError(  # pragma: no cover
                    "Wrong format at line {}".format(i + 1))
            spl = line[2:].split("\x1d")
            tva = {}
            for n, v in zip(tva_names, spl):
                if n == 'TVAID':
                    tva[n] = v
                else:
                    try:
                        tva[n] = float(v.replace(" ", ""))
                    except ValueError:  # pragma: no cover
                        # the raw string is kept if it is not a number
                        tva[n] = v
            record['tva'].append(tva)  # pylint: disable=E1136

        elif line.startswith('F\x1d'):
            # footer: total, date, time
            if record is None:
                raise RuntimeError("Wrong format at line {}".format(i + 1))
            spl = line[2:].split("\x1d")
            vtime = None
            vdate = None
            for n, v in zip(footer_names, spl):
                if n == 'TOTAL-':
                    record[n] = float(v.replace(" ", ""))  # pylint: disable=E1137
                elif n == "TIME":
                    vtime = v
                elif n == "DATE":
                    vdate = v
                else:
                    record[n] = v  # pylint: disable=E1137
            record["DATETIME"] = datetime.datetime.strptime(  # pylint: disable=E1137
                "{} {}".format(vdate, vtime), "%d.%m.%Y %H:%M:%S")

    if not as_df:
        return records

    # One row per item, the fields of the basket are copied on every row.
    # NOTE(review): fields of the basket overwrite fields of the item
    # sharing the same name (such as 'TVA') - confirm it is intended.
    import pandas
    new_records = []
    for basket in records:
        rec = copy.deepcopy(basket)
        del rec['tva']
        data = rec.pop('data')
        for d in data:
            d.update(rec)
            new_records.append(d)
    return pandas.DataFrame(new_records)