# Source code for aftercovid.models.epidemic_regressor

# coding: utf-8
"""
Implementation of a model for epidemics propagation.
"""
import numpy
from sklearn.base import BaseEstimator, RegressorMixin
from .covid_sird import CovidSIRD
from .covid_sird_cst import CovidSIRDc


class EpidemicRegressor(BaseEstimator, RegressorMixin):
    """
    Follows :epkg:`scikit-learn` API.
    Trains a model on observed data from an epidemic.

    :param model: model to train, `'SIR'` or `'SIRD'`
        refers to `CovidSIRD <aftercovid.models.CovidSIRD>`,
        `SIRDc` refers to
        `CovidSIRDc <aftercovid.models.CovidSIRDc>`
    :param t: implicit feature
    :param max_iter: number of iterations
    :param learning_rate_init: see :class:`SGDOptimizer
        <aftercovid.optim.SGDOptimizer>`
    :param lr_schedule: see :class:`SGDOptimizer
        <aftercovid.optim.SGDOptimizer>`
    :param momentum: see :class:`SGDOptimizer
        <aftercovid.optim.SGDOptimizer>`
    :param power_t: see :class:`SGDOptimizer
        <aftercovid.optim.SGDOptimizer>`
    :param early_th: see :class:`SGDOptimizer
        <aftercovid.optim.SGDOptimizer>`
    :param verbose: see :class:`SGDOptimizer
        <aftercovid.optim.SGDOptimizer>`
    :param min_threshold: see :class:`SGDOptimizer
        <aftercovid.optim.SGDOptimizer>`, if `'auto'`, the value
        depends on the model; if it is `0.01` for model `SIR`, it
        means every coefficient must be greater than 0.01.
    :param max_threshold: see :class:`SGDOptimizer
        <aftercovid.optim.SGDOptimizer>`, upper bound
    :param init: dictionary, initializes the model with these parameters

    Once trained the model holds a member `model_` which contains
    the trained model and `iter_` which holds the number of
    training iterations. It also keeps track of the coefficients
    in a dictionary in attribute `coef_`.
    """

    def __init__(self, model='SIR', t=0, max_iter=100,
                 learning_rate_init=0.1, lr_schedule='constant',
                 momentum=0.9, power_t=0.5, early_th=None,
                 min_threshold='auto', max_threshold='auto',
                 verbose=False, init=None):
        if init is not None:
            if isinstance(init, EpidemicRegressor):
                # warm start: reuse the coefficients of an already
                # trained regressor when available
                if hasattr(init, 'coef_'):
                    init = init.coef_.copy()
                else:
                    init = None  # pragma: no cover
            elif not isinstance(init, dict):
                raise TypeError(
                    f"init must be a dictionary not {type(init)}.")
        BaseEstimator.__init__(self)
        RegressorMixin.__init__(self)
        self.t = t
        self.model = model
        self.max_iter = max_iter
        self.learning_rate_init = learning_rate_init
        self.lr_schedule = lr_schedule
        self.momentum = momentum
        self.power_t = power_t
        self.early_th = early_th
        self.verbose = verbose
        if min_threshold == 'auto':
            if model.upper() in ('SIR', 'SIRD'):
                min_threshold = 0.0001
            elif model.upper() in ('SIRC', ):
                pmin = dict(beta=0.001, nu=0.0001, mu=0.0001,
                            a=-1., b=0., c=0.)
                min_threshold = numpy.array(
                    [pmin[k[0]] for k in CovidSIRDc.P0])
            elif model.upper() in ('SIRDC', ):
                # BUG FIX: was ``in ('SIRDC')`` — a string, not a tuple —
                # which performed an accidental substring test.
                pmin = dict(beta=0.001, nu=0.001, mu=0.001,
                            a=-1., b=0., c=0.)
                min_threshold = numpy.array(
                    [pmin[k[0]] for k in CovidSIRDc.P0])
        if max_threshold == 'auto':
            if model.upper() in ('SIR', 'SIRD'):
                max_threshold = 1.
            elif model.upper() in ('SIRC', 'SIRDC'):
                pmax = dict(beta=1., nu=0.5, mu=0.5,
                            a=0., b=4., c=2.)
                max_threshold = numpy.array(
                    [pmax[k[0]] for k in CovidSIRDc.P0])
        self.min_threshold = min_threshold
        self.max_threshold = max_threshold
        # fail early if the model name is unknown
        self._get_model()
        self.init = init
        if init is not None:
            self.coef_ = init

    def _get_model(self):
        """
        Instantiates the raw epidemic model matching *self.model*.

        :return: a new :class:`CovidSIRD` or :class:`CovidSIRDc`
        :raises ValueError: if the model name is unknown
        """
        if self.model.lower() in ('sir', 'sird'):
            return CovidSIRD()
        if self.model.lower() in ('sirc', 'sirdc'):
            return CovidSIRDc()
        raise ValueError(
            f"Unknown model name '{self.model}'.")
[docs] def fit(self, X, y): """ Trains a model to approximate its derivative as much as possible. """ if not hasattr(self, 'model_'): self.model_ = self._get_model() self.model_.rnd() total = numpy.sum(X, axis=1) mi, ma = total.min(), total.max() err = (ma - mi) / mi if err > 1e-5: raise RuntimeError( # pragma: no cover f"Population is not constant, in [{mi}, {ma}].") if self.init is not None: for k, v in self.init.items(): self.model_[k] = v self.model_['N'] = (ma + mi) / 2 self.model_.fit( X, y, learning_rate_init=self.learning_rate_init, max_iter=self.max_iter, early_th=self.early_th, verbose=self.verbose, lr_schedule=self.lr_schedule, power_t=self.power_t, momentum=self.momentum, min_threshold=self.min_threshold, max_threshold=self.max_threshold) self.iter_ = self.model_.iter_ self.coef_ = self.model_.params_dict return self
[docs] def predict(self, X): """ Predicts the derivatives. """ if not hasattr(self, 'model_'): raise RuntimeError("Model was not trained.") return self.model_.predict(X)
[docs] def simulate(self, X, n=7): """ Predicts and simulates the epidemics. Every row of *X* is a starting point, the function then simulates the epidemics for the next *n* days for every starting point. :param X: data :param n: number of days :return: quantities, matrix of shape *(X.shape[0], n, number of parameters)* """ if not hasattr(self, "model_"): raise RuntimeError( # pragma: no cover "Model was not trained.") clq = self.model_.quantity_names if len(clq) != X.shape[1]: raise RuntimeError( # pragma: no cover f"Unapexected shape for X ({X.shape}), " f"expecting {len(clq)} columns.") res = None for i in range(X.shape[0]): for k, v in zip(clq, X[i]): self.model_[k] = v pred = self.model_.iterate2array(n=n, derivatives=False) if res is None: res = numpy.zeros((X.shape[0], ) + pred.shape) res[i, :, :] = pred return res
[docs] def predict_many(self, X, n=7): """ Predicts the derivatives and the series for many days. :param X: series :param n: number of days :return: derivates and series, return shape is *(X.shape[0], number of parameters, n)* """ if not hasattr(self, 'model_'): raise RuntimeError("Model was not trained.") # pragma: no cover deri = numpy.empty(X.shape + (n, )) curv = numpy.empty(X.shape + (n, )) for i in range(0, n): d = self.predict(X) deri[:, :, i] = d X += d curv[:, :, i] = X return deri, curv
[docs] def score(self, X, y, norm=None): """ Scores the prediction of the derivatives. :param X: data :param y: expected derivatives :param norm: norm to return the norm used to optimize (L2) or 'L1' to return the L1 norm :return: score """ if not hasattr(self, 'model_'): raise RuntimeError( # pragma: no cover "Model was not trained.") if norm is None: return self.model_.score(X, y) if norm.lower() == 'l1': return self.model_.score_l1(X, y) raise ValueError(f"Unexpected norm {norm!r}.")