Coverage for mlinsights/timeseries/preprocessing.py: 84%

57 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-08-09 08:45 +0200

1""" 

2@file 

3@brief Timeseries preprocessing. 

4""" 

5import numpy 

6from .base import BaseReciprocalTimeSeriesTransformer 

7 

8 

9class TimeSeriesDifference(BaseReciprocalTimeSeriesTransformer): 

10 """ 

11 Computes timeseries differences. 

12 """ 

13 

14 def __init__(self, degree=1): 

15 """ 

16 @param degree number of differences 

17 """ 

18 BaseReciprocalTimeSeriesTransformer.__init__(self, degree) 

19 

20 @property 

21 def degree(self): 

22 """ 

23 Returns the degree. 

24 """ 

25 return self.context_length 

26 

27 def fit(self, X, y, sample_weight=None): 

28 """ 

29 Stores the first values. 

30 """ 

31 self.X_ = X[:self.degree].copy() 

32 self.y_ = y[:self.degree].copy() 

33 for n in range(1, self.degree): 

34 self.y_[n:] -= self.y_[n - 1:-1] 

35 return self 

36 

37 def transform(self, X, y, sample_weight=None): 

38 """ 

39 Transforms both *X* and *y*. 

40 Returns *X* and *y*, returns 

41 *sample_weight* as well if not None. 

42 """ 

43 for _ in range(self.degree): 

44 y = y[1:] - y[:-1] 

45 X = X[1:] 

46 if sample_weight is None: 

47 return X, y 

48 return X, y, sample_weight[1:] 

49 

50 def get_fct_inv(self): 

51 """ 

52 Returns the reverse tranform. 

53 """ 

54 return TimeSeriesDifferenceInv(self).fit() 

55 

56 

57class TimeSeriesDifferenceInv(BaseReciprocalTimeSeriesTransformer): 

58 """ 

59 Computes the reverse of @see cl TimeSeriesDifference. 

60 """ 

61 

62 def __init__(self, estimator): 

63 """ 

64 @param estimator of type @see cl TimeSeriesDifference 

65 """ 

66 BaseReciprocalTimeSeriesTransformer.__init__( 

67 self, estimator.context_length) 

68 if not isinstance(estimator, TimeSeriesDifference): 

69 raise TypeError( # pragma: no cover 

70 f"estimator must be of type TimeSeriesDifference not " 

71 f"{type(estimator)}.") 

72 self.estimator = estimator 

73 

74 def fit(self, X=None, y=None, sample_weight=None): 

75 """ 

76 Checks that estimator is fitted. 

77 """ 

78 if not hasattr(self.estimator, 'X_'): 

79 raise RuntimeError( # pragma: no cover 

80 "Estimator is not fitted.") 

81 self.estimator_ = self.estimator 

82 return self 

83 

84 def transform(self, X, y, sample_weight=None): 

85 """ 

86 Transforms both *X* and *y*. 

87 Returns *X* and *y*, returns 

88 *sample_weight* as well if not None. 

89 """ 

90 if len(y.shape) == 1: 

91 y = y.reshape((y.shape[0], 1)) 

92 squeeze = True 

93 else: 

94 squeeze = False 

95 if len(self.estimator_.y_.shape) == 1: 

96 y0 = self.estimator_.y_.reshape((y.shape[0], 1)) 

97 else: 

98 y0 = self.estimator_.y_ 

99 r0 = self.estimator_.X_.shape[0] 

100 

101 nx = numpy.empty((r0 + X.shape[0], X.shape[1]), dtype=X.dtype) 

102 nx[:r0, :] = self.estimator_.X_ 

103 nx[r0:, :] = X 

104 

105 ny = numpy.empty((r0 + X.shape[0], y.shape[1]), dtype=X.dtype) 

106 ny[:r0, :] = y0 

107 ny[r0:, :] = y 

108 

109 for i in range(self.estimator_.degree): 

110 numpy.cumsum(ny[r0 - i - 1:, :], axis=0, out=ny[r0 - i - 1:, :]) 

111 if squeeze: 

112 ny = numpy.squeeze(ny) 

113 if sample_weight is None: 

114 return nx, ny 

115 nw = numpy.zeros(ny.shape[0]) 

116 de = nw.shape[0] - sample_weight.shape[0] 

117 nw[de:] = sample_weight 

118 return nx, ny, nw