Coverage for pandas_streaming/df/dataframe_io_helpers.py: 92%

173 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 14:15 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Saves and reads a :epkg:`dataframe` into a :epkg:`zip` file. 

5""" 

6import os 

7from io import StringIO, BytesIO 

8try: 

9 from ujson import dumps 

10except ImportError: # pragma: no cover 

11 from json import dumps 

12import ijson 

13 

14 

class JsonPerRowsStream:
    """
    Reads a :epkg:`json` stream and adds
    ``,``, ``[``, ``]`` to convert a stream containing
    one :epkg:`json` object per row into one single :epkg:`json` object.
    It implements methods *seek*, *readline*, *read*, *getvalue*.
    Text and binary streams are both supported.

    :param st: stream
    """

    def __init__(self, st):
        self.st = st
        # True until the opening '[' has been emitted.
        self.begin = True
        # True when the previous chunk ended with a newline,
        # meaning the next non-empty chunk must be prefixed with ','.
        self.newline = False
        # True until the closing ']' has been emitted.
        self.end = True

    def seek(self, offset):
        """
        Change the stream position to the given byte offset.

        :param offset: offset, only 0 is implemented
        """
        self.st.seek(offset)

    def readline(self, size=-1):
        """
        Reads a line, adds ``,``, ``[``, ``]`` if needed.
        So the number of read characters is not necessarily
        the requested one but could be greater.
        """
        text = self.st.readline(size)
        if size == 0:
            return text
        # Pick bytes or str constants depending on the underlying stream,
        # mirroring what *read* does.
        if isinstance(text, bytes):
            nl, comma, op, cl = b"\n", b",", b"[", b"]"
        else:
            nl, comma, op, cl = "\n", ",", "[", "]"
        if len(text) == 0:
            # The underlying stream is exhausted: emit the closing
            # bracket exactly once. Checking exhaustion *before* the
            # pending-comma prefix avoids emitting a stray ',' when the
            # input ends with a newline (which produced invalid JSON).
            if self.end:
                self.end = False
                return text + cl
            return text
        if self.newline:
            text = comma + text
            self.newline = False
        elif self.begin:
            text = op + text
            self.begin = False
        if text.endswith(nl):
            self.newline = True
        return text

    def read(self, size=-1):
        """
        Reads characters, adds ``,``, ``[``, ``]`` if needed.
        So the number of read characters is not necessarily
        the requested one but could be greater.
        """
        text = self.st.read(size)
        if isinstance(text, bytes):
            cst = b"\n", b"\n,", b",", b"[", b"]"
        else:
            cst = "\n", "\n,", ",", "[", "]"
        if size == 0:
            return text
        if len(text) == 0:
            # The underlying stream is exhausted: close the array
            # exactly once, without the stray leading ',' the previous
            # implementation emitted after a trailing newline.
            if self.end:
                self.end = False
                return text + cst[4]
            return text
        if len(text) > 1:
            # Insert ',' after every newline except after the very last
            # character (a trailing newline must not get a comma yet).
            t1, t2 = text[:len(text) - 1], text[len(text) - 1:]
            t1 = t1.replace(cst[0], cst[1])
            text = t1 + t2

        if self.newline:
            text = cst[2] + text
            self.newline = False
        elif self.begin:
            text = cst[3] + text
            self.begin = False

        if text.endswith(cst[0]):
            self.newline = True
            return text
        if len(text) < size:
            # A short read means the underlying stream is exhausted.
            if self.end:
                self.end = False
                return text + cst[4]
        return text

    def getvalue(self):
        """
        Returns the whole stream content decorated
        with ``[``, ``]``, ``,``.
        """
        def byline():
            line = self.readline()
            while line:
                yield line
                line = self.readline()
        rows = list(byline())
        # Join with an empty value matching the stream type.
        if rows and isinstance(rows[0], bytes):
            return b"".join(rows)
        return "".join(rows)

110 

111 

def flatten_dictionary(dico, sep="_"):
    """
    Flattens a dictionary with nested structure to a dictionary with no
    hierarchy.

    :param dico: dictionary to flatten
    :param sep: string to separate dictionary keys by
    :return: flattened dictionary

    Inspired from `flatten_json
    <https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_.
    """
    flattened_dict = {}

    def _flatten(obj, key):
        # *key* is the flattened prefix built so far, None at the root.
        if obj is None:
            flattened_dict[key] = obj
        elif isinstance(obj, dict):
            for k, v in obj.items():
                if not isinstance(k, str):
                    raise TypeError(
                        "All keys must be a string.")  # pragma: no cover
                k2 = k if key is None else f"{key}{sep}{k}"
                _flatten(v, k2)
        elif isinstance(obj, (list, set)):
            for index, item in enumerate(obj):
                # Bug fix: the original used the unrelated loop variable
                # *k* when *key* was None, raising NameError (or using a
                # stale key) for a top-level list; the index is the key.
                k2 = str(index) if key is None else f"{key}{sep}{index}"
                _flatten(item, k2)
        else:
            flattened_dict[key] = obj

    _flatten(dico, None)
    return flattened_dict

145 

146 

def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fLOG=None):
    """
    Enumerates items from a :epkg:`JSON` file or string.

    :param filename: filename or string or stream to parse
    :param encoding: encoding
    :param lines: one record per row
    :param flatten: call @see fn flatten_dictionary
    :param fLOG: logging function
    :return: iterator on records at first level.

    It assumes the syntax follows the format: ``[ {"id":1, ...}, {"id": 2, ...}, ...]``.
    However, if option *lines* is true, the function considers that the
    stream or file does have one record per row as follows:

        {"id":1, ...}
        {"id": 2, ...}

    .. exref::
        :title: Processes a json file by streaming.

        The module :epkg:`ijson` can read a :epkg:`JSON` file by streaming.
        This module is needed because a record can be written on multiple lines.
        This function leverages it and produces the following results.

        .. runpython::
            :showcode:

            from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items

            text_json = b'''
                [
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": [{
                                "GlossEntry": {
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }
                            }]
                        }
                    }
                },
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": {
                                "GlossEntry": [{
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }]
                            }
                        }
                    }
                }
                ]
            '''

            for item in enumerate_json_items(text_json):
                print(item)

    The parsed json must have an empty line at the end otherwise
    the following exception is raised:
    `ijson.common.IncompleteJSONError: `
    `parse error: unallowed token at this point in JSON text`.
    """
    if isinstance(filename, str):
        if "{" not in filename and os.path.exists(filename):
            # *filename* is a path on disk: open it and recurse on the stream.
            with open(filename, "r", encoding=encoding) as f:
                for el in enumerate_json_items(
                        f, encoding=encoding, lines=lines,
                        flatten=flatten, fLOG=fLOG):
                    yield el
        else:
            # *filename* is the JSON content itself: wrap it as a stream.
            st = StringIO(filename)
            for el in enumerate_json_items(
                    st, encoding=encoding, lines=lines,
                    flatten=flatten, fLOG=fLOG):
                yield el
    elif isinstance(filename, bytes):
        # Binary content: wrap it as a binary stream and recurse.
        st = BytesIO(filename)
        for el in enumerate_json_items(
                st, encoding=encoding, lines=lines, flatten=flatten,
                fLOG=fLOG):
            yield el
    elif lines:
        # One record per row: decorate the stream with '[', ',', ']'
        # so that it looks like a single JSON list, then parse normally.
        for el in enumerate_json_items(
                JsonPerRowsStream(filename),
                encoding=encoding, lines=False, flatten=flatten, fLOG=fLOG):
            yield el
    else:
        # Stream case: walk the low-level ijson event stream and rebuild
        # only the current first-level record, yielding it as soon as it
        # is complete so memory stays bounded.
        if hasattr(filename, 'seek'):
            filename.seek(0)
        parser = ijson.parse(filename)
        current = None  # container currently being filled
        curkey = None   # pending map key waiting for its value
        stack = []      # enclosing containers, stack[0] is the top-level list
        nbyield = 0     # number of records yielded so far (for logging)
        for i, (_, event, value) in enumerate(parser):
            if i % 1000000 == 0 and fLOG is not None:
                fLOG(  # pragma: no cover
                    f"[enumerate_json_items] i={i} yielded={nbyield}")
            if event == "start_array":
                if curkey is None:
                    # Array at the top level (or nested in an array).
                    current = []
                else:
                    # Array stored under *curkey* in the current map.
                    if not isinstance(current, dict):
                        raise RuntimeError(  # pragma: no cover
                            f"Type issue {type(current)}")
                    c = []
                    current[curkey] = c  # pylint: disable=E1137
                    current = c
                    curkey = None
                stack.append(current)
            elif event == "end_array":
                stack.pop()
                if len(stack) == 0:
                    # We should be done.
                    current = None
                else:
                    current = stack[-1]
            elif event == "start_map":
                c = {}
                if curkey is None:
                    # Map is an element of the enclosing array.
                    if current is None:
                        current = []
                    current.append(c)
                else:
                    # Map is the value of the pending key.
                    current[curkey] = c  # pylint: disable=E1137
                stack.append(c)
                current = c
                curkey = None
            elif event == "end_map":
                stack.pop()
                current = stack[-1]
                if len(stack) == 1:
                    # A first-level record is complete: yield it.
                    nbyield += 1
                    if flatten:
                        yield flatten_dictionary(current[-1])
                    else:
                        yield current[-1]
                    # We clear the memory.
                    current.clear()
            elif event == "map_key":
                curkey = value
            elif event in {"string", "number", "boolean"}:
                # Scalar value: either an array element or a map value.
                if curkey is None:
                    current.append(value)
                else:
                    current[curkey] = value  # pylint: disable=E1137
                    curkey = None
            elif event == "null":
                if curkey is None:
                    current.append(None)
                else:
                    current[curkey] = None  # pylint: disable=E1137
                    curkey = None
            else:
                raise ValueError(
                    f"Unknown event '{event}'")  # pragma: no cover

328 

329 

class JsonIterator2Stream:
    """
    Wraps an iterator producing :epkg:`JSON` items into a stream:
    every call to method *read* serializes the next item with
    :epkg:`*py:json:dumps` and returns it as a string.
    The iterator is typically built by @see fn enumerate_json_items.

    :param it: function which returns an iterator (not the iterator itself)
    :param kwargs: arguments to :epkg:`*py:json:dumps`

    .. exref::
        :title: Reshape a json file

        The function @see fn enumerate_json_items reads any
        :epkg:`json` even if every record is split over
        multiple lines. Class @see cl JsonIterator2Stream
        mocks this iterator as a stream. Each row is a single item.

        .. runpython::
            :showcode:

            from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream

            text_json = b'[{"a": 1}, {"b": 2}]'

            for item in JsonIterator2Stream(lambda: enumerate_json_items(text_json)):
                print(item)

    .. versionchanged:: 0.3
        The class takes a function which outputs an iterator and not an iterator.
        `JsonIterator2Stream(enumerate_json_items(text_json))` needs to be rewritten
        into JsonIterator2Stream(lambda: enumerate_json_items(text_json)).
    """

    def __init__(self, it, **kwargs):
        # The factory is kept so that *seek* can rebuild a fresh iterator.
        self.it = it
        self.kwargs = kwargs
        self.it0 = it()

    def seek(self, offset):
        """
        Change the stream position to the given byte offset.

        :param offset: offset, only 0 is implemented
        """
        if offset != 0:
            raise NotImplementedError(
                "The iterator can only return at the beginning.")
        # Restart from scratch with a brand new iterator.
        self.it0 = self.it()

    def write(self):
        """
        The class does not write.
        """
        raise NotImplementedError()

    def read(self):
        """
        Reads the next item and returns it as a string.
        Returns None once the iterator is exhausted.
        """
        # Keep the try block minimal: only next() may raise StopIteration.
        try:
            item = next(self.it0)
        except StopIteration:
            return None
        return dumps(item, **self.kwargs)

    def __iter__(self):
        """
        Iterates on each row. The behaviour is a bit tricky.
        It is implemented to be swallowed by :func:`pandas.read_json` which
        uses :func:`itertools.islice` to go through the items.
        It calls multiple times `__iter__` but does expect the
        iterator to continue from where it stopped last time.
        """
        # Deliberately resumes the shared iterator instead of restarting it.
        yield from (dumps(item, **self.kwargs) for item in self.it0)