Coverage for aftercovid/optim/sgd.py: 100%

84 statements  

coverage.py v7.1.0, created at 2024-04-23 03:09 +0200

1""" 

2Implements simple stochastic gradient optimisation. 

3It is inspired from `_stochastic_optimizers.py 

4<https://github.com/scikit-learn/scikit-learn/blob/master/sklearn/ 

5neural_network/_stochastic_optimizers.py>`_. 

6""" 

7import numpy 

8from numpy.core._exceptions import UFuncTypeError 

9 

10 

class BaseOptimizer:
    """
    Base stochastic gradient descent optimizer.

    :param coef: array, initial coefficients
    :param learning_rate_init: float
        The initial learning rate used. It controls the step-size
        in updating the weights.
    :param min_threshold: coefficients must be higher than *min_threshold*
    :param max_threshold: coefficients must be lower than *max_threshold*

    The class holds the following attributes:

    * *learning_rate*: float, the current learning rate
    """

    def __init__(self, coef, learning_rate_init=0.1,
                 min_threshold=None, max_threshold=None):
        if not isinstance(coef, numpy.ndarray):
            raise TypeError("coef must be an array.")
        self.coef = coef
        self.learning_rate_init = learning_rate_init
        self.learning_rate = float(learning_rate_init)
        self.min_threshold = min_threshold
        self.max_threshold = max_threshold

    def _get_updates(self, grad):
        raise NotImplementedError("Must be overwritten.")  # pragma: no cover

    def update_coef(self, grad):
        """
        Updates coefficients with a given gradient.

        :param grad: array, gradient
        """
        if self.coef.shape != grad.shape:
            raise ValueError("coef and grad must have the same shape.")
        update = self._get_updates(grad)
        self.coef += update
        if self.min_threshold is not None:
            try:
                self.coef = numpy.maximum(self.coef, self.min_threshold)
            except UFuncTypeError:  # pragma: no cover
                raise RuntimeError(
                    "Unable to compute a lower bound with coef={} "
                    "min_threshold={}".format(self.coef, self.min_threshold))
        if self.max_threshold is not None:
            try:
                self.coef = numpy.minimum(self.coef, self.max_threshold)
            except UFuncTypeError:  # pragma: no cover
                raise RuntimeError(
                    "Unable to compute an upper bound with coef={} "
                    "max_threshold={}".format(self.coef, self.max_threshold))

    def iteration_ends(self, time_step):
        """
        Performs updates to the learning rate and potentially other states
        at the end of an iteration.
        """
        pass  # pragma: no cover

    def train(self, X, y, fct_loss, fct_grad, max_iter=100,
              early_th=None, verbose=False):
        """
        Optimizes the coefficients.

        :param X: dataset (array)
        :param y: expected targets
        :param fct_loss: loss function, signature: `f(coef, X, y) -> float`
        :param fct_grad: gradient function,
            signature: `g(coef, x, y, i) -> array`
        :param max_iter: maximum number of iterations
        :param early_th: stops the training if the error goes below
            this threshold
        :param verbose: display information
        :return: loss
        """
        if not isinstance(X, numpy.ndarray):
            raise TypeError("X must be an array.")
        if not isinstance(y, numpy.ndarray):
            raise TypeError("y must be an array.")
        if X.shape[0] != y.shape[0]:
            raise ValueError("X and y must have the same number of rows.")
        if any(numpy.isnan(X.ravel())):
            raise ValueError("X contains nan values.")
        if any(numpy.isnan(y.ravel())):
            raise ValueError("y contains nan values.")

        loss = fct_loss(self.coef, X, y)
        losses = [loss]
        if verbose:
            self._display_progress(0, max_iter, loss)
        n_samples = 0
        for it in range(max_iter):
            irows = numpy.random.choice(X.shape[0], X.shape[0])
            for irow in irows:
                grad = fct_grad(self.coef, X[irow, :], y[irow], irow)
                if isinstance(verbose, int) and verbose >= 10:
                    self._display_progress(  # pragma: no cover
                        0, max_iter, loss, grad, 'grad')
                if numpy.isnan(grad).sum() > 0:
                    raise RuntimeError(  # pragma: no cover
                        "The gradient has nan values.")
                self.update_coef(grad)
                n_samples += 1

            self.iteration_ends(n_samples)
            loss = fct_loss(self.coef, X, y)
            if verbose:
                self._display_progress(it + 1, max_iter, loss)
            self.iter_ = it + 1
            losses.append(loss)
            if self._evaluate_early_stopping(
                    it, max_iter, losses, early_th, verbose=verbose):
                break
        return loss

    def _evaluate_early_stopping(
            self,
            it,
            max_iter,
            losses,
            early_th,
            verbose=False):
        if len(losses) < 5 or early_th is None:
            return False
        if numpy.isnan(losses[-5]):
            if numpy.isnan(losses[-1]):  # pragma: no cover
                if verbose:
                    self._display_progress(it + 1, max_iter, losses[-1],
                                           losses=losses[-5:])
                return True
            return False  # pragma: no cover
        if numpy.isnan(losses[-1]):
            if verbose:  # pragma: no cover
                self._display_progress(it + 1, max_iter, losses[-1],
                                       losses=losses[-5:])
            return True  # pragma: no cover
        if abs(losses[-1] - losses[-5]) <= early_th:
            if verbose:  # pragma: no cover
                self._display_progress(it + 1, max_iter, losses[-1],
                                       losses=losses[-5:])
            return True
        return False

    def _display_progress(self, it, max_iter, loss, losses=None):
        'Displays training progress.'
        if losses is None:  # pragma: no cover
            print(f'{it}/{max_iter}: loss: {loss:1.4g}')
        else:
            print(  # pragma: no cover
                f'{it}/{max_iter}: loss: {loss:1.4g} losses: {losses}')

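# Illustration, not part of sgd.py: a minimal sketch of how a subclass is
# expected to plug into BaseOptimizer. Only ``_get_updates`` needs to be
# overridden; ``update_coef`` adds the returned value to ``self.coef`` and
# applies the optional thresholds. ``PlainGradientDescent`` is a hypothetical
# name used only for this example.
class PlainGradientDescent(BaseOptimizer):
    "Gradient step without momentum (illustrative sketch)."

    def _get_updates(self, grad):
        # The value returned here is added to the coefficients,
        # so the descent direction carries the minus sign.
        return -self.learning_rate * grad
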

class SGDOptimizer(BaseOptimizer):
    """
    Stochastic gradient descent optimizer with momentum.

    :param coef: array, initial coefficients
    :param learning_rate_init: float
        The initial learning rate used. It controls the step-size
        in updating the weights.
    :param lr_schedule: `{'constant', 'adaptive', 'invscaling'}`,
        learning rate schedule for weight updates,
        `'constant'` for a constant learning rate given by
        *learning_rate_init*. `'invscaling'` gradually decreases
        the learning rate *learning_rate_* at each time step *t*
        using an inverse scaling exponent of *power_t*,
        `learning_rate_ = learning_rate_init / pow(t, power_t)`.
        `'adaptive'` keeps the learning rate constant to
        *learning_rate_init* as long as the training loss keeps decreasing.
        Each time 2 consecutive epochs fail to decrease the training loss by
        tol, or fail to increase the validation score by tol if
        'early_stopping' is on, the current learning rate is divided by 5.
    :param momentum: float
        Value of momentum used, must be larger than or equal to 0.
    :param power_t: double
        The exponent for the inverse scaling learning rate.
    :param early_th: stops if the error goes below that threshold
    :param min_threshold: lower bound for parameters (can be None)
    :param max_threshold: upper bound for parameters (can be None)

    The class holds the following attributes:

    * *learning_rate*: float, the current learning rate
    * *velocity*: array, velocities used to update the parameters

    .. exref::
        :title: Stochastic Gradient Descent applied to linear regression

        The following example shows how to optimize a simple linear
        regression.

        .. runpython::
            :showcode:

            import numpy
            from aftercovid.optim import SGDOptimizer


            def fct_loss(c, X, y):
                return numpy.linalg.norm(X @ c - y) ** 2


            def fct_grad(c, x, y, i=0):
                return x * (x @ c - y) * 0.1


            coef = numpy.array([0.5, 0.6, -0.7])
            X = numpy.random.randn(10, 3)
            y = X @ coef

            sgd = SGDOptimizer(numpy.random.randn(3))
            sgd.train(X, y, fct_loss, fct_grad, max_iter=15, verbose=True)
            print('optimized coefficients:', sgd.coef)
    """

    def __init__(self, coef, learning_rate_init=0.1, lr_schedule='constant',
                 momentum=0.9, power_t=0.5, early_th=None,
                 min_threshold=None, max_threshold=None):
        super().__init__(coef, learning_rate_init,
                         min_threshold=min_threshold,
                         max_threshold=max_threshold)
        self.lr_schedule = lr_schedule
        self.momentum = momentum
        self.power_t = power_t
        self.early_th = early_th
        self.velocity = numpy.zeros_like(coef)

    def iteration_ends(self, time_step):
        """
        Performs updates to the learning rate and potentially other states
        at the end of an iteration.

        :param time_step: int
            number of training samples trained on so far, used to update
            the learning rate for 'invscaling'
        """
        if self.lr_schedule == 'invscaling':
            self.learning_rate = (float(self.learning_rate_init) /
                                  (time_step + 1) ** self.power_t)

    def _get_updates(self, grad):
        """
        Gets the values used to update params with given gradients.

        :param grad: array, gradient
        :return: updates, array, the values to add to params
        """
        update = self.momentum * self.velocity - self.learning_rate * grad
        self.velocity = update
        return update

    def _display_progress(self, it, max_iter, loss, losses=None, msg='loss'):
        'Displays training progress.'
        if losses is None:
            print(f'{it}/{max_iter}: {msg}: {loss:1.4g} '
                  f'lr={self.learning_rate:1.3g}')
        else:
            print(  # pragma: no cover
                '{}/{}: {}: {:1.4g} lr={:1.3g} {}es: {}'.format(
                    it, max_iter, msg, loss, self.learning_rate, msg, losses))
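
The snippet below is a minimal usage sketch, not part of the module: it exercises the options the docstring example above leaves out, namely an 'invscaling' schedule, coefficient bounds through min_threshold/max_threshold, and early stopping through early_th. The data and the loss/gradient functions are made up for illustration:

    import numpy
    from aftercovid.optim import SGDOptimizer


    def fct_loss(c, X, y):
        # squared error of the linear model
        return numpy.linalg.norm(X @ c - y) ** 2


    def fct_grad(c, x, y, i=0):
        # gradient of the squared error for one sample, scaled down
        return x * (x @ c - y) * 0.1


    X = numpy.random.randn(20, 3)
    y = X @ numpy.array([0.5, 0.6, -0.7])

    # decaying learning rate and coefficients clipped into [-5, 5]
    sgd = SGDOptimizer(numpy.random.randn(3),
                       learning_rate_init=0.1,
                       lr_schedule='invscaling',
                       power_t=0.5,
                       min_threshold=-5.,
                       max_threshold=5.)
    loss = sgd.train(X, y, fct_loss, fct_grad, max_iter=20, early_th=1e-6)
    print('loss:', loss)
    print('coefficients:', sgd.coef)
    print('final learning rate:', sgd.learning_rate)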