1# -*- coding: utf-8 -*-

2"""

3@file

4@brief Implements a quantile linear regression.

5"""

6import numpy

7from sklearn.linear_model import LinearRegression

8from sklearn.metrics import mean_absolute_error

11class QuantileLinearRegression(LinearRegression):

12 """

13 Quantile Linear Regression or linear regression

14 trained with norm :epkg:L1. This class inherits from

15 :epkg:sklearn:linear_models:LinearRegression.

16 See notebook :ref:quantileregressionrst.

18 Norm :epkg:L1 is chosen if quantile=0.5, otherwise,

19 for *quantile=*:math:\\rho,

20 the following error is optimized:

22 .. math::

24 \\sum_i \\rho |f(X_i) - Y_i|^- + (1-\\rho) |f(X_i) - Y_i|^+

26 where :math:|f(X_i) - Y_i|^-= \\max(Y_i - f(X_i), 0) and

27 :math:|f(X_i) - Y_i|^+= \\max(f(X_i) - Y_i, 0).

28 :math:f(i) is the prediction, :math:Y_i the expected

29 value.

30 """

32 def __init__(self, fit_intercept=True, normalize=False, copy_X=True,

33 n_jobs=1, delta=0.0001, max_iter=10, quantile=0.5,

34 positive=False, verbose=False):

35 """

36 :param fit_intercept: boolean, optional, default True

37 whether to calculate the intercept for this model. If set

38 to False, no intercept will be used in calculations

39 (e.g. data is expected to be already centered).

40 :param normalize: boolean, optional, default False

41 This parameter is ignored when fit_intercept is set to False.

42 If True, the regressors X will be normalized before regression by

43 subtracting the mean and dividing by the l2-norm.

44 If you wish to standardize, please use

45 :class:sklearn.preprocessing.StandardScaler before calling fit on

46 an estimator with normalize=False.

47 :param copy_X: boolean, optional, default True

48 If True, X will be copied; else, it may be overwritten.

49 :param n_jobs: int, optional, default 1

50 The number of jobs to use for the computation.

51 If -1 all CPUs are used. This will only provide speedup for

52 n_targets > 1 and sufficient large problems.

53 :param max_iter: int, optional, default 1

54 The number of iteration to do at training time.

55 This parameter is specific to the quantile regression.

56 :param delta: float, optional, default 0.0001

57 Used to ensure matrices has an inverse

58 (*M + delta*I*).

59 :param quantile: float, by default 0.5,

60 determines which quantile to use

61 to estimate the regression.

62 :param positive: when set to True, forces the coefficients to be positive.

63 :param verbose: bool, optional, default False

64 Prints error at each iteration of the optimisation.

65 """

66 try:

67 LinearRegression.__init__(

68 self, fit_intercept=fit_intercept, normalize=normalize,

69 copy_X=copy_X, n_jobs=n_jobs, positive=positive)

70 except TypeError:

71 # scikit-learn<0.24

72 LinearRegression.__init__(

73 self, fit_intercept=fit_intercept, normalize=normalize,

74 copy_X=copy_X, n_jobs=n_jobs)

75 self.max_iter = max_iter

76 self.verbose = verbose

77 self.delta = delta

78 self.quantile = quantile

80 def fit(self, X, y, sample_weight=None):

81 """

82 Fits a linear model with :epkg:L1 norm which

83 is equivalent to a quantile regression.

84 The implementation is not the most efficient

85 as it calls multiple times method fit

86 from :epkg:sklearn:linear_models:LinearRegression.

87 Data gets checked and rescaled each time.

88 The optimization follows the algorithm

89 Iteratively reweighted least squares

90 <https://en.wikipedia.org/wiki/Iteratively_reweighted_least_squares>_.

91 It is described in French at

92 Régression quantile

93 <http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx/notebooks/td_note_2017_2.html>_.

95 :param X: numpy array or sparse matrix of shape [n_samples,n_features]

96 Training data

97 :param y: numpy array of shape [n_samples, n_targets]

98 Target values. Will be cast to X's dtype if necessary

99 :param sample_weight: numpy array of shape [n_samples]

100 Individual weights for each sample

101 :return: self, returns an instance of self.

103 Fitted attributes:

105 * coef_: array, shape (n_features, ) or (n_targets, n_features)

106 Estimated coefficients for the linear regression problem.

107 If multiple targets are passed during the fit (y 2D), this

108 is a 2D array of shape (n_targets, n_features), while if only

109 one target is passed, this is a 1D array of length n_features.

110 * intercept_: array

111 Independent term in the linear model.

112 * n_iter_: int

113 Number of iterations at training time.

114 """

115 if len(y.shape) > 1 and y.shape[1] != 1:

116 raise ValueError("QuantileLinearRegression only works for Y real")

118 def compute_z(Xm, beta, Y, W, delta=0.0001):

119 "compute z"

120 deltas = numpy.ones(X.shape[0]) * delta

121 epsilon, mult = QuantileLinearRegression._epsilon(

122 Y, Xm @ beta, self.quantile)

123 r = numpy.reciprocal(numpy.maximum( # pylint: disable=E1111

124 epsilon, deltas)) # pylint: disable=E1111

125 if mult is not None:

126 epsilon *= 1 - mult

127 r *= 1 - mult

128 return r, epsilon

130 if not isinstance(X, numpy.ndarray):

131 if hasattr(X, 'values'):

132 X = X.values

133 else:

134 raise TypeError("X must be an array or a dataframe.")

136 if self.fit_intercept:

137 Xm = numpy.hstack([X, numpy.ones((X.shape[0], 1))])

138 else:

139 Xm = X

141 try:

142 clr = LinearRegression(fit_intercept=False, copy_X=self.copy_X,

143 n_jobs=self.n_jobs, normalize=self.normalize,

144 positive=self.positive)

145 except AttributeError:

146 # scikit-learn<0.24

147 clr = LinearRegression(fit_intercept=False, copy_X=self.copy_X,

148 n_jobs=self.n_jobs, normalize=self.normalize)

150 W = numpy.ones(X.shape[0]) if sample_weight is None else sample_weight

151 self.n_iter_ = 0

152 lastE = None

153 for i in range(0, self.max_iter):

154 clr.fit(Xm, y, W)

155 beta = clr.coef_

156 W, epsilon = compute_z(Xm, beta, y, W, delta=self.delta)

157 if sample_weight is not None:

158 W *= sample_weight

159 epsilon *= sample_weight

160 E = epsilon.sum()

161 self.n_iter_ = i

162 if self.verbose:

163 print( # pragma: no cover

164 f'[QuantileLinearRegression.fit] iter={i + 1} error={E}')

165 if lastE is not None and lastE == E:

166 break

167 lastE = E

169 if self.fit_intercept:

170 self.coef_ = beta[:-1]

171 self.intercept_ = beta[-1]

172 else:

173 self.coef_ = beta

174 self.intercept_ = 0

176 return self

178 @staticmethod

179 def _epsilon(y_true, y_pred, quantile, sample_weight=None):

180 diff = y_pred - y_true

181 epsilon = numpy.abs(diff)

182 if quantile != 0.5:

183 sign = numpy.sign(diff) # pylint: disable=E1111

184 mult = numpy.ones(y_true.shape[0])

185 mult[sign > 0] *= quantile # pylint: disable=W0143

186 mult[sign < 0] *= (1 - quantile) # pylint: disable=W0143

187 else:

188 mult = None

189 if sample_weight is not None:

190 epsilon *= sample_weight

191 return epsilon, mult

193 def score(self, X, y, sample_weight=None):

194 """

195 Returns Mean absolute error regression loss.

197 :param X: array-like, shape = (n_samples, n_features)

198 Test samples.

199 :param y: array-like, shape = (n_samples) or (n_samples, n_outputs)

200 True values for X.

201 :param sample_weight: array-like, shape = [n_samples], optional

202 Sample weights.

203 :return: score : float

204 mean absolute error regression loss

205 """

206 pred = self.predict(X)

208 if self.quantile != 0.5:

209 epsilon, mult = QuantileLinearRegression._epsilon(

210 y, pred, self.quantile, sample_weight)

211 if mult is not None:

212 epsilon *= mult * 2

213 return epsilon.sum() / X.shape[0]

214 return mean_absolute_error(y, pred, sample_weight=sample_weight)