Coverage for pyquickhelper/loghelper/flog.py: 86%
452 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief logging functionalities
6The function fLOG is used to log everything into a log file.
8::
10 from pyquickhelper.loghelper.flog import fLOG
11 fLOG(OutputPrint = True) # the logs are also displayed in the output stream
12 fLOG(LogPath = "...") # changes the path returned by GetPath
13 fLOG("un", "deux", 4, ["gt"]) # log everything in a log file
15 from pyquickhelper.loghelper.flog import GetPath
16 print(GetPath()) # returns the log path (file temp_log.txt)
18 fLOG(LogPath = "c:/temp/log_path") # change the log path, creates it if it does not exist
20@warning This module inserts static variable in module :epkg:`*py:sys`.
21 It was done to deal with several instances of the same module
22 in earlier versions of :epkg:`python`.
23"""
24import copy
25import datetime
26import decimal
27import math
28import os
29import pprint
30import random
31import sys
32import time
33import re
34import zipfile
35import urllib.request as urllib_request
36from .flog_fake_classes import FlogStatic, LogFakeFileStream, LogFileStream, PQHException
37from .run_cmd import run_cmd
# Module-wide logging state: the functions of this module read and write
# ``flog_static.store_log_values`` (keys such as "__log_path",
# "__log_file_name", "__log_file", "__log_display", "Lock").
flog_static = FlogStatic()
def init(path=None, filename=None, create=True, path_add=None):
    """
    Sets the folder and the name of the log file and closes the current
    log stream when either of them changes.

    @param      path        new log folder, ``None`` keeps the current one,
                            ``"###"`` selects ``d:\\temp\\log_pyquickhelper``
                            (or ``c:\\temp\\log_pyquickhelper`` when ``d:\\temp``
                            does not exist) on Windows and
                            ``/tmp/log_pyquickhelper`` elsewhere
    @param      filename    new log file name, ``None`` keeps the current one
    @param      create      creates the folder when it is missing; if False,
                            a missing folder raises *PQHException*
    @param      path_add    subfolder (or list of subfolders) appended to
                            *path*, their extension is dropped

    This function is also called when *LogPath* or *LogFile* is given
    to function *fLOG*.
    """
    store = flog_static.store_log_values
    if path_add is None:
        path_add = []  # pragma: no cover
    if path is None:
        path = store["__log_path"]

    if path == "###":
        if sys.platform.startswith("win"):  # pragma: no cover
            root = "d:\\temp" if os.path.exists("d:\\temp") else "c:\\temp"
        else:
            root = "/tmp"
        path = os.path.join(root, "log_pyquickhelper")

    if len(path_add) > 0:  # pragma: no cover
        if not isinstance(path_add, list):
            path_add = [path_add]
        path = os.path.join(
            path, *[os.path.splitext(p)[0] for p in path_add])

    if filename is None:
        filename = store["__log_file_name"]

    # The open stream belongs to the previous location: drop it so that the
    # next call to GetLogFile creates a new one.
    moved = (store["__log_path"] != path or
             store["__log_file_name"] != filename)
    if moved and store["__log_file"] is not None:
        store["__log_file"].close()
        store["__log_file"] = None
    store["__log_path"] = path
    store["__log_file_name"] = filename

    if os.path.exists(store["__log_path"]):
        return
    if not create:  # pragma: no cover
        raise PQHException(
            "unable to find path " + store["__log_path"])
    os.makedirs(store["__log_path"])
def GetSepLine():
    """
    Returns the line separator used by this module, always ``\\n``.
    """
    separator = "\n"  # pragma: no cover
    return separator
def GetPath():
    """
    Returns the folder currently registered for the log file.

    @return         path to the logs
    """
    store = flog_static.store_log_values
    return store["__log_path"]
def Print(redirect=True):
    """
    Enables or disables the display of the logged messages
    on the standard output.

    @param      redirect    new value of the display flag

    The call is ignored while the shared state holds a true value
    for ``"Lock"``.
    """
    store = flog_static.store_log_values
    if store.get("Lock", False):
        return
    store["__log_display"] = redirect
def GetLogFile(physical=False, filename=None):
    """
    Returns the stream the logs are written to, creates it on first use.

    :param physical: if True, the stream is backed by a file, otherwise
        a *LogFakeFileStream* is created
    :param filename: file name forwarded to *LogFileStream*
        (only used when *physical* is True)
    :return: the object stored under ``"__log_file"``
    :raises PQHException: if no file name is registered and the log folder
        does not exist

    Once a stream is registered, it is returned as is and both
    parameters are ignored.
    """
    store = flog_static.store_log_values
    if store["__log_file"] is not None:
        return store["__log_file"]

    if not physical:
        store["__log_file"] = LogFakeFileStream()
        return store["__log_file"]

    folder = GetPath()
    if store["__log_file_name"] is None:
        if not os.path.exists(folder):
            raise PQHException(  # pragma: no cover
                "unable to create a log file in folder " + folder)
        store["__log_file_name"] = os.path.join(
            folder, store["__log_const"])

    registered = store["__log_file_name"]
    if isinstance(registered, str):
        store["__log_file"] = LogFileStream(filename=filename)
    else:
        # a non-string "file name" is used directly as the stream
        store["__log_file"] = registered
    return store["__log_file"]
def noLOG(*args, **kwargs):
    """
    Logging function which logs nothing.

    @param      args        ignored except for the first one
    @param      kwargs      ignored
    @return                 the first positional argument, None if there is none
    """
    return args[0] if len(args) > 0 else None
def fLOG(*args, **kwargs):
    """
    Builds a message on a single line with the date and writes it
    to the log stream, optionally displays it.

    @param      args        list of fields
    @param      kwargs      dictionary of fields (see below)
    @return                 the first positional argument, None if there is none
    @exception  OSError     When the log file cannot be created.

    Some names in *kwargs* are options:

    - *OutputPrint*: calls ``Print(OutputPrint)``
    - *LogPath*: calls ``init`` with this path
    - *LogFile*: changes the log file name
      (it creates a new one, the previous is closed).
    - *LogPathAdd*: subfolder added to the log path, only used when
      exactly one of *LogPath*, *LogFile* is given
    - *Lock*: stored in the shared state, while it is true,
      ``Print`` ignores *OutputPrint*
    - *OutputStream*: the message is written to this stream
      instead of being printed, it is never logged as a field
    - *_pp*: uses :epkg:`*py:pprint` to format the fields

    Example:

    ::

        fLOG (LogPath = "###", LogPathAdd = __file__, OutputPrint = True)

    .. faqref::
        :title: How to activate the logs?

        The following instruction will do:

        ::

            fLOG(OutputPrint=True)

        To log everything into a file:

        ::

            fLOG(OutputPrint=True, LogFile="log_file.txt")

        Parameter *OutputStream* allows to print
        the message on a different stream.
    """
    path_add = kwargs.get("LogPathAdd", [])
    # pop() also removes an explicit OutputStream=None, which would
    # otherwise be formatted as a regular "OutputStream = None" field.
    outstream = kwargs.pop('OutputStream', None)

    lock = kwargs.get("Lock", None)
    if lock is not None:
        flog_static.store_log_values["Lock"] = lock

    if "LogFile" in kwargs and "LogPath" in kwargs:
        init(kwargs["LogPath"], kwargs["LogFile"])
    elif "LogFile" in kwargs:
        init(filename=kwargs["LogFile"], path_add=path_add)
    elif "LogPath" in kwargs:
        init(path=kwargs["LogPath"], path_add=path_add)

    def myprint(s):
        if outstream is not None:
            outstream.write(s + '\n')
        else:
            print(s)  # pragma: no cover

    if "OutputPrint" in kwargs:
        Print(kwargs["OutputPrint"])

    if "LogFile" in kwargs:
        GetLogFile(True, filename=kwargs["LogFile"])

    message = fLOGFormat(flog_static.store_log_values["__log_file_sep"],
                         *args, **kwargs)
    GetLogFile().write(message)
    if flog_static.store_log_values["__log_display"]:
        try:
            myprint(message.strip("\r\n"))
        except UnicodeEncodeError:  # pragma: no cover
            # the output stream cannot encode the message:
            # falls back on its escaped representation
            mes = "\n".join(repr(message.strip("\r\n")).split("\\n"))
            try:
                myprint(mes)
            except UnicodeEncodeError:
                mes2 = mes.encode("utf-8").decode("cp1252", errors="ignore")
                myprint(mes2)

    GetLogFile().flush()
    if len(args) > 0:
        return args[0]
    return None
def fLOGFormat(sep, *args, **kwargs):
    """
    Formats a message: a timestamp (seconds precision), the positional
    fields separated by spaces, then one ``key = value`` line per named field.

    @param      sep     line separator
    @param      args    list of anything
    @param      kwargs  dictionary of anything
    @return             string

    If *_pp* is True, the function uses :epkg:`*py:pprint` to format the
    positional fields (bytes are always decoded as utf-8).
    Named fields *OutputPrint* and *_pp* are skipped when their value is true.
    """
    upp = kwargs.get('_pp', False)
    # now() is a classmethod: no need to build a datetime instance first
    stamp = str(datetime.datetime.now()).split(".", maxsplit=1)[0]

    def _str_process(s):
        if isinstance(s, str):
            return pprint.pformat(s) if upp else s
        if isinstance(s, bytes):
            return s.decode("utf8")
        try:
            return pprint.pformat(s) if upp else str(s)
        except Exception as e:  # pragma: no cover
            raise RuntimeError(  # pragma: no cover
                "unable to convert s into string: type(s)=" + str(type(s))) from e

    if len(args) > 0:
        message = (stamp + " " +
                   " ".join([_str_process(s) for s in args]) + sep)
        st = " "
    else:
        message = stamp + " "
        st = " "

    messages = [message]
    for k, v in kwargs.items():
        if k in ("OutputPrint", '_pp') and v:
            continue
        messages.append(st + f"{str(k)} = {str(v)}{sep}")
    return sep.join(messages)
def _this_fLOG(*args, **kwargs):
    """
    Private alias of function *fLOG*, it forwards every argument
    and returns nothing.

    @param      args        positional arguments sent to *fLOG*
    @param      kwargs      named arguments sent to *fLOG*
    """
    fLOG(*args, **kwargs)  # pragma: no cover
def get_relative_path(folder, file, exists=True, absolute=True):
    """
    Private function, builds the path which goes from *folder* to *file*.
    Both paths are normalized and split into components, the common prefix
    is removed and one ``..`` is added for every remaining component
    of *folder*.

    @param      folder      folder
    @param      file        file
    @param      exists      checks that *folder* and *file* exist,
                            raises *PQHException* otherwise
    @param      absolute    if True, the components of *folder* are put
                            in front of the relative part
    @return                 path joined with ``os.path.join``
    @rtype                  str
    """
    if exists:
        for name in (folder, file):
            if not os.path.exists(name):
                raise PQHException(name + " does not exist.")  # pragma: no cover

    folder_parts = os.path.normpath(folder).replace("\\", "/").split("/")
    file_parts = os.path.normpath(file).replace("\\", "/").split("/")

    # length of the common prefix
    common = 0
    for left, right in zip(folder_parts, file_parts):
        if left != right:
            break
        common += 1

    pieces = list(folder_parts) if absolute else []
    pieces.extend([".."] * (len(folder_parts) - common))  # pragma: no cover
    pieces.extend(file_parts[common:])
    return os.path.join(*pieces)
def download(httpfile, path_unzip=None, outfile=None, flatten=True, fLOG=None):
    """
    Downloads a file into folder *path_unzip* (and unzips it if needed)
    by calling ``_check_source``. An interrupted download leaves a marker
    file next to the destination and is resumed by the next call.

    @param      httpfile        (str) url
    @param      path_unzip      (str) path where to unzip the file, if None, choose GetPath()
    @param      outfile         (str) if None, the function will assign a filename unless this parameter is specified
    @param      flatten         (bool) put all files in the same folder (forget subfolders)
    @param      fLOG            logging function, None for no logging,
                                the string ``"fLOG"`` selects the function
                                *fLOG* of this module
    @return                     local file name
    """
    if fLOG is None:
        fLOG = noLOG
    elif fLOG == "fLOG":
        # The parameter shadows the module-level function: it must be
        # fetched from the module namespace, ``fLOG = fLOG`` kept the string.
        fLOG = globals()["fLOG"]
    if path_unzip is None:
        path_unzip = GetPath()
    return _check_source(httpfile, path_unzip=path_unzip,
                         outfile=outfile, flatten=flatten, fLOG=fLOG)
def unzip(file, path_unzip=None, outfile=None, flatten=True, fLOG=noLOG):
    """
    Unzips a file by calling ``_check_source``, then waits (up to ten
    checks, half a second apart) for the result to show up on disk.

    @param      file            (str) zip files
    @param      path_unzip      (str) where to unzip the file, if None, choose GetPath()
    @param      outfile         (str) if None, the function will assign a filename unless this parameter is specified
    @param      flatten         (bool) put all files in the same folder (forget subfolders)
    @param      fLOG            logging function
    @return                     expanded file name
    @raises     FileNotFoundError   if the expanded file is still missing
                                    after the waiting loop
    """
    target = GetPath() if path_unzip is None else path_unzip
    fLOG("[loghelper.flog] unzip file", file)
    expanded = _check_source(file, path_unzip=target,
                             outfile=outfile, flatten=flatten, fLOG=fLOG)

    for _ in range(10):
        if os.path.exists(expanded):
            break
        time.sleep(0.5)

    if os.path.exists(expanded):
        return expanded
    raise FileNotFoundError(expanded)  # pragma: no cover
def _get_file_url(url, path):
    """
    Builds a local filename from an url: characters ``/ : . = ?`` are
    replaced, then the extension is restored when it is a known one.

    @param      url     url
    @param      path    where to download the file
    @return             filename

    The former implementation also required the extension to have two or
    three characters, which made the listed ``jpeg``, ``html`` and ``h``
    unreachable; every listed extension is now restored.
    """
    known = ("png", "jpeg", "jpg", "zip", "txt", "gif", "py", "cpp",
             "gz", "pdf", "tif", "html", "h", "hpp", "cc")
    name = url
    for old, new in (("/", "!"), (":", ""), (".", "-"), ("=", "_"), ("?", "_")):
        name = name.replace(old, new)
    path = path + "/" + name

    # the last "-" stands for the dot in front of the extension, if any
    stem, dash, ext = path.rpartition("-")
    ext = ext.lower()
    if dash and ext in known:
        path = stem + "." + ext
    return path
def _get_file_txt(zipname):
    """
    Returns the name (without folder) of the text file associated with
    a compressed file: every occurrence of ``.zip``, ``.ZIP``, ``.gz``,
    ``.GZ`` in the name is replaced by ``.txt``.

    @param      zipname     filename of the zip
    @return                 filename
    """
    name = os.path.split(zipname)[1]
    for suffix in (".zip", ".ZIP", ".gz", ".GZ"):
        name = name.replace(suffix, ".txt")
    return name
def _check_zip_file(filename, path_unzip, outfile, flatten=True, fLOG=noLOG):
    """
    Expands *filename* if its extension is ``.gz`` or ``.zip`` and returns
    the location of the result, any other file is returned unchanged.
    A destination which already exists and is more recent than the archive
    is not written again.

    @param      filename        any filename (.zip, .gz or not)
    @param      path_unzip      folder which receives the expanded files, cannot be None
    @param      outfile         if None, the function will assign a filename unless this parameter is specified
    @param      flatten         unzip all files into the same directory
    @param      fLOG            logging function
    @return                     the expanded file for a ``.gz`` or a single-file
                                ``.zip``, a folder for a ``.zip`` with several
                                files, *filename* otherwise
    @raises     ValueError      if *path_unzip* is None
    @raises     PQHException    if *outfile* is given for an archive which does
                                not contain exactly one file, or if the 7-Zip
                                command fails
    """
    if path_unzip is None:
        raise ValueError("path_unzip cannot be None")  # pragma: no cover
    file, ext = os.path.splitext(filename)
    ext = ext.lower()
    if ext == ".gz":

        import gzip

        if outfile is None:
            # default name: what follows the last "!" of the filename, with
            # its extension replaced by .txt, in the folder of the archive
            dest = filename.split("!")
            dest = dest[len(dest) - 1]
            ext = os.path.splitext(dest)[1]
            dest = dest.replace(ext, ".txt")
            path = os.path.split(filename)
            path = "/".join(path[:len(path) - 1])
            dest = path + "/" + dest
        else:
            dest = outfile

        # if dest already exists at this first location, it is returned as is
        if not os.path.exists(dest):
            file = gzip.GzipFile(filename, "r")
            if outfile is None:
                # the default name is moved into path_unzip
                dest = os.path.split(dest)[1]
                dest = os.path.join(path_unzip, dest)

            if os.path.exists(dest):
                st1 = datetime.datetime.utcfromtimestamp(
                    os.stat(filename).st_mtime)
                st2 = datetime.datetime.utcfromtimestamp(
                    os.stat(dest).st_mtime)
                if st2 > st1:
                    # NOTE(review): the GzipFile opened above is not closed
                    # on this early return.
                    fLOG("[loghelper.flog] ungzipping file (already done)", dest)
                    return dest

            fLOG("[loghelper.flog] ungzipping file", dest)
            f = open(dest, "w")
            # the archive is expanded by chunks of 2 ** 27 bytes
            data = file.read(2 ** 27)
            size = 0
            while len(data) > 0:
                size += len(data)
                fLOG("[loghelper.flog] ungzipping ", size, "bytes")
                if isinstance(data, bytes):
                    # NOTE(review): every chunk is decoded on its own, a
                    # multi-byte character split between two chunks
                    # presumably fails to decode -- to be confirmed.
                    f.write(bytes.decode(data))
                else:
                    f.write(data)
                data = file.read(2 ** 27)
            f.close()
            file.close()

        return dest

    if ext == ".zip":

        try:
            file = zipfile.ZipFile(filename, "r")
        except Exception as e:  # pragma: no cover
            fLOG("[loghelper.flog] problem with ", filename)
            raise e

        if len(file.infolist()) != 1:
            # the archive does not contain exactly one file:
            # outfile cannot be honoured
            if outfile is not None:
                raise PQHException(  # pragma: no cover
                    "the archive contains %d files and not one as you expected "
                    "by filling outfile" % len(file.infolist()))
            fLOG("[loghelper.flog] unzip file (multiple) ", filename)
            folder = os.path.split(filename)[0]
            todo = 0
            _zip7_path = r"c:\Program Files\7-Zip"
            # 7-Zip is only considered when the folder structure is kept
            zip7 = not flatten and os.path.exists(_zip7_path)
            if zip7:
                fLOG("[loghelper.flog] using ", _zip7_path)  # pragma: no cover
            wait = []
            for info in file.infolist():
                # equivalent to is_dir (Python 3.6+)
                if info.filename[-1] == '/':
                    continue
                fileinside = info.filename
                dest = os.path.join(folder, fileinside)
                if not os.path.exists(dest):
                    # creates the subfolder next to the archive
                    fol = os.path.split(dest)[0]
                    if not os.path.exists(fol):
                        os.makedirs(fol)
                if os.path.exists(dest):
                    st1 = datetime.datetime.utcfromtimestamp(
                        os.stat(filename).st_mtime)
                    st2 = datetime.datetime.utcfromtimestamp(
                        os.stat(dest).st_mtime)
                    if st2 > st1:
                        # already extracted after the archive was written
                        continue

                if not sys.platform.startswith("win") or not zip7:
                    data = file.read(fileinside)
                    if flatten:
                        # only the file name is kept, the file goes to path_unzip
                        dest2 = os.path.split(dest)[1]
                        dest2 = os.path.join(path_unzip, dest2)
                    else:
                        dest2 = dest
                    fLOG("[loghelper.flog] unzipping file", dest2)
                    wait.append(dest2)
                    f = open(dest2, "wb" if isinstance(
                        data, bytes) else "w")
                    f.write(data)
                    f.close()
                else:
                    # left to the 7-Zip command line below
                    todo += 1

            if todo > 0 and zip7:  # pragma: no cover
                dest = os.path.realpath(path_unzip)
                cmd = '"' + _zip7_path + \
                    f'\\7z.exe" x -y -r -o"{dest}" "{os.path.realpath(filename)}"'
                out, err = run_cmd(cmd, wait=True)
                if len(err) > 0:
                    raise PQHException(
                        f"command {cmd} failed\n{err}")
                if "Error" in out:
                    raise PQHException(
                        f"command {cmd} failed\n{out}")
            else:
                dest = path_unzip

            file.close()

            # waits until every file written above is visible on disk
            # NOTE(review): as reconstructed, the sleep belongs to the loop
            # body, so at least one 0.5s pause always happens, and the loop
            # never ends if a file never shows up -- to be confirmed.
            ch = False
            while not ch:
                ch = True
                for a in wait:
                    if not os.path.exists(a):  # pragma: no cover
                        ch = False
                        break
                time.sleep(0.5)

            return dest

        else:
            # single entry: the loop only retrieves its name
            for info in file.infolist():
                fileinside = info.filename

            path = os.path.split(filename)
            dest = outfile if outfile is not None else path[
                0] + "/" + fileinside
            # if dest already exists at this first location, it is returned as is
            # NOTE(review): the ZipFile is not closed in that case nor on the
            # early return below.
            if not os.path.exists(dest):
                data = file.read(fileinside)
                if outfile is None:
                    if flatten:
                        dest = os.path.split(dest)[1]
                        dest = os.path.join(path_unzip, dest)
                    else:
                        dest = os.path.join(path_unzip, dest)

                if os.path.exists(dest):
                    st1 = datetime.datetime.utcfromtimestamp(
                        os.stat(filename).st_mtime)
                    st2 = datetime.datetime.utcfromtimestamp(
                        os.stat(dest).st_mtime)
                    if st2 > st1:
                        fLOG(
                            "[loghelper.flog] unzipping one file (already done)", dest)
                        return dest

                fLOG("[loghelper.flog] unzipping one file", dest)
                if isinstance(data, bytes):
                    f = open(dest, "wb")
                    f.write(data)
                else:
                    f = open(dest, "w")
                    f.write(data)
                f.close()
                file.close()
            return dest

    # neither .gz nor .zip: nothing to expand
    return filename
def _first_more_recent(f1, path):
    """
    Checks if the first file (opened url)
    is more recent than the second file (path).

    @param      f1      opened url, ``str(f1.info())`` must give the headers
    @param      path    path name
    @return             boolean, True as well when header ``Last-Modified``
                        is missing or cannot be parsed

    Both dates are compared in UTC. The former implementation parsed the
    local modification time back from ``time.ctime``, which is expressed in
    local time, and compared it to the GMT date sent by the server.
    """
    headers = str(f1.info())
    found = re.compile("Last[-]Modified: (.+) GMT").search(headers)
    if found is None:
        return True

    # expected format: Wed, 21 Oct 2015 07:28:00
    parsed = re.compile(
        "[\\w, ]* ([ \\d]{2}) ([\\w]{3}) ([\\d]{4}) "
        "([\\d]{2}):([\\d]{2}):([\\d]{2})").search(found.groups()[0])
    if parsed is None:
        return True
    day, month, year, hour, minute, second = parsed.groups()  # pragma: no cover
    remote = datetime.datetime(  # pragma: no cover
        int(year),
        flog_static.store_log_values["month_date"][month.lower()],
        int(day), int(hour), int(minute), int(second))

    # modification time of the local file, truncated to the second, in UTC
    local = datetime.datetime.fromtimestamp(  # pragma: no cover
        int(os.path.getmtime(path)),
        tz=datetime.timezone.utc).replace(tzinfo=None)
    return remote > local  # pragma: no cover
def _check_url_file(url, path_download, outfile, fLOG=noLOG):
    """
    If *url* is an url, downloads the file and returns the local name,
    a string which does not contain ``http://`` or ``https://`` is
    returned unchanged.

    @param      url             url
    @param      path_download   download the file here
    @param      outfile         if None, the function will assign a filename unless this parameter is specified
    @param      fLOG            logging function
    @return                     the filename

    A file already downloaded is only downloaded again when
    ``_first_more_recent`` tells the remote one is more recent; if the
    connection fails, the local copy is used. While a download is running,
    an empty file ``<dest>.notyet`` exists; if it is still there at the next
    call, the download resumes from the current size of the destination
    with a ``Range`` header.

    The content is always written in binary mode. The former test meant to
    select text mode compared the single character ``url[-4]`` to
    extensions, so it never matched, and the downloaded content is bytes.
    """
    urll = url.lower()
    if "http://" not in urll and "https://" not in urll:
        return url

    dest = outfile if outfile is not None else _get_file_url(
        url, path_download)
    nyet = dest + ".notyet"
    newdate = False

    if os.path.exists(dest) and not os.path.exists(nyet):
        try:  # pragma: no cover
            fLOG("[loghelper.flog] trying to connect", url)
            f1 = urllib_request.urlopen(url)
            try:
                down = _first_more_recent(f1, dest)
            finally:
                f1.close()
            newdate = down
        except IOError:  # pragma: no cover
            fLOG(
                "unable to connect Internet, working offline for url", url)
            down = False
    else:
        down = True

    if not down:
        return dest

    if newdate:
        fLOG(  # pragma: no cover
            "[loghelper.flog] downloading (updated) ", url)
    else:
        fLOG("[loghelper.flog] downloading ", url)
    fLOG(f"[loghelper.flog] creating binary file '{dest}'")

    if os.path.exists(nyet):  # pragma: no cover
        size = os.stat(dest).st_size
        fLOG("[loghelper.flog] resume downloading (stop at",
             size, f") from '{url}'")
        request = urllib_request.Request(url)
        request.add_header("Range", "bytes=%d-" % size)
        fu = urllib_request.urlopen(request)
        mode = "ab"
    else:
        fLOG("[loghelper.flog] downloading ", url)
        fu = urllib_request.urlopen(url)
        mode = "wb"

    # Handles are closed even if the transfer fails; the marker file is
    # left in place in that case so that the next call resumes.
    try:
        with open(dest, mode) as f:
            open(nyet, "w").close()
            size = 0
            c = fu.read(2 ** 21)
            while len(c) > 0:
                size += len(c)
                fLOG("[loghelper.flog] size", size)
                f.write(c)
                f.flush()
                c = fu.read(2 ** 21)
            fLOG("[loghelper.flog] end downloading")
    finally:
        fu.close()
    os.remove(nyet)
    return dest
def _check_source(fileurl, path_unzip, outfile, flatten=True, fLOG=noLOG):
    """
    Makes a file available locally: downloads it if *fileurl* is an url,
    expands it if it is compressed.

    @param      fileurl     can be an url, a zip file, a text file
    @param      path_unzip  folder which receives the downloaded and expanded files
    @param      outfile     if None, the function will assign a filename unless this parameter is specified
    @param      flatten     extract all files into the same directory
    @param      fLOG        logging function
    @return                 a local file name
    @raises     PQHException    if the final file does not exist

    When *outfile* has the same extension as *fileurl*, the file is
    downloaded straight into *outfile* and is not expanded. Otherwise it is
    downloaded under a generated name, then sent to ``_check_zip_file``.
    """
    same_extension = (
        outfile is not None and
        os.path.splitext(outfile)[1].lower() ==
        os.path.splitext(fileurl)[1].lower())
    if same_extension:
        return _check_url_file(
            fileurl, path_download=path_unzip, outfile=outfile, fLOG=fLOG)

    downloaded = _check_url_file(
        fileurl, path_download=path_unzip, outfile=None, fLOG=fLOG)
    txt = _check_zip_file(
        downloaded, path_unzip=path_unzip, outfile=outfile, fLOG=fLOG,
        flatten=flatten)
    if os.path.exists(txt):
        return txt
    raise PQHException(  # pragma: no cover
        "_check_source: unable to find file '" +
        txt + "' source '" + fileurl + "'")
def get_prefix():
    """
    Returns a prefix for a temporary file located in the log folder:
    ``temp_`` followed by the current time and a random integer
    between 0 and 1000000.

    @return         path prefix
    """
    stamp = str(datetime.datetime.now())
    for char in (":", "/", " "):
        stamp = stamp.replace(char, "_")
    stamp = stamp + "_" + str(random.randint(0, 1000000)) + "_"
    return os.path.join(GetPath(), "temp_" + stamp)
def removedirs(folder, silent=False, use_command_line=False):
    """
    Removes all files and folders in *folder*, then *folder* itself.

    @param      folder              folder
    @param      silent              if True, a file or a folder which cannot be
                                    removed is added to the result instead of
                                    raising an exception
    @param      use_command_line    see below
    @return                         list of not removed files or folders

    Sometimes it fails due to PermissionError exception,
    in that case, you can try to remove the folder through the command
    line ``rmdir /q /s + <folder>`` (``rm -Rf <folder>`` outside Windows).
    In that case, the function does not return the list of not removed
    files but the output of the command line and raises *RuntimeError*
    if the command writes on the error stream.

    Subfolders are removed with ``os.rmdir``. The former implementation
    used ``os.removedirs`` which also prunes every parent folder left
    empty, including folders above *folder*.
    """
    if use_command_line:
        # NOTE(review): folder is concatenated unquoted into the command
        # line; a path with spaces or shell metacharacters is presumably
        # mishandled -- depends on run_cmd, to be confirmed.
        if sys.platform.startswith("win"):  # pragma: no cover
            out, err = run_cmd("rmdir /s /q " + folder, wait=True)
        else:
            out, err = run_cmd("rm -Rf " + folder, wait=True)
        if len(err) > 0:  # pragma: no cover
            raise RuntimeError(f"Unable to remove '{folder}'\n{err}")
        return out

    files, subfolders = [], []
    for root, dirs, names in os.walk(folder):
        subfolders.extend(os.path.join(root, a) for a in dirs)
        files.extend(os.path.join(root, a) for a in names)
    files.sort()
    # reverse order puts a subfolder before its parent
    subfolders.sort(reverse=True)

    impos = []
    for name in files:
        try:
            if os.path.exists(name):
                os.remove(name)
        except Exception as e:  # pragma: no cover
            fLOG(
                "Unable to remove file '{0}' --- {1}".format(name, str(e).replace("\n", " ")))
            if silent:
                impos.append(name)
            else:
                raise
    for name in subfolders:
        try:
            if os.path.exists(name):
                os.rmdir(name)
        except Exception as e:  # pragma: no cover
            fLOG(
                "Unable to remove folder '{0}' --- {1}".format(name, str(e).replace("\n", " ")))
            if silent:
                impos.append(name)
            else:
                raise

    if os.path.exists(folder):
        try:
            os.rmdir(folder)
        except Exception:  # pragma: no cover
            # the root folder is reported, never raised, whatever silent is
            impos.append(folder)
    return impos
def guess_type_value(x, none=None):
    """
    Guesses the type of a value stored in a string.

    @param      x       string (or None)
    @param      none    if True, an empty string or None gives None
                        instead of *str*
    @return             *int*, *float*, *str* or None

    @warning if an integer starts with a zero, then it is a string,
             an integer with nine characters or more is a string as well

    None used to raise *TypeError* (``int(None)``) before reaching the
    branch meant to handle it, it is now handled first.
    """
    if x is None:
        return None if none else str
    try:
        int(x)
        if x[0] == '0' and len(x) > 1:
            return str
        return int if len(x) < 9 else str
    except ValueError:
        pass

    try:
        float(x)
        return float  # pragma: no cover
    except ValueError:
        pass

    if none:
        try:
            if len(x) > 0:
                return str  # pragma: no cover
            return None
        except Exception:  # pragma: no cover
            return None
    return str
def guess_type_value_type(none=True):
    """
    Returns the types function *guess_type_value* may return.

    @param      none    if True, None is part of the list (first position)
    @return             list of types
    """
    known = [str, int, float]
    if none:
        known.insert(0, None)
    return known
def get_default_value_type(ty, none=True):
    """
    Returns the default value associated with a type.

    @param      ty      *str*, *int*, *float*, ``decimal.Decimal`` or None
    @param      none    if True, None is accepted for *ty* and gives None
    @return             ``""``, ``0``, ``0.0``, ``Decimal(0)`` or None
    @raises     PQHException    for any other type
    """
    if none and ty is None:
        return None
    defaults = (
        (str, ""),
        (int, 0),
        (decimal.Decimal, decimal.Decimal(0)),
        (float, 0.0),
    )
    for known, value in defaults:
        if ty == known:
            return value
    raise PQHException(  # pragma: no cover
        "type expected in " + str(guess_type_value_type()))
def guess_type_list(args, tolerance=0.01, none=True):
    """
    Guesses the type of a list of strings.

    @param      args        list
    @param      tolerance   let's denote m as the frequency of the most representative type,
                            and m2 the second one, if m2 > m * tolerance --> str
    @param      none        if True and all values are empty, return None
    @return                 type, length (order of preference (int, float, str)),
                            the length has a meaning only for str result:
                            twice the smallest power of two which is not
                            below the longest value, 0 for any other type

    Types are ranked by frequency only. The former implementation sorted
    ``(count, type)`` tuples, which raises *TypeError* as soon as two types
    have the same count because types cannot be ordered.
    """
    defa = None if none else str
    length = 0
    if args in [str, float, int, None, decimal.Decimal]:
        raise PQHException("this case is unexpected %s" %
                           str(args))  # pragma: no cover

    if len(args) == 0:
        res = defa
    elif len(args) == 1:
        res = guess_type_value(args[0], none)
        if res == str:
            length = len(args[0])
    else:
        count = {}
        for x in args:
            t = guess_type_value(x, none)
            length = max(length, len(x))
            count[t] = count.get(t, 0) + 1

        # most frequent first, the key avoids comparing types on ties
        val = sorted(((v, k) for k, v in count.items()),
                     key=lambda pair: pair[0], reverse=True)
        if len(val) == 1:
            res = val[0][1]
        elif val[0][0] * tolerance < val[1][0]:
            # the second type is too frequent to be considered as noise
            res = str
        else:
            res = val[0][1]

    if res != str:
        olength = 0
    elif length > 0:
        # rounds log2(length) up, then doubles the power of two
        x = int(math.log(length) / math.log(2) + 0.99999)
        olength = int(math.exp(x * math.log(2)) + 0.9999) * 2
    else:
        olength = length

    return res, olength
def guess_machine_parameter():
    """
    Collects environment variables describing this machine:
    *COMPUTERNAME*, *NUMBER_OF_PROCESSORS*, *OS*, *PATH*, *USERDOMAIN*,
    *USERNAME*, *USERPROFILE*, *windir*, *TEMP*, *USER*.

    @return         dictionary ``{ name : value }``, a missing variable gives
                    None, *PATH* is split into a list (empty if *PATH* is not
                    defined), *TEMP* falls back on ``/tmp`` outside Windows
    """
    names = ["COMPUTERNAME", "NUMBER_OF_PROCESSORS", "OS",
             "PATH", "USERDOMAIN", "USERNAME", "USERPROFILE",
             "windir", "TEMP", "USER"]
    is_windows = sys.platform.startswith("win")
    sep = ";" if is_windows else ":"

    res = {name: os.getenv(name) for name in names}
    # PATH may be undefined: the former code called split on None
    res["PATH"] = [] if res["PATH"] is None else res["PATH"].split(sep)

    if not is_windows and res["TEMP"] is None:
        res["TEMP"] = "/tmp"
    return res
def IsEmptyString(s):
    """
    Tells if a string is empty, None is considered as empty.

    :param s: any string (str, None)
    :return: True for None or a string of length zero
    :rtype: bool
    :raises PQHException: when *s* is neither None nor a string
    """
    if s is None:
        return True
    if not isinstance(s, str):
        raise PQHException(  # pragma: no cover
            f"the type is unexpected {type(s)}")
    return len(s) == 0
def load_content_file_with_encoding(filename):
    """
    Reads a text file, tries encodings utf8, latin1 and the default one
    (None) in that order and stops at the first one which works.

    @param      filename    filename
    @return                 couple (content, encoding)

    If every attempt fails, the last exception is raised.
    """
    last_error = None
    for encoding in ("utf8", "latin1", None):
        try:
            with open(filename, "r", encoding=encoding) as stream:
                return stream.read(), encoding
        except Exception as exc:  # pragma: no cover
            last_error = exc
    raise last_error  # pragma: no cover