Coverage for pyquickhelper/filehelper/transfer_api_ftp.py: 82%

33 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@brief API to move files using FTP 

4""" 

5import ftplib 

6from io import BytesIO 

7from ..loghelper import noLOG 

8from .transfer_api import TransferAPI 

9from .ftp_transfer import TransferFTP 

10from .ftp_mock import MockTransferFTP 

11 

12 

13class TransferAPIFtp(TransferAPI): 

14 """ 

15 Defines an API to transfer files over a remote location 

16 through :epkg:`FTP`. 

17 """ 

18 

19 def __init__(self, site, login, password, root="backup", 

20 ftps='FTP', fLOG=noLOG): 

21 """ 

22 @param site website 

23 @param login login 

24 @param password password 

25 @param root root on the website 

26 @param ftps protocol, see @see cl TransferFTP 

27 @param fLOG logging function 

28 """ 

29 TransferAPI.__init__(self, fLOG=fLOG) 

30 self._ftp = (TransferFTP(site, login, password, fLOG=fLOG, ftps=ftps) 

31 if site else MockTransferFTP(ftps=ftps, fLOG=fLOG)) 

32 self._root = root 

33 

34 def connect(self): 

35 """ 

36 connect 

37 """ 

38 pass 

39 # self._ftp.connect() 

40 

41 def close(self): 

42 """ 

43 close the connection 

44 """ 

45 pass 

46 # self._ftp._close() 

47 

48 def transfer(self, path, data): 

49 """ 

50 It assumes a data holds in memory, 

51 tansfer data to path. 

52 

53 @param data bytes 

54 @param path path to remove location 

55 @return boolean 

56 """ 

57 spl = path.rsplit("/") 

58 to = self._root + "/" + "/".join(spl[:-1]) 

59 to = to.rstrip("/") 

60 byt = BytesIO(data) 

61 r = self._ftp.transfer(byt, to, spl[-1]) 

62 return r 

63 

64 def retrieve(self, path, exc=True): 

65 """ 

66 retrieve data from path 

67 

68 @param path remove location 

69 @param exc keep exception 

70 @return data 

71 """ 

72 spl = path.rsplit("/") 

73 src = self._root + "/" + "/".join(spl[:-1]) 

74 src = src.rstrip("/") 

75 if exc: 

76 r = self._ftp.retrieve(src, spl[-1], None) 

77 else: 

78 try: 

79 r = self._ftp.retrieve(src, spl[-1], None) 

80 except ftplib.error_perm: 

81 r = None 

82 return r