Coverage for pyquickhelper/filehelper/anyfhelper.py: 77%

105 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@brief Various helpers about files 

4""" 

5import os 

6import stat 

7import warnings 

8from io import BytesIO, StringIO 

9from .synchelper import explore_folder_iterfile 

10from .internet_helper import read_url 

11from .file_info import is_file_string, is_url_string 

12 

13 

def change_file_status(folder, status=stat.S_IWRITE, strict=False,
                       include_folder=True):
    """
    Changes the status of all files inside a folder.

    @param      folder          folder or file
    @param      status          new status
    @param      strict          False, use ``|=``, True, use ``=``
    @param      include_folder  change the status of the folders as well
    @return                     list of modified files (and folders)

    By default, status is ``stat.S_IWRITE``.
    If *folder* is a file, the function changes the status of this file
    (and of its parent folder if *include_folder* is True),
    otherwise, it will change the status of every file the folder contains.
    A file which cannot be found while walking through the folder
    triggers a ``UserWarning`` and is skipped.
    """
    res = []

    def _apply(path, mode):
        # Computes the new mode of *path* from its current *mode*, applies
        # it if needed and records the path among the modified ones.
        nmode = status if strict else (mode | status)
        # ``st_mode`` also holds the file type bits, only the permission
        # bits are compared to tell whether something really changes.
        if stat.S_IMODE(nmode) != stat.S_IMODE(mode):
            os.chmod(path, nmode)
            res.append(path)

    if os.path.isfile(folder):
        if include_folder:
            # A bare file name has an empty dirname: it stands for
            # the current folder, ``os.stat('')`` would fail.
            parent = os.path.dirname(folder) or os.curdir
            _apply(parent, os.stat(parent).st_mode)
        _apply(folder, os.stat(folder).st_mode)
        return res

    seen = set()
    for f in explore_folder_iterfile(folder):
        if include_folder:
            d = os.path.dirname(f)
            if d not in seen:
                # Every folder is processed only once.
                seen.add(d)
                _apply(d, os.stat(d).st_mode)
        try:
            mode = os.stat(f).st_mode
        except FileNotFoundError:  # pragma: no cover
            # It happens for some weird path.
            warnings.warn(
                f"[change_file_status] unable to find '{f}'", UserWarning)
            continue
        _apply(f, mode)

    # It ends up with the folder itself unless it was already processed.
    if include_folder and folder not in seen:
        _apply(folder, os.stat(folder).st_mode)
    return res

118 

119 

def read_content_ufs(file_url_stream, encoding="utf8", asbytes=False,
                     add_source=False, min_size=None):
    """
    Reads the content of a source, whether it is a url, a file, a stream
    or a string (in that case, it returns the string itself),
    it assumes the content type is text.

    @param      file_url_stream     file or url or stream or string
    @param      encoding            encoding
    @param      asbytes             return bytes instead of chars
    @param      add_source          also return the way the content was obtained
    @param      min_size            if not None, the function raises an exception
                                    if the size of the content read from a file,
                                    a url or converted from bytes or a stream
                                    is below that threshold
    @return                         content of the source (str or bytes)
                                    or *(content, type)*

    Type can be:

    * *rb*: binary file
    * *r*: text file
    * *u*: url
    * *ub*: binary content from url
    * *s*: string
    * *b*: binary string
    * *S*: StringIO
    * *Sb*: StringIO, return bytes
    * *SB*: BytesIO
    * *SBb*: BytesIO, return bytes

    The function can return bytes.
    It raises ``RuntimeError`` if the content is smaller than *min_size*,
    ``ValueError`` if a url must be decoded and *encoding* is None,
    ``TypeError`` if *file_url_stream* is a plain string while bytes
    are expected or if its type is not supported.
    """
    def check_size(cont):
        if min_size is not None and len(cont) < min_size:
            raise RuntimeError(  # pragma: no cover
                f"File '{file_url_stream}' is smaller than {min_size}.")

    def result(content, kind):
        # Returns the content alone or with the way it was obtained.
        return (content, kind) if add_source else content

    if isinstance(file_url_stream, str):
        if is_file_string(file_url_stream) and os.path.exists(file_url_stream):
            if asbytes:
                with open(file_url_stream, "rb") as f:
                    content = f.read()
                check_size(content)
                return result(content, "rb")
            with open(file_url_stream, "r", encoding=encoding) as f:
                content = f.read()
            check_size(content)
            return result(content, "r")
        if len(file_url_stream) < 5000 and file_url_stream.startswith("http"):
            # NOTE(review): this branch passes *encoding* to read_url even
            # if asbytes is True, kept as is -- confirm it is intended.
            content = read_url(file_url_stream, encoding=encoding)
            check_size(content)
            return result(content, "u")
        if is_url_string(file_url_stream):
            if asbytes:
                content = read_url(file_url_stream)
                check_size(content)
                return result(content, "ub")
            if encoding is None:
                raise ValueError(  # pragma: no cover
                    "cannot return bytes if encoding is None for url: " + file_url_stream)
            content = read_url(file_url_stream, encoding=encoding)
            check_size(content)
            return result(content, "u")
        # The string should be the content itself.
        if asbytes:
            raise TypeError(  # pragma: no cover
                "file_url_stream is str when expected bytes")
        return result(file_url_stream, "s")

    if isinstance(file_url_stream, bytes):
        if asbytes:
            return result(file_url_stream, "b")
        # bytes must be decoded (not encoded) to get a string
        content = file_url_stream.decode(encoding=encoding)
        check_size(content)
        return result(content, "b")
    if isinstance(file_url_stream, StringIO):
        v = file_url_stream.getvalue()
        if asbytes and v:
            content = v.encode(encoding=encoding)
            check_size(content)
            return result(content, "Sb")
        return result(v, "S")
    if isinstance(file_url_stream, BytesIO):
        v = file_url_stream.getvalue()
        if asbytes or not v:
            return result(v, "SBb")
        content = v.decode(encoding=encoding)
        check_size(content)
        return result(content, "SB")
    raise TypeError(  # pragma: no cover
        "unexpected type for file_url_stream: {0}\n{1}".format(
            type(file_url_stream), file_url_stream))