Coverage for src/ensae_teaching_cs/special/elections.py: 85%

227 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-04-28 06:23 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Contains a class to process elections results (France) 

5""" 

6import random 

7import numpy 

8import pandas 

9 

10 

class ElectionResults:
    """
    Processes data coming from
    `data.gouv.fr <http://www.data.gouv.fr/content/search?SortBy=Pertinence&SortOrder=0&SearchText=%C3%A9lections+2012>`_.

    The class uses `pandas <http://pandas.pydata.org/>`_ to process the data.
    See `Elections françaises <http://www.xavierdupre.fr/blog/2013-12-06_nojs.html>`_.
    See `Optimisation sous contraintes appliquée au calcul du report des voix <http://www.xavierdupre.fr/blog/2013-12-07_nojs.html>`_.

    The two rounds are stored in ``self.tours`` (a list of two dataframes),
    one row per region, one column per candidate plus the level columns
    (see @see me LevelCol), ``Abstentions`` and ``Blancs et nuls``.
    """

    def __init__(self, file, year=None, level="Départements"):
        """
        Loads the data downloaded from
        `data.gouv.fr <http://www.data.gouv.fr/content/search?SortBy=Pertinence&SortOrder=0&SearchText=%C3%A9lections+2012>`_.

        @param      file    xls file, or a list of two dataframes
                            (first round, second round) which are stored as they are
        @param      year    year (optional)
        @param      level   ``Départements`` or ``Cantons``

        When *file* is not a list, sheets ``"<level> T1"`` and ``"<level> T2"``
        are read, processed with @see me process_tour and sorted by the
        ``Libellé du <level>`` column. A ``RuntimeError`` is raised if a round
        is empty (before or after processing) or if the sort fails.

        NOTE(review): frames given as a list skip processing and sorting;
        @see me resample relies on this since it passes frames which already
        went through @see me process_tour.
        """
        self.year = year
        # "Départements" -> "département", "Cantons" -> "canton":
        # the singular lower-case form is the one used in the column names.
        self.level = level.lower().replace("s", "")
        if isinstance(file, list):
            self.tours = file
        else:
            self.tours = [
                pandas.read_excel(file, sheet_name=f"{level} T{number}",
                                  engine='openpyxl')
                for number in (1, 2)]
            self._check_tours_not_empty()
            self.tours = [self.process_tour(tour) for tour in self.tours]
            self._check_tours_not_empty()
            try:
                self.tours = [
                    tour.sort_values(f"Libellé du {self.level}", inplace=False)
                    for tour in self.tours]
            except Exception as e:
                message = "unable to sort, shape={1} columns={0}".format(
                    ",".join(map(str, self.tours[0].columns)),
                    self.tours[0].shape)
                raise RuntimeError(message) from e

    def _check_tours_not_empty(self):
        """
        Raises ``RuntimeError`` if one round has no row,
        rounds are numbered from 1 in the message.
        """
        for number, tour in enumerate(self.tours, start=1):
            if len(tour) == 0:
                raise RuntimeError("no data for tour %d" % number)

    def _vote_columns(self, round):
        """
        Returns the columns of a round which are not level columns
        (candidates, ``Abstentions``, ``Blancs et nuls``).

        @param      round   0 or 1
        @return             list of column names, in the dataframe order
        """
        level_columns = self.LevelCol
        return [c for c in self.tours[round].columns if c not in level_columns]

    def get_candidates_votes(self, round):
        """
        Returns the numbers of voters for each candidate.

        @param      round   0 or 1
        @return             dictionary ``{column: sum over all regions}``
        """
        frame = self.tours[round]
        return {name: frame[name].sum() for name in self._vote_columns(round)}

    def correct(self, method=None):
        """
        Corrects the second round in a way there is the same number of voters.
        The dataframes are modified in place.

        @param      method      some preprocess before going on (see below)

        About ``method``:

        - *'N'* --> correct the number of voters for each regions
        - *'cand'* --> gives the same weights to every candidates

        Any other value (including the default ``None``) raises
        ``NotImplementedError``. Method *'N'* raises ``RuntimeError``
        if both rounds do not have the same number of rows.
        """
        if method == "N":
            if len(self.T0) != len(self.T1):
                raise RuntimeError(
                    "unable to proceed because of different numbers of regions")
            cols1 = self._vote_columns(1)
            totals0 = self.T0[self._vote_columns(0)].sum(axis=1)
            totals1 = self.T1[cols1].sum(axis=1)
            # Rows are matched on their index label. Whole columns are
            # replaced at once so integer columns become float columns
            # instead of receiving floats cell by cell.
            coef = totals0 / totals1
            self.T1[cols1] = self.T1[cols1].mul(coef, axis=0)
        elif method == "cand":
            cols0 = self._vote_columns(0)
            sums = [self.T0[c].sum() for c in cols0]
            total = sum(sums)
            for name, column_sum in zip(cols0, sums):
                self.T0[name] = self.T0[name] * total / column_sum
            self.correct("N")
        else:
            # f-string: *method* may be None, which cannot be concatenated.
            raise NotImplementedError(f"unknown method: {method}")

    def __str__(self):
        """usual"""
        return "Year: {0} T1: {1} T2: {2}".format(
            self.Year, len(self.tours[0]), len(self.tours[1]))

    def GetNbCandidates(self, round):
        """
        Returns the number of candidates.

        @param      round   round (0 or 1)
        @return             number of candidates

        The result is the number of columns minus 4
        (two level columns, ``Abstentions``, ``Blancs et nuls``).
        """
        return len(self.tours[round].columns) - 4

    @property
    def Year(self):
        """
        Returns the year.
        """
        return self.year

    @property
    def Level(self):
        """
        Returns the level (``département`` or ``canton``).
        """
        return self.level

    @property
    def LevelCol(self):
        """
        Returns the column associated to the level (their name depends on the level).
        """
        return [f"Code du {self.level}", f"Libellé du {self.level}"]

    @property
    def T0(self):
        """
        Returns the dataframe for the first round.
        """
        return self.tours[0]

    @property
    def T1(self):
        """
        Returns the dataframe for the second round.
        """
        return self.tours[1]

    def process_tour(self, tour):
        """
        Keeps the interesting columns, move candidates name as column name.

        @param      tour        dataframe
        @return                 dataframe

        Rows whose ``Abstentions`` value is not a number (or is NaN) are
        removed. Every column whose name starts with ``Nom`` holds a name
        which becomes a column of the result, its value is read two columns
        further in the same row (presumably the number of votes, to be
        confirmed against the input file). Rows producing an empty name are
        dropped. ``RuntimeError`` is raised if the names cannot be sorted
        (mixed types).
        """
        numeric = (float, int, numpy.int64, numpy.float64)
        keep = [isinstance(value, numeric) and not numpy.isnan(value)
                for value in tour["Abstentions"]]
        tour = tour.loc[keep, :]

        # Sorting is only a sanity check: it fails when names mix types
        # (a missing value next to strings for example).
        name_columns = [c for c in tour.columns if c.startswith("Nom")]
        unique = list({value for c in name_columns for value in tour[c]})
        try:
            unique.sort()
        except TypeError as e:
            msg = ','.join(tour.columns)
            raise RuntimeError(
                f"Unable to sort {unique!r}\ncolumns:\n{msg}") from e

        kept_as_is = {f"Code du {self.level}", f"Libellé du {self.level}",
                      "Abstentions", "Blancs et nuls"}
        columns = list(tour.columns)

        def proc(row):
            res = {}
            for i, (column, value) in enumerate(zip(columns, row)):
                if column in kept_as_is:
                    res[column] = value
                elif column.startswith("Nom"):
                    res[value] = row[i + 2]
            if any(len(key) == 0 for key in res):
                return None
            return res

        rows = [r for r in map(proc, tour.values) if r is not None]
        return pandas.DataFrame(rows)

    @staticmethod
    def _qp_matrices(X, Y):
        """
        Builds the matrices of the quadratic program solved by
        @see me vote_transfer.

        @param      X       first round, shape *(regions, col)*
        @param      Y       second round, shape *(regions, nbC)*
        @return             Q, p, G, h, C, b as numpy arrays

        The unknown is a vector of ``col * nbC`` coefficients, block *i*
        (``col`` coefficients) explains column *i* of *Y* from *X*.
        *Q* and *p* encode ``sum_i ||X x_i - Y_i||^2`` in the form
        ``1/2 x'Qx + p'x``, *G*, *h* impose ``x >= 0``, *C*, *b* impose that
        the coefficients attached to one column of *X* sum to 1.
        """
        X = numpy.asarray(X, dtype=float)
        Y = numpy.asarray(Y, dtype=float)
        col = X.shape[1]
        nbC = Y.shape[1]
        size = col * nbC

        # Every block gets the same factor 2 so that Q matches the factor -2
        # of p (the solver minimizes 1/2 x'Qx + p'x).
        Q = 2.0 * numpy.kron(numpy.eye(nbC), X.T @ X)
        # Column i of X'Y goes to block i of p.
        p = (-2.0 * (X.T @ Y)).T.reshape(size, 1)
        G = -numpy.eye(size)
        h = numpy.zeros((size, 1))
        C = numpy.kron(numpy.ones((1, nbC)), numpy.eye(col))
        b = numpy.ones((col, 1))
        return Q, p, G, h, C, b

    def vote_transfer(self):
        """
        Computes the votes between the two rounds using
        contrainsts optimization, the optimization
        requires :epkg:`cvxopt`.

        See `Optimisation sous contraintes appliquée au calcul du report des voix <http://www.xavierdupre.fr/blog/2013-12-07_nojs.html>`_.

        @return     results (as a DataFrame), rows are the first round
                    columns, columns are the second round columns
        """
        from cvxopt import matrix
        from cvxopt import solvers

        rown = self._vote_columns(0)
        coln = self._vote_columns(1)
        qp_arrays = self._qp_matrices(self.tours[0][rown].values,
                                      self.tours[1][coln].values)

        # The solver option is global: restore it even if the solver fails.
        old = solvers.options.get("show_progress", True)
        solvers.options["show_progress"] = False
        try:
            sol = solvers.qp(*[matrix(a) for a in qp_arrays])
        finally:
            solvers.options["show_progress"] = old

        # Block i of the solution becomes column i of the result.
        coef = numpy.array(sol['x'])
        res = coef.reshape(len(coln), len(rown)).T
        return pandas.DataFrame(data=res, index=rown, columns=coln)

    def resample(self, method="uniform"):
        """
        Builds a new sample: it produces a results with the same number of
        rows, but each rows is randomly drawn from the current data.
        This is needed for the bootstrap procedures.

        @param      method      ``weight`` or ``uniform``, any value different
                                from ``uniform`` draws regions proportionally
                                to @see me WeightsNorm
        @return                 a new @see cl ElectionResults

        The same row positions are drawn in both rounds and the index of the
        new dataframes is reset. ``RuntimeError`` is raised if the two rounds
        do not have the same number of rows.
        """
        from bisect import bisect_right
        from itertools import accumulate

        if len(self.T0) != len(self.T1):
            raise RuntimeError(
                "unable to proceeed, we need to draw the same regions, assuming both matrices are sorted in the same order")

        n = len(self.T0)
        if method == "uniform":
            h = [random.randint(0, n - 1) for _ in range(n)]
        else:
            # First position whose cumulated weight exceeds the draw,
            # the last one if rounding errors leave the draw above the total.
            cumulated = list(accumulate(self.WeightsNorm))
            last = len(cumulated) - 1
            h = [min(bisect_right(cumulated, random.random()), last)
                 for _ in range(n)]

        # Positions (not labels) are drawn: weights follow the row order and
        # labels are no longer 0..n-1 in order once the data has been sorted.
        def resample_matrix(mat):
            return mat.iloc[h, :].reset_index(drop=True)

        return ElectionResults([resample_matrix(self.T0),
                                resample_matrix(self.T1)],
                               year=self.year, level=self.level)

    def get_people(self, round=0):
        """
        Returns the number of people per regions.

        @param      round       first (0) or second (1) round
        @return                 series, sum of all columns except the level
                                columns for each row
        """
        frame = self.tours[round]
        # skipna=False: a missing value gives a missing total.
        return frame[self._vote_columns(round)].sum(axis=1, skipna=False)

    @property
    def WeightsNorm(self):
        """
        Returns the proportion of voters for each regions
        (first round), the list is computed once and cached in
        ``self.weightsnorm``.
        """
        if "weightsnorm" not in self.__dict__:
            people = list(self.get_people())
            total = sum(people)
            self.weightsnorm = [value * 1.0 / total for value in people]
        return self.weightsnorm

    @staticmethod
    def min_max_mean_std(series, alpha=0.05):
        """
        returns the mean standard deviation, bounds of the confidence interval

        @param      series      list of numbers (not empty)
        @param      alpha       confidence level, ``int(len(series) * alpha / 2)``
                                values are left outside the interval on each side
        @return                 mean, std, low, high
        """
        ordered = sorted(series)
        count = len(ordered)
        skipped = int(count * alpha / 2)
        low, high = ordered[skipped], ordered[-skipped - 1]
        mean = sum(ordered) / count
        variance = sum([(x - mean) ** 2 for x in ordered]) / count
        return mean, variance ** 0.5, low, high

    def bootstrap(self, iter=1000, method="vote_transfer", alpha=0.05, fLOG=None, **params):
        """
        Uses the bootstrap method to compute confidence intervals
        see `bootstrap <http://fr.wikipedia.org/wiki/Bootstrap_%28statistiques%29>`_.

        @param      iter        number of iteration
        @param      method      method to bootstrap, only ``vote_transfer``
                                is implemented, any other value raises
                                ``NotImplementedError``
        @param      alpha       confidence level
        @param      fLOG        logging function or none
        @param      params      parameters to give to ``method``
        @return                 four matrices, averaged results, sigma, lower bound, higher bound
        """
        # Fails before any sampling is done.
        if method != "vote_transfer":
            raise NotImplementedError(f"unknown method: {method!r}")
        if fLOG is None:
            def fLOG(*x):
                return ""

        fLOG("sampling", iter)
        samples = [self.resample() for _ in range(iter)]
        matrices = [sample.vote_transfer(**params) for sample in samples]

        template = matrices[0]
        shape = template.shape
        arrays = [m.values for m in matrices]
        # mean, std, low, high in this order
        stats = [numpy.empty(shape) for _ in range(4)]

        fLOG("level for each coefficient", shape)
        for i in range(shape[0]):
            for j in range(shape[1]):
                series = [a[i, j] for a in arrays]
                values = ElectionResults.min_max_mean_std(series, alpha=alpha)
                for stat, value in zip(stats, values):
                    stat[i, j] = value

        mean, std, low, high = [
            pandas.DataFrame(stat, index=template.index,
                             columns=template.columns)
            for stat in stats]
        return mean, std, low, high

    @staticmethod
    def combine_into_string(matrices, float_format=str, agg_format=str):
        """
        Combines two matrices into one before displaying it.

        @param      matrices        list of matrices (same dimension)
        @param      float_format    to format each float of all matrices
        @param      agg_format      to build the aggregated string,
                                    it receives the list of formatted values
                                    of one cell (one per matrix)
        @return                     matrixes (dataframe), same index and
                                    columns as the first matrix

        Example:

        ::

            def pour(x) :
                if x < 0.01 : return ""
                else : return "%2.0f" % (x*100) + "%"

            boot = el.bootstrap(iter=10)
            comb = el.combine_into_string( [boot[2],boot[3]], pour, lambda v : "%s-%s" % tuple(v))
        """
        first = matrices[0]
        nrows, ncols = first.shape
        res = [[agg_format([float_format(m.iloc[i, j]) for m in matrices])
                for j in range(ncols)]
               for i in range(nrows)]
        return pandas.DataFrame(data=res, columns=list(first.columns), index=list(first.index))