Coverage for pyquickhelper/filehelper/ftp_transfer.py: 93%

175 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@brief provides some functionalities to upload file to a website 

4""" 

5from ftplib import FTP, FTP_TLS, error_perm 

6import os 

7import sys 

8import time 

9import datetime 

10from io import BytesIO 

11from ..loghelper.flog import noLOG 

12 

13 

14class CannotReturnToFolderException(Exception): 

15 """ 

16 raised when a transfer is interrupted by an exception 

17 and the class cannot return to the original folder 

18 """ 

19 pass 

20 

21 

22class CannotCompleteWithoutNewLoginException(Exception): 

23 """ 

24 raised when a transfer is interrupted by a new login 

25 """ 

26 pass 

27 

28 

29class TransferFTP: 

30 

31 """ 

32 This class uploads files to a website, 

33 if the remote does not exists, it creates it first. 

34 

35 .. exref:: 

36 :title: Transfer files to webste through FTP 

37 

38 Simple sketch to transfer a list of ``files`` to 

39 a website through FTP 

40 

41 :: 

42 

43 ftp = TransferFTP('ftp.<website>', alias, password, fLOG=print) 

44 

45 issues = [ ] 

46 done = [ ] 

47 notdone = [ ] 

48 for file in files : 

49 

50 try : 

51 r = ftp.transfer (file, path) 

52 if r : done.append( (file, path) ) 

53 else : notdone.append ( (file, path) ) 

54 except Exception as e : 

55 issues.append( (file, e) ) 

56 

57 try : 

58 ftp.close() 

59 except Exception as e : 

60 print ("unable to close FTP connection using ftp.close") 

61 

62 The class may access to a server using :epkg:`SFTP` 

63 protocol but it relies on :epkg:`pysftp` and :epkg:`paramiko`. 

64 """ 

65 

66 errorNoDirectory = "Can't change directory" 

67 blockSize = 2 ** 20 

68 

69 def __init__(self, site, login, password, ftps='FTP', fLOG=noLOG): 

70 """ 

71 @param site website 

72 @param login login 

73 @param password password 

74 @param ftps if ``'TLS'``, use class :epkg:`*py:ftplib:FTP_TLS`, 

75 if ``'FTP'``, use :epkg:`*py:ftplib:TLS`, 

76 if ``'SFTP'``, use :epkg:`pysftp` 

77 @param fLOG logging function 

78 """ 

79 self._ftps_ = ftps 

80 if site is not None: 

81 if ftps == 'TLS': 

82 cls = FTP_TLS 

83 self.is_sftp = False 

84 elif ftps == 'SFTP': 

85 import pysftp 

86 import paramiko 

87 import socket 

88 sock = socket.socket() 

89 try: 

90 sock.connect((site, 22)) 

91 except socket.gaierror as e: 

92 sock.close() 

93 raise e 

94 trans = paramiko.transport.Transport(sock) 

95 trans.start_client() 

96 k = trans.get_remote_server_key() 

97 

98 hk = paramiko.hostkeys.HostKeys() 

99 hk.add(site, 'ssh-rsa', k) 

100 cnopts = pysftp.CnOpts() 

101 cnopts.hostkeys = hk 

102 

103 def cls(si, lo, pw, cnopts=cnopts): return pysftp.Connection( 

104 si, username=lo, password=pw, cnopts=cnopts) 

105 self._login_ = lambda si=site, lo=login, pw=password: cls( 

106 si, lo, pw) 

107 self.is_sftp = True 

108 

109 elif ftps == 'FTP': 

110 cls = FTP 

111 self.is_sftp = False 

112 else: 

113 raise RuntimeError( # pragma: no cover 

114 f"No implementation for '{ftps}'.") 

115 if not self.is_sftp: 

116 self._ftp = cls(site, login, password) 

117 self._logins = [(datetime.datetime.now(), site)] 

118 else: 

119 # mocking 

120 if ftps != 'FTP': 

121 raise NotImplementedError( # pragma: no cover 

122 "Option ftps is not implemented for mocking.") 

123 self._logins = [] 

124 self._ftp = FTP(site) 

125 self.is_sftp = False 

126 self.LOG = fLOG 

127 self._atts = dict(site=site, login=login, password=password) 

128 

129 def _check_can_logged(self): 

130 if self.is_sftp and not hasattr(self, '_ftp'): 

131 self._ftp = self._login_() 

132 

133 @property 

134 def Site(self): 

135 """ 

136 return the website 

137 """ 

138 return self._atts["site"] 

139 

140 def _private_login(self): # pragma: no cover 

141 """ 

142 logs in 

143 """ 

144 self.LOG("reconnecting to ", self.Site, " - ", len(self._logins)) 

145 try: 

146 if self.is_sftp: 

147 self._ftp = self._login_() 

148 else: 

149 self._ftp.login() 

150 self._logins.append((datetime.datetime.now(), self.Site)) 

151 except Exception as e: 

152 se = str(e) 

153 if "You're already logged in" in se: 

154 return 

155 elif (not self.is_sftp and ( 

156 "An existing connection was forcibly closed by the remote host" in se or 

157 "An established connection was aborted by the software in your host machine" in se)): 

158 # it starts a new connection 

159 self.LOG("reconnecting failed, starting a new connection", 

160 self.Site, " - ", len(self._logins)) 

161 self._ftp = FTP(self.Site, self._atts[ 

162 "login"], self._atts["password"]) 

163 self._logins.append((datetime.datetime.now(), self.Site)) 

164 else: 

165 raise e 

166 

167 def run_command(self, command, *args, **kwargs): 

168 """ 

169 Runs a FTP command. 

170 

171 @param command command 

172 @param args list of argument 

173 @return output of the command or True for success, False for failure 

174 """ 

175 try: 

176 t = command(*args, **kwargs) 

177 if (command == self._ftp.pwd or 

178 command == getattr(self._ftp, 'dir', None) or 

179 command == getattr(self._ftp, 'mlsd', 'listdir')): 

180 return t 

181 elif command != self._ftp.cwd: 

182 pass 

183 return True 

184 except Exception as e: # pragma: no cover 

185 if self.is_sftp and 'No such file' in str(e): 

186 raise FileNotFoundError( 

187 f"Unable to find {args}.") from e 

188 if TransferFTP.errorNoDirectory in str(e): 

189 raise e 

190 self.LOG(e) 

191 self.LOG(" ** run exc ", str(command), str(args)) 

192 self._private_login() 

193 if command == self._ftp.pwd or command is self._ftp.pwd: 

194 t = command(self) 

195 else: 

196 t = command(self, *args, **kwargs) 

197 self.LOG(" ** run ", str(command), str(args)) 

198 return t 

199 

200 def print_list(self): # pragma: no cover 

201 """ 

202 Returns the list of files in the current directory 

203 the function sends everything to the logging function. 

204 

205 @return output of the command or True for success, False for failure 

206 """ 

207 self._check_can_logged() 

208 if hasattr(self._ftp, 'retrlines'): 

209 return self.run_command(self._ftp.retrlines, 'LIST') 

210 raise NotImplementedError( 

211 f"Not implemented for ftps='{self._ftps_}'.") 

212 

213 def mkd(self, path): # pragma: no cover 

214 """ 

215 Creates a directory. 

216 

217 @param path path to the directory 

218 @return True or False 

219 """ 

220 self._check_can_logged() 

221 self.LOG("[mkd]", path) 

222 cmd = self._ftp.mkd if hasattr(self._ftp, 'mkd') else self._ftp.mkdir 

223 return self.run_command(cmd, path) 

224 

225 def cwd(self, path, create=False): 

226 """ 

227 Goes to a directory, if it does not exist, creates it 

228 (if *create* is True). 

229 

230 @param path path to the directory 

231 @param create True to create it 

232 @return True or False 

233 """ 

234 self._check_can_logged() 

235 try: 

236 self.run_command(self._ftp.cwd, path) 

237 except EOFError as e: # pragma: no cover 

238 raise EOFError(f"unable to go to: {path}") from e 

239 except FileNotFoundError as e: # pragma: no cover 

240 if create: 

241 self.mkd(path) 

242 self.cwd(path, create) 

243 else: 

244 raise e 

245 except Exception as e: # pragma: no cover 

246 if create and TransferFTP.errorNoDirectory in str(e): 

247 self.mkd(path) 

248 self.cwd(path, create) 

249 else: 

250 raise e 

251 

252 def pwd(self): 

253 """ 

254 Returns the pathname of the current directory on the server. 

255 

256 @return pathname 

257 """ 

258 self._check_can_logged() 

259 if hasattr(self._ftp, 'getcwd'): 

260 r = self._ftp.getcwd() 

261 return self._ftp.pwd 

262 else: 

263 return self.run_command(self._ftp.pwd) 

264 

265 def dir(self, path='.'): 

266 """ 

267 Lists the content of a path. 

268 

269 @param path path 

270 @return list of path 

271 

272 See :meth:`enumerate_ls <pyquickhelper.filehelper.ftp_transfer.TransferFTP.enumerate_ls>` 

273 """ 

274 return list(self.enumerate_ls(path)) 

275 

276 def ls(self, path='.'): 

277 """ 

278 Lists the content of a path. 

279 

280 @param path path 

281 @return list of path 

282 

283 see :meth:`enumerate_ls <pyquickhelper.filehelper.ftp_transfer.TransferFTP.enumerate_ls>` 

284 

285 .. exref:: 

286 :title: List files from FTP site 

287 

288 :: 

289 

290 from pyquickhelper.filehelper import TransferFTP 

291 ftp = TransferFTP("ftp....", "login", "password") 

292 res = ftp.ls("path") 

293 for v in res: 

294 print(v["name"]) 

295 ftp.close() 

296 """ 

297 return list(self.enumerate_ls(path)) 

298 

299 def enumerate_ls(self, path='.'): 

300 """ 

301 Enumerates the content of a path. 

302 

303 @param path path 

304 @return list of dictionaries 

305 

306 One dictionary:: 

307 

308 {'name': 'www', 

309 'type': 'dir', 

310 'unique': 'aaaa', 

311 'unix.uid': '1111', 

312 'unix.mode': '111', 

313 'sizd': '5', 

314 'unix.gid': '000', 

315 'modify': '111111'} 

316 """ 

317 self._check_can_logged() 

318 if not self.is_sftp: 

319 for a in self.run_command(self._ftp.mlsd, path): 

320 r = dict(name=a[0]) 

321 r.update(a[1]) 

322 yield r 

323 else: 

324 with self._ftp.cd(path): 

325 for name in self._ftp.listdir(): 

326 yield dict(name=name) 

327 

328 def transfer(self, file, to, name, debug=False, blocksize=None, callback=None): 

329 """ 

330 Transfers a file. 

331 

332 @param file file name or stream (binary, BytesIO) 

333 @param to destination (a folder) 

334 @param name name of the stream on the website 

335 @param debug if True, displays more information 

336 @param blocksize see :tpl:`py,m='ftplib',o='FTP.storbinary'` 

337 @param callback see :tpl:`py,m='ftplib',o='FTP.storbinary'` 

338 @return status 

339 

340 When an error happens, the original current directory is restored. 

341 """ 

342 self._check_can_logged() 

343 path = to.split("/") 

344 path = [_ for _ in path if len(_) > 0] 

345 nb_logins = len(self._logins) 

346 cpwd = self.pwd() 

347 

348 done = [] 

349 exc = None 

350 for i, p in enumerate(path): 

351 p_ = ('/' + '/'.join(path[:i + 1])) if self.is_sftp else p 

352 try: 

353 self.cwd(p_, True) 

354 except Exception as e: # pragma: no cover 

355 exc = e 

356 break 

357 done.append(p) 

358 

359 if nb_logins != len(self._logins): 

360 raise CannotCompleteWithoutNewLoginException( # pragma: no cover 

361 f"Cannot reach folder '{to}' without new login") 

362 

363 bs = blocksize if blocksize else TransferFTP.blockSize 

364 if not self.is_sftp: 

365 def runc(name, f, bs, callback): 

366 return self.run_command( 

367 self._ftp.storbinary, 'STOR ' + name, f, bs, callback) 

368 else: 

369 def runc(name, f, bs, callback): 

370 return self.run_command( 

371 self._ftp.putfo, remotepath=name, flo=f, file_size=bs, 

372 callback=None) 

373 if exc is None: 

374 try: 

375 if isinstance(file, str): 

376 if not os.path.exists(file): 

377 raise FileNotFoundError(file) # pragma: no cover 

378 with open(file, "rb") as f: 

379 r = runc(name, f, bs, callback) 

380 elif isinstance(file, BytesIO): 

381 r = runc(name, file, bs, callback) 

382 elif isinstance(file, bytes): 

383 st = BytesIO(file) 

384 r = runc(name, st, bs, callback) 

385 else: 

386 r = runc(name, file, bs, callback) 

387 except Exception as ee: # pragma: no cover 

388 exc = ee 

389 

390 if nb_logins != len(self._logins): # pragma: no cover 

391 try: 

392 self.cwd(cpwd) 

393 done = [] 

394 except Exception as e: 

395 raise CannotCompleteWithoutNewLoginException( 

396 f"Cannot transfer '{to}' without new login") 

397 

398 # It may fail here, it hopes not. 

399 nbtry = 0 

400 nbth = len(done) * 2 + 1 

401 while len(done) > 0: 

402 if nb_logins != len(self._logins): # pragma: no cover 

403 try: 

404 self.cwd(cpwd) 

405 break 

406 except Exception as e: 

407 raise CannotCompleteWithoutNewLoginException( 

408 f"Cannot return to original folder'{to}' without new login") from e 

409 

410 nbtry += 1 

411 try: 

412 self.cwd("..") 

413 done.pop() 

414 except Exception as e: # pragma: no cover 

415 time.sleep(0.5) 

416 self.LOG( 

417 f" issue with command .. len(done) == {len(done)}") 

418 if nbtry > nbth: 

419 raise CannotReturnToFolderException( 

420 "len(path)={0} nbtry={1} exc={2} nbl={3} act={4}".format( 

421 len(done), nbtry, exc, nb_logins, len(self._logins))) from e 

422 

423 if exc is not None: 

424 raise exc # pragma: no cover 

425 return r 

426 

427 def retrieve(self, fold, name, file=None, debug=False): 

428 """ 

429 Downloads a file. 

430 

431 @param file file name or stream (binary, BytesIO) 

432 @param fold full remote path 

433 @param name name of the stream on the website 

434 @param debug if True, displays more information 

435 @return status 

436 """ 

437 self._check_can_logged() 

438 

439 if self.is_sftp: 

440 self.cwd(fold, False) 

441 else: 

442 path = fold.split("/") 

443 path = [_ for _ in path if len(_) > 0] 

444 

445 for p in path: 

446 self.cwd(p, True) 

447 

448 raise_exc = None 

449 

450 if not self.is_sftp: 

451 

452 def _retrbinary_(name, callback, size, f): 

453 r = self.run_command(self._ftp.retrbinary, 

454 'RETR ' + name, callback, size) 

455 if isinstance(r, (bytes, str)): 

456 f.write(r) 

457 return r 

458 

459 runc = _retrbinary_ 

460 else: 

461 def runc(name, callback, size, f): return self.run_command( 

462 self._ftp.getfo, remotepath=name, flo=f, callback=None) 

463 

464 if isinstance(file, str): 

465 with open(file, "wb") as f: 

466 def callback(block): 

467 f.write(block) 

468 try: 

469 runc(name, callback, TransferFTP.blockSize, f) 

470 r = True 

471 except error_perm as e: # pragma: no cover 

472 raise_exc = e 

473 r = False 

474 elif isinstance(file, BytesIO): 

475 def callback(block): 

476 file.write(block) 

477 try: 

478 r = runc(name, callback, TransferFTP.blockSize, file) 

479 except error_perm as e: # pragma: no cover 

480 raise_exc = e 

481 r = False 

482 else: 

483 b = BytesIO() 

484 

485 def callback(block): 

486 b.write(block) 

487 try: 

488 runc(name, callback, TransferFTP.blockSize, b) 

489 except error_perm as e: # pragma: no cover 

490 raise_exc = e 

491 

492 r = b.getvalue() 

493 

494 if not self.is_sftp: 

495 for p in path: 

496 self.cwd("..") 

497 

498 if raise_exc: 

499 raise raise_exc # pragma: no cover 

500 

501 return r 

502 

503 def close(self): 

504 """ 

505 Closes the connection. 

506 """ 

507 self._check_can_logged() 

508 self._ftp.close() 

509 if self.is_sftp: 

510 self._ftp = None