Coverage for src/manydataapi/plotting/timeseries.py: 98%
58 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-02 08:38 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-02 08:38 +0200
1"""
2@file
3@brief Common plots for timeseries.
4"""
5import numpy
def get_index_date(df):
    """
    Returns the name of the only date column of a dataframe.
    Raises an exception if there is none or more than one.

    @param      df      dataframe
    @return             column name
    @raises             RuntimeError if the dataframe does not hold
                        exactly one ``datetime64`` column
    """
    # Keep the original dataframe untouched so that the error message
    # can describe every column, not only the (possibly empty) selection.
    dates = df.select_dtypes(include=[numpy.datetime64])
    if dates.shape[1] != 1:
        raise RuntimeError(  # pragma: no cover
            "Unable to find a single column date in {}.".format(
                list(zip(df.columns, df.dtypes))))
    return dates.columns[0]
def get_new_column(df, name):
    """
    Builds a column name which is not already used in *df*.
    Underscores are appended to the suggestion until
    the name is free.

    @param      df      dataframe
    @param      name    suggestion
    @return             *name* followed by as many ``'_'`` as needed
    """
    candidate = name
    while True:
        if candidate not in df.columns:
            return candidate
        candidate = candidate + "_"
# Maps every supported aggregation to the fields of the ``.dt`` accessor
# used to build the grouping columns, in grouping order.
_AGGREGATION_FIELDS = {
    'year': ('year',),
    'month': ('year', 'month'),
    'day': ('year', 'month', 'day'),
    'weekday': ('weekday',),
    'hour': ('hour',),
    'weekhour': ('weekday', 'hour'),
}


def plot_aggregated_ts(df, value, date=None, agg="month", ax=None,
                       kind='bar', **kwargs):
    """
    Plots an aggregated time series by a period of time.
    The values of column *value* are summed over every period.

    @param      df      dataframe
    @param      value   column to show
    @param      date    column to use as a date,
                        if None, it assumes there is one and only one
    @param      agg     aggregation by ``'month'``, ``'day'``,
                        ``'year'``, ``'weekday'``, ``'hour'``,
                        ``'weekhour'``
    @param      kind    graph style
    @param      ax      existing ax, if None, the current matplotlib
                        axes are used
    @param      kwargs  additional parameter for the graph
    @return             ax
    @raises             ValueError if *agg* is not a known aggregation

    With ``agg='weekhour'``, one series per day of the week is drawn,
    each of them aggregated by hour and labelled ``wk=<weekday>``.

    .. plot::

        import matplotlib.pyplot as plt
        from manydataapi.timeseries import daily_timeseries
        from manydataapi.plotting.timeseries import plot_aggregated_ts
        df = daily_timeseries()
        plot_aggregated_ts(df, value='X', agg='month')
        plt.show()
    """
    # Validate first: an unknown aggregation must not create a figure
    # nor copy the data before failing.
    fields = _AGGREGATION_FIELDS.get(agg) if isinstance(agg, str) else None
    if fields is None:
        raise ValueError("Unknown aggregation '{}'.".format(agg))

    if ax is None:
        import matplotlib.pyplot as plt  # pragma: no cover
        ax = plt.gca()  # pragma: no cover
    if date is None:
        date = get_index_date(df)
    df = df[[date, value]].copy()

    # One new column per date field, named so that it cannot
    # collide with the date or value columns.
    cols = []
    for field in fields:
        col = get_new_column(df, field)
        df[col] = getattr(df[date].dt, field)
        cols.append(col)

    if agg == 'weekhour':
        col_weekday, col_hour = cols
        # Missing dates produce NaN weekdays: they are removed before
        # sorting because ordering a collection containing NaN is undefined.
        weekdays = sorted(df[col_weekday].dropna().unique())
        for weekday in weekdays:
            gr = df[df[col_weekday] == weekday].drop(
                [col_weekday, date], axis=1).groupby(col_hour).sum()
            gr.columns = ['wk=%d' % weekday]
            gr.plot(kind=kind, ax=ax, **kwargs)
    else:
        # A single grouping column is passed as a scalar key.
        key = cols if len(cols) > 1 else cols[0]
        gr = df.drop(date, axis=1).groupby(key).sum()
        gr.plot(kind=kind, ax=ax, **kwargs)
    return ax