Coverage for src/mlstatpy/ml/roc.py: 95%

303 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-13 20:42 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief About :epkg:`ROC`. 

5""" 

6import math 

7import itertools 

8from enum import Enum 

9import pandas 

10import numpy 

11 

12 

13class ROC: 

14 """ 

15 Helper to draw a :epkg:`ROC` curve. 

16 """ 

17 

18 class CurveType(Enum): 

19 """ 

20 Curve types: 

21 

22 * *PROBSCORE*: 1 - False Positive / True Positive 

23 * *ERRPREC*: error / recall 

24 * *RECPREC*: precision / recall 

25 * *ROC*: False Positive / True Positive 

26 * *SKROC*: False Positive / True Positive 

27 (`scikit-learn 

28 <http://scikit-learn.org/stable/modules/generated/sklearn.metrics.roc_curve.html>`_) 

29 """ 

30 PROBSCORE = 2 

31 ERRREC = 3 

32 RECPREC = 4 

33 ROC = 5 

34 SKROC = 6 

35 

36 def __init__(self, y_true=None, y_score=None, sample_weight=None, df=None): 

37 """ 

38 Initialisation with a dataframe and two or three columns: 

39 

40 * column 1: score (y_score) 

41 * column 2: expected answer (boolean) (y_true) 

42 * column 3: weight (optional) (sample_weight) 

43 

44 @param y_true if *df* is None, *y_true*, *y_score*, *sample_weight* must be filled, 

45 *y_true* is whether or None the answer is true. 

46 *y_true* means the prediction is right. 

47 @param y_score score prediction 

48 @param sample_weight weights 

49 @param df dataframe or array or list, 

50 it must contains 2 or 3 columns always in the same order 

51 """ 

52 if df is None: 

53 df = pandas.DataFrame() 

54 df["score"] = y_score 

55 df["label"] = y_true 

56 if sample_weight is not None: 

57 df["weight"] = sample_weight 

58 self.data = df 

59 elif isinstance(df, list): 

60 if len(df[0]) == 2: 

61 self.data = pandas.DataFrame(df, columns=["score", "label"]) 

62 else: 

63 self.data = pandas.DataFrame( 

64 df, columns=["score", "label", "weight"]) 

65 elif isinstance(df, numpy.ndarray): 

66 if df.shape[1] == 2: 

67 self.data = pandas.DataFrame(df, columns=["score", "label"]) 

68 else: 

69 self.data = pandas.DataFrame( 

70 df, columns=["score", "label", "weight"]) 

71 elif not isinstance(df, pandas.DataFrame): 

72 raise TypeError( # pragma: no cover 

73 "df should be a DataFrame, not {0}".format(type(df))) 

74 else: 

75 self.data = df.copy() 

76 self.data.sort_values(self.data.columns[0], inplace=True) 

77 self.data.reset_index(drop=True, inplace=True) 

78 if self.data.shape[1] == 2: 

79 self.data["weight"] = 1.0 

80 

81 @property 

82 def Data(self): 

83 """ 

84 Returns the underlying dataframe. 

85 """ 

86 return self.data 

87 

88 def __len__(self): 

89 """ 

90 usual 

91 """ 

92 return len(self.data) 

93 

94 def __repr__(self): 

95 """ 

96 Shows first elements, precision rate. 

97 """ 

98 return self.__str__() 

99 

100 def __str__(self): 

101 """ 

102 Shows first elements, precision rate. 

103 """ 

104 rows = [] 

105 rows.append("Overall precision: %3.2f - AUC=%f" % 

106 (self.precision(), self.auc())) 

107 rows.append("--------------") 

108 rows.append(str(self.data.head(min(5, len(self))))) 

109 rows.append("--------------") 

110 rows.append(str(self.data.tail(min(5, len(self))))) 

111 rows.append("--------------") 

112 roc = self.compute_roc_curve(10, ROC.CurveType.ROC) 

113 rows.append(str(roc)) 

114 rows.append("--------------") 

115 roc = self.compute_roc_curve(10, ROC.CurveType.ERRREC) 

116 rows.append(str(roc)) 

117 return "\n".join(rows) 

118 

119 def confusion(self, score=None, nb=10, curve=CurveType.ROC, bootstrap=False): 

120 """ 

121 Computes the confusion matrix for a specific *score* 

122 or all if *score* is None. 

123 

124 @param score score or None. 

125 @param nb number of scores (if *score* is None) 

126 @param curve see :class:`CurveType <mlstatpy.ml.roc.ROC.CurveType>` 

127 @param boostrap builds the curve after resampling 

128 @return One row if score is precised, many roww is score is None 

129 """ 

130 if not bootstrap: 

131 cloud = self.data.copy() 

132 else: 

133 cloud = self.random_cloud() 

134 

135 if score is None: 

136 

137 sum_weights = cloud[cloud.columns[2]].sum() 

138 if nb <= 0: 

139 nb = len(cloud) 

140 else: 

141 nb = min(nb, len(cloud)) 

142 seuil = numpy.arange(nb + 1) * sum_weights / nb 

143 

144 cloud = cloud.iloc[::-1].copy() 

145 cloud["lw"] = cloud[cloud.columns[2]] * cloud[cloud.columns[1]] 

146 cloud["cw"] = cloud[cloud.columns[2]].cumsum() 

147 cloud["clw"] = cloud["lw"].cumsum() 

148 if cloud.columns[4] != "cw": 

149 raise ValueError( 

150 "Column 4 should be 'cw'.") # pragma: no cover 

151 if cloud.columns[5] != "clw": 

152 raise ValueError( 

153 "Column 5 should be 'clw'.") # pragma: no cover 

154 

155 pos_roc = 0 

156 pos_seuil = 0 

157 if curve is ROC.CurveType.ROC: 

158 roc = pandas.DataFrame(0, index=numpy.arange( 

159 nb + 2), columns=["True Positive", "False Positive", 

160 "False Negative", "True Negative", 

161 "threshold"]) 

162 sum_good_weights = cloud.iloc[-1, 5] 

163 sum_bad_weights = sum_weights - sum_good_weights 

164 roc.iloc[0, 0] = 0 

165 roc.iloc[0, 1] = 0 

166 roc.iloc[0, 2] = sum_good_weights 

167 roc.iloc[0, 3] = sum_bad_weights 

168 roc.iloc[0, 4] = max(cloud.iloc[:, 0]) 

169 pos_roc += 1 

170 for i in range(len(cloud)): 

171 if cloud.iloc[i, 4] > seuil[pos_seuil]: 

172 tp = cloud.iloc[i, 5] 

173 fp = cloud.iloc[i, 4] - cloud.iloc[i, 5] 

174 roc.iloc[pos_roc, 0] = tp 

175 roc.iloc[pos_roc, 1] = fp 

176 roc.iloc[pos_roc, 2] = sum_good_weights - tp 

177 roc.iloc[pos_roc, 3] = sum_bad_weights - fp 

178 roc.iloc[pos_roc, 4] = cloud.iloc[i, 0] 

179 pos_roc += 1 

180 pos_seuil += 1 

181 roc.iloc[pos_roc:, 0] = sum_good_weights 

182 roc.iloc[pos_roc:, 1] = sum_bad_weights 

183 roc.iloc[pos_roc:, 2] = 0 

184 roc.iloc[pos_roc:, 3] = 0 

185 roc.iloc[pos_roc:, 4] = min(cloud.iloc[:, 0]) 

186 return roc 

187 raise NotImplementedError( # pragma: no cover 

188 "Unexpected type '{0}', only ROC is allowed.".format(curve)) 

189 

190 # if score is not None 

191 roc = self.confusion(nb=len(self), curve=curve, 

192 bootstrap=False, score=None) 

193 roc = roc[roc["threshold"] <= score] 

194 if len(roc) == 0: 

195 raise ValueError( # pragma: no cover 

196 "The requested confusion is empty for score={0}.".format(score)) 

197 return roc[:1] 

198 

199 def precision(self): 

200 """ 

201 Computes the precision. 

202 """ 

203 score, weight = self.data.columns[0], self.data.columns[2] 

204 return (self.data[score] * self.data[weight] * 1.0).sum() / self.data[weight].sum() 

205 

206 def compute_roc_curve(self, nb=100, curve=CurveType.ROC, bootstrap=False): 

207 """ 

208 Computes a ROC curve with *nb* points avec nb, 

209 if *nb == -1*, there are as many as points as the data contains, 

210 if *bootstrap == True*, it draws random number to create confidence 

211 interval based on bootstrap method. 

212 

213 @param nb number of points for the curve 

214 @param curve see :class:`CurveType <mlstatpy.ml.roc.ROC.CurveType>` 

215 @param boostrap builds the curve after resampling 

216 @return DataFrame (metrics and threshold) 

217 

218 If *curve* is *SKROC*, the parameter *nb* is not taken into account. 

219 It should be set to 0. 

220 """ 

221 if curve is ROC.CurveType.ERRREC: 

222 roc = self.compute_roc_curve( 

223 nb=nb, curve=ROC.CurveType.RECPREC, bootstrap=bootstrap) 

224 roc["error"] = - roc["precision"] + 1 

225 return roc[["error", "recall", "threshold"]] 

226 if curve is ROC.CurveType.PROBSCORE: 

227 roc = self.compute_roc_curve( 

228 nb=nb, curve=ROC.CurveType.ROC, bootstrap=bootstrap) 

229 roc["P(->s)"] = roc["False Positive Rate"] 

230 roc["P(+<s)"] = - roc["True Positive Rate"] + 1 

231 return roc[["P(+<s)", "P(->s)", "threshold"]] 

232 

233 if not bootstrap: 

234 cloud = self.data.copy() 

235 else: 

236 cloud = self.random_cloud() 

237 

238 if curve is ROC.CurveType.SKROC: 

239 if nb > 0: 

240 raise NotImplementedError( # pragma: no cover 

241 "nb must be <= 0 si curve is SKROC") 

242 from sklearn.metrics import roc_curve 

243 fpr, tpr, thresholds = roc_curve(y_true=cloud[cloud.columns[1]], 

244 y_score=cloud[cloud.columns[0]], 

245 sample_weight=cloud[cloud.columns[2]]) 

246 roc = pandas.DataFrame(0, index=numpy.arange(len(fpr)), 

247 columns=["False Positive Rate", "True Positive Rate", "threshold"]) 

248 roc_cols = list(roc.columns) 

249 roc[roc_cols[0]] = fpr 

250 roc[roc_cols[1]] = tpr 

251 roc[roc_cols[2]] = thresholds 

252 return roc 

253 

254 sum_weights = cloud[cloud.columns[2]].sum() 

255 if nb <= 0: 

256 nb = len(cloud) 

257 else: 

258 nb = min(nb, len(cloud)) 

259 seuil = numpy.arange(nb + 1) * sum_weights / nb 

260 

261 cloud = cloud.iloc[::-1].copy() 

262 cloud["lw"] = cloud[cloud.columns[2]] * cloud[cloud.columns[1]] 

263 cloud["cw"] = cloud[cloud.columns[2]].cumsum() 

264 cloud["clw"] = cloud["lw"].cumsum() 

265 sum_weights_ans = cloud["lw"].sum() 

266 if cloud.columns[4] != "cw": 

267 raise ValueError("Column 4 should be 'cw'.") # pragma: no cover 

268 if cloud.columns[5] != "clw": 

269 raise ValueError("Column 5 should be 'clw'.") # pragma: no cover 

270 

271 pos_roc = 0 

272 pos_seuil = 0 

273 

274 if curve is ROC.CurveType.ROC: 

275 roc = pandas.DataFrame(0, index=numpy.arange( 

276 nb + 1), columns=["False Positive Rate", "True Positive Rate", "threshold"]) 

277 sum_good_weights = cloud.iloc[-1, 5] 

278 sum_bad_weights = sum_weights - sum_good_weights 

279 for i in range(len(cloud)): 

280 if cloud.iloc[i, 4] > seuil[pos_seuil]: 

281 ds = cloud.iloc[i, 4] - cloud.iloc[i, 5] 

282 roc.iloc[pos_roc, 0] = ds / sum_bad_weights 

283 roc.iloc[pos_roc, 1] = cloud.iloc[i, 5] / sum_good_weights 

284 roc.iloc[pos_roc, 2] = cloud.iloc[i, 0] 

285 pos_roc += 1 

286 pos_seuil += 1 

287 roc.iloc[pos_roc:, 0] = ( 

288 cloud.iloc[-1, 4] - cloud.iloc[-1, 5]) / sum_bad_weights 

289 roc.iloc[pos_roc:, 1] = cloud.iloc[-1, 5] / sum_good_weights 

290 roc.iloc[pos_roc:, 2] = cloud.iloc[-1, 0] 

291 

292 elif curve is ROC.CurveType.RECPREC: 

293 roc = pandas.DataFrame(0, index=numpy.arange( 

294 nb + 1), columns=["recall", "precision", "threshold"]) 

295 for i in range(len(cloud)): 

296 if cloud.iloc[i, 4] > seuil[pos_seuil]: 

297 roc.iloc[pos_roc, 0] = cloud.iloc[i, 4] / sum_weights 

298 if cloud.iloc[i, 4] > 0: 

299 roc.iloc[pos_roc, 1] = cloud.iloc[ 

300 i, 5] / cloud.iloc[i, 4] 

301 else: 

302 roc.iloc[pos_roc, 1] = 0.0 

303 roc.iloc[pos_roc, 2] = cloud.iloc[i, 0] 

304 pos_roc += 1 

305 pos_seuil += 1 

306 roc.iloc[pos_roc:, 0] = 1.0 

307 roc.iloc[pos_roc:, 1] = sum_weights_ans / sum_weights 

308 roc.iloc[pos_roc:, 2] = cloud.iloc[-1, 0] 

309 

310 else: 

311 raise NotImplementedError( # pragma: no cover 

312 "Unknown curve type '{}'.".format(curve)) 

313 

314 return roc 

315 

316 def random_cloud(self): 

317 """ 

318 Resamples among the data. 

319 

320 @return DataFrame 

321 """ 

322 res = self.data.sample(len(self.data), weights=self.data[ 

323 self.data.columns[2]], replace=True) 

324 return res.sort_values(res.columns[0]) 

325 

326 def plot(self, nb=100, curve=CurveType.ROC, bootstrap=0, 

327 ax=None, thresholds=False, **kwargs): 

328 """ 

329 Plots a :epkg:`ROC` curve. 

330 

331 @param nb number of points 

332 @param curve see :class:`CurveType <mlstatpy.ml.roc.ROC.CurveType>` 

333 @param boostrap number of curves for the boostrap (0 for None) 

334 @param ax axis 

335 @param thresholds use thresholds for the X axis 

336 @param kwargs sent to `pandas.plot <http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.plot.html>`_ 

337 @return ax 

338 """ 

339 nb_bootstrap = 0 

340 if bootstrap > 0: 

341 ckwargs = kwargs.copy() 

342 if 'color' not in ckwargs: 

343 ckwargs['color'] = 'r' 

344 if 'linewidth' not in kwargs: 

345 ckwargs['linewidth'] = 0.2 

346 ckwargs['legend'] = False 

347 if 'label' in ckwargs: 

348 del ckwargs['label'] 

349 for _ in range(0, bootstrap): 

350 roc = self.compute_roc_curve(nb, curve=curve, bootstrap=True) 

351 if thresholds: 

352 cols = list(_ for _ in roc.columns if _ != "threshold") 

353 roc = roc.sort_values("threshold").reset_index(drop=True) 

354 ax = roc.plot(x="threshold", y=cols, 

355 ax=ax, label=['_nolegend_' for i in cols], **ckwargs) 

356 else: 

357 cols = list(_ for _ in roc.columns[ 

358 1:] if _ != "threshold") 

359 roc = roc.sort_values( 

360 roc.columns[0]).reset_index(drop=True) 

361 ax = roc.plot(x=roc.columns[0], y=cols, 

362 ax=ax, label=['_nolegend_' for i in cols], **ckwargs) 

363 nb_bootstrap += len(cols) 

364 bootstrap = 0 

365 

366 if bootstrap <= 0: 

367 if 'legend' not in kwargs: 

368 kwargs['legend'] = False 

369 roc = self.compute_roc_curve(nb, curve=curve) 

370 if not thresholds: 

371 roc = roc[[_ for _ in roc.columns if _ != "threshold"]] 

372 

373 cols = list(_ for _ in roc.columns if _ != "threshold") 

374 final = 0 

375 if thresholds: 

376 if 'label' in kwargs and len(cols) != len(kwargs['label']): 

377 raise ValueError( # pragma: no cover 

378 'label must have {0} values'.format(len(cols))) 

379 roc = roc.sort_values("threshold").reset_index(drop=True) 

380 ax = roc.plot(x="threshold", y=cols, ax=ax, **kwargs) 

381 ax.set_ylim([0, 1]) 

382 ax.set_xlabel("thresholds") 

383 final += len(cols) 

384 diag = 0 

385 else: 

386 if 'label' in kwargs and len(cols) - 1 != len(kwargs['label']): 

387 raise ValueError( 

388 'label must have {0} values'.format(len(cols) - 1)) 

389 final += len(cols) - 1 

390 roc = roc.sort_values(cols[0]).reset_index(drop=True) 

391 ax = roc.plot(x=cols[0], y=cols[1:], ax=ax, **kwargs) 

392 if curve is ROC.CurveType.ROC or curve is ROC.CurveType.SKROC: 

393 ax.plot([0, 1], [0, 1], "k--", linewidth=1) 

394 diag = 1 

395 else: 

396 diag = 0 

397 ax.set_xlabel(roc.columns[0]) 

398 if len(roc.columns) == 2: 

399 ax.set_ylabel(roc.columns[1]) 

400 ax.set_xlim([0, 1]) 

401 ax.set_ylim([0, 1]) 

402 

403 # legend 

404 handles, labels = ax.get_legend_handles_labels() 

405 tot = final + nb_bootstrap 

406 diag = len(handles) - diag 

407 handles = handles[:-tot] + handles[-final:diag] 

408 new_labels = labels[:-tot] + labels[-final:diag] 

409 ax.legend(handles, new_labels) 

410 

411 return ax 

412 

413 def auc(self, cloud=None): 

414 """ 

415 Computes the area under the curve (:epkg:`AUC`). 

416 

417 @param cloud data or None to use ``self.data``, the function 

418 assumes the data is sorted. 

419 @return AUC 

420 

421 The first column is the label, the second one is the score, 

422 the third one is the weight. 

423 """ 

424 if cloud is None: 

425 cloud = self.data 

426 good = cloud[cloud[cloud.columns[1]] == 1] 

427 wrong = cloud[cloud[cloud.columns[1]] == 0] 

428 auc = 0.0 

429 for a, b in itertools.product(good.itertuples(False), wrong.itertuples(False)): 

430 if a[0] > b[0]: 

431 auc += a[2] * b[2] 

432 elif a[0] >= b[0]: 

433 auc += a[2] * b[2] / 2 

434 if auc == 0 and good.shape[0] + wrong.shape[0] < self.data.shape[0]: 

435 raise ValueError( # pragma: no cover 

436 "Label are not right, expect 0 and 1 not {0}".format( 

437 set(cloud[cloud.columns[1]]))) 

438 n = len(wrong) * len(good) 

439 if n > 0: 

440 auc /= float(n) 

441 return auc 

442 

443 def auc_interval(self, bootstrap=10, alpha=0.95): 

444 """ 

445 Determines a confidence interval for the :epkg:`AUC` with bootstrap. 

446 

447 @param bootstrap number of random estimation 

448 @param alpha define the confidence interval 

449 @return dictionary of values 

450 """ 

451 if bootstrap <= 1: 

452 raise ValueError( 

453 "Use auc instead, bootstrap < 2") # pragma: no cover 

454 rate = [] 

455 for _ in range(0, bootstrap): 

456 cloud = self.random_cloud() 

457 auc = self.auc(cloud) 

458 rate.append(auc) 

459 

460 rate.sort() 

461 ra = self.auc(self.data) 

462 

463 i1 = int(alpha * len(rate) / 2) 

464 i2 = max(i1, len(rate) - i1 - 1) 

465 med = rate[len(rate) // 2] 

466 moy = float(sum(rate)) / len(rate) 

467 var = 0 

468 for r in rate: 

469 var += r * r 

470 var = float(var) / len(rate) 

471 var = var - moy * moy 

472 return dict(auc=ra, interval=(rate[i1], rate[i2]), 

473 min=rate[0], max=rate[len(rate) - 1], 

474 mean=moy, var=math.sqrt(var), mediane=med) 

475 

476 def roc_intersect(self, roc, x): 

477 """ 

478 The :epkg:`ROC` curve is defined by a set of points. 

479 This function interpolates those points to determine 

480 *y* for any *x*. 

481 

482 @param roc ROC curve 

483 @param x x 

484 @return y 

485 """ 

486 below = roc[roc[roc.columns[0]] <= x] 

487 i = len(below) 

488 if i == len(roc): 

489 return 0.0 

490 

491 p2 = tuple(roc.iloc[i, :]) 

492 if i - 1 <= 0: 

493 p1 = (1, 1) 

494 else: 

495 p1 = tuple(roc.iloc[i - 1, :]) 

496 

497 if p1[0] == p2[0]: 

498 return (p1[1] + p2[0]) / 2 

499 return (x - p1[0]) / (p2[0] - p1[0]) * (p2[1] - p1[1]) + p1[1] 

500 

501 def roc_intersect_interval(self, x, nb, curve=CurveType.ROC, bootstrap=10, alpha=0.05): 

502 """ 

503 Computes a confidence interval for the value returned by 

504 @see me roc_intersect. 

505 

506 @param roc ROC curve 

507 @param x x 

508 @param curve see :class:`CurveType <mlstatpy.ml.roc.ROC.CurveType>` 

509 @return dictionary 

510 """ 

511 

512 rate = [] 

513 for _ in range(0, bootstrap): 

514 roc = self.compute_roc_curve(nb, curve=curve, bootstrap=True) 

515 r = self.roc_intersect(roc, x) 

516 rate.append(r) 

517 

518 rate.sort() 

519 

520 roc = self.compute_roc_curve(nb, curve=curve) 

521 ra = self.roc_intersect(roc, x) 

522 

523 i1 = int(alpha * len(rate) / 2) 

524 i2 = max(i1, len(rate) - i1 - 1) 

525 med = rate[len(rate) // 2] 

526 moy = float(sum(rate)) / len(rate) 

527 var = 0 

528 for r in rate: 

529 var += r * r 

530 var = float(var) / len(rate) 

531 var = var - moy * moy 

532 return dict(y=ra, interval=(rate[i1], rate[i2]), 

533 min=rate[0], max=rate[len(rate) - 1], 

534 mean=moy, var=math.sqrt(var), mediane=med)