Coverage for aftercovid/models/epidemic_regressor.py: 100%
101 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-04-24 03:09 +0200
« prev ^ index » next coverage.py v7.1.0, created at 2024-04-24 03:09 +0200
1# coding: utf-8
2"""
3Implementation of a model for epidemics propagation.
4"""
5import numpy
6from sklearn.base import BaseEstimator, RegressorMixin
7from .covid_sird import CovidSIRD
8from .covid_sird_cst import CovidSIRDc
11class EpidemicRegressor(BaseEstimator, RegressorMixin):
12 """
13 Follows :epkg:`scikit-learn` API.
14 Trains a model on observed data from an epidemic.
16 :param model: model to train, `'SIR'` or `'SIRD'`
17 refers to `CovidSIRD <aftercovid.models.CovidSIRD>`,
18 `SIRDc` refers to `CovidSIRDc
19 <aftercovid.models.CovidSIRDc>`
20 :param t: implicit feature
21 :param max_iter: number of iteration
22 :param learning_rate_init: see :class:`SGDOptimizer
23 <aftercovid.optim.SGDOptimizer>`
24 :param lr_schedule: see :class:`SGDOptimizer
25 <aftercovid.optim.SGDOptimizer>`
26 :param momentum: see :class:`SGDOptimizer
27 <aftercovid.optim.SGDOptimizer>`
28 :param power_t: see :class:`SGDOptimizer
29 <aftercovid.optim.SGDOptimizer>`
30 :param early_th: see :class:`SGDOptimizer
31 <aftercovid.optim.SGDOptimizer>`
32 :param verbose: see :class:`SGDOptimizer
33 <aftercovid.optim.SGDOptimizer>`
34 :param min_threshold: see :class:`SGDOptimizer
35 <aftercovid.optim.SGDOptimizer>`, if `'auto'`,
36 the value depends on the models, if is `0.01`
37 for model `SIR`, it means every coefficient must
38 be greater than 0.01.
39 :param max_threshold: see :class:`SGDOptimizer
40 <aftercovid.optim.SGDOptimizer>`, upper bound
41 :param init: dictionary, initializes the model
42 with this parameters
44 Once trained the model holds a member `model_`
45 which contains the trained model and `iter_`
46 which holds the number of training iteration.
47 It also keep track of the coefficients in a dictionary
48 in attribute `coef_`.
49 """
51 def __init__(self, model='SIR', t=0, max_iter=100,
52 learning_rate_init=0.1, lr_schedule='constant',
53 momentum=0.9, power_t=0.5, early_th=None,
54 min_threshold='auto', max_threshold='auto',
55 verbose=False, init=None):
56 if init is not None:
57 if isinstance(init, EpidemicRegressor):
58 if hasattr(init, 'coef_'):
59 init = init.coef_.copy()
60 else:
61 init = None # pragma: no cover
62 elif not isinstance(init, dict):
63 raise TypeError(
64 f"init must be a dictionary not {type(init)}.")
65 BaseEstimator.__init__(self)
66 RegressorMixin.__init__(self)
67 self.t = t
68 self.model = model
69 self.max_iter = max_iter
70 self.learning_rate_init = learning_rate_init
71 self.lr_schedule = lr_schedule
72 self.momentum = momentum
73 self.power_t = power_t
74 self.early_th = early_th
75 self.verbose = verbose
76 if min_threshold == 'auto':
77 if model.upper() in ('SIR', 'SIRD'):
78 min_threshold = 0.0001
79 elif model.upper() in ('SIRC', ):
80 pmin = dict(beta=0.001, nu=0.0001, mu=0.0001,
81 a=-1., b=0., c=0.)
82 min_threshold = numpy.array(
83 [pmin[k[0]] for k in CovidSIRDc.P0])
84 elif model.upper() in ('SIRDC'):
85 pmin = dict(beta=0.001, nu=0.001, mu=0.001,
86 a=-1., b=0., c=0.)
87 min_threshold = numpy.array(
88 [pmin[k[0]] for k in CovidSIRDc.P0])
89 if max_threshold == 'auto':
90 if model.upper() in ('SIR', 'SIRD'):
91 max_threshold = 1.
92 elif model.upper() in ('SIRC', 'SIRDC'):
93 pmax = dict(beta=1., nu=0.5, mu=0.5,
94 a=0., b=4., c=2.)
95 max_threshold = numpy.array(
96 [pmax[k[0]] for k in CovidSIRDc.P0])
97 self.min_threshold = min_threshold
98 self.max_threshold = max_threshold
99 self._get_model()
100 self.init = init
101 if init is not None:
102 self.coef_ = init
104 def _get_model(self):
105 if self.model.lower() in ('sir', 'sird'):
106 return CovidSIRD()
107 if self.model.lower() in ('sirc', 'sirdc'):
108 return CovidSIRDc()
109 raise ValueError(
110 f"Unknown model name '{self.model}'.")
112 def fit(self, X, y):
113 """
114 Trains a model to approximate its derivative as much as
115 possible.
116 """
117 if not hasattr(self, 'model_'):
118 self.model_ = self._get_model()
119 self.model_.rnd()
120 total = numpy.sum(X, axis=1)
121 mi, ma = total.min(), total.max()
122 err = (ma - mi) / mi
123 if err > 1e-5:
124 raise RuntimeError( # pragma: no cover
125 f"Population is not constant, in [{mi}, {ma}].")
126 if self.init is not None:
127 for k, v in self.init.items():
128 self.model_[k] = v
129 self.model_['N'] = (ma + mi) / 2
130 self.model_.fit(
131 X, y, learning_rate_init=self.learning_rate_init,
132 max_iter=self.max_iter, early_th=self.early_th,
133 verbose=self.verbose, lr_schedule=self.lr_schedule,
134 power_t=self.power_t, momentum=self.momentum,
135 min_threshold=self.min_threshold,
136 max_threshold=self.max_threshold)
137 self.iter_ = self.model_.iter_
138 self.coef_ = self.model_.params_dict
139 return self
141 def predict(self, X):
142 """
143 Predicts the derivatives.
144 """
145 if not hasattr(self, 'model_'):
146 raise RuntimeError("Model was not trained.")
147 return self.model_.predict(X)
149 def simulate(self, X, n=7):
150 """
151 Predicts and simulates the epidemics.
152 Every row of *X* is a starting point,
153 the function then simulates the epidemics for the next
154 *n* days for every starting point.
156 :param X: data
157 :param n: number of days
158 :return: quantities, matrix of shape
159 *(X.shape[0], n, number of parameters)*
160 """
161 if not hasattr(self, "model_"):
162 raise RuntimeError( # pragma: no cover
163 "Model was not trained.")
164 clq = self.model_.quantity_names
165 if len(clq) != X.shape[1]:
166 raise RuntimeError( # pragma: no cover
167 f"Unapexected shape for X ({X.shape}), "
168 f"expecting {len(clq)} columns.")
169 res = None
170 for i in range(X.shape[0]):
171 for k, v in zip(clq, X[i]):
172 self.model_[k] = v
173 pred = self.model_.iterate2array(n=n, derivatives=False)
174 if res is None:
175 res = numpy.zeros((X.shape[0], ) + pred.shape)
176 res[i, :, :] = pred
177 return res
179 def predict_many(self, X, n=7):
180 """
181 Predicts the derivatives and the series
182 for many days.
184 :param X: series
185 :param n: number of days
186 :return: derivates and series, return shape is
187 *(X.shape[0], number of parameters, n)*
188 """
189 if not hasattr(self, 'model_'):
190 raise RuntimeError("Model was not trained.") # pragma: no cover
191 deri = numpy.empty(X.shape + (n, ))
192 curv = numpy.empty(X.shape + (n, ))
193 for i in range(0, n):
194 d = self.predict(X)
195 deri[:, :, i] = d
196 X += d
197 curv[:, :, i] = X
198 return deri, curv
200 def score(self, X, y, norm=None):
201 """
202 Scores the prediction of the derivatives.
204 :param X: data
205 :param y: expected derivatives
206 :param norm: norm to return the norm used to optimize (L2)
207 or 'L1' to return the L1 norm
208 :return: score
209 """
210 if not hasattr(self, 'model_'):
211 raise RuntimeError( # pragma: no cover
212 "Model was not trained.")
213 if norm is None:
214 return self.model_.score(X, y)
215 if norm.lower() == 'l1':
216 return self.model_.score_l1(X, y)
217 raise ValueError(f"Unexpected norm {norm!r}.")