Coverage for pandas_streaming/df/dataframe.py: 96%

538 statements  

coverage.py v7.2.7, created at 2023-07-01 14:15 +0200

# -*- coding: utf-8 -*-
# pylint: disable=W0102
"""
@file
@brief Defines a streaming dataframe.
"""
import pickle
import os
from io import StringIO, BytesIO
from inspect import isfunction
import numpy
import numpy.random as nrandom
import pandas
from pandas.testing import assert_frame_equal
try:
    from pandas import json_normalize
except ImportError:
    from pandas.io.json import json_normalize
from .dataframe_split import sklearn_train_test_split, sklearn_train_test_split_streaming
from .dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream


class StreamingDataFrameSchemaError(Exception):
    """
    Reveals an issue with inconsistent schemas.
    """
    pass


class StreamingDataFrame:
    """
    Defines a streaming dataframe.
    The goal is to reduce the memory footprint.
    The class takes a function which creates an iterator
    on :epkg:`dataframe`. We assume this function can
    be called multiple times. As a matter of fact, the
    function is called every time the class needs to walk
    through the stream with the following loop:

    ::

        for df in self:  # self is a StreamingDataFrame
            # ...

    The constructor cannot receive an iterator, otherwise
    this class would be able to walk through the data
    only once. The main reason is that it is impossible to
    :epkg:`*py:pickle` (or :epkg:`dill`)
    an iterator: it cannot be replicated.
    Instead, the class takes a function which generates
    an iterator on :epkg:`DataFrame`.
    Most of the methods return either a :epkg:`DataFrame`
    or a @see cl StreamingDataFrame. In the second case,
    methods can be chained.

    By default, the object checks that the schema remains
    the same between two chunks. This can be disabled
    by setting *check_schema=False* in the constructor.

    The user should expect the data to remain stable:
    every loop should produce the same data. However,
    in some situations, it is more efficient not to keep
    that constraint. Drawing a random @see me sample
    is one of these cases.

    :param iter_creation: function which creates an iterator or an
        instance of @see cl StreamingDataFrame
    :param check_schema: checks that the schema is the same
        for every :epkg:`dataframe`
    :param stable: indicates if the :epkg:`dataframe` remains the same
        whenever it is walked through
    """

    def __init__(self, iter_creation, check_schema=True, stable=True):
        self._delete_ = []
        if isinstance(iter_creation, (pandas.DataFrame, dict,
                                      numpy.ndarray, str)):
            raise TypeError(
                "Unexpected type %r for iter_creation. It must "
                "be an iterator." % type(iter_creation))
        if isinstance(iter_creation, StreamingDataFrame):
            self.iter_creation = iter_creation.iter_creation
            self.stable = iter_creation.stable
        else:
            self.iter_creation = iter_creation
            self.stable = stable
        self.check_schema = check_schema

    def is_stable(self, do_check=False, n=10):
        """
        Tells if the :epkg:`dataframe` is supposed to be stable.

        @param do_check do not trust the value sent to the constructor
        @param n number of rows used to check the stability,
                 None for all rows
        @return boolean

        *do_check=True* means the method checks that the first
        *n* rows remain the same over two iterations.
        """
        if do_check:
            for i, (a, b) in enumerate(zip(self, self)):
                if n is not None and i >= n:
                    break
                try:
                    assert_frame_equal(a, b)
                except AssertionError:  # pragma: no cover
                    return False
            return True
        else:
            return self.stable

    def get_kwargs(self):
        """
        Returns the parameters used to call the constructor.
        """
        return dict(check_schema=self.check_schema)

    def train_test_split(self, path_or_buf=None, export_method="to_csv",
                         names=None, streaming=True, partitions=None,
                         **kwargs):
        """
        Randomly splits a :epkg:`dataframe` into smaller pieces.
        The function returns streams of file names.
        It chooses one of the options from module
        :mod:`dataframe_split <pandas_streaming.df.dataframe_split>`.

        @param path_or_buf a string, a list of strings or buffers; if it is a
                           string, it must contain ``{}`` like ``partition{}.txt``;
                           if None, the function returns strings.
        @param export_method method used to store the partitions, by default
                           :epkg:`pandas:DataFrame:to_csv`, additional parameters
                           will be given to that function
        @param names partition names, by default ``('train', 'test')``
        @param kwargs parameters for the export function and
                           :epkg:`sklearn:model_selection:train_test_split`.
        @param streaming the function switches to a
                           streaming version of the algorithm.
        @param partitions splitting partitions
        @return outputs of the export functions or two
                           @see cl StreamingDataFrame if *path_or_buf* is None.

        The streaming version of this algorithm is implemented by function
        @see fn sklearn_train_test_split_streaming. Its documentation
        indicates the limitations of the streaming version and gives some
        insights about the additional parameters.
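
        A minimal sketch of a streaming split (the toy data is made up
        for the example; the split itself is random):

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(x=[1, 2, 3, 4], y=[5, 6, 7, 8]))
            sdf = StreamingDataFrame.read_df(df, chunksize=2)
            train, test = sdf.train_test_split(streaming=True)
            print(train.to_dataframe())
            print(test.to_dataframe())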

147 """ 

148 if streaming: 

149 if partitions is not None: 

150 if len(partitions) != 2: 

151 raise NotImplementedError( # pragma: no cover 

152 "Only train and test split is allowed, *partitions* " 

153 "must be of length 2.") 

154 kwargs = kwargs.copy() 

155 kwargs['train_size'] = partitions[0] 

156 kwargs['test_size'] = partitions[1] 

157 return sklearn_train_test_split_streaming(self, **kwargs) 

158 return sklearn_train_test_split(self, path_or_buf=path_or_buf, 

159 export_method=export_method, 

160 names=names, **kwargs) 

161 

    @staticmethod
    def _process_kwargs(kwargs):
        """
        Filters out parameters for the constructor of this class.
        """
        kw = {}
        for k in ['check_schema']:
            if k in kwargs:
                kw[k] = kwargs[k]
                del kwargs[k]
        return kw

    @staticmethod
    def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDataFrame':
        """
        Reads a :epkg:`json` file or buffer as an iterator
        on :epkg:`DataFrame`. The signature is the same as
        :epkg:`pandas:read_json`. The important parameter is
        *chunksize*, which defines the number
        of rows to parse in a single block,
        and it must be defined to return an iterator.
        If *lines* is True, the function falls back on
        :epkg:`pandas:read_json`, otherwise it uses
        @see fn enumerate_json_items. If *lines* is ``'stream'``,
        *enumerate_json_items* is called with parameter
        ``lines=True``.
        Parameter *flatten* uses the trick described at
        `Flattening JSON objects in Python
        <https://towardsdatascience.com/flattening-json-objects-in-python-f5343c794b10>`_.
        Examples:

        .. runpython::
            :showcode:

            from io import BytesIO
            from pandas_streaming.df import StreamingDataFrame

            data = b'''{"a": 1, "b": 2}
            {"a": 3, "b": 4}'''
            it = StreamingDataFrame.read_json(BytesIO(data), lines=True)
            dfs = list(it)
            print(dfs)

        .. runpython::
            :showcode:

            from io import BytesIO
            from pandas_streaming.df import StreamingDataFrame

            data = b'''[{"a": 1,
                         "b": 2},
                        {"a": 3,
                         "b": 4}]'''

            it = StreamingDataFrame.read_json(BytesIO(data))
            dfs = list(it)
            print(dfs)

        .. index:: IncompleteJSONError

        The parsed json must have an empty line at the end, otherwise
        the following exception is raised:
        ``ijson.common.IncompleteJSONError: parse error: unallowed token at this point in JSON text``.
        """

        if not isinstance(chunksize, int) or chunksize <= 0:
            raise ValueError(  # pragma: no cover
                'chunksize must be a positive integer')
        kwargs_create = StreamingDataFrame._process_kwargs(kwargs)

        if isinstance(args[0], (list, dict)):
            if flatten:
                return StreamingDataFrame.read_df(
                    json_normalize(args[0]), **kwargs_create)
            return StreamingDataFrame.read_df(args[0], **kwargs_create)

        if kwargs.get('lines', None) == 'stream':
            del kwargs['lines']

            def localf(a0=args[0]):
                if hasattr(a0, 'seek'):
                    a0.seek(0)
                return enumerate_json_items(
                    a0, encoding=kwargs.get('encoding', None), lines=True,
                    flatten=flatten)

            st = JsonIterator2Stream(localf)
            args = args[1:]

            if chunksize is None:
                return StreamingDataFrame(
                    lambda: pandas.read_json(
                        st, *args, chunksize=None, lines=True, **kwargs),
                    **kwargs_create)

            def fct1(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()):
                st.seek(0)
                for r in pandas.read_json(
                        st, *args, chunksize=chunksize, nrows=chunksize,
                        lines=True, **kw):
                    yield r

            return StreamingDataFrame(fct1, **kwargs_create)

        if kwargs.get('lines', False):
            if flatten:
                raise NotImplementedError(
                    "flatten==True is implemented with option lines='stream'")
            if chunksize is None:
                return StreamingDataFrame(
                    lambda: pandas.read_json(*args, chunksize=None, **kwargs),
                    **kwargs_create)

            def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()):
                for r in pandas.read_json(
                        *args, chunksize=chunksize, nrows=chunksize, **kw):
                    yield r

            return StreamingDataFrame(fct2, **kwargs_create)

        st = JsonIterator2Stream(
            lambda a0=args[0]: enumerate_json_items(
                a0, encoding=kwargs.get('encoding', None), flatten=flatten))
        args = args[1:]
        if 'lines' in kwargs:
            del kwargs['lines']

        if chunksize is None:
            return StreamingDataFrame(
                lambda: pandas.read_json(
                    st, *args, chunksize=chunksize, lines=True, **kwargs),
                **kwargs_create)

        def fct3(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()):
            if hasattr(st, 'seek'):
                st.seek(0)
            for r in pandas.read_json(
                    st, *args, chunksize=chunksize, nrows=chunksize,
                    lines=True, **kw):
                yield r

        return StreamingDataFrame(fct3, **kwargs_create)

    @staticmethod
    def read_csv(*args, **kwargs) -> 'StreamingDataFrame':
        """
        Reads a :epkg:`csv` file or buffer
        as an iterator on :epkg:`DataFrame`.
        The signature is the same as :epkg:`pandas:read_csv`.
        The important parameter is *chunksize*, which defines the number
        of rows to parse in a single block. If not specified,
        it defaults to 100000.
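
        A minimal sketch, assuming a small csv file written just for
        the example (the file name is made up):

        .. runpython::
            :showcode:

            import os
            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            name = "example_read_csv.csv"  # hypothetical file name
            DataFrame(dict(a=[0, 1, 2, 3], b=[4, 5, 6, 7])).to_csv(
                name, index=False)
            sdf = StreamingDataFrame.read_csv(name, chunksize=2)
            for df in sdf:
                print(df)
            os.remove(name)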

312 """ 

313 if not kwargs.get('iterator', True): 

314 raise ValueError("If specified, iterator must be True.") 

315 if not kwargs.get('chunksize', 100000): 

316 raise ValueError("If specified, chunksize must not be None.") 

317 kwargs_create = StreamingDataFrame._process_kwargs(kwargs) 

318 kwargs['iterator'] = True 

319 if 'chunksize' not in kwargs: 

320 kwargs['chunksize'] = 100000 

321 return StreamingDataFrame(lambda: pandas.read_csv(*args, **kwargs), **kwargs_create) 

322 

    @staticmethod
    def read_str(text, **kwargs) -> 'StreamingDataFrame':
        """
        Reads a string or bytes holding :epkg:`csv` data
        as an iterator on :epkg:`DataFrame`.
        The signature is the same as :epkg:`pandas:read_csv`.
        The important parameter is *chunksize*, which defines the number
        of rows to parse in a single block.
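
        A minimal example (toy data made up for illustration):

        .. runpython::
            :showcode:

            from pandas_streaming.df import StreamingDataFrame

            text = "A,B\n0,a\n1,b\n2,c\n3,d"
            sdf = StreamingDataFrame.read_str(text, chunksize=2)
            print(sdf.to_dataframe())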

330 """ 

331 if not kwargs.get('iterator', True): 

332 raise ValueError("If specified, iterator must be True.") 

333 if not kwargs.get('chunksize', 100000): 

334 raise ValueError("If specified, chunksize must not be None.") 

335 kwargs_create = StreamingDataFrame._process_kwargs(kwargs) 

336 kwargs['iterator'] = True 

337 if 'chunksize' not in kwargs: 

338 kwargs['chunksize'] = 100000 

339 if isinstance(text, str): 

340 buffer = StringIO(text) 

341 else: 

342 buffer = BytesIO(text) 

343 return StreamingDataFrame( 

344 lambda: pandas.read_csv(buffer, **kwargs), **kwargs_create) 

345 

    @staticmethod
    def read_df(df, chunksize=None, check_schema=True) -> 'StreamingDataFrame':
        """
        Splits a :epkg:`DataFrame` into small chunks, mostly for
        unit testing purposes.

        @param df :epkg:`DataFrame`
        @param chunksize number of rows per chunk (all rows by default)
        @param check_schema check schema between two iterations
        @return iterator on @see cl StreamingDataFrame
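
        A minimal example:

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=[0, 1, 2, 3], b=[4, 5, 6, 7]))
            sdf = StreamingDataFrame.read_df(df, chunksize=2)
            for chunk in sdf:
                print(chunk)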

356 """ 

357 if chunksize is None: 

358 if hasattr(df, 'shape'): 

359 chunksize = df.shape[0] 

360 else: 

361 raise NotImplementedError( 

362 f"Cannot retrieve size to infer chunksize for type={type(df)}.") 

363 

364 if hasattr(df, 'shape'): 

365 size = df.shape[0] 

366 else: 

367 raise NotImplementedError( # pragma: no cover 

368 f"Cannot retrieve size for type={type(df)}.") 

369 

370 def local_iterator(): 

371 "local iterator" 

372 for i in range(0, size, chunksize): 

373 end = min(size, i + chunksize) 

374 yield df[i:end].copy() 

375 return StreamingDataFrame(local_iterator, check_schema=check_schema) 

376 

    def __iter__(self):
        """
        Iterator on a large file with a sliding window.
        Each window is a :epkg:`DataFrame`.
        The method stores a copy of the initial iterator
        and restores it after the end of the iterations.
        If *check_schema* was enabled when calling the constructor,
        the method checks that every :epkg:`DataFrame`
        follows the same schema as the first chunk.

        Even with a big chunk size, it might happen
        that consecutive chunks detect different types
        for one particular column. An error message shows up
        saying ``Column types are different after row``
        with more information about the column which failed.
        In that case, :epkg:`pandas:read_csv` can overwrite
        the type of one column by specifying
        ``dtype={column_name: new_type}``. It frequently happens
        when a string column has many missing values.
        """
        iters = self.iter_creation()
        sch = None
        rows = 0
        for it in iters:
            if sch is None:
                sch = (list(it.columns), list(it.dtypes))
            elif self.check_schema:
                if list(it.columns) != sch[0]:  # pylint: disable=E1136
                    raise StreamingDataFrameSchemaError(  # pragma: no cover
                        'Column names are different after row {0}\nFirst chunk: {1}'
                        '\nCurrent chunk: {2}'.format(
                            rows, sch[0], list(it.columns)))  # pylint: disable=E1136
                if list(it.dtypes) != sch[1]:  # pylint: disable=E1136
                    errdf = pandas.DataFrame(
                        dict(names=sch[0], schema1=sch[1],  # pylint: disable=E1136
                             schema2=list(it.dtypes)))  # pylint: disable=E1136
                    tdf = StringIO()
                    errdf['diff'] = errdf['schema2'] != errdf['schema1']
                    errdf = errdf[errdf['diff']]
                    errdf.to_csv(tdf, sep=",", index=False)
                    raise StreamingDataFrameSchemaError(
                        'Column types are different after row {0}. You may use option '
                        'dtype={{"column_name": str}} to force the type on this column.'
                        '\n---\n{1}'.format(rows, tdf.getvalue()))

            rows += it.shape[0]
            yield it

    @property
    def shape(self):
        """
        This is the kind of operation you do not want to do
        when a file is large because it goes through the whole
        stream just to get the number of rows.
        """
        nl, nc = 0, 0
        for it in self:
            nc = max(it.shape[1], nc)
            nl += it.shape[0]
        return nl, nc

    @property
    def columns(self):
        """
        See :epkg:`pandas:DataFrame:columns`.
        """
        for it in self:
            return it.columns
        # The dataframe is empty.
        return []

    @property
    def dtypes(self):
        """
        See :epkg:`pandas:DataFrame:dtypes`.
        """
        for it in self:
            return it.dtypes

    def to_csv(self, path_or_buf=None, **kwargs) -> 'StreamingDataFrame':
        """
        Saves the :epkg:`DataFrame` into a string or a file.
        See :epkg:`pandas:DataFrame.to_csv`.
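
        A minimal example, serializing chunk by chunk into one string:

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=[0, 1], b=[2, 3]))
            sdf = StreamingDataFrame.read_df(df, chunksize=1)
            print(sdf.to_csv(index=False))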

460 """ 

461 if path_or_buf is None: 

462 st = StringIO() 

463 close = False 

464 elif isinstance(path_or_buf, str): 

465 st = open( # pylint: disable=R1732 

466 path_or_buf, "w", encoding=kwargs.get('encoding')) 

467 close = True 

468 else: 

469 st = path_or_buf 

470 close = False 

471 

472 for df in self: 

473 df.to_csv(st, **kwargs) 

474 kwargs['header'] = False 

475 

476 if close: 

477 st.close() 

478 if isinstance(st, StringIO): 

479 return st.getvalue() 

480 return path_or_buf 

481 

    def to_dataframe(self) -> pandas.DataFrame:
        """
        Converts everything into a single :epkg:`DataFrame`.
        """
        return pandas.concat(self, axis=0)

    def to_df(self) -> pandas.DataFrame:
        """
        Converts everything into a single :epkg:`DataFrame`.
        """
        return self.to_dataframe()

    def iterrows(self):
        """
        See :epkg:`pandas:DataFrame:iterrows`.
        """
        for df in self:
            for it in df.iterrows():
                yield it

    def head(self, n=5) -> pandas.DataFrame:
        """
        Returns the first rows as a :epkg:`DataFrame`.
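
        A minimal example:

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=list(range(10))))
            sdf = StreamingDataFrame.read_df(df, chunksize=4)
            print(sdf.head(n=5))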

505 """ 

506 st = [] 

507 total = 0 

508 for df in self: 

509 h = df.head(n=n) 

510 total += h.shape[0] 

511 st.append(h) 

512 if total >= n: 

513 break 

514 n -= h.shape[0] 

515 if len(st) == 1: 

516 return st[0] 

517 if len(st) == 0: 

518 return None 

519 return pandas.concat(st, axis=0) 

520 

    def tail(self, n=5) -> pandas.DataFrame:
        """
        Returns the last rows as a :epkg:`DataFrame`.
        The size of chunks must be greater than ``n`` to
        get ``n`` lines. This method is not efficient
        because the whole dataset must be walked through.
        """
        for df in self:
            h = df.tail(n=n)
        return h

    def where(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:where`.
        *inplace* must be False.
        This function returns a @see cl StreamingDataFrame.
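
        A minimal example (the condition is applied chunk by chunk):

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=[1, 2, 3, 4]))
            sdf = StreamingDataFrame.read_df(df, chunksize=2)
            print(sdf.where(lambda x: x > 2, other=0).to_dataframe())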

537 """ 

538 kwargs['inplace'] = False 

539 return StreamingDataFrame( 

540 lambda: map(lambda df: df.where(*args, **kwargs), self), 

541 **self.get_kwargs()) 

542 

    def sample(self, reservoir=False, cache=False, **kwargs) -> 'StreamingDataFrame':
        """
        See :epkg:`pandas:DataFrame:sample`.
        Only *frac* is available, otherwise choose
        @see me reservoir_sampling.
        This function returns a @see cl StreamingDataFrame.

        @param reservoir use `reservoir sampling <https://en.wikipedia.org/wiki/Reservoir_sampling>`_
        @param cache cache the sample
        @param kwargs additional parameters for :epkg:`pandas:DataFrame:sample`

        If *cache* is True, the sample is cached (assuming it holds in memory).
        The second time an iterator walks through the
        data, the cached sample is returned instead of a new draw.
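
        A minimal example (the sample is random; *random_state* pins it):

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=list(range(10))))
            sdf = StreamingDataFrame.read_df(df, chunksize=5)
            print(sdf.sample(frac=0.5, random_state=0).to_dataframe())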

556 """ 

557 if reservoir or 'n' in kwargs: 

558 if 'frac' in kwargs: 

559 raise ValueError( 

560 'frac cannot be specified for reservoir sampling.') 

561 return self._reservoir_sampling(cache=cache, n=kwargs['n'], random_state=kwargs.get('random_state')) 

562 if cache: 

563 sdf = self.sample(cache=False, **kwargs) 

564 df = sdf.to_df() 

565 return StreamingDataFrame.read_df(df, chunksize=df.shape[0]) 

566 return StreamingDataFrame(lambda: map(lambda df: df.sample(**kwargs), self), **self.get_kwargs(), stable=False) 

567 

    def _reservoir_sampling(self, cache=True, n=1000, random_state=None) -> 'StreamingDataFrame':
        """
        Uses the `reservoir sampling <https://en.wikipedia.org/wiki/Reservoir_sampling>`_
        algorithm to draw a random sample with exactly *n* samples.

        @param cache cache the sample
        @param n number of observations to keep
        @param random_state sets the random_state
        @return @see cl StreamingDataFrame

        .. warning::
            The sample is split into chunks of size 1000.
            This parameter is not yet exposed.
        """
        if not cache:
            raise ValueError(
                "cache=False is not available for reservoir sampling.")
        if random_state is not None:
            nrandom.seed(random_state)
        indices = []
        seen = 0
        for i, df in enumerate(self):
            for ir, _ in enumerate(df.iterrows()):
                seen += 1
                if len(indices) < n:
                    indices.append((i, ir))
                else:
                    x = nrandom.random()  # pylint: disable=E1101
                    if x * n < (seen - n):
                        # randint's upper bound is exclusive, so len(indices)
                        # makes every slot of the reservoir reachable
                        k = nrandom.randint(0, len(indices))
                        indices[k] = (i, ir)  # pylint: disable=E1126
        indices = set(indices)

        def reservoir_iterate(sdf, indices, chunksize):
            "iterator"
            buffer = []
            for i, df in enumerate(self):
                for ir, row in enumerate(df.iterrows()):
                    if (i, ir) in indices:
                        buffer.append(row)
                        if len(buffer) >= chunksize:
                            yield pandas.DataFrame(buffer)
                            buffer.clear()
            if len(buffer) > 0:
                yield pandas.DataFrame(buffer)

        return StreamingDataFrame(
            lambda: reservoir_iterate(sdf=self, indices=indices, chunksize=1000))

    def drop(self, labels=None, *, axis=0, index=None, columns=None, level=None,
             inplace=False, errors='raise') -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:drop`.
        This function returns a @see cl StreamingDataFrame.
        """
        if axis == 0:
            raise NotImplementedError(f"drop is not implemented for axis={axis}.")
        if inplace:
            raise NotImplementedError(f"drop is not implemented for inplace={inplace}.")
        return StreamingDataFrame(
            lambda: map(lambda df: df.drop(
                labels, axis=axis, index=index, columns=columns,
                level=level, inplace=False, errors=errors), self),
            **self.get_kwargs())

    def apply(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:apply`.
        This function returns a @see cl StreamingDataFrame.
        """
        return StreamingDataFrame(
            lambda: map(lambda df: df.apply(*args, **kwargs), self),
            **self.get_kwargs())

    def applymap(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:DataFrame:applymap`.
        This function returns a @see cl StreamingDataFrame.
        """
        return StreamingDataFrame(
            lambda: map(lambda df: df.applymap(*args, **kwargs), self),
            **self.get_kwargs())

    def merge(self, right, **kwargs) -> 'StreamingDataFrame':
        """
        Merges two @see cl StreamingDataFrame and returns a @see cl StreamingDataFrame.
        *right* can be either a @see cl StreamingDataFrame or simply
        a :epkg:`pandas:DataFrame`. It calls :epkg:`pandas:DataFrame:merge` in
        a double loop, a loop on *self* and a loop on *right*.
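
        A minimal example joining on a key column:

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df1 = DataFrame(dict(key=[1, 2], x=[4, 5]))
            df2 = DataFrame(dict(key=[1, 2], y=[7, 8]))
            sdf = StreamingDataFrame.read_df(df1, chunksize=1)
            print(sdf.merge(df2, on="key").to_dataframe())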

655 """ 

656 if isinstance(right, pandas.DataFrame): 

657 return self.merge(StreamingDataFrame.read_df(right, chunksize=right.shape[0]), **kwargs) 

658 

659 def iterator_merge(sdf1, sdf2, **kw): 

660 "iterate on dataframes" 

661 for df1 in sdf1: 

662 for df2 in sdf2: 

663 df = df1.merge(df2, **kw) 

664 yield df 

665 

666 return StreamingDataFrame( 

667 lambda: iterator_merge(self, right, **kwargs), **self.get_kwargs()) 

668 

    def concat(self, others, axis=0) -> 'StreamingDataFrame':
        """
        Concatenates :epkg:`dataframes`. The function ensures all :epkg:`pandas:DataFrame`
        or @see cl StreamingDataFrame share the same columns (names and types).
        Otherwise, the function fails as it cannot guess the schema without
        walking through all :epkg:`dataframes`.

        :param others: list, enumeration, :epkg:`pandas:DataFrame`
        :param axis: concatenate by rows (0) or by columns (1)
        :return: @see cl StreamingDataFrame
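
        A minimal example, concatenating a streaming dataframe with itself:

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=[0, 1], b=[2, 3]))
            sdf = StreamingDataFrame.read_df(df, chunksize=1)
            print(sdf.concat(df, axis=0).to_dataframe())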

679 """ 

680 if axis == 1: 

681 return self._concath(others) 

682 if axis == 0: 

683 return self._concatv(others) 

684 raise ValueError("axis must be 0 or 1") # pragma: no cover 

685 

686 def _concath(self, others): 

687 if not isinstance(others, list): 

688 others = [others] 

689 

690 def iterateh(self, others): 

691 cols = tuple([self] + others) 

692 for dfs in zip(*cols): 

693 nrows = [_.shape[0] for _ in dfs] 

694 if min(nrows) != max(nrows): 

695 raise RuntimeError( 

696 "StreamingDataFram cannot merge DataFrame with different size or chunksize") 

697 yield pandas.concat(list(dfs), axis=1) 

698 

699 return StreamingDataFrame(lambda: iterateh(self, others), **self.get_kwargs()) 

700 

    def _concatv(self, others):

        def iterator_concat(this, lothers):
            "iterator on dataframes"
            columns = None
            dtypes = None
            for df in this:
                if columns is None:
                    columns = df.columns
                    dtypes = df.dtypes
                yield df
            for obj in lothers:
                check = True
                for i, df in enumerate(obj):
                    if check:
                        if list(columns) != list(df.columns):
                            raise ValueError(
                                f"Frame others[{i}] does not have the same column "
                                f"names or the same order.")
                        if list(dtypes) != list(df.dtypes):
                            raise ValueError(
                                f"Frame others[{i}] does not have the same column types.")
                        check = False
                    yield df

        if isinstance(others, pandas.DataFrame):
            others = [others]
        elif isinstance(others, StreamingDataFrame):
            others = [others]

        def change_type(obj):
            "change column type"
            if isinstance(obj, pandas.DataFrame):
                return StreamingDataFrame.read_df(obj, obj.shape[0])
            else:
                return obj

        others = list(map(change_type, others))
        return StreamingDataFrame(
            lambda: iterator_concat(self, others), **self.get_kwargs())

    def groupby(self, by=None, lambda_agg=None, lambda_agg_agg=None,
                in_memory=True, **kwargs) -> pandas.DataFrame:
        """
        Implements the streaming :epkg:`pandas:DataFrame:groupby`.
        We assume the result holds in memory. The out-of-memory version is
        not implemented yet.

        @param by see :epkg:`pandas:DataFrame:groupby`
        @param in_memory in-memory algorithm
        @param lambda_agg aggregation function, *sum* by default
        @param lambda_agg_agg to aggregate the aggregations, *sum* by default
        @param kwargs additional parameters for :epkg:`pandas:DataFrame:groupby`
        @return :epkg:`pandas:DataFrame`

        As the input @see cl StreamingDataFrame does not necessarily hold
        in memory, the aggregation must be done at every iteration.
        There are two levels of aggregation: one to reduce every iterated
        :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`.
        This second one is always a **sum**.
        As a consequence, this function should not compute any *mean* or *count*,
        only *sum*, because we do not know the size of each iterated
        :epkg:`dataframe`. To compute an average, sums and weights must be
        aggregated separately.

        Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default.
        It could also be ``lambda gr: gr.max()`` or
        ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()``
        as it would lead to incoherent results.

        .. exref::
            :title: StreamingDataFrame and groupby
            :tag: streaming

            Here is an example which shows how to write a simple *groupby*
            with :epkg:`pandas` and @see cl StreamingDataFrame.

            .. runpython::
                :showcode:

                from pandas import DataFrame
                from pandas_streaming.df import StreamingDataFrame

                df = DataFrame(dict(A=[3, 4, 3], B=[5, 6, 7]))
                sdf = StreamingDataFrame.read_df(df)

                # The following:
                print(sdf.groupby("A", lambda gr: gr.sum()))

                # Is equivalent to:
                print(df.groupby("A").sum())
        """
        if not in_memory:
            raise NotImplementedError(
                "Out-of-memory group by is not implemented.")
        if lambda_agg is None:
            def lambda_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg = lambda_agg_
        if lambda_agg_agg is None:
            def lambda_agg_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg_agg = lambda_agg_agg_
        ckw = kwargs.copy()
        ckw["as_index"] = False

        agg = []
        for df in self:
            gr = df.groupby(by=by, **ckw)
            agg.append(lambda_agg(gr))
        conc = pandas.concat(agg, sort=False)
        return lambda_agg_agg(conc.groupby(by=by, **kwargs))

    def groupby_streaming(self, by=None, lambda_agg=None, lambda_agg_agg=None,
                          in_memory=True, strategy='cum', **kwargs) -> pandas.DataFrame:
        """
        Implements the streaming :epkg:`pandas:DataFrame:groupby`.
        We assume the result holds in memory. The out-of-memory version is
        not implemented yet.

        :param by: see :epkg:`pandas:DataFrame:groupby`
        :param in_memory: in-memory algorithm
        :param lambda_agg: aggregation function, *sum* by default
        :param lambda_agg_agg: to aggregate the aggregations, *sum* by default
        :param kwargs: additional parameters for :epkg:`pandas:DataFrame:groupby`
        :param strategy: ``'cum'``, or ``'streaming'``, see below
        :return: :epkg:`pandas:DataFrame`

        As the input @see cl StreamingDataFrame does not necessarily hold
        in memory, the aggregation must be done at every iteration.
        There are two levels of aggregation: one to reduce every iterated
        :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`.
        This second one is always a **sum**.
        As a consequence, this function should not compute any *mean* or *count*,
        only *sum*, because we do not know the size of each iterated
        :epkg:`dataframe`. To compute an average, sums and weights must be
        aggregated separately.

        Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default.
        It could also be ``lambda gr: gr.max()`` or
        ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()``
        as it would lead to incoherent results.

        Parameter *strategy* allows three scenarios.
        The first one, ``strategy is None``, goes through
        the whole dataset to produce a final :epkg:`DataFrame`.
        The second one, ``strategy=='cum'``, returns a
        @see cl StreamingDataFrame: each iteration produces
        the current status of the *group by*. In the last case,
        ``strategy=='streaming'`` produces :epkg:`DataFrame` objects
        which must be concatenated into a single :epkg:`DataFrame`
        and grouped again to get the results.

        .. exref::
            :title: StreamingDataFrame and groupby
            :tag: streaming

            Here is an example which shows how to write a simple *groupby*
            with :epkg:`pandas` and @see cl StreamingDataFrame.

            .. runpython::
                :showcode:

                from pandas import DataFrame
                from pandas_streaming.df import StreamingDataFrame
                from pandas_streaming.data import dummy_streaming_dataframe

                df20 = dummy_streaming_dataframe(20).to_dataframe()
                df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0)
                sdf20 = StreamingDataFrame.read_df(df20, chunksize=5)
                sgr = sdf20.groupby_streaming("key", lambda gr: gr.sum(),
                                              strategy='cum', as_index=False)
                for gr in sgr:
                    print()
                    print(gr)
        """
        if not in_memory:
            raise NotImplementedError(
                "Out-of-memory group by is not implemented.")
        if lambda_agg is None:
            def lambda_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg = lambda_agg_
        if lambda_agg_agg is None:
            def lambda_agg_agg_(gr):
                "sum"
                return gr.sum()
            lambda_agg_agg = lambda_agg_agg_
        ckw = kwargs.copy()
        ckw["as_index"] = False

        if strategy == 'cum':
            def iterate_cum():
                agg = None
                for df in self:
                    gr = df.groupby(by=by, **ckw)
                    gragg = lambda_agg(gr)
                    if agg is None:
                        yield lambda_agg_agg(gragg.groupby(by=by, **kwargs))
                        agg = gragg
                    else:
                        lagg = pandas.concat([agg, gragg], sort=False)
                        yield lambda_agg_agg(lagg.groupby(by=by, **kwargs))
                        agg = lagg
            return StreamingDataFrame(lambda: iterate_cum(), **self.get_kwargs())

        if strategy == 'streaming':
            def iterate_streaming():
                for df in self:
                    gr = df.groupby(by=by, **ckw)
                    gragg = lambda_agg(gr)
                    yield lambda_agg(gragg.groupby(by=by, **kwargs))
            return StreamingDataFrame(lambda: iterate_streaming(), **self.get_kwargs())

        raise ValueError(  # pragma: no cover
            f"Unknown strategy {strategy!r}")

    def ensure_dtype(self, df, dtypes):
        """
        Ensures the :epkg:`dataframe` *df* has the types indicated in *dtypes*.
        Changes it if not.

        :param df: dataframe
        :param dtypes: list of types
        :return: True if the dataframe was modified
        """
        ch = False
        cols = df.columns
        for i, (has, exp) in enumerate(zip(df.dtypes, dtypes)):
            if has != exp:
                name = cols[i]
                df[name] = df[name].astype(exp)
                ch = True
        return ch

    def __getitem__(self, *args):
        """
        Implements some of the functionalities :epkg:`pandas`
        offers for the operator ``[]``.
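
        A minimal example, selecting a subset of columns:

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=[0, 1], b=[2, 3]))
            sdf = StreamingDataFrame.read_df(df, chunksize=1)
            print(sdf[["a"]].to_dataframe())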

942 """ 

943 if len(args) != 1: 

944 raise NotImplementedError( # pragma: no cover 

945 "Only a list of columns is supported.") 

946 cols = args[0] 

947 if isinstance(cols, str): 

948 # One column. 

949 iter_creation = self.iter_creation 

950 

951 def iterate_col(): 

952 "iterate on one column" 

953 one_col = [cols] 

954 for df in iter_creation(): 

955 yield df[one_col] 

956 return StreamingSeries(iterate_col, **self.get_kwargs()) 

957 

958 if not isinstance(cols, list): 

959 raise NotImplementedError("Only a list of columns is supported.") 

960 

961 def iterate_cols(sdf): 

962 """Iterate on columns.""" 

963 for df in sdf: 

964 yield df[cols] 

965 

966 return StreamingDataFrame(lambda: iterate_cols(self), **self.get_kwargs()) 

967 

    def __setitem__(self, index, value):
        """
        A limited set of operators is supported.
        """
        if not isinstance(index, str):
            raise ValueError(
                f"Only column assignment is supported but index={index!r}.")
        if isinstance(value, (int, float, numpy.number, str)):
            # It is equivalent to add_column.
            iter_creation = self.iter_creation

            def iterate_fct():
                "iterate on rows"
                iters = iter_creation()
                for df in iters:
                    dfc = df.copy()
                    dfc[index] = value
                    yield dfc

            self.iter_creation = iterate_fct

        elif isinstance(value, StreamingSeries):
            iter_creation = self.iter_creation

            def iterate_fct():
                "iterate on rows"
                iters = iter_creation()
                for df, dfs in zip(iters, value):
                    if df.shape[0] != dfs.shape[0]:
                        raise RuntimeError(
                            "Chunksize or shape are different when "
                            "iterating on two StreamingDataFrame at the same "
                            "time: %r != %r." % (df.shape[0], dfs.shape[0]))
                    dfc = df.copy()
                    dfc[index] = dfs
                    yield dfc

            self.iter_creation = iterate_fct
        else:
            raise NotImplementedError(
                "Not implemented for type(index)=%r and type(value)=%r." % (
                    type(index), type(value)))

    def add_column(self, col, value):
        """
        Implements some of the functionalities :epkg:`pandas`
        offers for the operator ``[]``.

        @param col new column
        @param value @see cl StreamingDataFrame or a lambda function
        @return @see cl StreamingDataFrame

        .. note::

            If value is a @see cl StreamingDataFrame,
            *chunksize* must be the same for both.

        .. exref::
            :title: Add a new column to a StreamingDataFrame
            :tag: streaming

            .. runpython::
                :showcode:

                from pandas import DataFrame
                from pandas_streaming.df import StreamingDataFrame

                df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"]))
                sdf = StreamingDataFrame.read_df(df)
                sdf2 = sdf.add_column("d", lambda row: int(1))
                print(sdf2.to_dataframe())

                # The same call works a second time, the stream is replayed.
                sdf2 = sdf.add_column("d", lambda row: int(1))
                print(sdf2.to_dataframe())
        """
        if not isinstance(col, str):
            raise NotImplementedError(  # pragma: no cover
                "Only a column as a string is supported.")

        if isfunction(value):
            def iterate_fct(self, value, col):
                "iterate on rows"
                for df in self:
                    dfc = df.copy()
                    dfc.insert(dfc.shape[1], col, dfc.apply(value, axis=1))
                    yield dfc

            return StreamingDataFrame(
                lambda: iterate_fct(self, value, col), **self.get_kwargs())

        if isinstance(value, (pandas.Series, pandas.DataFrame, StreamingDataFrame)):
            raise NotImplementedError(
                "Unable to set a new column based on a dataframe.")

        def iterate_cst(self, value, col):
            "iterate on rows"
            for df in self:
                dfc = df.copy()
                dfc[col] = value
                yield dfc

        return StreamingDataFrame(
            lambda: iterate_cst(self, value, col), **self.get_kwargs())

    def fillna(self, **kwargs):
        """
        Replaces the missing values, calls
        :epkg:`pandas:DataFrame:fillna`.

        @param kwargs see :epkg:`pandas:DataFrame:fillna`
        @return @see cl StreamingDataFrame

        .. warning::
            The function does not check what happens at the
            boundary of every chunk of data. Anything but a constant value
            will probably give inconsistent results.
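
        A minimal example with a constant fill value:

        .. runpython::
            :showcode:

            import numpy
            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=[0.0, numpy.nan, 2.0, numpy.nan]))
            sdf = StreamingDataFrame.read_df(df, chunksize=2)
            print(sdf.fillna(value=-1.0).to_dataframe())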

1084 """ 

1085 

1086 def iterate_na(self, **kwargs): 

1087 "iterate on rows" 

1088 if kwargs.get('inplace', True): 

1089 kwargs['inplace'] = True 

1090 for df in self: 

1091 df.fillna(**kwargs) 

1092 yield df 

1093 else: 

1094 for df in self: 

1095 yield df.fillna(**kwargs) 

1096 

1097 return StreamingDataFrame( 

1098 lambda: iterate_na(self, **kwargs), **self.get_kwargs()) 

1099 

    def describe(self, percentiles=None, include=None, exclude=None):
        """
        Calls :epkg:`pandas:DataFrame:describe` on every piece
        of the dataset. *percentiles* are not really accurate
        but just an indication.

        :param percentiles: see :epkg:`pandas:DataFrame:describe`
        :param include: see :epkg:`pandas:DataFrame:describe`
        :param exclude: see :epkg:`pandas:DataFrame:describe`
        :return: :epkg:`pandas:DataFrame:describe`

        .. versionchanged:: 0.3.219

            Parameter *datetime_is_numeric* was removed
            (see :epkg:`pandas:DataFrame:describe`).
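
        A minimal example:

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=[0.0, 1.0, 2.0, 3.0]))
            sdf = StreamingDataFrame.read_df(df, chunksize=2)
            print(sdf.describe())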

1115 """ 

1116 merged = None 

1117 stack = [] 

1118 notper = ['count', 'mean', 'std'] 

1119 for df in self: 

1120 desc = df.describe( 

1121 percentiles=percentiles, include=include, exclude=exclude) 

1122 count = desc.loc['count', :] 

1123 rows = [name for name in desc.index if name not in notper] 

1124 stack.append(desc.loc[rows, :]) 

1125 if merged is None: 

1126 merged = desc 

1127 merged.loc['std', :] = ( 

1128 merged.loc['std', :] ** 2 + merged.loc['mean', :] ** 2) * count 

1129 merged.loc['mean', :] *= count 

1130 else: 

1131 merged.loc['count', :] += desc.loc['count', :] 

1132 merged.loc['mean', :] += desc.loc['mean', :] * count 

1133 merged.loc['std', :] += ( 

1134 desc.loc['std', :] ** 2 + desc.loc['mean', :] ** 2) * count 

1135 merged.loc['max', :] = numpy.maximum( 

1136 merged.loc['max', :], desc.loc['max', :]) 

1137 merged.loc['min', :] = numpy.maximum( 

1138 merged.loc['min', :], desc.loc['min', :]) 

1139 merged.loc['mean', :] /= merged.loc['count', :] 

1140 merged.loc['std', :] = ( 

1141 merged.loc['std', :] / merged.loc['count', :] - 

1142 merged.loc['mean', :] ** 2) ** 0.5 

1143 values = pandas.concat(stack) 

1144 summary = values.describe(percentiles=percentiles) 

1145 merged = merged.loc[notper, :] 

1146 rows = [name for name in summary.index if name not in notper] 

1147 summary = summary.loc[rows, :] 

1148 return pandas.concat([merged, summary]) 

1149 

    def sort_values(self, by, axis=0, ascending=True, kind='quicksort',
                    na_position='last',
                    temp_file='_pandas_streaming_sort_values_'):
        """
        Sorts the streaming dataframe by values.

        :param by: one column
        :param ascending: order
        :param kind: see :meth:`pandas.DataFrame.sort_values`
        :param na_position: see :meth:`pandas.DataFrame.sort_values`
        :param temp_file: sorting a whole database is impossible
            without storing intermediate results on disk,
            unless it can fit into the memory, but in that case,
            it is easier to convert the streaming database into
            a dataframe and sort it
        :return: streaming database
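
        A minimal example (*temp_file* is written to the current
        directory and removed when the result is deleted):

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=[3, 1, 2], b=["z", "x", "y"]))
            sdf = StreamingDataFrame.read_df(df, chunksize=2)
            print(sdf.sort_values("a").to_dataframe())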

1166 """ 

1167 if not isinstance(by, str): 

1168 raise NotImplementedError( # pragma: no cover 

1169 f"Only one column can be used to sort not {by!r}.") 

1170 keys = {} 

1171 nans = [] 

1172 indices = [] 

1173 with open(temp_file, 'wb') as f: 

1174 for df in self: 

1175 dfs = df.sort_values(by, ascending=ascending, kind=kind, 

1176 na_position=na_position) 

1177 for tu in dfs[by]: 

1178 if isinstance(tu, float) and numpy.isnan(tu): 

1179 nans.append(len(indices)) 

1180 else: 

1181 if tu not in keys: 

1182 keys[tu] = [] 

1183 keys[tu].append(len(indices)) 

1184 indices.append(f.tell()) 

1185 st = BytesIO() 

1186 pickle.dump(dfs, st) 

1187 f.write(st.getvalue()) 

1188 

1189 indices.append(f.tell()) 

1190 

1191 values = list(keys.items()) 

1192 values.sort(reverse=not ascending) 

1193 

1194 def iterate(): 

1195 

1196 with open(temp_file, 'rb') as f: 

1197 

1198 if na_position == 'first': 

1199 for p in nans: 

1200 f.seek(indices[p]) 

1201 length = indices[p + 1] - indices[p] 

1202 pkl = f.read(length) 

1203 dfs = pickle.load(BytesIO(pkl)) 

1204 sub = dfs[numpy.isnan(dfs[by])] 

1205 yield sub 

1206 

1207 for key, positions in values: 

1208 for p in positions: 

1209 f.seek(indices[p]) 

1210 length = indices[p + 1] - indices[p] 

1211 pkl = f.read(length) 

1212 dfs = pickle.load(BytesIO(pkl)) 

1213 sub = dfs[dfs[by] == key] 

1214 yield sub 

1215 

1216 if na_position == 'last': 

1217 for p in nans: 

1218 f.seek(indices[p]) 

1219 length = indices[p + 1] - indices[p] 

1220 pkl = f.read(length) 

1221 dfs = pickle.load(BytesIO(pkl)) 

1222 sub = dfs[numpy.isnan(dfs[by])] 

1223 yield sub 

1224 

1225 res = StreamingDataFrame( 

1226 lambda: iterate(), **self.get_kwargs()) 

1227 res._delete_.append(lambda: os.remove(temp_file)) 

1228 return res 

1229 

    def __del__(self):
        """
        Calls every function in `_delete_`.
        """
        for f in self._delete_:
            f()


class StreamingSeries(StreamingDataFrame):
    """
    Seen as a @see cl StreamingDataFrame of one column.
    """

    def __init__(self, iter_creation, check_schema=True, stable=True):
        StreamingDataFrame.__init__(
            self, iter_creation, check_schema=check_schema, stable=stable)
        if len(self.columns) != 1:
            raise RuntimeError(  # pragma: no cover
                f"A series can contain only one column, not "
                f"{len(self.columns)!r}.")

    def apply(self, *args, **kwargs) -> 'StreamingDataFrame':
        """
        Applies :epkg:`pandas:Series:apply`.
        This function returns a @see cl StreamingSeries.
        """
        return StreamingSeries(
            lambda: map(lambda df: df.apply(*args, **kwargs), self),
            **self.get_kwargs())

    def __add__(self, value):
        """
        Does an addition on every value, hoping it has a meaning.

        :param value: any value which makes sense
        :return: a new series
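
        A minimal example:

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import StreamingDataFrame

            df = DataFrame(dict(a=[0, 1, 2]))
            sdf = StreamingDataFrame.read_df(df, chunksize=2)
            print((sdf["a"] + 10).to_dataframe())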

1266 """ 

1267 def iterate(): 

1268 for df in self: 

1269 yield df + value 

1270 

1271 return StreamingSeries(iterate, **self.get_kwargs())