Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2@file
3@brief Functions about compressing files.
4"""
6import os
7import zipfile
8import datetime
9import gzip
10import sys
11import warnings
12import tarfile
13from io import BytesIO
15from ..loghelper.flog import noLOG, run_cmd
16from .fexceptions import FileException
17from ..texthelper.diacritic_helper import remove_diacritics
18from .synchelper import explore_folder
def zip_files(filename, file_set, root=None, fLOG=noLOG):
    """
    Zips all files from an iterator.

    @param      filename        final zip file (can be None)
    @param      file_set        iterator on file to add
    @param      root            if not None, all path are relative to this path
    @param      fLOG            logging function (None disables logging)
    @return                     number of added files, or the archive as bytes
                                if *filename* is None or a ``BytesIO``

    *filename* can be None, the function compresses
    into bytes without saving the results.
    Paths from *file_set* which do not exist are silently skipped.

    The zip format cannot store a date before 1980. When the access or
    modification time of a file is older than that, its modification
    time is moved forward **on disk** (``os.utime``) before the file
    is added to the archive.
    """
    nb = 0
    a1980 = datetime.datetime(1980, 1, 1)
    if filename is None:
        filename = BytesIO()
    with zipfile.ZipFile(filename, 'w') as myzip:
        for file in file_set:
            if not os.path.exists(file):
                continue
            if fLOG:
                fLOG("[zip_files] '{0}'".format(file))
            st = os.stat(file)
            atime = datetime.datetime.fromtimestamp(st.st_atime)
            mtime = datetime.datetime.fromtimestamp(st.st_mtime)
            if atime < a1980 or mtime < a1980:  # pragma: no cover
                # Shift the modification time by steps of four hours
                # until it becomes a date the zip format can hold.
                new_mtime = st.st_mtime + (4 * 3600)
                while datetime.datetime.fromtimestamp(new_mtime) < a1980:
                    new_mtime += (4 * 3600)

                # Same guard as above: fLOG may be None.
                if fLOG:
                    fLOG(
                        "[zip_files] changing time timestamp for file '{0}'".format(file))
                os.utime(file, (st.st_atime, new_mtime))

            # Without a root, ZipFile.write derives the archive name itself.
            arcname = os.path.relpath(file, root) if root else None
            myzip.write(file, arcname=arcname)
            nb += 1
    return filename.getvalue() if isinstance(filename, BytesIO) else nb
def unzip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True,
                fail_if_error=True):
    """
    Unzips files from a zip archive.

    @param      zipf            archive (or bytes or BytesIO)
    @param      where_to        destination folder, if None nothing is written and
                                the result is a list of tuples
                                *(name in the archive, content as bytes)*
    @param      fLOG            logging function (None disables logging)
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      fail_if_error   fails if an error is encountered
                                (typically a weird character in a filename),
                                otherwise a warning is thrown.
    @return                     list of unzipped files

    A file which already exists in *where_to* is not overwritten but it
    is still part of the returned list. An entry whose name would be
    written outside of *where_to* (``..`` or an absolute path) is
    rejected: ``zipfile.BadZipFile`` is raised if *fail_if_error* is True,
    otherwise a warning is issued and the entry is skipped.
    """
    def log(*args):
        # fLOG may be None, every message goes through this guard.
        if fLOG:
            fLOG(*args)

    def fail(name, reason, cause=None):
        # Raises or warns depending on fail_if_error.
        msg = "Unable to extract '{0}' due to {1}".format(name, reason)
        if fail_if_error:
            raise zipfile.BadZipFile(msg) from cause
        warnings.warn(msg, UserWarning)

    if isinstance(zipf, bytes):
        zipf = BytesIO(zipf)

    # First pass: only checks the archive can be opened, which gives
    # a clearer error message when *zipf* is a filename.
    try:
        with zipfile.ZipFile(zipf, "r"):
            pass
    except zipfile.BadZipFile as e:  # pragma: no cover
        if isinstance(zipf, BytesIO):
            raise e
        raise IOError("Unable to read file '{0}'".format(zipf)) from e

    on_windows = sys.platform.startswith("win")
    root = None if where_to is None else os.path.abspath(where_to)

    def inside(path):
        # True if *path* stays inside the destination folder.
        full = os.path.abspath(path)
        return full == root or full.startswith(root.rstrip(os.sep) + os.sep)

    files = []
    with zipfile.ZipFile(zipf, "r") as file:
        for info in file.infolist():
            log("[unzip_files] unzip '{0}'".format(info.filename))

            if where_to is None:
                # In-memory mode: returns the raw content.
                try:
                    content = file.read(info.filename)
                except zipfile.BadZipFile as e:  # pragma: no cover
                    fail(info.filename, e, e)
                    continue
                files.append((info.filename, content))
                continue

            clean = remove_diacritics(info.filename)
            if remove_space:
                clean = clean.replace(" ", "").replace("'", "").replace(",", "_") \
                             .replace("(", "_").replace(")", "_")
            tos = os.path.join(where_to, clean)

            if not inside(tos):
                fail(info.filename,
                     "a destination outside of '{0}'".format(where_to))
                continue

            if os.path.exists(tos):
                # Already there: reported but not overwritten (folders are not reported).
                if not info.filename.endswith("/"):  # pragma: no cover
                    files.append(tos)
                continue

            if fvalid and not fvalid(info.filename, tos):
                log("[unzip_files] skipping", info.filename)
                continue

            try:
                data = file.read(info.filename)
            except zipfile.BadZipFile as e:  # pragma: no cover
                fail(info.filename, e, e)
                continue

            # The destination is checked again as it was in the original
            # implementation (fvalid may have created it).
            if os.path.exists(tos):
                if not tos.endswith("/"):  # pragma: no cover
                    files.append(tos)
                continue

            # check encoding to avoid characters not allowed in paths
            if on_windows:
                tos = tos.replace("/", "\\")
            finalfolder = os.path.split(tos)[0]
            if not os.path.exists(finalfolder):
                log("[unzip_files] creating folder (zip)",
                    os.path.abspath(finalfolder))
                try:
                    os.makedirs(finalfolder)
                except FileNotFoundError as e:  # pragma: no cover
                    mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format(
                        info.filename, tos, finalfolder, len(finalfolder))
                    raise FileNotFoundError(mes) from e

            if info.filename.endswith("/"):
                # Folder entry: creating the folder was enough.
                continue

            try:
                with open(tos, "wb") as u:
                    u.write(data)
            except FileNotFoundError as e:  # pragma: no cover
                # probably an issue in the path name
                # the next lines are just here to distinguish
                # between the two cases
                if not os.path.exists(finalfolder):
                    raise e
                newname = info.filename.replace(
                    " ", "_").replace(",", "_")
                if on_windows:
                    newname = newname.replace("/", "\\")
                tos = os.path.join(where_to, newname)
                if not inside(tos):
                    raise e
                finalfolder = os.path.split(tos)[0]
                if not os.path.exists(finalfolder):
                    log("[unzip_files] creating folder (zip)",
                        os.path.abspath(finalfolder))
                    os.makedirs(finalfolder)
                with open(tos, "wb") as u:
                    u.write(data)
            files.append(tos)
            log("[unzip_files] unzipped ", info.filename, " to ", tos)
    return files
def gzip_files(filename, file_set, encoding=None, fLOG=noLOG):
    """
    Compresses all files from an iterator in a zip file
    and then in a gzip file.

    @param      filename        final gzip file (double compression, extension should something like .zip.gz),
                                it can be None or a ``BytesIO`` to compress in memory
    @param      file_set        iterator on file to add
    @param      encoding        encoding of input files (no double compression then)
    @param      fLOG            logging function
    @return                     bytes (if filename is None or a ``BytesIO``) or None

    If *encoding* is None, the files are zipped with @see fn zip_files
    and the zip is gzipped. Otherwise, every file is read as text with
    this encoding and all contents are concatenated into a single
    gzip stream encoded in utf-8.
    """
    if filename is None:
        filename = BytesIO()

    if encoding is None:
        content = zip_files(None, file_set, fLOG=fLOG)
        # The context manager closes the stream even if the writing fails.
        with gzip.open(filename, 'wb') as f:
            f.write(content)
    else:
        # The compressed stream is always utf-8, only the inputs
        # are decoded with the requested encoding.
        with gzip.open(filename, 'wt', encoding="utf-8") as f:
            for name in file_set:
                with open(name, "r", encoding=encoding) as ft:
                    f.write(ft.read())

    return filename.getvalue() if isinstance(filename, BytesIO) else None
def ungzip_files(filename, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True,
                 unzip=True, encoding=None):
    """
    Uncompresses files from a gzip file.

    @param      filename        final gzip file (double compression, extension should something like .zip.gz),
                                or the compressed content as bytes
    @param      where_to        destination folder (can be None)
    @param      fLOG            logging function
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      unzip           unzip file after gzip
    @param      encoding        encoding, if not None, the gzip stream is read as
                                utf-8 text and no unzipping takes place
    @return                     depends on the parameters:

    * *encoding* is None and *unzip* is True: the result of
      @see fn unzip_files applied to the uncompressed content,
    * *encoding* is None, *unzip* is False: the name of the written file
      if *where_to* is not None, the uncompressed bytes otherwise,
    * *encoding* is not None: the name of the written text file
      (encoded with *encoding*, next to the archive or in *where_to*)
      if *filename* is a filename, the decoded text if *filename* is bytes.
    """
    if isinstance(filename, bytes):
        is_file = False
        filename = BytesIO(filename)
    else:
        is_file = True

    if encoding is None:
        with gzip.open(filename, 'rb') as f:
            content = f.read()
        if unzip:
            try:
                # fvalid and remove_space are documented parameters,
                # they must reach unzip_files.
                return unzip_files(content, where_to=where_to, fLOG=fLOG,
                                   fvalid=fvalid, remove_space=remove_space)
            except Exception as e:  # pragma: no cover
                raise IOError(
                    "Unable to unzip file '{0}'".format(filename)) from e
        if where_to is not None:
            filename = os.path.split(filename)[-1].replace(".gz", "")
            filename = os.path.join(where_to, filename)
            with open(filename, "wb") as f:
                f.write(content)
            return filename
        return content

    with gzip.open(filename, 'rt', encoding="utf-8") as f:
        content = f.read()
    if not is_file:
        return content

    filename = filename.replace(".gz", "")
    if where_to is not None:
        filename = os.path.join(where_to, os.path.split(filename)[-1])
    # *content* is a string: the file must be opened in text mode
    # (binary mode raised a TypeError).
    with open(filename, "w", encoding=encoding) as f:
        f.write(content)
    return filename
def zip7_files(filename_7z, file_set, fLOG=noLOG, temp_folder="."):
    """
    If :epkg:`7z` is installed, the function uses it
    to compress file into 7z format. The file *filename_7z* must not exist.

    @param      filename_7z     final destination
    @param      file_set        list of files to compress (any iterable is accepted)
    @param      fLOG            logging function
    @param      temp_folder     the function stores the list of files in a file in the
                                folder *temp_folder*, it will be removed afterwards
    @return                     number of added files

    The function raises ``FileNotFoundError`` if a file to compress does not
    exist (or if 7z is not found on Windows) and ``FileException`` if
    *filename_7z* already exists or if the command line fails.

    .. faqref::
        :title: Why module pylzma does not work?
        :lid: faq-pylzma-ref

        The module :epkg:`pylzma`
        failed to decompress the file produced by the latest version
        of :epkg:`7z` (2016-09-23). The compression
        was changed by tweaking the command line. LZMA is used instead LZMA2.
        The current version does not include this
        `commit <https://github.com/fancycode/pylzma/commit/b5c3c2bd4ab7abfb65de772861ecc600fe37394b>`_.
        Or you can clone the package
        `sdpython.pylzma <https://github.com/sdpython/pylzma>`_
        and build it yourself with ``python setup.py bdist_wheel``.
    """
    if sys.platform.startswith("win"):  # pragma: no cover
        exe = r"C:\Program Files\7-Zip\7z.exe"
        if not os.path.exists(exe):
            raise FileNotFoundError("unable to find: {0}".format(exe))
    elif sys.platform.startswith("darwin"):
        exe = "7za"  # pragma: no cover
    else:
        exe = "7z"

    if os.path.exists(filename_7z):
        raise FileException(  # pragma: no cover
            "'{0}' already exists".format(filename_7z))

    # The names are used three times (check, list file, count):
    # a generator would be exhausted after the first one.
    file_set = list(file_set)

    notxist = [fn for fn in file_set if not os.path.exists(fn)]
    if notxist:
        raise FileNotFoundError(  # pragma: no cover
            "unable to compress unexisting files:\n{0}".format("\n".join(notxist)))

    # 7z reads the names of the files to compress from this file ("@file").
    flist = os.path.join(temp_folder, "listfiles7z.txt")
    with open(flist, "w", encoding="utf8") as f:
        f.write("\n".join(file_set))

    cmd = '"{0}" -m0=lzma -mfb=64 a "{1}" "@{2}"'.format(
        exe, filename_7z, flist)
    try:
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
    finally:
        # The list file is temporary whatever the outcome is.
        os.remove(flist)

    if "Error:" in out or not os.path.exists(filename_7z):
        raise FileException(  # pragma: no cover
            "An error occurred with cmd: '{0}'\n"
            "--OUT--\n{1}\n--ERR--\n{2}\n----".format(
                cmd, out, err))
    return len(file_set)
def un7zip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None,
                 remove_space=True, cmd_line=False):
    """
    Unzips files from a zip archive compress with :epkg:`7z`.

    @param      zipf            archive (or bytes or BytesIO)
    @param      where_to        destination folder, if None and *cmd_line* is False,
                                nothing is written and the result is a list of tuples
                                *(name in the archive, content)*, if None and
                                *cmd_line* is True, the current folder is used
    @param      fLOG            logging function
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      cmd_line        use command line instead of module :epkg:`pylzma`
    @return                     list of unzipped files

    The function requires module :epkg:`pylzma`.
    See :ref:`Why module pylzma does not work? <faq-pylzma-ref>`.

    With *cmd_line*, *zipf* must be a filename, *fvalid* and *remove_space*
    are ignored (a warning is issued) and the function returns what
    ``explore_folder(where_to)[1]`` returns. Without it, a file which
    already exists is not overwritten but still belongs to the returned
    list, and an entry which would be written outside of *where_to*
    raises ``FileException``.
    """
    if cmd_line:
        if not isinstance(zipf, str):
            raise TypeError(  # pragma: no cover
                "Cannot use command line unless zipf is a file.")
        if remove_space:
            warnings.warn(  # pragma: no cover
                '[un7zip_files] remove_space and cmd_line are incompatible options.',
                UserWarning)
        if fvalid:
            warnings.warn(
                'fvalid and cmd_line are incompatible options.', UserWarning)
        if sys.platform.startswith("win"):  # pragma: no cover
            exe = r"C:\Program Files\7-Zip\7z.exe"
            if not os.path.exists(exe):
                raise FileNotFoundError("unable to find: {0}".format(exe))
        elif sys.platform.startswith("darwin"):
            exe = "7za"  # pragma: no cover
        else:
            exe = "7z"

        # The default destination applies to every platform,
        # otherwise the command line would contain "-oNone".
        if where_to is None:
            where_to = os.path.abspath(".")

        # The whole -o switch is quoted to support folders with spaces.
        cmd = '"{0}" x "{1}" "-o{2}"'.format(exe, zipf, where_to)
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
        if len(err) > 0 or "Error:" in out:
            raise FileException(  # pragma: no cover
                "Unable to un-7zip file '{0}'\n--CMD--\n{3}\n--OUT--\n{1}\n--ERR--\n{2}".format(zipf, out, err, cmd))

        return explore_folder(where_to)[1]

    from py7zlib import Archive7z, FormatError

    # Only a filename allows the function to switch to the command line.
    file_zipf = zipf if isinstance(zipf, str) else None
    opened = None
    if isinstance(zipf, bytes):
        zipf = BytesIO(zipf)
    elif not isinstance(zipf, BytesIO):
        zipf = opened = open(zipf, "rb")

    on_windows = sys.platform.startswith("win")
    root = None if where_to is None else os.path.abspath(where_to)

    def inside(path):
        # True if *path* stays inside the destination folder.
        full = os.path.abspath(path)
        return full == root or full.startswith(root.rstrip(os.sep) + os.sep)

    try:
        files = []
        try:
            file = Archive7z(zipf)
        except FormatError as e:
            raise FileException(  # pragma: no cover
                "You should use a modified version available at https://github.com/sdpython/pylzma") from e

        for info in file.files:
            if where_to is None:
                files.append((info.filename, info.read()))
                continue

            clean = remove_diacritics(info.filename)
            if remove_space:
                clean = clean.replace(" ", "").replace("'", "") \
                             .replace(",", "_").replace("(", "_") \
                             .replace(")", "_")
            tos = os.path.join(where_to, clean)

            if not inside(tos):
                raise FileException(
                    "Unable to unzip file '{0}' from '{1}', it would be written outside of '{2}'".format(
                        info.filename, zipf, where_to))

            if os.path.exists(tos):
                # Already there: reported but not overwritten (folders are not reported).
                if not info.filename.endswith("/"):  # pragma: no cover
                    files.append(tos)
                continue

            if fvalid and not fvalid(info.filename, tos):
                fLOG("[un7zip_files] skipping",  # pragma: no cover
                     info.filename)
                continue  # pragma: no cover

            try:
                data = info.read()
            except NotImplementedError as e:  # pragma: no cover
                # You should use command line.
                if file_zipf is None:
                    raise TypeError(
                        "Cannot switch to command line unless zipf is a file.") from e
                warnings.warn(
                    "[un7zip_files] '{0}' --> Unavailable format. Use command line.".format(zipf), UserWarning)
                return un7zip_files(file_zipf, where_to=where_to, fLOG=fLOG, fvalid=fvalid,
                                    remove_space=remove_space, cmd_line=True)
            except Exception as e:  # pragma: no cover
                raise FileException("Unable to unzip file '{0}' from '{1}'".format(
                    info.filename, zipf)) from e

            # The destination is checked again as it was in the original
            # implementation (fvalid may have created it).
            if os.path.exists(tos):
                if not tos.endswith("/"):  # pragma: no cover
                    files.append(tos)
                continue

            # check encoding to avoid characters not allowed in paths
            if on_windows:
                tos = tos.replace("/", "\\")
            finalfolder = os.path.split(tos)[0]
            if not os.path.exists(finalfolder):
                fLOG("[un7zip_files] creating folder (7z)",
                     os.path.abspath(finalfolder))
                try:
                    os.makedirs(finalfolder)
                except FileNotFoundError as e:  # pragma: no cover
                    mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format(
                        info.filename, tos, finalfolder, len(finalfolder))
                    raise FileNotFoundError(mes) from e

            if info.filename.endswith("/"):
                # Folder entry: creating the folder was enough.
                continue

            try:
                with open(tos, "wb") as u:
                    u.write(data)
            except FileNotFoundError as e:  # pragma: no cover
                # probably an issue in the path name
                # the next lines are just here to distinguish
                # between the two cases
                if not os.path.exists(finalfolder):
                    raise e
                newname = info.filename.replace(
                    " ", "_").replace(",", "_")
                if on_windows:
                    newname = newname.replace("/", "\\")
                tos = os.path.join(where_to, newname)
                if not inside(tos):
                    raise e
                finalfolder = os.path.split(tos)[0]
                if not os.path.exists(finalfolder):
                    fLOG("[un7zip_files] creating folder (7z)",
                         os.path.abspath(finalfolder))
                    os.makedirs(finalfolder)
                with open(tos, "wb") as u:
                    u.write(data)
            files.append(tos)
            fLOG("[un7zip_files] unzipped ",
                 info.filename, " to ", tos)
        return files
    finally:
        # Only closes the handle this function opened itself.
        if opened is not None:
            opened.close()
def unrar_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True):
    """
    Uncompresses files from a rar archive with :epkg:`7z`
    on Windows or *unrar* on the other platforms.

    @param      zipf            archive, it is inserted into the command line
    @param      where_to        destination folder (the current folder if None)
    @param      fLOG            logging function, given to ``run_cmd``
    @param      fvalid          not used by this function
    @param      remove_space    not used by this function
    @return                     what ``explore_folder(where_to)[1]`` returns
                                once the archive is extracted

    The function raises ``FileException`` if the command line writes
    something on the standard error (or if its output contains
    ``Error:`` on Windows), and ``FileNotFoundError`` if 7z is not
    installed at its default location on Windows.
    """
    if not sys.platform.startswith("win"):
        # unrar extracts into the current folder, run_cmd is asked
        # to run it from the destination.
        program = "unrar"
        if where_to is None:
            where_to = os.path.abspath(".")
        command = '"{0}" x "{1}"'.format(program, zipf)
        stdout, stderr = run_cmd(
            command, wait=True, fLOG=fLOG, change_path=where_to)
        if len(stderr) > 0:
            raise FileException(  # pragma: no cover
                "Unable to unrar file '{0}'\n--CMD--\n{3}\n--OUT--\n{1}\n--ERR--\n{2}".format(
                    zipf, stdout, stderr, command))
        return explore_folder(where_to)[1]

    # Windows: 7z receives the destination through the -o switch.
    program = r"C:\Program Files\7-Zip\7z.exe"  # pragma: no cover
    if not os.path.exists(program):
        raise FileNotFoundError("unable to find: {0}".format(program))
    if where_to is None:
        where_to = os.path.abspath(".")
    command = '"{0}" x "{1}" "-o{2}"'.format(program, zipf, where_to)
    stdout, stderr = run_cmd(command, wait=True, fLOG=fLOG)
    if len(stderr) > 0 or "Error:" in stdout:
        raise FileException(
            "Unable to unrar file '{0}'\n"
            "--OUT--\n{1}\n--ERR--\n{2}".format(
                zipf, stdout, stderr))
    return explore_folder(where_to)[1]
def untar_files(filename, where_to=None, fLOG=noLOG, encoding=None):
    """
    Uncompresses files from a tar file.

    @param      filename        tar file (compressed or not), its content as bytes
                                or a binary file object
    @param      where_to        destination folder (the current folder if None)
    @param      fLOG            logging function (not used by this function)
    @param      encoding        encoding given to ``tarfile.open``
    @return                     list of the members of the archive, joined to
                                *where_to* if it is not None

    The compression is detected by :epkg:`tarfile` (mode ``r:*``)
    and not guessed from the extension.
    """
    if isinstance(filename, bytes):
        # tarfile needs a file object, not raw bytes.
        name, fileobj = None, BytesIO(filename)
    elif hasattr(filename, "read"):
        name, fileobj = None, filename
    else:
        name, fileobj = filename, None

    # extractall joins the destination with every member name,
    # None would raise a TypeError.
    destination = "." if where_to is None else where_to

    with tarfile.open(name=name, fileobj=fileobj,
                      mode="r:*", encoding=encoding) as tar:
        names = tar.getnames()
        # NOTE: members are extracted as they are stored, this should
        # only be used with archives coming from a trusted source.
        tar.extractall(destination)

    if where_to is not None:
        return [os.path.join(where_to, member) for member in names]
    return names