Coverage for mlinsights/mlmodel/piecewise_estimator.py: 99%
193 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-28 08:46 +0100
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-28 08:46 +0100
1"""
2@file
3@brief Implements a piecewise linear regression.
4"""
5import numpy
6import numpy.random
7import pandas
8from sklearn.base import BaseEstimator, RegressorMixin, ClassifierMixin, clone
9from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier
10from sklearn.linear_model import LinearRegression, LogisticRegression
11from sklearn.preprocessing import KBinsDiscretizer
12from sklearn.utils._joblib import Parallel, delayed
13try:
14 from tqdm import tqdm
15except ImportError: # pragma: no cover
16 pass
def _fit_piecewise_estimator(i, model, X, y, sample_weight, association, nb_classes, random_state):
    """
    Fits *model* on the observations associated to bucket *i*.

    :param i: bucket index, rows such that ``association == i`` are selected
    :param model: estimator to train, it must implement
        ``fit(X, y, sample_weight=...)``
    :param X: features (indexed with ``X[mask, :]``)
    :param y: targets (indexed with a boolean mask and with integers)
    :param sample_weight: sample weights or None
    :param association: array giving the bucket index of every row
    :param nb_classes: expected number of classes for a classifier,
        None to skip the check (regression)
    :param random_state: object with a method ``shuffle`` (such as
        ``numpy.random.RandomState``) or None to create a new
        ``numpy.random.RandomState``, only used when the bucket
        does not contain *nb_classes* distinct targets
    :return: the result of ``model.fit(...)`` or *model* itself, untrained,
        if the bucket is empty
    """
    ind = association == i
    if not numpy.any(ind):
        # No training example for this bucket.
        return model  # pragma: no cover

    if nb_classes is not None and len(set(y[ind])) != nb_classes:
        # A classifier requires at least one example of each class.
        # The bucket is completed with examples picked at random
        # in the whole dataset, one for every missing class.
        if random_state is None:
            random_state = numpy.random.RandomState()  # pylint: disable=E1101
        candidates = numpy.arange(len(ind))
        random_state.shuffle(candidates)
        found = set(y[ind])
        n_all_classes = len(set(y))
        ind = ind.copy()
        for ki in candidates:
            if len(found) >= n_all_classes:
                # Every class is represented, no need to scan further.
                break
            if y[ki] not in found:
                found.add(y[ki])
                ind[ki] = True

    Xi = X[ind, :]
    yi = y[ind]
    sw = sample_weight[ind] if sample_weight is not None else None
    return model.fit(Xi, yi, sample_weight=sw)
def _predict_piecewise_estimator(i, est, X, association):
    """
    Calls ``est.predict`` on the rows of *X* associated to bucket *i*.

    :param i: bucket index
    :param est: estimator attached to bucket *i*
    :param X: features
    :param association: array giving the bucket index of every row
    :return: tuple *(mask, predictions)* where *mask* selects the rows
        of bucket *i*, or *(None, None)* if no row falls in that bucket
    """
    mask = association == i
    if numpy.any(mask):
        return mask, est.predict(X[mask, :])
    return None, None
def _predict_proba_piecewise_estimator(i, est, X, association):
    """
    Calls ``est.predict_proba`` on the rows of *X* associated to bucket *i*.

    :param i: bucket index
    :param est: estimator attached to bucket *i*
    :param X: features
    :param association: array giving the bucket index of every row
    :return: tuple *(mask, probabilities)* where *mask* selects the rows
        of bucket *i*, or *(None, None)* if no row falls in that bucket
    """
    mask = association == i
    if numpy.any(mask):
        return mask, est.predict_proba(X[mask, :])
    return None, None
def _decision_function_piecewise_estimator(i, est, X, association):
    """
    Calls ``est.decision_function`` on the rows of *X* associated
    to bucket *i*.

    :param i: bucket index
    :param est: estimator attached to bucket *i*
    :param X: features
    :param association: array giving the bucket index of every row
    :return: tuple *(mask, scores)* where *mask* selects the rows
        of bucket *i*, or *(None, None)* if no row falls in that bucket
    """
    mask = association == i
    if numpy.any(mask):
        return mask, est.decision_function(X[mask, :])
    return None, None
class PiecewiseEstimator(BaseEstimator):
    """
    Uses a :epkg:`decision tree` to split the space of features
    into buckets and trains a linear regression on each of them.
    The second estimator can be a :epkg:`sklearn:linear_model:LinearRegression`
    for a regression or :epkg:`sklearn:linear_model:LogisticRegression`
    for a classifier. It can also be :epkg:`sklearn:dummy:DummyRegressor` or
    :epkg:`sklearn:dummy:DummyClassifier` to just get the average on each bucket.
    When the buckets are defined by a decision tree and the
    estimator is linear, @see cl PiecewiseTreeRegressor optimizes
    the buckets based on the results of a linear regression.
    The accuracy is usually better.
    """

    def __init__(self, binner=None, estimator=None, n_jobs=None, verbose=False):
        """
        @param      binner      transformer or predictor which creates the buckets
        @param      estimator   predictor trained on every bucket
        @param      n_jobs      number of parallel jobs (for training and predicting)
        @param      verbose     boolean or use ``'tqdm'`` to use :epkg:`tqdm`
                                to fit the estimators

        *binner* must be specified, it allows the following values:

        - ``'bins'``: the model :epkg:`sklearn:preprocessing:KBinsDiscretizer`
        - any instantiated model

        *estimator* must be specified, it is any instantiated model.
        A ``ValueError`` is raised if *estimator* is None,
        a ``TypeError`` is raised if *binner* is None.
        """
        BaseEstimator.__init__(self)
        if estimator is None:
            raise ValueError(  # pragma: no cover
                "estimator cannot be null.")
        if binner is None:
            raise TypeError(  # pragma: no cover
                f"binner cannot be None, use 'bins' or an instantiated "
                f"model (estimator is {type(estimator)}).")
        if binner == "bins":
            binner = KBinsDiscretizer()
        self.binner = binner
        self.estimator = estimator
        self.n_jobs = n_jobs
        self.verbose = verbose

    @property
    def n_estimators_(self):
        """
        Returns the number of estimators = the number of buckets
        the data was split in.
        """
        return len(self.estimators_)

    @staticmethod
    def _bucket_key(row):
        """
        Converts one row produced by ``binner.transform`` into a hashable
        tuple of integers. The row is densified first if it exposes
        ``todense`` (sparse output), it is used as is otherwise.
        """
        if hasattr(row, "todense"):
            row = row.todense()
        return tuple(numpy.asarray(row).ravel().astype(numpy.int32))

    def _mapping_train(self, X, binner):
        """
        Associates every training row to a bucket.

        :param X: training features
        :param binner: fitted binner, either a model exposing ``tree_``
            and ``decision_path`` or a model exposing ``transform``
        :return: tuple *(association, mapping, leaves)*,
            *association* gives the bucket index of every row,
            *mapping* maps a leaf (node index or tuple returned by the
            transform) to a bucket index, *leaves* is the list of leaves
        """
        if hasattr(binner, "tree_"):
            tree = binner.tree_
            # A node is kept as a leaf when none of its children
            # has a greater index than itself.
            leaves = [i for i in range(len(tree.children_left))
                      if tree.children_left[i] <= i and tree.children_right[i] <= i]
            dec_path = binner.decision_path(X)
            association = numpy.full((X.shape[0],), -1.0)
            mapping = {}
            ntree = 0
            for j in leaves:
                ind = dec_path[:, j] == 1
                ind = numpy.asarray(ind.todense()).flatten()
                if not numpy.any(ind):
                    # No training example for this bucket.
                    continue  # pragma: no cover
                mapping[j] = ntree
                association[ind] = ntree
                ntree += 1

        elif hasattr(binner, "transform"):
            # Every distinct transformed row defines one bucket,
            # keys are computed only once per row.
            keys = [self._bucket_key(x) for x in binner.transform(X)]
            leaves = sorted(set(keys))
            mapping = {le: i for i, le in enumerate(leaves)}
            association = numpy.full((X.shape[0],), -1.0)
            for i, key in enumerate(keys):
                association[i] = mapping[key]
        else:
            raise NotImplementedError(  # pragma: no cover
                "binner is not a decision tree or a transform")

        return association, mapping, leaves

    def transform_bins(self, X):
        """
        Maps every row to an estimator in *self.estimators_*.

        :param X: features
        :return: array with one bucket index per row, -1 if the row
            does not fall into any bucket seen during training
        """
        binner = self.binner_
        if hasattr(binner, "tree_"):
            dec_path = binner.decision_path(X)
            association = numpy.full((X.shape[0],), -1.0)
            for j in self.leaves_:
                ind = dec_path[:, j] == 1
                ind = numpy.asarray(ind.todense()).flatten()
                if not numpy.any(ind):
                    # No row of X falls into this leaf.
                    continue
                association[ind] = self.mapping_.get(j, -1)

        elif hasattr(binner, "transform"):
            association = numpy.full((X.shape[0],), -1.0)
            for i, x in enumerate(binner.transform(X)):
                association[i] = self.mapping_.get(self._bucket_key(x), -1)
        else:
            raise NotImplementedError(  # pragma: no cover
                "binner is not a decision tree or a transform")
        return association

    def fit(self, X, y, sample_weight=None):
        """
        Trains the binner and an estimator on every
        bucket.

        :param X: features, *X* is converted into an array if *X* is a dataframe
        :param y: target, *y* is converted into an array if it is
            a pandas series or dataframe
        :param sample_weight: sample weights
        :return: self: returns an instance of self.

        Fitted attributes:

        * `binner_`: fitted binner
        * `mapping_`: dictionary mapping a leaf of the binner to the
            index of an estimator in `estimators_`
        * `leaves_`: list of leaves found by the binner
        * `estimators_`: list of estimators, one per bucket
        * `mean_estimator_`: estimator trained on the whole
            datasets in case the binner cannot find a bucket for
            a new observation
        * `dim_`: dimension of the output
        * `classes_`: classes, only if the estimators expose
            an attribute `classes_`
        """
        if isinstance(y, (pandas.Series, pandas.DataFrame)):
            # Positional indexing is needed later on.
            y = y.values
        if len(y.shape) == 2:
            if y.shape[-1] == 1:
                y = y.ravel()
            else:
                raise RuntimeError(
                    "This regressor only works with single dimension targets.")
        if isinstance(X, pandas.DataFrame):
            X = X.values
        if isinstance(X, list):
            raise TypeError(  # pragma: no cover
                "X cannot be a list.")
        binner = clone(self.binner)
        if sample_weight is None:
            self.binner_ = binner.fit(X, y)
        else:
            self.binner_ = binner.fit(X, y, sample_weight=sample_weight)

        association, self.mapping_, self.leaves_ = self._mapping_train(
            X, self.binner_)

        estimators = [clone(self.estimator) for _ in self.mapping_]

        loop = (tqdm(range(len(estimators)))
                if self.verbose == 'tqdm' else range(len(estimators)))
        # 'tqdm' is a non empty string, any true value enables joblib verbosity.
        verbose = 1 if self.verbose else 0

        self.mean_estimator_ = clone(self.estimator).fit(
            X, y, sample_weight=sample_weight)
        nb_classes = (None if not hasattr(self.mean_estimator_, 'classes_')
                      else len(set(self.mean_estimator_.classes_)))

        # random_state is only defined by some subclasses.
        if hasattr(self, 'random_state') and self.random_state is not None:  # pylint: disable=E1101
            rnd = numpy.random.RandomState(  # pylint: disable=E1101
                self.random_state)  # pylint: disable=E1101
        else:
            rnd = None

        self.estimators_ = \
            Parallel(n_jobs=self.n_jobs, verbose=verbose, prefer='threads')(
                delayed(_fit_piecewise_estimator)(
                    i, estimators[i], X, y, sample_weight, association, nb_classes, rnd)
                for i in loop)

        self.dim_ = 1 if len(y.shape) == 1 else y.shape[1]
        if hasattr(self.estimators_[0], 'classes_'):
            self.classes_ = self.estimators_[0].classes_
        return self

    def _apply_predict_method(self, X, method, parallelized, dimout):
        """
        Generic *predict* method, works for *predict_proba* and
        *decision_function* as well.

        :param X: features, *X* is converted into an array if *X* is a dataframe
        :param method: name of the method to call on every estimator
        :param parallelized: function called for every bucket with
            ``(i, estimator, X, association)``, it returns a mask and
            the predictions for the selected rows or *(None, None)*
        :param dimout: number of columns of the output, a one dimension
            array is returned if it is not greater than 1
        :return: predictions, rows which do not belong to any bucket
            are predicted with `mean_estimator_`
        """
        if len(self.estimators_) == 0:
            raise RuntimeError(  # pragma: no cover
                "Estimator was apparently fitted but contains no estimator.")
        if not hasattr(self.estimators_[0], method):
            raise TypeError(  # pragma: no cover
                f"Estimator {type(self.estimators_[0])} "
                f"does not have method {method!r}.")
        if isinstance(X, pandas.DataFrame):
            X = X.values

        association = self.transform_bins(X)

        indpred = Parallel(n_jobs=self.n_jobs, prefer='threads')(
            delayed(parallelized)(i, model, X, association)
            for i, model in enumerate(self.estimators_))

        pred = numpy.zeros((X.shape[0], dimout)
                           if dimout > 1 else (X.shape[0],))
        covered = numpy.zeros((X.shape[0],), dtype=bool)
        for ind, p in indpred:
            if ind is None:
                continue
            pred[ind] = p
            covered |= ind

        # Rows not in any bucket.
        missed_rows = numpy.logical_not(covered)
        Xmissed = X[missed_rows]
        if Xmissed.shape[0] > 0:
            meth = getattr(self.mean_estimator_, method)
            pred[missed_rows] = meth(Xmissed)
        return pred
class PiecewiseRegressor(PiecewiseEstimator, RegressorMixin):
    """
    Piecewise regressor: a binner (a :epkg:`decision tree` by default)
    splits the space of features into buckets and one regressor,
    a :epkg:`sklearn:linear_model:LinearRegression` by default,
    is trained on each of them.
    The regressor can also be :epkg:`sklearn:dummy:DummyRegressor`
    to just get the average on each bucket.
    """

    def __init__(self, binner=None, estimator=None, n_jobs=None, verbose=False):
        """
        @param      binner      transformer or predictor which creates the buckets
        @param      estimator   predictor trained on every bucket
        @param      n_jobs      number of parallel jobs (for training and predicting)
        @param      verbose     boolean or use ``'tqdm'`` to use :epkg:`tqdm`
                                to fit the estimators

        Values allowed for *binner*:

        - ``'tree'`` or ``None``: the model is
          :epkg:`sklearn:tree:DecisionTreeRegressor` (``min_samples_leaf=2``)
        - ``'bins'``: the model :epkg:`sklearn:preprocessing:KBinsDiscretizer`
        - any instantiated model

        Values allowed for *estimator*:

        - ``None``: the model is :epkg:`sklearn:linear_model:LinearRegression`
        - any instantiated model
        """
        # Default values are resolved before calling the parent constructor.
        if binner is None or binner == 'tree':
            binner = DecisionTreeRegressor(min_samples_leaf=2)
        if estimator is None:
            estimator = LinearRegression()
        RegressorMixin.__init__(self)
        PiecewiseEstimator.__init__(
            self, binner=binner, estimator=estimator,
            n_jobs=n_jobs, verbose=verbose)

    def predict(self, X):
        """
        Predicts the target for every row of *X*.

        :param X: features, *X* is converted into an array if *X* is a dataframe
        :return: predictions
        """
        predictions = self._apply_predict_method(
            X, "predict", _predict_piecewise_estimator, self.dim_)
        return predictions
class PiecewiseClassifier(PiecewiseEstimator, ClassifierMixin):
    """
    Uses a :epkg:`decision tree` to split the space of features
    into buckets and trains a logistic regression (default) on each of them.
    The second estimator is usually a :epkg:`sklearn:linear_model:LogisticRegression`.
    It can also be :epkg:`sklearn:dummy:DummyClassifier` to just get
    the average on each bucket.

    The main issue with the *PiecewiseClassifier* is that each piece requires
    one example of each class in each bucket which may not happen.
    To avoid that, the training picks up random examples
    from other buckets to ensure this case does not happen.
    """

    def __init__(self, binner=None, estimator=None, n_jobs=None,
                 random_state=None, verbose=False):
        """
        @param      binner          transformer or predictor which creates the buckets
        @param      estimator       predictor trained on every bucket
        @param      n_jobs          number of parallel jobs (for training and predicting)
        @param      random_state    to pick up random examples when buckets do not
                                    contain enough examples of each class
        @param      verbose         boolean or use ``'tqdm'`` to use :epkg:`tqdm`
                                    to fit the estimators

        *binner* allows the following values:

        - ``'tree'`` or ``None``: the model is
          :epkg:`sklearn:tree:DecisionTreeClassifier` (``min_samples_leaf=5``)
        - ``'bins'``: the model :epkg:`sklearn:preprocessing:KBinsDiscretizer`
        - any instantiated model

        *estimator* allows the following values:

        - ``None``: the model is :epkg:`sklearn:linear_model:LogisticRegression`
        - any instantiated model
        """
        if estimator is None:
            estimator = LogisticRegression()
        if binner in ('tree', None):
            binner = DecisionTreeClassifier(min_samples_leaf=5)
        ClassifierMixin.__init__(self)
        PiecewiseEstimator.__init__(
            self, binner=binner, estimator=estimator,
            n_jobs=n_jobs, verbose=verbose)
        self.random_state = random_state

    def predict(self, X):
        """
        Computes the predictions.

        :param X: features, *X* is converted into an array if *X* is a dataframe
        :return: predictions, they are cast into ``numpy.int32``
        """
        pred = self._apply_predict_method(
            X, "predict", _predict_piecewise_estimator, 1)
        return pred.astype(numpy.int32)

    def predict_proba(self, X):
        """
        Computes the predictions probabilities.

        :param X: features, *X* is converted into an array if *X* is a dataframe
        :return: predictions probabilities, one column per class
            of `mean_estimator_`
        """
        return self._apply_predict_method(
            X, "predict_proba", _predict_proba_piecewise_estimator,
            len(self.mean_estimator_.classes_))

    def decision_function(self, X):
        """
        Computes the decision function (scores) of every observation.

        :param X: features, *X* is converted into an array if *X* is a dataframe
        :return: scores returned by the method *decision_function*
            of the estimators
        """
        # The estimators were trained on arrays, the dataframe is converted
        # before calling the estimator trained on the whole dataset.
        if isinstance(X, pandas.DataFrame):
            X = X.values
        # One row is enough to find out the number of columns of the output.
        justone = self.mean_estimator_.decision_function(X[:1])
        return self._apply_predict_method(
            X, "decision_function", _decision_function_piecewise_estimator,
            1 if len(justone.shape) == 1 else justone.shape[1])