Coverage for src/manydataapi/plotting/timeseries.py: 98%

58 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-02 08:38 +0200

1""" 

2@file 

3@brief Common plots for timeseries. 

4""" 

5import numpy 

6 

7 

def get_index_date(df):
    """
    Returns the name of the only date column of a dataframe.
    Raises an exception otherwise.

    Only columns selected by ``select_dtypes(include=[numpy.datetime64])``
    are considered as dates.

    @param      df      dataframe
    @return             column name
    @raises     RuntimeError    if the dataframe does not contain
                                exactly one date column
    """
    dates = df.select_dtypes(include=[numpy.datetime64])
    if dates.shape[1] != 1:
        # The message lists the columns of the original dataframe:
        # the filtered one is empty when no date column exists and
        # would not tell the user what was inspected.
        raise RuntimeError(  # pragma: no cover
            "Unable to find a single column date in {}.".format(
                list(zip(df.columns, df.dtypes))))
    return dates.columns[0]

22 

23 

def get_new_column(df, name):
    """
    Builds a column name which does not exist yet in *df*.
    Underscores are appended to the suggestion until
    the name is not one of the columns.

    @param      df      dataframe
    @param      name    suggestion
    @return             unused column name
    """
    candidate = name
    while True:
        if candidate not in df.columns:
            return candidate
        candidate = candidate + "_"

33 

34 

# Date components extracted for every simple aggregation,
# listed in grouping order (outermost level first).
# ``'weekhour'`` is handled separately: it draws one series per weekday.
_AGG_COMPONENTS = {
    'month': ('year', 'month'),
    'year': ('year',),
    'day': ('year', 'month', 'day'),
    'weekday': ('weekday',),
    'hour': ('hour',),
}


def plot_aggregated_ts(df, value, date=None, agg="month", ax=None,
                       kind='bar', **kwargs):
    """
    Plots an aggregated time series by a period of time.
    Values falling into the same period are summed.

    @param      df      dataframe
    @param      value   column to show
    @param      date    column to use as a date,
                        if None, it assumes there is one and only one
    @param      agg     aggregation by ``'month'``, ``'day'``,
                        ``'year'``, ``'weekday'``, ``'hour'``,
                        ``'weekhour'``
    @param      kind    graph style
    @param      ax      existing ax, if None, the current
                        matplotlib axes are used
    @param      kwargs  additional parameter for the graph
    @return             ax
    @raises     ValueError  if *agg* is not a known aggregation

    With ``agg='weekhour'``, one series per weekday is drawn,
    each of them aggregated by hour and labelled ``wk=<weekday>``.

    .. plot::

        import matplotlib.pyplot as plt
        from manydataapi.timeseries import daily_timeseries
        from manydataapi.plotting.timeseries import plot_aggregated_ts
        df = daily_timeseries()
        plot_aggregated_ts(df, value='X', agg='month')
        plt.show()
    """
    # Fails before touching matplotlib so that a wrong parameter
    # does not create a figure as a side effect.
    if agg != 'weekhour' and agg not in _AGG_COMPONENTS:
        raise ValueError("Unknown aggregation '{}'.".format(agg))

    if ax is None:
        import matplotlib.pyplot as plt  # pragma: no cover
        ax = plt.gca()  # pragma: no cover
    if date is None:
        date = get_index_date(df)
    # Works on a copy restricted to the two useful columns,
    # the caller's dataframe is never modified.
    df = df[[date, value]].copy()

    if agg == 'weekhour':
        wk_col = get_new_column(df, 'weekday')
        df[wk_col] = df[date].dt.weekday
        hour_col = get_new_column(df, 'hour')
        df[hour_col] = df[date].dt.hour
        # Missing dates produce missing weekdays, they are skipped.
        weekdays = sorted(df[wk_col].dropna().unique())
        for wk in weekdays:
            gr = (df[df[wk_col] == wk]
                  .drop([wk_col, date], axis=1)
                  .groupby(hour_col).sum())
            gr.columns = ['wk=%d' % wk]
            gr.plot(kind=kind, ax=ax, **kwargs)
        return ax

    keys = []
    for component in _AGG_COMPONENTS[agg]:
        col = get_new_column(df, component)
        df[col] = getattr(df[date].dt, component)
        keys.append(col)
    # A single key is passed as a scalar, several keys as a list.
    key = keys if len(keys) > 1 else keys[0]
    gr = df.drop(date, axis=1).groupby(key).sum()
    gr.plot(kind=kind, ax=ax, **kwargs)
    return ax