Coverage for pandas_streaming/df/dataframe_helpers.py: 85%

203 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 14:15 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Helpers for dataframes. 

5""" 

6import hashlib 

7import struct 

8import warnings 

9import numpy 

10from pandas import DataFrame, Index, Series 

11 

12 

def numpy_types():
    """
    Returns the list of :epkg:`numpy` available types.

    :return: list of types

    .. note::
        ``numpy.float_`` and ``numpy.complex_`` were removed in
        :epkg:`numpy` 2.0. They were aliases of ``numpy.float64`` and
        ``numpy.complex128``, which are already part of the list, so the
        set of distinct types returned is unchanged.
    """
    return [numpy.bool_,
            numpy.int_,
            numpy.intc,
            numpy.intp,
            numpy.int8,
            numpy.int16,
            numpy.int32,
            numpy.int64,
            numpy.uint8,
            numpy.uint16,
            numpy.uint32,
            numpy.uint64,
            numpy.float16,
            numpy.float32,
            numpy.float64,
            numpy.complex64,
            numpy.complex128]

39 

40 

def hash_str(c, hash_length):
    """
    Hashes a string with :epkg:`sha256`.

    @param c value to hash (a string, or ``numpy.nan``)
    @param hash_length maximum length of the returned hash
    @return string (or ``numpy.nan`` when *c* is a missing value)
    """
    if isinstance(c, float):
        # Missing values pass through unchanged; any other float is a misuse.
        if not numpy.isnan(c):
            raise ValueError(f"numpy.nan expected, not {c}")
        return c
    digest = hashlib.sha256(c.encode("utf-8")).hexdigest()
    return digest[:hash_length] if len(digest) >= hash_length else digest

59 

60 

def hash_int(c, hash_length):
    """
    Hashes an integer into an integer with :epkg:`sha256`.

    @param c value to hash (an integer, or ``numpy.nan``)
    @param hash_length number of leading hexadecimal digits kept
    @return int (or ``numpy.nan`` when *c* is a missing value)
    """
    if isinstance(c, float):
        # Missing values pass through unchanged; any other float is a misuse.
        if not numpy.isnan(c):
            raise ValueError(f"numpy.nan expected, not {c}")
        return c
    digest = hashlib.sha256(struct.pack("i", c)).hexdigest()
    if len(digest) >= hash_length:
        digest = digest[:hash_length]
    return int(digest, 16) % (10 ** 8)

82 

83 

def hash_float(c, hash_length):
    """
    Hashes a float into a float with :epkg:`sha256`.

    @param c value to hash
    @param hash_length number of leading hexadecimal digits kept
    @return float (``numpy.nan`` is returned unchanged)
    """
    if numpy.isnan(c):
        # Missing values pass through unchanged.
        return c
    digest = hashlib.sha256(struct.pack("d", c)).hexdigest()
    if len(digest) >= hash_length:
        digest = digest[:hash_length]
    # Keep the result exactly representable as a float (< 2**53).
    return float(int(digest, 16) % (2 ** 53))

103 

104 

def dataframe_hash_columns(df, cols=None, hash_length=10, inplace=False):
    """
    Hashes a set of columns in a dataframe.
    Keeps the same type. Skips missing values.

    @param df dataframe
    @param cols columns to hash or None for all of them
    @param hash_length for strings only, length of the hash
    @param inplace modifies inplace
    @return new dataframe

    This might be useful to anonymize data before
    making it public.

    .. exref::
        :title: Hashes a set of columns in a dataframe
        :tag: dataframe

        .. runpython::
            :showcode:

            import pandas
            from pandas_streaming.df import dataframe_hash_columns
            df = pandas.DataFrame([dict(a=1, b="e", c=5.6, ind="a1", ai=1),
                                   dict(b="f", c=5.7, ind="a2", ai=2),
                                   dict(a=4, b="g", ind="a3", ai=3),
                                   dict(a=8, b="h", c=5.9, ai=4),
                                   dict(a=16, b="i", c=6.2, ind="a5", ai=5)])
            print(df)
            print('--------------')
            df2 = dataframe_hash_columns(df)
            print(df2)
    """
    if cols is None:
        cols = list(df.columns)
    if not inplace:
        df = df.copy()

    dtypes = dict(zip(df.columns, df.dtypes))
    for name in cols:
        kind = dtypes[name]
        # The order of the checks matters: on most platforms ``int`` and
        # ``numpy.int64`` compare equal to the same dtype.
        if kind == int:
            df[name] = df[name].apply(lambda v: hash_int(v, hash_length))
        elif kind == numpy.int64:
            df[name] = df[name].apply(
                lambda v: numpy.int64(hash_int(v, hash_length)))
        elif kind == float:
            df[name] = df[name].apply(lambda v: hash_float(v, hash_length))
        elif kind == object:
            df[name] = df[name].apply(lambda v: hash_str(v, hash_length))
        else:
            raise NotImplementedError(  # pragma: no cover
                f"Conversion of type {kind} in column '{name}' is not implemented")

    return df

173 

174 

def dataframe_unfold(df, col, new_col=None, sep=","):
    """
    One column may contain concatenated values.
    This function splits these values and multiplies the
    rows for each split value.

    @param df dataframe
    @param col column with the concatenated values (strings)
    @param new_col new column name, if None, use default value.
    @param sep separator
    @return a new dataframe

    .. exref::
        :title: Unfolds a column of a dataframe.
        :tag: dataframe

        .. runpython::
            :showcode:

            import pandas
            import numpy
            from pandas_streaming.df import dataframe_unfold

            df = pandas.DataFrame([dict(a=1, b="e,f"),
                                   dict(a=2, b="g"),
                                   dict(a=3)])
            print(df)
            df2 = dataframe_unfold(df, "b")
            print('----------')
            print(df2)

            # To fold:
            folded = df2.groupby('a').apply(lambda row: ','.join(row['b_unfold'].dropna()) \\
                     if len(row['b_unfold'].dropna()) > 0 else numpy.nan)
            print('----------')
            print(folded)
    """
    col_name = col + "_unfold" if new_col is None else new_col
    # Pick a helper column name guaranteed not to clash with the dataframe.
    temp_col = '__index__'
    while temp_col in df.columns:
        temp_col += "_"

    rows = []
    for position, value in enumerate(df[col]):
        if isinstance(value, str):
            rows.extend({col: value, col_name: piece, temp_col: position}
                        for piece in value.split(sep))
        else:
            # Non-string values (typically NaN) are kept as a single row.
            rows.append({col: value, col_name: value, temp_col: position})

    df = df.copy()
    df[temp_col] = list(range(df.shape[0]))
    unfolded = DataFrame(rows)
    merged = df.merge(unfolded, on=[col, temp_col])
    return merged.drop(temp_col, axis=1).copy()

232 

233 

def dataframe_shuffle(df, random_state=None):
    """
    Shuffles a dataframe.

    :param df: :epkg:`pandas:DataFrame`
    :param random_state: seed
    :return: new :epkg:`pandas:DataFrame`

    .. exref::
        :title: Shuffles the rows of a dataframe
        :tag: dataframe

        .. runpython::
            :showcode:

            import pandas
            from pandas_streaming.df import dataframe_shuffle

            df = pandas.DataFrame([dict(a=1, b="e", c=5.6, ind="a1"),
                                   dict(a=2, b="f", c=5.7, ind="a2"),
                                   dict(a=4, b="g", c=5.8, ind="a3"),
                                   dict(a=8, b="h", c=5.9, ind="a4"),
                                   dict(a=16, b="i", c=6.2, ind="a5")])
            print(df)
            print('----------')

            shuffled = dataframe_shuffle(df, random_state=0)
            print(shuffled)
    """
    if random_state is None:
        permutation = numpy.random.permutation
    else:
        permutation = numpy.random.RandomState(random_state).permutation

    ori_cols = list(df.columns)
    known = set(ori_cols)

    # Move the index into regular columns so it is shuffled together
    # with the data, then restore it afterwards.
    flat = df.reset_index(drop=False)
    index_cols = [name for name in flat.columns if name not in known]
    shuffled = flat.iloc[permutation(flat.index), :]
    res = shuffled.set_index(index_cols)[ori_cols]
    res.index.names = df.index.names
    return res

279 

280 

def pandas_fillna(df, by, hasna=None, suffix=None):
    """
    Replaces the :epkg:`nan` values for something not :epkg:`nan`.
    Mostly used by @see fn pandas_groupby_nan.

    :param df: dataframe
    :param by: list of columns for which we need to replace nan
    :param hasna: None or list of columns for which we need to replace NaN
    :param suffix: use a prefix for the NaN value
    :return: dictionary mapping each processed column to the replacement
        value chosen for it, and a new dataframe (new copy)
    """
    suffix = suffix if suffix else "²nan"
    df = df.copy()
    rep = {}
    for c in by:
        if hasna is not None and c not in hasna:
            continue
        if df[c].dtype in (str, bytes, object):
            # Build a replacement string/bytes value not already present
            # in the column.
            se = set(df[c].dropna())
            val = se.pop()
            if isinstance(val, str):
                cst = suffix
                val = ""
            elif isinstance(val, bytes):
                cst = b"_"
            else:
                raise TypeError(  # pragma: no cover
                    "Unable to determine a constant for type='{0}' dtype='{1}'".format(
                        val, df[c].dtype))
            val += cst
            while val in se:
                val += suffix
            # Assignment instead of ``df[c].fillna(val, inplace=True)``:
            # the inplace form operates on a column selection (chained
            # assignment) and does not update ``df`` under pandas
            # copy-on-write.
            df[c] = df[c].fillna(val)
            rep[c] = val
        else:
            # Numeric column: pick a value strictly above the observed range.
            dr = df[c].dropna()
            mi = abs(dr.min())
            ma = abs(dr.max())
            val = ma + mi
            if val == ma and not isinstance(val, str):
                val += ma + 1.
            if val <= ma:
                raise ValueError(  # pragma: no cover
                    "Unable to find a different value for column '{}' v='{}: "
                    "min={} max={}".format(c, val, mi, ma))
            df[c] = df[c].fillna(val)
            rep[c] = val
    return rep, df

329 

330 

def pandas_groupby_nan(df, by, axis=0, as_index=False, suffix=None, nanback=True, **kwargs):
    """
    Does a *groupby* including keeping missing values (:epkg:`nan`).

    :param df: dataframe
    :param by: column or list of columns
    :param axis: only 0 is allowed
    :param as_index: should be False
    :param suffix: None or a string
    :param nanback: put :epkg:`nan` back in the index,
        otherwise it leaves a replacement for :epkg:`nan`.
        (does not work when grouping by multiple columns)
    :param kwargs: other parameters sent to
        `groupby <http://pandas.pydata.org/pandas-docs/stable/
        generated/pandas.DataFrame.groupby.html>`_
    :return: groupby results

    See `groupby and missing values <http://pandas-docs.github.io/
    pandas-docs-travis/groupby.html#na-and-nat-group-handling>`_.
    If no :epkg:`nan` is detected, the function falls back in regular
    :epkg:`pandas:DataFrame:groupby` which has the following
    behavior.

    .. exref::
        :title: Group a dataframe by one column including nan values
        :tag: dataframe

        The regular :epkg:`pandas:dataframe:GroupBy` of a
        :epkg:`pandas:DataFrame` removes every :epkg:`nan`
        values from the index.

        .. runpython::
            :showcode:

            from pandas import DataFrame

            data = [dict(a=2, ind="a", n=1),
                    dict(a=2, ind="a"),
                    dict(a=3, ind="b"),
                    dict(a=30)]
            df = DataFrame(data)
            print(df)
            gr = df.groupby(["ind"]).sum()
            print(gr)

        Function @see fn pandas_groupby_nan keeps them.

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import pandas_groupby_nan

            data = [dict(a=2, ind="a", n=1),
                    dict(a=2, ind="a"),
                    dict(a=3, ind="b"),
                    dict(a=30)]
            df = DataFrame(data)
            gr2 = pandas_groupby_nan(df, ["ind"]).sum()
            print(gr2)
    """
    # Recent pandas supports NaN groups natively through ``dropna=False``:
    # try that first, and fall back to the manual implementation below when
    # the installed pandas does not accept the keyword.
    if nanback and suffix is None:
        try:
            res = df.groupby(by, axis=axis, as_index=as_index,
                             dropna=False, **kwargs)
        except TypeError:
            # old version of pandas
            res = None
        if res is not None:
            # NOTE(review): ``suffix`` is always None inside this branch,
            # so the function always returns through the inner ``return``;
            # the two lines below it look unreachable.
            if suffix is None:
                return res
            res.index = Series(res.index).replace(numpy.nan, suffix)
            return res
    # Manual fallback: validate the restricted parameter space first.
    if axis != 0:
        raise NotImplementedError("axis should be 0")
    if as_index:
        raise NotImplementedError("as_index must be False")
    if isinstance(by, tuple):
        raise TypeError("by should be of list not tuple")
    if not isinstance(by, list):
        by = [by]
    # Detect which grouping columns actually contain missing values.
    hasna = {}
    for b in by:
        h = df[b].isnull().values.any()
        if h:
            hasna[b] = True
    if len(hasna) > 0:
        # Replace NaN by a sentinel value (see pandas_fillna), group on the
        # filled copy, then optionally patch NaN back into the group keys.
        rep, df_copy = pandas_fillna(df, by, hasna, suffix=suffix)
        res = df_copy.groupby(by, axis=axis, as_index=as_index, **kwargs)
        if len(by) == 1:
            if not nanback:
                # Leave the sentinel in place; warn when the column was not
                # of object dtype, because the sentinel changed its dtype.
                dummy = DataFrame([{"a": "a"}])
                do = dummy.dtypes[0]
                typ = {c: t for c, t in zip(  # pylint: disable=R1721
                    df.columns, df.dtypes)}  # pylint: disable=R1721
                if typ[by[0]] != do:
                    warnings.warn(  # pragma: no cover
                        f"[pandas_groupby_nan] NaN value: {rep}")
                return res
            # nanback=True: rewrite the GroupBy internals so the sentinel
            # shows up as NaN again.  This touches private pandas attributes
            # (grouper, groupings, _cache) and is therefore version-dependent.
            for b in by:
                fnan = rep[b]
                if fnan in res.grouper.groups:
                    # Rename the sentinel group key back to NaN.
                    res.grouper.groups[numpy.nan] = res.grouper.groups[fnan]
                    del res.grouper.groups[fnan]
                new_val = list((numpy.nan if b == fnan else b)
                               for b in res.grouper.result_index)
                res.grouper.groupings[0]._group_index = Index(new_val)
                res.grouper.groupings[0].obj[b].replace(
                    fnan, numpy.nan, inplace=True)
                # Two layouts of the private API are handled below:
                # older pandas exposes ``grouper.grouping`` / ``.grouper``,
                # newer pandas uses ``_get_grouper`` / ``grouping_vector``.
                if hasattr(res.grouper, 'grouping'):
                    if isinstance(res.grouper.groupings[0].grouper, numpy.ndarray):
                        arr = numpy.array(new_val)
                        res.grouper.groupings[0].grouper = arr
                        if (hasattr(res.grouper.groupings[0], '_cache') and
                                'result_index' in res.grouper.groupings[0]._cache):
                            # Drop the cached index so it is rebuilt with NaN.
                            del res.grouper.groupings[0]._cache['result_index']
                    else:
                        raise NotImplementedError("Not implemented for type: {0}".format(
                            type(res.grouper.groupings[0].grouper)))
                else:
                    grouper = res.grouper._get_grouper()
                    if isinstance(grouper, numpy.ndarray):
                        arr = numpy.array(new_val)
                        res.grouper.groupings[0].grouping_vector = arr
                        if (hasattr(res.grouper.groupings[0], '_cache') and
                                'result_index' in res.grouper.groupings[0]._cache):
                            index = res.grouper.groupings[0]._cache['result_index']
                            if len(rep) == 1:
                                # Rebuild the cached result_index with NaN in
                                # place of the sentinel key.
                                key = list(rep.values())[0]
                                new_index = numpy.array(index)
                                for i in range(0, len(new_index)):  # pylint: disable=C0200
                                    if new_index[i] == key:
                                        new_index[i] = numpy.nan
                                res.grouper.groupings[0]._cache['result_index'] = (
                                    index.__class__(new_index))
                            else:
                                raise NotImplementedError(  # pragma: no cover
                                    "NaN values not implemented for multiindex.")
                    else:
                        raise NotImplementedError(  # pragma: no cover
                            "Not implemented for type: {0}".format(
                                type(res.grouper.groupings[0].grouper)))
            res.grouper._cache['result_index'] = res.grouper.groupings[0]._group_index
        else:
            # Multiple grouping columns.
            if not nanback:
                dummy = DataFrame([{"a": "a"}])
                do = dummy.dtypes[0]
                typ = {c: t for c, t in zip(  # pylint: disable=R1721
                    df.columns, df.dtypes)}  # pylint: disable=R1721
                for b in by:
                    if typ[b] != do:
                        warnings.warn(  # pragma: no cover
                            f"[pandas_groupby_nan] NaN values: {rep}")
                        break
                return res
            raise NotImplementedError(
                "Not yet implemented. Replacing pseudo nan values by real nan "
                "values is not as easy as it looks. Use nanback=False")

            # Dead code kept for reference: an earlier attempt at restoring
            # NaN keys for a multi-column groupby.
            # keys = list(res.grouper.groups.keys())
            # didit = False
            # mapping = {}
            # for key in keys:
            #     new_key = list(key)
            #     mod = False
            #     for k, b in enumerate(by):
            #         if b not in rep:
            #             continue
            #         fnan = rep[b]
            #         if key[k] == fnan:
            #             new_key[k] = numpy.nan
            #             mod = True
            #             didit = True
            #             mapping[fnan] = numpy.nan
            #     if mod:
            #         new_key = tuple(new_key)
            #         mapping[key] = new_key
            #         res.grouper.groups[new_key] = res.grouper.groups[key]
            #         del res.grouper.groups[key]
            # if didit:
            #     # this code deos not work
            #     vnan = numpy.nan
            #     new_index = list(mapping.get(v, v)
            #                      for v in res.grouper.result_index)
            #     names = res.grouper.result_index.names
            #     # index = MultiIndex.from_tuples(tuples=new_index, names=names)
            #     # res.grouper.result_index = index  # does not work cannot set
            #     # values for [result_index]
            #     for k in range(len(res.grouper.groupings)):
            #         grou = res.grouper.groupings[k]
            #         new_val = list(mapping.get(v, v) for v in grou)
            #         grou._group_index = Index(new_val)
            #         b = names[k]
            #         if b in rep:
            #             vv = rep[b]
            #             grou.obj[b].replace(vv, vnan, inplace=True)
            #         if isinstance(grou.grouper, numpy.ndarray):
            #             grou.grouper = numpy.array(new_val)
            #         else:
            #             raise NotImplementedError(
            #                 "Not implemented for type: {0}".format(type(grou.grouper)))
            #     del res.grouper._cache
        return res
    # No NaN in any grouping column: plain groupby is enough.
    return df.groupby(by, axis=axis, **kwargs)