Coverage for mlinsights/timeseries/agg.py: 96%

49 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-08-09 08:45 +0200

"""
@file
@brief Data aggregation for timeseries.
"""

5import datetime 

6import pandas 

7from pandas.tseries.frequencies import to_offset 

8 

9 

def _get_column_name(df, name='agg'):
    """
    Returns a unique column name not in the existing dataframe.

    @param      df      dataframe (only its ``columns`` attribute is read)
    @param      name    prefix
    @return             new column name

    An underscore is appended to *name* for as long as the candidate
    is found in ``df.columns``; *name* is returned unchanged when
    it is not already present.
    """
    while name in df.columns:
        name += '_'
    return name

21 

22 

def aggregate_timeseries(df, index='time', values='y',
                         unit='half-hour', agg='sum',
                         per=None):
    """
    Aggregates timeseries assuming the data is in a dataframe.

    @param      df          dataframe, or None to build one from
                            *index* and *values* (see below)
    @param      index       time column
    @param      values      value or values column
    @param      unit        aggregate over a specific period,
                            only ``'half-hour'`` is implemented
    @param      agg         kind of aggregation, ``'sum'`` or ``'norm'``
                            (sums, then divides every aggregated column
                            by its total when that total is not zero)
    @param      per         second aggregation, None, ``'week'``
                            or ``'month'``
    @return                 aggregated values, sorted by the time column

    If *df* is None, *index* must contain the timestamps and *values*
    an array exposing ``shape``: a one dimensional array becomes
    column ``y``, a two dimensional array becomes columns
    ``y0``, ``y1``, ...

    Raises *ValueError* if *unit*, *per* or *agg* is not one of the
    supported values.
    """
    if df is None:
        # Array mode: wraps the raw arrays into a dataframe first.
        if len(values.shape) == 1:
            df = pandas.DataFrame(dict(time=index, y=values))
            values = 'y'
        else:
            df = pandas.DataFrame(dict(time=index))
            for i in range(values.shape[1]):
                df['y%d' % i] = values[:, i]
            values = list(df.columns)[1:]
        index = 'time'

    def round_(serie, freq, per):
        # Floors every timestamp to the beginning of its bin.
        fr = to_offset(freq)
        res = pandas.DatetimeIndex(serie).floor(fr)  # pylint: disable=E1101
        if per is None:
            return res
        if per == 'week':
            # Offset from the beginning of the week (weekday() is 0-based).
            return pandas.to_timedelta([
                datetime.timedelta(
                    days=t.weekday(), hours=t.hour, minutes=t.minute)
                for t in res.to_pydatetime()])
        if per == 'month':
            # Uses the day of the month as is (t.day starts at 1).
            return pandas.to_timedelta([
                datetime.timedelta(
                    days=t.day, hours=t.hour, minutes=t.minute)
                for t in res.to_pydatetime()])
        raise ValueError(  # pragma: no cover
            f"Unknown frequency '{per}'.")

    agg_name = _get_column_name(df)
    if unit != 'half-hour':
        raise ValueError(  # pragma: no cover
            f"Unknown time unit '{unit}'.")
    # Works on a copy, the caller's dataframe is left untouched.
    df = df.copy()
    df[agg_name] = round_(df[index], datetime.timedelta(minutes=30), per)

    if not isinstance(values, list):
        values = [values]
    if agg not in ('sum', 'norm'):
        raise ValueError(  # pragma: no cover
            f"Unknown aggregation '{agg}'.")

    # Both aggregations start from the sum per bin.
    gr = df[[agg_name] + values].groupby(agg_name, as_index=False).sum()
    agg_name = _get_column_name(gr, 'week' + index)
    agg_cols = list(gr.columns[1:])
    gr.columns = [agg_name] + agg_cols
    if agg == 'norm':
        for c in agg_cols:
            su = gr[c].sum()
            # A null total is left as is to avoid dividing by zero.
            if su != 0:
                gr[c] /= su
    return gr.sort_values(agg_name).reset_index(drop=True)
96 return gr.sort_values(agg_name).reset_index(drop=True)