Coverage for pyquickhelper/loghelper/custom_log.py: 84%

62 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Creates a custom log (open a text file and flushes everything in it). 

5""" 

6import datetime 

7import os 

8 

9 

10class CustomLog: 

11 """ 

12 Implements a custom logging function. 

13 This class is not protected against multithreading. 

14 Usage: 

15 

16 :: 

17 

18 clog = CustomLog("folder") 

19 clog('[fct]', info) 

20 """ 

21 

22 def __init__(self, folder=None, filename=None, create=True, parent=None): 

23 """ 

24 initialisation 

25 

26 @param folder folder (created if not exists) 

27 @param filename new filename 

28 @param create force the creation 

29 @param parent logging function (called after this one if not None) 

30 """ 

31 folder = os.path.abspath(folder) 

32 self._folder = folder 

33 self._parent = parent 

34 if not os.path.exists(folder): 

35 os.makedirs(folder) # pragma: no cover 

36 typstr = str 

37 if filename is None: 

38 i = 0 

39 filename = "log_custom_%03d.txt" % i 

40 fullpath = os.path.join(folder, filename) 

41 while os.path.exists(fullpath): 

42 i += 1 

43 filename = "custom_log_%03d.txt" % i 

44 fullpath = os.path.join(folder, filename) 

45 self._filename = filename 

46 self._fullpath = fullpath 

47 self._handle = open(self._fullpath, "w", encoding="utf-8") 

48 self._close = True 

49 elif isinstance(filename, typstr): 

50 self._filename = filename 

51 self._fullpath = os.path.join(folder, filename) 

52 self._handle = open(self._fullpath, "w", encoding="utf-8") 

53 self._close = True 

54 else: 

55 self._handle = filename 

56 self._close = False 

57 self._filename = None 

58 self._fullpath = None 

59 

60 @property 

61 def filename(self): 

62 """ 

63 returns *_filename* 

64 """ 

65 return self._filename 

66 

67 @property 

68 def fullpath(self): 

69 """ 

70 returns *_fullpath* 

71 """ 

72 return self._fullpath 

73 

74 def __del__(self): 

75 """ 

76 Closes the stream if needed. 

77 """ 

78 if self._close: 

79 self._handle.close() 

80 

81 def __call__(self, *args, **kwargs): 

82 """ 

83 Log anything. 

84 """ 

85 self.fLOG(*args, **kwargs) 

86 if self._parent is not None: 

87 self._parent(*args, **kwargs) 

88 

89 def fLOG(self, *args, **kwargs): 

90 """ 

91 Builds a message on a single line with the date, it deals with encoding issues. 

92 

93 @param args list of fields 

94 @param kwargs dictionary of fields 

95 """ 

96 dt = datetime.datetime(2009, 1, 1).now() 

97 typstr = str 

98 if len(args) > 0: 

99 def _str_process(s): 

100 if isinstance(s, str): 

101 return s 

102 if isinstance(s, bytes): 

103 return s.decode("utf8") # pragma: no cover 

104 try: 

105 return str(s) 

106 except Exception as e: # pragma: no cover 

107 raise RuntimeError( 

108 f"Unable to convert s into string: type(s)={type(s)!r}") from e 

109 

110 message = (str(dt).split(".", maxsplit=1)[0] + " " + 

111 " ".join([_str_process(s) for s in args]) + "\n") 

112 

113 self._handle.write(message) 

114 st = " " 

115 else: 

116 st = typstr(dt).split(".", maxsplit=1)[0] + " " # pragma: no cover 

117 

118 for k, v in kwargs.items(): 

119 message = st + \ 

120 "%s = %s%s" % ( 

121 typstr(k), typstr(v), "\n") 

122 if "INNER JOIN" in message: 

123 break # pragma: no cover 

124 self._handle.write(message) 

125 self._handle.flush()