Coverage for src/pyensae/filehelper/decompress_helper.py: 94%
69 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-03 02:16 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-03 02:16 +0200
1"""
2@file
3@brief Various functions to decompress files
4"""
5import zipfile
6import os
7import gzip
8import bz2
9import warnings
10import copy
11import tarfile
12from tarfile import ExtractError
13from pyquickhelper.loghelper import noLOG
def decompress_zip(filename, whereTo=".", fLOG=noLOG):
    """
    Unzips a :epkg:`zip` file.

    A member is skipped (but still listed in the result) when a path with
    the same name already exists, either relative to the current working
    directory or below *whereTo*; existing files are never overwritten.
    Folder entries (names ending with ``/``) are never listed.

    @param      filename    file to process
    @param      whereTo     location of the result
    @param      fLOG        logging function
    @return                 return the list of decompressed files
    @raises     RuntimeError if *filename* is not a valid zip archive
    """
    try:
        archive = zipfile.ZipFile(filename, "r")
    except zipfile.BadZipFile as e:  # pragma: no cover
        raise RuntimeError("Unable to unzip '{}'.".format(
            filename)) from e

    files = []
    # The context manager guarantees the archive handle is released even
    # if one member fails to extract.
    with archive:
        for info in archive.infolist():
            is_folder = info.filename.endswith("/")

            # Already present relative to the working directory:
            # report it under its archive name and do not extract.
            if os.path.exists(info.filename):
                if not is_folder:
                    files.append(info.filename)
                continue

            # NOTE(review): the member name is joined to whereTo without
            # sanitisation, an archive holding '..' segments or absolute
            # names can write outside whereTo; use with trusted archives.
            tos = os.path.join(whereTo, info.filename)

            # Already extracted below whereTo: report it, keep the file.
            if os.path.exists(tos):
                if not tos.endswith("/"):
                    files.append(tos)
                continue

            finalfolder = os.path.split(tos)[0]
            # finalfolder is empty when tos has no directory part
            # (whereTo == ""); os.makedirs("") would raise.
            if finalfolder and not os.path.exists(finalfolder):
                fLOG(  # pragma: no cover
                    "[decompress_zip] creating folder '{0}'".format(
                        finalfolder))
                os.makedirs(finalfolder)  # pragma: no cover

            if not is_folder:
                # The member is only read when it really has to be written.
                with open(tos, "wb") as u:
                    u.write(archive.read(info.filename))
                files.append(tos)
                fLOG("[decompress_zip] unzipped '{0}' to '{1}'".format(
                    info.filename, tos))
    return files
def extractall_silent(self, path=".", members=None, *, numeric_owner=False, silent=False):
    """
    Extracts every member of a tar archive below *path*, then restores
    owner, modification time and permissions on the extracted directories.

    Same function as `TarFile.extractall <https://github.com/python/cpython/blob/master/Lib/tarfile.py>`_
    but, when *silent* is True, a :epkg:`*py:FileNotFoundError` raised while
    extracting a member is turned into a warning instead of being propagated.

    @param      self            opened ``tarfile.TarFile``
    @param      path            directory to extract to
    @param      members         optional subset of the list returned by
                                ``getmembers()``, every member if None
    @param      numeric_owner   if True, only the numbers for user/group
                                names are used and not the names
    @param      silent          warn instead of failing on a missing path
    """
    if members is None:
        members = self

    pending_dirs = []
    for member in members:
        is_dir = member.isdir()
        if is_dir:
            # Directories are first created with a safe mode; their real
            # attributes are applied once everything has been extracted.
            pending_dirs.append(member)
            member = copy.copy(member)
            member.mode = 0o700

        if not silent:
            self.extract(member, path, set_attrs=not is_dir,
                         numeric_owner=numeric_owner)
            continue

        try:  # pragma: no cover
            self.extract(member, path, set_attrs=not is_dir,
                         numeric_owner=numeric_owner)
        except FileNotFoundError as e:  # pragma: no cover
            warnings.warn(
                "[TarFile.extractall_silent] issue with '{0}' - {1}".format(path, e))

    # Directories are processed in reverse name order.
    for member in sorted(pending_dirs, key=lambda d: d.name)[::-1]:
        dirpath = os.path.join(path, member.name)
        try:
            # Set correct owner, mtime and filemode on the directory.
            self.chown(member, dirpath, numeric_owner=numeric_owner)
            self.utime(member, dirpath)
            self.chmod(member, dirpath)
        except ExtractError as e:  # pragma: no cover
            if self.errorlevel > 1:
                raise
            self._dbg(1, "tarfile: %s" % e)
def decompress_targz(filename, whereTo=".", silent=True, fLOG=noLOG):
    """
    Decompresses a :epkg:`tar.gz` file.

    @param      filename    file to process
    @param      whereTo     location of the result
    @param      silent      raise a warning instead of an error
    @param      fLOG        logging function (not used by this function)
    @return                 return the list of decompressed files
    """
    # The context manager closes the archive once extraction is over,
    # including when extraction raises.
    with tarfile.open(filename, 'r:gz') as tfile:
        members = tfile.getmembers()
        extractall_silent(tfile, whereTo, silent=silent)
    extracted = [os.path.join(whereTo, m.name) for m in members]
    # Only regular files present on disk are reported: this drops
    # directories and members which could not be extracted.
    return [f for f in extracted if os.path.isfile(f)]
def decompress_gz(filename, whereTo=".", fLOG=noLOG):
    """
    Decompresses a single-file gzip archive (``.gz``).

    The result is written in *whereTo* under the base name of *filename*
    without its ``.gz`` extension.

    @param      filename    file to process
    @param      whereTo     location of the result
    @param      fLOG        logging function (not used by this function)
    @return                 return the list of decompressed files (only one)
    @raises     NameError   if *filename* does not end with ``.gz``
    """
    if not filename.endswith(".gz"):
        raise NameError(  # pragma: no cover
            "the file should end with .gz: %r" % filename)
    # Only the base name is kept, as decompress_bz2 does: joining the full
    # path would ignore whereTo for an absolute filename and duplicate the
    # folders of a relative one.
    dest = os.path.join(whereTo, os.path.basename(filename)[:-3])
    with gzip.open(filename, 'rb') as f:
        with open(dest, "wb") as g:
            g.write(f.read())
    return [dest]
def decompress_bz2(filename, whereTo=".", fLOG=noLOG):
    """
    Decompresses a :epkg:`bz2` file.

    The result is written in *whereTo* under the base name of *filename*
    without its ``.bz2`` extension.

    @param      filename    file to process
    @param      whereTo     location of the result
    @param      fLOG        logging function (not used by this function)
    @return                 return the list of decompressed files (only one)
    """
    suffix = ".bz2"
    if not filename.endswith(suffix):
        raise NameError(  # pragma: no cover
            "the file should end with .bz2 not '{0}'".format(filename))
    stem = os.path.basename(filename)[:-len(suffix)]
    target = os.path.join(whereTo, stem)
    with bz2.open(filename, 'rb') as src, open(target, "wb") as dst:
        content = src.read()
        dst.write(content)
    return [target]