Coverage for mlinsights/mlmodel/quantile_regression.py: 97%
79 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-28 08:46 +0100
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-28 08:46 +0100
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Implements a quantile linear regression.
5"""
6import numpy
7from sklearn.linear_model import LinearRegression
8from sklearn.metrics import mean_absolute_error
11class QuantileLinearRegression(LinearRegression):
12 """
13 Quantile Linear Regression or linear regression
14 trained with norm :epkg:`L1`. This class inherits from
15 :epkg:`sklearn:linear_models:LinearRegression`.
16 See notebook :ref:`quantileregressionrst`.
18 Norm :epkg:`L1` is chosen if ``quantile=0.5``, otherwise,
19 for *quantile=*:math:`\\rho`,
20 the following error is optimized:
22 .. math::
24 \\sum_i \\rho |f(X_i) - Y_i|^- + (1-\\rho) |f(X_i) - Y_i|^+
26 where :math:`|f(X_i) - Y_i|^-= \\max(Y_i - f(X_i), 0)` and
27 :math:`|f(X_i) - Y_i|^+= \\max(f(X_i) - Y_i, 0)`.
28 :math:`f(i)` is the prediction, :math:`Y_i` the expected
29 value.
30 """
32 def __init__(self, fit_intercept=True, copy_X=True,
33 n_jobs=1, delta=0.0001, max_iter=10, quantile=0.5,
34 positive=False, verbose=False):
35 """
36 :param fit_intercept: boolean, optional, default True
37 whether to calculate the intercept for this model. If set
38 to False, no intercept will be used in calculations
39 (e.g. data is expected to be already centered).
40 :param copy_X: boolean, optional, default True
41 If True, X will be copied; else, it may be overwritten.
42 :param n_jobs: int, optional, default 1
43 The number of jobs to use for the computation.
44 If -1 all CPUs are used. This will only provide speedup for
45 n_targets > 1 and sufficient large problems.
46 :param max_iter: int, optional, default 1
47 The number of iteration to do at training time.
48 This parameter is specific to the quantile regression.
49 :param delta: float, optional, default 0.0001
50 Used to ensure matrices has an inverse
51 (*M + delta*I*).
52 :param quantile: float, by default 0.5,
53 determines which quantile to use
54 to estimate the regression.
55 :param positive: when set to True, forces the coefficients to be positive.
56 :param verbose: bool, optional, default False
57 Prints error at each iteration of the optimisation.
58 """
59 try:
60 LinearRegression.__init__(
61 self, fit_intercept=fit_intercept,
62 copy_X=copy_X, n_jobs=n_jobs, positive=positive)
63 except TypeError:
64 # scikit-learn<0.24
65 LinearRegression.__init__(
66 self, fit_intercept=fit_intercept,
67 copy_X=copy_X, n_jobs=n_jobs)
68 self.max_iter = max_iter
69 self.verbose = verbose
70 self.delta = delta
71 self.quantile = quantile
73 def fit(self, X, y, sample_weight=None):
74 """
75 Fits a linear model with :epkg:`L1` norm which
76 is equivalent to a quantile regression.
77 The implementation is not the most efficient
78 as it calls multiple times method fit
79 from :epkg:`sklearn:linear_models:LinearRegression`.
80 Data gets checked and rescaled each time.
81 The optimization follows the algorithm
82 `Iteratively reweighted least squares
83 <https://en.wikipedia.org/wiki/Iteratively_reweighted_least_squares>`_.
84 It is described in French at
85 `Régression quantile
86 <http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx/notebooks/td_note_2017_2.html>`_.
88 :param X: numpy array or sparse matrix of shape [n_samples,n_features]
89 Training data
90 :param y: numpy array of shape [n_samples, n_targets]
91 Target values. Will be cast to X's dtype if necessary
92 :param sample_weight: numpy array of shape [n_samples]
93 Individual weights for each sample
94 :return: self, returns an instance of self.
96 Fitted attributes:
98 * `coef_`: array, shape (n_features, ) or (n_targets, n_features)
99 Estimated coefficients for the linear regression problem.
100 If multiple targets are passed during the fit (y 2D), this
101 is a 2D array of shape (n_targets, n_features), while if only
102 one target is passed, this is a 1D array of length n_features.
103 * `intercept_`: array
104 Independent term in the linear model.
105 * `n_iter_`: int
106 Number of iterations at training time.
107 """
108 if len(y.shape) > 1 and y.shape[1] != 1:
109 raise ValueError("QuantileLinearRegression only works for Y real")
111 def compute_z(Xm, beta, Y, W, delta=0.0001):
112 "compute z"
113 deltas = numpy.ones(X.shape[0]) * delta
114 epsilon, mult = QuantileLinearRegression._epsilon(
115 Y, Xm @ beta, self.quantile)
116 r = numpy.reciprocal(numpy.maximum( # pylint: disable=E1111
117 epsilon, deltas)) # pylint: disable=E1111
118 if mult is not None:
119 epsilon *= 1 - mult
120 r *= 1 - mult
121 return r, epsilon
123 if not isinstance(X, numpy.ndarray):
124 if hasattr(X, 'values'):
125 X = X.values
126 else:
127 raise TypeError("X must be an array or a dataframe.")
129 if self.fit_intercept:
130 Xm = numpy.hstack([X, numpy.ones((X.shape[0], 1))])
131 else:
132 Xm = X
134 try:
135 clr = LinearRegression(fit_intercept=False, copy_X=self.copy_X,
136 n_jobs=self.n_jobs,
137 positive=self.positive)
138 except AttributeError:
139 # scikit-learn<0.24
140 clr = LinearRegression(fit_intercept=False, copy_X=self.copy_X,
141 n_jobs=self.n_jobs)
143 W = numpy.ones(X.shape[0]) if sample_weight is None else sample_weight
144 self.n_iter_ = 0
145 lastE = None
146 for i in range(0, self.max_iter):
147 clr.fit(Xm, y, W)
148 beta = clr.coef_
149 W, epsilon = compute_z(Xm, beta, y, W, delta=self.delta)
150 if sample_weight is not None:
151 W *= sample_weight
152 epsilon *= sample_weight
153 E = epsilon.sum()
154 self.n_iter_ = i
155 if self.verbose:
156 print( # pragma: no cover
157 f'[QuantileLinearRegression.fit] iter={i + 1} error={E}')
158 if lastE is not None and lastE == E:
159 break
160 lastE = E
162 if self.fit_intercept:
163 self.coef_ = beta[:-1]
164 self.intercept_ = beta[-1]
165 else:
166 self.coef_ = beta
167 self.intercept_ = 0
169 return self
171 @staticmethod
172 def _epsilon(y_true, y_pred, quantile, sample_weight=None):
173 diff = y_pred - y_true
174 epsilon = numpy.abs(diff)
175 if quantile != 0.5:
176 sign = numpy.sign(diff) # pylint: disable=E1111
177 mult = numpy.ones(y_true.shape[0])
178 mult[sign > 0] *= quantile # pylint: disable=W0143
179 mult[sign < 0] *= (1 - quantile) # pylint: disable=W0143
180 else:
181 mult = None
182 if sample_weight is not None:
183 epsilon *= sample_weight
184 return epsilon, mult
186 def score(self, X, y, sample_weight=None):
187 """
188 Returns Mean absolute error regression loss.
190 :param X: array-like, shape = (n_samples, n_features)
191 Test samples.
192 :param y: array-like, shape = (n_samples) or (n_samples, n_outputs)
193 True values for X.
194 :param sample_weight: array-like, shape = [n_samples], optional
195 Sample weights.
196 :return: score : float
197 mean absolute error regression loss
198 """
199 pred = self.predict(X)
201 if self.quantile != 0.5:
202 epsilon, mult = QuantileLinearRegression._epsilon(
203 y, pred, self.quantile, sample_weight)
204 if mult is not None:
205 epsilon *= mult * 2
206 return epsilon.sum() / X.shape[0]
207 return mean_absolute_error(y, pred, sample_weight=sample_weight)