Coverage for src/sparkouille/datasets/eurostat.py: 85%

67 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 14:24 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Datasets from :epkg:`Eurostat`. 

5""" 

6import os 

7import gzip 

8import numpy 

9import pandas 

10import pyensae.datasource 

11from pyquickhelper.loghelper import noLOG 

12 

13 

def table_mortalite_euro_stat(
        url="http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file=data/",
        name="demo_mlifetable.tsv.gz", final_name="mortalite.txt",
        whereTo=".", stop_at=None, fLOG=noLOG):
    """
    Downloads the mortality table published by `EuroStat
    <http://ec.europa.eu/eurostat/fr>`_, reshapes it into a long
    (one value per row) table and saves it as a tab-separated file.
    The data used to be referenced through
    `mortality table <http://www.data-publica.com/
    opendata/7098--population-et-conditions-sociales-table-de-mortalite-de-1960-a-2010>`_
    (*this link is currently broken, data-publica does not provide
    such a database anymore, a copy is provided*).

    @param      url         data source
    @param      name        data table name
    @param      final_name  the data is compressed, it needs to be uncompressed
                            and preprocessed into a file,
                            this parameter defines its name
    @param      whereTo     folder where the compressed file is downloaded
                            (*final_name* and the intermediate file are **not**
                            moved into that folder, they are used as given)
    @param      stop_at     the overall process is quite long, if not None,
                            it only keeps the first *stop_at* rows of the raw table
    @param      fLOG        logging function
    @return                 *final_name*, the name of the file which contains
                            the processed table (not a dataframe)

    If *final_name* already exists and is bigger than 1e7 bytes,
    nothing is done and its name is returned immediately.
    As a consequence, a small file produced with *stop_at*
    is never reused and the preprocessing happens again.
    The uncompressed raw data is stored in ``final_name + ".remove.txt"``
    and the download is skipped if that file exists and is at least 1e7 bytes.
    The header contains a weird format as coordinates are separated by a comma::

        indic_de,sex,age,geo\\time 2013 2012 2011 2010 2009

    We need to preprocess the data to split this information into columns.
    The preprocessing of the whole table may take a few minutes.
    The processed data contains the following columns::

        ['annee', 'valeur', 'age', 'age_num', 'indicateur', 'genre', 'pays']

    Columns *age* and *age_num* look alike. *age* is a string
    (``Y05``, ``YLT1``, ``YGE85``, ...), *age_num* is the same age as a float.
    Open ended categories (``Y_LT..``, ``Y_GE..``) are mapped to their bound.
    Missing values (``:``) and negative values are removed.
    The table contains many indicators:

    * PROBSURV: probability of surviving between two exact ages (px)
    * LIFEXP: life expectancy at a given exact age (ex)
    * SURVIVORS: number of survivors at a given exact age (lx)
    * PYLIVED: number of person-years lived between two exact ages (Lx)
    * DEATHRATE: death rate at age x (Mx)
    * PROBDEATH: probability of dying between two exact ages (qx)
    * TOTPYLIVED: total number of person-years lived after a given exact age (Tx)
    """
    # A big enough output file is considered as a valid cache.
    if os.path.exists(final_name) and os.stat(final_name).st_size > 1e7:
        return final_name

    # Intermediate file: the uncompressed raw table.
    temp = final_name + ".remove.txt"
    if not os.path.exists(temp) or os.stat(temp).st_size < 1e7:
        local = pyensae.datasource.download_data(
            name, url=url, whereTo=whereTo)
        # NOTE(review): assumes download_data returns a list whose first
        # element is the file name without its '.gz' extension - confirm.
        local = local[0] + ".gz"
        with gzip.open(local, 'rb') as f:
            file_content = f.read()
        content = str(file_content, encoding="utf8")
        with open(temp, "w", encoding="utf8") as f:
            f.write(content)

    def format_age(s):
        "converts an age code (``Y5``, ``Y_LT1``, ``Y_GE85``) into a sortable string"
        if s.startswith("Y_"):
            if s.startswith("Y_LT"):
                return "YLT" + s[4:]
            if s.startswith("Y_GE"):
                return "YGE" + s[4:]
            raise SyntaxError(s)  # pragma: no cover
        i = int(s.strip("Y"))
        return "Y%02d" % i

    def format_age_num(s):
        "converts an age code (``Y5``, ``Y_LT1``, ``Y_GE85``) into a float"
        if s.startswith("Y_"):
            if s.startswith("Y_LT"):
                return float(s.replace("Y_LT", ""))
            if s.startswith("Y_GE"):
                return float(s.replace("Y_GE", ""))
            raise SyntaxError(s)  # pragma: no cover
        i = int(s.strip("Y"))
        return float(i)

    def format_value(s):
        "converts a cell into a float, ``:`` means missing value"
        if s.strip() == ":":
            return numpy.nan
        # Cells may carry trailing flags made of letters e, b, p
        # (presumably Eurostat annotations), they are removed.
        return float(s.strip(" ebp"))

    fLOG("step 0, reading")
    dff = pandas.read_csv(temp, sep="\t", encoding="utf8")

    if stop_at is not None:
        fLOG("step 0, shortening")
        df = dff.head(n=stop_at)
    else:
        df = dff

    # Wide to long: one row per (composite key, year).
    fLOG("step 1, size=", df.shape)
    dfi = df.reset_index().set_index("indic_de,sex,age,geo\\time")
    dfi = dfi.drop('index', axis=1)
    dfs = dfi.stack()
    dfs = pandas.DataFrame({"valeur": dfs})

    fLOG("step 2, size=", dfs.shape)
    dfs["valeur"] = dfs["valeur"].astype(str)
    dfs["valeur"] = dfs["valeur"].apply(format_value)
    # The comparison is False for NaN: missing values are dropped
    # along with negative ones.
    dfs = dfs[dfs.valeur >= 0].copy()
    dfs = dfs.reset_index()
    dfs.columns = ["index", "annee", "valeur"]

    fLOG("step 3, size=", dfs.shape)
    # The composite key 'indicator,sex,age,country' is split only once per row
    # and the pieces are reused to build every column.
    keys = [key.split(",") for key in dfs["index"]]
    dfs["age"] = [format_age(key[2]) for key in keys]
    dfs["age_num"] = [format_age_num(key[2]) for key in keys]
    dfs["indicateur"] = [key[0] for key in keys]
    dfs["genre"] = [key[1] for key in keys]
    dfs["pays"] = [key[3] for key in keys]

    fLOG("step 4")
    dfy = dfs.drop('index', axis=1)
    dfy.to_csv(final_name, sep="\t", encoding="utf8", index=False)
    return final_name