Coverage for pandas_streaming/df/connex_split.py: 98%

209 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 14:15 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Implements a connex split between train and test. 

5""" 

6from collections import Counter 

7import pandas 

8import numpy 

9from sklearn.model_selection import train_test_split 

10from .dataframe_helpers import dataframe_shuffle 

11 

12 

class ImbalancedSplitException(Exception):
    """
    Signals that the train/test partition ended up with a relative
    weight difference above the allowed ``fail_imbalanced`` threshold.
    """

18 

19 

def train_test_split_weights(df, weights=None, test_size=0.25, train_size=None,
                             shuffle=True, fail_imbalanced=0.05, random_state=None):
    """
    Splits a database in train/test given, every row
    can have a different weight.

    @param      df              :epkg:`pandas:DataFrame` or @see cl StreamingDataFrame
    @param      weights         None or weights or weights column name
    @param      test_size       ratio for the test partition (if *train_size* is not specified)
    @param      train_size      ratio for the train partition
    @param      shuffle         shuffles before the split
    @param      fail_imbalanced raises an exception if relative weights difference
                                is higher than this value
    @param      random_state    seed for random generators
    @return                     train and test :epkg:`pandas:DataFrame`

    If the dataframe is not shuffled first, the function
    will produce two datasets which are unlikely to be randomized
    as the function tries to keep equal weights among both paths
    without using randomness.
    """
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(  # pragma: no cover
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError(  # pragma: no cover
            "Not implemented on numpy arrays.")
    if shuffle:
        df = dataframe_shuffle(df, random_state=random_state)
    if weights is None:
        # No weights: defer to scikit-learn's regular splitter.
        if test_size == 0 or train_size == 0:
            raise ValueError(
                f"test_size={test_size} or train_size={train_size} cannot be null (1).")
        return train_test_split(df, test_size=test_size,
                                train_size=train_size,
                                random_state=random_state)

    # Normalize weights into a plain list aligned with the rows.
    if isinstance(weights, pandas.Series):
        weights = list(weights)
    elif isinstance(weights, str):
        weights = list(df[weights])
    if len(weights) != df.shape[0]:
        raise ValueError(
            "Dimension mismatch between weights and dataframe "
            "{0} != {1}".format(df.shape[0], len(weights)))

    p = (1 - test_size) if test_size else None
    if train_size is not None:
        p = train_size
        test_size = 1 - p
    if p is None or min(test_size, p) <= 0:
        raise ValueError(
            f"test_size={test_size} or train_size={train_size} cannot be null (2).")
    # ratio rescales train weights so both sides should accumulate
    # the same *scaled* weight when the split matches the requested sizes.
    ratio = test_size / p

    if random_state is None:
        randint = numpy.random.randint
    else:
        state = numpy.random.RandomState(random_state)
        randint = state.randint

    # Greedy balancing: each row goes to the side whose scaled weight
    # is currently behind; ties (balance == 0) are broken at random.
    balance = 0
    train_ids = []
    test_ids = []
    test_weights = 0
    train_weights = 0
    for i in range(0, df.shape[0]):
        w = weights[i]
        if balance == 0:
            # BUG FIX: numpy's randint excludes the upper bound, so
            # randint(0, 1) always returned 0 and the tie-break was
            # never random. randint(0, 2) draws 0 or 1.
            h = randint(0, 2)
            totest = h == 0
        else:
            totest = balance < 0
        if totest:
            test_ids.append(i)
            balance += w
            test_weights += w
        else:
            train_ids.append(i)
            balance -= w * ratio
            train_weights += w * ratio

    r = abs(train_weights - test_weights) / \
        (1.0 * (train_weights + test_weights))
    if r >= fail_imbalanced:
        raise ImbalancedSplitException(  # pragma: no cover
            "Split is imbalanced: train_weights={0} test_weights={1} r={2}."
            "".format(train_weights, test_weights, r))

    return df.iloc[train_ids, :], df.iloc[test_ids, :]

109 

110 

111def train_test_connex_split(df, groups, test_size=0.25, train_size=None, 

112 stratify=None, hash_size=9, unique_rows=False, 

113 shuffle=True, fail_imbalanced=0.05, keep_balance=None, 

114 stop_if_bigger=None, return_cnx=False, 

115 must_groups=None, random_state=None, fLOG=None): 

116 """ 

117 This split is for a specific case where data is linked 

118 in many ways. Let's assume we have three ids as we have 

119 for online sales: *(product id, user id, card id)*. 

120 As we may need to compute aggregated features, 

121 we need every id not to be present in both train and 

122 test set. The function computes the connected components 

123 and breaks each of them in two parts for train and test. 

124 

125 @param df :epkg:`pandas:DataFrame` 

126 @param groups columns name for the ids 

127 @param test_size ratio for the test partition (if *train_size* is not specified) 

128 @param train_size ratio for the train partition 

129 @param stratify column holding the stratification 

130 @param hash_size size of the hash to cache information about partition 

131 @param unique_rows ensures that rows are unique 

132 @param shuffle shuffles before the split 

133 @param fail_imbalanced raises an exception if relative weights difference 

134 is higher than this value 

135 @param stop_if_bigger (float) stops a connected components from being 

136 bigger than this ratio of elements, this should not be used 

137 unless a big components emerges, the algorithm stops merging 

138 but does not guarantee it returns the best cut, 

139 the value should be close to 0 

140 @param keep_balance (float), if not None, does not merge connected components 

141 if their relative sizes are too different, the value should be 

142 close to 1 

143 @param return_cnx returns connected components as a third results 

144 @param must_groups column name for ids which must not be shared by 

145 train/test partitions 

146 @param random_state seed for random generator 

147 @param fLOG logging function 

148 @return Two @see cl StreamingDataFrame, one 

149 for train, one for test. 

150 

151 The list of ids must hold in memory. 

152 There is no streaming implementation for the ids. 

153 

154 .. exref:: 

155 :title: Splits a dataframe, keep ids in separate partitions 

156 :tag: dataframe 

157 

158 In some data science problems, rows are not independant 

159 and share common value, most of the time ids. In some 

160 specific case, multiple ids from different columns are 

161 connected and must appear in the same partition. 

162 Testing that each id column is evenly split and do not 

163 appear in both sets in not enough. Connected components 

164 are needed. 

165 

166 .. runpython:: 

167 :showcode: 

168 

169 from pandas import DataFrame 

170 from pandas_streaming.df import train_test_connex_split 

171 

172 df = DataFrame([dict(user="UA", prod="PAA", card="C1"), 

173 dict(user="UA", prod="PB", card="C1"), 

174 dict(user="UB", prod="PC", card="C2"), 

175 dict(user="UB", prod="PD", card="C2"), 

176 dict(user="UC", prod="PAA", card="C3"), 

177 dict(user="UC", prod="PF", card="C4"), 

178 dict(user="UD", prod="PG", card="C5"), 

179 ]) 

180 

181 train, test = train_test_connex_split( 

182 df, test_size=0.5, groups=['user', 'prod', 'card'], 

183 fail_imbalanced=0.6) 

184 

185 print(train) 

186 print(test) 

187 

188 If *return_cnx* is True, the third results contains: 

189 

190 * connected components for each id 

191 * the dataframe with connected components as a new column 

192 

193 .. runpython:: 

194 :showcode: 

195 

196 from pandas import DataFrame 

197 from pandas_streaming.df import train_test_connex_split 

198 

199 df = DataFrame([dict(user="UA", prod="PAA", card="C1"), 

200 dict(user="UA", prod="PB", card="C1"), 

201 dict(user="UB", prod="PC", card="C2"), 

202 dict(user="UB", prod="PD", card="C2"), 

203 dict(user="UC", prod="PAA", card="C3"), 

204 dict(user="UC", prod="PF", card="C4"), 

205 dict(user="UD", prod="PG", card="C5"), 

206 ]) 

207 

208 train, test, cnx = train_test_connex_split( 

209 df, test_size=0.5, groups=['user', 'prod', 'card'], 

210 fail_imbalanced=0.6, return_cnx=True) 

211 

212 print(cnx[0]) 

213 print(cnx[1]) 

214 """ 

215 if stratify is not None: 

216 raise NotImplementedError( # pragma: no cover 

217 "Option stratify is not implemented.") 

218 if groups is None or len(groups) == 0: 

219 raise ValueError( # pragma: no cover 

220 "groups is empty. Use regular train_test_split.") 

221 if hasattr(df, 'iter_creation'): 

222 raise NotImplementedError( # pragma: no cover 

223 'Not implemented yet for StreamingDataFrame.') 

224 if isinstance(df, numpy.ndarray): 

225 raise NotImplementedError( # pragma: no cover 

226 "Not implemented on numpy arrays.") 

227 if shuffle: 

228 df = dataframe_shuffle(df, random_state=random_state) 

229 

230 dfids = df[groups].copy() 

231 if must_groups is not None: 

232 dfids_must = df[must_groups].copy() 

233 

234 name = "connex" 

235 while name in dfids.columns: 

236 name += "_" 

237 one = "weight" 

238 while one in dfids.columns: 

239 one += "_" 

240 

241 # Connected components. 

242 elements = list(range(dfids.shape[0])) 

243 counts_cnx = {i: {i} for i in elements} 

244 connex = {} 

245 avoids_merge = {} 

246 

247 def do_connex_components(dfrows, local_groups, kb, sib): 

248 "run connected components algorithms" 

249 itern = 0 

250 modif = 1 

251 

252 while modif > 0 and itern < len(elements): 

253 if fLOG and df.shape[0] > 10000: 

254 fLOG("[train_test_connex_split] iteration={0}-#nb connect={1} - " 

255 "modif={2}".format(iter, len(set(elements)), modif)) 

256 modif = 0 

257 itern += 1 

258 for i, row in enumerate(dfrows.itertuples(index=False, name=None)): 

259 vals = [val for val in zip(local_groups, row) if not isinstance( 

260 val[1], float) or not numpy.isnan(val[1])] 

261 

262 c = elements[i] 

263 

264 for val in vals: 

265 if val not in connex: 

266 connex[val] = c 

267 modif += 1 

268 

269 set_c = set(connex[val] for val in vals) 

270 set_c.add(c) 

271 new_c = min(set_c) 

272 

273 add_pair_c = [] 

274 for c in set_c: 

275 if c == new_c or (new_c, c) in avoids_merge: 

276 continue 

277 if kb is not None: 

278 maxi = min(len(counts_cnx[new_c]), len(counts_cnx[c])) 

279 if maxi > 5: 

280 diff = len(counts_cnx[new_c]) + \ 

281 len(counts_cnx[c]) - maxi 

282 r = diff / float(maxi) 

283 if r > kb: 

284 if fLOG: # pragma: no cover 

285 fLOG('[train_test_connex_split] balance ' 

286 'r={0:0.00000}>{1:0.00}, #[{2}]={3}, ' 

287 '#[{4}]={5}'.format(r, kb, new_c, 

288 len(counts_cnx[new_c]), 

289 c, len(counts_cnx[c]))) 

290 continue 

291 

292 if sib is not None: 

293 r = (len(counts_cnx[new_c]) + 

294 len(counts_cnx[c])) / float(len(elements)) 

295 if r > sib: 

296 if fLOG: # pragma: no cover 

297 fLOG('[train_test_connex_split] no merge ' 

298 'r={0:0.00000}>{1:0.00}, #[{2}]={3}, #[{4}]={5}' 

299 ''.format(r, sib, new_c, len(counts_cnx[new_c]), 

300 c, len(counts_cnx[c]))) 

301 avoids_merge[new_c, c] = i 

302 continue 

303 

304 add_pair_c.append(c) 

305 

306 if len(add_pair_c) > 0: 

307 for c in add_pair_c: 

308 modif += len(counts_cnx[c]) 

309 for ii in counts_cnx[c]: 

310 elements[ii] = new_c 

311 counts_cnx[new_c] = counts_cnx[new_c].union( 

312 counts_cnx[c]) 

313 counts_cnx[c] = set() 

314 

315 keys = list(vals) 

316 for val in keys: 

317 if connex[val] == c: 

318 connex[val] = new_c 

319 modif += 1 

320 

321 if must_groups: 

322 do_connex_components(dfids_must, must_groups, None, None) 

323 do_connex_components(dfids, groups, keep_balance, stop_if_bigger) 

324 

325 # final 

326 dfids[name] = elements 

327 dfids[one] = 1 

328 grsum = dfids[[name, one]].groupby(name, as_index=False).sum() 

329 if fLOG: 

330 for g in groups: 

331 fLOG( 

332 f"[train_test_connex_split] #nb in '{g}': {len(set(dfids[g]))}") 

333 fLOG( 

334 f"[train_test_connex_split] #connex {grsum.shape[0]}/{dfids.shape[0]}") 

335 if grsum.shape[0] <= 1: 

336 raise ValueError( # pragma: no cover 

337 "Every element is in the same connected components.") 

338 

339 # Statistics: top connected components 

340 if fLOG: 

341 # Global statistics 

342 counts = Counter(elements) 

343 cl = [(v, k) for k, v in counts.items()] 

344 cum = 0 

345 maxc = None 

346 fLOG("[train_test_connex_split] number of connected components: {0}" 

347 "".format(len(set(elements)))) 

348 for i, (v, k) in enumerate(sorted(cl, reverse=True)): 

349 if i == 0: 

350 maxc = k, v 

351 if i >= 10: 

352 break 

353 cum += v 

354 fLOG("[train_test_connex_split] c={0} #elements={1} cumulated" 

355 "={2}/{3}".format(k, v, cum, len(elements))) 

356 

357 # Most important component 

358 fLOG( 

359 f'[train_test_connex_split] first row of the biggest component {maxc}') 

360 tdf = dfids[dfids[name] == maxc[0]] 

361 fLOG(f'[train_test_connex_split] \n{tdf.head(n=10)}') 

362 

363 # Splits. 

364 train, test = train_test_split_weights( 

365 grsum, weights=one, test_size=test_size, train_size=train_size, 

366 shuffle=shuffle, fail_imbalanced=fail_imbalanced, 

367 random_state=random_state) 

368 train.drop(one, inplace=True, axis=1) 

369 test.drop(one, inplace=True, axis=1) 

370 

371 # We compute the final dataframe. 

372 def double_merge(d): 

373 "merge twice" 

374 merge1 = dfids.merge(d, left_on=name, right_on=name) 

375 merge2 = df.merge(merge1, left_on=groups, right_on=groups) 

376 return merge2 

377 

378 train_f = double_merge(train) 

379 test_f = double_merge(test) 

380 if return_cnx: 

381 return train_f, test_f, (connex, dfids) 

382 else: 

383 return train_f, test_f 

384 

385 

def train_test_apart_stratify(df, group, test_size=0.25, train_size=None,
                              stratify=None, force=False, random_state=None,
                              fLOG=None):
    """
    This split is for a specific case where data is linked
    in one way. Let's assume we have two ids as we have
    for online sales: *(product id, category id)*.
    A product can have multiple categories. We need to have
    distinct products on train and test but common categories
    on both sides.

    @param      df              :epkg:`pandas:DataFrame`
    @param      group           columns name for the ids
    @param      test_size       ratio for the test partition
                                (if *train_size* is not specified)
    @param      train_size      ratio for the train partition
    @param      stratify        column holding the stratification
    @param      force           if True, tries to get at least one example on the test side
                                for each value of the column *stratify*
    @param      random_state    seed for random generators
    @param      fLOG            logging function
    @return                     Two @see cl StreamingDataFrame, one
                                for train, one for test.

    .. index:: multi-label

    The list of ids must hold in memory.
    There is no streaming implementation for the ids.
    This split was implemented for a case of a multi-label
    classification. A category (*stratify*) is not exclusive
    and an observation can be assigned to multiple
    categories. In that particular case, the method
    `train_test_split <http://scikit-learn.org/stable/modules/generated/
    sklearn.model_selection.train_test_split.html>`_
    can not directly be used.

    .. runpython::
        :showcode:

        import pandas
        from pandas_streaming.df import train_test_apart_stratify

        df = pandas.DataFrame([dict(a=1, b="e"),
                               dict(a=1, b="f"),
                               dict(a=2, b="e"),
                               dict(a=2, b="f")])

        train, test = train_test_apart_stratify(
            df, group="a", stratify="b", test_size=0.5)
        print(train)
        print('-----------')
        print(test)
    """
    if stratify is None:
        raise ValueError(  # pragma: no cover
            "stratify must be specified.")
    if group is None:
        raise ValueError(  # pragma: no cover
            "group must be specified.")
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError("Not implemented on numpy arrays.")

    p = (1 - test_size) if test_size else None
    if train_size is not None:
        p = train_size
        test_size = 1 - p
    if p is None or min(test_size, p) <= 0:
        raise ValueError(  # pragma: no cover
            f"test_size={test_size} or train_size={train_size} cannot be null")

    # ids[s] is the set of group ids associated with stratification value s.
    couples = df[[group, stratify]].itertuples(name=None, index=False)
    hist = Counter(df[stratify])
    sorted_hist = [(v, k) for k, v in hist.items()]
    sorted_hist.sort()
    ids = {c: set() for c in hist}

    for g, s in couples:
        ids[s].add(g)

    if random_state is None:
        permutation = numpy.random.permutation
    else:
        state = numpy.random.RandomState(random_state)
        permutation = state.permutation

    # split[g] is 1 when group id g goes to test, 0 for train.
    # Rarest stratification values are processed first so they are
    # more likely to get representatives on both sides.
    split = {}
    for _, k in sorted_hist:
        not_assigned = [c for c in ids[k] if c not in split]
        if len(not_assigned) == 0:
            continue
        assigned = [c for c in ids[k] if c in split]
        nb_test = sum(split[c] for c in assigned)
        expected = min(len(ids[k]), int(
            test_size * len(ids[k]) + 0.5)) - nb_test
        if force and expected == 0 and nb_test == 0:
            nb_train = len(assigned) - nb_test
            if nb_train > 0 or len(not_assigned) > 1:
                expected = min(1, len(not_assigned))
        if expected > 0:
            # BUG FIX: numpy.random.permutation returns a shuffled *copy*
            # and leaves its argument untouched; the original discarded
            # the result, so the assignment below was never randomized.
            not_assigned = list(permutation(not_assigned))
            for e in not_assigned[:expected]:
                split[e] = 1
            for e in not_assigned[expected:]:
                split[e] = 0
        else:
            for c in not_assigned:
                split[c] = 0

    train_set = set(k for k, v in split.items() if v == 0)
    test_set = set(k for k, v in split.items() if v == 1)
    train_df = df[df[group].isin(train_set)]
    test_df = df[df[group].isin(test_set)]
    return train_df, test_df