# -*- coding: utf-8 -*-
"""
@file
@brief Defines a streaming dataframe.
"""
import pickle
import os
from io import StringIO, BytesIO
from inspect import isfunction
import numpy
import numpy.random as nrandom
import pandas
from pandas.testing import assert_frame_equal
from pandas.io.json import json_normalize
from .dataframe_split import sklearn_train_test_split, sklearn_train_test_split_streaming
from .dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream


class StreamingDataFrameSchemaError(Exception):
    """
    Reveals an issue with inconsistent schemas.
    """
    pass


class StreamingDataFrame:
    """
    Defines a streaming dataframe.
    The goal is to reduce the memory footprint.
    The class takes a function which creates an iterator
    on :epkg:`dataframe`. We assume this function can
    be called multiple times. As a matter of fact, the
    function is called every time the class needs to walk
    through the stream with the following loop:

    ::

        for df in self:  # self is a StreamingDataFrame
            # ...

    The constructor cannot receive an iterator, otherwise
    this class would be able to walk through the data
    only once. The main reason is that it is impossible to
    :epkg:`*py:pickle` (or :epkg:`dill`)
    an iterator: it cannot be replicated.
    Instead, the class takes a function which generates
    an iterator on :epkg:`DataFrame`.
    Most methods return either a :epkg:`DataFrame`
    or a @see cl StreamingDataFrame. In the second case,
    methods can be chained.

    By default, the object checks that the schema remains
    the same between two chunks. This can be disabled
    by setting *check_schema=False* in the constructor.

    The user should expect the data to remain stable.
    Every loop should produce the same data. However,
    in some situations, it is more efficient not to keep
    that constraint. Drawing a random @see me sample
    is one of these cases.

    :param iter_creation: function which creates an iterator or an
        instance of @see cl StreamingDataFrame
    :param check_schema: checks that the schema is the same
        for every :epkg:`dataframe`
    :param stable: indicates if the :epkg:`dataframe` remains the same
        whenever it is walked through
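
    A minimal sketch (the generator function below is a hypothetical
    example, not part of the class): any function returning an iterator
    on :epkg:`DataFrame` can feed the constructor.

    ::

        import pandas

        def iterate():
            # called again for every new loop over the stream
            for i in range(0, 3):
                yield pandas.DataFrame(dict(a=[i, i + 1], b=[10, 20]))

        sdf = StreamingDataFrame(iterate)
        for df in sdf:
            print(df)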
68 """

    def __init__(self, iter_creation, check_schema=True, stable=True):
        self._delete_ = []
        if isinstance(iter_creation, (pandas.DataFrame, dict,
                                      numpy.ndarray, str)):
            raise TypeError(
                "Unexpected type %r for iter_creation. It must "
                "be an iterator." % type(iter_creation))
        if isinstance(iter_creation, StreamingDataFrame):
            self.iter_creation = iter_creation.iter_creation
            self.stable = iter_creation.stable
        else:
            self.iter_creation = iter_creation
            self.stable = stable
        self.check_schema = check_schema

    def is_stable(self, do_check=False, n=10):
        """
        Tells if the :epkg:`dataframe` is supposed to be stable.

        @param      do_check    do not trust the value sent to the constructor
        @param      n           number of chunks used to check the stability,
                                None for all chunks
        @return                 boolean

        *do_check=True* means the method checks that the first
        *n* chunks remain the same over two iterations.
        """
        if do_check:
            for i, (a, b) in enumerate(zip(self, self)):
                if n is not None and i >= n:
                    break
                try:
                    assert_frame_equal(a, b)
                except AssertionError:  # pragma: no cover
                    return False
            return True
        else:
            return self.stable

    def get_kwargs(self):
        """
        Returns the parameters used to call the constructor.
        """
        return dict(check_schema=self.check_schema)

    def train_test_split(self, path_or_buf=None, export_method="to_csv",
                         names=None, streaming=True, partitions=None,
                         **kwargs):
        """
        Randomly splits a :epkg:`dataframe` into smaller pieces.
        The function returns streams of file names.
        It chooses one of the options from module
        :mod:`dataframe_split <pandas_streaming.df.dataframe_split>`.

        @param  path_or_buf     a string, a list of strings or buffers, if it is a
                                string, it must contain ``{}`` like ``partition{}.txt``,
                                if None, the function returns strings.
        @param  export_method   method used to store the partitions, by default
                                :epkg:`pandas:DataFrame:to_csv`, additional parameters
                                will be given to that function
        @param  names           partition names, by default ``('train', 'test')``
        @param  kwargs          parameters for the export function and
                                :epkg:`sklearn:model_selection:train_test_split`.
        @param  streaming       the function switches to a
                                streaming version of the algorithm.
        @param  partitions      splitting partitions
        @return                 outputs of the export functions or two
                                @see cl StreamingDataFrame if *path_or_buf* is None.

        The streaming version of this algorithm is implemented by function
        @see fn sklearn_train_test_split_streaming. Its documentation
        indicates the limitation of the streaming version and gives some
        insights about the additional parameters.
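
        A minimal usage sketch, assuming a file ``data.csv`` exists:

        ::

            # "data.csv" is an assumed file name
            sdf = StreamingDataFrame.read_csv("data.csv", chunksize=10000)
            train, test = sdf.train_test_split(
                streaming=True, partitions=[0.8, 0.2])
            print(train.shape, test.shape)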
143 """
        if streaming:
            if partitions is not None:
                if len(partitions) != 2:
                    raise NotImplementedError(  # pragma: no cover
                        "Only train and test split is allowed, *partitions* "
                        "must be of length 2.")
                kwargs = kwargs.copy()
                kwargs['train_size'] = partitions[0]
                kwargs['test_size'] = partitions[1]
            return sklearn_train_test_split_streaming(self, **kwargs)
        return sklearn_train_test_split(self, path_or_buf=path_or_buf,
                                        export_method=export_method,
                                        names=names, **kwargs)

    @staticmethod
    def _process_kwargs(kwargs):
        """
        Filters out parameters for the constructor of this class.
        """
        kw = {}
        for k in ['check_schema']:
            if k in kwargs:
                kw[k] = kwargs[k]
                del kwargs[k]
        return kw

    @staticmethod
    def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDataFrame':
        """
        Reads a :epkg:`json` file or buffer as an iterator
        on :epkg:`DataFrame`. The signature is the same as
        :epkg:`pandas:read_json`. The important parameter is
        *chunksize* which defines the number
        of rows to parse in a single block
        and it must be defined to return an iterator.
        If *lines* is True, the function falls back into
        :epkg:`pandas:read_json`, otherwise it uses
        @see fn enumerate_json_items. If *lines* is ``'stream'``,
        *enumerate_json_items* is called with parameter
        ``lines=True``.
        Parameter *flatten* uses the trick described at
        `Flattening JSON objects in Python
        <https://towardsdatascience.com/flattening-json-objects-in-python-f5343c794b10>`_.
        Examples:

        .. runpython::
            :showcode:

            from io import BytesIO
            from pandas_streaming.df import StreamingDataFrame

            data = b'''{"a": 1, "b": 2}
            {"a": 3, "b": 4}'''
            it = StreamingDataFrame.read_json(BytesIO(data), lines=True)
            dfs = list(it)
            print(dfs)

        .. runpython::
            :showcode:

            from io import BytesIO
            from pandas_streaming.df import StreamingDataFrame

            data = b'''[{"a": 1,
            "b": 2},
            {"a": 3,
            "b": 4}]'''
            it = StreamingDataFrame.read_json(BytesIO(data))
            dfs = list(it)
            print(dfs)

        .. index:: IncompleteJSONError

        The parsed json must have an empty line at the end, otherwise
        the following exception is raised:
        ``ijson.common.IncompleteJSONError: parse error: unallowed token at this point in JSON text``.
        """
        if not isinstance(chunksize, int) or chunksize <= 0:
            raise ValueError(  # pragma: no cover
                'chunksize must be a positive integer')
        kwargs_create = StreamingDataFrame._process_kwargs(kwargs)

        if isinstance(args[0], (list, dict)):
            if flatten:
                return StreamingDataFrame.read_df(
                    json_normalize(args[0]), **kwargs_create)
            return StreamingDataFrame.read_df(args[0], **kwargs_create)

        if kwargs.get('lines', None) == 'stream':
            del kwargs['lines']

            def localf(a0=args[0]):
                if hasattr(a0, 'seek'):
                    a0.seek(0)
                return enumerate_json_items(
                    a0, encoding=kwargs.get('encoding', None), lines=True,
                    flatten=flatten)

            st = JsonIterator2Stream(localf)
            args = args[1:]

            if chunksize is None:
                return StreamingDataFrame(
                    lambda: pandas.read_json(
                        st, *args, chunksize=None, lines=True, **kwargs),
                    **kwargs_create)

            def fct1(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()):
                st.seek(0)
                for r in pandas.read_json(
                        st, *args, chunksize=chunksize, nrows=chunksize,
                        lines=True, **kw):
                    yield r

            return StreamingDataFrame(fct1, **kwargs_create)
        if kwargs.get('lines', False):
            if flatten:
                raise NotImplementedError(
                    "flatten==True is implemented with option lines='stream'")
            if chunksize is None:
                return StreamingDataFrame(
                    lambda: pandas.read_json(*args, chunksize=None, **kwargs),
                    **kwargs_create)

            def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()):
                for r in pandas.read_json(
                        *args, chunksize=chunksize, nrows=chunksize, **kw):
                    yield r

            return StreamingDataFrame(fct2, **kwargs_create)

        st = JsonIterator2Stream(
            lambda a0=args[0]: enumerate_json_items(
                a0, encoding=kwargs.get('encoding', None), flatten=flatten))
        args = args[1:]
        if 'lines' in kwargs:
            del kwargs['lines']

        if chunksize is None:
            return StreamingDataFrame(
                lambda: pandas.read_json(
                    st, *args, chunksize=chunksize, lines=True, **kwargs),
                **kwargs_create)

        def fct3(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()):
            if hasattr(st, 'seek'):
                st.seek(0)
            for r in pandas.read_json(
                    st, *args, chunksize=chunksize, nrows=chunksize,
                    lines=True, **kw):
                yield r

        return StreamingDataFrame(fct3, **kwargs_create)

    @staticmethod
    def read_csv(*args, **kwargs) -> 'StreamingDataFrame':
        """
        Reads a :epkg:`csv` file or buffer
        as an iterator on :epkg:`DataFrame`.
        The signature is the same as :epkg:`pandas:read_csv`.
        The important parameter is *chunksize* which defines the number
        of rows to parse in a single block. If not specified,
        it will be equal to 100000.
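
        A minimal sketch, assuming a file ``data.csv`` exists:

        ::

            # "data.csv" is an assumed file name
            sdf = StreamingDataFrame.read_csv("data.csv", chunksize=10000)
            for df in sdf:
                print(df.head())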
308 """
        if not kwargs.get('iterator', True):
            raise ValueError("If specified, iterator must be True.")
        if not kwargs.get('chunksize', 100000):
            raise ValueError("If specified, chunksize must not be None.")
        kwargs_create = StreamingDataFrame._process_kwargs(kwargs)
        kwargs['iterator'] = True
        if 'chunksize' not in kwargs:
            kwargs['chunksize'] = 100000
        return StreamingDataFrame(
            lambda: pandas.read_csv(*args, **kwargs), **kwargs_create)

    @staticmethod
    def read_str(text, **kwargs) -> 'StreamingDataFrame':
        """
        Reads :epkg:`csv` text (``str`` or ``bytes``)
        as an iterator on :epkg:`DataFrame`.
        The signature is the same as :epkg:`pandas:read_csv`.
        The important parameter is *chunksize* which defines the number
        of rows to parse in a single block.
        """
        if not kwargs.get('iterator', True):
            raise ValueError("If specified, iterator must be True.")
        if not kwargs.get('chunksize', 100000):
            raise ValueError("If specified, chunksize must not be None.")
        kwargs_create = StreamingDataFrame._process_kwargs(kwargs)
        kwargs['iterator'] = True
        if 'chunksize' not in kwargs:
            kwargs['chunksize'] = 100000
        if isinstance(text, str):
            buffer = StringIO(text)
        else:
            buffer = BytesIO(text)
        return StreamingDataFrame(
            lambda: pandas.read_csv(buffer, **kwargs), **kwargs_create)

    @staticmethod
    def read_df(df, chunksize=None, check_schema=True) -> 'StreamingDataFrame':
        """
        Splits a :epkg:`DataFrame` into small chunks mostly for
        unit testing purposes.

        @param      df              :epkg:`DataFrame`
        @param      chunksize       number of rows per chunk (by default,
                                    all rows in a single chunk)
        @param      check_schema    check schema between two iterations
        @return                     iterator on @see cl StreamingDataFrame
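
        A minimal sketch:

        ::

            import pandas
            df = pandas.DataFrame(dict(a=[5, 6], b=["x", "y"]))
            sdf = StreamingDataFrame.read_df(df, chunksize=1)
            print(sdf.shape)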
352 """
        if chunksize is None:
            if hasattr(df, 'shape'):
                chunksize = df.shape[0]
            else:
                raise NotImplementedError(
                    "Cannot retrieve size to infer chunksize for type={0}"
                    ".".format(type(df)))

        if hasattr(df, 'shape'):
            size = df.shape[0]
        else:
            raise NotImplementedError(  # pragma: no cover
                "Cannot retrieve size for type={0}.".format(type(df)))

        def local_iterator():
            "local iterator"
            for i in range(0, size, chunksize):
                end = min(size, i + chunksize)
                yield df[i:end].copy()
        return StreamingDataFrame(local_iterator, check_schema=check_schema)

    def __iter__(self):
        """
        Iterator on a large file with a sliding window.
        Each window is a :epkg:`DataFrame`.
        The method calls *iter_creation* again at every iteration,
        so every loop restarts from the beginning of the stream.
        If *check_schema* was enabled when calling the constructor,
        the method checks that every :epkg:`DataFrame`
        follows the same schema as the first chunk.

        Even with a big chunk size, it may happen
        that consecutive chunks infer different types
        for one particular column. An error message shows up
        saying ``Column types are different after row``
        with more information about the column which failed.
        In that case, :epkg:`pandas:read_csv` can overwrite
        the type of one column by specifying
        ``dtype={column_name: new_type}``. It frequently happens
        when a string column has many missing values.
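
        A minimal sketch, assuming ``data.csv`` exists and its column
        ``name`` mixes strings and missing values:

        ::

            # "data.csv" and column "name" are assumptions
            sdf = StreamingDataFrame.read_csv(
                "data.csv", chunksize=10000, dtype={"name": str})
            for df in sdf:
                pass  # the schema is checked chunk after chunk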
393 """
        iters = self.iter_creation()
        sch = None
        rows = 0
        for it in iters:
            if sch is None:
                sch = (list(it.columns), list(it.dtypes))
            elif self.check_schema:
                if list(it.columns) != sch[0]:  # pylint: disable=E1136
                    raise StreamingDataFrameSchemaError(  # pragma: no cover
                        'Column names are different after row {0}\nFirst chunk: {1}'
                        '\nCurrent chunk: {2}'.format(
                            rows, sch[0], list(it.columns)))  # pylint: disable=E1136
                if list(it.dtypes) != sch[1]:  # pylint: disable=E1136
                    errdf = pandas.DataFrame(
                        dict(names=sch[0], schema1=sch[1],  # pylint: disable=E1136
                             schema2=list(it.dtypes)))  # pylint: disable=E1136
                    tdf = StringIO()
                    errdf['diff'] = errdf['schema2'] != errdf['schema1']
                    errdf = errdf[errdf['diff']]
                    errdf.to_csv(tdf, sep=",", index=False)
                    raise StreamingDataFrameSchemaError(
                        'Column types are different after row {0}. You may use option '
                        'dtype={{"column_name": str}} to force the type on this column.'
                        '\n---\n{1}'.format(rows, tdf.getvalue()))

            rows += it.shape[0]
            yield it

    @property
    def shape(self):
        """
        Returns the shape of the stream.
        This is the kind of operation you do not want to do
        when a file is large because it goes through the whole
        stream just to compute the number of rows.
        """
        nl, nc = 0, 0
        for it in self:
            nc = max(it.shape[1], nc)
            nl += it.shape[0]
        return nl, nc

    @property
    def columns(self):
        """
        See :epkg:`pandas:DataFrame:columns`.
        """
        for it in self:
            return it.columns
        # The dataframe is empty.
        return []

    @property
    def dtypes(self):
        """
        See :epkg:`pandas:DataFrame:dtypes`.
        """
        for it in self:
            return it.dtypes

    def to_csv(self, path_or_buf=None, **kwargs) -> 'StreamingDataFrame':
        """
        Saves the :epkg:`DataFrame` into a string, a buffer or a file.
        See :epkg:`pandas:DataFrame.to_csv`.
        The header is only written with the first chunk.
        """
        if path_or_buf is None:
            st = StringIO()
            close = False
        elif isinstance(path_or_buf, str):
            st = open(  # pylint: disable=R1732
                path_or_buf, "w", encoding=kwargs.get('encoding'))
            close = True
        else:
            st = path_or_buf
            close = False

        for df in self:
            df.to_csv(st, **kwargs)
            kwargs['header'] = False

        if close:
            st.close()
        if isinstance(st, StringIO):
            return st.getvalue()
        return path_or_buf

    def to_dataframe(self) -> pandas.DataFrame:
        """
        Converts everything into a single :epkg:`DataFrame`.
        """
        return pandas.concat(self, axis=0)

    def to_df(self) -> pandas.DataFrame:
        """
        Converts everything into a single :epkg:`DataFrame`.
        """
        return self.to_dataframe()

    def iterrows(self):
        """
        See :epkg:`pandas:DataFrame:iterrows`.
        """
        for df in self:
            for it in df.iterrows():
                yield it

    def head(self, n=5) -> pandas.DataFrame:
        """
        Returns the first rows as a :epkg:`DataFrame`.
        """
        st = []
        total = 0
        for df in self:
            h = df.head(n=n)
            total += h.shape[0]
            st.append(h)
            if total >= n:
                break
            n -= h.shape[0]
        if len(st) == 1:
            return st[0]
        if len(st) == 0:
            return None
        return pandas.concat(st, axis=0)

    def tail(self, n=5) -> pandas.DataFrame:
        """
        Returns the last rows as a :epkg:`DataFrame`.
        The size of chunks must be greater than ``n`` to
        get ``n`` lines. This method is not efficient
        because the whole dataset must be walked through.
        """
        for df in self:
            h = df.tail(n=n)
        return h

    def where(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:where`.
        *inplace* must be False.
        This function returns a @see cl StreamingDataFrame.
        """
        kwargs['inplace'] = False
        return StreamingDataFrame(
            lambda: map(lambda df: df.where(*args, **kwargs), self),
            **self.get_kwargs())

    def sample(self, reservoir=False, cache=False, **kwargs) -> 'StreamingDataFrame':
        """
        See :epkg:`pandas:DataFrame:sample`.
        Only *frac* is available, otherwise choose
        @see me reservoir_sampling.
        This function returns a @see cl StreamingDataFrame.

        @param      reservoir   use `reservoir sampling <https://en.wikipedia.org/wiki/Reservoir_sampling>`_
        @param      cache       cache the sample
        @param      kwargs      additional parameters for :epkg:`pandas:DataFrame:sample`

        If *cache* is True, the sample is cached (assuming it holds in memory).
        The second time an iterator walks through the
        data, the same cached sample is returned.
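
        A minimal sketch:

        ::

            import pandas
            df = pandas.DataFrame(dict(a=range(0, 100)))
            sdf = StreamingDataFrame.read_df(df, chunksize=10)
            sub = sdf.sample(frac=0.1, cache=True)
            print(sub.to_dataframe())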
553 """
        if reservoir or 'n' in kwargs:
            if 'frac' in kwargs:
                raise ValueError(
                    'frac cannot be specified for reservoir sampling.')
            return self._reservoir_sampling(
                cache=cache, n=kwargs.get('n', 1000),
                random_state=kwargs.get('random_state'))
        if cache:
            sdf = self.sample(cache=False, **kwargs)
            df = sdf.to_df()
            return StreamingDataFrame.read_df(df, chunksize=df.shape[0])
        return StreamingDataFrame(
            lambda: map(lambda df: df.sample(**kwargs), self),
            **self.get_kwargs(), stable=False)

    def _reservoir_sampling(self, cache=True, n=1000, random_state=None) -> 'StreamingDataFrame':
        """
        Uses the `reservoir sampling <https://en.wikipedia.org/wiki/Reservoir_sampling>`_
        algorithm to draw a random sample with exactly *n* samples.

        @param      cache           cache the sample
        @param      n               number of observations to keep
        @param      random_state    sets the random_state
        @return                     @see cl StreamingDataFrame

        .. warning::
            The sample is split into chunks of size 1000.
            This parameter is not yet exposed.
        """
        if not cache:
            raise ValueError(
                "cache=False is not available for reservoir sampling.")
        if random_state is not None:
            nrandom.seed(random_state)
        indices = []
        seen = 0
        for i, df in enumerate(self):
            for ir, _ in enumerate(df.iterrows()):
                seen += 1
                if len(indices) < n:
                    indices.append((i, ir))
                else:
                    # Algorithm R: once the reservoir is full, the current
                    # row replaces a uniformly chosen slot with
                    # probability n / seen.
                    x = nrandom.random()  # pylint: disable=E1101
                    if x * seen < n:
                        k = nrandom.randint(0, len(indices))
                        indices[k] = (i, ir)  # pylint: disable=E1126
        indices = set(indices)

        def reservoir_iterate(sdf, indices, chunksize):
            "iterator"
            buffer = []
            for i, df in enumerate(sdf):
                for ir, row in enumerate(df.iterrows()):
                    if (i, ir) in indices:
                        # iterrows yields (index, Series), keeps the Series
                        buffer.append(row[1])
                        if len(buffer) >= chunksize:
                            yield pandas.DataFrame(buffer)
                            buffer.clear()
            if len(buffer) > 0:
                yield pandas.DataFrame(buffer)

        return StreamingDataFrame(
            lambda: reservoir_iterate(sdf=self, indices=indices, chunksize=1000))

    def apply(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:apply`.
        This function returns a @see cl StreamingDataFrame.
        """
        return StreamingDataFrame(
            lambda: map(lambda df: df.apply(*args, **kwargs), self),
            **self.get_kwargs())

    def applymap(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:applymap`.
        This function returns a @see cl StreamingDataFrame.
        """
        return StreamingDataFrame(
            lambda: map(lambda df: df.applymap(*args, **kwargs), self),
            **self.get_kwargs())

    def merge(self, right, **kwargs) -> 'StreamingDataFrame':
        """
        Merges two @see cl StreamingDataFrame and returns a @see cl StreamingDataFrame.
        *right* can be either a @see cl StreamingDataFrame or simply
        a :epkg:`pandas:DataFrame`. It calls :epkg:`pandas:DataFrame:merge` in
        a double loop, one loop on *self*, one loop on *right*.
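
        A minimal sketch:

        ::

            import pandas
            df = pandas.DataFrame(dict(a=[0, 1], b=["r", "s"]))
            sdf = StreamingDataFrame.read_df(df)
            merged = sdf.merge(df, on="a", suffixes=('', '_y'))
            print(merged.to_dataframe())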
636 """
        if isinstance(right, pandas.DataFrame):
            return self.merge(
                StreamingDataFrame.read_df(right, chunksize=right.shape[0]),
                **kwargs)

        def iterator_merge(sdf1, sdf2, **kw):
            "iterate on dataframes"
            for df1 in sdf1:
                for df2 in sdf2:
                    df = df1.merge(df2, **kw)
                    yield df

        return StreamingDataFrame(
            lambda: iterator_merge(self, right, **kwargs), **self.get_kwargs())

    def concat(self, others, axis=0) -> 'StreamingDataFrame':
        """
        Concatenates :epkg:`dataframes`. The function ensures all :epkg:`pandas:DataFrame`
        or @see cl StreamingDataFrame share the same columns (name and type).
        Otherwise, the function fails as it cannot guess the schema without
        walking through all :epkg:`dataframes`.

        :param others: list, enumeration, :epkg:`pandas:DataFrame`
        :param axis: concatenate by rows (0) or by columns (1)
        :return: @see cl StreamingDataFrame
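
        A minimal sketch:

        ::

            import pandas
            df = pandas.DataFrame(dict(a=[0, 1], b=["r", "s"]))
            sdf = StreamingDataFrame.read_df(df)
            print(sdf.concat([df, df], axis=0).to_dataframe())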
660 """
        if axis == 1:
            return self._concath(others)
        if axis == 0:
            return self._concatv(others)
        raise ValueError("axis must be 0 or 1")  # pragma: no cover

    def _concath(self, others):
        if not isinstance(others, list):
            others = [others]

        def iterateh(self, others):
            cols = tuple([self] + others)
            for dfs in zip(*cols):
                nrows = [_.shape[0] for _ in dfs]
                if min(nrows) != max(nrows):
                    raise RuntimeError(
                        "StreamingDataFrame cannot concatenate DataFrames "
                        "with different sizes or chunksizes")
                yield pandas.concat(list(dfs), axis=1)

        return StreamingDataFrame(
            lambda: iterateh(self, others), **self.get_kwargs())

    def _concatv(self, others):

        def iterator_concat(this, lothers):
            "iterator on dataframes"
            columns = None
            dtypes = None
            for df in this:
                if columns is None:
                    columns = df.columns
                    dtypes = df.dtypes
                yield df
            for obj in lothers:
                check = True
                for i, df in enumerate(obj):
                    if check:
                        if list(columns) != list(df.columns):
                            raise ValueError(
                                "Frame others[{0}] does not have the same "
                                "column names or the same order.".format(i))
                        if list(dtypes) != list(df.dtypes):
                            raise ValueError(
                                "Frame others[{0}] does not have the same "
                                "column types.".format(i))
                        check = False
                    yield df

        if isinstance(others, pandas.DataFrame):
            others = [others]
        elif isinstance(others, StreamingDataFrame):
            others = [others]

        def change_type(obj):
            "change column type"
            if isinstance(obj, pandas.DataFrame):
                return StreamingDataFrame.read_df(obj, obj.shape[0])
            return obj

        others = list(map(change_type, others))
        return StreamingDataFrame(
            lambda: iterator_concat(self, others), **self.get_kwargs())

    def groupby(self, by=None, lambda_agg=None, lambda_agg_agg=None,
                in_memory=True, **kwargs) -> pandas.DataFrame:
        """
        Implements the streaming :epkg:`pandas:DataFrame:groupby`.
        We assume the result holds in memory. The out-of-memory version
        is not implemented yet.

        @param      by              see :epkg:`pandas:DataFrame:groupby`
        @param      in_memory       in-memory algorithm
        @param      lambda_agg      aggregation function, *sum* by default
        @param      lambda_agg_agg  to aggregate the aggregations, *sum* by default
        @param      kwargs          additional parameters for :epkg:`pandas:DataFrame:groupby`
        @return                     :epkg:`pandas:DataFrame`

        As the input @see cl StreamingDataFrame does not necessarily hold
        in memory, the aggregation must be done at every iteration.
        There are two levels of aggregation: one to reduce every iterated
        :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`.
        This second one is always a **sum**.
        As a consequence, this function should not compute any *mean* or *count*,
        only *sum* because we do not know the size of each iterated
        :epkg:`dataframe`. To compute an average, sum and weights must be
        aggregated.

        Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default.
        It could also be ``lambda gr: gr.max()`` or
        ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()``
        as it would lead to incoherent results.

        .. exref::
            :title: StreamingDataFrame and groupby
            :tag: streaming

            Here is an example which shows how to write a simple *groupby*
            with :epkg:`pandas` and @see cl StreamingDataFrame.

            .. runpython::
                :showcode:

                from pandas import DataFrame
                from pandas_streaming.df import StreamingDataFrame

                df = DataFrame(dict(A=[3, 4, 3], B=[5, 6, 7]))
                sdf = StreamingDataFrame.read_df(df)

                # The following:
                print(sdf.groupby("A", lambda gr: gr.sum()))

                # Is equivalent to:
                print(df.groupby("A").sum())
        """
        if not in_memory:
            raise NotImplementedError(
                "Out-of-memory group by is not implemented.")
        if lambda_agg is None:
            def lambda_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg = lambda_agg_
        if lambda_agg_agg is None:
            def lambda_agg_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg_agg = lambda_agg_agg_
        ckw = kwargs.copy()
        ckw["as_index"] = False

        agg = []
        for df in self:
            gr = df.groupby(by=by, **ckw)
            agg.append(lambda_agg(gr))
        conc = pandas.concat(agg, sort=False)
        return lambda_agg_agg(conc.groupby(by=by, **kwargs))

    def groupby_streaming(self, by=None, lambda_agg=None, lambda_agg_agg=None,
                          in_memory=True, strategy='cum',
                          **kwargs) -> 'StreamingDataFrame':
        """
        Implements the streaming :epkg:`pandas:DataFrame:groupby`.
        We assume the result holds in memory. The out-of-memory version
        is not implemented yet.

        :param by: see :epkg:`pandas:DataFrame:groupby`
        :param in_memory: in-memory algorithm
        :param lambda_agg: aggregation function, *sum* by default
        :param lambda_agg_agg: to aggregate the aggregations, *sum* by default
        :param kwargs: additional parameters for :epkg:`pandas:DataFrame:groupby`
        :param strategy: ``'cum'``, or ``'streaming'``, see below
        :return: @see cl StreamingDataFrame

        As the input @see cl StreamingDataFrame does not necessarily hold
        in memory, the aggregation must be done at every iteration.
        There are two levels of aggregation: one to reduce every iterated
        :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`.
        This second one is always a **sum**.
        As a consequence, this function should not compute any *mean* or *count*,
        only *sum* because we do not know the size of each iterated
        :epkg:`dataframe`. To compute an average, sum and weights must be
        aggregated.

        Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default.
        It could also be ``lambda gr: gr.max()`` or
        ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()``
        as it would lead to incoherent results.

        Parameter *strategy* allows three scenarios.
        If ``strategy is None``, the function goes through
        the whole dataset to produce a final :epkg:`DataFrame`.
        If ``strategy=='cum'``, it returns a
        @see cl StreamingDataFrame: each iteration produces
        the current status of the *group by*. In the last case,
        ``strategy=='streaming'`` produces :epkg:`DataFrame`
        which must be concatenated into a single :epkg:`DataFrame`
        and grouped again to get the results.

        .. exref::
            :title: StreamingDataFrame and groupby
            :tag: streaming

            Here is an example which shows how to write a simple *groupby*
            with :epkg:`pandas` and @see cl StreamingDataFrame.

            .. runpython::
                :showcode:

                from pandas import DataFrame
                from pandas_streaming.df import StreamingDataFrame
                from pandas_streaming.data import dummy_streaming_dataframe

                df20 = dummy_streaming_dataframe(20).to_dataframe()
                df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0)
                sdf20 = StreamingDataFrame.read_df(df20, chunksize=5)
                sgr = sdf20.groupby_streaming("key", lambda gr: gr.sum(),
                                              strategy='cum', as_index=False)
                for gr in sgr:
                    print()
                    print(gr)
        """
        if not in_memory:
            raise NotImplementedError(
                "Out-of-memory group by is not implemented.")
        if lambda_agg is None:
            def lambda_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg = lambda_agg_
        if lambda_agg_agg is None:
            def lambda_agg_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg_agg = lambda_agg_agg_
        ckw = kwargs.copy()
        ckw["as_index"] = False

        if strategy == 'cum':
            def iterate_cum():
                agg = None
                for df in self:
                    gr = df.groupby(by=by, **ckw)
                    gragg = lambda_agg(gr)
                    if agg is None:
                        yield lambda_agg_agg(gragg.groupby(by=by, **kwargs))
                        agg = gragg
                    else:
                        lagg = pandas.concat([agg, gragg], sort=False)
                        yield lambda_agg_agg(lagg.groupby(by=by, **kwargs))
                        agg = lagg
            return StreamingDataFrame(
                lambda: iterate_cum(), **self.get_kwargs())

        if strategy == 'streaming':
            def iterate_streaming():
                for df in self:
                    gr = df.groupby(by=by, **ckw)
                    gragg = lambda_agg(gr)
                    yield lambda_agg(gragg.groupby(by=by, **kwargs))
            return StreamingDataFrame(
                lambda: iterate_streaming(), **self.get_kwargs())

        raise ValueError(  # pragma: no cover
            "Unknown strategy '{0}'".format(strategy))

    def ensure_dtype(self, df, dtypes):
        """
        Ensures the :epkg:`dataframe` *df* has the types indicated in
        *dtypes*. Changes it if not.

        :param df: dataframe
        :param dtypes: list of types
        :return: True if at least one column was converted
        """
        ch = False
        cols = df.columns
        for i, (has, exp) in enumerate(zip(df.dtypes, dtypes)):
            if has != exp:
                name = cols[i]
                df[name] = df[name].astype(exp)
                ch = True
        return ch

    def __getitem__(self, *args):
        """
        Implements some of the functionalities :epkg:`pandas`
        offers for the operator ``[]``.
        """
        if len(args) != 1:
            raise NotImplementedError("Only a list of columns is supported.")
        cols = args[0]
        if isinstance(cols, str):
            # One column.
            iter_creation = self.iter_creation

            def iterate_col():
                "iterate on one column"
                one_col = [cols]
                for df in iter_creation():
                    yield df[one_col]
            return StreamingSeries(iterate_col, **self.get_kwargs())

        if not isinstance(cols, list):
            raise NotImplementedError("Only a list of columns is supported.")

        def iterate_cols(sdf):
            """Iterate on columns."""
            for df in sdf:
                yield df[cols]

        return StreamingDataFrame(
            lambda: iterate_cols(self), **self.get_kwargs())

    def __setitem__(self, index, value):
        """
        A limited set of operators is supported.
        """
        if not isinstance(index, str):
            raise ValueError(
                "Only setting a column is supported but index=%r." % index)
        if isinstance(value, (int, float, numpy.number, str)):
            # It is equivalent to add_column.
            iter_creation = self.iter_creation

            def iterate_fct():
                "iterate on rows"
                iters = iter_creation()
                for df in iters:
                    dfc = df.copy()
                    dfc[index] = value
                    yield dfc

            self.iter_creation = iterate_fct
        elif isinstance(value, StreamingSeries):
            iter_creation = self.iter_creation

            def iterate_fct():
                "iterate on rows"
                iters = iter_creation()
                for df, dfs in zip(iters, value):
                    if df.shape[0] != dfs.shape[0]:
                        raise RuntimeError(
                            "Chunksize or shape are different when "
                            "iterating on two StreamingDataFrames at the "
                            "same time: %r != %r." % (
                                df.shape[0], dfs.shape[0]))
                    dfc = df.copy()
                    dfc[index] = dfs
                    yield dfc

            self.iter_creation = iterate_fct
        else:
            raise NotImplementedError(
                "Not implemented for type(index)=%r and type(value)=%r." % (
                    type(index), type(value)))

    def add_column(self, col, value):
        """
        Implements some of the functionalities :epkg:`pandas`
        offers for the operator ``[]``.

        @param      col     new column
        @param      value   a constant or a lambda function applied to every row
        @return             @see cl StreamingDataFrame

        .. note::

            Adding a column based on another @see cl StreamingDataFrame
            is not supported. The operator ``[]`` (see @see me __setitem__)
            can be used with a @see cl StreamingSeries instead, *chunksize*
            must then be the same for both.

        .. exref::
            :title: Add a new column to a StreamingDataFrame
            :tag: streaming

            .. runpython::
                :showcode:

                from pandas import DataFrame
                from pandas_streaming.df import StreamingDataFrame

                df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"]))
                sdf = StreamingDataFrame.read_df(df)
                sdf2 = sdf.add_column("d", lambda row: int(1))
                print(sdf2.to_dataframe())

                sdf2 = sdf.add_column("d", lambda row: int(1))
                print(sdf2.to_dataframe())
        """
        if not isinstance(col, str):
            raise NotImplementedError(
                "Only a column as a string is supported.")

        if isfunction(value):
            def iterate_fct(self, value, col):
                "iterate on rows"
                for df in self:
                    dfc = df.copy()
                    dfc.insert(dfc.shape[1], col, dfc.apply(value, axis=1))
                    yield dfc

            return StreamingDataFrame(
                lambda: iterate_fct(self, value, col), **self.get_kwargs())

        if isinstance(value, (pandas.Series, pandas.DataFrame, StreamingDataFrame)):
            raise NotImplementedError(
                "Unable to set a new column based on a dataframe.")

        def iterate_cst(self, value, col):
            "iterate on rows"
            for df in self:
                dfc = df.copy()
                dfc[col] = value
                yield dfc

        return StreamingDataFrame(
            lambda: iterate_cst(self, value, col), **self.get_kwargs())

    def fillna(self, **kwargs):
        """
        Replaces the missing values, calls
        :epkg:`pandas:DataFrame:fillna`.

        @param      kwargs      see :epkg:`pandas:DataFrame:fillna`
        @return                 @see cl StreamingDataFrame

        .. warning::
            The function does not check what happens at the
            limit of every chunk of data. Anything but a constant value
            will probably have an inconsistent behaviour.
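
        A minimal sketch, filling with a constant value:

        ::

            import pandas
            df = pandas.DataFrame(dict(X=[1.0, None], Y=[None, 2.0]))
            sdf = StreamingDataFrame.read_df(df)
            print(sdf.fillna(value=0).to_dataframe())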
1064 """

        def iterate_na(self, **kwargs):
            "iterate on rows"
            if kwargs.get('inplace', True):
                kwargs['inplace'] = True
                for df in self:
                    df.fillna(**kwargs)
                    yield df
            else:
                for df in self:
                    yield df.fillna(**kwargs)

        return StreamingDataFrame(
            lambda: iterate_na(self, **kwargs), **self.get_kwargs())

    def describe(self, percentiles=None, include=None, exclude=None,
                 datetime_is_numeric=False):
        """
        Calls :epkg:`pandas:DataFrame:describe` on every piece
        of the datasets. *percentiles* are not really accurate
        but just an indication.

        :param percentiles: see :epkg:`pandas:DataFrame:describe`
        :param include: see :epkg:`pandas:DataFrame:describe`
        :param exclude: see :epkg:`pandas:DataFrame:describe`
        :param datetime_is_numeric: see :epkg:`pandas:DataFrame:describe`
        :return: :epkg:`pandas:DataFrame:describe`
        """
        merged = None
        stack = []
        notper = ['count', 'mean', 'std']
        for df in self:
            desc = df.describe(
                percentiles=percentiles, include=include, exclude=exclude,
                datetime_is_numeric=datetime_is_numeric)
            count = desc.loc['count', :]
            rows = [name for name in desc.index if name not in notper]
            stack.append(desc.loc[rows, :])
            if merged is None:
                merged = desc
                merged.loc['std', :] = (
                    merged.loc['std', :] ** 2 + merged.loc['mean', :] ** 2) * count
                merged.loc['mean', :] *= count
            else:
                merged.loc['count', :] += desc.loc['count', :]
                merged.loc['mean', :] += desc.loc['mean', :] * count
                merged.loc['std', :] += (
                    desc.loc['std', :] ** 2 + desc.loc['mean', :] ** 2) * count
                merged.loc['max', :] = numpy.maximum(
                    merged.loc['max', :], desc.loc['max', :])
                merged.loc['min', :] = numpy.minimum(
                    merged.loc['min', :], desc.loc['min', :])
        merged.loc['mean', :] /= merged.loc['count', :]
        merged.loc['std', :] = (
            merged.loc['std', :] / merged.loc['count', :] -
            merged.loc['mean', :] ** 2) ** 0.5
        values = pandas.concat(stack)
        summary = values.describe(percentiles=percentiles,
                                  datetime_is_numeric=datetime_is_numeric)
        merged = merged.loc[notper, :]
        rows = [name for name in summary.index if name not in notper]
        summary = summary.loc[rows, :]
        return pandas.concat([merged, summary])

    def sort_values(self, by, axis=0, ascending=True, kind='quicksort',
                    na_position='last',
                    temp_file='_pandas_streaming_sort_values_'):
        """
        Sorts the streaming dataframe by values.

        :param by: one column
        :param ascending: order
        :param kind: see :meth:`pandas.DataFrame.sort_values`
        :param na_position: see :meth:`pandas.DataFrame.sort_values`
        :param temp_file: sorting a whole database is impossible
            without storing intermediate results on disk,
            unless it can fit into the memory, but in that case,
            it is easier to convert the streaming database into
            a dataframe and sort it
        :return: streaming database
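
        A minimal sketch (sorted chunks are pickled into *temp_file*,
        which is removed when the result is garbage collected):

        ::

            import pandas
            df = pandas.DataFrame(dict(a=[3.0, 1.0, 2.0]))
            sdf = StreamingDataFrame.read_df(df, chunksize=2)
            print(sdf.sort_values("a").to_dataframe())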
1145 """
        if not isinstance(by, str):
            raise NotImplementedError(
                "Only one column can be used to sort not %r." % by)
        keys = {}
        nans = []
        indices = []
        with open(temp_file, 'wb') as f:
            for df in self:
                dfs = df.sort_values(by, ascending=ascending, kind=kind,
                                     na_position=na_position)
                for tu in dfs[by]:
                    if isinstance(tu, float) and numpy.isnan(tu):
                        # stores the chunk index once per chunk, the chunk
                        # is filtered again when the stream is replayed
                        if not nans or nans[-1] != len(indices):
                            nans.append(len(indices))
                    else:
                        if tu not in keys:
                            keys[tu] = []
                        if not keys[tu] or keys[tu][-1] != len(indices):
                            keys[tu].append(len(indices))
                indices.append(f.tell())
                st = BytesIO()
                pickle.dump(dfs, st)
                f.write(st.getvalue())
            indices.append(f.tell())

        values = list(keys.items())
        values.sort(reverse=not ascending)

        def iterate():

            with open(temp_file, 'rb') as f:

                if na_position == 'first':
                    for p in nans:
                        f.seek(indices[p])
                        length = indices[p + 1] - indices[p]
                        pkl = f.read(length)
                        dfs = pickle.load(BytesIO(pkl))
                        sub = dfs[numpy.isnan(dfs[by])]
                        yield sub

                for key, positions in values:
                    for p in positions:
                        f.seek(indices[p])
                        length = indices[p + 1] - indices[p]
                        pkl = f.read(length)
                        dfs = pickle.load(BytesIO(pkl))
                        sub = dfs[dfs[by] == key]
                        yield sub

                if na_position == 'last':
                    for p in nans:
                        f.seek(indices[p])
                        length = indices[p + 1] - indices[p]
                        pkl = f.read(length)
                        dfs = pickle.load(BytesIO(pkl))
                        sub = dfs[numpy.isnan(dfs[by])]
                        yield sub

        res = StreamingDataFrame(
            lambda: iterate(), **self.get_kwargs())
        res._delete_.append(lambda: os.remove(temp_file))
        return res

    def __del__(self):
        """
        Calls every function in `_delete_`.
        """
        for f in self._delete_:
            f()


class StreamingSeries(StreamingDataFrame):
    """
    Seen as a @see cl StreamingDataFrame of one column.
    """

    def __init__(self, iter_creation, check_schema=True, stable=True):
        StreamingDataFrame.__init__(
            self, iter_creation, check_schema=check_schema, stable=stable)
        if len(self.columns) != 1:
            raise RuntimeError(
                "A series can contain only one column not %r." % len(self.columns))

    def apply(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:Series:apply`.
        This function returns a @see cl StreamingSeries.
        """
        return StreamingSeries(
            lambda: map(lambda df: df.apply(*args, **kwargs), self),
            **self.get_kwargs())

    def __add__(self, value):
        """
        Performs the addition on every chunk, assuming it is
        meaningful for the stored values.

        :param value: any value which makes sense
        :return: a new series
        """
        def iterate():
            for df in self:
                yield df + value

        return StreamingSeries(iterate, **self.get_kwargs())