Coverage for pyquickhelper/pycode/utils_tests.py: 34%

87 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@brief This extension contains various functionalities to help unittesting. 

4""" 

5import os 

6import sys 

7import unittest 

8from datetime import datetime 

9import warnings 

10import sqlite3 

11from .code_exceptions import CoverageException 

12from .coverage_helper import ( 

13 publish_coverage_on_codecov, find_coverage_report, 

14 coverage_combine) 

15from .utils_tests_stringio import StringIOAndFile 

16 

17 

class TestWrappedException(Exception):
    """
    Raised by @see fn main_wrapper_tests when the error output
    collected while running the unit tests is too long.
    """
    # The class name starts with ``Test``: this flag tells pytest
    # not to consider the exception as a test class.
    __test__ = False

21 

22 

23def _modifies_coverage_report(name, bsrcp, bproj): 

24 conn = sqlite3.connect(name) 

25 sql = [] 

26 for row in conn.execute("select * from file"): 

27 name = row[1] 

28 for b in bsrcp: 

29 name = name.replace(b, bproj) 

30 name = name.replace('\\', '/') 

31 s = f"UPDATE file SET path='{name}' WHERE id={row[0]};" 

32 sql.append(s) 

33 

34 c = conn.cursor() 

35 for s in sql: 

36 c.execute(s) 

37 conn.commit() 

38 conn.close() 

39 

40 

def main_wrapper_tests(logfile, skip_list=None, processes=False, add_coverage=False,
                       report_folder=None, skip_function=None,
                       coverage_options=None,
                       coverage_exclude_lines=None, additional_ut_path=None,
                       covtoken=None, stdout=None, stderr=None,
                       filter_warning=None, dump_coverage=None,
                       add_coverage_folder=None, coverage_root="src", fLOG=None):
    """
    Calls function :func:`main <pyquickhelper.unittests.utils_tests.main>`
    and throws an exception if it fails.

    @param      logfile                 location of a logfile
    @param      skip_list               to skip a list of unit tests (by index, starting by 1)
    @param      processes               to run the unit test in a separate process (with function @see fn run_cmd),
                                        however, to make that happen, you need to specify
                                        ``exit=False`` for each test file, see `unittest.main
                                        <https://docs.python.org/3/library/unittest.html#unittest.main>`_
    @param      add_coverage            (bool) run the unit tests and measure the coverage at the same time
    @param      report_folder           (str) folder where the coverage report will be stored
    @param      skip_function           *function(filename,content,duration) --> boolean* to skip a unit test
    @param      coverage_options        (dict) options for module coverage as a dictionary, see below, default is None,
                                        the dictionary is copied, the function does not modify it
    @param      coverage_exclude_lines  (list) options for module coverage, lines to exclude from the coverage report, default is None
    @param      additional_ut_path      (list) additional paths to add when running the unit tests
    @param      covtoken                (str|tuple(str, str)) token used when publishing coverage report to `codecov <https://codecov.io/>`_
                                        or None to not publish
    @param      stdout                  if not None, write output on this stream instead of *sys.stdout*
    @param      stderr                  if not None, write errors on this stream instead of *sys.stderr*
    @param      filter_warning          function which removes some warnings in the final output,
                                        if None, the function filters out some recurrent warnings
                                        in jupyter (signature: ``def filter_warning(w: warning) -> bool``),
                                        @see fn default_filter_warning
    @param      dump_coverage           dump or copy the coverage at this location
    @param      add_coverage_folder     additional coverage folder reports
    @param      coverage_root           subfolder for the coverage
    @param      fLOG                    ``function(*l, **p)``, logging function
    @return                             the value returned by function *main_run_test*,
                                        this function reads its key ``'tests'`` and its optional key ``'err'``

    The function raises *FileNotFoundError* if the project folder or
    the project name cannot be guessed from *logfile*,
    *CoverageException* if *covtoken* requires a publication while
    *add_coverage* is False, *RuntimeError* if the HTML coverage report
    cannot be produced, @see cl TestWrappedException if the collected
    error output is longer than 10000 characters.

    *covtoken* can be a string ``<token>`` or a
    tuple ``(<token>, <condition>)``. The condition is evaluated
    by the python interpreter (with ``eval``) and determines whether or not the coverage
    needs to be published. The condition must come from a trusted source.

    .. faqref::
        :title: How to build pyquickhelper with Jenkins?
        :index: Jenkins

        :epkg:`Jenkins` is a task scheduler for continuous integration.
        You can easily schedule batch command to build and run unit tests for a specific project.
        To build pyquickhelper, you need to install :epkg:`python`,
        :epkg:`pymyinstall`,
        :epkg:`miktex`, :epkg:`pandoc`,
        :epkg:`sphinx`.

        Once Jenkins is installed, the command to schedule is::

            set PATH=%PATH%;%USERPROFILE%\\AppData\\Local\\Pandoc
            build_setup_help_on_windows.bat

        This works if you installed Jenkins with your credentials.
        Otherwise, the path to ``pandoc.exe`` needs to be changed.
        And you can also read `Schedule builds with Jenkins
        <http://www.xavierdupre.fr/blog/2014-12-06_nojs.html>`_.
        :epkg:`node.js` might be required if a notebooks contain javascript.

    Parameters *add_coverage* and *report_folder* are used to compute the coverage
    using the module `coverage <http://nedbatchelder.com/code/coverage/>`_.
    The function does something about the following error:

    ::

        _tkinter.TclError: no display name and no $DISPLAY environment variable

    It is due to :epkg:`matplotlib`.
    See `Generating matplotlib graphs without a running X server
    <http://stackoverflow.com/questions/4931376/
    generating-matplotlib-graphs-without-a-running-x-server>`_.
    If the skip function is None, it will replace it by
    the function @see fn default_skip_function.

    Parameters *coverage_options*, *coverage_exclude_lines*, *additional_ut_path*:
    see class `Coverage <https://coverage.readthedocs.io/en/coverage-5.5/api_coverage.html>`_
    and `Configuration files <https://coverage.readthedocs.io/en/coverage-4.0b1/config.html>`_
    to specify those options. If both values are left to None, this function will
    compute the code coverage for all files in this module. The function
    now exports the coverage options which were used.
    For example, to exclude files from the coverage report::

        coverage_options=dict(omit=["*exclude*.py"])

    Parameter *covtoken*: used to post the coverage report to
    `codecov <https://codecov.io/>`_.
    """
    # delayed import
    from ..loghelper.os_helper import get_user

    if skip_function is None:  # pragma: no cover
        from .utils_tests_private import default_skip_function
        skip_function = default_skip_function

    if fLOG is None:  # pragma: no cover
        from ..loghelper.flog import noLOG
        fLOG = noLOG

    whole_output = StringIOAndFile(logfile)
    runner = unittest.TextTestRunner(verbosity=0, stream=whole_output)
    path = os.path.abspath(os.path.split(logfile)[0])
    stdout_this = stdout if stdout else sys.stdout
    datetime_begin = datetime.now()

    def _find_source(fold):  # pragma: no cover
        # Walks up the folders until one of them contains ``.gitignore``.
        fold0 = fold
        exists = os.path.exists(os.path.join(fold, ".gitignore"))
        while not exists:
            parent = os.path.split(fold)[0]
            # ``parent == fold`` happens at a filesystem root such as
            # ``C:\\``, the walk must stop there to avoid an infinite loop.
            if len(fold) < 2 or parent == fold:
                raise FileNotFoundError(
                    f"Unable to guess source from '{fold0}'.")
            fold = parent
            exists = os.path.exists(os.path.join(fold, ".gitignore"))
        return os.path.normpath(os.path.abspath(fold))

    def run_main():
        # delayed import to speed up import of pycode
        from .utils_tests_private import main_run_test
        res = main_run_test(
            runner, path_test=path, skip=-1, skip_list=skip_list,
            processes=processes, skip_function=skip_function,
            additional_ut_path=additional_ut_path, stdout=stdout, stderr=stderr,
            filter_warning=filter_warning, fLOG=fLOG)
        return res

    # other coverage reports
    if (add_coverage_folder is not None and
            dump_coverage is not None):  # pragma: no cover
        sub = os.path.split(dump_coverage)[0]
        sub = os.path.split(sub)[-1]
        other_cov_folders = find_coverage_report(
            add_coverage_folder, exclude=sub)
        mes = f"[main_wrapper_tests] other_cov_folders...sub='{sub}'"
        stdout_this.write(mes + "\n")
        for k, v in sorted(other_cov_folders.items()):
            mes = f"[main_wrapper_tests]     k='{k}' v={v}"
            stdout_this.write(mes + "\n")
        if len(other_cov_folders) == 0:
            other_cov_folders = None
    else:
        other_cov_folders = None

    # to deal with: _tkinter.TclError: no display name and no $DISPLAY
    # environment variable
    from .tkinter_helper import fix_tkinter_issues_virtualenv, _first_execution
    fLOG("[main_wrapper_tests] MODULES (1): matplotlib already imported",
         "matplotlib" in sys.modules, "first execution", _first_execution)
    r = fix_tkinter_issues_virtualenv(fLOG=fLOG)
    fLOG("[main_wrapper_tests] MODULES (2): matplotlib imported",
         "matplotlib" in sys.modules, "first execution", _first_execution)
    fLOG("[main_wrapper_tests] fix_tkinter_issues_virtualenv", r)

    # project_var_name
    folder = os.path.normpath(
        os.path.join(os.path.dirname(logfile), "..", "src"))
    if not os.path.exists(folder):
        folder = os.path.normpath(
            os.path.join(os.path.dirname(logfile), ".."))
    if not os.path.exists(folder):
        raise FileNotFoundError(folder)  # pragma: no cover

    def selec_name(folder, name):
        # Tells if subfolder *name* of *folder* looks like the project package.
        if name.startswith('_') or name.startswith('.'):
            return False
        if name in ('bin', 'dist', 'build'):
            return False  # pragma: no cover
        if '.egg' in name or 'dist_module27' in name:
            return False
        fold = os.path.join(folder, name)
        if not os.path.isdir(fold):
            return False
        init = os.path.join(fold, '__init__.py')
        if not os.path.exists(init):
            return False  # pragma: no cover
        return True

    content = [_ for _ in os.listdir(folder) if selec_name(folder, _)]
    if len(content) != 1:
        raise FileNotFoundError(  # pragma: no cover
            "Unable to guess the project name in '{0}', content=\n{1}\n---"
            "\n{2}\n---".format(folder, "\n".join(content),
                                "\n".join(os.listdir(folder))))

    project_var_name = content[0]
    src_abs = os.path.normpath(os.path.abspath(
        os.path.join(os.path.dirname(logfile), "..")))

    root_src = os.path.join(src_abs, "src", project_var_name)
    if not os.path.exists(root_src):
        root_src = os.path.join(src_abs, project_var_name)
    if not os.path.exists(root_src):
        raise FileNotFoundError(  # pragma: no cover
            f"Unable to find '{root_src}'.")
    srcp = os.path.relpath(root_src, os.getcwd())

    if get_user() in srcp:
        raise FileNotFoundError(  # pragma: no cover
            f"The location of the source should not contain '{get_user()}': {srcp}")

    # coverage
    if add_coverage:  # pragma: no cover
        stdout_this.write("[main_wrapper_tests] --- COVERAGE BEGIN ---\n")
        if report_folder is None:
            report_folder = os.path.join(
                os.path.abspath(os.path.dirname(logfile)), "..", "_doc",
                "sphinxdoc", "source", "coverage")

        fLOG("[main_wrapper_tests] current folder", os.getcwd())
        fLOG("[main_wrapper_tests] enabling coverage", srcp)
        dfile = os.path.join(report_folder, ".coverage")

        # we clean previous report
        if os.path.exists(report_folder):
            for afile in os.listdir(report_folder):
                full = os.path.join(report_folder, afile)
                os.remove(full)

        # we run the coverage, the options are copied
        # to leave the dictionary given by the caller unchanged
        coverage_options = (
            {} if coverage_options is None else dict(coverage_options))
        sources = coverage_options.get("source")
        if sources is None:
            sources = []
        elif isinstance(sources, str):
            sources = [sources]
        coverage_options["source"] = [*sources, srcp]
        if "data_file" not in coverage_options:
            coverage_options["data_file"] = dfile

        from coverage import coverage
        cov = coverage(**coverage_options)
        if coverage_exclude_lines is not None:
            for line in coverage_exclude_lines:
                cov.exclude(line)
        else:
            cov.exclude("raise NotImplementedError")
        stdout_this.write("[main_wrapper_tests] ENABLE COVERAGE\n")
        cov.start()

        res = run_main()

        cov.stop()
        stdout_this.write(
            f"[main_wrapper_tests] STOP COVERAGE + REPORT into '{report_folder}'\n")

        from coverage.misc import CoverageException as RawCoverageException
        try:
            cov.html_report(directory=report_folder)
        except RawCoverageException as e:
            raise RuntimeError(
                "Unable to publish the coverage repot into '{}',"
                "\nsource='{}'\ndata='{}'".format(
                    report_folder, coverage_options["source"],
                    coverage_options.get("data_file", ''))) from e
        outfile = os.path.join(report_folder, "coverage_report.xml")
        cov.xml_report(outfile=outfile)
        cov.save()
        srcp_s = []

        # we clean absolute path from the produced files
        def clean_absolute_path():
            fLOG("[main_wrapper_tests] replace ",
                 srcp, ' by ', project_var_name)
            srcp_s.clear()
            srcp_s.extend([os.path.abspath(os.path.normpath(srcp)),
                           os.path.normpath(srcp)])
            bsrcp = [bytes(b, encoding="utf-8") for b in srcp_s]
            bproj = bytes(project_var_name, encoding="utf-8")
            for afile in os.listdir(report_folder):
                full = os.path.join(report_folder, afile)
                if '.coverage' in afile:
                    # sqlite3 format
                    _modifies_coverage_report(
                        full, srcp_s, project_var_name)
                else:
                    with open(full, "rb") as f:
                        content = f.read()
                    for b in bsrcp:
                        content = content.replace(b, bproj)
                    with open(full, "wb") as f:
                        f.write(content)

        clean_absolute_path()

        # we print debug information for the coverage
        def write_covlog(covs):
            fLOG("[main_wrapper_tests] add debug information")
            outcov = os.path.join(report_folder, "covlog.txt")
            rows = []
            rows.append("COVERAGE OPTIONS")
            for k, v in sorted(coverage_options.items()):
                rows.append(f"{k}={v}")
            rows.append("")
            rows.append("EXCLUDE LINES")
            for k in cov.get_exclude_list():
                rows.append(k)
            rows.append("")
            rows.append("OPTIONS")
            for option_spec in sorted(cov.config.CONFIG_FILE_OPTIONS):
                attr = option_spec[0]
                if attr == "sort":
                    # we skip, it raises an exception with coverage 4.2
                    continue
                v = getattr(cov.config, attr)
                st = f"{attr}={v}"
                rows.append(st)
            rows.append("")

            if covs is not None:
                for add in sorted(covs):
                    rows.append(f"MERGE='{add}'")

            content = "\n".join(rows)

            # the root folder is removed from the log with every
            # kind of separator it may be written with
            reps = []
            for _ in srcp_s[:1]:
                __ = os.path.normpath(os.path.join(_, "..", "..", ".."))
                __ += "/"
                reps.append(__)
                reps.append(__.replace("/", "\\"))
                reps.append(__.replace("/", "\\\\"))
                reps.append(__.replace("\\", "\\\\"))

            for s in reps:
                content = content.replace(s, "")

            with open(outcov, "w", encoding="utf8") as f:
                f.write(content)

        write_covlog(None)

        if dump_coverage is not None:
            # delayed import
            from ..filehelper import synchronize_folder
            src = os.path.dirname(outfile)
            stdout_this.write(
                "[main_wrapper_tests] dump coverage from '{1}' to '{0}'"
                "\n".format(dump_coverage, outfile))
            synchronize_folder(src, dump_coverage,
                               copy_1to2=True, fLOG=fLOG)

            # other_cov_folders is only defined when dump_coverage is
            if other_cov_folders is not None:
                # _find_source returns a non empty path or raises
                source = _find_source(src)
                if coverage_root:
                    source_src = os.path.join(source, coverage_root)
                    if os.path.exists(source_src):
                        source = source_src
                stdout_this.write(
                    f"[main_wrapper_tests] ADD COVERAGE for source='{source}'\n")
                covs = list(_[0] for _ in other_cov_folders.values())
                covs.append(os.path.abspath(
                    os.path.normpath(os.path.join(src, '.coverage'))))
                stdout_this.write(
                    f"[main_wrapper_tests] ADD COVERAGE COMBINE={covs}\n")
                stdout_this.write(
                    f"[main_wrapper_tests] DUMP INTO='{src}'\n")
                try:
                    coverage_combine(covs, src, source)
                    write_covlog(covs)
                except Exception as e:
                    # best effort: merging the reports must not
                    # make the unit tests fail
                    warnings.warn("[main_wrapper_tests] {}".format(
                        str(e).replace("\n", " ")))

        if covtoken:
            if isinstance(covtoken, tuple):
                # NOTE: the condition is evaluated with eval,
                # covtoken must come from a trusted source.
                if eval(covtoken[1]):
                    # publishing token
                    mes = (
                        "[main_wrapper_tests] PUBLISH COVERAGE to "
                        "codecov '{0}' EVAL ({1})".format(
                            covtoken[0], covtoken[1]))
                    # stdout_this is stdout when it is specified,
                    # the message is written only once
                    stdout_this.write(mes + '\n')
                    fLOG(mes)
                    publish_coverage_on_codecov(
                        token=covtoken[0], path=outfile, fLOG=fLOG)
                else:
                    fLOG(
                        "[main_wrapper_tests] skip publishing "
                        "coverage to codecov due to False:",
                        covtoken[1])
            else:
                # publishing token
                fLOG(
                    "[main_wrapper_tests] publishing coverage to "
                    "codecov %r." % covtoken)
                publish_coverage_on_codecov(
                    token=covtoken, path=outfile, fLOG=fLOG)
        else:
            stdout_this.write(
                f"[main_wrapper_tests] NO PUBLISHING {covtoken}.\n")
        stdout_this.write("[main_wrapper_tests] --- COVERAGE END ---\n")
    else:
        stdout_this.write(
            "[main_wrapper_tests] --- NO COVERAGE BEGIN ---\n")
        # NOTE: the condition is evaluated with eval,
        # covtoken must come from a trusted source.
        if covtoken and (not isinstance(covtoken, tuple) or eval(covtoken[1])):
            raise CoverageException(  # pragma: no cover
                "covtoken is not null but add_coverage is not True, coverage cannot be published")
        res = run_main()
        stdout_this.write("[main_wrapper_tests] --- NO COVERAGE END ---\n")

    fLOG("[main_wrapper_tests] SUMMARY -------------------------")
    for r in res["tests"]:
        k = str(r[1])
        if "errors=0" not in k or "failures=0" not in k:
            fLOG("*", r[1], r[0])  # pragma: no cover

    fLOG("[main_wrapper_tests] CHECK EXCEPTION -----------------")
    err = res.get("err", "")
    if len(err) > 0:  # pragma: no cover
        # Remove most of the Sphinx warnings (sphinx < 1.8)
        fLOG("[main_wrapper_tests] EXCEPTION BEGIN")
        fLOG(err)
        fLOG("[main_wrapper_tests] EXCEPTION END")
        if len(err) > 10000:
            raise TestWrappedException(err)  # pragma: no cover

    datetime_end = datetime.now()

    rows = ["[main_wrapper_tests] END",
            f"[main_wrapper_tests] begin time {datetime_begin}",
            f"[main_wrapper_tests] end time {datetime_end}",
            f"[main_wrapper_tests] duration {datetime_end - datetime_begin}"]
    for row in rows:
        fLOG(row)
        stdout_this.write(row + "\n")
    return res