Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2@file 

3@brief Functions about compressing files. 

4""" 

5 

6import os 

7import zipfile 

8import datetime 

9import gzip 

10import sys 

11import warnings 

12import tarfile 

13from io import BytesIO 

14 

15from ..loghelper.flog import noLOG, run_cmd 

16from .fexceptions import FileException 

17from ..texthelper.diacritic_helper import remove_diacritics 

18from .synchelper import explore_folder 

19 

20 

def zip_files(filename, file_set, root=None, fLOG=noLOG):
    """
    Zips all files from an iterator.

    @param      filename        final zip file (can be None or a *BytesIO* buffer)
    @param      file_set        iterator on file to add
    @param      root            if not None, all path are relative to this path
    @param      fLOG            logging function (can be None to disable logging)
    @return                     number of added files (or the content of the
                                archive as bytes if it was built in memory,
                                i.e. *filename* is None or a *BytesIO*)

    *filename* can be None, the function compresses
    into bytes without saving the results.
    Files of *file_set* which do not exist are silently skipped.

    Side effect: if the access time or the modification time of a file
    is older than 1980-01-01, the modification time of the file is changed
    **on disk** (pushed forward by steps of four hours until it is after
    that date) before the file is added to the archive.
    """
    nb = 0
    a1980 = datetime.datetime(1980, 1, 1)
    if filename is None:
        filename = BytesIO()
    with zipfile.ZipFile(filename, 'w') as myzip:
        for file in file_set:
            if not os.path.exists(file):
                continue
            if fLOG:
                fLOG("[zip_files] '{0}'".format(file))
            st = os.stat(file)
            atime = datetime.datetime.fromtimestamp(st.st_atime)
            mtime = datetime.datetime.fromtimestamp(st.st_mtime)
            if atime < a1980 or mtime < a1980:  # pragma: no cover
                new_mtime = st.st_mtime + (4 * 3600)  # new modification time
                while datetime.datetime.fromtimestamp(new_mtime) < a1980:
                    new_mtime += (4 * 3600)  # new modification time

                # fLOG is optional everywhere in this function,
                # it must not be called when it is None.
                if fLOG:
                    fLOG(
                        "[zip_files] changing time timestamp for file '{0}'".format(file))
                os.utime(file, (st.st_atime, new_mtime))

            # arcname=None lets zipfile derive the stored name from the path
            arcname = os.path.relpath(file, root) if root else None
            myzip.write(file, arcname=arcname)
            nb += 1
    return filename.getvalue() if isinstance(filename, BytesIO) else nb

60 

61 

def unzip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True,
                fail_if_error=True):
    """
    Unzips files from a zip archive.

    @param      zipf            archive (or bytes or BytesIO)
    @param      where_to        destination folder (can be None, the result is a list of tuple)
    @param      fLOG            logging function (can be None to disable logging)
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      fail_if_error   fails if an error is encountered
                                (typically a weird character in a filename),
                                otherwise a warning is thrown.
    @return                     list of unzipped files, list of tuples
                                *(name in the archive, content)* if *where_to* is None

    A file which already exists in *where_to* is not overwritten but it is
    still part of the returned list. An entry whose name would place it
    outside of *where_to* (absolute path, ``..``) is never written:
    the function raises ``zipfile.BadZipFile`` if *fail_if_error* is True,
    otherwise it warns and skips the entry.

    Raises ``IOError`` if *zipf* is a filename which is not a valid zip file.
    """
    log = fLOG if fLOG else _unzip_no_log
    if isinstance(zipf, bytes):
        zipf = BytesIO(zipf)

    # The archive is opened only once. A corrupted archive is reported
    # with the name of the file when there is one.
    try:
        archive = zipfile.ZipFile(zipf, "r")
    except zipfile.BadZipFile as e:  # pragma: no cover
        if isinstance(zipf, BytesIO):
            raise
        raise IOError("Unable to read file '{0}'".format(zipf)) from e

    files = []
    with archive:
        for info in archive.infolist():
            log("[unzip_files] unzip '{0}'".format(info.filename))

            if where_to is None:
                # in-memory mode: nothing is written on disk
                content = _unzip_read_member(archive, info, fail_if_error)
                if content is not None:
                    files.append((info.filename, content))
                continue

            clean = remove_diacritics(info.filename)
            if remove_space:
                clean = clean.replace(" ", "").replace("'", "").replace(",", "_") \
                             .replace("(", "_").replace(")", "_")
            tos = os.path.join(where_to, clean)

            # member names come from the archive, they cannot be trusted
            if not _unzip_is_inside(where_to, tos):
                mes = "Unable to extract '{0}', it would be written outside of '{1}'".format(
                    info.filename, where_to)
                if fail_if_error:
                    raise zipfile.BadZipFile(mes)
                warnings.warn(mes, UserWarning)
                continue

            if os.path.exists(tos):  # pragma: no cover
                # existing files are kept as they are, folders are not listed
                if not info.filename.endswith("/"):
                    files.append(tos)
                continue

            if fvalid and not fvalid(info.filename, tos):
                log("[unzip_files] skipping", info.filename)
                continue

            data = _unzip_read_member(archive, info, fail_if_error)
            if data is None:
                continue

            # check encoding to avoid characters not allowed in paths
            if os.path.exists(tos):  # pragma: no cover
                # the path was created while the content was being read
                if not tos.endswith("/"):
                    files.append(tos)
                continue

            if sys.platform.startswith("win"):
                tos = tos.replace("/", "\\")
            finalfolder = os.path.split(tos)[0]
            if not os.path.exists(finalfolder):
                log("[unzip_files] creating folder (zip)",
                    os.path.abspath(finalfolder))
                try:
                    os.makedirs(finalfolder)
                except FileNotFoundError as e:  # pragma: no cover
                    mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format(
                        info.filename, tos, finalfolder, len(finalfolder))
                    raise FileNotFoundError(mes) from e

            if info.filename.endswith("/"):
                # a folder entry: creating the folder was all there was to do
                continue

            try:
                with open(tos, "wb") as u:
                    u.write(data)
            except FileNotFoundError:  # pragma: no cover
                # probably an issue in the path name
                # the next lines are just here to distinguish
                # between the two cases
                if not os.path.exists(finalfolder):
                    raise
                # second attempt with a name built from the raw archive name
                newname = info.filename.replace(
                    " ", "_").replace(",", "_")
                if sys.platform.startswith("win"):
                    newname = newname.replace("/", "\\")
                tos = os.path.join(where_to, newname)
                if not _unzip_is_inside(where_to, tos):
                    raise
                finalfolder = os.path.split(tos)[0]
                if not os.path.exists(finalfolder):
                    log("[unzip_files] creating folder (zip)",
                        os.path.abspath(finalfolder))
                    os.makedirs(finalfolder)
                with open(tos, "wb") as u:
                    u.write(data)
            files.append(tos)
            log("[unzip_files] unzipped ",
                info.filename, " to ", tos)
    return files


def _unzip_no_log(*args, **kwargs):
    """Does nothing, replaces the logging function when it is None."""


def _unzip_is_inside(folder, path):
    """Tells if *path* designates *folder* or a location below it."""
    base = os.path.abspath(folder)
    target = os.path.abspath(path)
    return target == base or target.startswith(base.rstrip(os.sep) + os.sep)


def _unzip_read_member(archive, info, fail_if_error):
    """
    Reads one entry of an opened zip archive, returns None if the entry
    cannot be read and *fail_if_error* is False (a warning is emitted).
    """
    try:
        return archive.read(info.filename)
    except zipfile.BadZipFile as e:  # pragma: no cover
        mes = "Unable to extract '{0}' due to {1}".format(info.filename, e)
        if fail_if_error:
            raise zipfile.BadZipFile(mes) from e
        warnings.warn(mes, UserWarning)
        return None

168 

169 

def gzip_files(filename, file_set, encoding=None, fLOG=noLOG):
    """
    Compresses all files from an iterator in a zip file
    and then in a gzip file.

    @param      filename    final gzip file (double compression, extension should be something like .zip.gz),
                            can be None or a *BytesIO* buffer to compress in memory
    @param      file_set    iterator on file to add
    @param      encoding    encoding of input files (no double compression then)
    @param      fLOG        logging function
    @return                 bytes (if filename is None or a *BytesIO*) or None

    If *encoding* is None, the files are zipped with @see fn zip_files
    and the resulting archive is gzipped. Otherwise, every file is read as
    text with this encoding and the texts are concatenated into one gzip
    stream which is always encoded in *utf-8*.
    """
    if filename is None:
        filename = BytesIO()

    if encoding is None:
        content = zip_files(None, file_set, fLOG=fLOG)
        # the context manager closes the stream even if the writing fails
        with gzip.open(filename, 'wb') as f:
            f.write(content)
    else:
        with gzip.open(filename, 'wt', encoding="utf-8") as f:
            for name in file_set:
                # the parameter encoding describes the input files
                with open(name, "r", encoding=encoding) as ft:
                    f.write(ft.read())

    return filename.getvalue() if isinstance(filename, BytesIO) else None

196 

197 

def ungzip_files(filename, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True,
                 unzip=True, encoding=None):
    """
    Uncompresses files from a gzip file.

    @param      filename        final gzip file (double compression, extension should be something like .zip.gz),
                                it can be the compressed content as bytes
    @param      where_to        destination folder (can be None, the result is a list of tuple)
    @param      fLOG            logging function
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      unzip           unzip file after gzip
    @param      encoding        encoding, if not None, the content is considered as text
    @return                     list of unzipped files

    The result depends on the parameters:

    * *encoding* is None and *unzip* is True: the result of @see fn unzip_files
      applied to the uncompressed content,
    * *encoding* is None, *unzip* is False: the name of the file written
      in *where_to* or the uncompressed bytes if *where_to* is None,
    * *encoding* is not None: the gzip stream is decoded as *utf-8*,
      if *filename* is a file, the text is saved next to it
      (same name without ``.gz``, written with *encoding*) and this name
      is returned, otherwise the function returns the text.

    Raises ``IOError`` if the uncompressed content cannot be unzipped,
    ``TypeError`` if the content was given as bytes and must be saved
    into *where_to* without being unzipped (no name can be inferred).
    """
    if isinstance(filename, bytes):
        is_file = False
        filename = BytesIO(filename)
    else:
        is_file = True

    if encoding is None:
        with gzip.open(filename, 'rb') as f:
            content = f.read()
        if unzip:
            try:
                # fvalid and remove_space are meant for the unzipping step
                return unzip_files(content, where_to=where_to, fLOG=fLOG,
                                   fvalid=fvalid, remove_space=remove_space)
            except Exception as e:  # pragma: no cover
                raise IOError(
                    "Unable to unzip file '{0}'".format(filename)) from e
        if where_to is None:
            return content
        if not is_file:
            raise TypeError(
                "Unable to guess the name of the file to create in '{0}' "
                "when the content is given as bytes.".format(where_to))
        filename = os.path.split(filename)[-1].replace(".gz", "")
        filename = os.path.join(where_to, filename)
        with open(filename, "wb") as f:
            f.write(content)
        return filename

    with gzip.open(filename, 'rt', encoding="utf-8") as f:
        content = f.read()
    if is_file:
        filename = filename.replace(".gz", "")
        # content is a string, it must be written in text mode
        with open(filename, "w", encoding=encoding) as f:
            f.write(content)
        return filename
    return content

246 

247 

def zip7_files(filename_7z, file_set, fLOG=noLOG, temp_folder="."):
    """
    If :epkg:`7z` is installed, the function uses it
    to compress file into 7z format. The file *filename_7z* must not exist.

    @param      filename_7z     final destination
    @param      file_set        list of files to compress (any iterable is accepted)
    @param      fLOG            logging function
    @param      temp_folder     the function stores the list of files in a file
                                (``listfiles7z.txt``) in the
                                folder *temp_folder*, it will be removed afterwards
    @return                     number of added files

    Raises ``FileNotFoundError`` if one of the files to compress does not exist
    (or if the executable is not found on Windows) and ``FileException``
    if *filename_7z* already exists or if the compression fails.

    .. faqref::
        :title: Why module pylzma does not work?
        :lid: faq-pylzma-ref

        The module :epkg:`pylzma`
        failed to decompress the file produced by the latest version
        of :epkg:`7z` (2016-09-23). The compression
        was changed by tweaking the command line. LZMA is used instead of LZMA2.
        The current version does not include this
        `commit <https://github.com/fancycode/pylzma/commit/b5c3c2bd4ab7abfb65de772861ecc600fe37394b>`_.
        Or you can clone the package
        `sdpython.pylzma <https://github.com/sdpython/pylzma>`_
        and build it yourself with ``python setup.py bdist_wheel``.
    """
    if sys.platform.startswith("win"):  # pragma: no cover
        exe = r"C:\Program Files\7-Zip\7z.exe"
        if not os.path.exists(exe):
            raise FileNotFoundError("unable to find: {0}".format(exe))
    elif sys.platform.startswith("darwin"):
        exe = "7za"  # pragma: no cover
    else:
        exe = "7z"

    if os.path.exists(filename_7z):
        raise FileException(  # pragma: no cover
            "'{0}' already exists".format(filename_7z))

    # file_set is walked through several times and its length is returned,
    # an iterator must be converted into a list first.
    file_set = list(file_set)

    notxist = [fn for fn in file_set if not os.path.exists(fn)]
    if notxist:
        raise FileNotFoundError(  # pragma: no cover
            "unable to compress unexisting files:\n{0}".format("\n".join(notxist)))

    flist = os.path.join(temp_folder, "listfiles7z.txt")
    with open(flist, "w", encoding="utf8") as f:
        f.write("\n".join(file_set))

    cmd = '"{0}" -m0=lzma -mfb=64 a "{1}" "@{2}"'.format(
        exe, filename_7z, flist)
    try:
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
    finally:
        # the list of files is only needed by the command line
        if os.path.exists(flist):
            os.remove(flist)

    if "Error:" in out or not os.path.exists(filename_7z):
        raise FileException(  # pragma: no cover
            "An error occurred with cmd: '{0}'\n"
            "--OUT--\n{1}\n--ERR--\n{2}\n----".format(
                cmd, out, err))
    return len(file_set)

305 

306 

def un7zip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None,
                 remove_space=True, cmd_line=False):
    """
    Unzips files from a zip archive compress with :epkg:`7z`.

    @param      zipf            archive (or bytes or BytesIO)
    @param      where_to        destination folder (can be None, the result is a list of tuple)
    @param      fLOG            logging function
    @param      fvalid          function which takes two paths (zip name, local name) and return True if the file
                                must be unzipped, False otherwise, if None, the default answer is True
    @param      remove_space    remove spaces in created local path (+ ``',()``)
    @param      cmd_line        use command line instead of module :epkg:`pylzma`
    @return                     list of unzipped files

    The function requires module :epkg:`pylzma`.
    See :ref:`Why module pylzma does not work? <faq-pylzma-ref>`.

    With *cmd_line=True*, *zipf* must be a filename, *fvalid* and
    *remove_space* are ignored (a warning is emitted), the files are
    extracted in *where_to* (current folder if None) and the function
    returns the files found in this folder.
    Without the command line, an entry whose name would place it outside
    of *where_to* is rejected with a ``FileException``. If the module
    cannot read the format (``NotImplementedError``) and *zipf* is a file,
    the function switches to the command line.
    """
    if cmd_line:
        return _un7zip_command_line(zipf, where_to, fLOG, fvalid, remove_space)

    from py7zlib import Archive7z, FormatError
    file_zipf = None
    opened = None  # file opened by this function, to be closed by it
    if not isinstance(zipf, BytesIO):
        file_zipf = zipf
        if isinstance(zipf, bytes):
            zipf = BytesIO(zipf)
        else:
            opened = open(zipf, "rb")
            zipf = opened

    files = []
    try:
        try:
            file = Archive7z(zipf)
        except FormatError as e:
            raise FileException(  # pragma: no cover
                "You should use a modified version available at https://github.com/sdpython/pylzma") from e

        for info in file.files:
            if where_to is None:
                files.append((info.filename, info.read()))
                continue

            clean = remove_diacritics(info.filename)
            if remove_space:
                clean = clean.replace(" ", "").replace("'", "") \
                             .replace(",", "_").replace("(", "_") \
                             .replace(")", "_")
            tos = os.path.join(where_to, clean)

            # member names come from the archive, they cannot be trusted
            if not _un7zip_is_inside(where_to, tos):
                raise FileException(
                    "Unable to unzip file '{0}', it would be written outside of '{1}'".format(
                        info.filename, where_to))

            if os.path.exists(tos):  # pragma: no cover
                # existing files are kept as they are, folders are not listed
                if not info.filename.endswith("/"):
                    files.append(tos)
                continue

            if fvalid and not fvalid(info.filename, tos):
                fLOG("[un7zip_files] skipping",  # pragma: no cover
                     info.filename)
                continue  # pragma: no cover

            try:
                data = info.read()
            except NotImplementedError as e:  # pragma: no cover
                # You should use command line.
                if file_zipf is None:
                    raise TypeError(
                        "Cannot switch to command line unless zipf is a file.") from e
                warnings.warn(
                    "[un7zip_files] '{0}' --> Unavailable format. Use command line.".format(zipf), UserWarning)
                # the archive is released before another program reads it
                if opened is not None:
                    opened.close()
                return un7zip_files(file_zipf, where_to=where_to, fLOG=fLOG, fvalid=fvalid,
                                    remove_space=remove_space, cmd_line=True)
            except Exception as e:  # pragma: no cover
                raise FileException("Unable to unzip file '{0}' from '{1}'".format(
                    info.filename, zipf)) from e

            # check encoding to avoid characters not allowed in paths
            if os.path.exists(tos):  # pragma: no cover
                # the path was created while the content was being read
                if not tos.endswith("/"):
                    files.append(tos)
                continue

            if sys.platform.startswith("win"):
                tos = tos.replace("/", "\\")
            finalfolder = os.path.split(tos)[0]
            if not os.path.exists(finalfolder):
                fLOG("[un7zip_files] creating folder (7z)",
                     os.path.abspath(finalfolder))
                try:
                    os.makedirs(finalfolder)
                except FileNotFoundError as e:  # pragma: no cover
                    mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format(
                        info.filename, tos, finalfolder, len(finalfolder))
                    raise FileNotFoundError(mes) from e

            if info.filename.endswith("/"):
                # a folder entry: creating the folder was all there was to do
                continue

            try:
                with open(tos, "wb") as u:
                    u.write(data)
            except FileNotFoundError:  # pragma: no cover
                # probably an issue in the path name
                # the next lines are just here to distinguish
                # between the two cases
                if not os.path.exists(finalfolder):
                    raise
                # second attempt with a name built from the raw archive name
                newname = info.filename.replace(
                    " ", "_").replace(",", "_")
                if sys.platform.startswith("win"):
                    newname = newname.replace("/", "\\")
                tos = os.path.join(where_to, newname)
                if not _un7zip_is_inside(where_to, tos):
                    raise
                finalfolder = os.path.split(tos)[0]
                if not os.path.exists(finalfolder):
                    fLOG("[un7zip_files] creating folder (7z)",
                         os.path.abspath(finalfolder))
                    os.makedirs(finalfolder)
                with open(tos, "wb") as u:
                    u.write(data)
            files.append(tos)
            fLOG("[un7zip_files] unzipped ",
                 info.filename, " to ", tos)
    finally:
        if opened is not None:
            opened.close()
    return files


def _un7zip_is_inside(folder, path):
    """Tells if *path* designates *folder* or a location below it."""
    base = os.path.abspath(folder)
    target = os.path.abspath(path)
    return target == base or target.startswith(base.rstrip(os.sep) + os.sep)


def _un7zip_command_line(zipf, where_to, fLOG, fvalid, remove_space):
    """
    Extracts a 7z archive with the command line, returns the files
    found in the destination folder.
    """
    if not isinstance(zipf, str):
        raise TypeError(  # pragma: no cover
            "Cannot use command line unless zipf is a file.")
    if remove_space:
        warnings.warn(  # pragma: no cover
            '[un7zip_files] remove_space and cmd_line are incompatible options.',
            UserWarning)
    if fvalid:
        warnings.warn(
            'fvalid and cmd_line are incompatible options.', UserWarning)

    if sys.platform.startswith("win"):  # pragma: no cover
        exe = r"C:\Program Files\7-Zip\7z.exe"
        if not os.path.exists(exe):
            raise FileNotFoundError("unable to find: {0}".format(exe))
    elif sys.platform.startswith("darwin"):
        exe = "7za"  # pragma: no cover
    else:
        exe = "7z"

    # the default destination applies to every platform
    if where_to is None:
        where_to = os.path.abspath(".")

    # the option -o is quoted as well: the destination may contain spaces
    cmd = '"{0}" x "{1}" "-o{2}"'.format(exe, zipf, where_to)
    out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
    if len(err) > 0 or "Error:" in out:
        raise FileException(  # pragma: no cover
            "Unable to un-7zip file '{0}'\n--CMD--\n{3}\n--OUT--\n{1}\n--ERR--\n{2}".format(zipf, out, err, cmd))

    return explore_folder(where_to)[1]

443 

444 

def unrar_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True):
    """
    Uncompresses files from a rar archive with :epkg:`7z`
    on Windows or *unrar* on the other platforms.

    @param      zipf            archive, it is inserted as is into the command line
    @param      where_to        destination folder, current folder if None
    @param      fLOG            logging function, given to *run_cmd*
    @param      fvalid          not used by this function
    @param      remove_space    not used by this function
    @return                     second element returned by *explore_folder*
                                for the destination folder

    On Windows, the function raises ``FileNotFoundError`` if
    ``C:\\Program Files\\7-Zip\\7z.exe`` does not exist.
    It raises ``FileException`` if the command line writes something
    on the error stream (or ``Error:`` on the standard output on Windows).
    """
    on_windows = sys.platform.startswith("win")

    # executable doing the job
    if on_windows:  # pragma: no cover
        exe = r"C:\Program Files\7-Zip\7z.exe"
        if not os.path.exists(exe):
            raise FileNotFoundError("unable to find: {0}".format(exe))
    else:
        exe = "unrar"

    # destination, shared by both platforms
    if where_to is None:
        where_to = os.path.abspath(".")

    if on_windows:  # pragma: no cover
        # 7z receives the destination through option -o
        cmd = '"{0}" x "{1}" "-o{2}"'.format(exe, zipf, where_to)
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
        failed = len(err) > 0 or "Error:" in out
        if failed:
            message = "Unable to unrar file '{0}'\n--OUT--\n{1}\n--ERR--\n{2}"
            raise FileException(message.format(zipf, out, err))
    else:
        # unrar is run from the destination folder
        cmd = '"{0}" x "{1}"'.format(exe, zipf)
        out, err = run_cmd(cmd, wait=True, fLOG=fLOG, change_path=where_to)
        if len(err) > 0:
            message = "Unable to unrar file '{0}'\n--CMD--\n{3}\n--OUT--\n{1}\n--ERR--\n{2}"
            raise FileException(  # pragma: no cover
                message.format(zipf, out, err, cmd))

    return explore_folder(where_to)[1]

486 

487 

def untar_files(filename, where_to=None, fLOG=noLOG, encoding=None):
    """
    Uncompresses files from a tar file.

    @param      filename        tar file, compressed or not (*.tar*, *.tar.gz*, ...),
                                or the content of the archive as bytes
    @param      where_to        destination folder, if None, the files are
                                extracted in the current folder
    @param      fLOG            logging function (not used by this function)
    @param      encoding        encoding, given to ``tarfile.open``
    @return                     list of the members of the archive,
                                prefixed by *where_to* if it is not None

    The compression is detected from the content of the archive,
    not from the extension of the file.
    """
    if isinstance(filename, bytes):
        # tarfile needs a file-like object, not raw bytes
        name = None
        fileobj = BytesIO(filename)
    else:
        name = filename
        fileobj = None

    # extractall joins the destination with each member name,
    # it cannot receive None
    destination = "." if where_to is None else where_to

    # The extraction filter 'data' rejects members written outside of the
    # destination. It only exists in recent versions of Python.
    options = {}
    if hasattr(tarfile, "data_filter"):
        options["filter"] = "data"

    # mode 'r:*' reads plain archives as well as compressed ones
    with tarfile.open(name=name, fileobj=fileobj,
                      mode="r:*", encoding=encoding) as tar:
        names = tar.getnames()
        tar.extractall(destination, **options)

    if where_to is not None:
        return [os.path.join(where_to, member) for member in names]
    return names