Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2@file 

3@brief Various helpers about files 

4""" 

5import os 

6import stat 

7import warnings 

8from io import BytesIO, StringIO 

9from .synchelper import explore_folder_iterfile 

10from .internet_helper import read_url 

11from .file_info import is_file_string, is_url_string 

12 

13 

def _apply_file_status(path, status, strict):
    """
    Changes the permission bits of a single path if needed.

    @param      path        file or folder
    @param      status      status to set or to add
    @param      strict      True, the mode becomes *status*,
                            False, *status* is added to the current mode
    @return                 True if ``os.chmod`` was called, False otherwise
    """
    mode = os.stat(path).st_mode
    nmode = status if strict else (mode | status)
    # ``st_mode`` also carries the file type bits, only the permission
    # bits are compared, otherwise a strict update would always look
    # like a modification.
    if stat.S_IMODE(nmode) == stat.S_IMODE(mode):
        return False
    os.chmod(path, nmode)
    return True


def change_file_status(folder, status=stat.S_IWRITE, strict=False,
                       include_folder=True):
    """
    Changes the status of all files inside a folder.

    @param      folder          folder or file
    @param      status          new status
    @param      strict          False, use ``|=``, True, use ``=``
    @param      include_folder  change the status of the folders as well
    @return                     list of modified files

    By default, status is ``stat.S_IWRITE``.
    If *folder* is a file, the function changes the status of this file
    (and of its parent folder if *include_folder* is True),
    otherwise, it will change the status of every file the folder contains.
    A path is returned only if its permission bits were really changed.
    A file which cannot be found while exploring the folder
    triggers a ``UserWarning`` and is skipped.
    """
    res = []

    if os.path.isfile(folder):
        todo = [folder]
        if include_folder:
            # dirname is empty for a bare file name, the parent folder
            # is then the current directory.
            todo.insert(0, os.path.dirname(folder) or os.curdir)
        for f in todo:
            if _apply_file_status(f, status, strict):
                res.append(f)
        return res

    dirname = set()
    for f in explore_folder_iterfile(folder):
        if include_folder:
            # Every parent folder is processed once, before its files.
            d = os.path.dirname(f)
            if d not in dirname:
                dirname.add(d)
                if _apply_file_status(d, status, strict):
                    res.append(d)
        try:
            changed = _apply_file_status(f, status, strict)
        except FileNotFoundError:  # pragma: no cover
            # It happens for some weird path.
            warnings.warn(
                "[change_file_status] unable to find '{0}'".format(f), UserWarning)
            continue
        if changed:
            res.append(f)

    # It ends up with the folder itself, it was not processed
    # if it does not directly contain any file.
    if include_folder and folder not in dirname:
        if _apply_file_status(folder, status, strict):
            res.append(folder)
    return res

118 

119 

def read_content_ufs(file_url_stream, encoding="utf8", asbytes=False,
                     add_source=False, min_size=None):
    """
    Reads the content of a source, whether it is a url, a file, a stream
    or a string (in that case, it returns the string itself),
    it assumes the content type is text.

    @param      file_url_stream     file or url or stream or string
    @param      encoding            encoding
    @param      asbytes             return bytes instead of chars
    @param      add_source          also return the way the content was obtained
    @param      min_size            if not empty, the function raises an exception
                                    if the size is below that threshold
    @return                         content of the source (str) or *(content, type)*

    Type can be:

    * *rb*: binary file
    * *r*: text file
    * *u*: url
    * *ub*: binary content from url
    * *s*: string
    * *b*: binary string
    * *S*: StringIO
    * *Sb*: StringIO, return bytes
    * *SB*: BytesIO
    * *SBb*: BytesIO, return bytes

    The function can return bytes.
    It raises ``RuntimeError`` if the content which was read, downloaded
    or converted is smaller than *min_size*, ``TypeError`` if
    *file_url_stream* is a plain string while *asbytes* is True
    or if its type is not supported, ``ValueError`` if a url must be
    read as text with ``encoding=None``.
    """
    def check_size(cont):
        if min_size is not None and len(cont) < min_size:
            raise RuntimeError(  # pragma: no cover
                "File '{}' is smaller than {}.".format(
                    file_url_stream, min_size))

    def result(content, source):
        # Adds the source type only if the user asked for it.
        return (content, source) if add_source else content

    if isinstance(file_url_stream, str):
        if is_file_string(file_url_stream) and os.path.exists(file_url_stream):
            if asbytes:
                with open(file_url_stream, "rb") as f:
                    content = f.read()
                check_size(content)
                return result(content, "rb")
            with open(file_url_stream, "r", encoding=encoding) as f:
                content = f.read()
            check_size(content)
            return result(content, "r")

        if len(file_url_stream) < 5000 and file_url_stream.startswith("http"):
            # Shortcut for obvious urls, *asbytes* is handled
            # the same way as for any other url below.
            if asbytes:
                content = read_url(file_url_stream)
                check_size(content)
                return result(content, "ub")
            content = read_url(file_url_stream, encoding=encoding)
            check_size(content)
            return result(content, "u")

        if is_url_string(file_url_stream):
            if asbytes:
                content = read_url(file_url_stream)
                check_size(content)
                return result(content, "ub")
            if encoding is None:
                raise ValueError(  # pragma: no cover
                    "cannot return bytes if encoding is None for url: " + file_url_stream)
            content = read_url(file_url_stream, encoding=encoding)
            check_size(content)
            return result(content, "u")

        # the string should be the content itself
        if asbytes:
            raise TypeError(  # pragma: no cover
                "file_url_stream is str when expected bytes")
        return result(file_url_stream, "s")

    if isinstance(file_url_stream, bytes):
        if asbytes:
            return result(file_url_stream, "b")
        # bytes are decoded (not encoded) to get a string
        content = file_url_stream.decode(encoding=encoding)
        check_size(content)
        return result(content, "b")

    if isinstance(file_url_stream, StringIO):
        v = file_url_stream.getvalue()
        # an empty stream is returned as is (str), even if asbytes is True
        if asbytes and v:
            content = v.encode(encoding=encoding)
            check_size(content)
            return result(content, "Sb")
        return result(v, "S")

    if isinstance(file_url_stream, BytesIO):
        v = file_url_stream.getvalue()
        # an empty stream is returned as is (bytes), even if asbytes is False
        if asbytes or not v:
            return result(v, "SBb")
        content = v.decode(encoding=encoding)
        check_size(content)
        return result(content, "SB")

    raise TypeError(  # pragma: no cover
        "unexpected type for file_url_stream: {0}\n{1}".format(
            type(file_url_stream), file_url_stream))