Source code for aftercovid.preprocess.ts

"""
Preprocesses timeseries about COVID.
"""
import numpy


def ts_remove_decreasing_values(series):
    """
    Builds a copy of an integer series in which no value is lower
    than the previous one.

    Cumulative data are sometimes corrected after publication, which
    makes a value drop below the one before it. Every such drop is
    repaired by lowering earlier increments, where they are large
    enough, and by rebuilding the series from the adjusted increments.
    The first value is never modified.

    :param series: integer series (*int32* or *int64*), a numpy array
        or an object exposing its data through ``values``
    :return: corrected copy of the data
    :raises NotImplementedError: if the series is neither *int32*
        nor *int64*
    """
    def normalize(data, index):
        # Repairs, in place, the drop observed at position *index*.
        first = data[0]
        steps = data[1:] - data[:-1]
        remaining = data[index - 1] - data[index]
        # increment finally assigned to the repaired position
        step = int(float(remaining) / index) + 1
        remaining += step
        previous = remaining + 1
        # Sweeps over the earlier increments, from the most recent one
        # down to position 1, as long as the previous sweep absorbed
        # a part of the drop.
        while 0 < remaining < previous:
            previous = remaining
            for pos in range(index - 2, 0, -1):
                if remaining <= 0:
                    break
                if steps[pos] > step:
                    cut = min(step, remaining)
                    steps[pos] -= cut
                    remaining -= cut
                elif steps[pos] > 1:
                    steps[pos] -= 1
                    remaining -= 1
        steps[index - 1] = step
        data[1:] = first + steps.cumsum()

    if series.dtype not in (numpy.int64, numpy.int32):
        raise NotImplementedError(
            "Not implemented for real types.")

    source = series.values if hasattr(series, 'values') else series
    values = source.copy()
    # Drops are located once, before any repair, then fixed starting
    # from the most recent one.
    drops = [i for i in range(1, len(values))
             if values[i] < values[i - 1]]
    for position in reversed(drops):
        normalize(values, position)
    return values
def ts_moving_average(series, n=7, center=True):
    """
    Computes the moving average of a differential series.
    The function handles nan as well: a missing value is ignored
    and every average only relies on the observed values of its window.
    The output does not contain any nan unless a whole window
    is made of nan.

    :param series: timeseries, a numpy array or an object exposing
        ``values`` and ``index`` (pandas *Series* or *DataFrame*)
    :param n: window
    :param center: centered average, *n* must be odd in that case
    :return: moving average (of same size), wrapped into the class
        of the input when it has columns or a name,
        a numpy array otherwise
    :raises ValueError: if *center* is True and *n* is even
    """
    if hasattr(series, 'values'):
        cls = series.__class__
        columns = getattr(series, 'columns', None)
        name = getattr(series, 'name', None)
        index = series.index
        series = series.values
        as_df = True
    else:
        as_df = False
    if center and n % 2 != 1:
        raise ValueError("If center is True, n should be odd.")

    dtype = numpy.float64 if series.dtype != numpy.float32 else numpy.float32
    # astype returns a copy, the data of the caller is never modified.
    series = series.astype(dtype)
    # A missing value contributes 0 to the sums and 0 to the counts.
    weights = numpy.ones(series.shape, dtype=dtype)
    isna = numpy.isnan(series)
    weights[isna] = 0
    series[isna] = 0
    # Cumulated sums and cumulated counts of observed values:
    # the difference between two rows gives the sum over a window.
    ret = numpy.cumsum(series, axis=0)
    wet = numpy.cumsum(weights, axis=0)
    res = numpy.zeros(ret.shape, dtype)

    if center:
        d = n // 2
        # Full windows of n values centered on every inner position.
        res[d + 1:-d] = (ret[n:] - ret[:-n]) / (wet[n:] - wet[:-n])
        # Borders: position i averages the first i + d values,
        # position -i-1 averages the last i + d values.
        for i in range(0, d + 1):
            res[i] = numpy.divide(ret[i + d - 1], wet[i + d - 1])
            res[-i - 1] = numpy.divide(ret[-1] - ret[-(i + d) - 1],
                                       wet[-1] - wet[-(i + d) - 1])
    else:
        # Trailing windows: the n values ending at every position.
        res[n:] = (ret[n:] - ret[:-n]) / (wet[n:] - wet[:-n])
        # Beginning: average of all values seen so far.
        for i in range(0, n):
            res[i] = numpy.divide(ret[i], wet[i])

    if as_df:
        # The averages are wrapped, not the zero-filled input.
        if columns is not None:
            return cls(res, columns=columns, index=index)
        if name is not None:
            return cls(res, name=name, index=index)
    return res
def ts_normalise_negative_values(series, n=7, extreme=4):
    """
    Cleans a differential series which should not have any
    negative values. Missing values, negative values and values
    far away from the centered moving average are replaced by that
    moving average. The result is clipped at zero and multiplied by
    the ratio between the total of the original series (nan counted
    as zero) and the total obtained after the replacement.

    :param series: differential values, a numpy array or an object
        exposing ``values`` and ``index``
    :param n: window of the moving average
    :param extreme: a value is replaced if it is more than *extreme*
        times higher or lower than the moving average
    :return: corrected series, wrapped into the class of the input
        when it has columns or a name, a numpy array otherwise
    """
    as_df = hasattr(series, 'values')
    if as_df:
        cls = series.__class__
        columns = getattr(series, 'columns', None)
        name = getattr(series, 'name', None)
        index = series.index
        series = series.values

    mov = ts_moving_average(series, n=n, center=True)

    # Working copy with the same type as the moving average,
    # missing values are counted as zero in the original total.
    raw = series.astype(mov.dtype)
    missing = numpy.isnan(raw)
    cleaned = raw.copy()
    cleaned[missing] = 0
    total = numpy.sum(cleaned, axis=0)

    # Every unexpected value is replaced by the local average.
    too_low = mov / extreme > cleaned
    too_high = mov * extreme < cleaned
    replaced = missing | (cleaned < 0) | too_low | too_high
    cleaned[replaced] = mov[replaced]

    # The new total is computed before clipping, nan counted as zero.
    filled = cleaned.copy()
    filled[numpy.isnan(filled)] = 0
    new_total = numpy.sum(filled, axis=0)

    cleaned = numpy.maximum(cleaned, 0)
    cleaned *= total / new_total

    if as_df:
        if columns is not None:
            return cls(cleaned, columns=columns, index=index)
        if name is not None:
            return cls(cleaned, name=name, index=index)
    return cleaned