Coverage for pyquickhelper/filehelper/ftp_transfer.py: 93%
175 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1"""
2@file
3@brief provides some functionalities to upload file to a website
4"""
5from ftplib import FTP, FTP_TLS, error_perm
6import os
7import sys
8import time
9import datetime
10from io import BytesIO
11from ..loghelper.flog import noLOG
class CannotReturnToFolderException(Exception):
    """
    Raised when an exception interrupts a transfer and the class
    is then unable to go back to the folder it started from.
    """
class CannotCompleteWithoutNewLoginException(Exception):
    """
    Raised when a transfer cannot be completed because a new login
    happened while it was running.
    """
class TransferFTP:
    """
    This class uploads files to a website,
    if the remote folder does not exist, it creates it first.

    .. exref::
        :title: Transfer files to website through FTP

        Simple sketch to transfer a list of ``files`` to
        a website through FTP

        ::

            ftp = TransferFTP('ftp.<website>', alias, password, fLOG=print)

            issues = []
            done = []
            notdone = []
            for file in files:

                try:
                    r = ftp.transfer(file, path, os.path.basename(file))
                    if r:
                        done.append((file, path))
                    else:
                        notdone.append((file, path))
                except Exception as e:
                    issues.append((file, e))

            try:
                ftp.close()
            except Exception as e:
                print("unable to close FTP connection using ftp.close")

    The class may access to a server using :epkg:`SFTP`
    protocol but it relies on :epkg:`pysftp` and :epkg:`paramiko`.
    """

    # substring of the server error which means "this folder does not exist"
    errorNoDirectory = "Can't change directory"
    # default block size used by transfers
    blockSize = 2 ** 20

    def __init__(self, site, login, password, ftps='FTP', fLOG=noLOG):
        """
        @param      site        website, if None, the instance is built
                                without connecting to anything (mocking)
        @param      login       login
        @param      password    password
        @param      ftps        if ``'TLS'``, use class :epkg:`*py:ftplib:FTP_TLS`,
                                if ``'FTP'``, use :epkg:`*py:ftplib:FTP`,
                                if ``'SFTP'``, use :epkg:`pysftp`
        @param      fLOG        logging function

        With ``'SFTP'``, the connection is not opened by the constructor,
        it is opened by the first method which needs it.
        """
        self._ftps_ = ftps
        if site is not None:
            if ftps == 'TLS':
                cls = FTP_TLS
                self.is_sftp = False
            elif ftps == 'SFTP':
                import pysftp
                import paramiko
                import socket

                # A short-lived transport is opened only to read the host key
                # of the server. Both the transport and the socket are always
                # released, whatever happens.
                sock = socket.socket()
                try:
                    sock.connect((site, 22))
                    trans = paramiko.transport.Transport(sock)
                    try:
                        trans.start_client()
                        k = trans.get_remote_server_key()
                    finally:
                        trans.close()
                finally:
                    sock.close()

                # NOTE(review): the key sent by the server is trusted as is
                # (trust on first use), it is not compared to a known key.
                hk = paramiko.hostkeys.HostKeys()
                # the entry is registered under the real type of the key
                hk.add(site, k.get_name(), k)
                cnopts = pysftp.CnOpts()
                cnopts.hostkeys = hk

                def cls(si, lo, pw, cnopts=cnopts):
                    return pysftp.Connection(
                        si, username=lo, password=pw, cnopts=cnopts)

                # called lazily by _check_can_logged and _private_login
                self._login_ = lambda si=site, lo=login, pw=password: cls(
                    si, lo, pw)
                self.is_sftp = True
            elif ftps == 'FTP':
                cls = FTP
                self.is_sftp = False
            else:
                raise RuntimeError(  # pragma: no cover
                    f"No implementation for '{ftps}'.")
            if not self.is_sftp:
                self._ftp = cls(site, login, password)
            self._logins = [(datetime.datetime.now(), site)]
        else:
            # mocking
            if ftps != 'FTP':
                raise NotImplementedError(  # pragma: no cover
                    "Option ftps is not implemented for mocking.")
            self._logins = []
            self._ftp = FTP(site)
            self.is_sftp = False
        self.LOG = fLOG
        self._atts = dict(site=site, login=login, password=password)

    def _check_can_logged(self):
        """
        Opens the SFTP connection if there is none, which happens before
        the first use and after a call to method *close*.
        It does nothing for FTP and TLS.
        """
        if self.is_sftp and getattr(self, '_ftp', None) is None:
            self._ftp = self._login_()

    @property
    def Site(self):
        """
        return the website
        """
        return self._atts["site"]

    def _private_login(self):  # pragma: no cover
        """
        Logs in again with the credentials given to the constructor.
        If the connection was closed, a new one is started with the
        same protocol. Every successful login is appended to ``_logins``.
        """
        self.LOG("reconnecting to ", self.Site, " - ", len(self._logins))
        try:
            if self.is_sftp:
                self._ftp = self._login_()
            else:
                # without arguments, ftplib logs in as anonymous
                self._ftp.login(self._atts["login"], self._atts["password"])
            self._logins.append((datetime.datetime.now(), self.Site))
        except Exception as e:
            se = str(e)
            if "You're already logged in" in se:
                return
            if (not self.is_sftp and (
                    "An existing connection was forcibly closed by the remote host" in se or
                    "An established connection was aborted by the software in your host machine" in se)):
                # it starts a new connection
                self.LOG("reconnecting failed, starting a new connection",
                         self.Site, " - ", len(self._logins))
                # the protocol is kept, a TLS session must not fall back
                # to plain FTP
                cls = FTP_TLS if self._ftps_ == 'TLS' else FTP
                self._ftp = cls(self.Site, self._atts["login"],
                                self._atts["password"])
                self._logins.append((datetime.datetime.now(), self.Site))
            else:
                raise

    def _returns_output(self, command):
        """
        Tells if *command* is a method of the current connection whose
        output must be returned to the caller (``pwd``, ``dir``,
        ``mlsd``, ``listdir``).
        """
        # The name and the owner are compared instead of the attribute itself
        # because reading ``pwd`` on some connections evaluates a property.
        return (getattr(command, '__self__', None) is self._ftp and
                getattr(command, '__name__', None) in (
                    'pwd', 'dir', 'mlsd', 'listdir'))

    def run_command(self, command, *args, **kwargs):
        """
        Runs a FTP command. If it fails, the method logs in again
        and runs the command a second time.

        @param      command     command, usually a method of the connection
        @param      args        list of argument
        @param      kwargs      named arguments
        @return                 output of the command for ``pwd``, ``dir``,
                                ``mlsd``, ``listdir``, True for any other command

        It raises *FileNotFoundError* if the SFTP server answers
        ``No such file``. An error which contains *errorNoDirectory*
        is raised again without any new attempt.
        """
        try:
            t = command(*args, **kwargs)
        except Exception as e:  # pragma: no cover
            if self.is_sftp and 'No such file' in str(e):
                raise FileNotFoundError(
                    f"Unable to find {args}.") from e
            if TransferFTP.errorNoDirectory in str(e):
                raise
            self.LOG(e)
            self.LOG(" ** run exc ", str(command), str(args))
            previous = self._ftp
            self._private_login()
            # the login may have replaced the connection, the command must
            # then be taken from the new one
            cname = getattr(command, '__name__', None)
            if (self._ftp is not previous and cname is not None and
                    getattr(command, '__self__', None) is previous and
                    hasattr(self._ftp, cname)):
                command = getattr(self._ftp, cname)
            # command is already bound, only the arguments are given
            t = command(*args, **kwargs)
            self.LOG(" ** run ", str(command), str(args))
        return t if self._returns_output(command) else True

    def print_list(self):  # pragma: no cover
        """
        Returns the list of files in the current directory
        the function sends everything to the logging function.

        @return         output of the command or True for success

        It raises *NotImplementedError* if the connection has no
        method ``retrlines``.
        """
        self._check_can_logged()
        if hasattr(self._ftp, 'retrlines'):
            return self.run_command(self._ftp.retrlines, 'LIST')
        raise NotImplementedError(
            f"Not implemented for ftps='{self._ftps_}'.")

    def mkd(self, path):  # pragma: no cover
        """
        Creates a directory.

        @param      path        path to the directory
        @return                 output of @see me run_command
        """
        self._check_can_logged()
        self.LOG("[mkd]", path)
        # the method is called mkd or mkdir depending on the connection
        cmd = self._ftp.mkd if hasattr(self._ftp, 'mkd') else self._ftp.mkdir
        return self.run_command(cmd, path)

    def cwd(self, path, create=False):
        """
        Goes to a directory, if it does not exist, creates it
        (if *create* is True).

        @param      path        path to the directory
        @param      create      True to create it
        @return                 None

        If the directory had to be created, the method tries to enter it
        only once more.
        """
        self._check_can_logged()
        try:
            self.run_command(self._ftp.cwd, path)
        except EOFError as e:  # pragma: no cover
            raise EOFError(f"unable to go to: {path}") from e
        except Exception as e:  # pragma: no cover
            missing = (isinstance(e, FileNotFoundError) or
                       TransferFTP.errorNoDirectory in str(e))
            if not (create and missing):
                raise
            self.mkd(path)
            # no second creation, it avoids an endless recursion if the
            # server keeps refusing to enter the folder
            self.cwd(path, False)

    def pwd(self):
        """
        Returns the pathname of the current directory on the server.

        @return         pathname
        """
        self._check_can_logged()
        if hasattr(self._ftp, 'getcwd'):
            # here, pwd is read as an attribute and not called
            return self._ftp.pwd
        return self.run_command(self._ftp.pwd)

    def dir(self, path='.'):
        """
        Lists the content of a path.

        @param      path        path
        @return                 list of path

        See :meth:`enumerate_ls <pyquickhelper.filehelper.ftp_transfer.TransferFTP.enumerate_ls>`
        """
        return list(self.enumerate_ls(path))

    def ls(self, path='.'):
        """
        Lists the content of a path.

        @param      path        path
        @return                 list of path

        see :meth:`enumerate_ls <pyquickhelper.filehelper.ftp_transfer.TransferFTP.enumerate_ls>`

        .. exref::
            :title: List files from FTP site

            ::

                from pyquickhelper.filehelper import TransferFTP
                ftp = TransferFTP("ftp....", "login", "password")
                res = ftp.ls("path")
                for v in res:
                    print(v["name"])
                ftp.close()
        """
        return list(self.enumerate_ls(path))

    def enumerate_ls(self, path='.'):
        """
        Enumerates the content of a path.

        @param      path        path
        @return                 iterator on dictionaries

        One dictionary::

            {'name': 'www',
             'type': 'dir',
             'unique': 'aaaa',
             'unix.uid': '1111',
             'unix.mode': '111',
             'sizd': '5',
             'unix.gid': '000',
             'modify': '111111'}

        With SFTP, a dictionary only contains the key ``'name'``.
        """
        self._check_can_logged()
        if not self.is_sftp:
            for a in self.run_command(self._ftp.mlsd, path):
                r = dict(name=a[0])
                r.update(a[1])
                yield r
        else:
            with self._ftp.cd(path):
                for name in self._ftp.listdir():
                    yield dict(name=name)

    def transfer(self, file, to, name, debug=False, blocksize=None, callback=None):
        """
        Transfers a file.

        @param      file        file name or stream (binary, BytesIO) or bytes
        @param      to          destination (a folder)
        @param      name        name of the stream on the website
        @param      debug       unused
        @param      blocksize   see :tpl:`py,m='ftplib',o='FTP.storbinary'`
        @param      callback    see :tpl:`py,m='ftplib',o='FTP.storbinary'`,
                                ignored with SFTP
        @return                 status

        When an error happens, the original current directory is restored.
        The method raises @see cl CannotCompleteWithoutNewLoginException
        if a new login happened during the transfer and
        @see cl CannotReturnToFolderException if it cannot go back
        to the original folder.
        """
        self._check_can_logged()
        path = [_ for _ in to.split("/") if len(_) > 0]
        nb_logins = len(self._logins)
        cpwd = self.pwd()

        # folders successfully entered, the method leaves them at the end
        done = []
        exc = None
        for i, p in enumerate(path):
            # SFTP moves with the path from the root, FTP goes one folder
            # at a time
            p_ = ('/' + '/'.join(path[:i + 1])) if self.is_sftp else p
            try:
                self.cwd(p_, True)
            except Exception as e:  # pragma: no cover
                exc = e
                break
            done.append(p)

        if nb_logins != len(self._logins):
            raise CannotCompleteWithoutNewLoginException(  # pragma: no cover
                f"Cannot reach folder '{to}' without new login") from exc

        bs = blocksize if blocksize else TransferFTP.blockSize
        if not self.is_sftp:
            def runc(name, f, bs, callback):
                return self.run_command(
                    self._ftp.storbinary, 'STOR ' + name, f, bs, callback)
        else:
            def runc(name, f, bs, callback):
                # NOTE(review): the block size is sent as file_size and the
                # callback is dropped, to confirm against pysftp.putfo
                return self.run_command(
                    self._ftp.putfo, remotepath=name, flo=f, file_size=bs,
                    callback=None)

        r = None
        if exc is None:
            try:
                if isinstance(file, str):
                    if not os.path.exists(file):
                        raise FileNotFoundError(file)  # pragma: no cover
                    with open(file, "rb") as f:
                        r = runc(name, f, bs, callback)
                elif isinstance(file, bytes):
                    r = runc(name, BytesIO(file), bs, callback)
                else:
                    # BytesIO or any other stream
                    r = runc(name, file, bs, callback)
            except Exception as ee:  # pragma: no cover
                # raised at the end, after going back to the original folder
                exc = ee

        if nb_logins != len(self._logins):  # pragma: no cover
            try:
                self.cwd(cpwd)
                done = []
            except Exception as e:
                raise CannotCompleteWithoutNewLoginException(
                    f"Cannot transfer '{to}' without new login") from e

        # It may fail here, it hopes not.
        nbtry = 0
        nbth = len(done) * 2 + 1
        while len(done) > 0:
            if nb_logins != len(self._logins):  # pragma: no cover
                try:
                    self.cwd(cpwd)
                    break
                except Exception as e:
                    raise CannotCompleteWithoutNewLoginException(
                        f"Cannot return to original folder'{to}' without new login") from e

            nbtry += 1
            try:
                self.cwd("..")
                done.pop()
            except Exception as e:  # pragma: no cover
                time.sleep(0.5)
                self.LOG(
                    f" issue with command .. len(done) == {len(done)}")
                if nbtry > nbth:
                    raise CannotReturnToFolderException(
                        "len(path)={0} nbtry={1} exc={2} nbl={3} act={4}".format(
                            len(done), nbtry, exc, nb_logins, len(self._logins))) from e

        if exc is not None:
            raise exc  # pragma: no cover
        return r

    def retrieve(self, fold, name, file=None, debug=False):
        """
        Downloads a file.

        @param      file        file name or stream (binary, BytesIO),
                                if None, the content is returned as bytes
        @param      fold        full remote path
        @param      name        name of the stream on the website
        @param      debug       unused
        @return                 status or the content as bytes if *file*
                                is neither a string nor a BytesIO
        """
        self._check_can_logged()

        if self.is_sftp:
            self.cwd(fold, False)
        else:
            path = [_ for _ in fold.split("/") if len(_) > 0]
            # NOTE(review): a missing remote folder is created here even
            # though the method only downloads, to confirm
            for p in path:
                self.cwd(p, True)

        # an error_perm is only raised after going back to the first folder
        raise_exc = None

        if not self.is_sftp:

            def _retrbinary_(name, callback, size, f):
                r = self.run_command(self._ftp.retrbinary,
                                     'RETR ' + name, callback, size)
                if isinstance(r, (bytes, str)):
                    f.write(r)
                return r

            runc = _retrbinary_
        else:
            def runc(name, callback, size, f):
                return self.run_command(
                    self._ftp.getfo, remotepath=name, flo=f, callback=None)

        if isinstance(file, str):
            with open(file, "wb") as f:
                def callback(block):
                    f.write(block)
                try:
                    runc(name, callback, TransferFTP.blockSize, f)
                    r = True
                except error_perm as e:  # pragma: no cover
                    raise_exc = e
                    r = False
        elif isinstance(file, BytesIO):
            def callback(block):
                file.write(block)
            try:
                r = runc(name, callback, TransferFTP.blockSize, file)
            except error_perm as e:  # pragma: no cover
                raise_exc = e
                r = False
        else:
            b = BytesIO()

            def callback(block):
                b.write(block)
            try:
                runc(name, callback, TransferFTP.blockSize, b)
            except error_perm as e:  # pragma: no cover
                raise_exc = e

            r = b.getvalue()

        if not self.is_sftp:
            for p in path:
                self.cwd("..")

        if raise_exc:
            raise raise_exc  # pragma: no cover

        return r

    def close(self):
        """
        Closes the connection. With SFTP, the connection is opened again
        by the next method which needs it. Closing a SFTP connection which
        is not opened does nothing.
        """
        if self.is_sftp:
            if getattr(self, '_ftp', None) is not None:
                self._ftp.close()
            self._ftp = None
        else:
            self._ftp.close()