Coverage for src/mlstatpy/ml/_neural_tree_node.py: 99%

227 statements  

coverage.py v7.1.0, created at 2023-02-27 05:59 +0100

# -*- coding: utf-8 -*-
"""
@file
@brief Conversion from tree to neural network.
"""
import numpy
import numpy.random as rnd
from scipy.special import expit, softmax, kl_div as kl_fct  # pylint: disable=E0611
from ._neural_tree_api import _TrainingAPI


class NeuralTreeNode(_TrainingAPI):
    """
    One node in a neural network.

    :param weights: weights
    :param bias: bias, if None, draws a random number
    :param activation: activation function
    :param nodeid: node id
    :param tag: unused but to add information
        on how this node was created
    """
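
    # A minimal usage sketch (added for illustration, not part of the original
    # module); the numbers are arbitrary. A 1-D weight vector builds a
    # single-output neuron and ``coef`` stores the bias first, then the weights.
    #
    #   node = NeuralTreeNode(weights=numpy.array([0.5, -1.2]),
    #                         bias=0.1, activation='sigmoid')
    #   node.coef            # array([ 0.1,  0.5, -1.2])
    #   node.input_weights   # array([ 0.5, -1.2])
    #   node.bias            # 0.1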

    @staticmethod
    def _relu(x):
        "Relu function."
        return numpy.maximum(x, 0)

    @staticmethod
    def _leakyrelu(x):
        "Leaky Relu function."
        return numpy.maximum(x, 0) + numpy.minimum(x, 0) * 0.01

    @staticmethod
    def _drelu(x):
        "Derivative of the Relu function."
        res = numpy.ones(x.shape, dtype=x.dtype)
        res[x < 0] = 0.
        return res

    @staticmethod
    def _dleakyrelu(x):
        "Derivative of the Leaky Relu function."
        res = numpy.ones(x.shape, dtype=x.dtype)
        res[x < 0] = 0.01
        return res

    @staticmethod
    def _dsigmoid(x):
        "Derivative of the sigmoid function."
        y = expit(x)
        return y * (1 - y)

    @staticmethod
    def _softmax(x):
        "Softmax function."
        if len(x.shape) == 2:
            return softmax(x, axis=1)
        return softmax(x)

    @staticmethod
    def _dsoftmax(x):
        "Derivative of the softmax function."
        soft = softmax(x)
        grad = - soft @ soft.T
        diag = numpy.diag(soft)
        return diag + grad

    @staticmethod
    def get_activation_function(activation):
        """
        Returns the activation function.
        It returns a function *y=f(x)*.
        """
        if activation == 'softmax':
            return NeuralTreeNode._softmax
        if activation == 'softmax4':
            return lambda x: NeuralTreeNode._softmax(x * 4)
        if activation in {'logistic', 'expit', 'sigmoid'}:
            return expit
        if activation == 'sigmoid4':
            return lambda x: expit(x * 4)
        if activation == 'relu':
            return NeuralTreeNode._relu
        if activation == 'leakyrelu':
            return NeuralTreeNode._leakyrelu
        if activation == 'identity':
            return lambda x: x
        raise ValueError(  # pragma: no cover
            f"Unknown activation function '{activation}'.")
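
    # Illustrative sketch (not in the original file): the lookup returns a
    # plain callable, for instance for 'relu':
    #
    #   f = NeuralTreeNode.get_activation_function('relu')
    #   f(numpy.array([-1.0, 2.0]))  # -> array([0., 2.])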

    @staticmethod
    def get_activation_gradient_function(activation):
        """
        Returns the derivative of the activation function.
        It returns a function *y=f'(x)*.
        About the sigmoid:

        .. math::

            \\begin{array}{l}
            f(x) &=& \\frac{1}{1 + e^{-x}} \\\\
            f'(x) &=& \\frac{e^{-x}}{(1 + e^{-x})^2} = f(x)(1 - f(x))
            \\end{array}
        """
        if activation == 'softmax':
            return NeuralTreeNode._dsoftmax
        if activation == 'softmax4':
            return lambda x: NeuralTreeNode._dsoftmax(x) * 4
        if activation in {'logistic', 'expit', 'sigmoid'}:
            return NeuralTreeNode._dsigmoid
        if activation == 'sigmoid4':
            return lambda x: NeuralTreeNode._dsigmoid(x) * 4
        if activation == 'relu':
            return NeuralTreeNode._drelu
        if activation == 'leakyrelu':
            return NeuralTreeNode._dleakyrelu
        if activation == 'identity':
            return lambda x: numpy.ones(x.shape, dtype=x.dtype)
        raise ValueError(  # pragma: no cover
            f"Unknown activation gradient function '{activation}'.")
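
    # Illustrative sketch: the returned callable evaluates f'(x); for the
    # sigmoid, f'(0) = 0.5 * (1 - 0.5) = 0.25.
    #
    #   g = NeuralTreeNode.get_activation_gradient_function('sigmoid')
    #   g(numpy.array([0.0]))  # -> array([0.25])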

    @staticmethod
    def get_activation_loss_function(activation):
        """
        Returns a default loss function based on the activation
        function. It returns a function *g=loss(x,y)*.
        """
        if activation in {'logistic', 'expit', 'sigmoid', 'sigmoid4'}:
            # regression + regularization
            return lambda x, y: (x - y) ** 2
        if activation in {'softmax', 'softmax4'}:
            cst = numpy.finfo(numpy.float32).eps

            # classification
            def kl_fct2(x, y):
                return kl_fct(x + cst, y + cst)
            return kl_fct2
        if activation in {'identity', 'relu', 'leakyrelu'}:
            # regression
            return lambda x, y: (x - y) ** 2
        raise ValueError(
            f"Unknown activation function '{activation}'.")
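
    # Illustrative sketch: for 'identity' (and the other regression
    # activations) the default loss is the squared error, applied elementwise.
    #
    #   loss = NeuralTreeNode.get_activation_loss_function('identity')
    #   loss(numpy.array([1.0]), numpy.array([3.0]))  # -> array([4.])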

    @staticmethod
    def get_activation_dloss_function(activation):
        """
        Returns the derivative of the default loss function based
        on the activation function. It returns a function
        *df(x,y)/dw, df(w)/dw* where *w* are the weights.
        """
        if activation in {'logistic', 'expit', 'sigmoid', 'sigmoid4'}:
            # regression + regularization
            def dregrdx(x, y):
                return (x - y) * 2

            return dregrdx

        if activation in {'softmax', 'softmax4'}:
            # classification
            cst = numpy.finfo(numpy.float32).eps

            def dclsdx(x, y):
                return numpy.log(x + cst) - numpy.log(y + cst)

            return dclsdx

        if activation in {'identity', 'relu', 'leakyrelu'}:
            # regression
            def dregdx(x, y):
                return (x - y) * 2

            return dregdx
        raise ValueError(  # pragma: no cover
            f"Unknown activation function '{activation}'.")

    def __init__(self, weights, bias=None, activation='sigmoid', nodeid=-1,
                 tag=None):
        self.tag = tag
        if isinstance(weights, int):
            if activation.startswith('softmax'):
                weights = rnd.randn(2, weights)
            else:
                weights = rnd.randn(weights)
        if isinstance(weights, list):
            weights = numpy.array(weights)

        if len(weights.shape) == 1:
            self.n_outputs = 1
            if bias is None:
                bias = rnd.randn()
            self.coef = numpy.empty(len(weights) + 1)
            self.coef[1:] = weights
            self.coef[0] = bias

        elif len(weights.shape) == 2:
            self.n_outputs = weights.shape[0]
            if bias is None:
                bias = rnd.randn(self.n_outputs)
            shape = list(weights.shape)
            shape[1] += 1
            self.coef = numpy.empty(shape)
            self.coef[:, 1:] = weights
            self.coef[:, 0] = bias
        else:
            raise RuntimeError(  # pragma: no cover
                f"Unexpected weights shape: {weights.shape}")

        self.activation = activation
        self.nodeid = nodeid
        self._set_fcts()

    def _set_fcts(self):
        self.activation_ = NeuralTreeNode.get_activation_function(
            self.activation)
        self.gradient_ = NeuralTreeNode.get_activation_gradient_function(
            self.activation)
        self.losss_ = NeuralTreeNode.get_activation_loss_function(
            self.activation)
        self.dlossds_ = NeuralTreeNode.get_activation_dloss_function(
            self.activation)

    @property
    def input_weights(self):
        "Returns the weights."
        if self.n_outputs == 1:
            return self.coef[1:]
        return self.coef[:, 1:]

    @property
    def bias(self):
        "Returns the bias."
        if self.n_outputs == 1:
            return self.coef[0]
        return self.coef[:, 0]

    def __getstate__(self):
        "usual"
        return {
            'coef': self.coef, 'activation': self.activation,
            'nodeid': self.nodeid, 'n_outputs': self.n_outputs,
            'tag': self.tag}

    def __setstate__(self, state):
        "usual"
        self.coef = state['coef']
        self.activation = state['activation']
        self.nodeid = state['nodeid']
        self.n_outputs = state['n_outputs']
        self.tag = state['tag']
        self._set_fcts()

    def __eq__(self, obj):
        if self.coef.shape != obj.coef.shape:
            return False
        if any(map(lambda xy: xy[0] != xy[1],
                   zip(self.coef.ravel(), obj.coef.ravel()))):
            return False
        if self.activation != obj.activation:
            return False
        return True

    def __repr__(self):
        "usual"
        if len(self.coef.shape) == 1:
            return "%s(weights=%r, bias=%r, activation=%r)" % (
                self.__class__.__name__, self.coef[1:],
                self.coef[0], self.activation)
        return "%s(weights=%r, bias=%r, activation=%r)" % (
            self.__class__.__name__, self.coef[:, 1:],
            self.coef[:, 0], self.activation)

    def _predict(self, X):
        "Computes inputs of the activation function."
        if self.n_outputs == 1:
            return X @ self.coef[1:] + self.coef[0]
        if len(X.shape) == 2:
            return X @ self.coef[:, 1:].T + self.coef[:, 0]
        res = X.reshape((1, -1)) @ self.coef[:, 1:].T + self.coef[:, 0]
        return res.ravel()

    def predict(self, X):
        "Computes neuron outputs."
        y = self._predict(X)
        return self.activation_(y)
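
    # Illustrative sketch: with known coefficients the prediction is
    # activation(X @ weights + bias), here expit(0.0) = 0.5.
    #
    #   node = NeuralTreeNode(weights=numpy.array([1.0, 1.0]),
    #                         bias=0.0, activation='sigmoid')
    #   node.predict(numpy.array([0.0, 0.0]))  # -> 0.5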

    @property
    def ndim(self):
        "Returns the input dimension."
        if len(self.coef.shape) == 1:
            return self.coef.shape[0] - 1
        return self.coef.shape[1] - 1

    @property
    def ndim_out(self):
        "Returns the output dimension."
        if len(self.coef.shape) == 1:
            return 1
        return self.coef.shape[0]

    @property
    def training_weights(self):
        "Returns the weights stored in the neuron."
        return self.coef.ravel()

    def update_training_weights(self, X, add=True):  # pylint: disable=W0237
        """
        Updates weights.

        :param X: vector to add to the weights, such as a gradient
        :param add: if True, adds the vector to the weights,
            otherwise replaces them
        """
        if add:
            self.coef += X.reshape(self.coef.shape)
        else:
            numpy.copyto(self.coef, X.reshape(self.coef.shape))

    def fill_cache(self, X):
        """
        Creates a cache with intermediate results.
        ``lX`` is the result before the activation function,
        ``aX`` is the result after the activation function, the prediction.
        """
        cache = dict(lX=self._predict(X))
        cache['aX'] = self.activation_(cache['lX'])
        return cache
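
    # Illustrative sketch: the cache keeps the pre-activation value under
    # ``lX`` and the prediction under ``aX``.
    #
    #   node = NeuralTreeNode(weights=numpy.array([1.0, 1.0]),
    #                         bias=0.0, activation='sigmoid')
    #   cache = node.fill_cache(numpy.array([0.0, 0.0]))
    #   cache['lX']  # -> 0.0
    #   cache['aX']  # -> 0.5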

    def _common_loss_dloss(self, X, y, cache=None):
        """
        Common beginning to methods *loss*, *dlossds*,
        *dlossdw*.
        """
        if cache is not None and 'aX' in cache:
            act = cache['aX']
        else:
            act = self.predict(X)
        return act

    def loss(self, X, y, cache=None):
        """
        Computes the loss. Returns a float.
        """
        act = self._common_loss_dloss(X, y, cache=cache)
        if len(X.shape) == 1:
            return self.losss_(act, y)  # pylint: disable=E1120
        return self.losss_(act, y)  # pylint: disable=E1120

    def dlossds(self, X, y, cache=None):
        """
        Computes the loss derivative due to prediction error.
        """
        act = self._common_loss_dloss(X, y, cache=cache)
        return self.dlossds_(act, y)

    def gradient_backward(self, graddx, X, inputs=False, cache=None):
        """
        Computes the gradients at point *X*.

        :param graddx: existing gradient against the inputs
        :param X: computes the gradient in X
        :param inputs: if False, derivative against the coefficients,
            otherwise against the inputs.
        :param cache: cache intermediate results
        :return: gradient
        """
        if cache is None:
            cache = self.fill_cache(X)

        pred = cache['aX']
        ga = self.gradient_(pred)
        if len(ga.shape) == 2:
            f = graddx @ ga
        else:
            f = graddx * ga

        if inputs:
            if len(self.coef.shape) == 1:
                rgrad = numpy.empty(X.shape)
                rgrad[:] = self.coef[1:]
                rgrad *= f
            else:
                rgrad = numpy.sum(
                    self.coef[:, 1:] * f.reshape((-1, 1)), axis=0)
            return rgrad

        rgrad = numpy.empty(self.coef.shape)
        if len(self.coef.shape) == 1:
            rgrad[0] = 1
            rgrad[1:] = X
            rgrad *= f
        else:
            rgrad[:, 0] = 1
            rgrad[:, 1:] = X
            rgrad *= f.reshape((-1, 1))
        return rgrad
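
    # Illustrative sketch (variable names are hypothetical): a plausible
    # backward step feeds the loss derivative into gradient_backward; with the
    # default ``inputs=False`` the result has the shape of ``coef`` (bias
    # first), with ``inputs=True`` the shape of ``X``.
    #
    #   node = NeuralTreeNode(weights=numpy.array([1.0, 1.0]),
    #                         bias=0.0, activation='sigmoid')
    #   X = numpy.array([0.0, 0.0])
    #   y = 1.0
    #   cache = node.fill_cache(X)
    #   dloss = node.dlossds(X, y, cache=cache)
    #   grad_coef = node.gradient_backward(dloss, X, cache=cache)
    #   grad_input = node.gradient_backward(dloss, X, inputs=True, cache=cache)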