Coverage for src/mlstatpy/ml/_neural_tree_node.py: 99%
227 statements
# -*- coding: utf-8 -*-
"""
@file
@brief Conversion from tree to neural network.
"""
import numpy
import numpy.random as rnd
from scipy.special import expit, softmax, kl_div as kl_fct  # pylint: disable=E0611
from ._neural_tree_api import _TrainingAPI


class NeuralTreeNode(_TrainingAPI):
    """
    One node in a neural network.

    :param weights: weights
    :param bias: bias, if None, draws a random number
    :param activation: activation function
    :param nodeid: node id
    :param tag: unused, keeps information about
        how this node was created
    """
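    # Example (illustrative sketch, not part of the original module):
    # a single neuron with two inputs and a sigmoid activation.
    #     node = NeuralTreeNode(weights=[0.5, -1.0], bias=0.1, activation='sigmoid')
    #     node.predict(numpy.array([1.0, 0.0]))   # sigmoid(0.5 * 1 + (-1.0) * 0 + 0.1)
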
    @staticmethod
    def _relu(x):
        "Relu function."
        return numpy.maximum(x, 0)

    @staticmethod
    def _leakyrelu(x):
        "Leaky Relu function."
        return numpy.maximum(x, 0) + numpy.minimum(x, 0) * 0.01

    @staticmethod
    def _drelu(x):
        "Derivative of the Relu function."
        res = numpy.ones(x.shape, dtype=x.dtype)
        res[x < 0] = 0.
        return res

    @staticmethod
    def _dleakyrelu(x):
        "Derivative of the Leaky Relu function."
        res = numpy.ones(x.shape, dtype=x.dtype)
        res[x < 0] = 0.01
        return res

    @staticmethod
    def _dsigmoid(x):
        "Derivative of the sigmoid function."
        y = expit(x)
        return y * (1 - y)

    @staticmethod
    def _softmax(x):
        "Softmax function."
        if len(x.shape) == 2:
            return softmax(x, axis=1)
        return softmax(x)

    @staticmethod
    def _dsoftmax(x):
        "Derivative of the softmax function."
        soft = softmax(x)
        grad = - soft @ soft.T
        diag = numpy.diag(soft)
        return diag + grad

    @staticmethod
    def get_activation_function(activation):
        """
        Returns the activation function.
        It returns a function *y=f(x)*.
        """
        if activation == 'softmax':
            return NeuralTreeNode._softmax
        if activation == 'softmax4':
            return lambda x: NeuralTreeNode._softmax(x * 4)
        if activation in {'logistic', 'expit', 'sigmoid'}:
            return expit
        if activation == 'sigmoid4':
            return lambda x: expit(x * 4)
        if activation == 'relu':
            return NeuralTreeNode._relu
        if activation == 'leakyrelu':
            return NeuralTreeNode._leakyrelu
        if activation == 'identity':
            return lambda x: x
        raise ValueError(  # pragma: no cover
            f"Unknown activation function '{activation}'.")
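    # Example (illustrative sketch, not part of the original module): retrieving
    # an activation function by name and applying it.
    #     f = NeuralTreeNode.get_activation_function('relu')
    #     f(numpy.array([-1.0, 2.0]))   # -> array([0., 2.])
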
    @staticmethod
    def get_activation_gradient_function(activation):
        """
        Returns the derivative of the activation function.
        It returns a function *y=f'(x)*.
        About the sigmoid:

        .. math::

            \\begin{array}{rcl}
            f(x) &=& \\frac{1}{1 + e^{-x}} \\\\
            f'(x) &=& \\frac{e^{-x}}{(1 + e^{-x})^2} = f(x)(1-f(x))
            \\end{array}
        """
        if activation == 'softmax':
            return NeuralTreeNode._dsoftmax
        if activation == 'softmax4':
            return lambda x: NeuralTreeNode._dsoftmax(x) * 4
        if activation in {'logistic', 'expit', 'sigmoid'}:
            return NeuralTreeNode._dsigmoid
        if activation == 'sigmoid4':
            return lambda x: NeuralTreeNode._dsigmoid(x) * 4
        if activation == 'relu':
            return NeuralTreeNode._drelu
        if activation == 'leakyrelu':
            return NeuralTreeNode._dleakyrelu
        if activation == 'identity':
            return lambda x: numpy.ones(x.shape, dtype=x.dtype)
        raise ValueError(  # pragma: no cover
            f"Unknown activation gradient function '{activation}'.")
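    # Example (illustrative sketch, not part of the original module): the gradient
    # of the identity activation is a vector of ones.
    #     g = NeuralTreeNode.get_activation_gradient_function('identity')
    #     g(numpy.array([3.0, -2.0]))   # -> array([1., 1.])
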
    @staticmethod
    def get_activation_loss_function(activation):
        """
        Returns a default loss function based on the activation
        function. It returns a function *g=loss(x, y)*.
        """
        if activation in {'logistic', 'expit', 'sigmoid', 'sigmoid4'}:
            # regression + regularization
            return lambda x, y: (x - y) ** 2
        if activation in {'softmax', 'softmax4'}:
            cst = numpy.finfo(numpy.float32).eps

            # classification
            def kl_fct2(x, y):
                return kl_fct(x + cst, y + cst)
            return kl_fct2
        if activation in {'identity', 'relu', 'leakyrelu'}:
            # regression
            return lambda x, y: (x - y) ** 2
        raise ValueError(
            f"Unknown activation function '{activation}'.")
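    # Example (illustrative sketch, not part of the original module): sigmoid and
    # identity outputs use the squared error, softmax outputs a smoothed KL divergence.
    #     loss = NeuralTreeNode.get_activation_loss_function('sigmoid')
    #     loss(numpy.array([0.8]), numpy.array([1.0]))   # -> approx. array([0.04])
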
    @staticmethod
    def get_activation_dloss_function(activation):
        """
        Returns the derivative of the default loss function based
        on the activation function. It returns a function
        *g=dloss(x, y)/dx* where *x* is the prediction.
        """
        if activation in {'logistic', 'expit', 'sigmoid', 'sigmoid4'}:
            # regression + regularization
            def dregrdx(x, y):
                return (x - y) * 2

            return dregrdx

        if activation in {'softmax', 'softmax4'}:
            # classification
            cst = numpy.finfo(numpy.float32).eps

            def dclsdx(x, y):
                return numpy.log(x + cst) - numpy.log(y + cst)

            return dclsdx

        if activation in {'identity', 'relu', 'leakyrelu'}:
            # regression
            def dregdx(x, y):
                return (x - y) * 2

            return dregdx
        raise ValueError(  # pragma: no cover
            f"Unknown activation function '{activation}'.")
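    # Example (illustrative sketch, not part of the original module): derivative
    # of the squared loss with respect to the prediction.
    #     dl = NeuralTreeNode.get_activation_dloss_function('identity')
    #     dl(numpy.array([0.8]), numpy.array([1.0]))   # -> approx. array([-0.4])
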
    def __init__(self, weights, bias=None, activation='sigmoid', nodeid=-1,
                 tag=None):
        self.tag = tag
        if isinstance(weights, int):
            if activation.startswith('softmax'):
                weights = rnd.randn(2, weights)
            else:
                weights = rnd.randn(weights)
        if isinstance(weights, list):
            weights = numpy.array(weights)

        if len(weights.shape) == 1:
            self.n_outputs = 1
            if bias is None:
                bias = rnd.randn()
            self.coef = numpy.empty(len(weights) + 1)
            self.coef[1:] = weights
            self.coef[0] = bias
        elif len(weights.shape) == 2:
            self.n_outputs = weights.shape[0]
            if bias is None:
                bias = rnd.randn(self.n_outputs)
            shape = list(weights.shape)
            shape[1] += 1
            self.coef = numpy.empty(shape)
            self.coef[:, 1:] = weights
            self.coef[:, 0] = bias
        else:
            raise RuntimeError(  # pragma: no cover
                f"Unexpected weights shape: {weights.shape}")

        self.activation = activation
        self.nodeid = nodeid
        self._set_fcts()
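    # Note (added for readability): the coefficients live in a single array ``coef``.
    # For one output, ``coef[0]`` is the bias and ``coef[1:]`` are the weights;
    # for several outputs, ``coef[:, 0]`` holds the biases and ``coef[:, 1:]`` the weights.
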
    def _set_fcts(self):
        self.activation_ = NeuralTreeNode.get_activation_function(
            self.activation)
        self.gradient_ = NeuralTreeNode.get_activation_gradient_function(
            self.activation)
        self.losss_ = NeuralTreeNode.get_activation_loss_function(
            self.activation)
        self.dlossds_ = NeuralTreeNode.get_activation_dloss_function(
            self.activation)

    @property
    def input_weights(self):
        "Returns the weights."
        if self.n_outputs == 1:
            return self.coef[1:]
        return self.coef[:, 1:]

    @property
    def bias(self):
        "Returns the bias."
        if self.n_outputs == 1:
            return self.coef[0]
        return self.coef[:, 0]

    def __getstate__(self):
        "usual"
        return {
            'coef': self.coef, 'activation': self.activation,
            'nodeid': self.nodeid, 'n_outputs': self.n_outputs,
            'tag': self.tag}

    def __setstate__(self, state):
        "usual"
        self.coef = state['coef']
        self.activation = state['activation']
        self.nodeid = state['nodeid']
        self.n_outputs = state['n_outputs']
        self.tag = state['tag']
        self._set_fcts()

    def __eq__(self, obj):
        if self.coef.shape != obj.coef.shape:
            return False
        if any(map(lambda xy: xy[0] != xy[1],
                   zip(self.coef.ravel(), obj.coef.ravel()))):
            return False
        if self.activation != obj.activation:
            return False
        return True

    def __repr__(self):
        "usual"
        if len(self.coef.shape) == 1:
            return "%s(weights=%r, bias=%r, activation=%r)" % (
                self.__class__.__name__, self.coef[1:],
                self.coef[0], self.activation)
        return "%s(weights=%r, bias=%r, activation=%r)" % (
            self.__class__.__name__, self.coef[:, 1:],
            self.coef[:, 0], self.activation)

    def _predict(self, X):
        "Computes inputs of the activation function."
        if self.n_outputs == 1:
            return X @ self.coef[1:] + self.coef[0]
        if len(X.shape) == 2:
            return X @ self.coef[:, 1:].T + self.coef[:, 0]
        res = X.reshape((1, -1)) @ self.coef[:, 1:].T + self.coef[:, 0]
        return res.ravel()
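    # Example (illustrative sketch, not part of the original module): for one output,
    # ``_predict`` computes ``X @ w + b`` before the activation is applied.
    #     node = NeuralTreeNode(weights=numpy.array([1.0, 2.0]), bias=0.5)
    #     node._predict(numpy.array([3.0, 4.0]))   # -> 3*1 + 4*2 + 0.5 = 11.5
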
    def predict(self, X):
        "Computes neuron outputs."
        y = self._predict(X)
        return self.activation_(y)

    @property
    def ndim(self):
        "Returns the input dimension."
        if len(self.coef.shape) == 1:
            return self.coef.shape[0] - 1
        return self.coef.shape[1] - 1

    @property
    def ndim_out(self):
        "Returns the output dimension."
        if len(self.coef.shape) == 1:
            return 1
        return self.coef.shape[0]

    @property
    def training_weights(self):
        "Returns the weights stored in the neuron."
        return self.coef.ravel()

    def update_training_weights(self, X, add=True):  # pylint: disable=W0237
        """
        Updates weights.

        :param X: vector to add to the weights, such as a gradient
        :param add: if True, adds the vector, otherwise replaces the weights
        """
        if add:
            self.coef += X.reshape(self.coef.shape)
        else:
            numpy.copyto(self.coef, X.reshape(self.coef.shape))

    def fill_cache(self, X):
        """
        Creates a cache with intermediate results.
        ``lX`` holds the values before the activation function,
        ``aX`` the values after the activation function (the prediction).
        """
        cache = dict(lX=self._predict(X))
        cache['aX'] = self.activation_(cache['lX'])
        return cache
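    # Example (illustrative sketch, not part of the original module): the cache avoids
    # recomputing the forward pass when ``loss``, ``dlossds`` or ``gradient_backward``
    # are called with the same X.
    #     cache = node.fill_cache(X)   # cache['lX'] = pre-activation, cache['aX'] = prediction
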
    def _common_loss_dloss(self, X, y, cache=None):
        """
        Common beginning to methods *loss*, *dlossds*,
        *dlossdw*.
        """
        if cache is not None and 'aX' in cache:
            act = cache['aX']
        else:
            act = self.predict(X)
        return act

    def loss(self, X, y, cache=None):
        """
        Computes the loss. Returns a float.
        """
        act = self._common_loss_dloss(X, y, cache=cache)
        return self.losss_(act, y)  # pylint: disable=E1120

    def dlossds(self, X, y, cache=None):
        """
        Computes the loss derivative due to prediction error.
        """
        act = self._common_loss_dloss(X, y, cache=cache)
        return self.dlossds_(act, y)

    def gradient_backward(self, graddx, X, inputs=False, cache=None):
        """
        Computes the gradients at point *X*.

        :param graddx: existing gradient against the inputs
        :param X: computes the gradient in X
        :param inputs: if False, derivative against the coefficients,
            otherwise against the inputs.
        :param cache: cache intermediate results
        :return: gradient
        """
        if cache is None:
            cache = self.fill_cache(X)

        pred = cache['aX']
        ga = self.gradient_(pred)
        if len(ga.shape) == 2:
            f = graddx @ ga
        else:
            f = graddx * ga

        if inputs:
            if len(self.coef.shape) == 1:
                rgrad = numpy.empty(X.shape)
                rgrad[:] = self.coef[1:]
                rgrad *= f
            else:
                rgrad = numpy.sum(
                    self.coef[:, 1:] * f.reshape((-1, 1)), axis=0)
            return rgrad

        rgrad = numpy.empty(self.coef.shape)
        if len(self.coef.shape) == 1:
            rgrad[0] = 1
            rgrad[1:] = X
            rgrad *= f
        else:
            rgrad[:, 0] = 1
            rgrad[:, 1:] = X
            rgrad *= f.reshape((-1, 1))
        return rgrad
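
# Example (illustrative sketch, not part of the original module): one manual
# gradient step on a single sigmoid neuron, assuming the methods above behave
# as documented.
#     node = NeuralTreeNode(weights=numpy.array([0.5, -1.0]), bias=0.1,
#                           activation='sigmoid')
#     X, y = numpy.array([0.3, 0.7]), 1.0
#     cache = node.fill_cache(X)
#     dx = node.dlossds(X, y, cache=cache)                # dloss / dprediction
#     grad = node.gradient_backward(dx, X, cache=cache)   # dloss / dcoefficients
#     node.update_training_weights(-0.1 * grad.ravel())   # small gradient step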