Coverage for pyquickhelper/filehelper/winzipfile.py: 53%

68 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@brief Fix a bug: see https://bugs.python.org/issue6839. 

4""" 

5import sys 

6import struct 

7from zipfile import ZipFile, ZipInfo, ZipExtFile, _ZipDecrypter, BadZipFile 

8from zipfile import _FH_EXTRA_FIELD_LENGTH, _FH_FILENAME_LENGTH, _FH_SIGNATURE 

9from zipfile import stringFileHeader, structFileHeader, sizeFileHeader, _SharedFile 

10 

11 

class WinZipFile(ZipFile):
    """
    Overwrite method :epkg:`*py:zipfile:ZipFile:open`.

    Issue `6839 <https://bugs.python.org/issue6839>`_ happens when
    a zip file is created on Windows. The created zip may contain
    full path with ``\\`` when the file list only contains ``/``.
    This raises exception ``BadZipFile`` with the following message:
    *File name in directory ... and header ... differ* due to a mismatch
    between backslashes. This class overwrites method
    :epkg:`*py:zipfile:ZipFile:open` to fix the line which checks that
    names are consistent in the file list and in the compressed content.
    """

    def open(self, name, mode="r", pwd=None, *, force_zip64=False):
        """
        Returns file-like object for 'name'.

        @param name is a string for the file name within the ZIP file, or a ZipInfo
                    object.
        @param mode should be 'r' to read a file already in the ZIP file, or 'w' to
                    write to a file newly added to the archive.
        @param pwd is the password to decrypt files (only used for reading).
        @param force_zip64 is forwarded to ``_open_to_write`` when *mode* is 'w'.
        @return a file-like object (a ``ZipExtFile`` when reading)

        When writing, if the file size is not known in advance but may exceed
        2 GiB, pass force_zip64 to use the ZIP64 format, which can handle large
        files. If the size is known in advance, it is best to pass a ZipInfo
        instance for name, with zinfo.file_size set.

        Raises ``ValueError`` for an invalid mode, a password given in write
        mode, a closed archive or a pending writing handle, ``TypeError`` if
        *pwd* is not bytes, ``BadZipFile`` if the local file header is
        truncated, has a wrong signature or a name which differs from the
        directory entry, ``NotImplementedError`` for flag bits 5 and 6 and
        ``RuntimeError`` for a missing or wrong password.
        """
        if mode not in {"r", "w"}:
            raise ValueError('open() requires mode "r" or "w"')
        if pwd and not isinstance(pwd, bytes):
            raise TypeError(f"pwd: expected bytes, got {type(pwd).__name__}")
        if pwd and (mode == "w"):
            raise ValueError("pwd is only supported for reading files")
        if not self.fp:
            raise ValueError(
                "Attempt to use ZIP archive that was already closed")

        # Make sure we have an info object
        if isinstance(name, ZipInfo):
            # 'name' is already an info object
            zinfo = name
        elif mode == 'w':
            zinfo = ZipInfo(name)
            zinfo.compress_type = self.compression
        else:
            # Get info object for name
            zinfo = self.getinfo(name)

        if mode == 'w':
            return self._open_to_write(zinfo, force_zip64=force_zip64)

        if getattr(self, "_writing", False):
            raise ValueError("Can't read from the ZIP file while there "
                             "is an open writing handle on it. "
                             "Close the writing handle before trying to read.")

        # Open for reading:
        self._fileRefCnt += 1
        # The shared file is created exactly once. The former special case
        # for Python <= 3.5 was dead code (this module uses f-strings and
        # cannot be imported there) and its result was overwritten anyway.
        zef_file = _SharedFile(
            self.fp, zinfo.header_offset, self._fpclose, self._lock,
            lambda: getattr(self, "_writing", False))
        try:
            # Skip the file header:
            fheader = zef_file.read(sizeFileHeader)
            if len(fheader) != sizeFileHeader:
                raise BadZipFile("Truncated file header")
            fheader = struct.unpack(structFileHeader, fheader)
            if fheader[_FH_SIGNATURE] != stringFileHeader:
                raise BadZipFile("Bad magic number for file header")

            fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
            if fheader[_FH_EXTRA_FIELD_LENGTH]:
                # The extra field is read only to move past it.
                zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])

            if zinfo.flag_bits & 0x20:
                # Zip 2.7: compressed patched data
                raise NotImplementedError(
                    "compressed patched data (flag bit 5)")

            if zinfo.flag_bits & 0x40:
                # strong encryption
                raise NotImplementedError("strong encryption (flag bit 6)")

            if zinfo.flag_bits & 0x800:
                # UTF-8 filename
                fname_str = fname.decode("utf-8")
            else:
                fname_str = fname.decode("cp437")

            # This comparison is the reason this class exists: on Windows,
            # both names are compared after replacing backslashes by
            # slashes. Other platforms keep the strict comparison.
            if sys.platform.startswith("win"):
                same_name = (fname_str.replace("\\", "/") ==
                             zinfo.orig_filename.replace("\\", "/"))
            else:
                same_name = fname_str == zinfo.orig_filename
            if not same_name:
                raise BadZipFile(
                    'File name in directory %r and header %r differ.'
                    % (zinfo.orig_filename, fname))

            # check for encrypted flag & handle password
            is_encrypted = zinfo.flag_bits & 0x1
            zd = None
            if is_encrypted:
                if not pwd:
                    pwd = self.pwd
                if not pwd:
                    raise RuntimeError("File %r is encrypted, password "
                                       "required for extraction" % name)

                zd = _ZipDecrypter(pwd)
                # The first 12 bytes in the cypher stream is an encryption header
                # used to strengthen the algorithm. The first 11 bytes are
                # completely random, while the 12th contains the MSB of the CRC,
                # or the MSB of the file time depending on the header type
                # and is used to check the correctness of the password.
                header = zef_file.read(12)
                # NOTE(review): the decrypter is called once per byte here;
                # recent zipfile versions seem to expose a decrypter which
                # takes a whole bytes object - TODO confirm on the supported
                # Python versions before relying on encrypted archives.
                h = list(map(zd, header[0:12]))
                if zinfo.flag_bits & 0x8:
                    # compare against the file type from extended local headers
                    check_byte = (zinfo._raw_time >> 8) & 0xff
                else:
                    # compare against the CRC otherwise
                    check_byte = (zinfo.CRC >> 24) & 0xff
                if h[11] != check_byte:
                    raise RuntimeError(f"Bad password for file {name!r}")

            return ZipExtFile(zef_file, mode, zinfo, zd, True)
        except Exception:
            # Closes the shared handle before propagating the error.
            zef_file.close()
            raise