Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Saves and reads a :epkg:`dataframe` into a :epkg:`zip` file.
5"""
6import os
7from io import StringIO, BytesIO
8try:
9 from ujson import dumps
10except ImportError: # pragma: no cover
11 from json import dumps
12import ijson
class JsonPerRowsStream:
    """
    Wraps a stream that contains one :epkg:`json` object per row and
    presents it as a single :epkg:`json` array: ``[`` is inserted before
    the first chunk, ``,`` after every newline, and ``]`` once the
    underlying stream is exhausted. Only methods *readline*, *read*,
    *seek* and *getvalue* are implemented.

    :param st: underlying stream (text or bytes for *read*)
    """

    def __init__(self, st):
        self.st = st
        self.begin = True       # '[' not emitted yet
        self.newline = False    # last chunk ended with '\n' -> prepend ','
        self.end = True         # ']' not emitted yet

    def seek(self, offset):
        """
        Change the stream position to the given byte offset.

        :param offset: offset, only 0 is implemented

        NOTE(review): only the underlying stream is repositioned, the
        ``begin``/``newline``/``end`` flags are left as they are.
        """
        self.st.seek(offset)

    def readline(self, size=-1):
        """
        Reads a line, adds ``,``, ``[``, ``]`` if needed.
        So the number of read characters is not necessarily
        the requested one but could be greater.
        """
        chunk = self.st.readline(size)
        if size == 0:
            return chunk
        # Insert the separator or the opening bracket before the chunk.
        if self.newline:
            chunk = ',' + chunk
            self.newline = False
        elif self.begin:
            chunk = '[' + chunk
            self.begin = False
        if chunk.endswith("\n"):
            # Remember to emit ',' at the start of the next chunk.
            self.newline = True
            return chunk
        if len(chunk) == 0 or len(chunk) < size:
            # Short read means the stream is exhausted: close the array once.
            if self.end:
                self.end = False
                return chunk + ']'
        return chunk

    def read(self, size=-1):
        """
        Reads characters, adds ``,``, ``[``, ``]`` if needed.
        So the number of read characters is not necessarily
        the requested one but could be greater.
        """
        chunk = self.st.read(size)
        if isinstance(chunk, bytes):
            nl, nl_comma, comma, obracket, cbracket = (
                b"\n", b"\n,", b",", b"[", b"]")
        else:
            nl, nl_comma, comma, obracket, cbracket = (
                "\n", "\n,", ",", "[", "]")
        if size == 0:
            return chunk
        if len(chunk) > 1:
            # Turn every inner newline into a separator; the very last
            # character is kept apart so a trailing '\n' is handled by
            # the *newline* flag instead.
            head, tail = chunk[:-1], chunk[-1:]
            chunk = head.replace(nl, nl_comma) + tail
        if self.newline:
            chunk = comma + chunk
            self.newline = False
        elif self.begin:
            chunk = obracket + chunk
            self.begin = False
        if chunk.endswith(nl):
            self.newline = True
            return chunk
        if len(chunk) == 0 or len(chunk) < size:
            # Short read: the stream is exhausted, close the array once.
            if self.end:
                self.end = False
                return chunk + cbracket
        return chunk

    def getvalue(self):
        """
        Returns the whole stream content.
        """
        pieces = []
        while True:
            line = self.readline()
            if not line:
                break
            pieces.append(line)
        return "".join(pieces)
def flatten_dictionary(dico, sep="_"):
    """
    Flattens a dictionary with nested structure to a dictionary with no
    hierarchy.

    :param dico: dictionary to flatten
    :param sep: string to separate dictionary keys by
    :return: flattened dictionary
    :raises TypeError: if a nested dictionary has a non-string key

    Inspired from `flatten_json
    <https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_.
    """
    flattened_dict = {}

    def _flatten(obj, key):
        # *key* is the flattened prefix built so far; None at the root.
        if obj is None:
            flattened_dict[key] = obj
        elif isinstance(obj, dict):
            for k, v in obj.items():
                if not isinstance(k, str):
                    raise TypeError(
                        "All keys must a string.")  # pragma: no cover
                k2 = k if key is None else "{0}{1}{2}".format(key, sep, k)
                _flatten(v, k2)
        elif isinstance(obj, (list, set)):
            for index, item in enumerate(obj):
                # Bug fix: this previously read ``k``, a leftover loop
                # variable from the dict branch, and raised NameError
                # when a list or set appeared at the top level.
                k2 = str(index) if key is None else "{0}{1}{2}".format(
                    key, sep, index)
                _flatten(item, k2)
        else:
            flattened_dict[key] = obj

    _flatten(dico, None)
    return flattened_dict
def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fLOG=None):
    """
    Enumerates items from a :epkg:`JSON` file or string.

    :param filename: filename or string or stream to parse
    :param encoding: encoding
    :param lines: one record per row
    :param flatten: call @see fn flatten_dictionary
    :param fLOG: logging function
    :return: iterator on records at first level.

    It assumes the syntax follows the format:
    ``[ {"id":1, ...}, {"id": 2, ...}, ...]``.
    However, if option *lines* is true, the function considers that the
    stream or file does have one record per row as follows::

        {"id":1, ...}
        {"id": 2, ...}

    .. exref::
        :title: Processes a json file by streaming.

        The module :epkg:`ijson` can read a :epkg:`JSON` file by streaming.
        This module is needed because a record can be written on multiple
        lines. This function leverages it and produces the following
        results.

        .. runpython::
            :showcode:

            from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items

            text_json = b'''
                [
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": [{
                                "GlossEntry": {
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }
                            }]
                        }
                    }
                },
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": {
                                "GlossEntry": [{
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }]
                            }
                        }
                    }
                }
                ]
            '''

            for item in enumerate_json_items(text_json):
                print(item)

    The parsed json must have an empty line at the end otherwise
    the following exception is raised:
    `ijson.common.IncompleteJSONError: `
    `parse error: unallowed token at this point in JSON text`.
    """
    if isinstance(filename, str):
        # A string may be a path or the JSON content itself; the presence
        # of '{' is used as the heuristic for inline content.
        if "{" not in filename and os.path.exists(filename):
            with open(filename, "r", encoding=encoding) as f:
                for el in enumerate_json_items(
                        f, encoding=encoding, lines=lines,
                        flatten=flatten, fLOG=fLOG):
                    yield el
        else:
            st = StringIO(filename)
            for el in enumerate_json_items(
                    st, encoding=encoding, lines=lines,
                    flatten=flatten, fLOG=fLOG):
                yield el
    elif isinstance(filename, bytes):
        st = BytesIO(filename)
        for el in enumerate_json_items(
                st, encoding=encoding, lines=lines, flatten=flatten,
                fLOG=fLOG):
            yield el
    elif lines:
        # One record per row: wrap the stream so it looks like one array.
        for el in enumerate_json_items(
                JsonPerRowsStream(filename),
                encoding=encoding, lines=False, flatten=flatten, fLOG=fLOG):
            yield el
    else:
        if hasattr(filename, 'seek'):
            filename.seek(0)
        # Event-based parsing: *stack* mirrors the nesting of open
        # containers, *current* is the innermost one, *curkey* the
        # pending map key waiting for its value.
        parser = ijson.parse(filename)
        current = None
        curkey = None
        stack = []
        nbyield = 0
        for i, (_, event, value) in enumerate(parser):
            if i % 1000000 == 0 and fLOG is not None:
                fLOG(  # pragma: no cover
                    "[enumerate_json_items] i={0} yielded={1}"
                    "".format(i, nbyield))
            if event == "start_array":
                if curkey is None:
                    current = []
                else:
                    if not isinstance(current, dict):
                        raise RuntimeError(  # pragma: no cover
                            "Type issue {0}".format(type(current)))
                    c = []
                    current[curkey] = c  # pylint: disable=E1137
                    current = c
                curkey = None
                stack.append(current)
            elif event == "end_array":
                stack.pop()
                if len(stack) == 0:
                    # We should be done.
                    current = None
                else:
                    current = stack[-1]
            elif event == "start_map":
                c = {}
                if curkey is None:
                    if current is None:
                        current = []
                    current.append(c)
                else:
                    current[curkey] = c  # pylint: disable=E1137
                stack.append(c)
                current = c
                curkey = None
            elif event == "end_map":
                stack.pop()
                current = stack[-1]
                if len(stack) == 1:
                    # A top-level record is complete: yield it and drop it
                    # so memory stays bounded while streaming.
                    nbyield += 1
                    if flatten:
                        yield flatten_dictionary(current[-1])
                    else:
                        yield current[-1]
                    # We clear the memory.
                    current.clear()
            elif event == "map_key":
                curkey = value
            elif event in {"string", "number", "boolean"}:
                if curkey is None:
                    current.append(value)
                else:
                    current[curkey] = value  # pylint: disable=E1137
                curkey = None
            elif event == "null":
                if curkey is None:
                    current.append(None)
                else:
                    current[curkey] = None  # pylint: disable=E1137
                curkey = None
            else:
                raise ValueError("Unknown event '{0}'".format(
                    event))  # pragma: no cover
class JsonIterator2Stream:
    """
    Transforms an iterator on :epkg:`JSON` items into a stream:
    every call to method *read* returns the next item serialized
    with :epkg:`*py:json:dumps`.
    The iterator could be one returned by @see fn enumerate_json_items.

    :param it: function returning a fresh iterator (not the iterator
        itself), so that *seek(0)* can restart the enumeration
    :param kwargs: arguments to :epkg:`*py:json:dumps`

    Example::

        for row in JsonIterator2Stream(
                lambda: enumerate_json_items(text_json)):
            print(row)

    .. versionchanged:: 0.3
        The class takes a function which outputs an iterator and not an
        iterator. `JsonIterator2Stream(enumerate_json_items(text_json))`
        needs to be rewritten into
        JsonIterator2Stream(lambda: enumerate_json_items(text_json)).
    """

    def __init__(self, it, **kwargs):
        self.it = it            # factory producing a fresh iterator
        self.kwargs = kwargs    # forwarded to dumps for every item
        self.it0 = it()         # the live iterator being consumed

    def seek(self, offset):
        """
        Change the stream position to the given byte offset.

        :param offset: offset, only 0 is implemented
        :raises NotImplementedError: for any non-zero offset
        """
        if offset != 0:
            raise NotImplementedError(
                "The iterator can only return at the beginning.")
        # Restart by asking the factory for a brand new iterator.
        self.it0 = self.it()

    def write(self):
        """
        The class does not write.
        """
        raise NotImplementedError()

    def read(self):
        """
        Reads the next item and returns it as a string,
        or ``None`` once the iterator is exhausted.
        """
        try:
            item = next(self.it0)
        except StopIteration:
            return None
        return dumps(item, **self.kwargs)

    def __iter__(self):
        """
        Iterates on each row. The behaviour is a bit tricky.
        It is implemented to be swallowed by :func:`pandas.read_json`
        which uses :func:`itertools.islice` to go through the items.
        It calls multiple times `__iter__` but does expect the
        iterator to continue from where it stopped last time.
        """
        for item in self.it0:
            yield dumps(item, **self.kwargs)