Coverage for src/actuariat_python/data/elections.py: 89%
178 statements
« prev ^ index » next — coverage.py v7.2.7, created at 2023-07-02 07:38 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Various function to download data about **French** elections.
5"""
6import re
7import os
8import warnings
9from html.parser import HTMLParser
10from html.entities import name2codepoint
11from http.client import RemoteDisconnected
12import urllib.error
13import urllib.request
14from urllib.error import HTTPError, URLError
15import pandas
16from pyquickhelper.loghelper import noLOG
17from pyquickhelper.filehelper import unzip_files
18from pyensae.datasource import download_data
19from pyensae.datasource.http_retrieve import DownloadDataException
20from .data_exceptions import DataNotAvailableError, DataFormatException
def elections_presidentielles_local_files(load=False):
    """
    Returns the list of files included in this module about French elections.

    @param load True: load the data
    @return list of local files

    If the data is loaded, the function returns a dictionary of dataframes,
    one per round (keys ``circ1``, ``circ2``).
    """
    folder = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "data_elections"))
    files = [os.path.join(folder, name)
             for name in ("cdsp_presi2012t1_circ.xls",
                          "cdsp_presi2012t2_circ.xls")]
    for name in files:
        if not os.path.exists(name):
            raise FileNotFoundError(name)  # pragma: no cover
    if not load:
        return files
    # sheet 1 holds the results, sheet 0 is only a description sheet
    return dict(circ1=pandas.read_excel(files[0], sheet_name=1),
                circ2=pandas.read_excel(files[1], sheet_name=1))
def elections_presidentielles(url=None, local=False, agg=None):
    """
    Downloads the data for the French elections from *data.gouv.fr*.

    @param url url (None for default value)
    @param local prefer local data instead of remote
    @param agg kind of aggregation desired (see below)
    @return dictionaries of DataFrame (one entry for each round)

    The default url comes from
    `Elections présidentielle 2012 - Résultats
    <https://www.data.gouv.fr/fr/datasets/election-presidentielle-2012-resultats-572124/>`_.
    You can get more at
    `Elections présidentielles 1965-2012
    <https://www.data.gouv.fr/fr/datasets/elections-presidentielles-1965-2012-1/>`_.

    If url is None, we pull some data from folder
    :ref:`data/election <l-data-elections>`.

    Parameter *agg*:

    * *circ* or *None* for no aggregation
    * *dep* to aggregate per department
    """
    if agg is None:
        if local:
            return elections_presidentielles_local_files(load=True)
        if url is None:
            # remember whether the caller chose the url: when it is the
            # default one, falling back on the local files is acceptable
            url = "http://static.data.gouv.fr/ff/e9c9483d39e00030815089aca1e2939f9cb99a84b0136e43056790e47bb4f0.xls"
            url0 = None
        else:
            url0 = url
        try:
            # sheet_name=None loads every sheet as a dictionary of dataframes
            return pandas.read_excel(url, sheet_name=None)
        except (HTTPError, URLError, TimeoutError) as e:  # pragma: no cover
            if url0 is None:
                return elections_presidentielles_local_files(load=True)
            raise DataNotAvailableError(
                "unable to get data from " + url) from e
    res = elections_presidentielles(url=url, local=local, agg=None)
    if agg == "circ":
        return res
    if agg == "dep":
        for k in list(res.keys()):
            # the first two columns identify the department:
            # aggregate every other column on them
            key = list(res[k].columns[:2])
            df = res[k].groupby(key).sum().reset_index(drop=False)
            # "circ1" -> "dep1", "circ2" -> "dep2"
            res["dep" + k[-1:]] = df
        return res
    raise ValueError(  # pragma: no cover
        "unknown value for agg: '{0}'".format(agg))
def elections_legislatives_bureau_vote(source=None, folder=".", fLOG=noLOG):
    """
    Retrieves data from
    `Résultat des élections législatives françaises de 2012 au niveau bureau de vote
    <https://www.data.gouv.fr/fr/datasets/resultat-des-elections-legislatives-francaises-de-2012-au-niveau-bureau-de-vote-nd/>`_.

    @param source should be None unless you want to use the backup plan ("xd")
    @param folder where to download
    @param fLOG logging function
    @return dictionary of dataframes, one per round ("T1", "T2")

    Others sources:

    * `Résultats élections municipales 2014 par bureau de vote
      <http://www.nosdonnees.fr/dataset/resultats-elections-municipales-2014-par-bureau-de-vote>`_
    * `Elections 2015 - Découpage des bureaux de Vote
      <https://www.data.gouv.fr/fr/datasets/elections-2015-decoupage-des-bureaux-de-vote/>`_
    * `Contours des cantons électoraux départementaux 2015
      <https://www.data.gouv.fr/fr/datasets/contours-des-cantons-electoraux-departementaux-2015/>`_
    * `Découpage électoral de la commune, pour les élections législatives
      <https://www.data.gouv.fr/fr/datasets/circonscriptions/>`_ (weird bizarre)
    * `Statistiques démographiques INSEE sur les nouvelles circonscriptions législatives de 2012
      <https://www.data.gouv.fr/fr/datasets/statistiques-demographiques-insee
      -sur-les-nouvelles-circonscriptions-legislatives-de-2012-nd/>`_
    """
    # BUGFIX: the archive name was only assigned inside the except branch
    # and the explicit-source branch, so a successful urlopen left ``file``
    # unbound and download_data raised UnboundLocalError; hoist it here.
    file = "LG12_BV_T1T2.zip"
    if source is None:
        try:  # pragma: no cover
            # check the main website is reachable before trusting it
            with urllib.request.urlopen("http://www.nosdonnees.fr/") as f:
                url = "http://www.nosdonnees.fr/storage/f/2013-03-05T184148/"
                if f is None:
                    raise RuntimeError(
                        "Not sure we can continue. Pretty sure we should stop.")
        except (urllib.error.HTTPError, RemoteDisconnected):  # pragma: no cover
            # backup plan: mirror identified by the keyword "xd"
            url = "xd"
    else:
        url = source
    data = download_data(file, website=url, whereTo=folder, fLOG=fLOG)
    res = {}
    for d in data:
        df = pandas.read_csv(d, encoding="latin-1", sep=";", low_memory=False)
        if d.endswith("_T2.txt"):
            key = "T2"
        elif d.endswith("_T1.txt"):
            key = "T1"
        else:
            raise ValueError(  # pragma: no cover
                "Unable to guess key for filename: '{0}'".format(d))
        res[key] = df
    return res
def elections_legislatives_circonscription_geo(source="xd", folder=".", fLOG=noLOG):
    """
    Retrieves data from
    `Countours des circonscriptions des législatives <https://www.data.gouv.fr/fr/
    datasets/countours-des-circonscriptions-des-legislatives-nd/>`_.

    @param source should be None unless you want to use the backup plan ("xd")
    @param folder where to download
    @return list of dataframe
    """
    if source is None:
        raise NotImplementedError(  # pragma: no cover
            "use source='xd'")
    file = "toxicode_circonscriptions_legislatives.zip"
    downloaded = download_data(file, website=source, whereTo=folder, fLOG=fLOG)
    # return the first csv file found in the archive
    for name in downloaded:
        if not name.endswith(".csv"):
            continue
        return pandas.read_csv(name, sep=",", encoding="utf-8")
    raise DataNotAvailableError(
        "unable to find any csv file in '{0}'".format(file))
def elections_vote_places_geo(source="xd", folder=".", fLOG=noLOG):
    """
    Retrieves data vote places (bureaux de vote in French)
    with geocodes.

    @param source should be None unless you want to use the backup plan ("xd")
    @param folder where to download
    @param fLOG logging function
    @return list of dataframe
    """
    if source is None:
        raise NotImplementedError("use source='xd'")
    file = "bureauxvotegeo.zip"
    downloaded = download_data(file, website=source, whereTo=folder, fLOG=fLOG)
    # return the first tab-separated text file found in the archive
    for name in downloaded:
        if not name.endswith(".txt"):
            continue
        return pandas.read_csv(name, sep="\t", encoding="utf-8")
    raise DataNotAvailableError(
        "Unable to find any csv file in '{0}'".format(file))
def villes_geo(folder=".", as_df=False, fLOG=noLOG):
    """
    Retrieves data vote places (bureaux de vote in French)
    with geocodes.

    @param folder where to download
    @param as_df return as a dataframe
    @param fLOG logging function
    @return list of dataframe
    """
    zip_path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                            "data_elections", "villesgeo.zip")
    unzipped = unzip_files(zip_path, where_to=folder)
    # unzip_files may return either a single path or a list of paths
    res = unzipped[0] if isinstance(unzipped, list) else unzipped
    if as_df:
        return pandas.read_csv(res, encoding="utf-8", sep="\t")
    return res
226class _HTMLToText(HTMLParser):
228 def __init__(self):
229 HTMLParser.__init__(self)
230 self._buf = []
231 self.hide_output = False
233 def handle_starttag(self, tag, attrs):
234 if tag in ('p', 'br') and not self.hide_output:
235 self._buf.append('\n')
236 elif tag in ('script', 'style'):
237 self.hide_output = True
239 def handle_startendtag(self, tag, attrs):
240 if tag == 'br':
241 self._buf.append('\n')
243 def handle_endtag(self, tag):
244 if tag == 'p':
245 self._buf.append('\n')
246 elif tag in ('script', 'style'):
247 self.hide_output = False
249 def handle_data(self, data):
250 if data and not self.hide_output:
251 self._buf.append(re.sub(r'\s+', ' ', data))
253 def handle_entityref(self, name):
254 if name in name2codepoint and not self.hide_output:
255 c = name2codepoint[name]
256 self._buf.append(c)
258 def handle_charref(self, name):
259 if not self.hide_output:
260 n = int(name[1:], 16) if name.startswith('x') else int(name)
261 self._buf.append(n)
263 def get_text(self):
264 return re.sub(r' +', ' ', ''.join(self._buf))
def html_to_text(html):
    """
    Given a piece of HTML, return the plain text it contains.
    This handles entities and char refs, but not javascript and stylesheets.
    """
    extractor = _HTMLToText()
    extractor.feed(html)
    extractor.close()
    return extractor.get_text()
278def _elections_vote_place_address_patterns_():
279 return [
280 "bureau( de vote)?[- ]*n[^0-9]([0-9]{1,3})[- ]+(.*?)[- ]+([0-9]{5})[- ]+([-a-zéèàùâêîôûïöäëü']{2,40})[. ]"]
def elections_vote_place_address(folder=".", hide_warning=False, fLOG=noLOG):
    """
    Scrapes and extracts addresses for every vote place (bureau de vote in French).

    @param folder where to download the scraped pages
    @param hide_warning hide warnings
    @param fLOG logging function
    @return dictionary

    The function does not retrieve everything due to the irregular format.
    Sometimes, the city is missing or written above.
    """
    _elections_vote_place_address_patterns = _elections_vote_place_address_patterns_()

    # download one page per department (bureaudevote01.htm ... bureaudevote95.htm)
    files = []
    for deps in range(1, 96):
        last = "bureaudevote%02d.htm" % deps
        url = "http://bureaudevote.fr/"
        try:
            f = download_data(last, website=url, whereTo=folder, fLOG=fLOG)
        except (urllib.error.HTTPError, DownloadDataException):  # pragma: no cover
            # backup plan: a single zip with all the pages, replaces everything
            # downloaded so far
            files = download_data("bureauxdevote.zip",
                                  website="xd", whereTo=folder, fLOG=fLOG)
            break
        if isinstance(f, list):
            f = f[0]
        files.append(f)

    # extract data
    regex = [re.compile(_) for _ in _elections_vote_place_address_patterns]
    rows = []  # all extracted vote places
    exc = []   # collected per-page extraction issues, raised at the end if too many
    for data in files:
        lrows = []
        with open(data, "r", encoding="iso-8859-1") as f:
            content = f.read().lower()
        content = html_to_text(content)
        content0 = content
        # flatten the text so the regexes can match across line breaks
        content = content.replace("\n", " ").replace("\t", " ")
        atous = []
        for reg in regex:
            atous.extend(reg.findall(content))
        # a page matching only 1-3 places is suspicious: keep the issue aside
        if len(atous) < 4 and len(atous) > 0:
            mes = "Not enough vote places ({2}) in\n{0}\nFOUND\n{3}\nCONTENT\n{1}".format(
                data, content0, len(atous), "\n".join(str(_) for _ in atous))
            exc.append(Exception(mes))
        if len(atous) > 1:
            for t in atous:
                # t[-3] holds "place - address"; the last dash-separated part
                # is the street address, the rest is the place name
                ad = t[-3].split("-")
                address = ad[-1].strip(" ./<>-")
                place = "-".join(ad[:-1]).strip(" ./<>-")
                if "bureau de vote" in place:
                    # the regex swallowed the next entry: skip, optionally warn
                    if not hide_warning:
                        warnings.warn("Too long address {0}".format(t))
                else:
                    try:
                        lrows.append(dict(n=int(t[1]), city=t[-1].strip(" .<>/"),
                                          zip=t[-2], address=address,
                                          place=place))
                    except ValueError as e:  # pragma: no cover
                        raise DataFormatException(
                            "issue with {0}".format(t)) from e
                    if len(lrows[-1]["city"]) <= 1:
                        mes = "No City in {0}\nROWS\n{2}\nCONTENT\n{1}".format(
                            t, content0, "\n".join(str(_) for _ in lrows))  # pragma: no cover
                        raise DataFormatException(mes)  # pragma: no cover
        if lrows:
            rows.extend(lrows)
        elif "06.htm" in data:
            # page 06 is known to contain data: finding nothing means the
            # extraction is broken
            mes = "Not enough vote places ({2}) in\n{0}\nFOUND\n{3}\nCONTENT\n{1}".format(
                data, content0, len(lrows), "\n".join(str(_) for _ in lrows))  # pragma: no cover
            raise DataFormatException(mes)  # pragma: no cover
    if len(exc) > 2:
        # too many suspicious pages: give up and report them all
        mes = "Exception raised: {0}\n---------\n{1}".format(  # pragma: no cover
            len(exc), "\n########################\n".join(str(_) for _ in exc))
        raise DataFormatException(mes)  # pragma: no cover
    return pandas.DataFrame(rows)