Coverage for src/ensae_teaching_cs/automation_students/projects_repository.py: 61%

561 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-04-28 06:23 +0200

1""" 

2@file 

3@brief Some automation helpers to grab mails from students about their projects. 

4""" 

5import re 

6import os 

7import sys 

8import json 

9import textwrap 

10import warnings 

11import zipfile 

12from urllib.parse import urlparse 

13import numpy 

14from pyquickhelper.loghelper import noLOG 

15from pyquickhelper.texthelper import remove_diacritics 

16from pyquickhelper.filehelper import remove_folder, explore_folder_iterfile 

17from pyquickhelper.filehelper import ( 

18 unzip_files, zip_files, ungzip_files, un7zip_files, unrar_files, 

19 untar_files 

20) 

21from pyquickhelper.helpgen import nb2html 

22from pyquickhelper.ipythonhelper import upgrade_notebook 

23from pymmails import EmailMessageRenderer, EmailMessage 

24from .repository_exception import RegexRepositoryException, TooManyProjectsException 

25from ..td_1a import edit_distance 

26from ..homeblog.python_exemple_py_to_html import py_to_html_file 

27 

28 

29class ProjectsRepository: 

30 """ 

31 Handle a repository of students projects. 

32 See example :ref:`sphx_glr_automation_fetch_student_projects_from_gmail.py`. 

33 """ 

34 

class MailNotFound(Exception):
    """
    Exception raised when no mail can be associated
    with a student or a group of students.
    """

40 

# Patterns applied to the follow-up file of a group: each of them captures
# the value of a bullet item such as ``* mails: a@b.c; d@e.f``.
_email_regex = re.compile(r"[*] *e?mails? *: *([^*+\n]+)")
_gitlab_regex = re.compile(r"[*] *gitlab *: *([^*+\n]+[.]git)")
_video_regex = re.compile(r"[*] *videos? *: *([^*\n]+)")

44 

def __init__(self, location, suivi="suivi.rst", fLOG=noLOG):
    """
    Binds the object to the folder which contains the groups.

    @param      location    location of the repository
    @param      suivi       name of the file gathering information about each project
    @param      fLOG        logging function
    """
    self.fLOG = fLOG
    self._suivi = suivi
    self._location = location

55 

@property
def Location(self):
    """
    Gives the folder the repository was built on.

    @return         location of the repository
    """
    root = self._location
    return root

62 

@property
def Groups(self):
    """
    Lists the groups of the repository: every sub folder
    of the repository location is a group.

    @return         list of group names
    """
    root = self._location
    groups = []
    for entry in os.listdir(root):
        if os.path.isdir(os.path.join(root, entry)):
            groups.append(entry)
    return groups

70 

def get_group_location(self, group):
    """
    Builds the path of the folder which stores a group.

    @param      group       group name
    @return                 local folder
    """
    root = self._location
    return os.path.join(root, group)

79 

@staticmethod
def get_regex(path, regex, suivi="suivi.rst", skip_if_empty=False):
    """
    Retrieves data from file ``suivi.rst`` using a regular expression.

    @param      path            sub folder to look into
    @param      regex           compiled regular expression, every match
                                may hold several values separated by ``;``
    @param      suivi           name of the file ``suivi.rst``
    @param      skip_if_empty   returns an empty list instead of raising
                                an exception if the expression is not found
    @return                     list of values (mails for example),
                                each of them stripped

    The function raises *FileNotFoundError* if the folder or the file
    is missing, *ValueError* if the file is not encoded in utf-8,
    *RuntimeError* if the expression is not found and
    *skip_if_empty* is False.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)  # pragma: no cover
    filename = os.path.join(path, suivi)
    if not os.path.exists(filename):
        raise FileNotFoundError(filename)  # pragma: no cover

    try:
        with open(filename, "r", encoding="utf8") as f:
            content = f.read()
    except UnicodeDecodeError as e:
        raise ValueError(  # pragma: no cover
            f'unable to parse file:\n File "{filename}", line 1') from e

    matches = regex.findall(content)
    if not matches:
        if skip_if_empty:
            return []
        raise RuntimeError(  # pragma: no cover
            "Unable to find the regular expression '{0}' in '{1}'".format(
                regex.pattern, filename))

    values = []
    for match in matches:
        values.extend(match.strip("\n\r\t ").split(";"))

    # A single pass: the previous nested comprehension iterated twice
    # over the list and returned every value len(values) times.
    return [value.strip() for value in values]

116 

def get_emails(self, group, skip_if_empty=False):
    """
    Reads the mails of a group from its file ``suivi.rst``
    and checks every value contains exactly one ``@``.

    @param      group           group
    @param      skip_if_empty   skip if no mail?
    @return                     list of mails
    """
    folder = os.path.join(self._location, group)
    found = ProjectsRepository.get_regex(
        folder, ProjectsRepository._email_regex, self._suivi,
        skip_if_empty=skip_if_empty)
    for mail in found:
        if "\n" in mail:
            raise ValueError(  # pragma: no cover
                "unable to interpret " + str([mail]) + " from path " + folder)
        # a valid mail is split into exactly two pieces by '@'
        if mail.count("@") != 1:
            raise RegexRepositoryException(  # pragma: no cover
                "unable to understand mail {0} in {1} (suivi={2} (mail separator is ;)".format(
                    mail, folder, self._suivi))
    return found

141 

def get_videos(self, group):
    """
    Retrieves the videos of a group from file ``suivi.rst``.

    @param      group       group name; an existing folder is still
                            accepted and used as is
    @return                 list of videos
    """
    # The group used to be given to get_regex as if it was a folder,
    # which only works when the caller passes a path. A group name is now
    # resolved against the repository location, like get_emails does,
    # while an existing folder keeps its previous meaning.
    if os.path.isdir(group):
        path = group
    else:
        path = os.path.join(self._location, group)
    return self.get_regex(path, self._video_regex, self._suivi)

150 

def get_sections(self, group):
    """
    Extracts sections from a filename used to follow a group of students.

    @param      group       group
    @return                 dictionary { section : content }

    Example of a file::

        rapport
        +++++++

        * bla 1

        extrait
        +++++++

        ::

            paragraphe 1

            paragraphe 2

    A section starts with a line followed by another one only made of
    one of the characters ``=``, ``+``, ``-``. The text underlined
    with ``=`` is stored under key ``'title'``, any other title becomes
    a key of the dictionary. Lines found before the first title are stored
    under key ``''``. The content of a section is a list of lines.
    The function raises *FileNotFoundError* if the folder or the file
    is missing, *ValueError* if the file is not encoded in utf-8.
    """
    path = os.path.join(self._location, group)
    if not os.path.exists(path):
        raise FileNotFoundError(path)  # pragma: no cover
    filename = os.path.join(path, self._suivi)
    if not os.path.exists(filename):
        raise FileNotFoundError(filename)  # pragma: no cover

    try:
        with open(filename, "r", encoding="utf8") as f:
            content = f.read()
    except UnicodeDecodeError as e:
        raise ValueError(  # pragma: no cover
            f'unable to parse file:\n File "{filename}", line 1') from e

    lines = [_.strip("\r").rstrip() for _ in content.split("\n")]
    # added_in[k] is the section which received the k-th stored line,
    # it tells where to remove a title once its underline is met
    added_in = []
    sections = {"": []}
    title = ""
    for i, line in enumerate(lines):
        # An underline needs a previous line to underline: on the first
        # line, lines[i - 1] would silently pick the last line of the file.
        is_underline = (i > 0 and len(line) > 0 and line[0] in "=+-" and
                        line == line[0] * len(line))
        if not is_underline:
            # empty lines, indented lines and regular text all go
            # to the current section
            sections[title].append(line)
            added_in.append(title)
            continue

        title = lines[i - 1]
        if len(added_in) > 0:
            # the title was stored as content of the previous section
            t = added_in[-1]
            sections[t] = sections[t][:-1]
            added_in[-1] = title
        if line[0] == "=":
            sections["title"] = [title]
            added_in.append("title")
            title = "title"
        else:
            sections[title] = []
            added_in.append(title)

    return sections

228 

# Separators used by match_mail to cut a name or the part of a mail
# before '@' into tokens.
_regex_split = re.compile("[-;,. @]")

230 

@staticmethod
def match_mail(name, emails, threshold=3, exc=True):
    """
    Looks for the mails which are the closest to a name.

    @param      name        a name (first name last name separated by a space)
    @param      emails      list of emails
    @param      threshold   above this threshold, mails and names don't match
    @param      exc         raise an Exception if not found
    @return                 list of tuples *(distance, mail)* sorted
                            by increasing distance

    If the name belongs to *emails*, the function returns ``[(0, name)]``.
    """
    def normalize(text):
        # lower case, no diacritics, tokens are sorted to ignore their order
        tokens = [tok.strip() for tok in ProjectsRepository._regex_split.split(
            remove_diacritics(text.lower()))]
        tokens.sort()
        return " ".join(tokens)

    # a float is converted into a string, nan becomes an empty name
    if isinstance(name, float):
        name = "" if numpy.isnan(name) else str(name)

    # we check the easy case
    if name in emails:
        return [(0, name)]

    target = normalize(name)
    candidates = []
    for email in emails:
        dist = edit_distance(normalize(email.split("@")[0]), target)[0]
        if dist <= threshold:
            candidates.append((dist, email))
    candidates.sort()

    if exc and not candidates:
        raise ProjectsRepository.MailNotFound(  # pragma: no cover
            "unable to find a mail for {0} among\n{1}".format(name, "\n".join(emails)))
    return candidates

268 

@staticmethod
def match_mails(names, emails, threshold=3, exc=True, skip_names=None):
    """
    Applies @see me match_mail to every name of a list
    and gathers all the mails it finds.

    @param      names       list of names (first name last name separated by a space)
    @param      emails      list of emails
    @param      threshold   above this threshold, mails and names don't match
    @param      exc         raise an Exception if not found
    @param      skip_names  the second result is True if one of the names
                            belongs to this list
    @return                 list of available mails, boolean
    """
    found = []
    one_skipped = False
    for name in names:
        if skip_names is not None and name in skip_names:
            one_skipped = True
        matches = ProjectsRepository.match_mail(name, emails, threshold, exc)
        found.extend(pair[1] for pair in matches)
    return found, one_skipped

292 

@staticmethod
def create_folders_from_dataframe(df, root, report="suivi.rst", col_student=None, col_group="Groupe",
                                  col_subject="Sujet", col_mail="mail", overwrite=False, email_function=None,
                                  must_have_email=True, skip_if_nomail=False, skip_names=None,
                                  fLOG=noLOG):
    """
    Creates a series of folders for groups of students.
    Each folder receives a report file which contains the members,
    the subject, the group and the mails when they are known.

    @param      root            where to create the folders
    @param      col_student     column which contains the student name (first name + last name),
                                equal to *col_mail* if *None*
    @param      col_group       index of the group (it can be *None* if each student is a group)
    @param      col_subject     column which contains the subject
    @param      col_mail        if there is a column which contains the mail in the input dataframe
    @param      df              DataFrame
    @param      email_function  function which infers email from first and last names, see below
    @param      report          report file
    @param      overwrite       if False, skip if the report already exists
    @param      must_have_email if True, raises an exception if no mail is found
    @param      skip_if_nomail  skip a name if no mail is found
    @param      skip_names      less checking for a given set of names
    @param      fLOG            logging function
    @return                     instance of @see cl ProjectsRepository built on *root*
                                (the list of created folders is computed but not returned)

    The function *email_function* has the following signature::

        def email_function(names):
            # part of a names is a list of tokens
            # ...
            return list of mails, skip=boolean

    The boolean tells the function to skip this group.
    *email_function* can be a list of mails. In that case,
    this function is replaced by @see me match_mails.
    """
    if col_mail is None and email_function is None:
        raise ValueError(  # pragma: no cover
            "col_mail cannot be None if email_function is None")
    if col_student is None:
        col_student = col_mail

    # used when email_function is a list of mails and there is no mail column:
    # mails are guessed from the names
    def local_email_function(names, skip_names):
        return ProjectsRepository.match_mails(names, email_function,
                                              exc=False, skip_names=skip_names)

    # used when the dataframe holds a mail column:
    # mails are read from a dictionary { student: mail }
    def local_email_function_column(names, skip_names, mapping):
        res = []
        skip = False
        for name in names:
            if skip_names is not None and name in skip_names:
                skip = True
            r = mapping.get(name, None)
            if r:
                res.append(r)
        return res, skip

    if isinstance(email_function, (list, set)):
        if col_mail is None:
            local_function = local_email_function
        else:
            try:
                # + 1 because itertuples puts the index in first position
                ind_student = list(df.columns).index(col_student) + 1
                ind_mail = list(df.columns).index(col_mail) + 1
            except ValueError as e:
                raise ValueError(  # pragma: no cover
                    "Unable to find '{0}' or '{1}' in {2}".format(
                        col_student, col_mail, df.columns)) from e
            mapping = {}
            for row in df.itertuples():
                mapping[row[ind_student]] = row[ind_mail]
            # the second argument of the lambda is ignored,
            # skip_names comes from the enclosing function
            local_function = \
                lambda names, skip, mp=mapping: \
                local_email_function_column(names, skip_names, mp)
    else:
        local_function = email_function

    # builds a piece of folder name from a student name or mail:
    # spaces and dashes become dots, everything from '@' is dropped
    def ul(last):
        res = ""
        for i, c in enumerate(last):
            if c == " ":
                res += "."
            elif c == "-":
                res += "."
            elif c == '@':
                break
            else:
                res += c
        return res

    folds = []

    if df.shape[1] == 0:
        raise RuntimeError("No column in the dataframe.")  # pragma: no cover

    if col_group:
        gr = df.groupby(col_group)
    else:
        # no group column: every row is a group named after its index
        df2 = df.copy()
        df2["gid"] = df.index
        df2["gid2"] = df2.gid.apply(lambda x: "G%d" % x)
        gr = df2.groupby("gid2")

    fLOG("[ProjectsRepository.create_folders_from_dataframe] number of groups {0}".format(
        len(gr)))

    for name, group in gr:
        if col_subject:
            # a group must share a single subject, nan values are ignored
            s = list(set(group[col_subject].copy()))
            s = [_ for _ in s if not isinstance(
                _, float) or ~numpy.isnan(_)]
            if len(s) > 1:
                raise TooManyProjectsException(  # pragma: no cover
                    "more than one subject for group: " + str(name) + "\n" + str(s))
            elif len(s) == 0:
                s = ["unknown"]
            subject = s[0]
        else:
            subject = None

        eleves = list(group[col_student])
        eleves.sort()

        if email_function is not None:
            mails, skip = local_function(eleves, skip_names)
            if must_have_email and (not skip and len(mails) == 0):
                # we skip only if a group has no mails at all
                if isinstance(email_function, (list, set)):
                    mes = "unable to find a mail for\n{0}\nname={1}\nskip:{4}\n{5}\namong\n{3}\nGROUP\n{2}\nlocal_function: {6}"
                    raise ProjectsRepository.MailNotFound(  # pragma: no cover
                        mes.format("; ".join(f"'{_}'" for _ in eleves),
                                   name, group, "\n".join(email_function),
                                   skip, skip_names, local_function))
                raise ProjectsRepository.MailNotFound(  # pragma: no cover
                    "unable to find a mail for {0}\nname={1}\n with function\n{3}\nGROUP\n{2}\nTYPE:\n{4}".format(
                        " ;".join(eleves), name, group, email_function, type(email_function)))
            if skip_if_nomail and (not skip and len(mails) == 0):
                fLOG("[ProjectsRepository.create_folders_from_dataframe] skipping {0}".format(
                    "; ".join(eleves)))
                continue
            if mails:
                for m in mails:
                    if "@" not in m:
                        raise ValueError(  # pragma: no cover
                            f"mails contains a mail with no @: {m}")
                    if "<" in m or ">" in m:
                        raise ValueError(  # pragma: no cover
                            f"one mail contains weird characters: {m}")
                jmail = "; ".join(mails)
            else:
                jmail = None
        else:
            # NOTE(review): without email_function no line '* mails' is written
            # below, the final check on mails (must_have_email) then relies on
            # reports which already exist - confirm this is intended
            jmail = None

        if jmail is not None:
            if "@" not in jmail:
                raise ValueError(  # pragma: no cover
                    f"jmail does not contain any @: {jmail}")

        # content of the report: a title made of the members
        # followed by a list of bullet items
        members = ", ".join(map(str, eleves))
        content = [members]
        content.append("=" * len(members))
        content.append("")

        content.append(f"* members: {members}")
        if subject:
            content.append(f"* subject: {subject}")
        content.append(f"* G: {name}")

        if jmail:
            content.append("* mails: " + jmail)

        content.append("")
        content.append("")

        # the folder name concatenates the members' names
        last = "-".join(ul(a) for a in sorted(map(str, eleves)))

        folder = os.path.join(root, last)
        filename = os.path.join(folder, report)

        if not os.path.exists(folder):
            if '@' in folder:
                raise ValueError(  # pragma: no cover
                    f"Folder '{folder}' must not contain '@'.")
            os.mkdir(folder)

        if overwrite or not os.path.exists(filename):
            with open(filename, "w", encoding="utf8") as f:
                f.write("\n".join(content))

        folds.append(folder)

    proj = ProjectsRepository(root, suivi=report, fLOG=fLOG)

    if must_have_email:
        # every folder found in root is checked, not only the ones created above
        for gr in proj.Groups:
            mails = proj.get_emails(gr)
            if len(mails) == 0:
                raise ValueError(  # pragma: no cover
                    f"No mail for group '{gr}'.")
    return proj

493 

def enumerate_group_mails(self, group, mailbox, subfolder, date=None,
                          skip_function=None, max_dest=5):
    """
    Enumerates all mails sent by or sent to a given group.

    @param      group           group (if None, goes through all mails)
    @param      mailbox         mailbox (see `pymmails <http://www.xavierdupre.fr/app/pymmails/helpsphinx/>`_)
    @param      subfolder       which subfolder of the mailbox to look into
    @param      date            date
    @param      skip_function   if not None, use this function on the header/body
                                to avoid loading the entire message (and skip it)
    @param      max_dest        maximum number of receivers
    @return                     iterator on mails
    """
    if group is None:
        for group_ in self.Groups:
            self.fLOG(
                f"[ProjectsRepository.enumerate_group_mails] group='{group_}'")
            yield from self.enumerate_group_mails(
                group_, mailbox, subfolder=subfolder, date=date,
                skip_function=skip_function, max_dest=max_dest)
    else:
        mails = self.get_emails(group)
        self.fLOG("[ProjectsRepository.enumerate_group_mails] mails='{0}' folder='{1}' date={2}".format(
            str(mails), subfolder, date))
        # max_dest is forwarded: it used to be hard-coded to 5
        # and the parameter was ignored
        yield from mailbox.enumerate_search_person(
            person=mails,
            folder=subfolder,
            skip_function=skip_function,
            date=date,
            max_dest=max_dest)

527 

def dump_group_mails(self, renderer, group, mailbox, subfolder, date=None,
                     skip_function=None, max_dest=5, filename="index_mails.html",
                     overwrite=False, skip_if_empty=False, convert_files=False):
    """
    Dumps all mails sent by or sent to a given group
    into the folder of the group.

    @param      renderer        instance of class `EmailMessageListRenderer
                                <http://www.xavierdupre.fr/app/pymmails/helpsphinx/pymmails/render/
                                email_message_list_renderer.html>`_
    @param      group           group (if None, dumps the mails of every group)
    @param      mailbox         mailbox (see `pymmails <http://www.xavierdupre.fr/app/pymmails/helpsphinx/>`_)
    @param      subfolder       which subfolder of the mailbox to look into
    @param      date            date
    @param      skip_function   if not None, use this function on the header/body to avoid loading
                                the entire message (and skip it)
    @param      max_dest        maximum number of receivers
    @param      filename        filename which gathers a link to every mail
    @param      overwrite       overwrite
    @param      skip_if_empty   skip if no mail?
    @param      convert_files   unzip and convert
    @return                     list of files (see `EmailMessageListRenderer.write
                                <http://www.xavierdupre.fr/app/pymmails/helpsphinx/pymmails/render/
                                email_message_list_renderer.html>`_)

    zip, gz, rar, 7z can be uncompressed.
    It then convert *.py* and *.ipynb* into html.
    """
    if group is None:
        res = []
        for group_ in self.Groups:
            # filename is forwarded too: it used to be dropped
            # and every group was dumped with the default name
            r = self.dump_group_mails(renderer, group_, mailbox, subfolder=subfolder,
                                      date=date, skip_function=skip_function, max_dest=max_dest,
                                      filename=filename,
                                      overwrite=overwrite, skip_if_empty=skip_if_empty,
                                      convert_files=convert_files)
            res.extend(r)
        return res

    mails = self.get_emails(group, skip_if_empty=skip_if_empty)
    if skip_if_empty and len(mails) == 0:
        self.fLOG("[ProjectsRepository.dump_group_mails] SKIP group='{0}' folder='{1}' date={2} mails={3}".format(
            group, subfolder, date, str(mails)))
        return []
    self.fLOG("[ProjectsRepository.dump_group_mails] group='{0}' folder='{1}' date={2} mails={3}".format(
        group, subfolder, date, str(mails)))

    def iter_mail(body=True):
        return mailbox.enumerate_search_person(person=mails, folder=subfolder,
                                               skip_function=skip_function, date=date,
                                               max_dest=max_dest, body=body)

    # the dump is redone if the number of mails already stored
    # differs from the number of mails in the mailbox
    nbmails = len(self.list_mails(group))
    nbcur = len(list(iter_mail(body=False)))
    if nbmails != nbcur:
        overwrite = True
        # logs the values (the string "nbmails" used to be logged
        # instead of the variable)
        self.fLOG("[dump_group_mails] group='{0}' - new mails".format(
            group), nbcur, "!=", nbmails)

    messages = iter_mail(body=True)
    location = self.get_group_location(group)

    r = renderer.write(iter=messages, location=location,
                       filename=filename, overwrite=overwrite,
                       file_jsatt="_summaryattachements_raw.json",
                       attach_folder="attachments")
    renderer.flush()

    # attachments in JSON format
    attached = [name for name in self.enumerate_group_files(group)
                if "attachments" in name]

    # first pass: metadata files, indexed by the attachment they describe
    # (9 is the length of '.metadata')
    metadata = {}
    for name in attached:
        if not name.endswith('.metadata'):
            continue
        sname = os.path.relpath(name, location).replace("\\", "/")
        metadata[sname[:-9]] = sname

    # second pass: the attachments themselves
    json_att = []
    for name in attached:
        if name.endswith('.metadata'):
            continue
        sname = os.path.relpath(name, location).replace("\\", "/")
        info = dict(a=sname, name=sname)
        if sname in metadata:
            info['info'] = f'<a href="{metadata[sname]}">metadata</a>'
        json_att.append(info)

    if convert_files:
        converted = self.unzip_convert(group)
        for conv in converted:
            sconv = os.path.relpath(conv, location).replace("\\", "/")
            json_att.append(
                dict(a=sconv, name=sconv, unzip_convert='Yes'))

    file_jsatt = os.path.join(location, "_summaryattachements.json")
    if json_att and not renderer.BufferWrite.exists(file_jsatt, local=not overwrite):
        # NOTE(review): this buffer is opened after renderer.flush() and never
        # closed here - confirm the caller flushes the renderer again
        f = renderer.BufferWrite.open(
            file_jsatt, text=True, encoding='utf-8')
        js = json.dumps(json_att)
        f.write(js)

    return r

628 

def remove_group(self, group):
    """
    Removes the folder of a group and everything it contains.

    @param      group   group
    @return             list of removed files

    See `remove_folder <http://www.xavierdupre.fr/app/pyquickhelper/helpsphinx/
    pyquickhelper/filehelper/synchelper.html#module-pyquickhelper.filehelper.synchelper>`_.
    """
    return remove_folder(self.get_group_location(group))

641 

def enumerate_group_files(self, group):
    """
    Walks through the files of a group,
    or of every group if *group* is None.

    @param      group   group
    @return             iterator on files
    """
    if group is not None:
        yield from explore_folder_iterfile(self.get_group_location(group))
        return
    for one_group in self.Groups:
        yield from self.enumerate_group_files(one_group)

657 

def list_mails(self, group):
    """
    Lists the mails already dumped for a group: the files whose name
    starts with ``d_`` and ends with ``.html``, attachments excluded.

    @param      group       group name
    @return                 list of mails
    """
    kept = []
    for path in self.enumerate_group_files(group):
        if "attachments" in path:
            continue
        base = os.path.basename(path)
        if base.startswith("d_") and base.endswith(".html"):
            kept.append(path)
    return kept

674 

def zip_group(self, group, outfile, addition=None):
    """
    Zips the files of a group, paths are relative
    to the location of the repository.

    @param      group       group
    @param      outfile     output file
    @param      addition    additional files (sequence)
    @return                 list of zipped files
    """
    def everything():
        # files of the group first, additional files next
        yield from self.enumerate_group_files(group)
        if addition:
            yield from addition

    return zip_files(outfile, everything(), root=self._location)

691 

# Extracts http or https links, write_summary applies it
# to the content of the dumped mails.
_link_regex = re.compile("(https?[:][^ \\\"<>)(]+)")

# Default value for parameter *nolink_if* of write_summary:
# a link which contains one of these strings is left out of the summary.
_known_strings = ["xavierdupre.fr", "doodle", "ensaenotebook", "teralab",
                  "outlook.com", "gohlke", "support.google", "help.github",
                  "api.jcdecaux"]

697 

# Default template used by write_summary when no renderer is given.
# write_summary passes the variables title, groups, len, os, format_size
# to the renderer; ``css`` is referenced here but is not among them,
# presumably the renderer supplies it - verify.
# NOTE(review): the indentation of the template and the argument of
# ``replace`` look collapsed in this view of the file - check them
# against the actual source before relying on this text.
_default_template_summary = """<?xml version="1.0" encoding="utf-8"?>
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
</head>
<body>
<html>
<head>
<title>{{ title }}</title>
<link rel="stylesheet" type="text/css" href="{{ css }}">
</head>
<body>
<h1>{{ title }}</h1>
<ol type="1">
{% for ps in groups %}
<li><a href="{{ ps["link"] }}">{{ ps["group"] }}</a><small><i>
{{ ps["nb"] }} files - {{ format_size(ps["size"]) }} -
{% if len(ps["emails"]) > 0 %}
last mail {{ ps["emails"][-1]["date"] }} ---{% else %}
No mail found. {% endif %}
{{ len(ps["attachments"]) }} attachments</i></small>
{% if len(ps["attachments"]) + len(ps["links"]) > 0 %}
<ul>
{% for day, att, data in ps["attachments"] %}
<li>att: {{ day }} - <a href="{{ att }}">{{ os.path.split(att)[-1] }}</a></li>
{% endfor %}
{% for date, from_, url, domain, last in ps["links"] %}
<li>link: {{ date }} <a href="{{ url }}">{{ domain }} // {{ last }}</a> from {{ from_ }}</li>
{% endfor %}
</ul>
{% endif %}
{% if len(ps["created_files"]) > 0 %}
<ul>
{% for name, relpath, size in ps["created_files"] %}
<li>added: <a href="{{ relpath }}">{{ name }}</a> {{ size }}</li>
{% endfor %}
</ul>
{% endif %}
</li>
{% endfor %}
</ol>
</body>
</html>
""".replace(" ", "")

741 

def write_run_command(self, filename=None, renderer=None):
    """
    Writes a command script to run a server for this local content.
    The server runs the javascripts fetching for local files.
    The content is available at ``http://localhost:9000/``.

    @param      filename    name of the script, it is created in the folder
                            of the repository, if None, it is ``run_server.bat``
                            on Windows, ``run_server.sh`` otherwise
    @param      renderer    not used
    """
    if filename is None:
        on_windows = sys.platform.startswith('win')
        filename = "run_server.bat" if on_windows else "run_server.sh"

    url = "http://localhost:9000/"
    script = textwrap.dedent("""
        echo Open a browser with url '{}'
        python3 -m http.server 9000
        """).format(url)

    dest = os.path.join(self.Location, filename)
    self.fLOG(f"[write_run_command] write '{dest}'.")
    with open(dest, 'w') as f:
        f.write(script)

763 

def write_summary(self, renderer=None, link="index_mails.html",
                  outfile="index.html", title="summary",
                  nolink_if=None):
    """
    Produces a summary and uses a :epkg:`Jinja2` template.
    The summary lists for every group the number of files, their size,
    the attachments, the files found in sub folders of the attachments
    and the links found in the mails.

    @param      renderer    instance of `EmailMessageRenderer
                            <http://www.xavierdupre.fr/app/pymmails/
                            helpsphinx//pymmails/render/email_message_renderer.html>`_),
                            can be None
    @param      link        look for this file in each folder
    @param      outfile     output file
    @param      nolink_if   link containing those strings will be removed
                            (if None, a default set will be assigned)
    @param      title       title
    @return                 summary

    The current default template is::

        .. runpython::

            from ensae_teaching_cs.automation_students.projects_repository import ProjectsRepository
            print(ProjectsRepository._default_template_summary)
    """
    if nolink_if is None:
        nolink_if = ProjectsRepository._known_strings

    # tells if a link must be kept in the summary
    def filter_in(url):
        if "\n" in url or "\r" in url or "\t" in url:
            return False
        if url.endswith("&quot;"):
            return False
        for _ in nolink_if:
            if _ in url:
                return False
        if ".ipynb_checkpoints" in url:
            return False
        return True

    # removes html entities and trailing punctuation around a link
    def clean_url(u):
        u = u.replace("&#43;", "+").strip(".#'/ \r\n\t ")
        if u.endswith("&nbsp;"):
            u = u[:-6]
        return u

    # returns the domain and the last piece of the url
    # (its last 30 characters at most)
    def url_domain_name(url):
        r = urlparse(url)
        domain = r.netloc
        name = [_ for _ in url.split("/") if _]
        last = name[-1] if len(name) > 0 else domain
        if len(last) > 30:
            last = last[-30:]
        return domain, clean_url(last)

    # human readable size, also given to the template
    def format_size(s):
        if s <= 2 ** 11:
            return f"{s} bytes"
        elif s <= 2 ** 21:
            return f"{s // 2 ** 10} Kb"
        elif s <= 2 ** 31:
            return f"{s // 2 ** 20} Mb"
        else:
            return f"{s // 2 ** 30} Gb"

    groups = []
    for group in self.Groups:
        # c = (link, group): the link points to the index of the mails
        # if it exists, to the group name otherwise
        lp = os.path.join(self.get_group_location(group), link)
        if os.path.exists(lp):
            c = os.path.relpath(lp, self._location), group
        else:
            c = f"file:///{group}", group
        nb_files = 0
        size = 0
        atts = []
        emails = []
        links = []
        created_files = []
        for name in self.enumerate_group_files(group):
            if name.endswith(".metadata"):
                continue
            loc = self.get_group_location(group)
            nb_files += 1
            tn = name
            size += os.stat(tn).st_size
            folder = os.path.split(name)[0]
            splf = folder.replace("\\", "/").split("/")
            if folder.endswith("attachments"):
                # an attachment, its date is read from its metadata if available
                meta = name + ".metadata"
                if os.path.exists(meta):
                    data = EmailMessage.read_metadata(meta)
                    day = data["date"].strftime("%Y-%m-%d")
                else:
                    data = None
                    day = ""
                atts.append((day, os.path.relpath(
                    name, self._location), data))
            elif "attachments" in splf:
                # a file in a sub folder of the attachments
                rel = os.path.relpath(name, loc)
                dest = os.path.relpath(name, self._location)
                if rel == dest:
                    raise RuntimeError(  # pragma: no cover
                        f"weird\n{rel}\n{dest}")
                ssize = format_size(os.stat(name).st_size)
                if "__MACOSX" not in rel and "__MACOSX" not in dest and \
                        ".ipynb_checkpoints" not in dest and ".ipynb_checkpoints" not in rel:
                    created_files.append((rel, dest, ssize))
            else:
                # any other file is a mail if its name can be interpreted as such
                mail = os.path.split(name)[-1]
                res = EmailMessage.interpret_default_filename(mail)
                if "date" in res and "uid" in res and "from" in res:
                    emails.append(
                        (res["date"], res["from"], res["uid"], res))
                    # the mail is read from the group folder
                    with open(os.path.join(loc, mail), "r", encoding="utf8") as f:
                        content = f.read()
                    urls = ProjectsRepository._link_regex.findall(content)
                    if urls:
                        for u in set(urls):
                            u = clean_url(u)
                            if not filter_in(u):
                                continue
                            domain, last = url_domain_name(u)
                            links.append(
                                (res["date"], res["from"], clean_url(u), domain, last))

        # we sort
        atts.sort()
        links.sort()

        # we clean duplicated links
        mlinks = links
        links = []
        done = {}
        for date, from_, url, domain, last in mlinks:
            if url in done:
                continue
            if "__MACOSX" in url or "__MACOSX" in last or \
                    ".ipynb_checkpoints" in last or ".ipynb_checkpoints" in url:
                continue
            links.append((date, from_, url, domain, last))
            done[url] = True

        # we create the variable for the template
        emails = [_[-1] for _ in sorted(emails)]
        c = dict(link=c[0].replace("\\", "/"), group=c[1], nb=nb_files,
                 size=size, attachments=atts, emails=emails, links=links,
                 created_files=created_files)

        groups.append(c)

    # final summary
    if renderer is None:
        tmpl = ProjectsRepository._default_template_summary
        renderer = EmailMessageRenderer(tmpl=tmpl, fLOG=self.fLOG)
        dof = True
    else:
        dof = False
    res = renderer.write(filename=outfile, location=self.Location,
                         mail=None, attachments=None, groups=groups,
                         title=title, len=len, os=os,
                         format_size=format_size)
    # the renderer is flushed only if it was created by this method
    if dof:
        renderer.flush()
    return res

926 

def unzip_convert(self, group):
    """
    Uncompresses the files of a group (method *unzip_files*)
    then converts them (method *convert_files*).

    @param      group   group name
    @return             list of new files
    """
    self.unzip_files(group)
    converted = self.convert_files(group)
    return converted

936 

def unzip_files(self, group):
    """
    Uncompresses the archives found among the attachments of a group.

    Only files whose path contains ``attachments`` are considered.
    Recognized archives are ``.zip``, ``.7z``, ``.rar``, ``.tar.gz``
    and ``.gz``. Each archive is extracted into a folder named after
    the archive: the last extension is replaced by ``_zip``, ``_7z``,
    ``_rar``, ``_targz`` or ``_gz`` and spaces, commas, ampersands,
    tabs and line breaks are replaced by ``_``. An archive whose
    folder already exists is considered as already processed and is
    skipped.

    @param      group   group name
    @return             list of new files
    """
    def fvalid(zip_name, local_name):
        # rejects entries from __pycache__ folders and .pyc files
        return "__pycache__" not in zip_name and not zip_name.endswith(".pyc")

    # characters replaced by "_" in the name of the destination folder
    unwanted = str.maketrans(" ,&\r\n\t", "______")

    # kind -> (folder suffix, tag used in the log prefix, verb used in the log)
    formats = {
        "zip": ("_zip", "unzip_files", "unzip"),
        "7z": ("_7z", "un7zip_files", "un7zip"),
        "rar": ("_rar", "unrar_files", "unrar"),
        "targz": ("_targz", "untar_files", "ungzip"),
        "gz": ("_gz", "ungzip_files", "ungzip"),
    }
    by_extension = {".zip": "zip", ".7z": "7z", ".rar": "rar"}

    def extract(kind, name, folder):
        # returns the list of files extracted from *name* into *folder*
        if kind == "zip":
            try:
                return unzip_files(
                    name, folder, fLOG=self.fLOG, fvalid=fvalid, fail_if_error=False)
            except (zipfile.BadZipFile, NotImplementedError, OSError) as e:
                # a corrupted archive must not stop the processing
                # of the other attachments
                self.fLOG(
                    f"[ProjectsRepository.unzip_files] ERROR: unable to unzip '{name}' because of '{e}']")
                return []
        if kind == "7z":
            return un7zip_files(name, folder, fLOG=self.fLOG, fvalid=fvalid)
        if kind == "rar":
            return unrar_files(name, folder, fLOG=self.fLOG, fvalid=fvalid)
        if kind == "targz":
            return untar_files(name, folder, fLOG=self.fLOG)
        # plain .gz: files containing "pkl.gz" are passed with unzip=False
        # (presumably to keep pickles compressed, to be confirmed
        # against ungzip_files)
        unzip = "pkl.gz" not in name
        return ungzip_files(
            name, folder, fLOG=self.fLOG, fvalid=fvalid, unzip=unzip)

    names = list(self.enumerate_group_files(group))
    files = []
    for name in names:
        if "attachments" not in name:
            continue
        root, ext = os.path.splitext(name)
        kind = by_extension.get(ext)
        if kind is None:
            # .tar.gz must be tested before .gz as both share
            # the same last extension
            if name.endswith(".tar.gz"):
                kind = "targz"
            elif ext == ".gz":
                kind = "gz"
            else:
                continue
        suffix, tag, verb = formats[kind]
        folder = (root + suffix).translate(unwanted)
        if os.path.exists(folder):
            # already done, we do not do it again
            continue
        self.fLOG(f"[ProjectsRepository.{tag}] {verb} '{name}'")
        self.fLOG(f"[ProjectsRepository.{tag}] creating '{folder}'")
        os.makedirs(folder)
        files.extend(extract(kind, name, folder))
    return files

1045 

def convert_files(self, group):
    """
    Converts the notebooks (``.ipynb``) and the python scripts (``.py``)
    found among the attachments of a group into :epkg:`HTML`.

    Only files whose path contains ``attachments`` are considered.
    The output is written next to the source, its name being the
    source name followed by ``.html``. A warning is emitted when an
    existing output is about to be overwritten and when a conversion
    fails; a failure does not stop the processing of the other files.

    @param      group   group name
    @return             list of new files
    """
    produced = []
    # the list is materialized before the loop starts
    for path in list(self.enumerate_group_files(group)):
        if "attachments" not in path:
            continue
        suffix = os.path.splitext(path)[-1]
        if suffix == ".ipynb":
            self.fLOG(
                f"[ProjectsRepository.convert_files] convert '{path}'.")
        elif suffix == ".py":
            self.fLOG(
                f"[ProjectsRepository.convert_files] convert '{path}'")
        else:
            continue

        html = path + ".html"
        if os.path.exists(html):
            warnings.warn(f"[convert_files] overwriting '{html}'")

        if suffix == ".ipynb":
            try:
                upgrade_notebook(path)
                nb2html(path, html, exc=False)
                produced.append(html)
            except Exception as exc:
                warnings.warn(
                    f"Unable to convert a notebook '{path}' because of {exc}.")
        else:
            try:
                # the title is the path of the script relative to
                # the location of the group
                py_to_html_file(
                    path, html, False,
                    title=os.path.relpath(
                        path, self.get_group_location(group)))
                produced.append(html)
            except Exception:
                # the syntax of the python file might be wrong
                warnings.warn(f'unable to convert File "{path}"')
    return produced