Coverage for src/ensae_teaching_cs/automation_students/projects_repository.py: 61%
561 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-04-28 06:23 +0200
« prev ^ index » next coverage.py v7.1.0, created at 2023-04-28 06:23 +0200
1"""
2@file
3@brief Some automation helpers to grab mails from students about their projects.
4"""
5import re
6import os
7import sys
8import json
9import textwrap
10import warnings
11import zipfile
12from urllib.parse import urlparse
13import numpy
14from pyquickhelper.loghelper import noLOG
15from pyquickhelper.texthelper import remove_diacritics
16from pyquickhelper.filehelper import remove_folder, explore_folder_iterfile
17from pyquickhelper.filehelper import (
18 unzip_files, zip_files, ungzip_files, un7zip_files, unrar_files,
19 untar_files
20)
21from pyquickhelper.helpgen import nb2html
22from pyquickhelper.ipythonhelper import upgrade_notebook
23from pymmails import EmailMessageRenderer, EmailMessage
24from .repository_exception import RegexRepositoryException, TooManyProjectsException
25from ..td_1a import edit_distance
26from ..homeblog.python_exemple_py_to_html import py_to_html_file
29class ProjectsRepository:
30 """
31 Handle a repository of students projects.
32 See example :ref:`sphx_glr_automation_fetch_student_projects_from_gmail.py`.
33 """
35 class MailNotFound(Exception):
36 """
37 Raises an exception if mail not found.
38 """
39 pass
41 _email_regex = re.compile("[*] *e?mails? *: *([^*+\\n]+)")
42 _gitlab_regex = re.compile("[*] *gitlab *: *([^*+\\n]+[.]git)")
43 _video_regex = re.compile("[*] *videos? *: *([^*\\n]+)")
45 def __init__(self, location, suivi="suivi.rst", fLOG=noLOG):
46 """
47 Location of the repository.
49 @param location location of the repository
50 @param suivi name of the file gathering information about each project
51 """
52 self._location = location
53 self._suivi = suivi
54 self.fLOG = fLOG
56 @property
57 def Location(self):
58 """
59 @return location of the repository
60 """
61 return self._location
63 @property
64 def Groups(self):
65 """
66 Returns all available groups in the repository.
67 """
68 return [_ for _ in os.listdir(self._location)
69 if os.path.isdir(os.path.join(self._location, _))]
71 def get_group_location(self, group):
72 """
73 Returns the local folder associated to a group.
75 @param group group name
76 @return local folder
77 """
78 return os.path.join(self._location, group)
80 @staticmethod
81 def get_regex(path, regex, suivi="suivi.rst", skip_if_empty=False):
82 """
83 Retrieves data from file ``suivi.rst`` using a regular expression.
85 @param path sub folder to look into
86 @param suivi name of the file ``suivi.rst``
87 @param skip_if_empty skip of no mail?
88 @return list of mails
89 """
90 if not os.path.exists(path):
91 raise FileNotFoundError(path) # pragma: no cover
92 filename = os.path.join(path, suivi)
93 if not os.path.exists(filename):
94 raise FileNotFoundError(filename) # pragma: no cover
96 try:
97 with open(filename, "r", encoding="utf8") as f:
98 content = f.read()
99 except UnicodeDecodeError as e:
100 raise ValueError( # pragma: no cover
101 f'unable to parse file:\n File "{filename}", line 1') from e
103 mails = regex.findall(content)
104 if len(mails) == 0:
105 if skip_if_empty:
106 return []
107 raise RuntimeError( # pragma: no cover
108 "Unable to find the regular expression '{0}' in '{1}'".format(
109 regex.pattern, filename))
111 allmails = []
112 for m in mails:
113 allmails.extend(m.strip("\n\r\t ").split(";"))
115 return [_.strip() for _ in allmails for _ in allmails]
117 def get_emails(self, group, skip_if_empty=False):
118 """
119 Retrieves student emails from file ``suivi.rst``.
121 @param group group
122 @param skip_if_empty skip if no mail?
123 @return list of mails
124 """
125 path = os.path.join(self._location, group)
126 allmails = ProjectsRepository.get_regex(path,
127 ProjectsRepository._email_regex, self._suivi,
128 skip_if_empty=skip_if_empty)
129 for a in allmails:
130 if "\n" in a:
131 raise ValueError( # pragma: no cover
132 "unable to interpret " + str([a]) + " from path " + path)
133 ff = a.split("@")
134 if len(ff) != 2:
135 raise RegexRepositoryException( # pragma: no cover
136 "unable to understand mail {0} in {1} (suivi={2} (mail separator is ;)".format(
137 a,
138 path,
139 self._suivi))
140 return allmails
142 def get_videos(self, group):
143 """
144 Retrieves student emails from file ``suivi.rst``.
146 @param group group
147 @return list of videos
148 """
149 return ProjectsRepository.get_regex(group, ProjectsRepository._video_regex, self._suivi)
151 def get_sections(self, group):
152 """
153 Extracts sections from a filename used to follow a group of students.
155 @param group group
156 @return dictionary { section : content }
158 Example of a file::
160 rapport
161 +++++++
163 * bla 1
165 extrait
166 +++++++
168 ::
170 paragraphe 1
172 paragraphe 2
174 """
175 path = os.path.join(self._location, group)
176 if not os.path.exists(path):
177 raise FileNotFoundError(path) # pragma: no cover
178 filename = os.path.join(path, self._suivi)
179 if not os.path.exists(filename):
180 raise FileNotFoundError(filename) # pragma: no cover
182 try:
183 with open(filename, "r", encoding="utf8") as f:
184 content = f.read()
185 except UnicodeDecodeError as e:
186 raise ValueError( # pragma: no cover
187 f'unable to parse file:\n File "{filename}", line 1') from e
189 lines = [_.strip("\r").rstrip() for _ in content.split("\n")]
190 added_in = []
191 sections = {"": []}
192 title = ""
193 for i, line in enumerate(lines):
194 if len(line) == 0:
195 sections[title].append(line)
196 added_in.append(title)
197 else:
198 f = line[0]
199 if f == " ":
200 if title is not None:
201 sections[title].append(line)
202 added_in.append(title)
203 else:
204 sections[""].append(line)
205 added_in.append("")
206 elif f in "=+-":
207 if line == f * len(line):
208 title = lines[i - 1]
209 if len(added_in) > 0:
210 t = added_in[-1]
211 sections[t] = sections[t][:-1]
212 added_in[-1] = title
213 if f == "=":
214 sections["title"] = [title]
215 added_in.append("title")
216 title = "title"
217 else:
218 sections[title] = []
219 added_in.append(title)
220 else:
221 sections[title].append(line)
222 added_in.append(title)
223 else:
224 sections[title].append(line)
225 added_in.append(title)
227 return sections
229 _regex_split = re.compile("[-;,. @]")
231 @staticmethod
232 def match_mail(name, emails, threshold=3, exc=True):
233 """
234 Tries to match a name among a list of mails.
236 @param name a name (first name last name separated by a space)
237 @param emails list of emails
238 @param threshold above this threshold, mails and names don't match
239 @param exc raise an Exception if not found
240 @return list of available mails, boolean
242 The second results is True if no email were found in the list.
243 """
244 # we check the easy case
245 if isinstance(name, float):
246 name = str(name) if not numpy.isnan(name) else ""
247 if name in emails:
248 return [(0, name)]
250 pieces = [_.strip() for _ in ProjectsRepository._regex_split.split(
251 remove_diacritics(name.lower()))]
252 pieces.sort()
253 pieces = " ".join(pieces)
254 res = []
255 for email in emails:
256 spl = [_.strip() for _ in ProjectsRepository._regex_split.split(
257 remove_diacritics(email.split("@")[0].lower()))]
258 spl.sort()
259 mail = " ".join(spl)
260 d = edit_distance(mail, pieces)[0]
261 res.append((d, email))
262 res = [_ for _ in res if _[0] <= threshold]
263 res.sort()
264 if exc and len(res) == 0:
265 raise ProjectsRepository.MailNotFound( # pragma: no cover
266 "unable to find a mail for {0} among\n{1}".format(name, "\n".join(emails)))
267 return res
269 @staticmethod
270 def match_mails(names, emails, threshold=3, exc=True, skip_names=None):
271 """
272 Tries to match a series of names among a list of mails.
274 @param names list of names (first name last name separated by a space)
275 @param emails list of emails
276 @param threshold above this threshold, mails and names don't match
277 @param exc raise an Exception if not found
278 @param skip_names the second boolean is True is one of the name
279 belongs to this list
280 @return list of available mails, boolean
282 The second results is True if no email were found in the list.
283 """
284 res = []
285 skip = False
286 for name in names:
287 if skip_names is not None and name in skip_names:
288 skip = True
289 r = ProjectsRepository.match_mail(name, emails, threshold, exc)
290 res.extend([_[1] for _ in r])
291 return res, skip
293 @staticmethod
294 def create_folders_from_dataframe(df, root, report="suivi.rst", col_student=None, col_group="Groupe",
295 col_subject="Sujet", col_mail="mail", overwrite=False, email_function=None,
296 must_have_email=True, skip_if_nomail=False, skip_names=None,
297 fLOG=noLOG):
298 """
299 Creates a series of folders for groups of students.
301 @param root where to create the folders
302 @param col_student column which contains the student name (firt name + last name),
303 equal to *col_mail* if *None*
304 @param col_group index of the group (it can be *None* if each student is a group)
305 @param col_subject column which contains the subject
306 @param col_mail if there is a column which contains the mail in the input dataframe
307 @param df DataFrame
308 @param email_function function which infers email from first and last names, see below
309 @param report report file
310 @param overwrite if False, skip if the report already exists
311 @param must_have_email if True, raises an exception if no mail is found
312 @param skip_if_nomail skip a name if no mail is found
313 @param skip_names less checking for a given set of names
314 @param fLOG logging function
315 @return list of creates folders
317 The function *email_function* has the following signature::
319 def email_function(names):
320 # part of a names is a list of tokens
321 # ...
322 return list of mails, skip=boolean
324 The boolean tells the function to skip this group.
325 *email_function* can be a list of mails. In that case,
326 this function is replaced by @see me match_mails.
327 """
328 if col_mail is None and email_function is None:
329 raise ValueError( # pragma: no cover
330 "col_mail cannot be None if email_function is None")
331 if col_student is None:
332 col_student = col_mail
334 def local_email_function(names, skip_names):
335 return ProjectsRepository.match_mails(names, email_function,
336 exc=False, skip_names=skip_names)
338 def local_email_function_column(names, skip_names, mapping):
339 res = []
340 skip = False
341 for name in names:
342 if skip_names is not None and name in skip_names:
343 skip = True
344 r = mapping.get(name, None)
345 if r:
346 res.append(r)
347 return res, skip
349 if isinstance(email_function, (list, set)):
350 if col_mail is None:
351 local_function = local_email_function
352 else:
353 try:
354 ind_student = list(df.columns).index(col_student) + 1
355 ind_mail = list(df.columns).index(col_mail) + 1
356 except ValueError as e:
357 raise ValueError( # pragma: no cover
358 "Unable to find '{0}' or '{1}' in {2}".format(
359 col_student, col_mail, df.columns)) from e
360 mapping = {}
361 for row in df.itertuples():
362 mapping[row[ind_student]] = row[ind_mail]
363 local_function = \
364 lambda names, skip, mp=mapping: \
365 local_email_function_column(names, skip_names, mp)
366 else:
367 local_function = email_function
369 def ul(last):
370 res = ""
371 for i, c in enumerate(last):
372 if c == " ":
373 res += "."
374 elif c == "-":
375 res += "."
376 elif c == '@':
377 break
378 else:
379 res += c
380 return res
382 folds = []
384 if df.shape[1] == 0:
385 raise RuntimeError("No column in the dataframe.") # pragma: no cover
387 if col_group:
388 gr = df.groupby(col_group)
389 else:
390 df2 = df.copy()
391 df2["gid"] = df.index
392 df2["gid2"] = df2.gid.apply(lambda x: "G%d" % x)
393 gr = df2.groupby("gid2")
395 fLOG("[ProjectsRepository.create_folders_from_dataframe] number of groups {0}".format(
396 len(gr)))
398 for name, group in gr:
399 if col_subject:
400 s = list(set(group[col_subject].copy()))
401 s = [_ for _ in s if not isinstance(
402 _, float) or ~numpy.isnan(_)]
403 if len(s) > 1:
404 raise TooManyProjectsException( # pragma: no cover
405 "more than one subject for group: " + str(name) + "\n" + str(s))
406 elif len(s) == 0:
407 s = ["unknown"]
408 subject = s[0]
409 else:
410 subject = None
412 eleves = list(group[col_student])
413 eleves.sort()
415 if email_function is not None:
416 mails, skip = local_function(eleves, skip_names)
417 if must_have_email and (not skip and len(mails) == 0):
418 # we skip only if a group has no mails at all
419 if isinstance(email_function, (list, set)):
420 mes = "unable to find a mail for\n{0}\nname={1}\nskip:{4}\n{5}\namong\n{3}\nGROUP\n{2}\nlocal_function: {6}"
421 raise ProjectsRepository.MailNotFound( # pragma: no cover
422 mes.format("; ".join(f"'{_}'" for _ in eleves),
423 name, group, "\n".join(email_function),
424 skip, skip_names, local_function))
425 raise ProjectsRepository.MailNotFound( # pragma: no cover
426 "unable to find a mail for {0}\nname={1}\n with function\n{3}\nGROUP\n{2}\nTYPE:\n{4}".format(
427 " ;".join(eleves), name, group, email_function, type(email_function)))
428 if skip_if_nomail and (not skip and len(mails) == 0):
429 fLOG("[ProjectsRepository.create_folders_from_dataframe] skipping {0}".format(
430 "; ".join(eleves)))
431 continue
432 if mails:
433 for m in mails:
434 if "@" not in m:
435 raise ValueError( # pragma: no cover
436 f"mails contains a mail with no @: {m}")
437 if "<" in m or ">" in m:
438 raise ValueError( # pragma: no cover
439 f"one mail contains weird characters: {m}")
440 jmail = "; ".join(mails)
441 else:
442 jmail = None
443 else:
444 jmail = None
446 if jmail is not None:
447 if "@" not in jmail:
448 raise ValueError( # pragma: no cover
449 f"jmail does not contain any @: {jmail}")
451 members = ", ".join(map(str, eleves))
452 content = [members]
453 content.append("=" * len(members))
454 content.append("")
456 content.append(f"* members: {members}")
457 if subject:
458 content.append(f"* subject: {subject}")
459 content.append(f"* G: {name}")
461 if jmail:
462 content.append("* mails: " + jmail)
464 content.append("")
465 content.append("")
467 last = "-".join(ul(a) for a in sorted(map(str, eleves)))
469 folder = os.path.join(root, last)
470 filename = os.path.join(folder, report)
472 if not os.path.exists(folder):
473 if '@' in folder:
474 raise ValueError( # pragma: no cover
475 f"Folder '{folder}' must not contain '@'.")
476 os.mkdir(folder)
478 if overwrite or not os.path.exists(filename):
479 with open(filename, "w", encoding="utf8") as f:
480 f.write("\n".join(content))
482 folds.append(folder)
484 proj = ProjectsRepository(root, suivi=report, fLOG=fLOG)
486 if must_have_email:
487 for gr in proj.Groups:
488 mails = proj.get_emails(gr)
489 if len(mails) == 0:
490 raise ValueError( # pragma: no cover
491 f"No mail for group '{gr}'.")
492 return proj
494 def enumerate_group_mails(self, group, mailbox, subfolder, date=None,
495 skip_function=None, max_dest=5):
496 """
497 Enumerates all mails sent by or sent to a given group.
499 @param group group (if None, goes through all mails)
500 @param mailbox mailbox (see `pymmails <http://www.xavierdupre.fr/app/pymmails/helpsphinx/>`_)
501 @param subfolder which subfolder of the mailbox to look into
502 @param date date
503 @param skip_function if not None, use this function on the header/body to avoid loading the entire message (and skip it)
504 @param max_dest maximum number of receivers
505 @return iterator on mails
506 """
507 if group is None:
508 for group_ in self.Groups:
509 self.fLOG(
510 f"[ProjectsRepository.enumerate_group_mails] group='{group_}'")
511 iter = self.enumerate_group_mails(group_, mailbox, subfolder=subfolder,
512 date=date, skip_function=skip_function, max_dest=max_dest)
513 for mail in iter:
514 yield mail
515 else:
516 mails = self.get_emails(group)
517 self.fLOG("[ProjectsRepository.enumerate_group_mails] mails='{0}' folder='{1}' date={2}".format(
518 str(mails), subfolder, date))
519 iter = mailbox.enumerate_search_person(
520 person=mails,
521 folder=subfolder,
522 skip_function=skip_function,
523 date=date,
524 max_dest=5)
525 for mail in iter:
526 yield mail
528 def dump_group_mails(self, renderer, group, mailbox, subfolder, date=None,
529 skip_function=None, max_dest=5, filename="index_mails.html",
530 overwrite=False, skip_if_empty=False, convert_files=False):
531 """
532 Enumerates all mails sent by or sent to a given group.
534 @param renderer instance of class `EmailMessageListRenderer
535 <http://www.xavierdupre.fr/app/pymmails/helpsphinx/pymmails/render/
536 email_message_list_renderer.html>`_
537 @param group group
538 @param mailbox mailbox (see `pymmails <http://www.xavierdupre.fr/app/pymmails/helpsphinx/>`_)
539 @param subfolder which subfolder of the mailbox to look into
540 @param date date
541 @param skip_function if not None, use this function on the header/body to avoid loading
542 the entire message (and skip it)
543 @param max_dest maximum number of receivers
544 @param filename filename which gathers a link to every mail
545 @param overwrite overwrite
546 @param skip_if_empty skip if no mail?
547 @param convert_files unzip and convert
548 @return list of files (see `EmailMessageListRenderer.write
549 <http://www.xavierdupre.fr/app/pymmails/helpsphinx/pymmails/render/
550 email_message_list_renderer.html>`_)
552 zip, gz, rar, 7z can be uncompressed.
553 It then convert *.py* and *.ipynb* into html.
554 """
555 if group is None:
556 res = []
557 for group_ in self.Groups:
558 r = self.dump_group_mails(renderer, group_, mailbox, subfolder=subfolder,
559 date=date, skip_function=skip_function, max_dest=max_dest,
560 overwrite=overwrite, skip_if_empty=skip_if_empty,
561 convert_files=convert_files)
562 res.extend(r)
563 return res
564 else:
565 mails = self.get_emails(group, skip_if_empty=skip_if_empty)
566 if skip_if_empty and len(mails) == 0:
567 self.fLOG("[ProjectsRepository.dump_group_mails] SKIP group='{0}' folder='{1}' date={2} mails={3}".format(
568 group, subfolder, date, str(mails)))
569 return []
570 else:
571 self.fLOG("[ProjectsRepository.dump_group_mails] group='{0}' folder='{1}' date={2} mails={3}".format(
572 group, subfolder, date, str(mails)))
574 def iter_mail(body=True):
575 return mailbox.enumerate_search_person(person=mails, folder=subfolder,
576 skip_function=skip_function, date=date,
577 max_dest=max_dest, body=body)
578 nbmails = len(self.list_mails(group))
579 nbcur = len(list(iter_mail(body=False)))
580 if nbmails != nbcur:
581 overwrite = True
582 self.fLOG("[dump_group_mails] group='{0}' - new mails".format(
583 group), nbcur, "<", "nbmails")
585 iter = iter_mail(body=True)
586 location = self.get_group_location(group)
588 r = renderer.write(iter=iter, location=location,
589 filename=filename, overwrite=overwrite,
590 file_jsatt="_summaryattachements_raw.json",
591 attach_folder="attachments")
592 renderer.flush()
594 # attachments in JSON format
595 json_att = []
596 metadata = {}
598 for name in self.enumerate_group_files(group):
599 if "attachments" not in name or not name.endswith('.metadata'):
600 continue
601 sname = os.path.relpath(name, location).replace("\\", "/")
602 metadata[sname[:-9]] = sname
604 for name in self.enumerate_group_files(group):
605 if "attachments" not in name or name.endswith('.metadata'):
606 continue
607 sname = os.path.relpath(name, location).replace("\\", "/")
608 info = dict(a=sname, name=sname)
609 if sname in metadata:
610 info['info'] = f'<a href="{metadata[sname]}">metadata</a>'
611 json_att.append(info)
613 if convert_files:
614 converted = self.unzip_convert(group)
615 for conv in converted:
616 sconv = os.path.relpath(conv, location).replace("\\", "/")
617 json_att.append(
618 dict(a=sconv, name=sconv, unzip_convert='Yes'))
620 file_jsatt = os.path.join(location, "_summaryattachements.json")
621 if json_att and not renderer.BufferWrite.exists(file_jsatt, local=not overwrite):
622 f = renderer.BufferWrite.open(
623 file_jsatt, text=True, encoding='utf-8')
624 js = json.dumps(json_att)
625 f.write(js)
627 return r
629 def remove_group(self, group):
630 """
631 Removes a group.
633 @param group group
634 @return list of removed files
636 See `remove_folder <http://www.xavierdupre.fr/app/pyquickhelper/helpsphinx/
637 pyquickhelper/filehelper/synchelper.html#module-pyquickhelper.filehelper.synchelper>`_.
638 """
639 loc = self.get_group_location(group)
640 return remove_folder(loc)
642 def enumerate_group_files(self, group):
643 """
644 Enumerates all files in a group.
646 @param group group
647 @return iterator on files
648 """
649 if group is None:
650 for g in self.Groups:
651 for _ in self.enumerate_group_files(g):
652 yield _
653 else:
654 loc = self.get_group_location(group)
655 for _ in explore_folder_iterfile(loc):
656 yield _
658 def list_mails(self, group):
659 """
660 Returns the number of mails of a group.
662 @param group group name
663 @return list of mails
664 """
665 names = list(self.enumerate_group_files(group))
666 mails = []
667 for name in names:
668 if "attachments" in name:
669 continue
670 name_d = os.path.split(name)[-1]
671 if name_d.startswith("d_") and name_d.endswith(".html"):
672 mails.append(name)
673 return mails
675 def zip_group(self, group, outfile, addition=None):
676 """
677 Zips a group.
679 @param group group
680 @param outfile output file
681 @param addition additional files (sequence)
682 @return list of zipped files
683 """
684 def iter_files():
685 for _ in self.enumerate_group_files(group):
686 yield _
687 if addition:
688 for _ in addition:
689 yield _
690 return zip_files(outfile, iter_files(), root=self._location)
692 _link_regex = re.compile("(https?[:][^ \\\"<>)(]+)")
694 _known_strings = ["xavierdupre.fr", "doodle", "ensaenotebook", "teralab",
695 "outlook.com", "gohlke", "support.google", "help.github",
696 "api.jcdecaux"]
698 _default_template_summary = """<?xml version="1.0" encoding="utf-8"?>
699 <head>
700 <meta http-equiv="content-type" content="text/html; charset=utf-8" />
701 </head>
702 <body>
703 <html>
704 <head>
705 <title>{{ title }}</title>
706 <link rel="stylesheet" type="text/css" href="{{ css }}">
707 </head>
708 <body>
709 <h1>{{ title }}</h1>
710 <ol type="1">
711 {% for ps in groups %}
712 <li><a href="{{ ps["link"] }}">{{ ps["group"] }}</a><small><i>
713 {{ ps["nb"] }} files - {{ format_size(ps["size"]) }} -
714 {% if len(ps["emails"]) > 0 %}
715 last mail {{ ps["emails"][-1]["date"] }} ---{% else %}
716 No mail found. {% endif %}
717 {{ len(ps["attachments"]) }} attachments</i></small>
718 {% if len(ps["attachments"]) + len(ps["links"]) > 0 %}
719 <ul>
720 {% for day, att, data in ps["attachments"] %}
721 <li>att: {{ day }} - <a href="{{ att }}">{{ os.path.split(att)[-1] }}</a></li>
722 {% endfor %}
723 {% for date, from_, url, domain, last in ps["links"] %}
724 <li>link: {{ date }} <a href="{{ url }}">{{ domain }} // {{ last }}</a> from {{ from_ }}</li>
725 {% endfor %}
726 </ul>
727 {% endif %}
728 {% if len(ps["created_files"]) > 0 %}
729 <ul>
730 {% for name, relpath, size in ps["created_files"] %}
731 <li>added: <a href="{{ relpath }}">{{ name }}</a> {{ size }}</li>
732 {% endfor %}
733 </ul>
734 {% endif %}
735 </li>
736 {% endfor %}
737 </ol>
738 </body>
739 </html>
740 """.replace(" ", "")
742 def write_run_command(self, filename=None, renderer=None):
743 """
744 Writes a command script to run a server for this local content.
745 The server runs the javascripts fetching for local files.
746 The content is available at ``http://localhost:9000/``.
747 """
748 if filename is None:
749 if sys.platform.startswith('win'):
750 filename = "run_server.bat"
751 else:
752 filename = "run_server.sh"
754 url = "http://localhost:9000/"
755 content = textwrap.dedent("""
756 echo Open a browser with url '{}'
757 python3 -m http.server 9000
758 """).format(url)
759 dest = os.path.join(self.Location, filename)
760 self.fLOG(f"[write_run_command] write '{dest}'.")
761 with open(dest, 'w') as f:
762 f.write(content)
764 def write_summary(self, renderer=None, link="index_mails.html",
765 outfile="index.html", title="summary",
766 nolink_if=None):
767 """
768 Produces a summary and uses a :epkg:`Jinja2` template.
770 @param renderer instance of `EmailMessageRenderer
771 <http://www.xavierdupre.fr/app/pymmails/
772 helpsphinx//pymmails/render/email_message_renderer.html>`_),
773 can be None
774 @param link look for this file in each folder
775 @param outfile output file
776 @param nolink_if link containing those strings will be removed (if None, a default set will be assigned)
777 @param title title
778 @return summary
780 The current default template is::
782 .. runpython::
784 from ensae_teaching_cs.automation_students.projects_repository import _default_template_summary_template
785 print(_default_template_summary)
786 """
787 if nolink_if is None:
788 nolink_if = ProjectsRepository._known_strings
790 def filter_in(url):
791 if "\n" in url or "\r" in url or "\t" in url:
792 return False
793 if url.endswith("""):
794 return False
795 for _ in nolink_if:
796 if _ in url:
797 return False
798 if ".ipynb_checkpoints" in url:
799 return False
800 return True
802 def clean_url(u):
803 u = u.replace("+", "+").strip(".#'/ \r\n\t ")
804 if u.endswith(" "):
805 u = u[:-6]
806 return u
808 def url_domain_name(url):
809 r = urlparse(url)
810 domain = r.netloc
811 name = [_ for _ in url.split("/") if _]
812 last = name[-1] if len(name) > 0 else domain
813 if len(last) > 30:
814 last = last[-30:]
815 return domain, clean_url(last)
817 def format_size(s):
818 if s <= 2 ** 11:
819 return f"{s} bytes"
820 elif s <= 2 ** 21:
821 return f"{s // 2 ** 10} Kb"
822 elif s <= 2 ** 31:
823 return f"{s // 2 ** 20} Mb"
824 else:
825 return f"{s // 2 ** 30} Gb"
827 groups = []
828 for group in self.Groups:
829 lp = os.path.join(self.get_group_location(group), link)
830 if os.path.exists(lp):
831 c = os.path.relpath(lp, self._location), group
832 else:
833 c = f"file:///{group}", group
834 nb_files = 0
835 size = 0
836 atts = []
837 emails = []
838 links = []
839 created_files = []
840 for name in self.enumerate_group_files(group):
841 if name.endswith(".metadata"):
842 continue
843 loc = self.get_group_location(group)
844 nb_files += 1
845 tn = name
846 size += os.stat(tn).st_size
847 folder = os.path.split(name)[0]
848 splf = folder.replace("\\", "/").split("/")
849 if folder.endswith("attachments"):
850 meta = name + ".metadata"
851 if os.path.exists(meta):
852 data = EmailMessage.read_metadata(meta)
853 day = data["date"].strftime("%Y-%m-%d")
854 else:
855 data = None
856 day = ""
857 atts.append((day, os.path.relpath(
858 name, self._location), data))
859 elif "attachments" in splf:
860 rel = os.path.relpath(name, loc)
861 dest = os.path.relpath(name, self._location)
862 if rel == dest:
863 raise RuntimeError( # pragma: no cover
864 f"weird\n{rel}\n{dest}")
865 ssize = format_size(os.stat(name).st_size)
866 if "__MACOSX" not in rel and "__MACOSX" not in dest and \
867 ".ipynb_checkpoints" not in dest and ".ipynb_checkpoints" not in rel:
868 created_files.append((rel, dest, ssize))
869 else:
870 mail = os.path.split(name)[-1]
871 res = EmailMessage.interpret_default_filename(mail)
872 if "date" in res and "uid" in res and "from" in res:
873 emails.append(
874 (res["date"], res["from"], res["uid"], res))
875 with open(os.path.join(loc, mail), "r", encoding="utf8") as f:
876 content = f.read()
877 urls = ProjectsRepository._link_regex.findall(content)
878 if urls:
879 for u in set(urls):
880 u = clean_url(u)
881 if not filter_in(u):
882 continue
883 domain, last = url_domain_name(u)
884 links.append(
885 (res["date"], res["from"], clean_url(u), domain, last))
887 # we sort
888 atts.sort()
889 links.sort()
891 # we clean duplicated links
892 mlinks = links
893 links = []
894 done = {}
895 for date, from_, url, domain, last in mlinks:
896 if url in done:
897 continue
898 if "__MACOSX" in url or "__MACOSX" in last or \
899 ".ipynb_checkpoints" in last or ".ipynb_checkpoints" in url:
900 continue
901 links.append((date, from_, url, domain, last))
902 done[url] = True
904 # we create the variable for the template
905 emails = [_[-1] for _ in sorted(emails)]
906 c = dict(link=c[0].replace("\\", "/"), group=c[1], nb=nb_files,
907 size=size, attachments=atts, emails=emails, links=links,
908 created_files=created_files)
910 groups.append(c)
912 # final summary
913 if renderer is None:
914 tmpl = ProjectsRepository._default_template_summary
915 renderer = EmailMessageRenderer(tmpl=tmpl, fLOG=self.fLOG)
916 dof = True
917 else:
918 dof = False
919 res = renderer.write(filename=outfile, location=self.Location,
920 mail=None, attachments=None, groups=groups,
921 title=title, len=len, os=os,
922 format_size=format_size)
923 if dof:
924 renderer.flush()
925 return res
927 def unzip_convert(self, group):
928 """
929 Unzips files and convert notebooks into :epkg:`HTML`.
931 @param group group name
932 @return list of new files
933 """
934 self.unzip_files(group)
935 return self.convert_files(group)
937 def unzip_files(self, group):
938 """
939 Unzips files and convert notebooks into :epkg:`HTML`.
941 @param group group name
942 @return list of new filess
943 """
944 def fvalid(zip_name, local_name):
945 if "__pycache__" in zip_name:
946 return False
947 if zip_name.endswith(".pyc"):
948 return False
949 return True
951 def clean_f(folder):
952 folder = folder.replace(" ", "_").replace(
953 ",", "_").replace("&", "_").replace("\r", "_")
954 folder = folder.replace("\n", "_").replace("\t", "_")
955 return folder
957 names = list(self.enumerate_group_files(group))
958 files = []
959 for name in names:
960 if "attachments" not in name:
961 continue
962 ext = os.path.splitext(name)[-1]
963 if ext == ".zip":
964 folder = os.path.splitext(name)[0] + "_zip"
965 folder = clean_f(folder)
966 if not os.path.exists(folder):
967 self.fLOG(
968 f"[ProjectsRepository.unzip_files] unzip '{name}'")
969 self.fLOG(
970 f"[ProjectsRepository.unzip_files] creating '{folder}'")
971 os.makedirs(folder)
972 try:
973 lf = unzip_files(
974 name, folder, fLOG=self.fLOG, fvalid=fvalid, fail_if_error=False)
975 except (zipfile.BadZipFile, NotImplementedError, OSError) as e:
976 self.fLOG(
977 f"[ProjectsRepository.unzip_files] ERROR: unable to unzip '{name}' because of '{e}']")
978 lf = []
979 files.extend(lf)
980 else:
981 # already done, we do not do it again
982 pass
983 elif ext == ".7z":
984 folder = os.path.splitext(name)[0] + "_7z"
985 folder = clean_f(folder)
986 if not os.path.exists(folder):
987 self.fLOG(
988 f"[ProjectsRepository.un7zip_files] un7zip '{name}'")
989 self.fLOG(
990 f"[ProjectsRepository.un7zip_files] creating '{folder}'")
991 os.makedirs(folder)
992 lf = un7zip_files(
993 name, folder, fLOG=self.fLOG, fvalid=fvalid)
994 files.extend(lf)
995 else:
996 # already done, we do not do it again
997 pass
998 elif ext == ".rar":
999 folder = os.path.splitext(name)[0] + "_rar"
1000 folder = clean_f(folder)
1001 if not os.path.exists(folder):
1002 self.fLOG(
1003 f"[ProjectsRepository.unrar_files] unrar '{name}'")
1004 self.fLOG(
1005 f"[ProjectsRepository.unrar_files] creating '{folder}'")
1006 os.makedirs(folder)
1007 lf = unrar_files(
1008 name, folder, fLOG=self.fLOG, fvalid=fvalid)
1009 files.extend(lf)
1010 else:
1011 # already done, we do not do it again
1012 pass
1013 elif name.endswith(".tar.gz"):
1014 folder = os.path.splitext(name)[0] + "_targz"
1015 folder = clean_f(folder)
1016 if not os.path.exists(folder):
1017 self.fLOG(
1018 f"[ProjectsRepository.untar_files] ungzip '{name}'")
1019 self.fLOG(
1020 f"[ProjectsRepository.untar_files] creating '{folder}'")
1021 os.makedirs(folder)
1022 unzip = "pkl.gz" not in name
1023 lf = untar_files(name, folder, fLOG=self.fLOG)
1024 files.extend(lf)
1025 else:
1026 # already done, we do not do it again
1027 pass
1028 elif ext == ".gz":
1029 folder = os.path.splitext(name)[0] + "_gz"
1030 folder = clean_f(folder)
1031 if not os.path.exists(folder):
1032 self.fLOG(
1033 f"[ProjectsRepository.ungzip_files] ungzip '{name}'")
1034 self.fLOG(
1035 f"[ProjectsRepository.ungzip_files] creating '{folder}'")
1036 os.makedirs(folder)
1037 unzip = "pkl.gz" not in name
1038 lf = ungzip_files(
1039 name, folder, fLOG=self.fLOG, fvalid=fvalid, unzip=unzip)
1040 files.extend(lf)
1041 else:
1042 # already done, we do not do it again
1043 pass
1044 return files
1046 def convert_files(self, group):
1047 """
1048 Converts all notebooks and python scripts into :epkg:`HTML` for a group.
1050 @param group group name
1051 @return list of new files
1052 """
1053 names = list(self.enumerate_group_files(group))
1054 files = []
1055 for name in names:
1056 if "attachments" not in name:
1057 continue
1058 ext = os.path.splitext(name)[-1]
1059 if ext == ".ipynb":
1060 self.fLOG(
1061 f"[ProjectsRepository.convert_files] convert '{name}'.")
1062 out = name + ".html"
1063 if os.path.exists(out):
1064 warnings.warn(
1065 f"[convert_files] overwriting '{out}'")
1066 try:
1067 upgrade_notebook(name)
1068 nb2html(name, out, exc=False)
1069 files.append(out)
1070 except Exception as e:
1071 warnings.warn(
1072 f"Unable to convert a notebook '{name}' because of {e}.")
1073 elif ext == ".py":
1074 self.fLOG(
1075 f"[ProjectsRepository.convert_files] convert '{name}'")
1076 out = name + ".html"
1077 if os.path.exists(out):
1078 warnings.warn(
1079 f"[convert_files] overwriting '{out}'")
1080 try:
1081 py_to_html_file(name, out, False, title=os.path.relpath(
1082 name, self.get_group_location(group)))
1083 files.append(out)
1084 except Exception:
1085 # the syntax of the python file might be wrong
1086 warnings.warn(
1087 f"unable to convert File \"{name}\"")
1088 return files