Coverage for mlinsights/mlmodel/piecewise_tree_regression.py: 99%
77 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-28 08:46 +0100
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Implements a kind of piecewise linear regression by modifying
5the criterion used by the algorithm which builds a decision tree.
6"""
7import numpy
8from sklearn.tree import DecisionTreeRegressor
class PiecewiseTreeRegressor(DecisionTreeRegressor):
    """
    Implements a kind of piecewise linear regression by modifying
    the criterion used by the algorithm which builds a decision tree.
    See :epkg:`sklearn:tree:DecisionTreeRegressor` to get the meaning
    of the parameters except criterion:

    * ``mselin``: optimizes for a piecewise linear regression
    * ``simple``: optimizes for a stepwise regression (equivalent to *mse*)
    """

    def __init__(self, criterion='mselin', splitter='best', max_depth=None,
                 min_samples_split=2, min_samples_leaf=1,
                 min_weight_fraction_leaf=0.0, max_features=None,
                 random_state=None, max_leaf_nodes=None,
                 min_impurity_decrease=0.0):
        DecisionTreeRegressor.__init__(
            self, criterion=criterion,
            splitter=splitter, max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            min_weight_fraction_leaf=min_weight_fraction_leaf,
            max_features=max_features, random_state=random_state,
            max_leaf_nodes=max_leaf_nodes,
            min_impurity_decrease=min_impurity_decrease)

    def fit(self, X, y, sample_weight=None, check_input=True):
        """
        Replaces the string stored in *criterion* by an instance of a
        criterion class before fitting, then restores the string so the
        estimator keeps a picklable, clonable parameter.

        :param X: training features, array of shape ``(n_samples, n_features)``
        :param y: training targets
        :param sample_weight: optional sample weights
        :param check_input: see :epkg:`sklearn:tree:DecisionTreeRegressor`
        :return: self
        """
        replace = None
        if isinstance(self.criterion, str):
            if self.criterion == 'mselin':
                from .piecewise_tree_regression_criterion_linear import (  # pylint: disable=E0611,C0415
                    LinearRegressorCriterion)
                replace = self.criterion
                self.criterion = LinearRegressorCriterion(
                    1 if len(y.shape) <= 1 else y.shape[1], X)
            elif self.criterion == "simple":
                from .piecewise_tree_regression_criterion_fast import (  # pylint: disable=E0611,C0415
                    SimpleRegressorCriterionFast)
                replace = self.criterion
                self.criterion = SimpleRegressorCriterionFast(
                    1 if len(y.shape) <= 1 else y.shape[1], X.shape[0])

        DecisionTreeRegressor.fit(
            self, X, y,
            sample_weight=sample_weight,
            check_input=check_input)

        # Put the original string back so get_params / clone keep working.
        if replace:
            self.criterion = replace

        # With the linear criterion, one linear regression per leaf is
        # fitted once the tree structure is known.
        if self.criterion == "mselin":
            self._fit_reglin(X, y, sample_weight)
        return self

    def _mapping_train(self, X):
        """
        Maps every leaf (node index in ``self.tree_``) reached by at least
        one observation of *X* to a dense index ``0, 1, 2, ...``.

        :param X: array of training features
        :return: dictionary ``{leaf_node_index: dense_index}``
        """
        tree = self.tree_
        # In sklearn trees a leaf has children set to -1, hence the
        # children indices are <= the node index only for leaves.
        leaves = [i for i in range(len(tree.children_left))
                  if tree.children_left[i] <= i and tree.children_right[i] <= i]  # pylint: disable=E1136
        dec_path = self.decision_path(X)
        mapping = {}
        ntree = 0
        for j in leaves:
            # Column j of the (sparse) decision path marks the samples
            # whose path goes through node j.
            ind = dec_path[:, j] == 1
            ind = numpy.asarray(ind.todense()).flatten()
            if not numpy.any(ind):
                # No training example for this bucket.
                continue
            mapping[j] = ntree
            ntree += 1
        return mapping

    def predict_leaves(self, X):
        """
        Returns the leaf index for each observation of *X*.

        :param X: array
        :return: array of positions into ``self.leaves_index_``
        """
        # The creation of the sparse matrix could be avoided.
        leaves = self.decision_path(X)
        # Restrict the decision path to leaf columns; every row contains
        # exactly one nonzero among them, argmax returns its position.
        leaves = leaves[:, self.leaves_index_]
        mat = numpy.argmax(leaves, 1)
        res = numpy.asarray(mat).ravel()
        return res

    def _fit_reglin(self, X, y, sample_weight):
        """
        Fits linear regressions for all leaves.
        Sets attributes ``leaves_mapping_``, ``betas_``, ``leaves_index_``.
        The first attribute is a dictionary ``{leave: row}``
        which maps a leave of the tree to the coefficients
        ``betas_[row, :]`` of a regression trained on all training
        points mapped a specific leave. ``leaves_index_`` keeps
        in memory a set of leaves.
        """
        from .piecewise_tree_regression_criterion_linear import (  # pylint: disable=E0611,C0415
            LinearRegressorCriterion)

        tree = self.tree_
        self.leaves_index_ = [i for i in range(len(tree.children_left))
                              if tree.children_left[i] <= i and tree.children_right[i] <= i]  # pylint: disable=E1136
        if tree.n_leaves != len(self.leaves_index_):
            raise RuntimeError(  # pragma: no cover
                f"Unexpected number of leaves {tree.n_leaves} "
                f"!= {len(self.leaves_index_)}.")
        pred_leaves = self.predict_leaves(X)
        self.leaves_mapping_ = {k: i for i, k in enumerate(pred_leaves)}
        # One row of coefficients per leaf: X.shape[1] slopes + 1 intercept.
        self.betas_ = numpy.empty((len(self.leaves_index_), X.shape[1] + 1))
        for i, _ in enumerate(self.leaves_index_):
            ind = pred_leaves == i
            xs = X[ind, :].copy()
            ys = y[ind].astype(numpy.float64)
            if len(ys.shape) == 1:
                ys = ys[:, numpy.newaxis]
            ys = ys.copy()
            # `if sample_weight` would raise ValueError on a numpy array
            # with more than one element; compare against None explicitly.
            ws = sample_weight[ind].copy() if sample_weight is not None else None
            dec = LinearRegressorCriterion.create(xs, ys, ws)
            dec.node_beta(self.betas_[i, :])

    def predict(self, X, check_input=True):
        """
        Overloads method *predict*. Falls back into
        the predict from a decision tree if criterion is
        *mse*, *mae*, *simple*. Computes the predictions
        from linear regression if the criterion is *mselin*.
        """
        if self.criterion == 'mselin':
            return self._predict_reglin(X, check_input=check_input)
        return DecisionTreeRegressor.predict(self, X, check_input=check_input)

    def _predict_reglin(self, X, check_input=True):
        """
        Computes the predictions with a linear regression
        fitted with the observations mapped to each leave
        of the tree.

        :param X: array-like or sparse matrix of shape = [n_samples, n_features]
            The input samples. Internally, it will be converted to
            ``dtype=np.float32`` and if a sparse matrix is provided
            to a sparse ``csr_matrix``.
        :param check_input: boolean, (default=True)
            Allow to bypass several input checking.
            Don't use this parameter unless you know what you do.
        :return: y, array of shape = [n_samples] or [n_samples, n_outputs]
            The predicted classes, or the predict values.
        """
        leaves = self.predict_leaves(X)
        # Append a column of ones for the intercept, then apply each
        # observation's leaf-specific coefficients in one vectorized
        # row-wise dot product instead of a Python loop.
        Xone = numpy.hstack([X, numpy.ones((X.shape[0], 1))])
        return (Xone * self.betas_[leaves, :]).sum(axis=1)