Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Implements a connex split between train and test.
5"""
6from collections import Counter
7import pandas
8import numpy
9from sklearn.model_selection import train_test_split
10from .dataframe_helpers import dataframe_shuffle
class ImbalancedSplitException(Exception):
    """
    Exception raised whenever a train/test split turns out to be
    imbalanced beyond the accepted threshold.
    """
def train_test_split_weights(df, weights=None, test_size=0.25, train_size=None,
                             shuffle=True, fail_imbalanced=0.05, random_state=None):
    """
    Splits a database in train/test given, every row
    can have a different weight.

    @param      df              :epkg:`pandas:DataFrame` or @see cl StreamingDataFrame
    @param      weights         None or weights or weights column name
    @param      test_size       ratio for the test partition (if *train_size* is not specified)
    @param      train_size      ratio for the train partition
    @param      shuffle         shuffles before the split
    @param      fail_imbalanced raises an exception if relative weights difference is
                                higher than this value
    @param      random_state    seed for random generators
    @return                     train and test :epkg:`pandas:DataFrame`

    If the dataframe is not shuffled first, the function
    will produce two datasets which are unlikely to be randomized
    as the function tries to keep equal weights among both paths
    without using randomness.
    """
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(  # pragma: no cover
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError(  # pragma: no cover
            "Not implemented on numpy arrays.")
    if shuffle:
        df = dataframe_shuffle(df, random_state=random_state)
    if weights is None:
        # No weights: delegate to scikit-learn's regular splitter.
        if test_size == 0 or train_size == 0:
            raise ValueError(
                "test_size={0} or train_size={1} cannot be null (1)."
                "".format(test_size, train_size))
        return train_test_split(df, test_size=test_size,
                                train_size=train_size,
                                random_state=random_state)

    if isinstance(weights, pandas.Series):
        weights = list(weights)
    elif isinstance(weights, str):
        # weights given as a column name
        weights = list(df[weights])
    if len(weights) != df.shape[0]:
        raise ValueError(
            "Dimension mismatch between weights and dataframe "
            "{0} != {1}".format(df.shape[0], len(weights)))

    p = (1 - test_size) if test_size else None
    if train_size is not None:
        p = train_size
        test_size = 1 - p
    if p is None or min(test_size, p) <= 0:
        raise ValueError(
            "test_size={0} or train_size={1} cannot be null (2)."
            "".format(test_size, train_size))
    # desired weight ratio test / train
    ratio = test_size / p

    if random_state is None:
        randint = numpy.random.randint
    else:
        state = numpy.random.RandomState(random_state)
        randint = state.randint

    balance = 0
    train_ids = []
    test_ids = []
    test_weights = 0
    train_weights = 0
    for i in range(0, df.shape[0]):
        w = weights[i]
        if balance == 0:
            # Fair coin flip. numpy's randint excludes the upper bound,
            # so it must be 2 to draw from {0, 1}; randint(0, 1) would
            # always return 0 and send the row to the test side.
            h = randint(0, 2)
            totest = h == 0
        else:
            # otherwise, rebalance deterministically
            totest = balance < 0
        if totest:
            test_ids.append(i)
            balance += w
            test_weights += w
        else:
            train_ids.append(i)
            balance -= w * ratio
            train_weights += w * ratio

    total = train_weights + test_weights
    # Guard against an empty dataframe: no rows means no imbalance.
    r = abs(train_weights - test_weights) / float(total) if total > 0 else 0.0
    if r >= fail_imbalanced:
        raise ImbalancedSplitException(  # pragma: no cover
            "Split is imbalanced: train_weights={0} test_weights={1} r={2}."
            "".format(train_weights, test_weights, r))

    return df.iloc[train_ids, :], df.iloc[test_ids, :]
def train_test_connex_split(df, groups, test_size=0.25, train_size=None,
                            stratify=None, hash_size=9, unique_rows=False,
                            shuffle=True, fail_imbalanced=0.05, keep_balance=None,
                            stop_if_bigger=None, return_cnx=False,
                            must_groups=None, random_state=None, fLOG=None):
    """
    This split is for a specific case where data is linked
    in many ways. Let's assume we have three ids as we have
    for online sales: *(product id, user id, card id)*.
    As we may need to compute aggregated features,
    we need every id not to be present in both train and
    test set. The function computes the connected components
    and breaks each of them in two parts for train and test.

    @param      df              :epkg:`pandas:DataFrame`
    @param      groups          columns name for the ids
    @param      test_size       ratio for the test partition (if *train_size* is not specified)
    @param      train_size      ratio for the train partition
    @param      stratify        column holding the stratification
    @param      hash_size       size of the hash to cache information about partition
    @param      unique_rows     ensures that rows are unique
    @param      shuffle         shuffles before the split
    @param      fail_imbalanced raises an exception if relative weights difference
                                is higher than this value
    @param      stop_if_bigger  (float) stops a connected components from being
                                bigger than this ratio of elements, this should not be used
                                unless a big components emerges, the algorithm stops merging
                                but does not guarantee it returns the best cut,
                                the value should be close to 0
    @param      keep_balance    (float), if not None, does not merge connected components
                                if their relative sizes are too different, the value should be
                                close to 1
    @param      return_cnx      returns connected components as a third results
    @param      must_groups     column name for ids which must not be shared by
                                train/test partitions
    @param      random_state    seed for random generator
    @param      fLOG            logging function
    @return                     Two @see cl StreamingDataFrame, one
                                for train, one for test.

    The list of ids must hold in memory.
    There is no streaming implementation for the ids.

    .. exref::
        :title: Splits a dataframe, keep ids in separate partitions
        :tag: dataframe

        In some data science problems, rows are not independent
        and share common value, most of the time ids. In some
        specific case, multiple ids from different columns are
        connected and must appear in the same partition.
        Testing that each id column is evenly split and do not
        appear in both sets is not enough. Connected components
        are needed.

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import train_test_connex_split

            df = DataFrame([dict(user="UA", prod="PAA", card="C1"),
                            dict(user="UA", prod="PB", card="C1"),
                            dict(user="UB", prod="PC", card="C2"),
                            dict(user="UB", prod="PD", card="C2"),
                            dict(user="UC", prod="PAA", card="C3"),
                            dict(user="UC", prod="PF", card="C4"),
                            dict(user="UD", prod="PG", card="C5"),
                            ])

            train, test = train_test_connex_split(
                df, test_size=0.5, groups=['user', 'prod', 'card'],
                fail_imbalanced=0.6)

            print(train)
            print(test)

    If *return_cnx* is True, the third results contains:

    * connected components for each id
    * the dataframe with connected components as a new column

    .. runpython::
        :showcode:

        from pandas import DataFrame
        from pandas_streaming.df import train_test_connex_split

        df = DataFrame([dict(user="UA", prod="PAA", card="C1"),
                        dict(user="UA", prod="PB", card="C1"),
                        dict(user="UB", prod="PC", card="C2"),
                        dict(user="UB", prod="PD", card="C2"),
                        dict(user="UC", prod="PAA", card="C3"),
                        dict(user="UC", prod="PF", card="C4"),
                        dict(user="UD", prod="PG", card="C5"),
                        ])

        train, test, cnx = train_test_connex_split(
            df, test_size=0.5, groups=['user', 'prod', 'card'],
            fail_imbalanced=0.6, return_cnx=True)

        print(cnx[0])
        print(cnx[1])
    """
    if stratify is not None:
        raise NotImplementedError(  # pragma: no cover
            "Option stratify is not implemented.")
    if groups is None or len(groups) == 0:
        raise ValueError(  # pragma: no cover
            "groups is empty. Use regular train_test_split.")
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(  # pragma: no cover
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError(  # pragma: no cover
            "Not implemented on numpy arrays.")
    if shuffle:
        df = dataframe_shuffle(df, random_state=random_state)

    dfids = df[groups].copy()
    if must_groups is not None:
        dfids_must = df[must_groups].copy()

    # Pick column names not already used by the dataframe.
    name = "connex"
    while name in dfids.columns:
        name += "_"
    one = "weight"
    while one in dfids.columns:
        one += "_"

    # Connected components.
    elements = list(range(dfids.shape[0]))
    counts_cnx = {i: {i} for i in elements}
    connex = {}
    avoids_merge = {}

    def do_connex_components(dfrows, local_groups, kb, sib):
        "run connected components algorithms"
        itern = 0
        modif = 1

        while modif > 0 and itern < len(elements):
            if fLOG and df.shape[0] > 10000:
                # bug fix: the message used the builtin ``iter`` instead of
                # the iteration counter ``itern``
                fLOG("[train_test_connex_split] iteration={0}-#nb connect={1} - "
                     "modif={2}".format(itern, len(set(elements)), modif))
            modif = 0
            itern += 1
            for i, row in enumerate(dfrows.itertuples(index=False, name=None)):
                # (column, value) pairs of this row, skipping NaN values
                vals = [val for val in zip(local_groups, row) if not isinstance(
                    val[1], float) or not numpy.isnan(val[1])]

                c = elements[i]

                for val in vals:
                    if val not in connex:
                        connex[val] = c
                        modif += 1

                # All components this row touches; merge into the smallest id.
                set_c = set(connex[val] for val in vals)
                set_c.add(c)
                new_c = min(set_c)

                add_pair_c = []
                for c in set_c:
                    if c == new_c or (new_c, c) in avoids_merge:
                        continue
                    if kb is not None:
                        # keep_balance: skip merges of components whose
                        # relative sizes differ too much
                        maxi = min(len(counts_cnx[new_c]), len(counts_cnx[c]))
                        if maxi > 5:
                            diff = len(counts_cnx[new_c]) + \
                                len(counts_cnx[c]) - maxi
                            r = diff / float(maxi)
                            if r > kb:
                                if fLOG:  # pragma: no cover
                                    fLOG('[train_test_connex_split] balance '
                                         'r={0:0.00000}>{1:0.00}, #[{2}]={3}, '
                                         '#[{4}]={5}'.format(r, kb, new_c,
                                                             len(counts_cnx[new_c]),
                                                             c, len(counts_cnx[c])))
                                continue

                    if sib is not None:
                        # stop_if_bigger: do not let one component swallow
                        # more than this ratio of all rows
                        r = (len(counts_cnx[new_c]) +
                             len(counts_cnx[c])) / float(len(elements))
                        if r > sib:
                            if fLOG:  # pragma: no cover
                                fLOG('[train_test_connex_split] no merge '
                                     'r={0:0.00000}>{1:0.00}, #[{2}]={3}, #[{4}]={5}'
                                     ''.format(r, sib, new_c, len(counts_cnx[new_c]),
                                               c, len(counts_cnx[c])))
                            avoids_merge[new_c, c] = i
                            continue

                    add_pair_c.append(c)

                if len(add_pair_c) > 0:
                    for c in add_pair_c:
                        modif += len(counts_cnx[c])
                        for ii in counts_cnx[c]:
                            elements[ii] = new_c
                        counts_cnx[new_c] = counts_cnx[new_c].union(
                            counts_cnx[c])
                        counts_cnx[c] = set()

                        keys = list(vals)
                        for val in keys:
                            if connex[val] == c:
                                connex[val] = new_c
                                modif += 1

    if must_groups:
        # ids in must_groups are merged first, without balance constraints
        do_connex_components(dfids_must, must_groups, None, None)
    do_connex_components(dfids, groups, keep_balance, stop_if_bigger)

    # final
    dfids[name] = elements
    dfids[one] = 1
    grsum = dfids[[name, one]].groupby(name, as_index=False).sum()
    if fLOG:
        for g in groups:
            fLOG("[train_test_connex_split] #nb in '{0}': {1}".format(
                g, len(set(dfids[g]))))
        fLOG(
            "[train_test_connex_split] #connex {0}/{1}".format(
                grsum.shape[0], dfids.shape[0]))
    if grsum.shape[0] <= 1:
        raise ValueError(  # pragma: no cover
            "Every element is in the same connected components.")

    # Statistics: top connected components
    if fLOG:
        # Global statistics
        counts = Counter(elements)
        cl = [(v, k) for k, v in counts.items()]
        cum = 0
        maxc = None
        fLOG("[train_test_connex_split] number of connected components: {0}"
             "".format(len(set(elements))))
        for i, (v, k) in enumerate(sorted(cl, reverse=True)):
            if i == 0:
                maxc = k, v
            if i >= 10:
                break
            cum += v
            fLOG("[train_test_connex_split] c={0} #elements={1} cumulated"
                 "={2}/{3}".format(k, v, cum, len(elements)))

        # Most important component
        fLOG('[train_test_connex_split] first row of the biggest component '
             '{0}'.format(maxc))
        tdf = dfids[dfids[name] == maxc[0]]
        fLOG('[train_test_connex_split] \n{0}'.format(tdf.head(n=10)))

    # Splits the components (weighted by their size) between train and test.
    train, test = train_test_split_weights(
        grsum, weights=one, test_size=test_size, train_size=train_size,
        shuffle=shuffle, fail_imbalanced=fail_imbalanced,
        random_state=random_state)
    train.drop(one, inplace=True, axis=1)
    test.drop(one, inplace=True, axis=1)

    # We compute the final dataframe.
    def double_merge(d):
        "merge twice"
        merge1 = dfids.merge(d, left_on=name, right_on=name)
        merge2 = df.merge(merge1, left_on=groups, right_on=groups)
        return merge2

    train_f = double_merge(train)
    test_f = double_merge(test)
    if return_cnx:
        return train_f, test_f, (connex, dfids)
    else:
        return train_f, test_f
def train_test_apart_stratify(df, group, test_size=0.25, train_size=None,
                              stratify=None, force=False, random_state=None,
                              fLOG=None):
    """
    This split is for a specific case where data is linked
    in one way. Let's assume we have two ids as we have
    for online sales: *(product id, category id)*.
    A product can have multiple categories. We need to have
    distinct products on train and test but common categories
    on both sides.

    @param      df              :epkg:`pandas:DataFrame`
    @param      group           columns name for the ids
    @param      test_size       ratio for the test partition
                                (if *train_size* is not specified)
    @param      train_size      ratio for the train partition
    @param      stratify        column holding the stratification
    @param      force           if True, tries to get at least one example on the test side
                                for each value of the column *stratify*
    @param      random_state    seed for random generators
    @param      fLOG            logging function
    @return                     Two @see cl StreamingDataFrame, one
                                for train, one for test.

    .. index:: multi-label

    The list of ids must hold in memory.
    There is no streaming implementation for the ids.
    This split was implemented for a case of a multi-label
    classification. A category (*stratify*) is not exclusive
    and an observation can be assigned to multiple
    categories. In that particular case, the method
    `train_test_split <http://scikit-learn.org/stable/modules/generated/
    sklearn.model_selection.train_test_split.html>`_
    can not directly be used.

    .. runpython::
        :showcode:

        import pandas
        from pandas_streaming.df import train_test_apart_stratify

        df = pandas.DataFrame([dict(a=1, b="e"),
                               dict(a=1, b="f"),
                               dict(a=2, b="e"),
                               dict(a=2, b="f")])

        train, test = train_test_apart_stratify(
            df, group="a", stratify="b", test_size=0.5)
        print(train)
        print('-----------')
        print(test)
    """
    if stratify is None:
        raise ValueError(  # pragma: no cover
            "stratify must be specified.")
    if group is None:
        raise ValueError(  # pragma: no cover
            "group must be specified.")
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError("Not implemented on numpy arrays.")

    p = (1 - test_size) if test_size else None
    if train_size is not None:
        p = train_size
        test_size = 1 - p
    if p is None or min(test_size, p) <= 0:
        raise ValueError(  # pragma: no cover
            "test_size={0} or train_size={1} cannot be null".format(
                test_size, train_size))

    couples = df[[group, stratify]].itertuples(name=None, index=False)
    hist = Counter(df[stratify])
    sorted_hist = [(v, k) for k, v in hist.items()]
    sorted_hist.sort()
    # for each stratification value, the set of group ids carrying it
    ids = {c: set() for c in hist}

    for g, s in couples:
        ids[s].add(g)

    if random_state is None:
        permutation = numpy.random.permutation
    else:
        state = numpy.random.RandomState(random_state)
        permutation = state.permutation

    # split[id] == 1 -> test side, 0 -> train side;
    # rarest stratification values are processed first
    split = {}
    for _, k in sorted_hist:
        not_assigned = [c for c in ids[k] if c not in split]
        if len(not_assigned) == 0:
            continue
        assigned = [c for c in ids[k] if c in split]
        nb_test = sum(split[c] for c in assigned)
        expected = min(len(ids[k]), int(
            test_size * len(ids[k]) + 0.5)) - nb_test
        if force and expected == 0 and nb_test == 0:
            nb_train = len(assigned) - nb_test
            if nb_train > 0 or len(not_assigned) > 1:
                expected = min(1, len(not_assigned))
        if expected > 0:
            # Bug fix: numpy.random.permutation returns a shuffled *copy*
            # and does not shuffle a list in place; the previous code
            # discarded its result, so ids were never randomized.
            # Permuting indices also keeps the original element types.
            order = permutation(len(not_assigned))
            not_assigned = [not_assigned[i] for i in order]
            for e in not_assigned[:expected]:
                split[e] = 1
            for e in not_assigned[expected:]:
                split[e] = 0
        else:
            for c in not_assigned:
                split[c] = 0

    train_set = set(k for k, v in split.items() if v == 0)
    test_set = set(k for k, v in split.items() if v == 1)
    train_df = df[df[group].isin(train_set)]
    test_df = df[df[group].isin(test_set)]
    return train_df, test_df