Coverage for src/actuariat_python/data/elections.py: 89%

178 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-02 07:38 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Various function to download data about **French** elections. 

5""" 

6import re 

7import os 

8import warnings 

9from html.parser import HTMLParser 

10from html.entities import name2codepoint 

11from http.client import RemoteDisconnected 

12import urllib.error 

13import urllib.request 

14from urllib.error import HTTPError, URLError 

15import pandas 

16from pyquickhelper.loghelper import noLOG 

17from pyquickhelper.filehelper import unzip_files 

18from pyensae.datasource import download_data 

19from pyensae.datasource.http_retrieve import DownloadDataException 

20from .data_exceptions import DataNotAvailableError, DataFormatException 

21 

22 

def elections_presidentielles_local_files(load=False):
    """
    Returns the list of files included in this module about French elections.

    @param load True: load the data
    @return list of local files

    If the data is loaded, the function returns a dictionary of dataframe,
    one per round.
    """
    folder = os.path.abspath(os.path.join(
        os.path.dirname(__file__), "data_elections"))
    files = [os.path.join(folder, name)
             for name in ("cdsp_presi2012t1_circ.xls",
                          "cdsp_presi2012t2_circ.xls")]
    # Fail early on the first missing data file.
    for name in files:
        if not os.path.exists(name):
            raise FileNotFoundError(name)  # pragma: no cover

    if not load:
        return files
    # Second sheet of each workbook holds the results per round.
    rounds = [pandas.read_excel(name, sheet_name=1) for name in files]
    return dict(circ1=rounds[0], circ2=rounds[1])

46 

47 

def elections_presidentielles(url=None, local=False, agg=None):
    """
    Downloads the data for the French elections from *data.gouv.fr*.

    @param url url (None for default value)
    @param local prefer local data instead of remote
    @param agg kind of aggregation desired (see below)
    @return dictionaries of DataFrame (one entry for each round)

    The default url comes from
    `Elections présidentielle 2012 - Résultats <https://www.data.gouv.fr/fr/datasets/election-presidentielle-2012-resultats-572124/>`_.
    You can get more at
    `Elections présidentielles 1965-2012 <https://www.data.gouv.fr/fr/datasets/elections-presidentielles-1965-2012-1/>`_.

    If url is None, we pull some data from folder
    :ref:`data/election <l-data-elections>`.

    Parameter *agg*:

    * *circ* or *None* for no aggregation
    * *dep* to aggregate per department
    """
    if agg is None:
        if local:
            return elections_presidentielles_local_files(load=True)
        # Remember whether the caller supplied a url: the local fallback
        # is only acceptable for the default source.
        default_source = url is None
        if default_source:
            url = "http://static.data.gouv.fr/ff/e9c9483d39e00030815089aca1e2939f9cb99a84b0136e43056790e47bb4f0.xls"
        try:
            return pandas.read_excel(url, sheet_name=None)
        except (HTTPError, URLError, TimeoutError) as e:  # pragma: no cover
            if default_source:
                return elections_presidentielles_local_files(load=True)
            raise DataNotAvailableError(
                "unable to get data from " + url) from e

    # An aggregation is requested: fetch the raw data first.
    res = elections_presidentielles(url=url, local=local, agg=None)
    if agg == "circ":
        return res
    if agg == "dep":
        for k in list(res):
            # The first two columns identify the department.
            group_cols = list(res[k].columns[:2])
            aggregated = res[k].groupby(group_cols).sum()
            res["dep" + k[-1:]] = aggregated.reset_index(drop=False)
        return res
    raise ValueError(  # pragma: no cover
        "unkown value for agg: '{0}'".format(agg))

103 

104 

def elections_legislatives_bureau_vote(source=None, folder=".", fLOG=noLOG):
    """
    Retrieves data from
    `Résultat des élections législatives françaises de 2012 au niveau bureau de vote
    <https://www.data.gouv.fr/fr/datasets/resultat-des-elections-legislatives-francaises-de-2012-au-niveau-bureau-de-vote-nd/>`_.

    @param source should be None unless you want to use the backup plan ("xd")
    @param folder where to download
    @return list of dataframe

    Others sources:

    * `Résultats élections municipales 2014 par bureau de vote
      <http://www.nosdonnees.fr/dataset/resultats-elections-municipales-2014-par-bureau-de-vote>`_
    * `Elections 2015 - Découpage des bureaux de Vote
      <https://www.data.gouv.fr/fr/datasets/elections-2015-decoupage-des-bureaux-de-vote/>`_
    * `Contours des cantons électoraux départementaux 2015
      <https://www.data.gouv.fr/fr/datasets/contours-des-cantons-electoraux-departementaux-2015/>`_
    * `Découpage électoral de la commune, pour les élections législatives
      <https://www.data.gouv.fr/fr/datasets/circonscriptions/>`_ (weird bizarre)
    * `Statistiques démographiques INSEE sur les nouvelles circonscriptions législatives de 2012
      <https://www.data.gouv.fr/fr/datasets/statistiques-demographiques-insee
      -sur-les-nouvelles-circonscriptions-legislatives-de-2012-nd/>`_
    """
    file = "LG12_BV_T1T2.zip"
    if source is not None:
        url = source
    else:
        try:  # pragma: no cover
            # Probe the primary host; fall back to the mirror if unreachable.
            with urllib.request.urlopen("http://www.nosdonnees.fr/") as f:
                url = "http://www.nosdonnees.fr/storage/f/2013-03-05T184148/"
                if f is None:
                    raise RuntimeError(
                        "Not sure we can continue. Pretty sure we should stop.")
        except (urllib.error.HTTPError, RemoteDisconnected):  # pragma: no cover
            url = "xd"
    data = download_data(file, website=url, whereTo=folder, fLOG=fLOG)
    res = {}
    for name in data:
        df = pandas.read_csv(name, encoding="latin-1", sep=";",
                             low_memory=False)
        # The suffix of each extracted file encodes the election round.
        if name.endswith("_T2.txt"):
            res["T2"] = df
        elif name.endswith("_T1.txt"):
            res["T1"] = df
        else:
            raise ValueError(  # pragma: no cover
                "Unable to guess key for filename: '{0}'".format(name))
    return res

155 

156 

def elections_legislatives_circonscription_geo(source="xd", folder=".", fLOG=noLOG):
    """
    Retrieves data from
    `Countours des circonscriptions des législatives <https://www.data.gouv.fr/fr/
    datasets/countours-des-circonscriptions-des-legislatives-nd/>`_.

    @param source should be None unless you want to use the backup plan ("xd")
    @param folder where to download
    @return list of dataframe
    """
    if source is None:
        raise NotImplementedError(  # pragma: no cover
            "use source='xd'")
    file = "toxicode_circonscriptions_legislatives.zip"
    downloaded = download_data(file, website=source, whereTo=folder, fLOG=fLOG)
    # Only the csv file inside the archive is of interest.
    csv_files = [name for name in downloaded if name.endswith(".csv")]
    if not csv_files:
        raise DataNotAvailableError(
            "unable to find any csv file in '{0}'".format(file))
    return pandas.read_csv(csv_files[0], sep=",", encoding="utf-8")

179 

180 

def elections_vote_places_geo(source="xd", folder=".", fLOG=noLOG):
    """
    Retrieves data vote places (bureaux de vote in French)
    with geocodes.

    @param source should be None unless you want to use the backup plan ("xd")
    @param folder where to download
    @param fLOG logging function
    @return list of dataframe
    """
    if source is None:
        raise NotImplementedError("use source='xd'")
    file = "bureauxvotegeo.zip"
    downloaded = download_data(file, website=source, whereTo=folder, fLOG=fLOG)
    # Only the tab-separated text file inside the archive is of interest.
    txt_files = [name for name in downloaded if name.endswith(".txt")]
    if not txt_files:
        raise DataNotAvailableError(
            "Unable to find any csv file in '{0}'".format(file))
    return pandas.read_csv(txt_files[0], sep="\t", encoding="utf-8")

202 

203 

def villes_geo(folder=".", as_df=False, fLOG=noLOG):
    """
    Retrieves data vote places (bureaux de vote in French)
    with geocodes.

    @param folder where to download
    @param as_df return as a dataframe
    @param fLOG logging function
    @return list of dataframe
    """
    archive = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                           "data_elections", "villesgeo.zip")
    unzipped = unzip_files(archive, where_to=folder)
    # unzip_files may return a single path or a list of paths.
    res = unzipped[0] if isinstance(unzipped, list) else unzipped
    if as_df:
        return pandas.read_csv(res, encoding="utf-8", sep="\t")
    return res

224 

225 

226class _HTMLToText(HTMLParser): 

227 

228 def __init__(self): 

229 HTMLParser.__init__(self) 

230 self._buf = [] 

231 self.hide_output = False 

232 

233 def handle_starttag(self, tag, attrs): 

234 if tag in ('p', 'br') and not self.hide_output: 

235 self._buf.append('\n') 

236 elif tag in ('script', 'style'): 

237 self.hide_output = True 

238 

239 def handle_startendtag(self, tag, attrs): 

240 if tag == 'br': 

241 self._buf.append('\n') 

242 

243 def handle_endtag(self, tag): 

244 if tag == 'p': 

245 self._buf.append('\n') 

246 elif tag in ('script', 'style'): 

247 self.hide_output = False 

248 

249 def handle_data(self, data): 

250 if data and not self.hide_output: 

251 self._buf.append(re.sub(r'\s+', ' ', data)) 

252 

253 def handle_entityref(self, name): 

254 if name in name2codepoint and not self.hide_output: 

255 c = name2codepoint[name] 

256 self._buf.append(c) 

257 

258 def handle_charref(self, name): 

259 if not self.hide_output: 

260 n = int(name[1:], 16) if name.startswith('x') else int(name) 

261 self._buf.append(n) 

262 

263 def get_text(self): 

264 return re.sub(r' +', ' ', ''.join(self._buf)) 

265 

266 

def html_to_text(html):
    """
    Converts a piece of HTML into the plain text it contains.
    Entities and character references are handled;
    javascript and stylesheets are not kept.

    @param html HTML content
    @return plain text
    """
    extractor = _HTMLToText()
    extractor.feed(html)
    extractor.close()
    return extractor.get_text()

276 

277 

278def _elections_vote_place_address_patterns_(): 

279 return [ 

280 "bureau( de vote)?[- ]*n[^0-9]([0-9]{1,3})[- ]+(.*?)[- ]+([0-9]{5})[- ]+([-a-zéèàùâêîôûïöäëü']{2,40})[. ]"] 

281 

282 

def elections_vote_place_address(folder=".", hide_warning=False, fLOG=noLOG):
    """
    Scrapes and extracts addresses for every vote place (bureau de vote in French).

    @param folder where to download the scraped pages
    @param hide_warning hide warnings
    @param fLOG logging function
    @return dictionary

    The function does not retrieve everything due to the irregular format.
    Sometimes, the city is missing or written above.
    """
    _elections_vote_place_address_patterns = _elections_vote_place_address_patterns_()

    # Download one page per department (1..95) from bureaudevote.fr.
    files = []
    for deps in range(1, 96):
        last = "bureaudevote%02d.htm" % deps
        url = "http://bureaudevote.fr/"
        try:
            f = download_data(last, website=url, whereTo=folder, fLOG=fLOG)
        except (urllib.error.HTTPError, DownloadDataException):  # pragma: no cover
            # backup plan: the website is unreachable, use an archived
            # zip of all pages instead of the live pages downloaded so far
            files = download_data("bureauxdevote.zip",
                                  website="xd", whereTo=folder, fLOG=fLOG)
            break
        if isinstance(f, list):
            f = f[0]
        files.append(f)

    # extract data
    regex = [re.compile(_) for _ in _elections_vote_place_address_patterns]
    rows = []  # accumulated address records over all pages
    exc = []   # suspicious pages; fatal only if there are more than 2
    for data in files:
        lrows = []
        with open(data, "r", encoding="iso-8859-1") as f:
            content = f.read().lower()
        content = html_to_text(content)
        content0 = content  # keep the extracted text for error messages
        content = content.replace("\n", " ").replace("\t", " ")
        atous = []
        for reg in regex:
            atous.extend(reg.findall(content))
        # A page with only 1-3 matches is suspicious: record the issue
        # but keep processing the remaining pages.
        if len(atous) < 4 and len(atous) > 0:
            mes = "Not enough vote places ({2}) in\n{0}\nFOUND\n{3}\nCONTENT\n{1}".format(
                data, content0, len(atous), "\n".join(str(_) for _ in atous))
            exc.append(Exception(mes))
        if len(atous) > 1:
            for t in atous:
                # t[-3] holds the "place - address" segment; the last dash
                # separates the address from the place name.
                ad = t[-3].split("-")
                address = ad[-1].strip(" ./<>-")
                place = "-".join(ad[:-1]).strip(" ./<>-")
                if "bureau de vote" in place:
                    # presumably the regex captured across two entries
                    # (address too long) — skip the record; TODO confirm
                    if not hide_warning:
                        warnings.warn("Too long address {0}".format(t))
                else:
                    try:
                        lrows.append(dict(n=int(t[1]), city=t[-1].strip(" .<>/"),
                                          zip=t[-2], address=address,
                                          place=place))
                    except ValueError as e:  # pragma: no cover
                        raise DataFormatException(
                            "issue with {0}".format(t)) from e
                    if len(lrows[-1]["city"]) <= 1:
                        mes = "No City in {0}\nROWS\n{2}\nCONTENT\n{1}".format(
                            t, content0, "\n".join(str(_) for _ in lrows))  # pragma: no cover
                        raise DataFormatException(mes)  # pragma: no cover
        if lrows:
            rows.extend(lrows)
        elif "06.htm" in data:
            # NOTE(review): an empty result for the department-06 page is
            # treated as fatal — looks like a regression sentinel; confirm
            mes = "Not enough vote places ({2}) in\n{0}\nFOUND\n{3}\nCONTENT\n{1}".format(
                data, content0, len(lrows), "\n".join(str(_) for _ in lrows))  # pragma: no cover
            raise DataFormatException(mes)  # pragma: no cover
    if len(exc) > 2:
        # Too many suspicious pages: give up and report all of them.
        mes = "Exception raised: {0}\n---------\n{1}".format(  # pragma: no cover
            len(exc), "\n########################\n".join(str(_) for _ in exc))
        raise DataFormatException(mes)  # pragma: no cover
    return pandas.DataFrame(rows)