Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2@file
3@brief Various helpers about files
4"""
5import os
6import stat
7import warnings
8from io import BytesIO, StringIO
9from .synchelper import explore_folder_iterfile
10from .internet_helper import read_url
11from .file_info import is_file_string, is_url_string
def _apply_status(path, mode, status, strict):
    """
    Computes the new mode of *path* and applies it if it changes something.

    @param      path        file or folder to modify
    @param      mode        current ``st_mode`` of *path*
    @param      status      requested status
    @param      strict      True to replace the permissions by *status*,
                            False to add *status* to the existing ones
    @return                 True if ``os.chmod`` was called, False otherwise
    """
    nmode = status if strict else (mode | status)
    # ``st_mode`` also carries the file type bits: only the permission bits
    # are compared, otherwise a strict status would always look different.
    if stat.S_IMODE(nmode) == stat.S_IMODE(mode):
        return False
    os.chmod(path, nmode)
    return True


def change_file_status(folder, status=stat.S_IWRITE, strict=False,
                       include_folder=True):
    """
    Changes the status of all files inside a folder.

    @param      folder          folder or file
    @param      status          new status
    @param      strict          False, use ``|=``, True, use ``=``
    @param      include_folder  change the status of the folders as well
    @return                     list of modified files (and folders),
                                only the paths whose permissions were
                                really changed are returned

    By default, status is ``stat.S_IWRITE``.
    If *folder* is a file, the function changes the status of this file
    (and of its parent folder if *include_folder* is True),
    otherwise, it will change the status of every file the folder contains.
    A file which disappears while the folder is explored is skipped
    with a ``UserWarning``.
    """
    res = []

    if os.path.isfile(folder):
        todo = [folder]
        if include_folder:
            # dirname is empty for a bare file name, os.stat('') fails,
            # the current directory is the parent folder in that case.
            todo.insert(0, os.path.dirname(folder) or os.curdir)
        for f in todo:
            if _apply_status(f, os.stat(f).st_mode, status, strict):
                res.append(f)
        return res

    # Folders already processed, each of them is modified only once.
    dirname = set()
    for f in explore_folder_iterfile(folder):
        if include_folder:
            d = os.path.dirname(f)
            if d not in dirname:
                dirname.add(d)
                if _apply_status(d, os.stat(d).st_mode, status, strict):
                    res.append(d)
        try:
            mode = os.stat(f).st_mode
        except FileNotFoundError:  # pragma: no cover
            # It happens for some weird paths.
            warnings.warn(
                "[change_file_status] unable to find '{0}'".format(f),
                UserWarning)
            continue
        if _apply_status(f, mode, status, strict):
            res.append(f)

    # It ends up with the folder itself unless it was already processed
    # as the parent of one of the files.
    if include_folder and folder not in dirname:
        if _apply_status(folder, os.stat(folder).st_mode, status, strict):
            res.append(folder)
    return res
def read_content_ufs(file_url_stream, encoding="utf8", asbytes=False,
                     add_source=False, min_size=None):
    """
    Reads the content of a source, whether it is a url, a file, a stream
    or a string (in that case, it returns the string itself),
    it assumes the content type is text.

    @param      file_url_stream     file or url or stream or string
    @param      encoding            encoding
    @param      asbytes             return bytes instead of chars
    @param      add_source          also return the way the content was obtained
    @param      min_size            if not empty, the function raises an exception
                                    if the size is below that threshold,
                                    the check is only done on content which
                                    was read (file, url) or converted
                                    (encoded or decoded)
    @return                         content of the source (str or bytes)
                                    or *(content, type)*

    Type can be:

    * *rb*: binary file
    * *r*: text file
    * *u*: url
    * *ub*: binary content from url
    * *s*: string
    * *b*: binary string
    * *S*: StringIO
    * *Sb*: StringIO, return bytes
    * *SB*: BytesIO
    * *SBb*: BytesIO, return bytes

    The function can return bytes.
    It raises ``RuntimeError`` if the content is smaller than *min_size*,
    ``TypeError`` if a string holding the content itself is given with
    ``asbytes=True`` or if the type of *file_url_stream* is not supported,
    ``ValueError`` if a url must be returned as text and *encoding* is None.
    """
    def check_size(cont):
        if min_size is not None and len(cont) < min_size:
            raise RuntimeError(  # pragma: no cover
                "File '{}' is smaller than {}.".format(
                    file_url_stream, min_size))

    def output(content, source):
        # Adds the way the content was obtained if requested.
        return (content, source) if add_source else content

    if isinstance(file_url_stream, str):
        if is_file_string(file_url_stream) and os.path.exists(file_url_stream):
            if asbytes:
                with open(file_url_stream, "rb") as f:
                    content = f.read()
                check_size(content)
                return output(content, "rb")
            with open(file_url_stream, "r", encoding=encoding) as f:
                content = f.read()
            check_size(content)
            return output(content, "r")

        if len(file_url_stream) < 5000 and file_url_stream.startswith("http"):
            if asbytes:
                # Same call as the generic url case below,
                # *asbytes* must not be ignored for http urls.
                content = read_url(file_url_stream)
                check_size(content)
                return output(content, "ub")
            content = read_url(file_url_stream, encoding=encoding)
            check_size(content)
            return output(content, "u")

        if is_url_string(file_url_stream):
            if asbytes:
                content = read_url(file_url_stream)
                check_size(content)
                return output(content, "ub")
            if encoding is None:
                raise ValueError(  # pragma: no cover
                    "cannot return bytes if encoding is None for url: " + file_url_stream)
            content = read_url(file_url_stream, encoding=encoding)
            check_size(content)
            return output(content, "u")

        # The string should be the content itself.
        if asbytes:
            raise TypeError(  # pragma: no cover
                "file_url_stream is str when expected bytes")
        return output(file_url_stream, "s")

    if isinstance(file_url_stream, bytes):
        if asbytes:
            return output(file_url_stream, "b")
        # bytes must be decoded (not encoded) to get a string.
        content = file_url_stream.decode(encoding=encoding)
        check_size(content)
        return output(content, "b")

    if isinstance(file_url_stream, StringIO):
        v = file_url_stream.getvalue()
        if asbytes and v:
            content = v.encode(encoding=encoding)
            check_size(content)
            return output(content, "Sb")
        # An empty stream is returned as is, even if bytes were requested.
        return output(v, "S")

    if isinstance(file_url_stream, BytesIO):
        v = file_url_stream.getvalue()
        # An empty stream is returned as is, even if text was requested.
        if asbytes or not v:
            return output(v, "SBb")
        content = v.decode(encoding=encoding)
        check_size(content)
        return output(content, "SB")

    raise TypeError(  # pragma: no cover
        "unexpected type for file_url_stream: {0}\n{1}".format(
            type(file_url_stream), file_url_stream))