# NOTE: the following header is residue from a coverage.py HTML report
# (src/pyenbc/remote/ssh_remote_connection.py, 20%, 286 statements) and is
# not part of the module source.
1"""
2@file
3@brief A class to help connect with a remote machine and send command line.
4"""
6import time
7import socket
8import os
9import io
10import warnings
12from pyquickhelper.loghelper import noLOG
13from pyquickhelper.filehelper import is_file_string
class ASSHClient():
    """
    A simple class to access to remote machine through SSH.
    It requires modules
    `paramiko <http://www.paramiko.org/>`_,
    `pycrypto <https://pypi.python.org/pypi/pycrypto/>`_,
    `ecdsa <https://pypi.python.org/pypi/ecdsa>`_.

    This class is used in magic command @see me remote_open.
    On Windows, the installation of pycrypto can be tricky.
    See `Pycrypto on Windows <http://www.xavierdupre.fr/blog/2014-10-21_nojs.html>`_.
    Those modules are part of the `Anaconda <http://docs.continuum.io/anaconda/pkg-docs.html>`_ distribution.
    """
    def __init__(self, server, username, password):
        """
        Stores the credentials, the connection itself is not
        created until @see me connect is called.

        @param server server
        @param username username
        @param password password
        """
        self.server = server
        self.username = username
        self.password = password
        # paramiko.SSHClient instance once connect() has been called
        self.connection = None
        # interactive shell channel once open_session() has been called
        self.session = None
45 def __str__(self):
46 """
47 usual
48 """
49 return "ASSHClient"
    def connect(self):
        """
        Creates the SSH connection to ``self.server`` with
        `paramiko <http://www.paramiko.org/>`_ using the stored
        credentials.
        """
        import paramiko  # pylint: disable=C0415
        self.connection = paramiko.SSHClient()
        # do not fail when the server is not yet in known_hosts
        self.connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.connection.connect(
            self.server,
            username=self.username,
            password=self.password)
63 def execute_command(self, command, no_exception=False, fill_stdin=None):
64 """
65 execute a command line, it raises an error
66 if there is an error
68 @param command command
69 @param no_exception if True, do not raise any exception
70 @param fill_stdin data to send on the stdin input
71 @return stdout, stderr
73 Example of commands::
75 ssh.execute_command("ls")
76 ssh.execute_command("hdfs dfs -ls")
78 """
79 stdin, stdout, stderr = self.connection.exec_command(command)
81 if fill_stdin is not None:
82 if isinstance(fill_stdin, list):
83 fill_stdin = "\n".join(stdin)
84 if isinstance(fill_stdin, str):
85 stdin.write(fill_stdin)
86 stdin.flush()
87 else:
88 raise TypeError(
89 "fill_stdin must be a string, not: {0}".format(
90 type(fill_stdin)))
92 stdin.close()
94 err = stderr.read()
95 out = stdout.read()
97 # weird...
98 if isinstance(err, str) and err.startswith("b'"):
99 err = eval(err)
100 if isinstance(out, str) and out.startswith("b'"):
101 out = eval(out)
103 if isinstance(err, bytes):
104 err = err.decode("utf-8")
105 if isinstance(out, bytes):
106 out = out.decode("utf-8")
108 if not no_exception and len(err) > 0:
109 raise Exception(
110 "unable to run: {0}\nOUT:\n{1}\nERR:\n{2}".format(
111 command,
112 out,
113 err))
115 return out, err
117 def close(self):
118 """
119 close the connection
120 """
121 self.connection.close()
122 self.connection = None
124 def upload(self, localpath, remotepath):
125 """
126 upload a file to the remote machine (not on the cluster)
128 @param localpath local file (or a list of files)
129 @param remotepath remote file
131 .. versionchanged:: 1.1
132 it can upload multiple files if localpath is a list
133 """
134 sftp = self.connection.open_sftp()
135 if isinstance(localpath, str):
136 if not os.path.exists(localpath):
137 raise FileNotFoundError(localpath)
138 sftp.put(localpath, remotepath)
139 else:
140 for f in localpath:
141 if not os.path.exists(f):
142 raise FileNotFoundError(f)
143 sftp.put(f, remotepath + "/" + os.path.split(f)[-1])
144 sftp.close()
146 def upload_cluster(self, localpath, remotepath):
147 """
148 the function directly uploads the file to the cluster, it first goes
149 to the bridge, uploads it to the cluster and deletes it from the bridge
151 @param localpath local filename (or list of files)
152 @param remotepath path to the cluster
153 @return filename
155 .. versionadded:: 1.1
156 """
157 if isinstance(localpath, str):
158 filename = os.path.split(localpath)[-1]
159 self.upload(localpath, filename)
160 self.execute_command(
161 "hdfs dfs -put {0} {1}".format(filename, remotepath))
162 self.execute_command("rm {0}".format(filename))
163 else:
164 self.upload(localpath, ".")
165 for afile in localpath:
166 filename = os.path.split(afile)[-1]
167 self.execute_command(
168 "hdfs dfs -put {0} {1}".format(filename, remotepath))
169 self.execute_command("rm {0}".format(filename))
171 return remotepath
173 def download(self, remotepath, localpath):
174 """
175 download a file from the remote machine (not on the cluster)
176 @param localpath local file
177 @param remotepath remote file (it can be a list, localpath is a folder in that case)
179 .. versionchanged:: 1.1
180 remotepath can be a list of paths
181 """
182 sftp = self.connection.open_sftp()
183 if isinstance(remotepath, str):
184 sftp.get(remotepath, localpath)
185 else:
186 for path in remotepath:
187 filename = os.path.split(path)[-1]
188 sftp.get(path, localpath + "/" + filename)
189 sftp.close()
191 def download_cluster(self, remotepath, localpath, merge=False):
192 """
193 download a file directly from the cluster to the local machine
194 @param localpath local file
195 @param remotepath remote file (it can be a list, localpath is a folder in that case)
196 @param merge True to use getmerge instead of get
198 .. versionadded:: 1.1
199 """
200 cget = "getmerge" if merge else "get"
201 if isinstance(remotepath, str):
202 filename = os.path.split(localpath)[-1]
203 self.execute_command(
204 "hdfs dfs -{2} {0} {1}".format(remotepath, filename, cget))
205 self.download(filename, localpath)
206 self.execute_command("rm {0}".format(filename))
207 else:
208 tod = []
209 for afile in remotepath:
210 filename = os.path.split(afile)[-1]
211 self.execute_command(
212 "hdfs dfs -{2} {0} {1}".format(afile, filename, cget))
213 tod.append(filename)
214 self.download(tod, localpath)
215 for afile in tod:
216 self.execute_command("rm {0}".format(afile))
218 return remotepath
    # cache of output converters per format name (None, "plain", "html");
    # the None values are lazily replaced by functions in _get_out_format
    _allowed_form = {None: None, "plain": None, "html": None}
222 @staticmethod
223 def _get_out_format(format):
224 """
225 Returns a function which converts an :epkg:`ANSI` string into a
226 different format.
228 @param format string
229 @return function
230 """
231 if format not in ASSHClient._allowed_form:
232 raise KeyError(
233 "unexpected format, it should be in " +
234 ",".join(
235 ASSHClient._allowed_form.keys()))
236 func = ASSHClient._allowed_form[format]
237 if func is None:
238 if format is None:
239 def idfunc(s):
240 "local function"
241 return s
242 func = idfunc
243 elif format == "plain":
244 import ansiconv # pylint: disable=C0415
246 def convert_plain(s):
247 "local function"
248 return ansiconv.to_plain(s)
249 func = convert_plain
250 elif format == "html":
251 from ansi2html import Ansi2HTMLConverter
252 conv = Ansi2HTMLConverter()
254 def convert_html(s):
255 "local function"
256 return conv.convert(s)
257 func = convert_html
258 ASSHClient._allowed_form[format] = func
259 return func
261 def open_session(self, no_exception=False, timeout=1.0,
262 add_eol=True, prompts=("~$", ">>>"), out_format=None):
263 """
264 Opens a session with method
265 `invoke_shell <http://docs.paramiko.org/en/latest/api/client.html?highlight=invoke_shell#paramiko.client.SSHClient.invoke_shell>`_.
267 @param no_exception if True, do not raise any exception in case of error
268 @param timeout timeout in s
269 @param add_eol if True, the function will add a EOL to the sent command if it does not have one
270 @param prompts if function terminates if the output ends by one of those strings.
271 @param out_format None, plain, html
273 .. exref::
274 :title: How to open a remote shell?
275 :tag: Hadoop
277 ::
279 ssh = ASSHClient( "<server>",
280 "<login>",
281 "<password>")
282 ssh.connect()
283 out = ssh.send_recv_session("ls")
284 print( ssh.send_recv_session("python") )
285 print( ssh.send_recv_session("print('3')") )
286 print( ssh.send_recv_session("import sys\\nsys.executable") )
287 print( ssh.send_recv_session("sys.exit()") )
288 print( ssh.send_recv_session(None) )
289 ssh.close_session()
290 ssh.close()
292 The notebook :ref:`exampleofsshclientcommunicationrst` illustrates
293 the output of these instructions.
294 """
295 if self.connection is None:
296 raise Exception("No open connection.")
297 if self.session is not None:
298 raise Exception(
299 "A session is already open. Cannot open a second one.")
300 if out_format not in ASSHClient._allowed_form:
301 raise KeyError(
302 "unexpected format, it should be in {0}".format(
303 ";".join(
304 str(_) for _ in ASSHClient._allowed_form.keys())))
306 self.session = self.connection.invoke_shell(width=300, height=1000)
307 self.session_params = {
308 "no_exception": no_exception,
309 "timeout": timeout,
310 "add_eol": add_eol,
311 "prompts": [] if prompts is None else prompts,
312 "out_format": out_format,
313 "out_func": ASSHClient._get_out_format(out_format)
314 }
316 self.session.settimeout(timeout)
317 return self.session
319 def close_session(self):
320 """
321 close a session
322 """
323 if self.session is None:
324 raise Exception("No open session. Cannot close anything.")
326 self.session.close()
327 self.session = None
329 def send_recv_session(self, fillin):
330 """
331 Send something through a session,
332 the function is supposed to return when the execute of the given command is done,
333 but this is quite difficult to detect without knowing what exactly was send.
335 So we add a timeout just to tell the function it has to return even if nothing
336 tells the command has finished. It fillin is None, the function will just
337 listen to the output.
339 @param fillin sent to stdin
340 @return stdout
342 The output contains
343 `escape codes <http://ascii-table.com/ansi-escape-sequences-vt-100.php>`_.
344 They can be converted to plain text or HTML
345 by using the module `ansiconv <http://pythonhosted.org/ansiconv/>`_
346 and `ansi2html <https://github.com/ralphbean/ansi2html/>`_.
347 This can be specified when opening the session.
348 """
349 prompts = self.session_params["prompts"]
350 timeout = self.session_params["timeout"]
351 add_eol = self.session_params["add_eol"]
352 func = self.session_params["out_func"]
354 if fillin is not None:
355 self.session.send(fillin.encode("utf-8"))
356 if add_eol and not fillin.endswith('\n'):
357 self.session.send("\n".encode("utf-8"))
359 buff = ''
360 begin = time.perf_counter()
361 while True:
362 try:
363 resp = self.session.recv(9999)
364 except socket.timeout:
365 resp = b""
366 dec = resp.decode("unicode_escape")
367 buff += dec
368 for p in prompts:
369 if buff.endswith(p):
370 break
371 if time.perf_counter() - begin > timeout:
372 break
374 return func(buff.replace("\r", ""))
376 @staticmethod
377 def parse_lsout(out, local_schema=True):
378 """
379 parses the output of a command ls
381 @param out output
382 @param local_schema schema for the bridge or the cluster (False)
383 @return DataFrame
385 .. versionadded:: 1.1
386 """
387 import pandas # pylint: disable=C0415
388 if local_schema:
389 names = ["attributes", "code", "alias", "folder",
390 "size", "unit", "name"]
391 else:
392 names = ["attributes", "code", "alias", "folder",
393 "size", "date", "time", "name"]
394 kout = out
395 out = out.replace("\r", "").split("\n")
396 out = [_ for _ in out if len(_.split()) > 3]
397 if len(out) == 0:
398 df = pandas.DataFrame(columns=names)
399 return df
401 try:
402 out_ = [_.split() for _ in out]
403 if len(out_) > 0 and len(out_[0]) != len(names):
404 if names[5] == "date" and len(out_[0]) == len(names) + 1:
405 # we merge 2 columns
406 out_ = [_[:5] + [" ".join(_[5:7])] + _[7:] for _ in out_]
407 df = pandas.DataFrame(data=out_, columns=names)
408 except (AssertionError, ValueError) as e:
409 out = "\n".join(out)
410 buf = io.StringIO(out)
411 try:
412 df = pandas.read_fwf(buf, names=names, index=False)
413 except ValueError:
414 raise ValueError(
415 "unable to parse output:\nSCHEMA:\n{1}\nOUT:\n{0}"
416 "".format(kout, ",".join(names))) from e
418 df["isdir"] = df.apply(lambda r: r["attributes"][0] == "d", axis=1)
419 return df
421 def ls(self, path):
422 """
423 return the content of a folder on the bridge as a DataFrame
425 @param path path on the bridge
426 @return DataFrame
428 .. versionadded:: 1.1
429 """
430 out, err = self.execute_command("ls -l " + path)
431 if len(err) > 0:
432 raise Exception("unable to execute ls " + path + "\nERR:\n" + err)
433 return ASSHClient.parse_lsout(out)
435 def dfs_ls(self, path):
436 """
437 return the content of a folder on the cluster as a DataFrame
439 @param path path on the cluster
440 @return DataFrame
442 .. versionadded:: 1.1
443 """
444 out, err = self.execute_command("hdfs dfs -ls " + path)
445 if len(err) > 0:
446 raise Exception(
447 "unable to execute hdfs dfs -ls " +
448 path +
449 "\nERR:\n" +
450 err)
451 return ASSHClient.parse_lsout(out, False)
453 def exists(self, path):
454 """
455 tells if a file exists on the bridge
457 @param path path
458 @return boolean
460 .. versionadded:: 1.1
461 """
462 try:
463 df = self.ls(path)
464 except Exception as e:
465 if "No such file or directory" in str(e):
466 return False
467 ex = df[df.name == path]
468 return len(ex) > 0
470 def dfs_exists(self, path):
471 """
472 tells if a file exists on the cluster
474 @param path path
475 @return boolean
477 .. versionadded:: 1.1
478 """
479 try:
480 df = self.dfs_ls(path)
481 except Exception as e:
482 if "No such file or directory" in str(e):
483 return False
484 else:
485 raise e
486 if len(df) == 0:
487 # it is a folder
488 return True
489 ex = df[df.name == path]
490 if len(ex) > 0:
491 return True
492 ex = df[df.apply(lambda r: r["name"].startswith(path + "/"), axis=1)]
493 if len(ex) > 0:
494 return True
495 return False
497 def dfs_mkdir(self, path):
498 """
499 creates a directory on the cluster
501 @param path path
503 .. versionadded:: 1.1
504 """
505 return self.execute_command("hdfs dfs -mkdir " + path)
507 def dfs_rm(self, path, recursive=False):
508 """
509 removes a file on the cluster
511 @param path path
512 @param recursive boolean
514 .. versionadded:: 1.1
515 """
516 cmd = "hdfs dfs -rm "
517 if recursive:
518 cmd += "-r "
519 out, err = self.execute_command(cmd + path, no_exception=True)
520 if out.startswith("Moved"):
521 return out, err
522 else:
523 raise Exception(
524 "unable to remove " +
525 path +
526 "\nOUT\n" +
527 out +
528 "\nERR:\n" +
529 err)
531 @staticmethod
532 def build_command_line_parameters(params, command_name="-param"):
533 """
534 builds a string for ``pig`` based on the parameters in params
536 @param params dictionary
537 @param command_name ``-param`` or ``-hiveconf``
538 @return string
540 .. versionadded:: 1.1
541 """
542 if params is None:
543 return ""
544 res = []
545 for k, v in sorted(params.items()):
546 if '"' in v:
547 v = v.replace('"', '\\"')
548 one = '{2} {0}="{1}"'.format(k, v, command_name)
549 res.append(one)
550 return " ".join(res)
552 def pig_submit(self, pig_file,
553 dependencies=None,
554 params=None,
555 redirection="redirection.pig",
556 local=False,
557 stop_on_failure=False,
558 check=False,
559 no_exception=True,
560 fLOG=noLOG):
561 """
562 Submits a :epkg:`PIG` script, it first upload the script
563 to the default folder and submits it.
565 @param pig_file pig script (local)
566 @param dependencies others files to upload (still in the default folder)
567 @param params parameters to send to the job
568 @param redirection string empty or not
569 @param local local run or not (option `-x local <https://cwiki.apache.org/confluence/display/PIG/PigTutorial>`_)
570 (in that case, redirection will be empty)
571 @param stop_on_failure if True, add option ``-stop_on_failure`` on the command line
572 @param check if True, add option ``-check`` (in that case, redirection will be empty)
573 @param no_exception sent to @see me execute_command
574 @param fLOG logging function
575 @return out, err from @see me execute_command
577 If *redirection* is not empty, the job is submitted but
578 the function returns after the standard output and error were
579 redirected to ``redirection.out`` and ``redirection.err``.
581 The first file will contain the results of commands
582 `DESCRIBE <http://pig.apache.org/docs/r0.14.0/test.html#describe>`_
583 `DUMP <http://pig.apache.org/docs/r0.14.0/test.html#dump>`_,
584 `EXPLAIN <http://pig.apache.org/docs/r0.14.0/test.html#explain>`_.
585 The standard error receives logs and exceptions.
587 The function executes the command line::
589 pig -execute -f <filename>
591 With redirection::
593 pig -execute -f <filename> 2> redirection.pig.err 1> redirection.pig.out &
595 .. versionadded:: 1.1
596 """
597 dest = os.path.split(pig_file)[-1]
598 self.upload(pig_file, dest)
599 if dependencies is not None:
600 for py in dependencies:
601 self.upload(py, os.path.split(py)[-1])
603 slocal = " -x local" if local else ""
604 sstop_on_failure = " -stop_on_failure" if stop_on_failure else ""
605 scheck = " -check" if check else ""
607 if local or check:
608 redirection = None
610 if params is not None:
611 sparams = ASSHClient.build_command_line_parameters(params)
612 if len(sparams) > 0:
613 sparams = " " + sparams
614 else:
615 sparams = ""
617 if redirection is None:
618 cmd = "pig{0}{1}{2} -execute -f {3}{4}".format(
619 slocal,
620 sstop_on_failure,
621 scheck,
622 dest,
623 sparams)
624 else:
625 cmd = "pig{2}{3}{4} -execute -f {0}{5} 2> {1}.err 1> {1}.out &".format(
626 dest,
627 redirection,
628 slocal,
629 sstop_on_failure,
630 scheck,
631 sparams)
633 if isinstance(cmd, list):
634 raise TypeError("this should not happen:" + str(cmd))
636 fLOG("[pig_submit]:", cmd)
637 out, err = self.execute_command(cmd, no_exception=no_exception)
638 return out, err
640 def hive_submit(self, hive_file_or_query,
641 params=None,
642 redirection="redirection.hive",
643 no_exception=True,
644 fLOG=noLOG):
645 """
646 submits a PIG script, it first upload the script
647 to the default folder and submit it
649 @param hive_file_or_query pig script (local)
650 @param params parameters to send to the job
651 @param redirection string empty or not
652 @param no_exception sent to @see me execute_command
653 @param fLOG logging function
654 @return out, err from @see me execute_command
656 If *redirection* is not empty, the job is submitted but
657 the function returns after the standard output and error were
658 redirected to ``redirection.hive.out`` and ``redirection.hive.err``.
660 The function executes the command line::
662 hive -f <filename>
664 Or::
666 hive -e <query>
668 With redirection::
670 hive -execute -f <filename> 2> redirection.hive.err 1> redirection.hive.out &
672 If there is no redirection, the function
673 waits and return the output.
675 .. exref::
676 :title: Submit a HIVE query
677 :tag: Hadoop
679 ::
681 client = ASSHClient()
683 hive_sql = '''
684 DROP TABLE IF EXISTS bikes20;
685 CREATE TABLE bikes20 (sjson STRING);
686 LOAD DATA INPATH "/user/__USERNAME__/unittest2/paris*.txt" INTO TABLE bikes20;
687 SELECT * FROM bikes20 LIMIT 10;
688 '''.replace("__USERNAME__", self.client.username)
690 out,err = client.hive_submit(hive_sql, redirection=None)
692 .. versionadded:: 1.1
693 """
694 if is_file_string(hive_file_or_query) and os.path.exists(hive_file_or_query):
695 dest = os.path.split(hive_file_or_query)[-1]
696 self.upload(hive_file_or_query, dest)
697 command = "-f"
698 else:
699 command = "-e"
700 dest = hive_file_or_query.replace(
701 "\n", " ").replace("\r", "").replace("\t", " ")
702 dest = dest.replace("'", "\\'")
703 dest = "'{}'".format(dest.strip())
705 if params is not None:
706 sparams = ASSHClient.build_command_line_parameters(
707 params, "-hiveconf")
708 if len(sparams) > 0:
709 sparams = " " + sparams
710 else:
711 sparams = ""
713 if redirection is None:
714 cmd = "hive {0} {1}{2}".format(
715 command,
716 dest,
717 sparams)
718 else:
719 cmd = "hive {0} {1}{2} 2> {3}.err 1> {3}.out &".format(
720 command,
721 dest,
722 sparams,
723 redirection)
725 if isinstance(cmd, list):
726 raise TypeError("this should not happen:" + str(cmd))
728 warnings.warn("Hive submission is not tested. It will probably fail.")
730 fLOG("[hive_submit]:", cmd)
731 out, err = self.execute_command(cmd, no_exception=no_exception)
732 return out, err