Coverage for src/sparkouille/datasets/eurostat.py: 85%
67 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 14:24 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 14:24 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Datasets from :epkg:`Eurostat`.
5"""
6import os
7import gzip
8import numpy
9import pandas
10import pyensae.datasource
11from pyquickhelper.loghelper import noLOG
def table_mortalite_euro_stat(
        url="http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file=data/",
        name="demo_mlifetable.tsv.gz", final_name="mortalite.txt",
        whereTo=".", stop_at=None, fLOG=noLOG):
    """
    This function retrieves mortality table from `EuroStat
    <http://ec.europa.eu/eurostat/fr>`_ through
    `mortality table <http://www.data-publica.com/
    opendata/7098--population-et-conditions-sociales-table-de-mortalite-de-1960-a-2010>`_
    (*this link is currently broken, data-publica does not provide
    such a database anymore, a copy is provided*).

    @param      url         data source
    @param      name        data table name
    @param      final_name  the data is compressed, it needs to be uncompressed into a file,
                            this parameter defines its name
    @param      whereTo     data needs to be downloaded, location of this place
    @param      stop_at     the overall process is quite long, if not None,
                            it only keeps the first rows
    @param      fLOG        logging function
    @return                 *final_name*, the name of the file which contains
                            the processed table (tab separated, utf-8)

    The function checks the file *final_name* exists and is bigger than 1e7 bytes.
    If it is the case, the data is neither downloaded nor processed again and
    the existing file name is returned as is (whatever *stop_at* is).
    The uncompressed raw data is stored in ``final_name + ".remove.txt"``,
    it is reused the same way if it exists and is not smaller than 1e7 bytes.
    The header contains a weird format as coordinates are separated by a comma::

        indic_de,sex,age,geo\\time 2013 2012 2011 2010 2009

    We need to preprocess the data to split this information into columns.
    Every distinct key is parsed only once and the result is mapped back
    to the rows sharing that key. The processed data
    contains the following columns::

        ['annee', 'valeur', 'age', 'age_num', 'indicateur', 'genre', 'pays']

    Columns *age* and *age_num* look alike. *age_num* is numeric and is equal
    to *age* except when *age_num* is 85. Everybody above that age
    falls into the same category. Missing values (``:``) and negative
    values are removed. The table contains many indicators:

    * PROBSURV: probability of surviving between two exact ages (px)
    * LIFEXP: life expectancy at a given exact age (ex)
    * SURVIVORS: number of survivors at a given exact age (lx)
    * PYLIVED: number of person-years lived between two exact ages (Lx)
    * DEATHRATE: death rate at age x (Mx)
    * PROBDEATH: probability of dying between two exact ages (qx)
    * TOTPYLIVED: total number of person-years lived after a given exact age (Tx)
    """
    # A big enough processed file is considered as complete: nothing to do.
    if os.path.exists(final_name) and os.stat(final_name).st_size > 1e7:
        return final_name

    # Uncompressed copy of the raw table, reused if it already looks complete.
    temp = final_name + ".remove.txt"
    if not os.path.exists(temp) or os.stat(temp).st_size < 1e7:
        local = pyensae.datasource.download_data(
            name, url=url, whereTo=whereTo)
        local = local[0] + ".gz"
        with gzip.open(local, 'rb') as f:
            file_content = f.read()
        content = str(file_content, encoding="utf8")
        with open(temp, "w", encoding="utf8") as f:
            f.write(content)

    def format_age(s):
        "local function, normalizes an age label (Y5 -> Y05, Y_GE85 -> YGE85)"
        if s.startswith("Y_"):
            if s.startswith("Y_LT"):
                return "YLT" + s[4:]
            if s.startswith("Y_GE"):
                return "YGE" + s[4:]
            raise SyntaxError(s)  # pragma: no cover
        i = int(s.strip("Y"))
        return "Y%02d" % i

    def format_age_num(s):
        "local function, converts an age label into a float"
        if s.startswith("Y_"):
            if s.startswith("Y_LT"):
                return float(s.replace("Y_LT", ""))
            if s.startswith("Y_GE"):
                return float(s.replace("Y_GE", ""))
            raise SyntaxError(s)  # pragma: no cover
        i = int(s.strip("Y"))
        return float(i)

    def format_value(s):
        "local function, converts a cell into a float, ':' means missing"
        if s.strip() == ":":
            return numpy.nan
        # removes spaces and the characters e, b, p surrounding the number
        return float(s.strip(" ebp"))

    def parse_key(key):
        "local function, splits a key 'indic_de,sex,age,geo' into the final columns"
        fields = key.split(",")
        return (format_age(fields[2]), format_age_num(fields[2]),
                fields[0], fields[1], fields[3])

    fLOG("step 0, reading")
    dff = pandas.read_csv(temp, sep="\t", encoding="utf8")

    if stop_at is not None:
        fLOG("step 0, shortening")
        df = dff.head(n=stop_at)
    else:
        df = dff

    fLOG("step 1, size=", df.shape)
    # One row per (key, year): the year columns are stacked into a single one.
    dfi = df.reset_index().set_index("indic_de,sex,age,geo\\time")
    dfi = dfi.drop('index', axis=1)
    dfs = dfi.stack()
    dfs = pandas.DataFrame({"valeur": dfs})

    fLOG("step 2, size=", dfs.shape)
    dfs["valeur"] = dfs["valeur"].astype(str)
    dfs["valeur"] = dfs["valeur"].apply(format_value)
    # The comparison is False for NaN: missing values are dropped as well.
    dfs = dfs[dfs.valeur >= 0].copy()
    dfs = dfs.reset_index()
    dfs.columns = ["index", "annee", "valeur"]

    fLOG("step 3, size=", dfs.shape)
    # After stacking, the same key appears once per year: parsing every
    # distinct key once avoids splitting the same string five times per row.
    keys = dfs["index"]
    parsed = {key: parse_key(key) for key in keys.unique()}
    new_columns = ("age", "age_num", "indicateur", "genre", "pays")
    for pos, column in enumerate(new_columns):
        dfs[column] = keys.map(
            {key: values[pos] for key, values in parsed.items()})

    fLOG("step 4")
    dfy = dfs.drop('index', axis=1)
    dfy.to_csv(final_name, sep="\t", encoding="utf8", index=False)
    return final_name