Coverage for pandas_streaming/df/connex_split.py: 98%
209 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 14:15 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Implements a connex split between train and test.
5"""
6from collections import Counter
7import pandas
8import numpy
9from sklearn.model_selection import train_test_split
10from .dataframe_helpers import dataframe_shuffle
class ImbalancedSplitException(Exception):
    """
    Exception raised when the train/test split is detected
    to be more imbalanced than the allowed threshold.
    """
def train_test_split_weights(df, weights=None, test_size=0.25, train_size=None,
                             shuffle=True, fail_imbalanced=0.05, random_state=None):
    """
    Splits a database in train/test given, every row
    can have a different weight.

    @param df :epkg:`pandas:DataFrame` or @see cl StreamingDataFrame
    @param weights None or weights or weights column name
    @param test_size ratio for the test partition (if *train_size* is not specified)
    @param train_size ratio for the train partition
    @param shuffle shuffles before the split
    @param fail_imbalanced raises an exception if relative weights difference is higher than this value
    @param random_state seed for random generators
    @return train and test :epkg:`pandas:DataFrame`

    If the dataframe is not shuffled first, the function
    will produce two datasets which are unlikely to be randomized
    as the function tries to keep equal weights among both paths
    without using randomness.
    """
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(  # pragma: no cover
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError(  # pragma: no cover
            "Not implemented on numpy arrays.")
    if shuffle:
        df = dataframe_shuffle(df, random_state=random_state)
    if weights is None:
        if test_size == 0 or train_size == 0:
            raise ValueError(
                f"test_size={test_size} or train_size={train_size} cannot be null (1).")
        # No weights: delegate to scikit-learn's regular split.
        return train_test_split(df, test_size=test_size,
                                train_size=train_size,
                                random_state=random_state)

    if isinstance(weights, pandas.Series):
        weights = list(weights)
    elif isinstance(weights, str):
        weights = list(df[weights])
    if len(weights) != df.shape[0]:
        raise ValueError(
            "Dimension mismatch between weights and dataframe "
            "{0} != {1}".format(df.shape[0], len(weights)))

    p = (1 - test_size) if test_size else None
    if train_size is not None:
        p = train_size
        test_size = 1 - p
    if p is None or min(test_size, p) <= 0:
        raise ValueError(
            f"test_size={test_size} or train_size={train_size} cannot be null (2).")
    # ratio converts a train-side weight into the test-side scale so that
    # the greedy balancing below targets the requested proportion.
    ratio = test_size / p

    if random_state is None:
        randint = numpy.random.randint
    else:
        state = numpy.random.RandomState(random_state)
        randint = state.randint

    # Greedy assignment: the side with the lower accumulated (scaled)
    # weight receives the next row; ties are broken by a coin flip.
    balance = 0
    train_ids = []
    test_ids = []
    test_weights = 0
    train_weights = 0
    for i in range(0, df.shape[0]):
        w = weights[i]
        if balance == 0:
            # numpy's randint upper bound is exclusive: randint(0, 2)
            # yields 0 or 1. The previous randint(0, 1) always returned 0,
            # so the tie-break always favoured the test side.
            h = randint(0, 2)
            totest = h == 0
        else:
            totest = balance < 0
        if totest:
            test_ids.append(i)
            balance += w
            test_weights += w
        else:
            train_ids.append(i)
            balance -= w * ratio
            train_weights += w * ratio

    # Relative imbalance between the two (scaled) weight totals.
    r = abs(train_weights - test_weights) / \
        (1.0 * (train_weights + test_weights))
    if r >= fail_imbalanced:
        raise ImbalancedSplitException(  # pragma: no cover
            "Split is imbalanced: train_weights={0} test_weights={1} r={2}."
            "".format(train_weights, test_weights, r))

    return df.iloc[train_ids, :], df.iloc[test_ids, :]
def train_test_connex_split(df, groups, test_size=0.25, train_size=None,
                            stratify=None, hash_size=9, unique_rows=False,
                            shuffle=True, fail_imbalanced=0.05, keep_balance=None,
                            stop_if_bigger=None, return_cnx=False,
                            must_groups=None, random_state=None, fLOG=None):
    """
    This split is for a specific case where data is linked
    in many ways. Let's assume we have three ids as we have
    for online sales: *(product id, user id, card id)*.
    As we may need to compute aggregated features,
    we need every id not to be present in both train and
    test set. The function computes the connected components
    and breaks each of them in two parts for train and test.

    @param df :epkg:`pandas:DataFrame`
    @param groups columns name for the ids
    @param test_size ratio for the test partition (if *train_size* is not specified)
    @param train_size ratio for the train partition
    @param stratify column holding the stratification
    @param hash_size size of the hash to cache information about partition
    @param unique_rows ensures that rows are unique
    @param shuffle shuffles before the split
    @param fail_imbalanced raises an exception if relative weights difference
        is higher than this value
    @param stop_if_bigger (float) stops a connected components from being
        bigger than this ratio of elements, this should not be used
        unless a big components emerges, the algorithm stops merging
        but does not guarantee it returns the best cut,
        the value should be close to 0
    @param keep_balance (float), if not None, does not merge connected components
        if their relative sizes are too different, the value should be
        close to 1
    @param return_cnx returns connected components as a third results
    @param must_groups column name for ids which must not be shared by
        train/test partitions
    @param random_state seed for random generator
    @param fLOG logging function
    @return Two @see cl StreamingDataFrame, one
        for train, one for test.

    The list of ids must hold in memory.
    There is no streaming implementation for the ids.

    .. exref::
        :title: Splits a dataframe, keep ids in separate partitions
        :tag: dataframe

        In some data science problems, rows are not independant
        and share common value, most of the time ids. In some
        specific case, multiple ids from different columns are
        connected and must appear in the same partition.
        Testing that each id column is evenly split and do not
        appear in both sets in not enough. Connected components
        are needed.

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import train_test_connex_split

            df = DataFrame([dict(user="UA", prod="PAA", card="C1"),
                            dict(user="UA", prod="PB", card="C1"),
                            dict(user="UB", prod="PC", card="C2"),
                            dict(user="UB", prod="PD", card="C2"),
                            dict(user="UC", prod="PAA", card="C3"),
                            dict(user="UC", prod="PF", card="C4"),
                            dict(user="UD", prod="PG", card="C5"),
                            ])

            train, test = train_test_connex_split(
                df, test_size=0.5, groups=['user', 'prod', 'card'],
                fail_imbalanced=0.6)

            print(train)
            print(test)

    If *return_cnx* is True, the third results contains:

    * connected components for each id
    * the dataframe with connected components as a new column

    .. runpython::
        :showcode:

        from pandas import DataFrame
        from pandas_streaming.df import train_test_connex_split

        df = DataFrame([dict(user="UA", prod="PAA", card="C1"),
                        dict(user="UA", prod="PB", card="C1"),
                        dict(user="UB", prod="PC", card="C2"),
                        dict(user="UB", prod="PD", card="C2"),
                        dict(user="UC", prod="PAA", card="C3"),
                        dict(user="UC", prod="PF", card="C4"),
                        dict(user="UD", prod="PG", card="C5"),
                        ])

        train, test, cnx = train_test_connex_split(
            df, test_size=0.5, groups=['user', 'prod', 'card'],
            fail_imbalanced=0.6, return_cnx=True)

        print(cnx[0])
        print(cnx[1])
    """
    if stratify is not None:
        raise NotImplementedError(  # pragma: no cover
            "Option stratify is not implemented.")
    if groups is None or len(groups) == 0:
        raise ValueError(  # pragma: no cover
            "groups is empty. Use regular train_test_split.")
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(  # pragma: no cover
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError(  # pragma: no cover
            "Not implemented on numpy arrays.")
    if shuffle:
        df = dataframe_shuffle(df, random_state=random_state)

    dfids = df[groups].copy()
    if must_groups is not None:
        dfids_must = df[must_groups].copy()

    # Pick column names which do not collide with existing ones.
    name = "connex"
    while name in dfids.columns:
        name += "_"
    one = "weight"
    while one in dfids.columns:
        one += "_"

    # Connected components.
    # elements[i] is the component id of row i; counts_cnx maps a
    # component id to the set of rows it contains; connex maps an
    # (column, value) pair to its component id.
    elements = list(range(dfids.shape[0]))
    counts_cnx = {i: {i} for i in elements}
    connex = {}
    avoids_merge = {}

    def do_connex_components(dfrows, local_groups, kb, sib):
        "run connected components algorithms"
        itern = 0
        modif = 1

        while modif > 0 and itern < len(elements):
            if fLOG and df.shape[0] > 10000:
                # Fixed: this message previously formatted the builtin
                # `iter` instead of the iteration counter `itern`.
                fLOG("[train_test_connex_split] iteration={0}-#nb connect={1} - "
                     "modif={2}".format(itern, len(set(elements)), modif))
            modif = 0
            itern += 1
            for i, row in enumerate(dfrows.itertuples(index=False, name=None)):
                # Ignore NaN values: they do not link rows together.
                vals = [val for val in zip(local_groups, row) if not isinstance(
                    val[1], float) or not numpy.isnan(val[1])]

                c = elements[i]

                for val in vals:
                    if val not in connex:
                        connex[val] = c
                        modif += 1

                # All components touched by this row collapse into the
                # smallest component id.
                set_c = set(connex[val] for val in vals)
                set_c.add(c)
                new_c = min(set_c)

                add_pair_c = []
                for c in set_c:
                    if c == new_c or (new_c, c) in avoids_merge:
                        continue
                    if kb is not None:
                        # keep_balance: skip merges between components of
                        # too different sizes.
                        maxi = min(len(counts_cnx[new_c]), len(counts_cnx[c]))
                        if maxi > 5:
                            diff = len(counts_cnx[new_c]) + \
                                len(counts_cnx[c]) - maxi
                            r = diff / float(maxi)
                            if r > kb:
                                if fLOG:  # pragma: no cover
                                    fLOG('[train_test_connex_split] balance '
                                         'r={0:0.00000}>{1:0.00}, #[{2}]={3}, '
                                         '#[{4}]={5}'.format(r, kb, new_c,
                                                             len(counts_cnx[new_c]),
                                                             c, len(counts_cnx[c])))
                                continue

                    if sib is not None:
                        # stop_if_bigger: refuse merges which would create a
                        # component bigger than this ratio of all elements.
                        r = (len(counts_cnx[new_c]) +
                             len(counts_cnx[c])) / float(len(elements))
                        if r > sib:
                            if fLOG:  # pragma: no cover
                                fLOG('[train_test_connex_split] no merge '
                                     'r={0:0.00000}>{1:0.00}, #[{2}]={3}, #[{4}]={5}'
                                     ''.format(r, sib, new_c, len(counts_cnx[new_c]),
                                               c, len(counts_cnx[c])))
                            avoids_merge[new_c, c] = i
                            continue

                    add_pair_c.append(c)

                if len(add_pair_c) > 0:
                    for c in add_pair_c:
                        modif += len(counts_cnx[c])
                        for ii in counts_cnx[c]:
                            elements[ii] = new_c
                        counts_cnx[new_c] = counts_cnx[new_c].union(
                            counts_cnx[c])
                        counts_cnx[c] = set()

                        keys = list(vals)
                        for val in keys:
                            if connex[val] == c:
                                connex[val] = new_c
                                modif += 1

    if must_groups:
        # Ids from must_groups are merged first, unconstrained.
        do_connex_components(dfids_must, must_groups, None, None)
    do_connex_components(dfids, groups, keep_balance, stop_if_bigger)

    # final
    dfids[name] = elements
    dfids[one] = 1
    grsum = dfids[[name, one]].groupby(name, as_index=False).sum()
    if fLOG:
        for g in groups:
            fLOG(
                f"[train_test_connex_split] #nb in '{g}': {len(set(dfids[g]))}")
        fLOG(
            f"[train_test_connex_split] #connex {grsum.shape[0]}/{dfids.shape[0]}")
    if grsum.shape[0] <= 1:
        raise ValueError(  # pragma: no cover
            "Every element is in the same connected components.")

    # Statistics: top connected components
    if fLOG:
        # Global statistics
        counts = Counter(elements)
        cl = [(v, k) for k, v in counts.items()]
        cum = 0
        maxc = None
        fLOG("[train_test_connex_split] number of connected components: {0}"
             "".format(len(set(elements))))
        for i, (v, k) in enumerate(sorted(cl, reverse=True)):
            if i == 0:
                maxc = k, v
            if i >= 10:
                break
            cum += v
            fLOG("[train_test_connex_split] c={0} #elements={1} cumulated"
                 "={2}/{3}".format(k, v, cum, len(elements)))

        # Most important component
        fLOG(
            f'[train_test_connex_split] first row of the biggest component {maxc}')
        tdf = dfids[dfids[name] == maxc[0]]
        fLOG(f'[train_test_connex_split] \n{tdf.head(n=10)}')

    # Splits: components (weighted by size) are split, not rows, so that
    # a component never straddles train and test.
    train, test = train_test_split_weights(
        grsum, weights=one, test_size=test_size, train_size=train_size,
        shuffle=shuffle, fail_imbalanced=fail_imbalanced,
        random_state=random_state)
    train.drop(one, inplace=True, axis=1)
    test.drop(one, inplace=True, axis=1)

    # We compute the final dataframe.
    def double_merge(d):
        "merge twice"
        merge1 = dfids.merge(d, left_on=name, right_on=name)
        merge2 = df.merge(merge1, left_on=groups, right_on=groups)
        return merge2

    train_f = double_merge(train)
    test_f = double_merge(test)
    if return_cnx:
        return train_f, test_f, (connex, dfids)
    else:
        return train_f, test_f
def train_test_apart_stratify(df, group, test_size=0.25, train_size=None,
                              stratify=None, force=False, random_state=None,
                              fLOG=None):
    """
    This split is for a specific case where data is linked
    in one way. Let's assume we have two ids as we have
    for online sales: *(product id, category id)*.
    A product can have multiple categories. We need to have
    distinct products on train and test but common categories
    on both sides.

    @param df :epkg:`pandas:DataFrame`
    @param group columns name for the ids
    @param test_size ratio for the test partition
        (if *train_size* is not specified)
    @param train_size ratio for the train partition
    @param stratify column holding the stratification
    @param force if True, tries to get at least one example on the test side
        for each value of the column *stratify*
    @param random_state seed for random generators
    @param fLOG logging function
    @return Two @see cl StreamingDataFrame, one
        for train, one for test.

    .. index:: multi-label

    The list of ids must hold in memory.
    There is no streaming implementation for the ids.
    This split was implemented for a case of a multi-label
    classification. A category (*stratify*) is not exclusive
    and an observation can be assigned to multiple
    categories. In that particular case, the method
    `train_test_split <http://scikit-learn.org/stable/modules/generated/
    sklearn.model_selection.train_test_split.html>`_
    can not directly be used.

    .. runpython::
        :showcode:

        import pandas
        from pandas_streaming.df import train_test_apart_stratify

        df = pandas.DataFrame([dict(a=1, b="e"),
                               dict(a=1, b="f"),
                               dict(a=2, b="e"),
                               dict(a=2, b="f")])

        train, test = train_test_apart_stratify(
            df, group="a", stratify="b", test_size=0.5)
        print(train)
        print('-----------')
        print(test)
    """
    if stratify is None:
        raise ValueError(  # pragma: no cover
            "stratify must be specified.")
    if group is None:
        raise ValueError(  # pragma: no cover
            "group must be specified.")
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError("Not implemented on numpy arrays.")

    p = (1 - test_size) if test_size else None
    if train_size is not None:
        p = train_size
        test_size = 1 - p
    if p is None or min(test_size, p) <= 0:
        raise ValueError(  # pragma: no cover
            f"test_size={test_size} or train_size={train_size} cannot be null")

    # For every stratification value, collect the group ids it touches.
    couples = df[[group, stratify]].itertuples(name=None, index=False)
    hist = Counter(df[stratify])
    sorted_hist = [(v, k) for k, v in hist.items()]
    sorted_hist.sort()
    ids = {c: set() for c in hist}

    for g, s in couples:
        ids[s].add(g)

    if random_state is None:
        permutation = numpy.random.permutation
    else:
        state = numpy.random.RandomState(random_state)
        permutation = state.permutation

    # split[group_id] is 1 for test, 0 for train. Categories are processed
    # from the rarest to the most common so rare ones get a fair chance.
    split = {}
    for _, k in sorted_hist:
        not_assigned = [c for c in ids[k] if c not in split]
        if len(not_assigned) == 0:
            continue
        assigned = [c for c in ids[k] if c in split]
        nb_test = sum(split[c] for c in assigned)
        expected = min(len(ids[k]), int(
            test_size * len(ids[k]) + 0.5)) - nb_test
        if force and expected == 0 and nb_test == 0:
            nb_train = len(assigned) - nb_test
            if nb_train > 0 or len(not_assigned) > 1:
                expected = min(1, len(not_assigned))
        if expected > 0:
            # numpy's permutation returns a shuffled copy and does not
            # shuffle in place: the previous code discarded the result,
            # so the choice of test ids was never randomized.
            not_assigned = list(permutation(not_assigned))
            for e in not_assigned[:expected]:
                split[e] = 1
            for e in not_assigned[expected:]:
                split[e] = 0
        else:
            for c in not_assigned:
                split[c] = 0

    train_set = set(k for k, v in split.items() if v == 0)
    test_set = set(k for k, v in split.items() if v == 1)
    train_df = df[df[group].isin(train_set)]
    test_df = df[df[group].isin(test_set)]
    return train_df, test_df