Coverage for aftercovid/models/model_estimation.py: 96%

46 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-04-25 03:09 +0200

1# coding: utf-8 

2""" 

3Optimizes a model over true data. 

4""" 

5import warnings 

6import numpy 

7import pandas 

8from .epidemic_regressor import EpidemicRegressor 

9 

10 

11def find_best_model(Xt, yt, lrs, stop_loss, verbose=0, 

12 init=None, model_name='SIRD', 

13 max_iter=500): 

14 """ 

15 Finds the best model over a short period of time. 

16 

17 :param Xt: matrix Nx4 with times series 

18 :param Yt: matrix Nx4 with expected differentiated time series 

19 :param lrs: learning rates to try 

20 :param stop_loss: stops trying other learning rate if the loss is below 

21 this threshold 

22 :param verbose: display progress information (uses `print`) 

23 :param init: initialized model to start the optimisation from 

24 this parameters 

25 :param model_name: name of the model to optimize, `'SIRD'`, `'SIRDc'`, 

26 see :class:`EpidemicRegressor <aftercovid.models.EpidemicRegressor>` 

27 :param max_iter: maximum number of iterator to train every model 

28 :return: (best model, best loss, best learning rate) 

29 """ 

30 best_est, best_loss, best_lr = None, None, None 

31 m = None 

32 for ilr, lr in enumerate(lrs): 

33 if verbose: 

34 print( # pragma: no cover 

35 f"--- TRY {ilr + 1}/{len(lrs)}: {lr}") 

36 with warnings.catch_warnings(): 

37 warnings.simplefilter("ignore", RuntimeWarning) 

38 m = EpidemicRegressor( 

39 model_name, learning_rate_init=lr, max_iter=max_iter, 

40 early_th=stop_loss, verbose=verbose, init=m) 

41 try: 

42 m.fit(Xt, yt) 

43 except RuntimeError as e: # pragma: no cover 

44 if verbose: 

45 print(f'ERROR: {e}') 

46 continue 

47 loss = m.score(Xt, yt) 

48 if numpy.isnan(loss): 

49 continue # pragma: no cover 

50 if best_est is None or best_loss > loss: 

51 best_est = m 

52 best_loss = loss 

53 best_lr = lr 

54 if best_loss < stop_loss: 

55 return best_est, best_loss, best_lr # pragma: no cover 

56 return best_est, best_loss, best_lr 

57 

58 

59def rolling_estimation(X, y, 

60 lrs=(1e8, 1e6, 1e4, 1e2, 1, 1e-2, 1e-4, 1e-6), 

61 delay=21, stop_loss=1, init=None, 

62 model_name='SIRD', max_iter=500, verbose=0, 

63 dates=None): 

64 """ 

65 Estimates a model over a rolling windows whose size is 

66 *delay* days. See :ref:`l-example-rolling-estimation` to 

67 see how to use that function. 

68 

69 :param Xt: matrix Nx4 with times series 

70 :param Yt: matrix Nx4 with expected differentiated time series 

71 :param lrs: learning rates to try 

72 :param delay: size of the rolling window 

73 :param stop_loss: stops trying other learning rate if the loss is below 

74 this threshold 

75 :param verbose: display progress information (uses `print`) 

76 :param init: initialized model to start the optimisation from 

77 this parameters 

78 :param model_name: name of the model to optimize, `'SIRD'`, `'SIRDc'`, 

79 see :class:`EpidemicRegressor <aftercovid.models.EpidemicRegressor>` 

80 :param max_iter: maximum number of iterator to train every model 

81 :param dates: dates for every row in X, can be None 

82 :return: (results, last best model) 

83 """ 

84 coefs = [] 

85 m = None 

86 kdates = ( 

87 list(range(0, X.shape[0] - delay + 1 - 28, 7)) + 

88 list(range(X.shape[0] - delay + 1 - 28, 

89 X.shape[0] - delay + 1 - 7, 2)) + 

90 list(range(X.shape[0] - delay + 1 - 7, 

91 X.shape[0] - delay + 1, 1))) 

92 kdates = [d for d in kdates if d > 0] 

93 for k in kdates: 

94 end = min(k + delay, X.shape[0]) 

95 Xt, yt = X[k:end], y[k:end] 

96 if any(numpy.isnan(Xt.ravel())) or any(numpy.isnan(yt.ravel())): 

97 continue # pragma: no cover 

98 m, loss, lr = find_best_model( 

99 Xt, yt, lrs, stop_loss, init=m, model_name=model_name, 

100 max_iter=max_iter) 

101 if m is None: 

102 if verbose: # pragma: no cover 

103 print(f"k={k} loss=nan") 

104 find_best_model( 

105 Xt, yt, [1e8, 1e6, 1e4, 1e2, 1, 

106 1e-2, 1e-4, 1e-6], 10, 

107 init=m, verbose=True) 

108 continue 

109 loss = m.score(Xt, yt) 

110 loss_l1 = m.score(Xt, yt, 'l1') 

111 if verbose: 

112 print("k={} iter={} loss={:1.3f} l1={:1.3g} coef={} R0={} " 

113 "lr={} cn={}".format( 

114 k, m.iter_, loss, loss_l1, 

115 m.model_._val_p, m.model_.R0(), lr, 

116 m.model_.correctness().sum())) 

117 obs = dict(k=k, loss=loss, loss_l1=loss_l1, it=m.iter_, 

118 R0=m.model_.R0(), lr=lr, 

119 correctness=m.model_.correctness().sum(), 

120 date=k if dates is None else dates[end - 1]) 

121 obs.update({k: v for k, v in zip( 

122 m.model_.param_names, m.model_._val_p)}) 

123 coefs.append(obs) 

124 

125 dfcoef = pandas.DataFrame(coefs) 

126 dfcoef = dfcoef.set_index("date") 

127 return dfcoef, m