Coverage for pyquickhelper/pycode/utils_tests_private.py: 90%

315 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@brief This extension contains various functionalities to help unittesting. 

4""" 

5import os 

6import sys 

7import glob 

8import re 

9import unittest 

10import warnings 

11from io import StringIO 

12from .utils_tests_stringio import StringIOAndFile 

13from .default_filter_warning import default_filter_warning 

14from ..filehelper.synchelper import remove_folder 

15from ..loghelper.flog import run_cmd, noLOG 

16 

17 

def get_test_file(filter, folder=None, no_subfolder=False, fLOG=noLOG, root=None):
    """
    Returns the list of test files found in one or several folders
    (sub folders are explored recursively).

    @param      filter          only select files matching the pattern (ex: test*),
                                the pattern ``temp_*`` disables the filtering on names
    @param      folder          path to look (or paths to look if it is a list)
    @param      no_subfolder    the function investigates the folder *folder*
                                and does not try any subfolder in
                                ``{"_nrt", "_unittest", "_unittests"}``
    @param      fLOG            logging function
    @param      root            root or folder which contains the project,
                                rules applying on folder name will not apply on it
    @return                     a list of test files

    The function temporarily appends the explored folders to ``sys.path``,
    the original value is restored before returning, even if an exception
    is raised.
    """
    # A tuple keeps the order of the explored folders (and of the results)
    # deterministic, iterating over a set does not.
    subfolders = ("_nrt", "_unittest", "_unittests")

    # Substrings which disqualify a file even if it looks like a test file.
    rejected = (".py.err", ".py.out", ".py.warn", "test_main", "temp_",
                "temp2_", ".pyo", "out.test_copyfile.py.2.txt",
                ".pyc", ".pyd", ".so", ".py~")

    if no_subfolder:
        dirs = [folder]
    elif folder is None:
        path = os.path.split(__file__)[0]
        dirs = [os.path.join(path, "..", "..", d) for d in subfolders]
    elif isinstance(folder, str):
        if not os.path.exists(folder):
            raise FileNotFoundError(folder)  # pragma: no cover
        if os.path.split(folder)[-1] in subfolders:
            dirs = [folder]
        else:
            dirs = [os.path.join(folder, d) for d in subfolders]
    else:
        # folder is a list of folders, every one of them must exist
        dirs = folder
        for d in dirs:
            if not os.path.exists(d):
                raise FileNotFoundError(d)

    def simplify(folds):
        # Shortens the paths for logging purposes: keeps the part starting
        # one level above the test folder (_nrt, _unittest, _unittests).
        res = []
        for fold in folds:
            short = fold
            parts = fold.replace("\\", "/").split('/')
            for name in subfolders:
                if name in parts:
                    # max(..., 0): a negative index would only keep
                    # the last component of the path
                    start = max(parts.index(name) - 1, 0)
                    short = "/".join(parts[start:])
            res.append(short)
        return res

    def remove_root(p):
        # The rules on names do not apply on the root part of the path.
        if root is not None and p.startswith(root):
            return p[len(root):]
        return p

    def is_test_file(name):
        return ("test_" in name and ".py" in name and
                not any(marker in name for marker in rejected))

    copypaths = list(sys.path)
    li = []
    try:
        for fold in dirs:
            if "__pycache__" in fold or "site-packages" in fold:
                continue
            if not os.path.exists(fold):
                continue
            if fold not in sys.path and fold != ".":
                sys.path.append(fold)

            content = glob.glob(fold + "/" + filter)
            if filter != "temp_*":
                content = [il for il in content
                           if is_test_file(remove_root(il))]
            li.extend(content)
            fLOG("[get_test_file], inspecting", simplify(dirs))

            # explores the sub folders
            for il in glob.glob(fold + "/*"):
                if os.path.isdir(il):
                    li.extend(get_test_file(
                        filter, il, no_subfolder=True, fLOG=fLOG, root=root))
    finally:
        # we restore sys.path
        sys.path = copypaths

    return li

115 

116 

def get_estimation_time(file):
    """
    Returns an estimation of the processing time,
    it extracts the number in ``(time=5s)`` for example.

    @param      file        filename
    @return                 int, the number found in the first occurrence of
                            ``(time=<n>s)``, 0 if the file does not contain
                            any, 10 if the file cannot be read
                            (a warning is emitted in that case)
    """
    try:
        # The context manager closes the file even if the reading fails,
        # undecodable bytes are skipped (errors="ignore").
        with open(file, "r", errors="ignore") as f:
            content = f.read()
    except Exception as e:  # pragma: no cover
        # Best effort: an unreadable file gets a default estimation.
        warnings.warn(f"Issue with '{file}'\n{type(e)}\n{e}", UserWarning)
        return 10
    found = re.search("[(]time=([0-9]+)s[)]", content)
    if found is None:
        return 0
    return int(found.group(1))

143 

144 

def import_files(li, additional_ut_path=None, fLOG=noLOG):
    """
    Imports every file in ``li`` and builds one test suite
    for every class whose name starts with ``Test``.

    @param      li                      list of files (python scripts)
    @param      additional_ut_path      additional paths to add when importing the unit tests,
                                        every item is either a path (appended to ``sys.path``)
                                        or a tuple *(path, first)*, the path is inserted
                                        at the first position if *first* is true,
                                        appended otherwise
    @param      fLOG                    logging function
    @return                             list of tests [ ( testsuite, file) ]

    Files whose name contains ``test_do_not_include`` are skipped.
    The function raises ``ImportError`` if a file cannot be imported,
    ``RuntimeError`` if a test class still contains a method starting
    with ``_test`` or if a test cannot be instantiated.
    ``sys.path`` is restored after every import.
    """
    allsuite = []
    for le in li:
        sdir, tl = os.path.split(le)
        fi = tl.replace(".py", "")
        if "test_do_not_include" in fi:
            # checked before sys.path is modified: nothing to restore
            continue

        copypath = list(sys.path)
        try:
            if sdir not in sys.path:
                sys.path.append(sdir)
            if additional_ut_path:
                for p in additional_ut_path:
                    if isinstance(p, tuple):
                        if p[1]:
                            sys.path.insert(0, p[0])
                        else:
                            sys.path.append(p[0])
                    else:
                        sys.path.append(p)

            try:
                mo = __import__(fi)
            except Exception as e:  # pragma: no cover
                # the message is built before sys.path is restored
                raise ImportError(
                    "Unable to import '{}' due to {}.\nsys.path=\n{}".format(
                        fi, e, "\n".join(sys.path))) from e
        finally:
            # some tests can mess up with the import path
            sys.path = copypath

        for c in dir(mo):
            if len(c) < 5 or not c.startswith("Test"):
                continue
            # test class c
            cls = getattr(mo, c)
            testsuite = unittest.TestSuite()
            for d in dir(cls):
                if len(d) >= 6 and d.startswith("_test"):
                    raise RuntimeError(  # pragma: no cover
                        f"a function _test is still deactivated {d} in {c}")
                if len(d) < 5 or not d.startswith("test"):
                    continue
                # method c.d: a test case is an instance of the class
                # built with the name of the method to run
                try:
                    t = cls(d)
                except Exception as e:  # pragma: no cover
                    code = "t = mo." + c + "(\"" + d + "\")"
                    raise RuntimeError(
                        f"Unable to execute code '{code}'") from e
                testsuite.addTest(t)

            allsuite.append((testsuite, le))

    return allsuite

217 

218 

def clean(folder=None, fLOG=noLOG):
    """
    Does the cleaning: removes the log files and every file or folder
    whose name starts with ``temp_`` found by @see fn get_test_file.

    @param      folder      folder to clean, if None, @see fn get_test_file
                            looks into its default folders
    @param      fLOG        logging function

    A failure to remove a file or a folder is logged with *fLOG*
    and does not stop the cleaning.
    """
    def remove_all(paths, exists, remove, kind):
        # Removes every path for which *exists* is true, logs the failures.
        for el in paths:
            try:
                if exists(el):
                    remove(el)
            except Exception as e:  # pragma: no cover
                fLOG("[clean] unable to remove {} '{}' due to {}".format(
                    kind, el, str(e).replace("\n", " ")))

    # do not use SVN here just in case some files are not checked in.
    for log_file in ["temp_hal_log.txt", "temp_hal_log2.txt",
                     "temp_hal_log_.txt", "temp_log.txt", "temp_log2.txt", ]:
        remove_all(get_test_file(log_file, folder=folder),
                   os.path.isfile, os.remove, "file")

    # folder is forwarded here as well, otherwise the temporary files
    # of the requested folder are never removed
    li = get_test_file("temp_*", folder=folder)
    # files first, then folders
    remove_all(li, os.path.isfile, os.remove, "file")
    remove_all(li, os.path.isdir, remove_folder, "dir")

253 

254 

def main_run_test(runner, path_test=None, limit_max=1e9, log=False, skip=-1, skip_list=None,
                  on_stderr=False, processes=False, skip_function=None,
                  additional_ut_path=None, stdout=None, stderr=None, filter_warning=None,
                  fLOG=noLOG):
    """
    Runs all unit tests,
    the function looks into the folder _unittest and extract from all files
    beginning by `test_` all methods starting by `test_`.
    Each files should mention an execution time.
    Tests are sorted by increasing order.

    @param      runner              unittest Runner
    @param      path_test           path to look, if None, looks for defaults path related to this project
    @param      limit_max           avoid running tests longer than limit seconds
    @param      log                 if True, enables intermediate files (not used by this function)
    @param      skip                if skip != -1, skip the first "skip" test files
    @param      skip_list           skip unit test id in this list (by index, starting by 1)
    @param      skip_function       *function(filename,content,duration) --> boolean* to skip a unit test
    @param      on_stderr           if True, publish everything on stderr at the end
    @param      processes           to run the unit test in a separate process (with function @see fn run_cmd),
                                    however, to make that happen, you need to specify
                                    ``exit=False`` for each test file, see `unittest.main
                                    <https://docs.python.org/3/library/unittest.html#unittest.main>`_
    @param      additional_ut_path  additional paths to add when running the unit tests
    @param      stdout              if not None, use this stream instead of *sys.stdout*
    @param      stderr              if not None, use this stream instead of *sys.stderr*
    @param      filter_warning      function which removes some warnings in the final output,
                                    if None, the function filters out some recurrent warnings
                                    in jupyter (signature: ``def filter_warning(w: warning) -> bool``),
                                    @see fn default_filter_warning
    @param      fLOG                logging function
    @return                         dictionary: ``{ "err": err, "tests": list of couple (file, test results),
                                    "failed": dictionary { test file: error } }``

    The function raises ``FileNotFoundError`` if no test file is found,
    ``RuntimeError`` if two test files share the same short name
    (*folder/file*) or if no test was run.
    """
    if os.environ.get('PYTHONPATH', '') == 'src':
        # a relative PYTHONPATH is replaced by its absolute version
        full_src = os.path.abspath('src')
        if not os.path.exists(full_src):
            raise FileNotFoundError(
                f"Unable to interpret path {'src'!r} - {full_src!r}.")
        os.environ['PYTHONPATH'] = full_src
    skip_list = set() if skip_list is None else set(skip_list)
    if filter_warning is None:
        filter_warning = default_filter_warning

    # checking that the module does not belong to the installed modules
    # NOTE(review): sys.executable is the interpreter file itself, this path
    # probably never exists, the check may have been meant to start from
    # the folder of the interpreter - confirm.
    if path_test is not None:
        path_module = os.path.join(sys.executable, "Lib", "site-packages")
        paths = [os.path.join(path_module, "src"), ]
        for path in paths:
            if os.path.exists(path):
                raise FileExistsError(  # pragma: no cover
                    f"This path should not exist '{path}'.")

    def short_name(el):
        # "folder/file": last folder and filename, it identifies a test file
        head, tail = os.path.split(el)
        return os.path.split(head)[-1] + "/" + tail

    def is_skipped(position, filename, cut):
        # Tells if the test file at this position must be skipped
        # (skip, skip_list, skip_function).
        if skip >= 0 and position < skip:
            return True  # pragma: no cover
        if position + 1 in skip_list:
            return True  # pragma: no cover
        if skip_function is not None:
            # errors="ignore": same policy as get_estimation_time,
            # an undecodable character must not stop the run
            with open(filename, "r", errors="ignore") as f:
                content = f.read()
            if skip_function(filename, content, duration.get(cut, None)):
                return True
        return False

    def write_text(stream, text):
        # Writes a text, falls back to its ascii version if the stream
        # cannot encode it.
        try:
            stream.write(text)
        except UnicodeError:
            stream.write(text.encode("ascii", errors="ignore").decode(
                "ascii", errors="ignore"))

    # sort the test by increasing expected time
    fLOG(f"[main_run_test] path_test {path_test!r}")
    li = get_test_file("test*", folder=path_test, fLOG=fLOG, root=path_test)
    if len(li) == 0:
        raise FileNotFoundError(  # pragma: no cover
            f"No test files in {path_test!r}.")
    co = sorted((get_estimation_time(el), short_name(el), el) for el in li)

    # we check we do not run twice the same file
    done = set()
    duplicate = []
    for _, cut, lc in co:
        if cut in done:
            duplicate.append((cut, lc))
        done.add(cut)

    if len(duplicate) > 0:  # pragma: no cover
        mes = "\n".join(str(_) for _ in sorted(set(duplicate)))
        raise RuntimeError("Duplicated test files were detected:\n" + mes)

    if skip != -1:
        fLOG(f"[main_run_test] found {len(co)} test files skipping.")
    else:
        fLOG(f"[main_run_test] found {len(co)} test files.")

    # extract the test classes, tests longer than limit_max are dropped
    cco = []
    duration = {}
    for e, cut, l in co:
        if e > limit_max:
            continue  # pragma: no cover
        cco.append((e, l))
        duration[cut] = e

    exp = re.compile("Ran ([0-9]+) tests? in ([.0-9]+)s")

    # run the test
    li = [a[1] for a in cco]
    suite = import_files(li, additional_ut_path=additional_ut_path, fLOG=fLOG)
    lis = [os.path.split(name)[-1] for _, name in suite]
    keep = []

    # redirect standard output, error
    fLOG("[main_run_test] redirect stdout, stderr")
    memo_stdout = sys.stdout
    memout = sys.stdout if stdout is None else stdout
    fail = 0
    allwarn = []

    memo_stderr = sys.stderr
    memerr = sys.stderr if stderr is None else stderr
    fullstderr = StringIO()

    # displays
    memout.write("[main_run_test] ---- JENKINS BEGIN UNIT TESTS ----")
    memout.write(f"[main_run_test] ---- BEGIN UNIT TEST for {path_test!r}")

    # display all tests
    for i, s in enumerate(suite):
        cut = short_name(s[1])
        if is_skipped(i, s[1], cut):
            continue

        if cut not in duration:
            raise RuntimeError("[{0}] not found in\n{1}".format(
                cut, "\n".join(sorted(duration.keys()))))
        dur = duration[cut]
        zzz = "\ntest % 3d (%04ds), %s" % (i + 1, dur, cut)
        memout.write(zzz)
    memout.write("\n")

    # displays
    memout.write("[main_run_test] ---- RUN UT\n")
    memout.write(
        "[main_run_test] ---- len(suite)=%d len(skip_list)=%d skip=%d\n" % (
            len(suite), len(skip_list), skip))
    original_stream = runner.stream.stream if isinstance(
        runner.stream.stream, StringIOAndFile) else None

    # run all tests
    failed_test = {}
    n_runs = 0
    for i, s in enumerate(suite):
        cut = short_name(s[1])
        if is_skipped(i, s[1], cut):
            continue

        zzz = "running test % 3d, %s" % (i + 1, cut)
        zzz += (60 - len(zzz)) * " "
        memout.write(zzz)

        # the errors are logged into a file just beside the test file
        newstdr = StringIOAndFile(s[1] + ".err")
        keepstdr = sys.stderr
        sys.stderr = newstdr
        list_warn = []

        try:
            if processes:
                # NOTE(review): this branch does not define the test result
                # *r* used below and indexes *li* with the index of *suite*
                # (both differ if import_files skipped a file), it needs to
                # be revisited before being used.
                cmd = sys.executable.replace("w.exe", ".exe") + " " + li[i]
                out, err = run_cmd(cmd, wait=True)
                if len(err) > 0:
                    sys.stderr.write(err)  # pragma: no cover
            else:
                with warnings.catch_warnings(record=True) as caught:
                    warnings.simplefilter("always")
                    if original_stream is not None:
                        original_stream.begin_test(s[1])
                    r = runner.run(s[0])
                    out = r.stream.getvalue()
                    if original_stream is not None:
                        original_stream.end_test(s[1])
                    for ww in caught:
                        list_warn.append((ww, s))
                    n_runs += 1

            # the runner output accumulates, the last match is this run
            ti = exp.findall(out)[-1]
        finally:
            # stderr is restored even if the runner or the parsing fails
            sys.stderr = keepstdr

        # don't modify it, PyCharm does not get it right (ti is a tuple)
        add = " ran %s tests in %ss" % ti
        memout.write(add)

        if not r.wasSuccessful():  # pragma: no cover
            err = out.split("===========")[-1]
            memout.write("\n")
            failed_test[cut] = err
            write_text(memout, err)

            # stores the output in case of an error
            with open(s[1] + ".err", "w", encoding="utf-8", errors="ignore") as f:
                f.write(out)

            fail += 1

            fullstderr.write("\n#-----" + lis[i] + "\n")
            fullstderr.write("OUT:\n")
            fullstderr.write(out)

            if err:
                fullstderr.write("[pyqerror]o:\n")
                write_text(fullstderr, err)

            list_warn = [(wa, su) for wa, su in list_warn
                         if filter_warning(wa)]
            if len(list_warn) > 0:
                fullstderr.write("*[pyqwarning]:\n")
                warndone = set()
                for wa, _ in list_warn:
                    # the warning itself is logged, not the test suite
                    sw = str(wa)
                    if sw not in warndone:
                        # we display only one time the same warning
                        fullstderr.write(f"w{i}: {sw}\n")
                        warndone.add(sw)
            serr = newstdr.getvalue()
            if serr.strip(" \n\r\t"):
                fullstderr.write("ERRs:\n")
                fullstderr.write(serr)
        else:
            list_warn = [(wa, su) for wa, su in list_warn
                         if filter_warning(wa)]
            allwarn.append((lis[i], list_warn))
            val = newstdr.getvalue()
            if val.strip(" \n\r\t"):
                # Remove most of the Sphinx warnings (sphinx < 1.8)
                lines = val.strip(" \n\r\t").split("\n")
                lines = [
                    _ for _ in lines if _ and "is already registered, it will be overridden" not in _]
                val = "\n".join(lines)
            if len(val) > 0 and is_valid_error(val):  # pragma: no cover
                fullstderr.write("\n*-----" + lis[i] + "\n")
                if len(list_warn) > 0:
                    fullstderr.write("[main_run_test] +WARN:\n")
                    for wa, _ in list_warn:
                        fullstderr.write(
                            f"[in:{cut}] w{i}: {str(wa)}\n")
                if val.strip(" \n\r\t"):
                    fullstderr.write(f"[in:{cut}] ERRv:\n")
                    fullstderr.write(val)

        memout.write("\n")
        keep.append((s[1], r))

    # displays
    memout.write("[main_run_test] ---- END UT\n")
    memout.write("[main_run_test] ---- JENKINS END UNIT TESTS ----\n")
    if n_runs == 0:
        raise RuntimeError(  # pragma: no cover
            "No unit tests was run.")

    fLOG("[main_run_test] restore stdout, stderr")

    # end, catch standard output and err
    sys.stderr = memo_stderr
    sys.stdout = memo_stdout
    val = fullstderr.getvalue()

    if len(val) > 0:  # pragma: no cover
        fLOG("[main_run_test] -- STDERR (from unittests) on STDOUT")
        fLOG(val)
        fLOG("[main_run_test] -- end STDERR on STDOUT")

        if on_stderr:
            memerr.write(
                "[main_run_test] ##### STDERR (from unittests) #####\n")
            memerr.write(val)
            memerr.write("[main_run_test] ##### end STDERR #####\n")

    if fail == 0:
        clean(fLOG=fLOG)

    fLOG("[main_run_test] printing warnings")

    for fi, lw in allwarn:
        if len(lw) > 0:
            memout.write(f"[main_run_test] -WARN: {fi}\n")
            wdone = set()
            for iw, (wa, _) in enumerate(lw):
                sw = str(wa)
                if sw in wdone:
                    # the same warning is displayed only once
                    continue
                wdone.add(sw)
                try:
                    sw = f" w{iw}: {wa}\n"
                except UnicodeEncodeError:  # pragma: no cover
                    sw = " w{0}: Unable to convert a warnings of type {1} into a string (1)".format(
                        iw, type(wa))
                try:
                    memout.write(sw)
                except UnicodeEncodeError:  # pragma: no cover
                    sw = " w{0}: Unable to convert a warnings of type {1} into a string (2)".format(
                        iw, type(wa))
                    memout.write(sw)

    fLOG("[main_run_test] END of unit tests")
    memout.write("[main_run_test] END of unit tests\n")
    return dict(err=val, tests=keep, failed=failed_test)

601 

602 

def is_valid_error(error):
    """
    Tells if a text written on stderr looks like an error,
    a local server can push logs on this stream,
    the function looks for keywords such as ``Exception``,
    ``Error``, ``TraceBack``... (the case is ignored).

    @param      error       text
    @return                 boolean

    A text starting with ``--- Logging error ---`` is never an error,
    lines about the missing module ``numpy.core._multiarray_umath``
    are ignored.
    """
    ignored = "No module named 'numpy.core._multiarray_umath'"
    keywords = ("exception", "error", "traceback", "invalid", " line ")

    rows = error.strip("\n\r").replace("\r", "").split('\n')
    if rows[0] == "--- Logging error ---":
        return False

    text = "\n".join(row for row in rows if ignored not in row).lower()
    return any(keyword in text for keyword in keywords)

625 

626 

def default_skip_function(name, code, duration):
    """
    Default skip function for function @see fn main_wrapper_tests,
    the decision only relies on the name of the test file.

    @param      name        name of the test file
    @param      code        code of the test file (not used)
    @param      duration    estimated duration of the tests
                            (specified in the file documentation, not used)
    @return                 True if skipped, False otherwise

    A test file is skipped if its name contains ``test_SKIP_``,
    ``test_LONG_`` or ``test_GUI_``.
    """
    markers = ("test_SKIP_", "test_LONG_", "test_GUI_")
    return any(marker in name for marker in markers)

638 return False