Coverage for pyquickhelper/filehelper/anyfhelper.py: 77%
105 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1"""
2@file
3@brief Various helpers about files
4"""
5import os
6import stat
7import warnings
8from io import BytesIO, StringIO
9from .synchelper import explore_folder_iterfile
10from .internet_helper import read_url
11from .file_info import is_file_string, is_url_string
def _apply_status(path, status, strict):
    """
    Applies *status* to a single path and tells whether it was modified.

    @param      path        file or folder
    @param      status      permission bits to apply
    @param      strict      True to replace the mode by *status*,
                            False to add *status* to the current mode
    @return                 True if ``os.chmod`` was called, False otherwise
    """
    mode = os.stat(path).st_mode
    nmode = status if strict else mode | status
    if nmode == mode:
        return False
    os.chmod(path, nmode)
    return True


def change_file_status(folder, status=stat.S_IWRITE, strict=False,
                       include_folder=True):
    """
    Changes the status of all files inside a folder.

    @param      folder          folder or file
    @param      status          new status
    @param      strict          False, use ``|=``, True, use ``=``
    @param      include_folder  change the status of the folders as well
    @return                     list of modified files

    By default, status is ``stat.S_IWRITE``.
    If *folder* is a file, the function changes the status of this file
    (and of its parent folder if *include_folder* is True),
    otherwise, it will change the status of every file the folder contains.
    When *strict* is False, *status* is added to the existing mode
    (``mode | status``), when it is True, the mode is replaced by *status*.
    A file which cannot be found while walking through the folder is skipped
    with a ``UserWarning``.
    """
    if os.path.isfile(folder):
        if include_folder:
            # dirname is '' for a bare file name, os.stat('') would fail,
            # the current directory is the parent in that case.
            todo = [os.path.dirname(folder) or os.curdir, folder]
        else:
            todo = [folder]
        return [f for f in todo if _apply_status(f, status, strict)]

    res = []
    seen = set()
    for f in explore_folder_iterfile(folder):
        if include_folder:
            # Every parent folder is processed once, before its files.
            d = os.path.dirname(f)
            if d not in seen:
                seen.add(d)
                if _apply_status(d, status, strict):
                    res.append(d)
        try:
            changed = _apply_status(f, status, strict)
        except FileNotFoundError:  # pragma: no cover
            # It happens for some weird path.
            warnings.warn(
                f"[change_file_status] unable to find '{f}'", UserWarning)
            continue
        if changed:
            res.append(f)

    # It ends up with the folder itself unless it was already processed.
    if include_folder and folder not in seen:
        if _apply_status(folder, status, strict):
            res.append(folder)
    return res
def read_content_ufs(file_url_stream, encoding="utf8", asbytes=False,
                     add_source=False, min_size=None):
    """
    Reads the content of a source, whether it is a url, a file, a stream
    or a string (in that case, it returns the string itself),
    it assumes the content type is text.

    @param      file_url_stream     file or url or stream or string
    @param      encoding            encoding
    @param      asbytes             return bytes instead of chars
    @param      add_source          also return the way the content was obtained
    @param      min_size            if not empty, the function raises an exception
                                    if the size is below that threshold
    @return                         content of the source (str or bytes)
                                    or *(content, type)* if *add_source* is True

    Type can be:

    * *rb*: binary file
    * *r*: text file
    * *u*: url
    * *ub*: binary content from url
    * *s*: string
    * *b*: binary string
    * *S*: StringIO
    * *Sb*: StringIO, return bytes
    * *SB*: BytesIO
    * *SBb*: BytesIO, return bytes

    The function can return bytes.
    It raises ``TypeError`` if *file_url_stream* is a string holding the
    content itself while *asbytes* is True, or if its type is not one
    of str, bytes, StringIO, BytesIO.
    """
    def check_size(cont):
        if min_size is not None and len(cont) < min_size:
            raise RuntimeError(  # pragma: no cover
                f"File '{file_url_stream}' is smaller than {min_size}.")

    def result(content, kind):
        # Returns the content alone or with the tag telling how it was read.
        return (content, kind) if add_source else content

    if isinstance(file_url_stream, str):
        if is_file_string(file_url_stream) and os.path.exists(file_url_stream):
            if asbytes:
                with open(file_url_stream, "rb") as f:
                    content = f.read()
                check_size(content)
                return result(content, "rb")
            with open(file_url_stream, "r", encoding=encoding) as f:
                content = f.read()
            check_size(content)
            return result(content, "r")
        # NOTE(review): this shortcut does not look at asbytes,
        # confirm whether it is intended.
        if len(file_url_stream) < 5000 and file_url_stream.startswith("http"):
            content = read_url(file_url_stream, encoding=encoding)
            check_size(content)
            return result(content, "u")
        if is_url_string(file_url_stream):
            if asbytes:
                content = read_url(file_url_stream)
                check_size(content)
                return result(content, "ub")
            if encoding is None:
                raise ValueError(  # pragma: no cover
                    "cannot return bytes if encoding is None for url: " + file_url_stream)
            content = read_url(file_url_stream, encoding=encoding)
            check_size(content)
            return result(content, "u")
        # The string should be the content itself.
        if asbytes:
            raise TypeError(  # pragma: no cover
                "file_url_stream is str when expected bytes")
        return result(file_url_stream, "s")

    if isinstance(file_url_stream, bytes):
        if asbytes:
            return result(file_url_stream, "b")
        # bytes must be decoded (not encoded) to get characters.
        content = file_url_stream.decode(encoding=encoding)
        check_size(content)
        return result(content, "b")

    if isinstance(file_url_stream, StringIO):
        v = file_url_stream.getvalue()
        if asbytes and v:
            content = v.encode(encoding=encoding)
            check_size(content)
            return result(content, "Sb")
        return result(v, "S")

    if isinstance(file_url_stream, BytesIO):
        v = file_url_stream.getvalue()
        if asbytes or not v:
            return result(v, "SBb")
        content = v.decode(encoding=encoding)
        check_size(content)
        return result(content, "SB")

    raise TypeError(  # pragma: no cover
        f"unexpected type for file_url_stream: {type(file_url_stream)}\n{file_url_stream}")