Coverage for src/pyenbc/remote/ssh_remote_connection.py: 20%

286 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-20 05:47 +0200

1""" 

2@file 

3@brief A class to help connect with a remote machine and send command line. 

4""" 

5 

6import time 

7import socket 

8import os 

9import io 

10import warnings 

11 

12from pyquickhelper.loghelper import noLOG 

13from pyquickhelper.filehelper import is_file_string 

14 

15 

class ASSHClient():

    """
    A simple class to access to remote machine through SSH.
    It requires modules
    `paramiko <http://www.paramiko.org/>`_,
    `pycrypto <https://pypi.python.org/pypi/pycrypto/>`_,
    `ecdsa <https://pypi.python.org/pypi/ecdsa>`_.

    This class is used in magic command @see me remote_open.
    On Windows, the installation of pycrypto can be tricky.
    See `Pycrypto on Windows <http://www.xavierdupre.fr/blog/2014-10-21_nojs.html>`_.
    Those modules are part of the `Anaconda <http://docs.continuum.io/anaconda/pkg-docs.html>`_ distribution.

    Typical workflow: @see me connect, then @see me execute_command
    or the session methods (@see me open_session,
    @see me send_recv_session, @see me close_session),
    and finally @see me close.
    """

30 

31 def __init__(self, server, username, password): 

32 """ 

33 constructor 

34 

35 @param server server 

36 @param username username 

37 @param password password 

38 """ 

39 self.server = server 

40 self.username = username 

41 self.password = password 

42 self.connection = None 

43 self.session = None 

44 

45 def __str__(self): 

46 """ 

47 usual 

48 """ 

49 return "ASSHClient" 

50 

51 def connect(self): 

52 """ 

53 connect 

54 """ 

55 import paramiko # pylint: disable=C0415 

56 self.connection = paramiko.SSHClient() 

57 self.connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 

58 self.connection.connect( 

59 self.server, 

60 username=self.username, 

61 password=self.password) 

62 

63 def execute_command(self, command, no_exception=False, fill_stdin=None): 

64 """ 

65 execute a command line, it raises an error 

66 if there is an error 

67 

68 @param command command 

69 @param no_exception if True, do not raise any exception 

70 @param fill_stdin data to send on the stdin input 

71 @return stdout, stderr 

72 

73 Example of commands:: 

74 

75 ssh.execute_command("ls") 

76 ssh.execute_command("hdfs dfs -ls") 

77 

78 """ 

79 stdin, stdout, stderr = self.connection.exec_command(command) 

80 

81 if fill_stdin is not None: 

82 if isinstance(fill_stdin, list): 

83 fill_stdin = "\n".join(stdin) 

84 if isinstance(fill_stdin, str): 

85 stdin.write(fill_stdin) 

86 stdin.flush() 

87 else: 

88 raise TypeError( 

89 "fill_stdin must be a string, not: {0}".format( 

90 type(fill_stdin))) 

91 

92 stdin.close() 

93 

94 err = stderr.read() 

95 out = stdout.read() 

96 

97 # weird... 

98 if isinstance(err, str) and err.startswith("b'"): 

99 err = eval(err) 

100 if isinstance(out, str) and out.startswith("b'"): 

101 out = eval(out) 

102 

103 if isinstance(err, bytes): 

104 err = err.decode("utf-8") 

105 if isinstance(out, bytes): 

106 out = out.decode("utf-8") 

107 

108 if not no_exception and len(err) > 0: 

109 raise Exception( 

110 "unable to run: {0}\nOUT:\n{1}\nERR:\n{2}".format( 

111 command, 

112 out, 

113 err)) 

114 

115 return out, err 

116 

117 def close(self): 

118 """ 

119 close the connection 

120 """ 

121 self.connection.close() 

122 self.connection = None 

123 

124 def upload(self, localpath, remotepath): 

125 """ 

126 upload a file to the remote machine (not on the cluster) 

127 

128 @param localpath local file (or a list of files) 

129 @param remotepath remote file 

130 

131 .. versionchanged:: 1.1 

132 it can upload multiple files if localpath is a list 

133 """ 

134 sftp = self.connection.open_sftp() 

135 if isinstance(localpath, str): 

136 if not os.path.exists(localpath): 

137 raise FileNotFoundError(localpath) 

138 sftp.put(localpath, remotepath) 

139 else: 

140 for f in localpath: 

141 if not os.path.exists(f): 

142 raise FileNotFoundError(f) 

143 sftp.put(f, remotepath + "/" + os.path.split(f)[-1]) 

144 sftp.close() 

145 

146 def upload_cluster(self, localpath, remotepath): 

147 """ 

148 the function directly uploads the file to the cluster, it first goes 

149 to the bridge, uploads it to the cluster and deletes it from the bridge 

150 

151 @param localpath local filename (or list of files) 

152 @param remotepath path to the cluster 

153 @return filename 

154 

155 .. versionadded:: 1.1 

156 """ 

157 if isinstance(localpath, str): 

158 filename = os.path.split(localpath)[-1] 

159 self.upload(localpath, filename) 

160 self.execute_command( 

161 "hdfs dfs -put {0} {1}".format(filename, remotepath)) 

162 self.execute_command("rm {0}".format(filename)) 

163 else: 

164 self.upload(localpath, ".") 

165 for afile in localpath: 

166 filename = os.path.split(afile)[-1] 

167 self.execute_command( 

168 "hdfs dfs -put {0} {1}".format(filename, remotepath)) 

169 self.execute_command("rm {0}".format(filename)) 

170 

171 return remotepath 

172 

173 def download(self, remotepath, localpath): 

174 """ 

175 download a file from the remote machine (not on the cluster) 

176 @param localpath local file 

177 @param remotepath remote file (it can be a list, localpath is a folder in that case) 

178 

179 .. versionchanged:: 1.1 

180 remotepath can be a list of paths 

181 """ 

182 sftp = self.connection.open_sftp() 

183 if isinstance(remotepath, str): 

184 sftp.get(remotepath, localpath) 

185 else: 

186 for path in remotepath: 

187 filename = os.path.split(path)[-1] 

188 sftp.get(path, localpath + "/" + filename) 

189 sftp.close() 

190 

191 def download_cluster(self, remotepath, localpath, merge=False): 

192 """ 

193 download a file directly from the cluster to the local machine 

194 @param localpath local file 

195 @param remotepath remote file (it can be a list, localpath is a folder in that case) 

196 @param merge True to use getmerge instead of get 

197 

198 .. versionadded:: 1.1 

199 """ 

200 cget = "getmerge" if merge else "get" 

201 if isinstance(remotepath, str): 

202 filename = os.path.split(localpath)[-1] 

203 self.execute_command( 

204 "hdfs dfs -{2} {0} {1}".format(remotepath, filename, cget)) 

205 self.download(filename, localpath) 

206 self.execute_command("rm {0}".format(filename)) 

207 else: 

208 tod = [] 

209 for afile in remotepath: 

210 filename = os.path.split(afile)[-1] 

211 self.execute_command( 

212 "hdfs dfs -{2} {0} {1}".format(afile, filename, cget)) 

213 tod.append(filename) 

214 self.download(tod, localpath) 

215 for afile in tod: 

216 self.execute_command("rm {0}".format(afile)) 

217 

218 return remotepath 

219 

    # cache mapping an output format (None, "plain", "html") to the
    # converter function built by _get_out_format; a None value means
    # the converter has not been built yet.
    _allowed_form = {None: None, "plain": None, "html": None}

221 

222 @staticmethod 

223 def _get_out_format(format): 

224 """ 

225 Returns a function which converts an :epkg:`ANSI` string into a 

226 different format. 

227 

228 @param format string 

229 @return function 

230 """ 

231 if format not in ASSHClient._allowed_form: 

232 raise KeyError( 

233 "unexpected format, it should be in " + 

234 ",".join( 

235 ASSHClient._allowed_form.keys())) 

236 func = ASSHClient._allowed_form[format] 

237 if func is None: 

238 if format is None: 

239 def idfunc(s): 

240 "local function" 

241 return s 

242 func = idfunc 

243 elif format == "plain": 

244 import ansiconv # pylint: disable=C0415 

245 

246 def convert_plain(s): 

247 "local function" 

248 return ansiconv.to_plain(s) 

249 func = convert_plain 

250 elif format == "html": 

251 from ansi2html import Ansi2HTMLConverter 

252 conv = Ansi2HTMLConverter() 

253 

254 def convert_html(s): 

255 "local function" 

256 return conv.convert(s) 

257 func = convert_html 

258 ASSHClient._allowed_form[format] = func 

259 return func 

260 

261 def open_session(self, no_exception=False, timeout=1.0, 

262 add_eol=True, prompts=("~$", ">>>"), out_format=None): 

263 """ 

264 Opens a session with method 

265 `invoke_shell <http://docs.paramiko.org/en/latest/api/client.html?highlight=invoke_shell#paramiko.client.SSHClient.invoke_shell>`_. 

266 

267 @param no_exception if True, do not raise any exception in case of error 

268 @param timeout timeout in s 

269 @param add_eol if True, the function will add a EOL to the sent command if it does not have one 

270 @param prompts if function terminates if the output ends by one of those strings. 

271 @param out_format None, plain, html 

272 

273 .. exref:: 

274 :title: How to open a remote shell? 

275 :tag: Hadoop 

276 

277 :: 

278 

279 ssh = ASSHClient( "<server>", 

280 "<login>", 

281 "<password>") 

282 ssh.connect() 

283 out = ssh.send_recv_session("ls") 

284 print( ssh.send_recv_session("python") ) 

285 print( ssh.send_recv_session("print('3')") ) 

286 print( ssh.send_recv_session("import sys\\nsys.executable") ) 

287 print( ssh.send_recv_session("sys.exit()") ) 

288 print( ssh.send_recv_session(None) ) 

289 ssh.close_session() 

290 ssh.close() 

291 

292 The notebook :ref:`exampleofsshclientcommunicationrst` illustrates 

293 the output of these instructions. 

294 """ 

295 if self.connection is None: 

296 raise Exception("No open connection.") 

297 if self.session is not None: 

298 raise Exception( 

299 "A session is already open. Cannot open a second one.") 

300 if out_format not in ASSHClient._allowed_form: 

301 raise KeyError( 

302 "unexpected format, it should be in {0}".format( 

303 ";".join( 

304 str(_) for _ in ASSHClient._allowed_form.keys()))) 

305 

306 self.session = self.connection.invoke_shell(width=300, height=1000) 

307 self.session_params = { 

308 "no_exception": no_exception, 

309 "timeout": timeout, 

310 "add_eol": add_eol, 

311 "prompts": [] if prompts is None else prompts, 

312 "out_format": out_format, 

313 "out_func": ASSHClient._get_out_format(out_format) 

314 } 

315 

316 self.session.settimeout(timeout) 

317 return self.session 

318 

319 def close_session(self): 

320 """ 

321 close a session 

322 """ 

323 if self.session is None: 

324 raise Exception("No open session. Cannot close anything.") 

325 

326 self.session.close() 

327 self.session = None 

328 

329 def send_recv_session(self, fillin): 

330 """ 

331 Send something through a session, 

332 the function is supposed to return when the execute of the given command is done, 

333 but this is quite difficult to detect without knowing what exactly was send. 

334 

335 So we add a timeout just to tell the function it has to return even if nothing 

336 tells the command has finished. It fillin is None, the function will just 

337 listen to the output. 

338 

339 @param fillin sent to stdin 

340 @return stdout 

341 

342 The output contains 

343 `escape codes <http://ascii-table.com/ansi-escape-sequences-vt-100.php>`_. 

344 They can be converted to plain text or HTML 

345 by using the module `ansiconv <http://pythonhosted.org/ansiconv/>`_ 

346 and `ansi2html <https://github.com/ralphbean/ansi2html/>`_. 

347 This can be specified when opening the session. 

348 """ 

349 prompts = self.session_params["prompts"] 

350 timeout = self.session_params["timeout"] 

351 add_eol = self.session_params["add_eol"] 

352 func = self.session_params["out_func"] 

353 

354 if fillin is not None: 

355 self.session.send(fillin.encode("utf-8")) 

356 if add_eol and not fillin.endswith('\n'): 

357 self.session.send("\n".encode("utf-8")) 

358 

359 buff = '' 

360 begin = time.perf_counter() 

361 while True: 

362 try: 

363 resp = self.session.recv(9999) 

364 except socket.timeout: 

365 resp = b"" 

366 dec = resp.decode("unicode_escape") 

367 buff += dec 

368 for p in prompts: 

369 if buff.endswith(p): 

370 break 

371 if time.perf_counter() - begin > timeout: 

372 break 

373 

374 return func(buff.replace("\r", "")) 

375 

    @staticmethod
    def parse_lsout(out, local_schema=True):
        """
        parses the output of a command ls

        @param out output of ``ls -l`` (bridge) or ``hdfs dfs -ls`` (cluster)
        @param local_schema schema for the bridge (True) or the cluster (False)
        @return DataFrame with the schema columns plus a boolean ``isdir``

        .. versionadded:: 1.1
        """
        import pandas  # pylint: disable=C0415
        if local_schema:
            names = ["attributes", "code", "alias", "folder",
                     "size", "unit", "name"]
        else:
            names = ["attributes", "code", "alias", "folder",
                     "size", "date", "time", "name"]
        # keep the raw text for the error message built below
        kout = out
        out = out.replace("\r", "").split("\n")
        # drop header lines (e.g. "total 48") and empty lines:
        # a data row has more than 3 whitespace-separated fields
        out = [_ for _ in out if len(_.split()) > 3]
        if len(out) == 0:
            df = pandas.DataFrame(columns=names)
            return df

        try:
            out_ = [_.split() for _ in out]
            if len(out_) > 0 and len(out_[0]) != len(names):
                if names[5] == "date" and len(out_[0]) == len(names) + 1:
                    # cluster schema with one extra field: merge
                    # fields 5 and 6 back into a single "date" column
                    # (presumably a date split on a space -- TODO confirm)
                    out_ = [_[:5] + [" ".join(_[5:7])] + _[7:] for _ in out_]
            df = pandas.DataFrame(data=out_, columns=names)
        except (AssertionError, ValueError) as e:
            # column count did not match the schema: fall back on
            # fixed-width parsing of the raw (filtered) text
            out = "\n".join(out)
            buf = io.StringIO(out)
            try:
                df = pandas.read_fwf(buf, names=names, index=False)
            except ValueError:
                raise ValueError(
                    "unable to parse output:\nSCHEMA:\n{1}\nOUT:\n{0}"
                    "".format(kout, ",".join(names))) from e

        # first character of the attributes field is 'd' for folders
        df["isdir"] = df.apply(lambda r: r["attributes"][0] == "d", axis=1)
        return df

420 

421 def ls(self, path): 

422 """ 

423 return the content of a folder on the bridge as a DataFrame 

424 

425 @param path path on the bridge 

426 @return DataFrame 

427 

428 .. versionadded:: 1.1 

429 """ 

430 out, err = self.execute_command("ls -l " + path) 

431 if len(err) > 0: 

432 raise Exception("unable to execute ls " + path + "\nERR:\n" + err) 

433 return ASSHClient.parse_lsout(out) 

434 

435 def dfs_ls(self, path): 

436 """ 

437 return the content of a folder on the cluster as a DataFrame 

438 

439 @param path path on the cluster 

440 @return DataFrame 

441 

442 .. versionadded:: 1.1 

443 """ 

444 out, err = self.execute_command("hdfs dfs -ls " + path) 

445 if len(err) > 0: 

446 raise Exception( 

447 "unable to execute hdfs dfs -ls " + 

448 path + 

449 "\nERR:\n" + 

450 err) 

451 return ASSHClient.parse_lsout(out, False) 

452 

453 def exists(self, path): 

454 """ 

455 tells if a file exists on the bridge 

456 

457 @param path path 

458 @return boolean 

459 

460 .. versionadded:: 1.1 

461 """ 

462 try: 

463 df = self.ls(path) 

464 except Exception as e: 

465 if "No such file or directory" in str(e): 

466 return False 

467 ex = df[df.name == path] 

468 return len(ex) > 0 

469 

470 def dfs_exists(self, path): 

471 """ 

472 tells if a file exists on the cluster 

473 

474 @param path path 

475 @return boolean 

476 

477 .. versionadded:: 1.1 

478 """ 

479 try: 

480 df = self.dfs_ls(path) 

481 except Exception as e: 

482 if "No such file or directory" in str(e): 

483 return False 

484 else: 

485 raise e 

486 if len(df) == 0: 

487 # it is a folder 

488 return True 

489 ex = df[df.name == path] 

490 if len(ex) > 0: 

491 return True 

492 ex = df[df.apply(lambda r: r["name"].startswith(path + "/"), axis=1)] 

493 if len(ex) > 0: 

494 return True 

495 return False 

496 

497 def dfs_mkdir(self, path): 

498 """ 

499 creates a directory on the cluster 

500 

501 @param path path 

502 

503 .. versionadded:: 1.1 

504 """ 

505 return self.execute_command("hdfs dfs -mkdir " + path) 

506 

507 def dfs_rm(self, path, recursive=False): 

508 """ 

509 removes a file on the cluster 

510 

511 @param path path 

512 @param recursive boolean 

513 

514 .. versionadded:: 1.1 

515 """ 

516 cmd = "hdfs dfs -rm " 

517 if recursive: 

518 cmd += "-r " 

519 out, err = self.execute_command(cmd + path, no_exception=True) 

520 if out.startswith("Moved"): 

521 return out, err 

522 else: 

523 raise Exception( 

524 "unable to remove " + 

525 path + 

526 "\nOUT\n" + 

527 out + 

528 "\nERR:\n" + 

529 err) 

530 

531 @staticmethod 

532 def build_command_line_parameters(params, command_name="-param"): 

533 """ 

534 builds a string for ``pig`` based on the parameters in params 

535 

536 @param params dictionary 

537 @param command_name ``-param`` or ``-hiveconf`` 

538 @return string 

539 

540 .. versionadded:: 1.1 

541 """ 

542 if params is None: 

543 return "" 

544 res = [] 

545 for k, v in sorted(params.items()): 

546 if '"' in v: 

547 v = v.replace('"', '\\"') 

548 one = '{2} {0}="{1}"'.format(k, v, command_name) 

549 res.append(one) 

550 return " ".join(res) 

551 

552 def pig_submit(self, pig_file, 

553 dependencies=None, 

554 params=None, 

555 redirection="redirection.pig", 

556 local=False, 

557 stop_on_failure=False, 

558 check=False, 

559 no_exception=True, 

560 fLOG=noLOG): 

561 """ 

562 Submits a :epkg:`PIG` script, it first upload the script 

563 to the default folder and submits it. 

564 

565 @param pig_file pig script (local) 

566 @param dependencies others files to upload (still in the default folder) 

567 @param params parameters to send to the job 

568 @param redirection string empty or not 

569 @param local local run or not (option `-x local <https://cwiki.apache.org/confluence/display/PIG/PigTutorial>`_) 

570 (in that case, redirection will be empty) 

571 @param stop_on_failure if True, add option ``-stop_on_failure`` on the command line 

572 @param check if True, add option ``-check`` (in that case, redirection will be empty) 

573 @param no_exception sent to @see me execute_command 

574 @param fLOG logging function 

575 @return out, err from @see me execute_command 

576 

577 If *redirection* is not empty, the job is submitted but 

578 the function returns after the standard output and error were 

579 redirected to ``redirection.out`` and ``redirection.err``. 

580 

581 The first file will contain the results of commands 

582 `DESCRIBE <http://pig.apache.org/docs/r0.14.0/test.html#describe>`_ 

583 `DUMP <http://pig.apache.org/docs/r0.14.0/test.html#dump>`_, 

584 `EXPLAIN <http://pig.apache.org/docs/r0.14.0/test.html#explain>`_. 

585 The standard error receives logs and exceptions. 

586 

587 The function executes the command line:: 

588 

589 pig -execute -f <filename> 

590 

591 With redirection:: 

592 

593 pig -execute -f <filename> 2> redirection.pig.err 1> redirection.pig.out & 

594 

595 .. versionadded:: 1.1 

596 """ 

597 dest = os.path.split(pig_file)[-1] 

598 self.upload(pig_file, dest) 

599 if dependencies is not None: 

600 for py in dependencies: 

601 self.upload(py, os.path.split(py)[-1]) 

602 

603 slocal = " -x local" if local else "" 

604 sstop_on_failure = " -stop_on_failure" if stop_on_failure else "" 

605 scheck = " -check" if check else "" 

606 

607 if local or check: 

608 redirection = None 

609 

610 if params is not None: 

611 sparams = ASSHClient.build_command_line_parameters(params) 

612 if len(sparams) > 0: 

613 sparams = " " + sparams 

614 else: 

615 sparams = "" 

616 

617 if redirection is None: 

618 cmd = "pig{0}{1}{2} -execute -f {3}{4}".format( 

619 slocal, 

620 sstop_on_failure, 

621 scheck, 

622 dest, 

623 sparams) 

624 else: 

625 cmd = "pig{2}{3}{4} -execute -f {0}{5} 2> {1}.err 1> {1}.out &".format( 

626 dest, 

627 redirection, 

628 slocal, 

629 sstop_on_failure, 

630 scheck, 

631 sparams) 

632 

633 if isinstance(cmd, list): 

634 raise TypeError("this should not happen:" + str(cmd)) 

635 

636 fLOG("[pig_submit]:", cmd) 

637 out, err = self.execute_command(cmd, no_exception=no_exception) 

638 return out, err 

639 

640 def hive_submit(self, hive_file_or_query, 

641 params=None, 

642 redirection="redirection.hive", 

643 no_exception=True, 

644 fLOG=noLOG): 

645 """ 

646 submits a PIG script, it first upload the script 

647 to the default folder and submit it 

648 

649 @param hive_file_or_query pig script (local) 

650 @param params parameters to send to the job 

651 @param redirection string empty or not 

652 @param no_exception sent to @see me execute_command 

653 @param fLOG logging function 

654 @return out, err from @see me execute_command 

655 

656 If *redirection* is not empty, the job is submitted but 

657 the function returns after the standard output and error were 

658 redirected to ``redirection.hive.out`` and ``redirection.hive.err``. 

659 

660 The function executes the command line:: 

661 

662 hive -f <filename> 

663 

664 Or:: 

665 

666 hive -e <query> 

667 

668 With redirection:: 

669 

670 hive -execute -f <filename> 2> redirection.hive.err 1> redirection.hive.out & 

671 

672 If there is no redirection, the function 

673 waits and return the output. 

674 

675 .. exref:: 

676 :title: Submit a HIVE query 

677 :tag: Hadoop 

678 

679 :: 

680 

681 client = ASSHClient() 

682 

683 hive_sql = ''' 

684 DROP TABLE IF EXISTS bikes20; 

685 CREATE TABLE bikes20 (sjson STRING); 

686 LOAD DATA INPATH "/user/__USERNAME__/unittest2/paris*.txt" INTO TABLE bikes20; 

687 SELECT * FROM bikes20 LIMIT 10; 

688 '''.replace("__USERNAME__", self.client.username) 

689 

690 out,err = client.hive_submit(hive_sql, redirection=None) 

691 

692 .. versionadded:: 1.1 

693 """ 

694 if is_file_string(hive_file_or_query) and os.path.exists(hive_file_or_query): 

695 dest = os.path.split(hive_file_or_query)[-1] 

696 self.upload(hive_file_or_query, dest) 

697 command = "-f" 

698 else: 

699 command = "-e" 

700 dest = hive_file_or_query.replace( 

701 "\n", " ").replace("\r", "").replace("\t", " ") 

702 dest = dest.replace("'", "\\'") 

703 dest = "'{}'".format(dest.strip()) 

704 

705 if params is not None: 

706 sparams = ASSHClient.build_command_line_parameters( 

707 params, "-hiveconf") 

708 if len(sparams) > 0: 

709 sparams = " " + sparams 

710 else: 

711 sparams = "" 

712 

713 if redirection is None: 

714 cmd = "hive {0} {1}{2}".format( 

715 command, 

716 dest, 

717 sparams) 

718 else: 

719 cmd = "hive {0} {1}{2} 2> {3}.err 1> {3}.out &".format( 

720 command, 

721 dest, 

722 sparams, 

723 redirection) 

724 

725 if isinstance(cmd, list): 

726 raise TypeError("this should not happen:" + str(cmd)) 

727 

728 warnings.warn("Hive submission is not tested. It will probably fail.") 

729 

730 fLOG("[hive_submit]:", cmd) 

731 out, err = self.execute_command(cmd, no_exception=no_exception) 

732 return out, err