Coverage for aftercovid/optim/sgd.py: 100%
84 statements
coverage.py v7.1.0, created at 2024-04-23 03:09 +0200
1"""
2Implements simple stochastic gradient optimisation.
3It is inspired from `_stochastic_optimizers.py
4<https://github.com/scikit-learn/scikit-learn/blob/master/sklearn/
5neural_network/_stochastic_optimizers.py>`_.
6"""
7import numpy
8from numpy.core._exceptions import UFuncTypeError


class BaseOptimizer:
    """
    Base stochastic gradient descent optimizer.

    :param coef: array, initial coefficients
    :param learning_rate_init: float
        The initial learning rate used. It controls the step-size
        in updating the weights.
    :param min_threshold: coefficients must be higher than *min_threshold*
    :param max_threshold: coefficients must be lower than *max_threshold*

    The class holds the following attributes:

    * *learning_rate*: float, the current learning rate
    """

    def __init__(self, coef, learning_rate_init=0.1,
                 min_threshold=None, max_threshold=None):
        if not isinstance(coef, numpy.ndarray):
            raise TypeError("coef must be an array.")
        self.coef = coef
        self.learning_rate_init = learning_rate_init
        self.learning_rate = float(learning_rate_init)
        self.min_threshold = min_threshold
        self.max_threshold = max_threshold

    def _get_updates(self, grad):
        raise NotImplementedError("Must be overwritten.")  # pragma: no cover

    def update_coef(self, grad):
        """
        Updates coefficients with given gradient.

        :param grad: array, gradient
        """
        if self.coef.shape != grad.shape:
            raise ValueError("coef and grad must have the same shape.")
        update = self._get_updates(grad)
        self.coef += update
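        # keep the coefficients within the optional
        # [min_threshold, max_threshold] bounds (element-wise clipping)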
        if self.min_threshold is not None:
            try:
                self.coef = numpy.maximum(self.coef, self.min_threshold)
            except UFuncTypeError:  # pragma: no cover
                raise RuntimeError(
                    "Unable to compute a lower bound with coef={} "
                    "min_threshold={}".format(self.coef, self.min_threshold))
        if self.max_threshold is not None:
            try:
                self.coef = numpy.minimum(self.coef, self.max_threshold)
            except UFuncTypeError:  # pragma: no cover
                raise RuntimeError(
                    "Unable to compute an upper bound with coef={} "
                    "max_threshold={}".format(self.coef, self.max_threshold))

    def iteration_ends(self, time_step):
        """
        Performs updates to the learning rate and potentially other states
        at the end of an iteration.
        """
        pass  # pragma: no cover

    def train(self, X, y, fct_loss, fct_grad, max_iter=100,
              early_th=None, verbose=False):
        """
        Optimizes the coefficients.

        :param X: dataset (array)
        :param y: expected targets
        :param fct_loss: loss function, signature: `f(coef, X, y) -> float`
        :param fct_grad: gradient function,
            signature: `g(coef, x, y, i) -> array`
        :param max_iter: maximum number of iterations
        :param early_th: stops the training if the error goes below
            this threshold
        :param verbose: display information
        :return: loss
        """
        if not isinstance(X, numpy.ndarray):
            raise TypeError("X must be an array.")
        if not isinstance(y, numpy.ndarray):
            raise TypeError("y must be an array.")
        if X.shape[0] != y.shape[0]:
            raise ValueError("X and y must have the same number of rows.")
        if any(numpy.isnan(X.ravel())):
            raise ValueError("X contains nan value.")
        if any(numpy.isnan(y.ravel())):
            raise ValueError("y contains nan value.")

        loss = fct_loss(self.coef, X, y)
        losses = [loss]
        if verbose:
            self._display_progress(0, max_iter, loss)
        n_samples = 0
        for it in range(max_iter):
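            # one epoch: draw as many random rows as there are samples
            # (with replacement) and update the coefficients one sample
            # at a time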
            irows = numpy.random.choice(X.shape[0], X.shape[0])
            for irow in irows:
                grad = fct_grad(self.coef, X[irow, :], y[irow], irow)
                if isinstance(verbose, int) and verbose >= 10:
                    self._display_progress(  # pragma: no cover
                        0, max_iter, loss, grad, 'grad')
                if numpy.isnan(grad).sum() > 0:
                    raise RuntimeError(  # pragma: no cover
                        "The gradient has nan values.")
                self.update_coef(grad)
                n_samples += 1

            self.iteration_ends(n_samples)
            loss = fct_loss(self.coef, X, y)
            if verbose:
                self._display_progress(it + 1, max_iter, loss)
            self.iter_ = it + 1
            losses.append(loss)
            if self._evaluate_early_stopping(
                    it, max_iter, losses, early_th, verbose=verbose):
                break
        return loss

    def _evaluate_early_stopping(self, it, max_iter, losses, early_th,
                                 verbose=False):
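        # stop when the loss becomes nan or when it changed by less than
        # early_th over the last four epochs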
        if len(losses) < 5 or early_th is None:
            return False
        if numpy.isnan(losses[-5]):
            if numpy.isnan(losses[-1]):  # pragma: no cover
                if verbose:
                    self._display_progress(it + 1, max_iter, losses[-1],
                                           losses=losses[-5:])
                return True
            return False  # pragma: no cover
        if numpy.isnan(losses[-1]):
            if verbose:  # pragma: no cover
                self._display_progress(it + 1, max_iter, losses[-1],
                                       losses=losses[-5:])
            return True  # pragma: no cover
        if abs(losses[-1] - losses[-5]) <= early_th:
            if verbose:  # pragma: no cover
                self._display_progress(it + 1, max_iter, losses[-1],
                                       losses=losses[-5:])
            return True
        return False

    def _display_progress(self, it, max_iter, loss, losses=None):
        'Displays training progress.'
        if losses is None:  # pragma: no cover
            print(f'{it}/{max_iter}: loss: {loss:1.4g}')
        else:
            print(  # pragma: no cover
                f'{it}/{max_iter}: loss: {loss:1.4g} losses: {losses}')


class SGDOptimizer(BaseOptimizer):
    """
    Stochastic gradient descent optimizer with momentum.

    :param coef: array, initial coefficients
    :param learning_rate_init: float
        The initial learning rate used. It controls the step-size
        in updating the weights.
    :param lr_schedule: `{'constant', 'adaptive', 'invscaling'}`,
        learning rate schedule for weight updates,
        `'constant'` for a constant learning rate given by
        *learning_rate_init*. `'invscaling'` gradually decreases
        the learning rate *learning_rate_* at each time step *t*
        using an inverse scaling exponent of *power_t*.
        `learning_rate_ = learning_rate_init / pow(t, power_t)`,
        `'adaptive'` keeps the learning rate constant to
        *learning_rate_init* as long as the training loss keeps decreasing.
        Each time 2 consecutive epochs fail to decrease the training loss by
        tol, or fail to increase validation score by tol if 'early_stopping'
        is on, the current learning rate is divided by 5.
    :param momentum: float
        Value of momentum used, must be larger than or equal to 0.
    :param power_t: double
        The exponent for inverse scaling learning rate.
    :param early_th: stops if the error goes below that threshold
    :param min_threshold: lower bound for parameters (can be None)
    :param max_threshold: upper bound for parameters (can be None)

    The class holds the following attributes:

    * *learning_rate*: float, the current learning rate
    * *velocity*: array, velocities used to update the parameters

    .. exref::
        :title: Stochastic Gradient Descent applied to linear regression

        The following example shows how to optimize a simple linear
        regression.

        .. runpython::
            :showcode:

            import numpy
            from aftercovid.optim import SGDOptimizer


            def fct_loss(c, X, y):
                return numpy.linalg.norm(X @ c - y) ** 2


            def fct_grad(c, x, y, i=0):
                return x * (x @ c - y) * 0.1


            coef = numpy.array([0.5, 0.6, -0.7])
            X = numpy.random.randn(10, 3)
            y = X @ coef

            sgd = SGDOptimizer(numpy.random.randn(3))
            sgd.train(X, y, fct_loss, fct_grad, max_iter=15, verbose=True)
            print('optimized coefficients:', sgd.coef)
    """

    def __init__(self, coef, learning_rate_init=0.1, lr_schedule='constant',
                 momentum=0.9, power_t=0.5, early_th=None,
                 min_threshold=None, max_threshold=None):
        super().__init__(coef, learning_rate_init,
                         min_threshold=min_threshold,
                         max_threshold=max_threshold)
        self.lr_schedule = lr_schedule
        self.momentum = momentum
        self.power_t = power_t
        self.early_th = early_th
        self.velocity = numpy.zeros_like(coef)

    def iteration_ends(self, time_step):
        """
        Performs updates to the learning rate and potentially other states
        at the end of an iteration.

        :param time_step: int
            number of training samples trained on so far, used to update
            the learning rate for 'invscaling'
        """
        if self.lr_schedule == 'invscaling':
            self.learning_rate = (float(self.learning_rate_init) /
                                  (time_step + 1) ** self.power_t)

    def _get_updates(self, grad):
        """
        Gets the values used to update the parameters with the given gradient.

        :param grad: array, gradient
        :return: updates, array, the values to add to params
        """
        update = self.momentum * self.velocity - self.learning_rate * grad
        self.velocity = update
        return update

    def _display_progress(self, it, max_iter, loss, losses=None, msg='loss'):
        'Displays training progress.'
        if losses is None:
            print(f'{it}/{max_iter}: {msg}: {loss:1.4g} '
                  f'lr={self.learning_rate:1.3g}')
        else:
            print(  # pragma: no cover
                '{}/{}: {}: {:1.4g} lr={:1.3g} {}es: {}'.format(
                    it, max_iter, msg, loss, self.learning_rate, msg, losses))
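
The bounds (`min_threshold`, `max_threshold`) and the `'invscaling'` schedule documented above are not exercised by the example embedded in the docstring. A minimal sketch of how they could be combined, with made-up data and coefficients, might look like this:

import numpy
from aftercovid.optim import SGDOptimizer

# synthetic linear-regression data (made up for illustration)
rng = numpy.random.RandomState(0)
X = rng.randn(50, 3)
y = X @ numpy.array([0.3, -0.2, 0.9])


def fct_loss(c, X, y):
    # squared error over the whole dataset
    return numpy.linalg.norm(X @ c - y) ** 2


def fct_grad(c, x, y, i=0):
    # per-sample gradient of the squared error, scaled down
    return x * (x @ c - y) * 0.1


# coefficients are clipped to [-1, 1] after every update and the learning
# rate decays as learning_rate_init / sqrt(t + 1)
sgd = SGDOptimizer(rng.randn(3), lr_schedule='invscaling',
                   min_threshold=-1., max_threshold=1.)
loss = sgd.train(X, y, fct_loss, fct_grad, max_iter=20, early_th=1e-6)
print('loss:', loss, 'coefficients:', sgd.coef)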