Coverage for pyquickhelper/filehelper/compression_helper.py: 84%
204 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1"""
2@file
3@brief Functions about compressing files.
4"""
6import os
7import zipfile
8import datetime
9import gzip
10import sys
11import warnings
12import tarfile
13from io import BytesIO
15from ..loghelper.flog import noLOG, run_cmd
16from .fexceptions import FileException
17from ..texthelper.diacritic_helper import remove_diacritics
18from .synchelper import explore_folder
def zip_files(filename, file_set, root=None, fLOG=noLOG):
    """
    Zips all files from an iterator.

    @param      filename    final zip file (can be None)
    @param      file_set    iterator on file to add
    @param      root        if not None, all path are relative to this path
    @param      fLOG        logging function (can be None to disable logging)
    @return                 number of added files (or content if filename is None)

    *filename* can be None, the function compresses
    into bytes without saving the results. The bytes are also
    returned when *filename* is already a ``BytesIO`` instance.

    Paths of *file_set* which do not exist are silently skipped.
    When the access or modification time of a file is before 1980,
    the function changes the modification time of the file **on disk**
    (``os.utime``) before adding it to the archive.
    """
    nb = 0
    a1980 = datetime.datetime(1980, 1, 1)
    if filename is None:
        filename = BytesIO()
    with zipfile.ZipFile(filename, 'w') as myzip:
        for file in file_set:
            if not os.path.exists(file):
                # missing files are ignored, they are not counted
                continue
            if fLOG:
                fLOG(f"[zip_files] '{file}'")
            st = os.stat(file)
            atime = datetime.datetime.fromtimestamp(st.st_atime)
            mtime = datetime.datetime.fromtimestamp(st.st_mtime)
            if atime < a1980 or mtime < a1980:  # pragma: no cover
                # moves the modification time forward by steps of 4 hours
                # until it is not before 1980 anymore
                new_mtime = st.st_mtime + (4 * 3600)  # new modification time
                while datetime.datetime.fromtimestamp(new_mtime) < a1980:
                    new_mtime += (4 * 3600)  # new modification time

                # same guard as above: fLOG may be None
                if fLOG:
                    fLOG(
                        f"[zip_files] changing time timestamp for file '{file}'")
                os.utime(file, (st.st_atime, new_mtime))

            # arcname=None lets zipfile derive the archive name from *file*
            arcname = os.path.relpath(file, root) if root else None
            myzip.write(file, arcname=arcname)
            nb += 1
    # the archive must be closed before reading the buffer
    return filename.getvalue() if isinstance(filename, BytesIO) else nb
def unzip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True,
                fail_if_error=True):
    """
    Unzips files from a zip archive.

    @param      zipf            archive (or bytes or BytesIO)
    @param      where_to        destination folder (can be None, the result is a list of tuple)
    @param      fLOG            logging function (can be None to disable logging)
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      fail_if_error   fails if an error is encountered
                                (typically a weird character in a filename),
                                otherwise a warning is thrown.
    @return                     list of unzipped files

    If *where_to* is None, nothing is written and the function returns
    a list of tuples *(name in the archive, content as bytes)*.
    Otherwise, it returns the list of local paths. A file which already
    exists is not overwritten but its path is still returned.

    An entry whose local path would be outside of *where_to*
    (absolute name, ``..`` in the name) is never written:
    the function raises *FileException* if *fail_if_error* is True,
    it warns and skips the entry otherwise.
    """
    if isinstance(zipf, bytes):
        zipf = BytesIO(zipf)

    def log(*args):
        # fLOG may be None, every logging call goes through this guard
        if fLOG:
            fLOG(*args)

    # first pass: only checks the archive can be opened
    try:
        with zipfile.ZipFile(zipf, "r"):
            pass
    except zipfile.BadZipFile as e:  # pragma: no cover
        if isinstance(zipf, BytesIO):
            raise e
        raise IOError(f"Unable to read file '{zipf}'") from e

    # absolute destination with a trailing separator,
    # used to detect entries written outside of where_to
    root = (None if where_to is None
            else os.path.join(os.path.abspath(where_to), ""))

    def is_inside(path):
        return os.path.join(os.path.abspath(path), "").startswith(root)

    files = []
    with zipfile.ZipFile(zipf, "r") as file:
        for info in file.infolist():
            log(f"[unzip_files] unzip '{info.filename}'")
            if where_to is None:
                try:
                    content = file.read(info.filename)
                except zipfile.BadZipFile as e:  # pragma: no cover
                    if fail_if_error:
                        raise zipfile.BadZipFile(
                            f"Unable to extract '{info.filename}' due to {e}") from e
                    warnings.warn(
                        f"Unable to extract '{info.filename}' due to {e}", UserWarning)
                    continue
                files.append((info.filename, content))
            else:
                clean = remove_diacritics(info.filename)
                if remove_space:
                    clean = clean.replace(" ", "").replace("'", "").replace(",", "_") \
                                 .replace("(", "_").replace(")", "_")
                tos = os.path.join(where_to, clean)

                if not is_inside(tos):
                    mes = (f"Unable to extract '{info.filename}', "
                           f"it would be written outside of '{where_to}'")
                    if fail_if_error:
                        raise FileException(mes)
                    warnings.warn(mes, UserWarning)
                    continue

                if not os.path.exists(tos):
                    if fvalid and not fvalid(info.filename, tos):
                        log("[unzip_files] skipping", info.filename)
                        continue

                    try:
                        data = file.read(info.filename)
                    except zipfile.BadZipFile as e:  # pragma: no cover
                        if fail_if_error:
                            raise zipfile.BadZipFile(
                                f"Unable to extract '{info.filename}' due to {e}") from e
                        warnings.warn(
                            f"Unable to extract '{info.filename}' due to {e}", UserWarning)
                        continue

                    # check encoding to avoid characters not allowed in paths
                    if not os.path.exists(tos):
                        if sys.platform.startswith("win"):
                            tos = tos.replace("/", "\\")
                        finalfolder = os.path.split(tos)[0]
                        if not os.path.exists(finalfolder):
                            log("[unzip_files] creating folder (zip)",
                                os.path.abspath(finalfolder))
                            try:
                                os.makedirs(finalfolder)
                            except FileNotFoundError as e:  # pragma: no cover
                                mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format(
                                    info.filename, tos, finalfolder, len(finalfolder))
                                raise FileNotFoundError(mes) from e

                        # names ending with "/" are folders, nothing to write
                        if not info.filename.endswith("/"):
                            try:
                                with open(tos, "wb") as u:
                                    u.write(data)
                            except FileNotFoundError as e:  # pragma: no cover
                                # probably an issue in the path name
                                # the next lines are just here to distinguish
                                # between the two cases
                                if not os.path.exists(finalfolder):
                                    raise e
                                # second attempt with a name built
                                # from the raw archive name
                                newname = info.filename.replace(
                                    " ", "_").replace(",", "_")
                                if sys.platform.startswith("win"):
                                    newname = newname.replace("/", "\\")
                                tos = os.path.join(where_to, newname)
                                if not is_inside(tos):
                                    raise e
                                finalfolder = os.path.split(tos)[0]
                                if not os.path.exists(finalfolder):
                                    log("[unzip_files] creating folder (zip)",
                                        os.path.abspath(finalfolder))
                                    os.makedirs(finalfolder)
                                with open(tos, "wb") as u:
                                    u.write(data)
                            files.append(tos)
                            log("[unzip_files] unzipped ",
                                info.filename, " to ", tos)
                    elif not tos.endswith("/"):  # pragma: no cover
                        files.append(tos)
                elif not info.filename.endswith("/"):  # pragma: no cover
                    # the file already exists, it is not overwritten
                    files.append(tos)
    return files
def gzip_files(filename, file_set, encoding=None, fLOG=noLOG):
    """
    Compresses all files from an iterator in a zip file
    and then in a gzip file.

    @param      filename    final gzip file (double compression, extension should something like .zip.gz),
                            can be None to compress in memory
    @param      file_set    iterator on file to add
    @param      encoding    encoding of input files (no double compression then)
    @param      fLOG        logging function
    @return                 bytes (if filename is None) or None

    If *encoding* is None, the files are zipped with @see fn zip_files
    and the zip archive is gzipped. Otherwise, every file is read
    as text with this encoding and the concatenation of their content
    (without any separator) is written into the gzip file as utf-8.
    """
    if filename is None:
        filename = BytesIO()
    if encoding is None:
        content = zip_files(None, file_set, fLOG=fLOG)
        # the context manager closes the gzip stream even if write fails
        with gzip.open(filename, 'wb') as f:
            f.write(content)
    else:
        # the gzip content is always utf-8,
        # *encoding* only describes the input files
        with gzip.open(filename, 'wt', encoding="utf-8") as f:
            for name in file_set:
                with open(name, "r", encoding=encoding) as ft:
                    f.write(ft.read())
    # the gzip stream must be closed before reading the buffer
    return filename.getvalue() if isinstance(filename, BytesIO) else None
def ungzip_files(filename, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True,
                 unzip=True, encoding=None):
    """
    Uncompresses files from a gzip file.

    @param      filename        final gzip file (double compression, extension should something like .zip.gz),
                                it can be bytes
    @param      where_to        destination folder (can be None, the result is a list of tuple)
    @param      fLOG            logging function
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      unzip           unzip file after gzip
    @param      encoding        encoding
    @return                     list of unzipped files

    If *encoding* is None, the content is read as bytes. It is then
    unzipped with @see fn unzip_files if *unzip* is True, otherwise it is
    saved in *where_to* (the function returns the name of the file)
    or returned as bytes if *where_to* is None.

    If *encoding* is not None, the gzip content is read as utf-8 text.
    If *filename* is not bytes, the text is written with this encoding
    next to the archive (name without ``.gz``, *where_to* is not used)
    and the function returns this name, otherwise it returns the text.
    """
    if isinstance(filename, bytes):
        is_file = False
        filename = BytesIO(filename)
    else:
        is_file = True

    if encoding is None:
        with gzip.open(filename, 'rb') as f:
            content = f.read()
        if unzip:
            try:
                # fvalid and remove_space are forwarded,
                # they only make sense for the zip archive
                return unzip_files(content, where_to=where_to, fLOG=fLOG,
                                   fvalid=fvalid, remove_space=remove_space)
            except Exception as e:  # pragma: no cover
                raise IOError(
                    f"Unable to unzip file '{filename}'") from e
        elif where_to is not None:
            filename = os.path.split(filename)[-1].replace(".gz", "")
            filename = os.path.join(where_to, filename)
            with open(filename, "wb") as f:
                f.write(content)
            return filename
        return content
    else:
        with gzip.open(filename, 'rt', encoding="utf-8") as f:
            content = f.read()
        if is_file:
            filename = filename.replace(".gz", "")
            # content is a string, the file must be opened in text mode
            with open(filename, "w", encoding=encoding) as f:
                f.write(content)
            return filename
        return content
def zip7_files(filename_7z, file_set, fLOG=noLOG, temp_folder="."):
    """
    If :epkg:`7z` is installed, the function uses it
    to compress file into 7z format. The file *filename_7z* must not exist.

    @param      filename_7z     final destination
    @param      file_set        list of files to compress
    @param      fLOG            logging function
    @param      temp_folder     the function stores the list of files in a file in the
                                folder *temp_folder*, it will be removed afterwards
    @return                     number of added files

    The function raises *FileException* if *filename_7z* already exists
    or if the command line fails, *FileNotFoundError* if one file
    of *file_set* does not exist (or if :epkg:`7z` is not found on Windows).

    .. faqref::
        :title: Why module pylzma does not work?
        :lid: faq-pylzma-ref

        The module :epkg:`pylzma`
        failed to decompress the file produced by the latest version
        of :epkg:`7z` (2016-09-23). The compression
        was changed by tweaking the command line. LZMA is used instead LZMA2.
        The current version does not include this
        `commit <https://github.com/fancycode/pylzma/commit/b5c3c2bd4ab7abfb65de772861ecc600fe37394b>`_.
        Or you can clone the package
        `sdpython.pylzma <https://github.com/sdpython/pylzma>`_
        and build it yourself with ``python setup.py bdist_wheel``.
    """
    if sys.platform.startswith("win"):  # pragma: no cover
        exe = r"C:\Program Files\7-Zip\7z.exe"
        if not os.path.exists(exe):
            raise FileNotFoundError(f"unable to find: {exe}")
    elif sys.platform.startswith("darwin"):
        exe = "7za"  # pragma: no cover
    else:
        exe = "7z"

    if os.path.exists(filename_7z):
        raise FileException(  # pragma: no cover
            f"'{filename_7z}' already exists")

    # file_set is walked through several times, an iterator
    # would be exhausted after the first loop
    file_set = list(file_set)

    notxist = [fn for fn in file_set if not os.path.exists(fn)]
    if notxist:
        raise FileNotFoundError(  # pragma: no cover
            "unable to compress unexisting files:\n{0}".format("\n".join(notxist)))

    # 7z reads the list of files to compress from this file (@listfile)
    flist = os.path.join(temp_folder, "listfiles7z.txt")
    with open(flist, "w", encoding="utf8") as f:
        f.write("\n".join(file_set))

    cmd = f'"{exe}" -m0=lzma -mfb=64 a "{filename_7z}" "@{flist}"'
    try:
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
    finally:
        # the list is only needed while the command runs
        if os.path.exists(flist):
            os.remove(flist)
    if "Error:" in out or not os.path.exists(filename_7z):
        raise FileException(  # pragma: no cover
            "An error occurred with cmd: '{0}'\n"
            "--OUT--\n{1}\n--ERR--\n{2}\n----".format(
                cmd, out, err))
    return len(file_set)
def un7zip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None,
                 remove_space=True, cmd_line=False):
    """
    Unzips files from a zip archive compress with :epkg:`7z`.

    @param      zipf            archive (or bytes or BytesIO)
    @param      where_to        destination folder (can be None, the result is a list of tuple)
    @param      fLOG            logging function
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      cmd_line        use command line instead of module :epkg:`pylzma`
    @return                     list of unzipped files

    The function requires module :epkg:`pylzma` unless *cmd_line* is True.
    See :ref:`Why module pylzma does not work? <faq-pylzma-ref>`.

    With *cmd_line*, *zipf* must be a filename, *remove_space* and
    *fvalid* are not supported (a warning is raised), the archive
    is extracted into *where_to* (current folder if None) and the function
    returns the second result of ``explore_folder(where_to)``.
    Without *cmd_line*, an entry whose local path would be outside
    of *where_to* raises *FileException* and is not written.
    """
    if cmd_line:
        if not isinstance(zipf, str):
            raise TypeError(  # pragma: no cover
                "Cannot use command line unless zipf is a file.")
        if remove_space:
            warnings.warn(  # pragma: no cover
                '[un7zip_files] remove_space and cmd_line are incompatible options.',
                UserWarning)
        if fvalid:
            warnings.warn(
                'fvalid and cmd_line are incompatible options.', UserWarning)
        if sys.platform.startswith("win"):  # pragma: no cover
            exe = r"C:\Program Files\7-Zip\7z.exe"
            if not os.path.exists(exe):
                raise FileNotFoundError(f"unable to find: {exe}")
        elif sys.platform.startswith("darwin"):
            exe = "7za"  # pragma: no cover
        else:
            exe = "7z"

        # default destination on every platform,
        # otherwise the command line would receive -oNone
        if where_to is None:
            where_to = os.path.abspath(".")

        # the option -o is quoted to support folders with spaces
        cmd = f'"{exe}" x "{zipf}" "-o{where_to}"'
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
        if len(err) > 0 or "Error:" in out:
            raise FileException(  # pragma: no cover
                f"Unable to un-7zip file '{zipf}'\n--CMD--\n{cmd}\n--OUT--\n{out}\n--ERR--\n{err}")

        return explore_folder(where_to)[1]
    else:
        from py7zlib import Archive7z, FormatError
        file_zipf = None
        opened = None  # stream opened by this function, it must be closed
        if not isinstance(zipf, BytesIO):
            file_zipf = zipf
            if isinstance(zipf, bytes):
                zipf = BytesIO(zipf)
            else:
                zipf = opened = open(zipf, "rb")

        # absolute destination with a trailing separator,
        # used to detect entries written outside of where_to
        root = (None if where_to is None
                else os.path.join(os.path.abspath(where_to), ""))

        try:
            files = []
            try:
                file = Archive7z(zipf)
            except FormatError as e:
                raise FileException(  # pragma: no cover
                    "You should use a modified version available at https://github.com/sdpython/pylzma") from e

            for info in file.files:
                if where_to is None:
                    files.append((info.filename, info.read()))
                else:
                    clean = remove_diacritics(info.filename)
                    if remove_space:
                        clean = clean.replace(" ", "").replace("'", "") \
                                     .replace(",", "_").replace("(", "_") \
                                     .replace(")", "_")
                    tos = os.path.join(where_to, clean)

                    if not os.path.join(os.path.abspath(tos), "").startswith(root):
                        raise FileException(
                            f"Unable to extract '{info.filename}', "
                            f"it would be written outside of '{where_to}'")

                    if not os.path.exists(tos):
                        if fvalid and not fvalid(info.filename, tos):
                            fLOG("[un7zip_files] skipping",  # pragma: no cover
                                 info.filename)
                            continue  # pragma: no cover

                        try:
                            data = info.read()
                        except NotImplementedError as e:  # pragma: no cover
                            # You should use command line.
                            if file_zipf is None:
                                raise TypeError(
                                    "Cannot switch to command line unless zipf is a file.") from e
                            warnings.warn(
                                f"[un7zip_files] '{zipf}' --> Unavailable format. Use command line.", UserWarning)
                            return un7zip_files(file_zipf, where_to=where_to, fLOG=fLOG, fvalid=fvalid,
                                                remove_space=remove_space, cmd_line=True)
                        except Exception as e:  # pragma: no cover
                            raise FileException("Unable to unzip file '{0}' from '{1}'".format(
                                info.filename, zipf)) from e

                        # check encoding to avoid characters not allowed in paths
                        if not os.path.exists(tos):
                            if sys.platform.startswith("win"):
                                tos = tos.replace("/", "\\")
                            finalfolder = os.path.split(tos)[0]
                            if not os.path.exists(finalfolder):
                                fLOG("[un7zip_files] creating folder (7z)",
                                     os.path.abspath(finalfolder))
                                try:
                                    os.makedirs(finalfolder)
                                except FileNotFoundError as e:  # pragma: no cover
                                    mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format(
                                        info.filename, tos, finalfolder, len(finalfolder))
                                    raise FileNotFoundError(mes) from e

                            # names ending with "/" are folders, nothing to write
                            if not info.filename.endswith("/"):
                                try:
                                    with open(tos, "wb") as u:
                                        u.write(data)
                                except FileNotFoundError as e:  # pragma: no cover
                                    # probably an issue in the path name
                                    # the next lines are just here to distinguish
                                    # between the two cases
                                    if not os.path.exists(finalfolder):
                                        raise e
                                    # second attempt with a name built
                                    # from the raw archive name
                                    newname = info.filename.replace(
                                        " ", "_").replace(",", "_")
                                    if sys.platform.startswith("win"):
                                        newname = newname.replace("/", "\\")
                                    tos = os.path.join(where_to, newname)
                                    if not os.path.join(os.path.abspath(tos), "").startswith(root):
                                        raise e
                                    finalfolder = os.path.split(tos)[0]
                                    if not os.path.exists(finalfolder):
                                        fLOG("[un7zip_files] creating folder (7z)",
                                             os.path.abspath(finalfolder))
                                        os.makedirs(finalfolder)
                                    with open(tos, "wb") as u:
                                        u.write(data)
                                files.append(tos)
                                fLOG("[un7zip_files] unzipped ",
                                     info.filename, " to ", tos)
                        elif not tos.endswith("/"):  # pragma: no cover
                            files.append(tos)
                    elif not info.filename.endswith("/"):  # pragma: no cover
                        # the file already exists, it is not overwritten
                        files.append(tos)
            return files
        finally:
            # releases the file handle on every exit path
            if opened is not None:
                opened.close()
def unrar_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True):
    """
    Uncompresses files from a rar archive compress with :epkg:`7z`
    on Window or *unrar* on linux.

    @param      zipf            archive, its name is inserted into the command line
    @param      where_to        destination folder, the current folder if None
    @param      fLOG            logging function, given to ``run_cmd``
    @param      fvalid          not used by the current implementation
    @param      remove_space    not used by the current implementation
    @return                     second result of ``explore_folder(where_to)``

    The function raises *FileException* if the command line
    writes something on the standard error (or if ``Error:``
    is found in the standard output on Windows).
    """
    on_windows = sys.platform.startswith("win")

    if on_windows:  # pragma: no cover
        program = r"C:\Program Files\7-Zip\7z.exe"
        if not os.path.exists(program):
            raise FileNotFoundError(f"unable to find: {program}")

    destination = os.path.abspath(".") if where_to is None else where_to

    if on_windows:  # pragma: no cover
        # 7z receives the destination through the option -o
        command = f'"{program}" x "{zipf}" "-o{destination}"'
        out, err = run_cmd(command, wait=True, fLOG=fLOG)
        if len(err) > 0 or "Error:" in out:
            raise FileException(
                f"Unable to unrar file '{zipf}'\n--OUT--\n{out}\n--ERR--\n{err}")
    else:
        # unrar is executed from the destination folder
        program = "unrar"
        command = f'"{program}" x "{zipf}"'
        out, err = run_cmd(command, wait=True, fLOG=fLOG,
                           change_path=destination)
        if len(err) > 0:
            raise FileException(  # pragma: no cover
                f"Unable to unrar file '{zipf}'\n--CMD--\n{command}\n--OUT--\n{out}\n--ERR--\n{err}")

    return explore_folder(destination)[1]
def untar_files(filename, where_to=None, fLOG=noLOG, encoding=None):
    """
    Uncompresses files from a tar file.

    @param      filename        tar file (``.tar`` or ``.tar.gz``), it can also be
                                the content of the archive (bytes or BytesIO)
    @param      where_to        destination folder, the current folder if None
    @param      fLOG            logging function (not used by the current implementation)
    @param      encoding        encoding, given to ``tarfile.open``
    @return                     list of unzipped files

    The function returns the names stored in the archive, joined to
    *where_to* when it is not None. A filename is read as a gzip
    archive only if it ends with ``.tar.gz``. For an archive
    given in memory, the compression is detected by module *tarfile*.
    """
    if isinstance(filename, (bytes, BytesIO)):
        # tarfile needs a file object, raw bytes are wrapped
        fileobj = BytesIO(filename) if isinstance(filename, bytes) else filename
        name = None
        mode = "r:*"  # transparent compression, it includes gzip
    else:
        name = filename
        fileobj = None
        mode = "r:gz" if name.endswith(".tar.gz") else "r:"

    # The 'data' extraction filter refuses members written outside of
    # the destination. It only exists on recent versions of Python,
    # tarfile.data_filter is the documented way to detect it.
    options = {}
    if hasattr(tarfile, "data_filter"):
        options["filter"] = "data"

    # extractall fails with None, "." is the default value of tarfile
    destination = "." if where_to is None else where_to

    with tarfile.open(name=name, fileobj=fileobj,
                      mode=mode, encoding=encoding) as tar:
        names = tar.getnames()
        tar.extractall(destination, **options)

    if where_to is not None:
        return [os.path.join(where_to, member) for member in names]
    return names