Coverage for pyquickhelper/filehelper/compression_helper.py: 84%

204 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@brief Functions about compressing files. 

4""" 

5 

6import os 

7import zipfile 

8import datetime 

9import gzip 

10import sys 

11import warnings 

12import tarfile 

13from io import BytesIO 

14 

15from ..loghelper.flog import noLOG, run_cmd 

16from .fexceptions import FileException 

17from ..texthelper.diacritic_helper import remove_diacritics 

18from .synchelper import explore_folder 

19 

20 

def zip_files(filename, file_set, root=None, fLOG=noLOG):
    """
    Zips all files from an iterator.

    @param      filename        final zip file (can be None)
    @param      file_set        iterator on file to add
    @param      root            if not None, all path are relative to this path
    @param      fLOG            logging function, a falsy value disables logging
    @return                     number of added files (or content if filename is None)

    *filename* can be None, the function compresses
    into bytes without saving the results. The compressed bytes are also
    returned when *filename* is already a ``BytesIO`` instance.

    Files of *file_set* which do not exist are silently skipped.
    When a file has an access or modification time before 1980,
    its modification time on disk is moved forward by steps of four hours
    until it is after 1980 before the file is added to the archive.
    """
    nb = 0
    a1980 = datetime.datetime(1980, 1, 1)
    if filename is None:
        filename = BytesIO()
    with zipfile.ZipFile(filename, 'w') as myzip:
        for file in file_set:
            if not os.path.exists(file):
                continue
            if fLOG:
                fLOG(f"[zip_files] '{file}'")
            st = os.stat(file)
            atime = datetime.datetime.fromtimestamp(st.st_atime)
            mtime = datetime.datetime.fromtimestamp(st.st_mtime)
            if atime < a1980 or mtime < a1980:  # pragma: no cover
                new_mtime = st.st_mtime + (4 * 3600)  # new modification time
                while datetime.datetime.fromtimestamp(new_mtime) < a1980:
                    new_mtime += (4 * 3600)  # new modification time

                # Same guard as above: the logger may be falsy (e.g. None).
                if fLOG:
                    fLOG(
                        f"[zip_files] changing time timestamp for file '{file}'")
                os.utime(file, (st.st_atime, new_mtime))

            # arcname=None lets zipfile derive the archive name from the path.
            arcname = os.path.relpath(file, root) if root else None
            myzip.write(file, arcname=arcname)
            nb += 1
    return filename.getvalue() if isinstance(filename, BytesIO) else nb

60 

61 

def unzip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True,
                fail_if_error=True):
    """
    Unzips files from a zip archive.

    @param      zipf            archive (or bytes or BytesIO)
    @param      where_to        destination folder (can be None, the result is a list of tuple)
    @param      fLOG            logging function, a falsy value disables logging
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      fail_if_error   fails if an error is encountered
                                (typically a weird character in a filename),
                                otherwise a warning is thrown.
    @return                     list of unzipped files, or a list of tuples
                                ``(name in the archive, content)`` if *where_to* is None

    Files which already exist in *where_to* are not overwritten,
    they are still part of the returned list.
    """
    def _log(*args):
        # The function tolerates a falsy logger (e.g. None): every log call
        # goes through this guard instead of calling fLOG directly.
        if fLOG:
            fLOG(*args)

    if isinstance(zipf, bytes):
        zipf = BytesIO(zipf)

    # First pass only checks the archive can be opened,
    # it produces a clearer error for an invalid file on disk.
    try:
        with zipfile.ZipFile(zipf, "r"):
            pass
    except zipfile.BadZipFile as e:  # pragma: no cover
        if isinstance(zipf, BytesIO):
            raise e
        raise IOError(f"Unable to read file '{zipf}'") from e

    files = []
    with zipfile.ZipFile(zipf, "r") as file:
        for info in file.infolist():
            _log(f"[unzip_files] unzip '{info.filename}'")
            if where_to is None:
                # In-memory mode: nothing is written on disk.
                try:
                    content = file.read(info.filename)
                except zipfile.BadZipFile as e:  # pragma: no cover
                    if fail_if_error:
                        raise zipfile.BadZipFile(
                            f"Unable to extract '{info.filename}' due to {e}") from e
                    warnings.warn(
                        f"Unable to extract '{info.filename}' due to {e}", UserWarning)
                    continue
                files.append((info.filename, content))
            else:
                clean = remove_diacritics(info.filename)
                if remove_space:
                    clean = clean.replace(" ", "").replace("'", "").replace(",", "_") \
                        .replace("(", "_").replace(")", "_")
                # NOTE(review): the member name is not checked for '..' or
                # absolute paths, a crafted archive could write outside
                # *where_to* -- confirm archives are trusted.
                tos = os.path.join(where_to, clean)
                if not os.path.exists(tos):
                    if fvalid and not fvalid(info.filename, tos):
                        _log("[unzip_files] skipping", info.filename)
                        continue
                    try:
                        data = file.read(info.filename)
                    except zipfile.BadZipFile as e:  # pragma: no cover
                        if fail_if_error:
                            raise zipfile.BadZipFile(
                                f"Unable to extract '{info.filename}' due to {e}") from e
                        warnings.warn(
                            f"Unable to extract '{info.filename}' due to {e}", UserWarning)
                        continue
                    # check encoding to avoid characters not allowed in paths
                    if not os.path.exists(tos):
                        if sys.platform.startswith("win"):
                            tos = tos.replace("/", "\\")
                        finalfolder = os.path.split(tos)[0]
                        if not os.path.exists(finalfolder):
                            _log("[unzip_files] creating folder (zip)",
                                 os.path.abspath(finalfolder))
                            try:
                                os.makedirs(finalfolder)
                            except FileNotFoundError as e:  # pragma: no cover
                                mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format(
                                    info.filename, tos, finalfolder, len(finalfolder))
                                raise FileNotFoundError(mes) from e
                        # Names ending with "/" are folder entries, nothing to write.
                        if not info.filename.endswith("/"):
                            try:
                                with open(tos, "wb") as u:
                                    u.write(data)
                            except FileNotFoundError as e:  # pragma: no cover
                                # probably an issue in the path name
                                # the next lines are just here to distinguish
                                # between the two cases
                                if not os.path.exists(finalfolder):
                                    raise e
                                # Second attempt with a name built from the
                                # raw archive name (spaces and commas replaced).
                                newname = info.filename.replace(
                                    " ", "_").replace(",", "_")
                                if sys.platform.startswith("win"):
                                    newname = newname.replace("/", "\\")
                                tos = os.path.join(where_to, newname)
                                finalfolder = os.path.split(tos)[0]
                                if not os.path.exists(finalfolder):
                                    _log("[unzip_files] creating folder (zip)",
                                         os.path.abspath(finalfolder))
                                    os.makedirs(finalfolder)
                                with open(tos, "wb") as u:
                                    u.write(data)
                            files.append(tos)
                            _log("[unzip_files] unzipped ",
                                 info.filename, " to ", tos)
                    elif not tos.endswith("/"):  # pragma: no cover
                        files.append(tos)
                elif not info.filename.endswith("/"):  # pragma: no cover
                    files.append(tos)
    return files

168 

169 

def gzip_files(filename, file_set, encoding=None, fLOG=noLOG):
    """
    Compresses all files from an iterator in a zip file
    and then in a gzip file.

    @param      filename        final gzip file (double compression, extension should be something like .zip.gz),
                                can be None or a ``BytesIO`` to compress in memory
    @param      file_set        iterator on file to add
    @param      encoding        if None, the files are zipped then gzipped (double compression),
                                otherwise the files are read as text and concatenated
                                into a single gzip stream (no double compression then)
    @param      fLOG            logging function
    @return                     bytes (if filename is None or a ``BytesIO``) or None

    The value of *encoding* only selects the mode: in text mode,
    the files are always read and written with ``utf-8``.
    """
    if filename is None:
        filename = BytesIO()
    if encoding is None:
        content = zip_files(None, file_set, fLOG=fLOG)
        # The context manager closes the gzip stream even if the write fails.
        with gzip.open(filename, 'wb') as f:
            f.write(content)
    else:
        with gzip.open(filename, 'wt', encoding="utf-8") as f:
            for name in file_set:
                with open(name, "r", encoding="utf-8") as ft:
                    f.write(ft.read())
    return filename.getvalue() if isinstance(filename, BytesIO) else None

196 

197 

def ungzip_files(filename, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True,
                 unzip=True, encoding=None):
    """
    Uncompresses files from a gzip file.

    @param      filename        final gzip file (double compression, extension should be something like .zip.gz),
                                it can also be bytes or a ``BytesIO``
    @param      where_to        destination folder (can be None, the result is a list of tuple)
    @param      fLOG            logging function
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      unzip           unzip file after gzip
    @param      encoding        if not None, the content is read as text (always decoded with ``utf-8``)
    @return                     list of unzipped files if *unzip* is True (binary mode),
                                otherwise the name of the written file, or the content
                                when there is no file to write

    In text mode (*encoding* is not None), *where_to* and *unzip* are not used:
    the decompressed text is written next to the gzip file, or returned
    if the input was given in memory.
    """
    if isinstance(filename, bytes):
        filename = BytesIO(filename)
    # In-memory input has no name, no output path can be derived from it.
    is_file = not isinstance(filename, BytesIO)

    if encoding is None:
        with gzip.open(filename, 'rb') as f:
            content = f.read()
        if unzip:
            try:
                # fvalid and remove_space are forwarded so that they apply
                # to the embedded zip archive.
                return unzip_files(content, where_to=where_to, fLOG=fLOG,
                                   fvalid=fvalid, remove_space=remove_space)
            except Exception as e:  # pragma: no cover
                raise IOError(
                    f"Unable to unzip file '{filename}'") from e
        if where_to is not None and is_file:
            filename = os.path.split(filename)[-1].replace(".gz", "")
            filename = os.path.join(where_to, filename)
            with open(filename, "wb") as f:
                f.write(content)
            return filename
        return content

    with gzip.open(filename, 'rt', encoding="utf-8") as f:
        content = f.read()
    if is_file:
        filename = filename.replace(".gz", "")
        # content is a str here: it must be encoded before being
        # written to a file opened in binary mode.
        with open(filename, "wb") as f:
            f.write(content.encode("utf-8"))
        return filename
    return content

246 

247 

def zip7_files(filename_7z, file_set, fLOG=noLOG, temp_folder="."):
    """
    If :epkg:`7z` is installed, the function uses it
    to compress file into 7z format. The file *filename_7z* must not exist.

    @param      filename_7z     final destination
    @param      file_set        list of files to compress (any iterable)
    @param      fLOG            logging function (not used by this function)
    @param      temp_folder     the function stores the list of files in a file in the
                                folder *temp_folder*, it will be removed afterwards
    @return                     number of added files

    The function raises *FileException* if *filename_7z* already exists
    or if the command line fails, *FileNotFoundError* if one of the files
    to compress does not exist (or, on Windows, if 7z is not found).

    .. faqref::
        :title: Why module pylzma does not work?
        :lid: faq-pylzma-ref

        The module :epkg:`pylzma`
        failed to decompress the file produced by the latest version
        of :epkg:`7z` (2016-09-23). The compression
        was changed by tweaking the command line. LZMA is used instead LZMA2.
        The current version does not include this
        `commit <https://github.com/fancycode/pylzma/commit/b5c3c2bd4ab7abfb65de772861ecc600fe37394b>`_.
        Or you can clone the package
        `sdpython.pylzma <https://github.com/sdpython/pylzma>`_
        and build it yourself with ``python setup.py bdist_wheel``.
    """
    if sys.platform.startswith("win"):  # pragma: no cover
        exe = r"C:\Program Files\7-Zip\7z.exe"
        if not os.path.exists(exe):
            raise FileNotFoundError(f"unable to find: {exe}")
    elif sys.platform.startswith("darwin"):
        exe = "7za"  # pragma: no cover
    else:
        exe = "7z"

    if os.path.exists(filename_7z):
        raise FileException(  # pragma: no cover
            f"'{filename_7z}' already exists")

    # file_set is walked several times and measured with len:
    # materialize it so that iterators and generators work too.
    file_set = list(file_set)

    notxist = [fn for fn in file_set if not os.path.exists(fn)]
    if notxist:
        raise FileNotFoundError(  # pragma: no cover
            "unable to compress unexisting files:\n{0}".format("\n".join(notxist)))

    flist = os.path.join(temp_folder, "listfiles7z.txt")
    with open(flist, "w", encoding="utf8") as f:
        f.write("\n".join(file_set))

    cmd = f'"{exe}" -m0=lzma -mfb=64 a "{filename_7z}" "@{flist}"'
    try:
        out, err = run_cmd(cmd, wait=True)
    finally:
        # The list of files is only needed while 7z runs,
        # its removal is best effort.
        try:
            os.remove(flist)
        except OSError:  # pragma: no cover
            pass
    if "Error:" in out or not os.path.exists(filename_7z):
        raise FileException(  # pragma: no cover
            "An error occurred with cmd: '{0}'\n"
            "--OUT--\n{1}\n--ERR--\n{2}\n----".format(
                cmd, out, err))
    return len(file_set)

304 

305 

def un7zip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None,
                 remove_space=True, cmd_line=False):
    """
    Unzips files from a zip archive compress with :epkg:`7z`.

    @param      zipf            archive (or bytes or BytesIO)
    @param      where_to        destination folder (can be None, the result is a list of tuple)
    @param      fLOG            logging function
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      cmd_line        use command line instead of module :epkg:`pylzma`
    @return                     list of unzipped files

    The function requires module :epkg:`pylzma`.
    See :ref:`Why module pylzma does not work? <faq-pylzma-ref>`.

    With *cmd_line=True*, *zipf* must be a filename, *fvalid* and
    *remove_space* are ignored (a warning is emitted), the archive is
    extracted in *where_to* (current folder if None) and the function
    returns the files found in that folder.
    Without the command line, the function switches to it by itself
    if the module cannot read the format and *zipf* is a filename.
    """
    if cmd_line:
        if not isinstance(zipf, str):
            raise TypeError(  # pragma: no cover
                "Cannot use command line unless zipf is a file.")
        if remove_space:
            warnings.warn(  # pragma: no cover
                '[un7zip_files] remove_space and cmd_line are incompatible options.',
                UserWarning)
        if fvalid:
            warnings.warn(
                'fvalid and cmd_line are incompatible options.', UserWarning)
        if sys.platform.startswith("win"):  # pragma: no cover
            exe = r"C:\Program Files\7-Zip\7z.exe"
            if not os.path.exists(exe):
                raise FileNotFoundError(f"unable to find: {exe}")
        elif sys.platform.startswith("darwin"):
            exe = "7za"  # pragma: no cover
        else:
            exe = "7z"

        # The default destination must apply on every platform,
        # otherwise the command line would receive "-oNone".
        if where_to is None:
            where_to = os.path.abspath(".")

        # The output switch is quoted (as unrar_files does)
        # to support folders with spaces.
        cmd = f'"{exe}" x "{zipf}" "-o{where_to}"'
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
        if len(err) > 0 or "Error:" in out:
            raise FileException(  # pragma: no cover
                f"Unable to un-7zip file '{zipf}'\n--CMD--\n{cmd}\n--OUT--\n{out}\n--ERR--\n{err}")

        return explore_folder(where_to)[1]

    from py7zlib import Archive7z, FormatError
    # file_zipf keeps the path of the archive (needed to switch to the
    # command line), opened keeps the handle this function must close.
    file_zipf = None
    opened = None
    if isinstance(zipf, bytes):
        zipf = BytesIO(zipf)
    elif not isinstance(zipf, BytesIO):
        file_zipf = zipf
        opened = open(zipf, "rb")
        zipf = opened

    files = []
    try:
        try:
            file = Archive7z(zipf)
        except FormatError as e:
            raise FileException(  # pragma: no cover
                "You should use a modified version available at https://github.com/sdpython/pylzma") from e
        for info in file.files:
            if where_to is None:
                files.append((info.filename, info.read()))
            else:
                clean = remove_diacritics(info.filename)
                if remove_space:
                    clean = clean.replace(" ", "").replace("'", "") \
                        .replace(",", "_").replace("(", "_") \
                        .replace(")", "_")
                # NOTE(review): the member name is not checked for '..' or
                # absolute paths, a crafted archive could write outside
                # *where_to* -- confirm archives are trusted.
                tos = os.path.join(where_to, clean)
                if not os.path.exists(tos):
                    if fvalid and not fvalid(info.filename, tos):
                        fLOG("[un7zip_files] skipping",  # pragma: no cover
                             info.filename)
                        continue  # pragma: no cover
                    try:
                        data = info.read()
                    except NotImplementedError as e:  # pragma: no cover
                        # You should use command line.
                        if file_zipf is None:
                            raise TypeError(
                                "Cannot switch to command line unless zipf is a file.") from e
                        warnings.warn(
                            f"[un7zip_files] '{zipf}' --> Unavailable format. Use command line.", UserWarning)
                        return un7zip_files(file_zipf, where_to=where_to, fLOG=fLOG, fvalid=fvalid,
                                            remove_space=remove_space, cmd_line=True)
                    except Exception as e:  # pragma: no cover
                        raise FileException("Unable to unzip file '{0}' from '{1}'".format(
                            info.filename, zipf)) from e
                    # check encoding to avoid characters not allowed in paths
                    if not os.path.exists(tos):
                        if sys.platform.startswith("win"):
                            tos = tos.replace("/", "\\")
                        finalfolder = os.path.split(tos)[0]
                        if not os.path.exists(finalfolder):
                            fLOG("[un7zip_files] creating folder (7z)",
                                 os.path.abspath(finalfolder))
                            try:
                                os.makedirs(finalfolder)
                            except FileNotFoundError as e:  # pragma: no cover
                                mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format(
                                    info.filename, tos, finalfolder, len(finalfolder))
                                raise FileNotFoundError(mes) from e
                        # Names ending with "/" are folder entries, nothing to write.
                        if not info.filename.endswith("/"):
                            try:
                                with open(tos, "wb") as u:
                                    u.write(data)
                            except FileNotFoundError as e:  # pragma: no cover
                                # probably an issue in the path name
                                # the next lines are just here to distinguish
                                # between the two cases
                                if not os.path.exists(finalfolder):
                                    raise e
                                newname = info.filename.replace(
                                    " ", "_").replace(",", "_")
                                if sys.platform.startswith("win"):
                                    newname = newname.replace("/", "\\")
                                tos = os.path.join(where_to, newname)
                                finalfolder = os.path.split(tos)[0]
                                if not os.path.exists(finalfolder):
                                    fLOG("[un7zip_files] creating folder (7z)",
                                         os.path.abspath(finalfolder))
                                    os.makedirs(finalfolder)
                                with open(tos, "wb") as u:
                                    u.write(data)
                            files.append(tos)
                            fLOG("[un7zip_files] unzipped ",
                                 info.filename, " to ", tos)
                    elif not tos.endswith("/"):  # pragma: no cover
                        files.append(tos)
                elif not info.filename.endswith("/"):  # pragma: no cover
                    files.append(tos)
    finally:
        # Only a file opened by this function is closed,
        # a BytesIO given by the caller is left untouched.
        if opened is not None:
            opened.close()
    return files

442 

443 

def unrar_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True):
    """
    Extracts a rar archive with the command line: :epkg:`7z`
    on Windows, *unrar* on any other platform.

    @param      zipf            archive to extract, inserted as is in the command line
    @param      where_to        destination folder, the current folder if None
    @param      fLOG            logging function, given to *run_cmd*
    @param      fvalid          not used by this function
    @param      remove_space    not used by this function
    @return                     second element returned by *explore_folder*
                                on the destination folder

    The function raises *FileException* if the command line writes
    on the error stream (or, on Windows, if the output contains ``Error:``),
    *FileNotFoundError* if 7z is not installed on Windows.
    """
    on_windows = sys.platform.startswith("win")
    if on_windows:  # pragma: no cover
        program = r"C:\Program Files\7-Zip\7z.exe"
        if not os.path.exists(program):
            raise FileNotFoundError(f"unable to find: {program}")
    else:
        program = "unrar"

    destination = os.path.abspath(".") if where_to is None else where_to

    if on_windows:  # pragma: no cover
        # 7z receives the destination through its -o switch.
        cmd = f'"{program}" x "{zipf}" "-o{destination}"'
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
        failed = len(err) > 0 or "Error:" in out
        if failed:
            raise FileException(
                f"Unable to unrar file '{zipf}'\n--OUT--\n{out}\n--ERR--\n{err}")
    else:
        # unrar runs from the destination folder.
        cmd = f'"{program}" x "{zipf}"'
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG,
                           change_path=destination)
        if len(err) > 0:
            raise FileException(  # pragma: no cover
                f"Unable to unrar file '{zipf}'\n--CMD--\n{cmd}\n--OUT--\n{out}\n--ERR--\n{err}")

    explored = explore_folder(destination)
    return explored[1]

483 

484 

def untar_files(filename, where_to=None, fLOG=noLOG, encoding=None):
    """
    Uncompresses files from a tar file.

    @param      filename        tar file (``.tar.gz`` files are read as gzip-compressed,
                                any other name as uncompressed), or the bytes
                                of a gzip-compressed tar archive
    @param      where_to        destination folder, the current folder if None
    @param      fLOG            logging function (not used by this function)
    @param      encoding        encoding given to :epkg:`tarfile`
    @return                     list of the archive members, prefixed by
                                *where_to* if it is not None

    When the interpreter supports extraction filters,
    the archive is extracted with the ``data`` filter which rejects
    members trying to write outside of the destination folder.
    """
    if isinstance(filename, bytes):
        # tarfile needs a file-like object, not raw bytes.
        fileobj = BytesIO(filename)
        name = None
        targz = True
    else:
        name = filename
        fileobj = None
        targz = name.endswith(".tar.gz")

    mode = "r:gz" if targz else "r:"
    # extractall cannot join a None destination with the member names.
    destination = "." if where_to is None else where_to
    # Extraction filters only exist in recent interpreters
    # (and security releases of older ones).
    options = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

    with tarfile.open(name=name, fileobj=fileobj,
                      mode=mode, encoding=encoding) as tar:
        names = tar.getnames()
        tar.extractall(destination, **options)

    if where_to is not None:
        return [os.path.join(where_to, member) for member in names]
    return names