Coverage for src/ensae_teaching_cs/homeblog/copyfile.py: 69%

185 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-04-28 06:23 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Copy files 

5""" 

6import os 

7import shutil 

8import datetime 

9from pyquickhelper.loghelper import fLOG 

10from .clean_python_script_before_exporting_outside import cleanFileFromtohtmlreplace 

11from .utils_file import checksum_md5 

12from .latex_svg_gif import replace_file 

13 

14 

15class FileToCopy: 

16 

17 def __init__(self, filename, size, date, mdate, checksum): 

18 """constructor""" 

19 self.filename = filename 

20 self.size = size 

21 self.date = date 

22 self.mdate = mdate # modification date 

23 self.checksum = checksum 

24 if date is not None and not isinstance(self.date, datetime.datetime): 

25 raise ValueError( # pragma: no cover 

26 f"mismatch for date ({str(type(date))}) and file {filename}") 

27 if mdate is not None and not isinstance(self.mdate, datetime.datetime): 

28 raise ValueError( # pragma: no cover 

29 f"mismatch for mdate ({str(type(mdate))}) and file {filename}") 

30 if not isinstance(size, int): 

31 raise ValueError( # pragma: no cover 

32 f"mismatch for size ({str(type(size))}) and file {filename}") 

33 if checksum is not None and not isinstance(checksum, str): 

34 raise ValueError( # pragma: no cover 

35 f"mismatch for checksum ({str(type(checksum))}) and file {filename}") 

36 if date is not None and mdate is not None: 

37 if mdate > date: 

38 raise ValueError( 

39 "expecting mdate <= date for file " + filename) 

40 

41 def __str__(self): 

42 return "File[name=%s, size=%d (%s), mdate=%s (%s), date=%s (%s), md5=%s (%s)]" % \ 

43 (self.filename, 

44 self.size, str(type(self.size)), 

45 str(self.mdate), str(type(self.mdate)), 

46 str(self.date), str(type(self.date)), 

47 self.checksum, str(type(self.checksum))) 

48 

49 def set_date(self, date): 

50 self.date = date 

51 if not isinstance(self.date, datetime.datetime): 

52 raise ValueError("mismatch for date (%s) and file %s" % 

53 (str(type(date)), self.filename)) 

54 

55 def set_mdate(self, mdate): 

56 self.mdate = mdate 

57 if not isinstance(self.mdate, datetime.datetime): 

58 raise ValueError("mismatch for date (%s) and file %s" % 

59 (str(type(mdate)), self.filename)) 

60 

61 def set_md5(self, checksum): 

62 self.checksum = checksum 

63 if not isinstance(checksum, str): 

64 raise ValueError("mismatch for checksum (%s) and file %s" % 

65 (str(type(checksum)), self.filename)) 

66 

67 

68class CopyFileForFtp: 

69 """ 

70 This classes maintains a list of files 

71 and does some verifications in order to check if a file 

72 was modified or not (if yes, then it will be updated to the website). 

73 """ 

74 @staticmethod 

75 def convert_st_date_to_datetime(t): 

76 if isinstance(t, str): 

77 if "." in t: 

78 return datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S.%f") 

79 return datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S") 

80 return datetime.datetime.fromtimestamp(t) 

81 

82 def __init__(self, file, 

83 logfunction=fLOG, 

84 bloggiflatex="blog/giflatex/", 

85 giflatex="giflatex", 

86 giflatextemp="giflatex/temp", 

87 specificTrigger=False): 

88 self.copyFiles = {} 

89 self.fileKeep = file 

90 self.LOG = logfunction 

91 self.bloggiflatex = bloggiflatex 

92 self.giflatex = giflatex 

93 self.giflatextemp = giflatextemp 

94 self.specificTrigger = specificTrigger 

95 

96 if os.path.exists(self.fileKeep): # pragma: no cover 

97 f = open(self.fileKeep, "r") 

98 for _ in f.readlines(): 

99 spl = _.strip("\r\n ").split("\t") 

100 try: 

101 if len(spl) >= 2: 

102 a, b = spl[:2] 

103 obj = FileToCopy(a, int(b), None, None, None) 

104 if len(spl) > 2 and len(spl[2]) > 0: 

105 obj.set_date( 

106 CopyFileForFtp.convert_st_date_to_datetime(spl[2])) 

107 if len(spl) > 3 and len(spl[3]) > 0: 

108 obj.set_mdate( 

109 CopyFileForFtp.convert_st_date_to_datetime(spl[3])) 

110 if len(spl) > 4 and len(spl[4]) > 0: 

111 obj.set_md5(spl[4]) 

112 self.copyFiles[a] = obj 

113 else: 

114 raise ValueError( # pragma: no cover 

115 "expecting a filename and a date on this line: " + _) 

116 except Exception as e: # pragma: no cover 

117 fLOG("issue with line", _, spl) 

118 raise e 

119 

120 f.close() 

121 

122 # contains all file to update 

123 self.modifiedFile = [] 

124 

125 def save_dates(self, checkfile=None): 

126 """ 

127 Saves the status of the copy. 

128 

129 @param checkfile check the status for file checkfile 

130 """ 

131 if checkfile is None: 

132 checkfile = [] 

133 rows = [] 

134 for k in sorted(self.copyFiles): 

135 obj = self.copyFiles[k] 

136 da = "" if obj.date is None else str(obj.date) 

137 mda = "" if obj.mdate is None else str(obj.mdate) 

138 sum5 = "" if obj.checksum is None else str(obj.checksum) 

139 

140 if k in checkfile and len(da) == 0: 

141 raise ValueError( # pragma: no cover 

142 "there should be a date for file " + k + "\n" + str(obj)) 

143 if k in checkfile and len(mda) == 0: 

144 raise ValueError( # pragma: no cover 

145 "there should be a mdate for file " + k + "\n" + str(obj)) 

146 if k in checkfile and len(sum5) <= 10: 

147 raise ValueError( # pragma: no cover 

148 "there should be a checksum( for file " + k + "\n" + str(obj)) 

149 

150 values = [k, str(obj.size), da, mda, sum5] 

151 sval = "%s\n" % "\t".join(values) 

152 if "\tNone" in sval: 

153 raise AssertionError( # pragma: no cover 

154 "this case should happen " + sval + "\n" + str(obj)) 

155 

156 rows.append(sval) 

157 

158 f = open(self.fileKeep, "w") 

159 for r in rows: 

160 f.write(r) 

161 f.close() 

162 

163 def has_been_modified_and_reason(self, file): 

164 """ 

165 Returns True, reason if a file was modified or False, None if not. 

166 

167 @param file filename 

168 @return True,reason or False,None 

169 """ 

170 res = True 

171 reason = None 

172 

173 if file not in self.copyFiles: 

174 reason = "new" 

175 res = True 

176 else: 

177 obj = self.copyFiles[file] 

178 st = os.stat(file) 

179 if st.st_size != obj.size: 

180 reason = f"size {str(st.st_size)} != old size {str(obj.size)}" 

181 res = True 

182 else: 

183 l_ = obj.mdate 

184 _m = st.st_mtime 

185 d = CopyFileForFtp.convert_st_date_to_datetime(_m) 

186 if d != l_: 

187 # les dates sont différentes mais les fichiers peuvent être 

188 # différents 

189 if obj.checksum is not None: 

190 ch = checksum_md5(file) 

191 if ch != obj.checksum: 

192 reason = "date/md5 %s != old date %s md5 %s != %s" % ( 

193 str(l_), str(d), obj.checksum, ch) 

194 res = True 

195 else: 

196 res = False 

197 else: 

198 # on ne peut pas savoir, dans le doute, on s'abstient 

199 res = False 

200 else: 

201 # mda.... mais pas sûr (la date n'a pas changé) 

202 res = False 

203 

204 if res: 

205 self.modifiedFile.append((file, reason)) 

206 return res, reason 

207 

208 def add_if_modified(self, file): 

209 """ 

210 Adds a file to self.modifiedList if it was modified. 

211 

212 @param file filename 

213 @return True or False 

214 """ 

215 res, reason = self.has_been_modified_and_reason(file) 

216 if res: 

217 memo = [_ for _ in self.modifiedFile if _[0] == file] 

218 if len(memo) == 0: 

219 # not already added 

220 self.modifiedFile.append((file, reason)) 

221 return res 

222 

223 def update_copied_file(self, file): 

224 """ 

225 Updates the file in copyFiles (before saving), update all field. 

226 

227 @param file filename 

228 @return file object 

229 """ 

230 st = os.stat(file) 

231 size = st.st_size 

232 mdate = CopyFileForFtp.convert_st_date_to_datetime(st.st_mtime) 

233 date = datetime.datetime.now() 

234 md = checksum_md5(file) 

235 obj = FileToCopy(file, size, date, mdate, md) 

236 self.copyFiles[file] = obj 

237 return obj 

238 

239 def copy_file(self, file, to, doFTP=True, doClean=False, to_is_a_file=False): 

240 """ 

241 Processes a file copy. 

242 

243 @param file file to copy 

244 @param to destination (folder) 

245 @param doFTP if True, does some latex modifications (creates an image) 

246 @param doClean if True, does some cleaning before the copy 

247 (for script in pyhome having section such as the one in tableformula.py) 

248 @param to_is_a_file it means to is a file, not a folder 

249 """ 

250 if doClean and doFTP: 

251 raise AssertionError( # pragma: no cover 

252 "this case is not meant to happen, doClean and doFTP, set up at the same time") 

253 if len(to) == 0: 

254 raise ValueError( # pragma: no cover 

255 "an empty folder is not allowed for parameter to") 

256 

257 folder = to 

258 if not os.path.exists(folder): 

259 ffff, last = os.path.split(to) 

260 if to_is_a_file: 

261 folder = ffff 

262 elif "." in last: 

263 raise ValueError("are you sure to is not a file :" + to + "?") 

264 

265 if not os.path.exists(folder): 

266 self.LOG("[copy_file] creating folder ", folder) 

267 os.makedirs(folder) 

268 

269 if doFTP: 

270 if file not in self.copyFiles or \ 

271 os.stat(file).st_size != self.copyFiles[file].size: 

272 

273 # some exception for latex 

274 if self.specificTrigger and "2013-02-04" not in file and \ 

275 file.endswith(".html") and "-" in file: 

276 fLOG("[copy_file] latex exception for: ", file) 

277 replace_file(file, file, self.bloggiflatex, 

278 self.giflatex, self.LOG, self.giflatextemp) 

279 

280 if not os.path.exists(to): 

281 self.LOG("[copy_file] creating directory ", to) 

282 os.mkdir(to) 

283 

284 self.LOG(f"[copy_file] copy of '{file}' to '{to}'.") 

285 reason = "new" if file not in self.copyFiles else \ 

286 ("new size %s != old size %s" % (str(os.stat(file).st_size), 

287 str(self.copyFiles[file].size))) 

288 

289 try: 

290 try: 

291 shutil.copy(file, to) 

292 except shutil.SameFileError: # pragma: no cover 

293 pass 

294 self.modifiedFile.append((file, reason)) 

295 return to 

296 

297 except Exception as e: # pragma: no cover 

298 self.LOG( 

299 f"[copy_file] issue with '{file}' copy to '{to}'.") 

300 self.LOG( 

301 f"[copy_file] message d'erreur {type(e)}: {e}") 

302 return to 

303 else: 

304 try: 

305 shutil.copy(file, to) 

306 if not os.path.isfile(to): 

307 to = os.path.join(to, os.path.split(file)[-1]) 

308 fLOG("copy ", file, " as ", to) 

309 except Exception as e: # pragma: no cover 

310 self.LOG("issue with ", file, " copy to ", to) 

311 self.LOG("message d'erreur ", e) 

312 

313 if doClean: 

314 f = open(to, "r") 

315 content = f.read() 

316 f.close() 

317 

318 newcontent = cleanFileFromtohtmlreplace(content) 

319 

320 if newcontent != content: 

321 fLOG(" cleaning python script ", to) 

322 f = open(to, "w") 

323 f.write(newcontent) 

324 f.close() 

325 

326 return to 

327 

328 def copy_file_ext(self, file, exte, to, doFTP=True, doClean=False): 

329 """ 

330 @see me copy_file, 

331 """ 

332 res = [] 

333 if not os.path.exists(file): 

334 raise FileNotFoundError(file) # pragma: no cover 

335 nb = 0 

336 fi = os.listdir(file) 

337 for f in fi: 

338 if not os.path.isfile(file + "/" + f): 

339 continue 

340 ext = os.path.splitext(f)[1] 

341 if exte is None or ext[1:] == exte: 

342 self.copy_file(file + "/" + f, to, doFTP, doClean) 

343 res.append(file + "/" + f) 

344 nb += 1 

345 if nb == 0: 

346 raise RuntimeError( # pragma: no cover 

347 f"No file found in '{file}'.") 

348 return res 

349 

350 def copy_file_contains(self, file, pattern, to, doFTP=True, doClean=False): 

351 """ 

352 @see me copy_file 

353 """ 

354 fi = os.listdir(file) 

355 nb = 0 

356 for f in fi: 

357 if not os.path.isfile(file + "/" + f): 

358 continue 

359 if pattern in f: 

360 self.copy_file(file + "/" + f, to, doFTP, doClean) 

361 nb += 1 

362 if nb == 0: 

363 raise RuntimeError( # pragma: no cover 

364 f"No file found in '{file}'.")