Coverage for aftercovid/models/epidemic_regressor.py: 100%

101 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-04-24 03:09 +0200

1# coding: utf-8 

2""" 

3Implementation of a model for epidemics propagation. 

4""" 

5import numpy 

6from sklearn.base import BaseEstimator, RegressorMixin 

7from .covid_sird import CovidSIRD 

8from .covid_sird_cst import CovidSIRDc 

9 

10 

11class EpidemicRegressor(BaseEstimator, RegressorMixin): 

12 """ 

13 Follows :epkg:`scikit-learn` API. 

14 Trains a model on observed data from an epidemic. 

15 

16 :param model: model to train, `'SIR'` or `'SIRD'` 

17 refers to `CovidSIRD <aftercovid.models.CovidSIRD>`, 

18 `SIRDc` refers to `CovidSIRDc 

19 <aftercovid.models.CovidSIRDc>` 

20 :param t: implicit feature 

21 :param max_iter: number of iteration 

22 :param learning_rate_init: see :class:`SGDOptimizer 

23 <aftercovid.optim.SGDOptimizer>` 

24 :param lr_schedule: see :class:`SGDOptimizer 

25 <aftercovid.optim.SGDOptimizer>` 

26 :param momentum: see :class:`SGDOptimizer 

27 <aftercovid.optim.SGDOptimizer>` 

28 :param power_t: see :class:`SGDOptimizer 

29 <aftercovid.optim.SGDOptimizer>` 

30 :param early_th: see :class:`SGDOptimizer 

31 <aftercovid.optim.SGDOptimizer>` 

32 :param verbose: see :class:`SGDOptimizer 

33 <aftercovid.optim.SGDOptimizer>` 

34 :param min_threshold: see :class:`SGDOptimizer 

35 <aftercovid.optim.SGDOptimizer>`, if `'auto'`, 

36 the value depends on the models, if is `0.01` 

37 for model `SIR`, it means every coefficient must 

38 be greater than 0.01. 

39 :param max_threshold: see :class:`SGDOptimizer 

40 <aftercovid.optim.SGDOptimizer>`, upper bound 

41 :param init: dictionary, initializes the model 

42 with this parameters 

43 

44 Once trained the model holds a member `model_` 

45 which contains the trained model and `iter_` 

46 which holds the number of training iteration. 

47 It also keep track of the coefficients in a dictionary 

48 in attribute `coef_`. 

49 """ 

50 

51 def __init__(self, model='SIR', t=0, max_iter=100, 

52 learning_rate_init=0.1, lr_schedule='constant', 

53 momentum=0.9, power_t=0.5, early_th=None, 

54 min_threshold='auto', max_threshold='auto', 

55 verbose=False, init=None): 

56 if init is not None: 

57 if isinstance(init, EpidemicRegressor): 

58 if hasattr(init, 'coef_'): 

59 init = init.coef_.copy() 

60 else: 

61 init = None # pragma: no cover 

62 elif not isinstance(init, dict): 

63 raise TypeError( 

64 f"init must be a dictionary not {type(init)}.") 

65 BaseEstimator.__init__(self) 

66 RegressorMixin.__init__(self) 

67 self.t = t 

68 self.model = model 

69 self.max_iter = max_iter 

70 self.learning_rate_init = learning_rate_init 

71 self.lr_schedule = lr_schedule 

72 self.momentum = momentum 

73 self.power_t = power_t 

74 self.early_th = early_th 

75 self.verbose = verbose 

76 if min_threshold == 'auto': 

77 if model.upper() in ('SIR', 'SIRD'): 

78 min_threshold = 0.0001 

79 elif model.upper() in ('SIRC', ): 

80 pmin = dict(beta=0.001, nu=0.0001, mu=0.0001, 

81 a=-1., b=0., c=0.) 

82 min_threshold = numpy.array( 

83 [pmin[k[0]] for k in CovidSIRDc.P0]) 

84 elif model.upper() in ('SIRDC'): 

85 pmin = dict(beta=0.001, nu=0.001, mu=0.001, 

86 a=-1., b=0., c=0.) 

87 min_threshold = numpy.array( 

88 [pmin[k[0]] for k in CovidSIRDc.P0]) 

89 if max_threshold == 'auto': 

90 if model.upper() in ('SIR', 'SIRD'): 

91 max_threshold = 1. 

92 elif model.upper() in ('SIRC', 'SIRDC'): 

93 pmax = dict(beta=1., nu=0.5, mu=0.5, 

94 a=0., b=4., c=2.) 

95 max_threshold = numpy.array( 

96 [pmax[k[0]] for k in CovidSIRDc.P0]) 

97 self.min_threshold = min_threshold 

98 self.max_threshold = max_threshold 

99 self._get_model() 

100 self.init = init 

101 if init is not None: 

102 self.coef_ = init 

103 

104 def _get_model(self): 

105 if self.model.lower() in ('sir', 'sird'): 

106 return CovidSIRD() 

107 if self.model.lower() in ('sirc', 'sirdc'): 

108 return CovidSIRDc() 

109 raise ValueError( 

110 f"Unknown model name '{self.model}'.") 

111 

112 def fit(self, X, y): 

113 """ 

114 Trains a model to approximate its derivative as much as 

115 possible. 

116 """ 

117 if not hasattr(self, 'model_'): 

118 self.model_ = self._get_model() 

119 self.model_.rnd() 

120 total = numpy.sum(X, axis=1) 

121 mi, ma = total.min(), total.max() 

122 err = (ma - mi) / mi 

123 if err > 1e-5: 

124 raise RuntimeError( # pragma: no cover 

125 f"Population is not constant, in [{mi}, {ma}].") 

126 if self.init is not None: 

127 for k, v in self.init.items(): 

128 self.model_[k] = v 

129 self.model_['N'] = (ma + mi) / 2 

130 self.model_.fit( 

131 X, y, learning_rate_init=self.learning_rate_init, 

132 max_iter=self.max_iter, early_th=self.early_th, 

133 verbose=self.verbose, lr_schedule=self.lr_schedule, 

134 power_t=self.power_t, momentum=self.momentum, 

135 min_threshold=self.min_threshold, 

136 max_threshold=self.max_threshold) 

137 self.iter_ = self.model_.iter_ 

138 self.coef_ = self.model_.params_dict 

139 return self 

140 

141 def predict(self, X): 

142 """ 

143 Predicts the derivatives. 

144 """ 

145 if not hasattr(self, 'model_'): 

146 raise RuntimeError("Model was not trained.") 

147 return self.model_.predict(X) 

148 

149 def simulate(self, X, n=7): 

150 """ 

151 Predicts and simulates the epidemics. 

152 Every row of *X* is a starting point, 

153 the function then simulates the epidemics for the next 

154 *n* days for every starting point. 

155 

156 :param X: data 

157 :param n: number of days 

158 :return: quantities, matrix of shape 

159 *(X.shape[0], n, number of parameters)* 

160 """ 

161 if not hasattr(self, "model_"): 

162 raise RuntimeError( # pragma: no cover 

163 "Model was not trained.") 

164 clq = self.model_.quantity_names 

165 if len(clq) != X.shape[1]: 

166 raise RuntimeError( # pragma: no cover 

167 f"Unapexected shape for X ({X.shape}), " 

168 f"expecting {len(clq)} columns.") 

169 res = None 

170 for i in range(X.shape[0]): 

171 for k, v in zip(clq, X[i]): 

172 self.model_[k] = v 

173 pred = self.model_.iterate2array(n=n, derivatives=False) 

174 if res is None: 

175 res = numpy.zeros((X.shape[0], ) + pred.shape) 

176 res[i, :, :] = pred 

177 return res 

178 

179 def predict_many(self, X, n=7): 

180 """ 

181 Predicts the derivatives and the series 

182 for many days. 

183 

184 :param X: series 

185 :param n: number of days 

186 :return: derivates and series, return shape is 

187 *(X.shape[0], number of parameters, n)* 

188 """ 

189 if not hasattr(self, 'model_'): 

190 raise RuntimeError("Model was not trained.") # pragma: no cover 

191 deri = numpy.empty(X.shape + (n, )) 

192 curv = numpy.empty(X.shape + (n, )) 

193 for i in range(0, n): 

194 d = self.predict(X) 

195 deri[:, :, i] = d 

196 X += d 

197 curv[:, :, i] = X 

198 return deri, curv 

199 

200 def score(self, X, y, norm=None): 

201 """ 

202 Scores the prediction of the derivatives. 

203 

204 :param X: data 

205 :param y: expected derivatives 

206 :param norm: norm to return the norm used to optimize (L2) 

207 or 'L1' to return the L1 norm 

208 :return: score 

209 """ 

210 if not hasattr(self, 'model_'): 

211 raise RuntimeError( # pragma: no cover 

212 "Model was not trained.") 

213 if norm is None: 

214 return self.model_.score(X, y) 

215 if norm.lower() == 'l1': 

216 return self.model_.score_l1(X, y) 

217 raise ValueError(f"Unexpected norm {norm!r}.")