Coverage for pyquickhelper/filehelper/files_status.py: 76%
134 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief keep the status of a folder, assuming this folder is not moved
5"""
6import os
7import datetime
8from ..loghelper.flog import noLOG
9from .file_info import convert_st_date_to_datetime, checksum_md5, FileInfo
class FilesStatus:
    """
    This class maintains a list of files
    and does some verifications in order to check if a file
    was modified or not (if yes, then it will be updated to the website).

    The status is persisted in a tab-separated text file, one line per
    monitored file: ``filename<TAB>size<TAB>date<TAB>mdate<TAB>md5``
    (the last three columns may be empty or missing).
    """

    def __init__(self, file, fLOG=noLOG):
        """
        Loads the status file if it exists.

        @param      file        name of the file which contains the status,
                                if None or if the file does not exist,
                                the object starts with no known file
        @param      fLOG        logging function
        @raise      RuntimeError    if a line of the status file cannot be parsed
        """
        self._file = file
        self.copyFiles = {}
        self.fileKeep = file
        self.LOG = fLOG
        # contains all file to update
        self.modifiedFile = {}

        # os.path.exists(None) raises TypeError, hence the explicit test
        if file is None or not os.path.exists(file):
            return

        with open(file, "r", encoding="utf8") as f:
            for ni, line in enumerate(f):
                if ni == 0 and line.startswith("\ufeff"):
                    # drops the BOM some editors add at the beginning
                    line = line[len("\ufeff"):]  # pragma: no cover
                spl = line.strip("\r\n ").split("\t")
                try:
                    if len(spl) < 2:
                        raise ValueError(  # pragma: no cover
                            "expecting a filename and a date on this line: " + line)
                    name, size = spl[:2]
                    obj = FileInfo(name, int(size), None, None, None)
                    # optional columns: date, modification date, md5
                    if len(spl) > 2 and spl[2]:
                        obj.set_date(convert_st_date_to_datetime(spl[2]))
                    if len(spl) > 3 and spl[3]:
                        obj.set_mdate(convert_st_date_to_datetime(spl[3]))
                    if len(spl) > 4 and spl[4]:
                        obj.set_md5(spl[4])
                    self.copyFiles[name] = obj
                except Exception as e:
                    raise RuntimeError(  # pragma: no cover
                        f"issue with line:\n {line} -- {spl}") from e

    def __iter__(self):
        """
        Iterates on all files stored in the current file,
        yield a couple *(filename, FileInfo)*.
        """
        yield from self.copyFiles.items()

    def iter_modified(self):
        """
        Iterates on all modified files yield a
        couple *(filename, reason)*.
        """
        # modifiedFile is a dictionary {filename: reason},
        # items() is required to get both the key and the value
        yield from self.modifiedFile.items()

    def save_dates(self, checkfile=None):
        """
        Saves the status of the copy into the status file
        (one tab-separated line per file, sorted by filename).

        @param      checkfile   names of files which must have a date,
                                a modification date and a checksum,
                                an exception is raised otherwise
        @raise      ValueError  if a file in *checkfile* misses a date,
                                a mdate or a checksum
        """
        if checkfile is None:
            checkfile = []
        rows = []
        for k in sorted(self.copyFiles):
            obj = self.copyFiles[k]
            da = "" if obj.date is None else str(obj.date)
            mda = "" if obj.mdate is None else str(obj.mdate)
            sum5 = "" if obj.checksum is None else str(obj.checksum)

            if k in checkfile:
                if not da:
                    raise ValueError(  # pragma: no cover
                        "There should be a date for file " + k + "\n" + str(obj))
                if not mda:
                    raise ValueError(  # pragma: no cover
                        "There should be a mdate for file " + k + "\n" + str(obj))
                # a checksum of 10 characters or less is considered as missing
                if len(sum5) <= 10:
                    raise ValueError(  # pragma: no cover
                        "There should be a checksum( for file " + k + "\n" + str(obj))

            sval = "\t".join([k, str(obj.size), da, mda, sum5]) + "\n"
            # a None value converted into a string would corrupt the file
            if "\tNone" in sval:
                raise AssertionError(  # pragma: no cover
                    "This case should not happen " + sval + "\n" + str(obj))
            rows.append(sval)

        with open(self.fileKeep, "w", encoding="utf8") as f:
            f.writelines(rows)

    def has_been_modified_and_reason(self, file):
        """
        Returns *(True, reason)* if a file was modified or *(False, None)* if not.

        A file is considered as modified if it is unknown, if its size
        changed or if its modification date and its md5 both changed.
        If the date changed but no checksum was stored, the method
        cannot know and answers the file was not modified.

        @param      file        filename
        @return                 *(True, reason)* or *(False, None)*
        """
        if file not in self.copyFiles:
            return True, "new"

        obj = self.copyFiles[file]
        st = os.stat(file)
        if st.st_size != obj.size:
            return True, f"size {st.st_size} != old size {obj.size}"

        ld = obj.mdate
        d = convert_st_date_to_datetime(st.st_mtime)
        if d == ld:
            # no expected modification (dates did not change)
            return False, None
        if obj.checksum is None:
            # it cannot know, it does nothing
            return False, None

        # dates are different but files might be the same
        ch = checksum_md5(file)
        if ch == obj.checksum:
            return False, None
        reason = "date/md5 %s != old date %s md5 %s != %s" % (
            str(ld), str(d), obj.checksum, ch)
        return True, reason

    def add_modified_file(self, file, reason):
        """
        Adds a file the modified list of files.

        @param      file        file to add
        @param      reason      reason for modification
        @raise      KeyError    if the file was already added
        """
        if file in self.modifiedFile:
            raise KeyError(f"file {file} is already present")
        self.modifiedFile[file] = reason

    def add_if_modified(self, file):
        """
        Adds a file to *self.modifiedFile* if it was modified.

        @param      file        filename
        @return                 True or False
        """
        res, reason = self.has_been_modified_and_reason(file)
        if res:
            # the key is the filename, not the boolean
            self.add_modified_file(file, reason)
        return res

    def difference(self, files, u4=False, nlog=None):
        """
        Goes through the list of files and tells which one has changed.

        If *u4* is True, the method yields a tuple
        *(status, file._file, file, None)* for every file
        where status is ``'>+'`` (new), ``'>'`` (modified) or ``'=='``
        (unchanged). Otherwise, it yields the modified files only.
        In both cases, it finally yields *('<+', filename, None, None)*
        for every monitored file missing from *files*.

        @param      files       @see cl FileTreeNode
        @param      u4          @see cl FileTreeNode (changes the output)
        @param      nlog        if not None, print something every ``nlog`` processed files
        @return                 iterator on files which changed
        """
        # names of all files seen in *files*
        memo = set()
        nb = 0
        if u4:
            for file in files:
                full = file.fullname
                memo.add(full)
                if file._file is None:
                    continue
                nb += 1
                if nlog is not None and nb % nlog == 0:
                    self.LOG(  # pragma: no cover
                        "[FileTreeStatus], processed", nb, "files")

                modified, reason = self.has_been_modified_and_reason(full)
                if not modified:
                    yield ("==", file._file, file, None)
                elif reason == "new":
                    yield (">+", file._file, file, None)
                else:
                    yield (">", file._file, file, None)
        else:
            for file in files:
                # same key as the one used to look into copyFiles
                full = file.fullname
                memo.add(full)
                nb += 1
                if nlog is not None and nb % nlog == 0:
                    self.LOG("[FileTreeStatus], processed", nb, "files")
                # the method returns a tuple, only the first value
                # tells if the file was modified
                modified, _ = self.has_been_modified_and_reason(full)
                if modified:
                    yield file

        # monitored files which were not found in *files*
        for info in self.copyFiles.values():
            if info.filename not in memo:
                yield ("<+", info.filename, None, None)

    def update_copied_file(self, file, delete=False):
        """
        Updates the file in copyFiles (before saving), update all fields
        (size, current date, modification date, md5).

        @param      file        filename
        @param      delete      to remove this file
        @return                 file object, None if the file was removed
        @raise      FileNotFoundError   if *delete* is True and the file
                                        is not monitored
        """
        if delete:
            if file not in self.copyFiles:
                raise FileNotFoundError(  # pragma: no cover
                    f"Unable to find a file in the list of monitored files: '{file}'.")
            del self.copyFiles[file]
            return None

        st = os.stat(file)
        obj = FileInfo(
            file, st.st_size, datetime.datetime.now(),
            convert_st_date_to_datetime(st.st_mtime), checksum_md5(file))
        self.copyFiles[file] = obj
        return obj