Coverage for pandas_streaming/df/dataframe.py: 96% (538 statements)
# -*- coding: utf-8 -*-
# pylint: disable=W0102
"""
@file
@brief Defines a streaming dataframe.
"""
import pickle
import os
from io import StringIO, BytesIO
from inspect import isfunction
import numpy
import numpy.random as nrandom
import pandas
from pandas.testing import assert_frame_equal
try:
    from pandas import json_normalize
except ImportError:
    from pandas.io.json import json_normalize
from .dataframe_split import sklearn_train_test_split, sklearn_train_test_split_streaming
from .dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream


class StreamingDataFrameSchemaError(Exception):
    """
    Reveals an issue with inconsistent schemas.
    """
    pass


class StreamingDataFrame:
    """
    Defines a streaming dataframe.
    The goal is to reduce the memory footprint.
    The class takes a function which creates an iterator
    on :epkg:`dataframe`. We assume this function can
    be called multiple times. As a matter of fact, the
    function is called every time the class needs to walk
    through the stream with the following loop:

    ::

        for df in self:  # self is a StreamingDataFrame
            # ...

    The constructor cannot receive an iterator, otherwise
    this class would be able to walk through the data
    only once. The main reason is that it is impossible to
    :epkg:`*py:pickle` (or :epkg:`dill`)
    an iterator: it cannot be replicated.
    Instead, the class takes a function which generates
    an iterator on :epkg:`DataFrame`.
    Most of the methods return either a :epkg:`DataFrame`
    or a @see cl StreamingDataFrame. In the second case,
    methods can be chained.

    By default, the object checks that the schema remains
    the same between two chunks. This can be disabled
    by setting *check_schema=False* in the constructor.

    The user should expect the data to remain stable:
    every loop should produce the same data. However,
    in some situations, it is more efficient not to keep
    that constraint. Drawing a random @see me sample
    is one of these cases.
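
    A minimal sketch of how a stream is usually created, assuming
    ``chunk1`` and ``chunk2`` are two :epkg:`DataFrame` sharing the
    same schema (hypothetical names, for illustration only):

    ::

        def create_iterator():
            # called again every time the stream is walked through
            yield chunk1
            yield chunk2

        sdf = StreamingDataFrame(create_iterator)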

    :param iter_creation: function which creates an iterator or an
        instance of @see cl StreamingDataFrame
    :param check_schema: checks that the schema is the same
        for every :epkg:`dataframe`
    :param stable: indicates if the :epkg:`dataframe` remains the same
        whenever it is walked through
    """

    def __init__(self, iter_creation, check_schema=True, stable=True):
        self._delete_ = []
        if isinstance(iter_creation, (pandas.DataFrame, dict,
                                      numpy.ndarray, str)):
            raise TypeError(
                "Unexpected type %r for iter_creation. It must "
                "be an iterator." % type(iter_creation))
        if isinstance(iter_creation, StreamingDataFrame):
            self.iter_creation = iter_creation.iter_creation
            self.stable = iter_creation.stable
        else:
            self.iter_creation = iter_creation
            self.stable = stable
        self.check_schema = check_schema

    def is_stable(self, do_check=False, n=10):
        """
        Tells if the :epkg:`dataframe` is supposed to be stable.

        @param      do_check    do not trust the value sent to the constructor
        @param      n           number of chunks used to check the stability,
                                None for all of them
        @return                 boolean

        *do_check=True* means the method checks that the first
        *n* chunks remain the same across two iterations.
        """
        if do_check:
            for i, (a, b) in enumerate(zip(self, self)):
                if n is not None and i >= n:
                    break
                try:
                    assert_frame_equal(a, b)
                except AssertionError:  # pragma: no cover
                    return False
            return True
        else:
            return self.stable

    def get_kwargs(self):
        """
        Returns the parameters used to call the constructor.
        """
        return dict(check_schema=self.check_schema)

    def train_test_split(self, path_or_buf=None, export_method="to_csv",
                         names=None, streaming=True, partitions=None,
                         **kwargs):
        """
        Randomly splits a :epkg:`dataframe` into smaller pieces.
        The function returns streams of file names.
        It chooses one of the options from module
        :mod:`dataframe_split <pandas_streaming.df.dataframe_split>`.

        @param  path_or_buf     a string, a list of strings or buffers, if it is a
                                string, it must contain ``{}`` like ``partition{}.txt``,
                                if None, the function returns strings.
        @param  export_method   method used to store the partitions, by default
                                :epkg:`pandas:DataFrame:to_csv`, additional parameters
                                will be given to that function
        @param  names           partition names, by default ``('train', 'test')``
        @param  streaming       switches to a streaming version of the algorithm
        @param  partitions      splitting partitions
        @param  kwargs          parameters for the export function and
                                :epkg:`sklearn:model_selection:train_test_split`
        @return                 outputs of the export functions or two
                                @see cl StreamingDataFrame if *path_or_buf* is None

        The streaming version of this algorithm is implemented by function
        @see fn sklearn_train_test_split_streaming. Its documentation
        indicates the limitation of the streaming version and gives some
        insights about the additional parameters.
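
        A minimal sketch of a streaming split, assuming ``sdf`` is a
        @see cl StreamingDataFrame (parameter values are illustrative):

        ::

            train_sdf, test_sdf = sdf.train_test_split(
                streaming=True, partitions=[0.8, 0.2])
            train_df = train_sdf.to_dataframe()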
        """
        if streaming:
            if partitions is not None:
                if len(partitions) != 2:
                    raise NotImplementedError(  # pragma: no cover
                        "Only train and test split is allowed, *partitions* "
                        "must be of length 2.")
                kwargs = kwargs.copy()
                kwargs['train_size'] = partitions[0]
                kwargs['test_size'] = partitions[1]
            return sklearn_train_test_split_streaming(self, **kwargs)
        return sklearn_train_test_split(self, path_or_buf=path_or_buf,
                                        export_method=export_method,
                                        names=names, **kwargs)

    @staticmethod
    def _process_kwargs(kwargs):
        """
        Filters out parameters for the constructor of this class.
        """
        kw = {}
        for k in ['check_schema']:
            if k in kwargs:
                kw[k] = kwargs[k]
                del kwargs[k]
        return kw

    @staticmethod
    def read_json(*args, chunksize=100000, flatten=False,
                  **kwargs) -> 'StreamingDataFrame':
        """
        Reads a :epkg:`json` file or buffer as an iterator
        on :epkg:`DataFrame`. The signature is the same as
        :epkg:`pandas:read_json`. The important parameter is
        *chunksize* which defines the number
        of rows to parse in a single block
        and it must be defined to return an iterator.
        If *lines* is True, the function falls back into
        :epkg:`pandas:read_json`, otherwise it uses
        @see fn enumerate_json_items. If *lines* is ``'stream'``,
        *enumerate_json_items* is called with parameter
        ``lines=True``.
        Parameter *flatten* uses the trick described at
        `Flattening JSON objects in Python
        <https://towardsdatascience.com/flattening-json-objects-in-python-f5343c794b10>`_.
        Examples:

        .. runpython::
            :showcode:

            from io import BytesIO
            from pandas_streaming.df import StreamingDataFrame

            data = b'''{"a": 1, "b": 2}
            {"a": 3, "b": 4}'''
            it = StreamingDataFrame.read_json(BytesIO(data), lines=True)
            dfs = list(it)
            print(dfs)

        .. runpython::
            :showcode:

            from io import BytesIO
            from pandas_streaming.df import StreamingDataFrame

            data = b'''[{"a": 1,
                         "b": 2},
                        {"a": 3,
                         "b": 4}]'''
            it = StreamingDataFrame.read_json(BytesIO(data))
            dfs = list(it)
            print(dfs)

        .. index:: IncompleteJSONError

        The parsed :epkg:`json` must end with an empty line, otherwise
        the following exception is raised:
        ``ijson.common.IncompleteJSONError: parse error: unallowed token
        at this point in JSON text``.
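
        A sketch of the ``lines='stream'`` mode on similar data
        (illustrative, not executed here):

        ::

            data = b'''{"a": 1, "b": 2}
            {"a": 3, "b": 4}'''
            it = StreamingDataFrame.read_json(BytesIO(data), lines="stream")
            dfs = list(it)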
        """
        if not isinstance(chunksize, int) or chunksize <= 0:
            raise ValueError(  # pragma: no cover
                'chunksize must be a positive integer')
        kwargs_create = StreamingDataFrame._process_kwargs(kwargs)

        if isinstance(args[0], (list, dict)):
            if flatten:
                return StreamingDataFrame.read_df(
                    json_normalize(args[0]), **kwargs_create)
            return StreamingDataFrame.read_df(args[0], **kwargs_create)

        if kwargs.get('lines', None) == 'stream':
            del kwargs['lines']

            def localf(a0=args[0]):
                if hasattr(a0, 'seek'):
                    a0.seek(0)
                return enumerate_json_items(
                    a0, encoding=kwargs.get('encoding', None), lines=True,
                    flatten=flatten)

            st = JsonIterator2Stream(localf)
            args = args[1:]

            if chunksize is None:
                return StreamingDataFrame(
                    lambda: pandas.read_json(
                        st, *args, chunksize=None, lines=True, **kwargs),
                    **kwargs_create)

            def fct1(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()):
                st.seek(0)
                for r in pandas.read_json(
                        st, *args, chunksize=chunksize, nrows=chunksize,
                        lines=True, **kw):
                    yield r

            return StreamingDataFrame(fct1, **kwargs_create)

        if kwargs.get('lines', False):
            if flatten:
                raise NotImplementedError(
                    "flatten==True is implemented with option lines='stream'")
            if chunksize is None:
                return StreamingDataFrame(
                    lambda: pandas.read_json(*args, chunksize=None, **kwargs),
                    **kwargs_create)

            def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()):
                for r in pandas.read_json(
                        *args, chunksize=chunksize, nrows=chunksize, **kw):
                    yield r

            return StreamingDataFrame(fct2, **kwargs_create)

        st = JsonIterator2Stream(
            lambda a0=args[0]: enumerate_json_items(
                a0, encoding=kwargs.get('encoding', None), flatten=flatten))
        args = args[1:]
        if 'lines' in kwargs:
            del kwargs['lines']

        if chunksize is None:
            return StreamingDataFrame(
                lambda: pandas.read_json(
                    st, *args, chunksize=chunksize, lines=True, **kwargs),
                **kwargs_create)

        def fct3(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()):
            if hasattr(st, 'seek'):
                st.seek(0)
            for r in pandas.read_json(
                    st, *args, chunksize=chunksize, nrows=chunksize,
                    lines=True, **kw):
                yield r

        return StreamingDataFrame(fct3, **kwargs_create)

    @staticmethod
    def read_csv(*args, **kwargs) -> 'StreamingDataFrame':
        """
        Reads a :epkg:`csv` file or buffer
        as an iterator on :epkg:`DataFrame`.
        The signature is the same as :epkg:`pandas:read_csv`.
        The important parameter is *chunksize* which defines the number
        of rows to parse in a single block. If not specified,
        it will be equal to 100000.
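
        A short usage sketch (the file name and ``process`` are
        hypothetical):

        ::

            sdf = StreamingDataFrame.read_csv("data.csv", chunksize=10000)
            for df in sdf:
                # df is a regular pandas DataFrame of at most 10000 rows
                process(df)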
        """
        if not kwargs.get('iterator', True):
            raise ValueError("If specified, iterator must be True.")
        if not kwargs.get('chunksize', 100000):
            raise ValueError("If specified, chunksize must not be None.")
        kwargs_create = StreamingDataFrame._process_kwargs(kwargs)
        kwargs['iterator'] = True
        if 'chunksize' not in kwargs:
            kwargs['chunksize'] = 100000
        return StreamingDataFrame(
            lambda: pandas.read_csv(*args, **kwargs), **kwargs_create)

    @staticmethod
    def read_str(text, **kwargs) -> 'StreamingDataFrame':
        """
        Reads a string holding :epkg:`csv` data as an iterator
        on :epkg:`DataFrame`.
        The signature is the same as :epkg:`pandas:read_csv`.
        The important parameter is *chunksize* which defines the number
        of rows to parse in a single block.
        """
        if not kwargs.get('iterator', True):
            raise ValueError("If specified, iterator must be True.")
        if not kwargs.get('chunksize', 100000):
            raise ValueError("If specified, chunksize must not be None.")
        kwargs_create = StreamingDataFrame._process_kwargs(kwargs)
        kwargs['iterator'] = True
        if 'chunksize' not in kwargs:
            kwargs['chunksize'] = 100000
        if isinstance(text, str):
            buffer = StringIO(text)
        else:
            buffer = BytesIO(text)
        return StreamingDataFrame(
            lambda: pandas.read_csv(buffer, **kwargs), **kwargs_create)

    @staticmethod
    def read_df(df, chunksize=None, check_schema=True) -> 'StreamingDataFrame':
        """
        Splits a :epkg:`DataFrame` into small chunks, mostly for
        unit testing purposes.

        @param      df              :epkg:`DataFrame`
        @param      chunksize       number of rows per chunk (the whole
                                    :epkg:`dataframe` by default)
        @param      check_schema    check schema between two iterations
        @return                     iterator on @see cl StreamingDataFrame
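
        For instance (a small sketch):

        ::

            import pandas
            df = pandas.DataFrame(dict(a=[1, 2, 3], b=[4, 5, 6]))
            sdf = StreamingDataFrame.read_df(df, chunksize=2)
            # iterates over two chunks of 2 and 1 rows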
        """
        if chunksize is None:
            if hasattr(df, 'shape'):
                chunksize = df.shape[0]
            else:
                raise NotImplementedError(
                    f"Cannot retrieve size to infer chunksize for type={type(df)}.")

        if hasattr(df, 'shape'):
            size = df.shape[0]
        else:
            raise NotImplementedError(  # pragma: no cover
                f"Cannot retrieve size for type={type(df)}.")

        def local_iterator():
            "local iterator"
            for i in range(0, size, chunksize):
                end = min(size, i + chunksize)
                yield df[i:end].copy()

        return StreamingDataFrame(local_iterator, check_schema=check_schema)

    def __iter__(self):
        """
        Iterator on a large file with a sliding window.
        Each window is a :epkg:`DataFrame`.
        The method stores a copy of the initial iterator
        and restores it after the end of the iterations.
        If *check_schema* was enabled when calling the constructor,
        the method checks that every :epkg:`DataFrame`
        follows the same schema as the first chunk.

        Even with a big chunk size, consecutive chunks may be
        detected with different types for one particular column.
        An error message shows up
        saying ``Column types are different after row``
        with more information about the column which failed.
        In that case, :epkg:`pandas:read_csv` can overwrite
        the type of one column by specifying
        ``dtype={column_name: new_type}``. It frequently happens
        when a string column has many missing values.
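
        A sketch of that fix, assuming column ``name`` triggers the
        error (the file name is hypothetical):

        ::

            sdf = StreamingDataFrame.read_csv(
                "data.csv", chunksize=100000, dtype={"name": str})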
        """
        iters = self.iter_creation()
        sch = None
        rows = 0
        for it in iters:
            if sch is None:
                sch = (list(it.columns), list(it.dtypes))
            elif self.check_schema:
                if list(it.columns) != sch[0]:  # pylint: disable=E1136
                    raise StreamingDataFrameSchemaError(  # pragma: no cover
                        'Column names are different after row {0}\nFirst chunk: {1}'
                        '\nCurrent chunk: {2}'.format(
                            rows, sch[0], list(it.columns)))  # pylint: disable=E1136
                if list(it.dtypes) != sch[1]:  # pylint: disable=E1136
                    errdf = pandas.DataFrame(
                        dict(names=sch[0], schema1=sch[1],  # pylint: disable=E1136
                             schema2=list(it.dtypes)))  # pylint: disable=E1136
                    tdf = StringIO()
                    errdf['diff'] = errdf['schema2'] != errdf['schema1']
                    errdf = errdf[errdf['diff']]
                    errdf.to_csv(tdf, sep=",", index=False)
                    raise StreamingDataFrameSchemaError(
                        'Column types are different after row {0}. You may use option '
                        'dtype={{"column_name": str}} to force the type on this column.'
                        '\n---\n{1}'.format(rows, tdf.getvalue()))

            rows += it.shape[0]
            yield it

    @property
    def shape(self):
        """
        Returns the shape. This is the kind of operation you do not
        want to run on a large file because it goes through the whole
        stream just to get the number of rows.
        """
        nl, nc = 0, 0
        for it in self:
            nc = max(it.shape[1], nc)
            nl += it.shape[0]
        return nl, nc

    @property
    def columns(self):
        """
        See :epkg:`pandas:DataFrame:columns`.
        """
        for it in self:
            return it.columns
        # The dataframe is empty.
        return []

    @property
    def dtypes(self):
        """
        See :epkg:`pandas:DataFrame:dtypes`.
        """
        for it in self:
            return it.dtypes

    def to_csv(self, path_or_buf=None, **kwargs) -> 'StreamingDataFrame':
        """
        Saves the :epkg:`DataFrame` into a string or a file.
        See :epkg:`pandas:DataFrame.to_csv`.
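
        A short sketch (the file name is hypothetical):

        ::

            sdf.to_csv("dump.csv", index=False)      # writes a file
            text = sdf.to_csv(index=False)           # returns the csv as a string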
        """
        if path_or_buf is None:
            st = StringIO()
            close = False
        elif isinstance(path_or_buf, str):
            st = open(  # pylint: disable=R1732
                path_or_buf, "w", encoding=kwargs.get('encoding'))
            close = True
        else:
            st = path_or_buf
            close = False

        for df in self:
            df.to_csv(st, **kwargs)
            # The header is written only for the first chunk.
            kwargs['header'] = False

        if close:
            st.close()
        if isinstance(st, StringIO):
            return st.getvalue()
        return path_or_buf

    def to_dataframe(self) -> pandas.DataFrame:
        """
        Converts everything into a single :epkg:`DataFrame`.
        """
        return pandas.concat(self, axis=0)

    def to_df(self) -> pandas.DataFrame:
        """
        Converts everything into a single :epkg:`DataFrame`.
        """
        return self.to_dataframe()

    def iterrows(self):
        """
        See :epkg:`pandas:DataFrame:iterrows`.
        """
        for df in self:
            for it in df.iterrows():
                yield it

    def head(self, n=5) -> pandas.DataFrame:
        """
        Returns the first rows as a :epkg:`DataFrame`.
        """
        st = []
        total = 0
        for df in self:
            h = df.head(n=n)
            total += h.shape[0]
            st.append(h)
            if total >= n:
                break
            n -= h.shape[0]
        if len(st) == 1:
            return st[0]
        if len(st) == 0:
            return None
        return pandas.concat(st, axis=0)

    def tail(self, n=5) -> pandas.DataFrame:
        """
        Returns the last rows as a :epkg:`DataFrame`.
        The size of chunks must be greater than ``n`` to
        get ``n`` lines. This method is not efficient
        because the whole dataset must be walked through.
        """
        for df in self:
            h = df.tail(n=n)
        return h

    def where(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:where`.
        *inplace* must be False.
        This function returns a @see cl StreamingDataFrame.
        """
        kwargs['inplace'] = False
        return StreamingDataFrame(
            lambda: map(lambda df: df.where(*args, **kwargs), self),
            **self.get_kwargs())

    def sample(self, reservoir=False, cache=False, **kwargs) -> 'StreamingDataFrame':
        """
        See :epkg:`pandas:DataFrame:sample`.
        Only *frac* is available, otherwise choose
        @see me reservoir_sampling.
        This function returns a @see cl StreamingDataFrame.

        @param      reservoir   use `reservoir sampling
                                <https://en.wikipedia.org/wiki/Reservoir_sampling>`_
        @param      cache       cache the sample
        @param      kwargs      additional parameters for
                                :epkg:`pandas:DataFrame:sample`

        If *cache* is True, the sample is cached (assuming it holds in
        memory). The second time an iterator walks through the data,
        the cached sample is returned, so every iteration sees the
        same rows.
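
        A short sketch (parameter values are illustrative):

        ::

            small = sdf.sample(frac=0.1, cache=True)
            fixed = sdf.sample(reservoir=True, n=1000)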
        """
        if reservoir or 'n' in kwargs:
            if 'frac' in kwargs:
                raise ValueError(
                    'frac cannot be specified for reservoir sampling.')
            return self._reservoir_sampling(
                cache=cache, n=kwargs['n'],
                random_state=kwargs.get('random_state'))
        if cache:
            sdf = self.sample(cache=False, **kwargs)
            df = sdf.to_df()
            return StreamingDataFrame.read_df(df, chunksize=df.shape[0])
        return StreamingDataFrame(
            lambda: map(lambda df: df.sample(**kwargs), self),
            **self.get_kwargs(), stable=False)

    def _reservoir_sampling(self, cache=True, n=1000,
                            random_state=None) -> 'StreamingDataFrame':
        """
        Uses the `reservoir sampling
        <https://en.wikipedia.org/wiki/Reservoir_sampling>`_
        algorithm to draw a random sample with exactly *n* samples.

        @param      cache           cache the sample
        @param      n               number of observations to keep
        @param      random_state    sets the random_state
        @return                     @see cl StreamingDataFrame

        .. warning::
            The sample is split into chunks of size 1000.
            This parameter is not yet exposed.
        """
        if not cache:
            raise ValueError(
                "cache=False is not available for reservoir sampling.")
        indices = []
        seen = 0
        for i, df in enumerate(self):
            for ir, _ in enumerate(df.iterrows()):
                seen += 1
                if len(indices) < n:
                    # Fills the reservoir with the first n rows.
                    indices.append((i, ir))
                else:
                    # Replaces an element with a decreasing probability.
                    x = nrandom.random()  # pylint: disable=E1101
                    if x * n < (seen - n):
                        k = nrandom.randint(0, len(indices) - 1)
                        indices[k] = (i, ir)  # pylint: disable=E1126
        indices = set(indices)

        def reservoir_iterate(sdf, indices, chunksize):
            "iterator"
            buffer = []
            for i, df in enumerate(sdf):
                for ir, row in enumerate(df.iterrows()):
                    if (i, ir) in indices:
                        # row is a tuple (label, Series), keeps the values
                        buffer.append(row[1])
                        if len(buffer) >= chunksize:
                            yield pandas.DataFrame(buffer)
                            buffer.clear()
            if len(buffer) > 0:
                yield pandas.DataFrame(buffer)

        return StreamingDataFrame(
            lambda: reservoir_iterate(sdf=self, indices=indices, chunksize=1000))

    def drop(self, labels=None, *, axis=0, index=None, columns=None, level=None,
             inplace=False, errors='raise') -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:drop`.
        This function returns a @see cl StreamingDataFrame.
        """
        if axis == 0:
            raise NotImplementedError(f"drop is not implemented for axis={axis}.")
        if inplace:
            raise NotImplementedError(f"drop is not implemented for inplace={inplace}.")
        return StreamingDataFrame(
            lambda: map(lambda df: df.drop(
                labels, axis=axis, index=index, columns=columns,
                level=level, inplace=False, errors=errors), self),
            **self.get_kwargs())

    def apply(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:apply`.
        This function returns a @see cl StreamingDataFrame.
        """
        return StreamingDataFrame(
            lambda: map(lambda df: df.apply(*args, **kwargs), self),
            **self.get_kwargs())

    def applymap(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:applymap`.
        This function returns a @see cl StreamingDataFrame.
        """
        return StreamingDataFrame(
            lambda: map(lambda df: df.applymap(*args, **kwargs), self),
            **self.get_kwargs())

    def merge(self, right, **kwargs) -> 'StreamingDataFrame':
        """
        Merges two @see cl StreamingDataFrame and returns a
        @see cl StreamingDataFrame.
        *right* can be either a @see cl StreamingDataFrame or simply
        a :epkg:`pandas:DataFrame`. It calls :epkg:`pandas:DataFrame:merge` in
        a double loop, a loop on *self* and a loop on *right*.
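
        A short sketch, assuming both streams share a column ``key``
        (illustrative names):

        ::

            merged = sdf1.merge(sdf2, on="key", how="inner")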
        """
        if isinstance(right, pandas.DataFrame):
            return self.merge(
                StreamingDataFrame.read_df(right, chunksize=right.shape[0]),
                **kwargs)

        def iterator_merge(sdf1, sdf2, **kw):
            "iterate on dataframes"
            for df1 in sdf1:
                for df2 in sdf2:
                    df = df1.merge(df2, **kw)
                    yield df

        return StreamingDataFrame(
            lambda: iterator_merge(self, right, **kwargs), **self.get_kwargs())

    def concat(self, others, axis=0) -> 'StreamingDataFrame':
        """
        Concatenates :epkg:`dataframes`. The function ensures all
        :epkg:`pandas:DataFrame` or @see cl StreamingDataFrame
        share the same columns (name and type).
        Otherwise, the function fails as it cannot guess the schema without
        walking through all :epkg:`dataframes`.

        :param others: list, enumeration, :epkg:`pandas:DataFrame`
        :param axis: concatenate by rows (0) or by columns (1)
        :return: @see cl StreamingDataFrame
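
        For instance (a sketch, assuming ``sdf1`` and ``sdf2`` share the
        same schema and, for ``axis=1``, the same chunk sizes):

        ::

            stacked = sdf1.concat([sdf2], axis=0)
            side_by_side = sdf1.concat([sdf2], axis=1)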
        """
        if axis == 1:
            return self._concath(others)
        if axis == 0:
            return self._concatv(others)
        raise ValueError("axis must be 0 or 1")  # pragma: no cover

    def _concath(self, others):
        if not isinstance(others, list):
            others = [others]

        def iterateh(self, others):
            cols = tuple([self] + others)
            for dfs in zip(*cols):
                nrows = [_.shape[0] for _ in dfs]
                if min(nrows) != max(nrows):
                    raise RuntimeError(
                        "StreamingDataFrame cannot merge DataFrames with "
                        "different sizes or chunksizes.")
                yield pandas.concat(list(dfs), axis=1)

        return StreamingDataFrame(
            lambda: iterateh(self, others), **self.get_kwargs())

    def _concatv(self, others):

        def iterator_concat(this, lothers):
            "iterator on dataframes"
            columns = None
            dtypes = None
            for df in this:
                if columns is None:
                    columns = df.columns
                    dtypes = df.dtypes
                yield df
            for obj in lothers:
                check = True
                for i, df in enumerate(obj):
                    if check:
                        if list(columns) != list(df.columns):
                            raise ValueError(
                                f"Frame others[{i}] does not have the same "
                                "column names or the same order.")
                        if list(dtypes) != list(df.dtypes):
                            raise ValueError(
                                f"Frame others[{i}] does not have the same "
                                "column types.")
                        check = False
                    yield df

        if isinstance(others, pandas.DataFrame):
            others = [others]
        elif isinstance(others, StreamingDataFrame):
            others = [others]

        def change_type(obj):
            "change column type"
            if isinstance(obj, pandas.DataFrame):
                return StreamingDataFrame.read_df(obj, obj.shape[0])
            else:
                return obj

        others = list(map(change_type, others))
        return StreamingDataFrame(
            lambda: iterator_concat(self, others), **self.get_kwargs())

    def groupby(self, by=None, lambda_agg=None, lambda_agg_agg=None,
                in_memory=True, **kwargs) -> pandas.DataFrame:
        """
        Implements the streaming :epkg:`pandas:DataFrame:groupby`.
        We assume the result holds in memory. The out-of-memory version
        is not implemented yet.

        @param      by              see :epkg:`pandas:DataFrame:groupby`
        @param      in_memory       in-memory algorithm
        @param      lambda_agg      aggregation function, *sum* by default
        @param      lambda_agg_agg  to aggregate the aggregations, *sum* by default
        @param      kwargs          additional parameters for
                                    :epkg:`pandas:DataFrame:groupby`
        @return                     :epkg:`pandas:DataFrame`

        As the input @see cl StreamingDataFrame does not necessarily hold
        in memory, the aggregation must be done at every iteration.
        There are two levels of aggregation: one to reduce every iterated
        :epkg:`dataframe`, another one to combine all the reduced
        :epkg:`dataframes`. This second one is always a **sum**.
        As a consequence, this function should not compute any *mean* or *count*,
        only *sum* because we do not know the size of each iterated
        :epkg:`dataframe`. To compute an average, sum and weights must be
        aggregated, as sketched after the example below.

        Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default.
        It could also be ``lambda gr: gr.max()`` or
        ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()``
        as it would lead to incoherent results.

        .. exref::
            :title: StreamingDataFrame and groupby
            :tag: streaming

            Here is an example which shows how to write a simple *groupby*
            with :epkg:`pandas` and @see cl StreamingDataFrame.

            .. runpython::
                :showcode:

                from pandas import DataFrame
                from pandas_streaming.df import StreamingDataFrame

                df = DataFrame(dict(A=[3, 4, 3], B=[5, 6, 7]))
                sdf = StreamingDataFrame.read_df(df)

                # The following:
                print(sdf.groupby("A", lambda gr: gr.sum()))

                # Is equivalent to:
                print(df.groupby("A").sum())
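
        A sketch of how an average could be computed with two sums,
        assuming column ``B`` is to be averaged per ``A`` (the helper
        column ``one`` is introduced only for illustration):

        ::

            sdf2 = sdf.add_column("one", lambda row: 1)
            sums = sdf2.groupby("A", lambda gr: gr.sum())
            mean = sums["B"] / sums["one"]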
        """
        if not in_memory:
            raise NotImplementedError(
                "Out-of-memory group by is not implemented.")
        if lambda_agg is None:
            def lambda_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg = lambda_agg_
        if lambda_agg_agg is None:
            def lambda_agg_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg_agg = lambda_agg_agg_
        ckw = kwargs.copy()
        ckw["as_index"] = False

        agg = []
        for df in self:
            gr = df.groupby(by=by, **ckw)
            agg.append(lambda_agg(gr))
        conc = pandas.concat(agg, sort=False)
        return lambda_agg_agg(conc.groupby(by=by, **kwargs))

    def groupby_streaming(self, by=None, lambda_agg=None, lambda_agg_agg=None,
                          in_memory=True, strategy='cum',
                          **kwargs) -> pandas.DataFrame:
        """
        Implements the streaming :epkg:`pandas:DataFrame:groupby`.
        We assume the result holds in memory. The out-of-memory version
        is not implemented yet.

        :param by: see :epkg:`pandas:DataFrame:groupby`
        :param in_memory: in-memory algorithm
        :param lambda_agg: aggregation function, *sum* by default
        :param lambda_agg_agg: to aggregate the aggregations, *sum* by default
        :param kwargs: additional parameters for :epkg:`pandas:DataFrame:groupby`
        :param strategy: ``'cum'``, or ``'streaming'``, see below
        :return: :epkg:`pandas:DataFrame`

        As the input @see cl StreamingDataFrame does not necessarily hold
        in memory, the aggregation must be done at every iteration.
        There are two levels of aggregation: one to reduce every iterated
        :epkg:`dataframe`, another one to combine all the reduced
        :epkg:`dataframes`. This second one is always a **sum**.
        As a consequence, this function should not compute any *mean* or *count*,
        only *sum* because we do not know the size of each iterated
        :epkg:`dataframe`. To compute an average, sum and weights must be
        aggregated.

        Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default.
        It could also be ``lambda gr: gr.max()`` or
        ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()``
        as it would lead to incoherent results.

        Parameter *strategy* allows three scenarios.
        If ``strategy is None``, the function goes through
        the whole dataset to produce a final :epkg:`DataFrame`.
        If ``strategy == 'cum'``, it returns a
        @see cl StreamingDataFrame where each iteration produces
        the current status of the *group by*. If
        ``strategy == 'streaming'``, it produces :epkg:`DataFrame`
        which must be concatenated into a single :epkg:`DataFrame`
        and grouped again to get the results.

        .. exref::
            :title: StreamingDataFrame and groupby
            :tag: streaming

            Here is an example which shows how to write a simple *groupby*
            with :epkg:`pandas` and @see cl StreamingDataFrame.

            .. runpython::
                :showcode:

                from pandas import DataFrame
                from pandas_streaming.df import StreamingDataFrame
                from pandas_streaming.data import dummy_streaming_dataframe

                df20 = dummy_streaming_dataframe(20).to_dataframe()
                df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0)
                sdf20 = StreamingDataFrame.read_df(df20, chunksize=5)
                sgr = sdf20.groupby_streaming(
                    "key", lambda gr: gr.sum(),
                    strategy='cum', as_index=False)
                for gr in sgr:
                    print()
                    print(gr)
        """
        if not in_memory:
            raise NotImplementedError(
                "Out-of-memory group by is not implemented.")
        if lambda_agg is None:
            def lambda_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg = lambda_agg_
        if lambda_agg_agg is None:
            def lambda_agg_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg_agg = lambda_agg_agg_
        ckw = kwargs.copy()
        ckw["as_index"] = False

        if strategy == 'cum':
            def iterate_cum():
                agg = None
                for df in self:
                    gr = df.groupby(by=by, **ckw)
                    gragg = lambda_agg(gr)
                    if agg is None:
                        yield lambda_agg_agg(gragg.groupby(by=by, **kwargs))
                        agg = gragg
                    else:
                        lagg = pandas.concat([agg, gragg], sort=False)
                        yield lambda_agg_agg(lagg.groupby(by=by, **kwargs))
                        agg = lagg
            return StreamingDataFrame(lambda: iterate_cum(), **self.get_kwargs())

        if strategy == 'streaming':
            def iterate_streaming():
                for df in self:
                    gr = df.groupby(by=by, **ckw)
                    gragg = lambda_agg(gr)
                    yield lambda_agg(gragg.groupby(by=by, **kwargs))
            return StreamingDataFrame(lambda: iterate_streaming(), **self.get_kwargs())

        raise ValueError(  # pragma: no cover
            f"Unknown strategy '{strategy}'")

    def ensure_dtype(self, df, dtypes):
        """
        Ensures the :epkg:`dataframe` *df* has the types indicated in
        *dtypes*. Changes it if not.

        :param df: dataframe
        :param dtypes: list of types
        :return: True if the dataframe was modified
        """
        ch = False
        cols = df.columns
        for i, (has, exp) in enumerate(zip(df.dtypes, dtypes)):
            if has != exp:
                name = cols[i]
                df[name] = df[name].astype(exp)
                ch = True
        return ch

    def __getitem__(self, *args):
        """
        Implements some of the functionalities :epkg:`pandas`
        offers for the operator ``[]``.
        """
        if len(args) != 1:
            raise NotImplementedError(  # pragma: no cover
                "Only a list of columns is supported.")
        cols = args[0]
        if isinstance(cols, str):
            # One column.
            iter_creation = self.iter_creation

            def iterate_col():
                "iterate on one column"
                one_col = [cols]
                for df in iter_creation():
                    yield df[one_col]

            return StreamingSeries(iterate_col, **self.get_kwargs())

        if not isinstance(cols, list):
            raise NotImplementedError("Only a list of columns is supported.")

        def iterate_cols(sdf):
            """Iterate on columns."""
            for df in sdf:
                yield df[cols]

        return StreamingDataFrame(lambda: iterate_cols(self), **self.get_kwargs())

    def __setitem__(self, index, value):
        """
        Supports a limited set of operators: setting a column to a
        constant value or to the content of a @see cl StreamingSeries.
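
        For instance (a sketch, column names are illustrative):

        ::

            sdf["constant"] = 1
            sdf["copy"] = sdf["X"]  # sdf["X"] is a StreamingSeries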
        """
        if not isinstance(index, str):
            raise ValueError(
                f"Only setting a column is supported but index={index!r}.")
        if isinstance(value, (int, float, numpy.number, str)):
            # It is equivalent to add_column.
            iter_creation = self.iter_creation

            def iterate_fct():
                "iterate on rows"
                iters = iter_creation()
                for df in iters:
                    dfc = df.copy()
                    dfc[index] = value
                    yield dfc

            self.iter_creation = iterate_fct
        elif isinstance(value, StreamingSeries):
            iter_creation = self.iter_creation

            def iterate_fct():
                "iterate on rows"
                iters = iter_creation()
                for df, dfs in zip(iters, value):
                    if df.shape[0] != dfs.shape[0]:
                        raise RuntimeError(
                            "Chunksize or shape are different when "
                            "iterating on two StreamingDataFrame at the same "
                            "time: %r != %r." % (df.shape[0], dfs.shape[0]))
                    dfc = df.copy()
                    dfc[index] = dfs
                    yield dfc

            self.iter_creation = iterate_fct
        else:
            raise NotImplementedError(
                "Not implemented for type(index)=%r and type(value)=%r." % (
                    type(index), type(value)))

    def add_column(self, col, value):
        """
        Implements some of the functionalities :epkg:`pandas`
        offers for the operator ``[]``.

        @param      col     new column
        @param      value   @see cl StreamingDataFrame or a lambda function
        @return             @see cl StreamingDataFrame

        .. note::

            If *value* is a @see cl StreamingDataFrame,
            *chunksize* must be the same for both.

        .. exref::
            :title: Add a new column to a StreamingDataFrame
            :tag: streaming

            .. runpython::
                :showcode:

                from pandas import DataFrame
                from pandas_streaming.df import StreamingDataFrame

                df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"]))
                sdf = StreamingDataFrame.read_df(df)
                sdf2 = sdf.add_column("d", lambda row: int(1))
                print(sdf2.to_dataframe())
        """
        if not isinstance(col, str):
            raise NotImplementedError(  # pragma: no cover
                "Only a column as a string is supported.")

        if isfunction(value):
            def iterate_fct(self, value, col):
                "iterate on rows"
                for df in self:
                    dfc = df.copy()
                    dfc.insert(dfc.shape[1], col, dfc.apply(value, axis=1))
                    yield dfc

            return StreamingDataFrame(
                lambda: iterate_fct(self, value, col), **self.get_kwargs())

        if isinstance(value, (pandas.Series, pandas.DataFrame, StreamingDataFrame)):
            raise NotImplementedError(
                "Unable to set a new column based on a dataframe.")

        def iterate_cst(self, value, col):
            "iterate on rows"
            for df in self:
                dfc = df.copy()
                dfc[col] = value
                yield dfc

        return StreamingDataFrame(
            lambda: iterate_cst(self, value, col), **self.get_kwargs())

    def fillna(self, **kwargs):
        """
        Replaces the missing values, calls
        :epkg:`pandas:DataFrame:fillna`.

        @param      kwargs      see :epkg:`pandas:DataFrame:fillna`
        @return                 @see cl StreamingDataFrame

        .. warning::
            The function does not check what happens at the
            limit of every chunk of data. Anything but a constant value
            will probably have an inconsistent behaviour.
        """

        def iterate_na(self, **kwargs):
            "iterate on rows"
            # inplace is True by default for this streaming version
            if kwargs.get('inplace', True):
                kwargs['inplace'] = True
                for df in self:
                    df.fillna(**kwargs)
                    yield df
            else:
                for df in self:
                    yield df.fillna(**kwargs)

        return StreamingDataFrame(
            lambda: iterate_na(self, **kwargs), **self.get_kwargs())

    def describe(self, percentiles=None, include=None, exclude=None):
        """
        Calls :epkg:`pandas:DataFrame:describe` on every piece
        of the datasets. *percentiles* are not really accurate
        but just an indication.

        :param percentiles: see :epkg:`pandas:DataFrame:describe`
        :param include: see :epkg:`pandas:DataFrame:describe`
        :param exclude: see :epkg:`pandas:DataFrame:describe`
        :return: :epkg:`pandas:DataFrame:describe`

        .. versionchanged:: 0.3.219
            Parameter *datetime_is_numeric* was removed
            (see :epkg:`pandas:DataFrame:describe`).
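
        *count*, *mean*, *std*, *min* and *max* are aggregated exactly
        across chunks. The standard deviation is rebuilt from the
        accumulated sums, following

        ::

            mean = sum(x) / n
            std  = sqrt(sum(x**2) / n - mean**2)

        while the percentiles are only estimated from the per-chunk
        summaries.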
        """
        merged = None
        stack = []
        notper = ['count', 'mean', 'std']
        for df in self:
            desc = df.describe(
                percentiles=percentiles, include=include, exclude=exclude)
            count = desc.loc['count', :]
            rows = [name for name in desc.index if name not in notper]
            stack.append(desc.loc[rows, :])
            if merged is None:
                merged = desc
                # Stores sum(x**2) and sum(x) instead of std and mean.
                merged.loc['std', :] = (
                    merged.loc['std', :] ** 2 + merged.loc['mean', :] ** 2) * count
                merged.loc['mean', :] *= count
            else:
                merged.loc['count', :] += desc.loc['count', :]
                merged.loc['mean', :] += desc.loc['mean', :] * count
                merged.loc['std', :] += (
                    desc.loc['std', :] ** 2 + desc.loc['mean', :] ** 2) * count
                merged.loc['max', :] = numpy.maximum(
                    merged.loc['max', :], desc.loc['max', :])
                merged.loc['min', :] = numpy.minimum(
                    merged.loc['min', :], desc.loc['min', :])
        merged.loc['mean', :] /= merged.loc['count', :]
        merged.loc['std', :] = (
            merged.loc['std', :] / merged.loc['count', :] -
            merged.loc['mean', :] ** 2) ** 0.5
        values = pandas.concat(stack)
        summary = values.describe(percentiles=percentiles)
        merged = merged.loc[notper, :]
        rows = [name for name in summary.index if name not in notper]
        summary = summary.loc[rows, :]
        return pandas.concat([merged, summary])

    def sort_values(self, by, axis=0, ascending=True, kind='quicksort',
                    na_position='last',
                    temp_file='_pandas_streaming_sort_values_'):
        """
        Sorts the streaming dataframe by values.

        :param by: one column
        :param ascending: order
        :param kind: see :meth:`pandas.DataFrame.sort_values`
        :param na_position: see :meth:`pandas.DataFrame.sort_values`
        :param temp_file: sorting a whole dataframe is impossible
            without storing intermediate results on disk,
            unless it fits into memory, but in that case,
            it is easier to convert the streaming dataframe into
            a regular dataframe and sort it
        :return: streaming dataframe
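
        A short sketch (the column name is illustrative):

        ::

            sorted_sdf = sdf.sort_values("X", ascending=False)
            top = sorted_sdf.head()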
        """
        if not isinstance(by, str):
            raise NotImplementedError(  # pragma: no cover
                f"Only one column can be used to sort not {by!r}.")
        keys = {}
        nans = []
        indices = []
        with open(temp_file, 'wb') as f:
            for df in self:
                dfs = df.sort_values(by, ascending=ascending, kind=kind,
                                     na_position=na_position)
                for tu in dfs[by]:
                    if isinstance(tu, float) and numpy.isnan(tu):
                        nans.append(len(indices))
                    else:
                        if tu not in keys:
                            keys[tu] = []
                        keys[tu].append(len(indices))
                indices.append(f.tell())
                st = BytesIO()
                pickle.dump(dfs, st)
                f.write(st.getvalue())

            indices.append(f.tell())

        values = list(keys.items())
        values.sort(reverse=not ascending)

        def iterate():

            with open(temp_file, 'rb') as f:

                if na_position == 'first':
                    for p in nans:
                        f.seek(indices[p])
                        length = indices[p + 1] - indices[p]
                        pkl = f.read(length)
                        dfs = pickle.load(BytesIO(pkl))
                        sub = dfs[numpy.isnan(dfs[by])]
                        yield sub

                for key, positions in values:
                    for p in positions:
                        f.seek(indices[p])
                        length = indices[p + 1] - indices[p]
                        pkl = f.read(length)
                        dfs = pickle.load(BytesIO(pkl))
                        sub = dfs[dfs[by] == key]
                        yield sub

                if na_position == 'last':
                    for p in nans:
                        f.seek(indices[p])
                        length = indices[p + 1] - indices[p]
                        pkl = f.read(length)
                        dfs = pickle.load(BytesIO(pkl))
                        sub = dfs[numpy.isnan(dfs[by])]
                        yield sub

        res = StreamingDataFrame(
            lambda: iterate(), **self.get_kwargs())
        res._delete_.append(lambda: os.remove(temp_file))
        return res

    def __del__(self):
        """
        Calls every function in `_delete_`.
        """
        for f in self._delete_:
            f()


class StreamingSeries(StreamingDataFrame):
    """
    Seen as a @see cl StreamingDataFrame of one column.
    """

    def __init__(self, iter_creation, check_schema=True, stable=True):
        StreamingDataFrame.__init__(
            self, iter_creation, check_schema=check_schema, stable=stable)
        if len(self.columns) != 1:
            raise RuntimeError(  # pragma: no cover
                "A series can contain only one column not "
                f"{len(self.columns)!r}.")

    def apply(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:Series:apply`.
        This function returns a @see cl StreamingSeries.
        """
        return StreamingSeries(
            lambda: map(lambda df: df.apply(*args, **kwargs), self),
            **self.get_kwargs())

    def __add__(self, value):
        """
        Performs an addition on every chunk, hoping the operation
        has a meaning for the stored values.

        :param value: any value which makes sense
        :return: a new series
        """
        def iterate():
            for df in self:
                yield df + value

        return StreamingSeries(iterate, **self.get_kwargs())