Coverage for src/mlstatpy/ml/_neural_tree_node.py: 99%

224 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-13 20:42 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Conversion from tree to neural network. 

5""" 

6import numpy 

7import numpy.random as rnd 

8from scipy.special import expit, softmax, kl_div as kl_fct # pylint: disable=E0611 

9from ._neural_tree_api import _TrainingAPI 

10 

11 

12class NeuralTreeNode(_TrainingAPI): 

13 """ 

14 One node in a neural network. 

15 """ 

16 

17 @staticmethod 

18 def _relu(x): 

19 "Relu function." 

20 return numpy.maximum(x, 0) 

21 

22 @staticmethod 

23 def _leakyrelu(x): 

24 "Leaky Relu function." 

25 return numpy.maximum(x, 0) + numpy.minimum(x, 0) * 0.01 

26 

27 @staticmethod 

28 def _drelu(x): 

29 "Derivative of the Relu function." 

30 res = numpy.ones(x.shape, dtype=x.dtype) 

31 res[x < 0] = 0. 

32 return res 

33 

34 @staticmethod 

35 def _dleakyrelu(x): 

36 "Derivative of the Leaky Relu function." 

37 res = numpy.ones(x.shape, dtype=x.dtype) 

38 res[x < 0] = 0.01 

39 return res 

40 

41 @staticmethod 

42 def _dsigmoid(x): 

43 "Derivativ of the sigmoid function." 

44 y = expit(x) 

45 return y * (1 - y) 

46 

47 @staticmethod 

48 def _softmax(x): 

49 "Derivative of the softmax function." 

50 if len(x.shape) == 2: 

51 return softmax(x, axis=1) 

52 return softmax(x) 

53 

54 @staticmethod 

55 def _dsoftmax(x): 

56 "Derivative of the softmax function." 

57 soft = softmax(x) 

58 grad = - soft @ soft.T 

59 diag = numpy.diag(soft) 

60 return diag + grad 

61 

62 @staticmethod 

63 def get_activation_function(activation): 

64 """ 

65 Returns the activation function. 

66 It returns a function *y=f(x)*. 

67 """ 

68 if activation == 'softmax': 

69 return NeuralTreeNode._softmax 

70 if activation == 'softmax4': 

71 return lambda x: NeuralTreeNode._softmax(x * 4) 

72 if activation in {'logistic', 'expit', 'sigmoid'}: 

73 return expit 

74 if activation == 'sigmoid4': 

75 return lambda x: expit(x * 4) 

76 if activation == 'relu': 

77 return NeuralTreeNode._relu 

78 if activation == 'leakyrelu': 

79 return NeuralTreeNode._leakyrelu 

80 if activation == 'identity': 

81 return lambda x: x 

82 raise ValueError( # pragma: no cover 

83 "Unknown activation function '{}'.".format(activation)) 

84 

85 @staticmethod 

86 def get_activation_gradient_function(activation): 

87 """ 

88 Returns the activation function. 

89 It returns a function *y=f'(x)*. 

90 About the sigmoid: 

91 

92 .. math:: 

93 

94 \\begin{array}{l} 

95 f(x) &=& \frac{1}{1 + e^{-x}} \\\\ 

96 f'(x) &=& \frac{e^{-x}}{(1 + e^{-x})^2} = f(x)(1-f(x)) 

97 \\end{array}} 

98 """ 

99 if activation == 'softmax': 

100 return NeuralTreeNode._dsoftmax 

101 if activation == 'softmax4': 

102 return lambda x: NeuralTreeNode._dsoftmax(x) * 4 

103 if activation in {'logistic', 'expit', 'sigmoid'}: 

104 return NeuralTreeNode._dsigmoid 

105 if activation == 'sigmoid4': 

106 return lambda x: NeuralTreeNode._dsigmoid(x) * 4 

107 if activation == 'relu': 

108 return NeuralTreeNode._drelu 

109 if activation == 'leakyrelu': 

110 return NeuralTreeNode._dleakyrelu 

111 if activation == 'identity': 

112 return lambda x: numpy.ones(x.shape, dtype=x.dtype) 

113 raise ValueError( # pragma: no cover 

114 "Unknown activation gradient function '{}'.".format(activation)) 

115 

116 @staticmethod 

117 def get_activation_loss_function(activation): 

118 """ 

119 Returns a default loss function based on the activation 

120 function. It returns two functions *g=loss(x,y)*. 

121 """ 

122 if activation in {'logistic', 'expit', 'sigmoid', 'sigmoid4'}: 

123 # regression + regularization 

124 return lambda x, y: (x - y) ** 2 

125 if activation in {'softmax', 'softmax4'}: 

126 cst = numpy.finfo(numpy.float32).eps 

127 

128 # classification 

129 def kl_fct2(x, y): 

130 return kl_fct(x + cst, y + cst) 

131 return kl_fct2 

132 if activation in {'identity', 'relu', 'leakyrelu'}: 

133 # regression 

134 return lambda x, y: (x - y) ** 2 

135 raise ValueError( 

136 "Unknown activation function '{}'.".format(activation)) 

137 

138 @staticmethod 

139 def get_activation_dloss_function(activation): 

140 """ 

141 Returns the derivative of the default loss function based 

142 on the activation function. It returns a function 

143 *df(x,y)/dw, df(w)/dw* where *w* are the weights. 

144 """ 

145 if activation in {'logistic', 'expit', 'sigmoid', 'sigmoid4'}: 

146 # regression + regularization 

147 def dregrdx(x, y): 

148 return (x - y) * 2 

149 

150 return dregrdx 

151 

152 if activation in {'softmax', 'softmax4'}: 

153 # classification 

154 cst = numpy.finfo(numpy.float32).eps 

155 

156 def dclsdx(x, y): 

157 return numpy.log(x + cst) - numpy.log(y + cst) 

158 

159 return dclsdx 

160 

161 if activation in {'identity', 'relu', 'leakyrelu'}: 

162 # regression 

163 def dregdx(x, y): 

164 return (x - y) * 2 

165 

166 return dregdx 

167 raise ValueError( # pragma: no cover 

168 "Unknown activation function '{}'.".format(activation)) 

169 

170 def __init__(self, weights, bias=None, activation='sigmoid', nodeid=-1, 

171 tag=None): 

172 """ 

173 @param weights weights 

174 @param bias bias, if None, draws a random number 

175 @param activation activation function 

176 @param nodeid node id 

177 @param tag unused but to add information 

178 on how this node was created 

179 """ 

180 self.tag = tag 

181 if isinstance(weights, int): 

182 if activation.startswith('softmax'): 

183 weights = rnd.randn(2, weights) 

184 else: 

185 weights = rnd.randn(weights) 

186 if isinstance(weights, list): 

187 weights = numpy.array(weights) 

188 

189 if len(weights.shape) == 1: 

190 self.n_outputs = 1 

191 if bias is None: 

192 bias = rnd.randn() 

193 self.coef = numpy.empty(len(weights) + 1) 

194 self.coef[1:] = weights 

195 self.coef[0] = bias 

196 

197 elif len(weights.shape) == 2: 

198 self.n_outputs = weights.shape[0] 

199 if self.n_outputs == 1: 

200 raise RuntimeError( # pragma: no cover 

201 "Unexpected unsqueezed weights shape: {}".format(weights.shape)) 

202 if bias is None: 

203 bias = rnd.randn(self.n_outputs) 

204 shape = list(weights.shape) 

205 shape[1] += 1 

206 self.coef = numpy.empty(shape) 

207 self.coef[:, 1:] = weights 

208 self.coef[:, 0] = bias 

209 else: 

210 raise RuntimeError( # pragma: no cover 

211 "Unexpected weights shape: {}".format(weights.shape)) 

212 

213 self.activation = activation 

214 self.nodeid = nodeid 

215 self._set_fcts() 

216 

217 def _set_fcts(self): 

218 self.activation_ = NeuralTreeNode.get_activation_function( 

219 self.activation) 

220 self.gradient_ = NeuralTreeNode.get_activation_gradient_function( 

221 self.activation) 

222 self.losss_ = NeuralTreeNode.get_activation_loss_function( 

223 self.activation) 

224 self.dlossds_ = NeuralTreeNode.get_activation_dloss_function( 

225 self.activation) 

226 

@property
def input_weights(self):
    """
    Returns the weights applied to the inputs, i.e. ``coef``
    without the bias stored in first position (first value for a
    single output, first column otherwise).
    """
    single_output = self.n_outputs == 1
    return self.coef[1:] if single_output else self.coef[:, 1:]

233 

@property
def bias(self):
    """
    Returns the bias, stored in first position of ``coef``:
    a single value for a single output, the first column
    (one value per output) otherwise.
    """
    single_output = self.n_outputs == 1
    return self.coef[0] if single_output else self.coef[:, 0]

240 

241 def __getstate__(self): 

242 "usual" 

243 return { 

244 'coef': self.coef, 'activation': self.activation, 

245 'nodeid': self.nodeid, 'n_outputs': self.n_outputs, 

246 'tag': self.tag} 

247 

248 def __setstate__(self, state): 

249 "usual" 

250 self.coef = state['coef'] 

251 self.activation = state['activation'] 

252 self.nodeid = state['nodeid'] 

253 self.n_outputs = state['n_outputs'] 

254 self.tag = state['tag'] 

255 self._set_fcts() 

256 

257 def __eq__(self, obj): 

258 if self.coef.shape != obj.coef.shape: 

259 return False 

260 if any(map(lambda xy: xy[0] != xy[1], 

261 zip(self.coef.ravel(), obj.coef.ravel()))): 

262 return False 

263 if self.activation != obj.activation: 

264 return False 

265 return True 

266 

267 def __repr__(self): 

268 "usual" 

269 if len(self.coef.shape) == 1: 

270 return "%s(weights=%r, bias=%r, activation=%r)" % ( 

271 self.__class__.__name__, self.coef[1:], 

272 self.coef[0], self.activation) 

273 return "%s(weights=%r, bias=%r, activation=%r)" % ( 

274 self.__class__.__name__, self.coef[:, 1:], 

275 self.coef[:, 0], self.activation) 

276 

277 def _predict(self, X): 

278 "Computes inputs of the activation function." 

279 if self.n_outputs == 1: 

280 return X @ self.coef[1:] + self.coef[0] 

281 res = X.reshape((1, -1)) @ self.coef[:, 1:].T + self.coef[:, 0] 

282 return res.ravel() 

283 

def predict(self, X):
    """
    Computes neuron outputs: the activation function applied
    to *X W' + bias*. With several outputs, a two-dimensional *X*
    is processed row by row, any other *X* is reshaped into a single
    row and the linear part is flattened before the activation.
    """
    activate = self.activation_
    coef = self.coef
    if self.n_outputs == 1:
        return activate(X @ coef[1:] + coef[0])

    weights_t = coef[:, 1:].T
    intercept = coef[:, 0]
    if len(X.shape) == 2:
        return activate(X @ weights_t + intercept)
    row = X.reshape((1, -1))
    return activate((row @ weights_t + intercept).ravel())

293 

@property
def ndim(self):
    """
    Returns the input dimension, the number of coefficients
    per output minus one for the bias.
    """
    shape = self.coef.shape
    axis = 0 if len(shape) == 1 else 1
    return shape[axis] - 1

300 

@property
def training_weights(self):
    """
    Returns all the coefficients stored in the neuron (bias
    included) as a one-dimensional array.
    """
    flat = self.coef.ravel()
    return flat

305 

def update_training_weights(self, X, add=True):  # pylint: disable=W0237
    """
    Updates the coefficients in place.

    :param X: vector reshaped to the shape of the coefficients,
        such as a gradient
    :param add: if True, *X* is added to the coefficients,
        otherwise it replaces them
    """
    update = X.reshape(self.coef.shape)
    if not add:
        numpy.copyto(self.coef, update)
        return
    self.coef += update

317 

def fill_cache(self, X):
    """
    Builds a dictionary holding the intermediate results
    computed for *X*:

    * ``lX``: the results before the activation function,
    * ``aX``: the results after the activation function,
      the prediction.
    """
    before = self._predict(X)
    after = self.activation_(before)
    return {'lX': before, 'aX': after}

327 

328 def _common_loss_dloss(self, X, y, cache=None): 

329 """ 

330 Common beginning to methods *loss*, *dlossds*, 

331 *dlossdw*. 

332 """ 

333 if cache is not None and 'aX' in cache: 

334 act = cache['aX'] 

335 else: 

336 act = self.predict(X) 

337 return act 

338 

def loss(self, X, y, cache=None):
    """
    Computes the loss between the prediction for *X* and
    the expected value *y*.

    :param X: input, only used if the prediction is not
        already available in *cache*
    :param y: expected value
    :param cache: optional intermediate results
    :return: value returned by the loss function associated to
        the activation applied to the prediction and *y*
    """
    act = self._common_loss_dloss(X, y, cache=cache)
    # The former test on ``len(X.shape)`` led to the same call in
    # both branches; it only made the method fail on inputs
    # without a ``shape`` attribute.
    return self.losss_(act, y)  # pylint: disable=E1120

347 

def dlossds(self, X, y, cache=None):
    """
    Computes the derivative of the loss due to the prediction
    error: the derivative of the loss function evaluated at
    the prediction for *X* and the expected value *y*.
    """
    prediction = self._common_loss_dloss(X, y, cache=cache)
    derivative = self.dlossds_(prediction, y)
    return derivative

354 

def gradient_backward(self, graddx, X, inputs=False, cache=None):
    """
    Computes the gradients at point *X*.

    :param graddx: existing gradient against the inputs
    :param X: computes the gradient in X
    :param inputs: if False, derivative against the coefficients,
        otherwise against the inputs.
    :param cache: cache intermediate results, computed with
        *fill_cache* if None
    :return: gradient, same shape as the coefficients if *inputs*
        is False
    """
    if cache is None:
        cache = self.fill_cache(X)

    # NOTE(review): the gradient of the activation is evaluated on
    # ``aX``, the output of the activation, not on ``lX``, its input.
    # The gradient functions are documented as f'(x); confirm that
    # evaluating them on the activated value is intended.
    pred = cache['aX']
    ga = self.gradient_(pred)
    if len(ga.shape) == 2:
        # the activation gradient is a matrix: matrix product
        f = graddx @ ga
    else:
        # element-wise activation gradient
        f = graddx * ga

    if inputs:
        # derivative against the inputs: weights (bias excluded)
        # scaled by f
        if len(self.coef.shape) == 1:
            rgrad = numpy.empty(X.shape)
            rgrad[:] = self.coef[1:]
            rgrad *= f
        else:
            # one row of weights per output, each row is scaled by its
            # own value of f, then rows are summed
            rgrad = numpy.sum(
                self.coef[:, 1:] * f.reshape((-1, 1)), axis=0)
        return rgrad

    # derivative against the coefficients: 1 for the bias (first
    # position), X for the weights, everything scaled by f
    rgrad = numpy.empty(self.coef.shape)
    if len(self.coef.shape) == 1:
        rgrad[0] = 1
        rgrad[1:] = X
        rgrad *= f
    else:
        rgrad[:, 0] = 1
        rgrad[:, 1:] = X
        rgrad *= f.reshape((-1, 1))
    return rgrad