Coverage for mlinsights/mlmodel/quantile_regression.py: 99%

79 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-08-09 08:45 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Implements a quantile linear regression. 

5""" 

6import numpy 

7from sklearn.linear_model import LinearRegression 

8from sklearn.metrics import mean_absolute_error 

9 

10 

11class QuantileLinearRegression(LinearRegression): 

12 """ 

13 Quantile Linear Regression or linear regression 

14 trained with norm :epkg:`L1`. This class inherits from 

15 :epkg:`sklearn:linear_models:LinearRegression`. 

16 See notebook :ref:`quantileregressionrst`. 

17 

18 Norm :epkg:`L1` is chosen if ``quantile=0.5``, otherwise, 

19 for *quantile=*:math:`\\rho`, 

20 the following error is optimized: 

21 

22 .. math:: 

23 

24 \\sum_i \\rho |f(X_i) - Y_i|^- + (1-\\rho) |f(X_i) - Y_i|^+ 

25 

26 where :math:`|f(X_i) - Y_i|^-= \\max(Y_i - f(X_i), 0)` and 

27 :math:`|f(X_i) - Y_i|^+= \\max(f(X_i) - Y_i, 0)`. 

28 :math:`f(i)` is the prediction, :math:`Y_i` the expected 

29 value. 

30 """ 

31 

32 def __init__(self, fit_intercept=True, normalize=False, copy_X=True, 

33 n_jobs=1, delta=0.0001, max_iter=10, quantile=0.5, 

34 positive=False, verbose=False): 

35 """ 

36 :param fit_intercept: boolean, optional, default True 

37 whether to calculate the intercept for this model. If set 

38 to False, no intercept will be used in calculations 

39 (e.g. data is expected to be already centered). 

40 :param normalize: boolean, optional, default False 

41 This parameter is ignored when ``fit_intercept`` is set to False. 

42 If True, the regressors X will be normalized before regression by 

43 subtracting the mean and dividing by the l2-norm. 

44 If you wish to standardize, please use 

45 :class:`sklearn.preprocessing.StandardScaler` before calling ``fit`` on 

46 an estimator with ``normalize=False``. 

47 :param copy_X: boolean, optional, default True 

48 If True, X will be copied; else, it may be overwritten. 

49 :param n_jobs: int, optional, default 1 

50 The number of jobs to use for the computation. 

51 If -1 all CPUs are used. This will only provide speedup for 

52 n_targets > 1 and sufficient large problems. 

53 :param max_iter: int, optional, default 1 

54 The number of iteration to do at training time. 

55 This parameter is specific to the quantile regression. 

56 :param delta: float, optional, default 0.0001 

57 Used to ensure matrices has an inverse 

58 (*M + delta*I*). 

59 :param quantile: float, by default 0.5, 

60 determines which quantile to use 

61 to estimate the regression. 

62 :param positive: when set to True, forces the coefficients to be positive. 

63 :param verbose: bool, optional, default False 

64 Prints error at each iteration of the optimisation. 

65 """ 

66 try: 

67 LinearRegression.__init__( 

68 self, fit_intercept=fit_intercept, normalize=normalize, 

69 copy_X=copy_X, n_jobs=n_jobs, positive=positive) 

70 except TypeError: 

71 # scikit-learn<0.24 

72 LinearRegression.__init__( 

73 self, fit_intercept=fit_intercept, normalize=normalize, 

74 copy_X=copy_X, n_jobs=n_jobs) 

75 self.max_iter = max_iter 

76 self.verbose = verbose 

77 self.delta = delta 

78 self.quantile = quantile 

79 

80 def fit(self, X, y, sample_weight=None): 

81 """ 

82 Fits a linear model with :epkg:`L1` norm which 

83 is equivalent to a quantile regression. 

84 The implementation is not the most efficient 

85 as it calls multiple times method fit 

86 from :epkg:`sklearn:linear_models:LinearRegression`. 

87 Data gets checked and rescaled each time. 

88 The optimization follows the algorithm 

89 `Iteratively reweighted least squares 

90 <https://en.wikipedia.org/wiki/Iteratively_reweighted_least_squares>`_. 

91 It is described in French at 

92 `Régression quantile 

93 <http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx/notebooks/td_note_2017_2.html>`_. 

94 

95 :param X: numpy array or sparse matrix of shape [n_samples,n_features] 

96 Training data 

97 :param y: numpy array of shape [n_samples, n_targets] 

98 Target values. Will be cast to X's dtype if necessary 

99 :param sample_weight: numpy array of shape [n_samples] 

100 Individual weights for each sample 

101 :return: self, returns an instance of self. 

102 

103 Fitted attributes: 

104 

105 * `coef_`: array, shape (n_features, ) or (n_targets, n_features) 

106 Estimated coefficients for the linear regression problem. 

107 If multiple targets are passed during the fit (y 2D), this 

108 is a 2D array of shape (n_targets, n_features), while if only 

109 one target is passed, this is a 1D array of length n_features. 

110 * `intercept_`: array 

111 Independent term in the linear model. 

112 * `n_iter_`: int 

113 Number of iterations at training time. 

114 """ 

115 if len(y.shape) > 1 and y.shape[1] != 1: 

116 raise ValueError("QuantileLinearRegression only works for Y real") 

117 

118 def compute_z(Xm, beta, Y, W, delta=0.0001): 

119 "compute z" 

120 deltas = numpy.ones(X.shape[0]) * delta 

121 epsilon, mult = QuantileLinearRegression._epsilon( 

122 Y, Xm @ beta, self.quantile) 

123 r = numpy.reciprocal(numpy.maximum( # pylint: disable=E1111 

124 epsilon, deltas)) # pylint: disable=E1111 

125 if mult is not None: 

126 epsilon *= 1 - mult 

127 r *= 1 - mult 

128 return r, epsilon 

129 

130 if not isinstance(X, numpy.ndarray): 

131 if hasattr(X, 'values'): 

132 X = X.values 

133 else: 

134 raise TypeError("X must be an array or a dataframe.") 

135 

136 if self.fit_intercept: 

137 Xm = numpy.hstack([X, numpy.ones((X.shape[0], 1))]) 

138 else: 

139 Xm = X 

140 

141 try: 

142 clr = LinearRegression(fit_intercept=False, copy_X=self.copy_X, 

143 n_jobs=self.n_jobs, normalize=self.normalize, 

144 positive=self.positive) 

145 except AttributeError: 

146 # scikit-learn<0.24 

147 clr = LinearRegression(fit_intercept=False, copy_X=self.copy_X, 

148 n_jobs=self.n_jobs, normalize=self.normalize) 

149 

150 W = numpy.ones(X.shape[0]) if sample_weight is None else sample_weight 

151 self.n_iter_ = 0 

152 lastE = None 

153 for i in range(0, self.max_iter): 

154 clr.fit(Xm, y, W) 

155 beta = clr.coef_ 

156 W, epsilon = compute_z(Xm, beta, y, W, delta=self.delta) 

157 if sample_weight is not None: 

158 W *= sample_weight 

159 epsilon *= sample_weight 

160 E = epsilon.sum() 

161 self.n_iter_ = i 

162 if self.verbose: 

163 print( # pragma: no cover 

164 f'[QuantileLinearRegression.fit] iter={i + 1} error={E}') 

165 if lastE is not None and lastE == E: 

166 break 

167 lastE = E 

168 

169 if self.fit_intercept: 

170 self.coef_ = beta[:-1] 

171 self.intercept_ = beta[-1] 

172 else: 

173 self.coef_ = beta 

174 self.intercept_ = 0 

175 

176 return self 

177 

178 @staticmethod 

179 def _epsilon(y_true, y_pred, quantile, sample_weight=None): 

180 diff = y_pred - y_true 

181 epsilon = numpy.abs(diff) 

182 if quantile != 0.5: 

183 sign = numpy.sign(diff) # pylint: disable=E1111 

184 mult = numpy.ones(y_true.shape[0]) 

185 mult[sign > 0] *= quantile # pylint: disable=W0143 

186 mult[sign < 0] *= (1 - quantile) # pylint: disable=W0143 

187 else: 

188 mult = None 

189 if sample_weight is not None: 

190 epsilon *= sample_weight 

191 return epsilon, mult 

192 

193 def score(self, X, y, sample_weight=None): 

194 """ 

195 Returns Mean absolute error regression loss. 

196 

197 :param X: array-like, shape = (n_samples, n_features) 

198 Test samples. 

199 :param y: array-like, shape = (n_samples) or (n_samples, n_outputs) 

200 True values for X. 

201 :param sample_weight: array-like, shape = [n_samples], optional 

202 Sample weights. 

203 :return: score : float 

204 mean absolute error regression loss 

205 """ 

206 pred = self.predict(X) 

207 

208 if self.quantile != 0.5: 

209 epsilon, mult = QuantileLinearRegression._epsilon( 

210 y, pred, self.quantile, sample_weight) 

211 if mult is not None: 

212 epsilon *= mult * 2 

213 return epsilon.sum() / X.shape[0] 

214 return mean_absolute_error(y, pred, sample_weight=sample_weight)