Coverage for mlinsights/mlmodel/quantile_regression.py: 97%

79 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-02-28 08:46 +0100

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Implements a quantile linear regression. 

5""" 

6import numpy 

7from sklearn.linear_model import LinearRegression 

8from sklearn.metrics import mean_absolute_error 

9 

10 

11class QuantileLinearRegression(LinearRegression): 

12 """ 

13 Quantile Linear Regression or linear regression 

14 trained with norm :epkg:`L1`. This class inherits from 

15 :epkg:`sklearn:linear_models:LinearRegression`. 

16 See notebook :ref:`quantileregressionrst`. 

17 

18 Norm :epkg:`L1` is chosen if ``quantile=0.5``, otherwise, 

19 for *quantile=*:math:`\\rho`, 

20 the following error is optimized: 

21 

22 .. math:: 

23 

24 \\sum_i \\rho |f(X_i) - Y_i|^- + (1-\\rho) |f(X_i) - Y_i|^+ 

25 

26 where :math:`|f(X_i) - Y_i|^-= \\max(Y_i - f(X_i), 0)` and 

27 :math:`|f(X_i) - Y_i|^+= \\max(f(X_i) - Y_i, 0)`. 

28 :math:`f(i)` is the prediction, :math:`Y_i` the expected 

29 value. 

30 """ 

31 

32 def __init__(self, fit_intercept=True, copy_X=True, 

33 n_jobs=1, delta=0.0001, max_iter=10, quantile=0.5, 

34 positive=False, verbose=False): 

35 """ 

36 :param fit_intercept: boolean, optional, default True 

37 whether to calculate the intercept for this model. If set 

38 to False, no intercept will be used in calculations 

39 (e.g. data is expected to be already centered). 

40 :param copy_X: boolean, optional, default True 

41 If True, X will be copied; else, it may be overwritten. 

42 :param n_jobs: int, optional, default 1 

43 The number of jobs to use for the computation. 

44 If -1 all CPUs are used. This will only provide speedup for 

45 n_targets > 1 and sufficient large problems. 

46 :param max_iter: int, optional, default 1 

47 The number of iteration to do at training time. 

48 This parameter is specific to the quantile regression. 

49 :param delta: float, optional, default 0.0001 

50 Used to ensure matrices has an inverse 

51 (*M + delta*I*). 

52 :param quantile: float, by default 0.5, 

53 determines which quantile to use 

54 to estimate the regression. 

55 :param positive: when set to True, forces the coefficients to be positive. 

56 :param verbose: bool, optional, default False 

57 Prints error at each iteration of the optimisation. 

58 """ 

59 try: 

60 LinearRegression.__init__( 

61 self, fit_intercept=fit_intercept, 

62 copy_X=copy_X, n_jobs=n_jobs, positive=positive) 

63 except TypeError: 

64 # scikit-learn<0.24 

65 LinearRegression.__init__( 

66 self, fit_intercept=fit_intercept, 

67 copy_X=copy_X, n_jobs=n_jobs) 

68 self.max_iter = max_iter 

69 self.verbose = verbose 

70 self.delta = delta 

71 self.quantile = quantile 

72 

73 def fit(self, X, y, sample_weight=None): 

74 """ 

75 Fits a linear model with :epkg:`L1` norm which 

76 is equivalent to a quantile regression. 

77 The implementation is not the most efficient 

78 as it calls multiple times method fit 

79 from :epkg:`sklearn:linear_models:LinearRegression`. 

80 Data gets checked and rescaled each time. 

81 The optimization follows the algorithm 

82 `Iteratively reweighted least squares 

83 <https://en.wikipedia.org/wiki/Iteratively_reweighted_least_squares>`_. 

84 It is described in French at 

85 `Régression quantile 

86 <http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx/notebooks/td_note_2017_2.html>`_. 

87 

88 :param X: numpy array or sparse matrix of shape [n_samples,n_features] 

89 Training data 

90 :param y: numpy array of shape [n_samples, n_targets] 

91 Target values. Will be cast to X's dtype if necessary 

92 :param sample_weight: numpy array of shape [n_samples] 

93 Individual weights for each sample 

94 :return: self, returns an instance of self. 

95 

96 Fitted attributes: 

97 

98 * `coef_`: array, shape (n_features, ) or (n_targets, n_features) 

99 Estimated coefficients for the linear regression problem. 

100 If multiple targets are passed during the fit (y 2D), this 

101 is a 2D array of shape (n_targets, n_features), while if only 

102 one target is passed, this is a 1D array of length n_features. 

103 * `intercept_`: array 

104 Independent term in the linear model. 

105 * `n_iter_`: int 

106 Number of iterations at training time. 

107 """ 

108 if len(y.shape) > 1 and y.shape[1] != 1: 

109 raise ValueError("QuantileLinearRegression only works for Y real") 

110 

111 def compute_z(Xm, beta, Y, W, delta=0.0001): 

112 "compute z" 

113 deltas = numpy.ones(X.shape[0]) * delta 

114 epsilon, mult = QuantileLinearRegression._epsilon( 

115 Y, Xm @ beta, self.quantile) 

116 r = numpy.reciprocal(numpy.maximum( # pylint: disable=E1111 

117 epsilon, deltas)) # pylint: disable=E1111 

118 if mult is not None: 

119 epsilon *= 1 - mult 

120 r *= 1 - mult 

121 return r, epsilon 

122 

123 if not isinstance(X, numpy.ndarray): 

124 if hasattr(X, 'values'): 

125 X = X.values 

126 else: 

127 raise TypeError("X must be an array or a dataframe.") 

128 

129 if self.fit_intercept: 

130 Xm = numpy.hstack([X, numpy.ones((X.shape[0], 1))]) 

131 else: 

132 Xm = X 

133 

134 try: 

135 clr = LinearRegression(fit_intercept=False, copy_X=self.copy_X, 

136 n_jobs=self.n_jobs, 

137 positive=self.positive) 

138 except AttributeError: 

139 # scikit-learn<0.24 

140 clr = LinearRegression(fit_intercept=False, copy_X=self.copy_X, 

141 n_jobs=self.n_jobs) 

142 

143 W = numpy.ones(X.shape[0]) if sample_weight is None else sample_weight 

144 self.n_iter_ = 0 

145 lastE = None 

146 for i in range(0, self.max_iter): 

147 clr.fit(Xm, y, W) 

148 beta = clr.coef_ 

149 W, epsilon = compute_z(Xm, beta, y, W, delta=self.delta) 

150 if sample_weight is not None: 

151 W *= sample_weight 

152 epsilon *= sample_weight 

153 E = epsilon.sum() 

154 self.n_iter_ = i 

155 if self.verbose: 

156 print( # pragma: no cover 

157 f'[QuantileLinearRegression.fit] iter={i + 1} error={E}') 

158 if lastE is not None and lastE == E: 

159 break 

160 lastE = E 

161 

162 if self.fit_intercept: 

163 self.coef_ = beta[:-1] 

164 self.intercept_ = beta[-1] 

165 else: 

166 self.coef_ = beta 

167 self.intercept_ = 0 

168 

169 return self 

170 

171 @staticmethod 

172 def _epsilon(y_true, y_pred, quantile, sample_weight=None): 

173 diff = y_pred - y_true 

174 epsilon = numpy.abs(diff) 

175 if quantile != 0.5: 

176 sign = numpy.sign(diff) # pylint: disable=E1111 

177 mult = numpy.ones(y_true.shape[0]) 

178 mult[sign > 0] *= quantile # pylint: disable=W0143 

179 mult[sign < 0] *= (1 - quantile) # pylint: disable=W0143 

180 else: 

181 mult = None 

182 if sample_weight is not None: 

183 epsilon *= sample_weight 

184 return epsilon, mult 

185 

186 def score(self, X, y, sample_weight=None): 

187 """ 

188 Returns Mean absolute error regression loss. 

189 

190 :param X: array-like, shape = (n_samples, n_features) 

191 Test samples. 

192 :param y: array-like, shape = (n_samples) or (n_samples, n_outputs) 

193 True values for X. 

194 :param sample_weight: array-like, shape = [n_samples], optional 

195 Sample weights. 

196 :return: score : float 

197 mean absolute error regression loss 

198 """ 

199 pred = self.predict(X) 

200 

201 if self.quantile != 0.5: 

202 epsilon, mult = QuantileLinearRegression._epsilon( 

203 y, pred, self.quantile, sample_weight) 

204 if mult is not None: 

205 epsilon *= mult * 2 

206 return epsilon.sum() / X.shape[0] 

207 return mean_absolute_error(y, pred, sample_weight=sample_weight)