Coverage for src/pyenbc/remote/magic_azure.py: 32%
531 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-20 05:47 +0200
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-20 05:47 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Magic command to run PIG script with Azure.
5"""
6import sys
7import os
8from IPython.core.magic import magics_class, line_magic, cell_magic
9from IPython.core.display import HTML
10from pyquickhelper.loghelper import run_cmd
11from pyquickhelper.ipythonhelper import MagicClassWithHelpers, MagicCommandParser
12from .azure_connection import AzureClient, AzureException
13from ..filehelper.jython_helper import run_jython, download_java_standalone
16@magics_class
17class MagicAzure(MagicClassWithHelpers):
19 """
20 Defines magic commands to access
21 `blob storage <http://azure.microsoft.com/fr-fr/documentation/articles/storage-dotnet-how-to-use-blobs/>`_
22 and `HDInsight <http://azure.microsoft.com/fr-fr/services/hdinsight/>`_.
24 When the container is not specified, it will take the default one.
26 .. faqref::
27 :title: Magic command %blob_open does not work
29 Try this::
31 %load_ext pyenbc
33 The exception tells more about what goes wrong.
34 Usually a module is missing.
36 .. faqref::
37 :title: Incorrect padding
39 The following cryptic message happens sometimes::
41 Error: Incorrect padding
43 It is usually due to an incorrect password.
44 Some notebooks use::
46 import pyquickhelper.ipythonhelper as ipy
47 params={"blob_storage":"hdblobstorage", "password":""}
48 ipy.open_html_form(params=params,title="credentials",key_save="blobservice")
50 blobstorage = blobservice["blob_storage"]
51 blobpassword = blobservice["password"]
53 %load_ext pyenbc
54 %blob_open
56 This code avoids the author leaving a password in a notebook
57 but you can just replace everything by::
59 blobstorage = "<username>"
60 blobpassword = "****long*key*******=="
62 %load_ext pyenbc
63 %blob_open
64 """
66 def create_client(self, account_name, account_key,
67 hadoop_server=None, hadoop_password=None, username=None):
68 """
69 Create a @see cl AzureClient and stores in the workspace.
71 @param account_name login
72 @param account_key password
73 @param hadoop_server hadoop server
74 @param hadoop_password hadoop password
75 @param username username
76 @return instance of @see cl AzureClient
77 """
78 if username is None:
79 username = "any"
80 cl = AzureClient(
81 account_name,
82 account_key,
83 hadoop_server,
84 hadoop_password,
85 pseudo=username)
86 self.shell.user_ns["remote_azure_client"] = cl
87 return cl
89 def _replace_params(self, cell):
90 """
91 replaces parameter such ``__PASSWORD__`` by variable in the notebook environment
93 @param cell string
94 @return modified string
95 """
96 if "__PASSWORD__" in cell and self.shell is not None and "password" in self.shell.user_ns:
97 cell = cell.replace("__PASSWORD__", self.shell.user_ns["password"])
98 return cell
100 def get_blob_connection(self):
101 """
102 returns the connection stored in the workspace
103 """
104 if self.shell is None:
105 raise Exception("No detected workspace.")
107 if "remote_azure_client" not in self.shell.user_ns:
108 raise KeyError("No opened Azure connection.")
110 if "remote_azure_blob" not in self.shell.user_ns:
111 raise KeyError("No opened Blob Storage connection.")
113 cl = self.shell.user_ns["remote_azure_client"]
114 bs = self.shell.user_ns["remote_azure_blob"]
115 return cl, bs
117 @line_magic
118 def azureclient(self, line):
119 """
120 returns the AzureClient object
121 """
122 cl, _ = self.get_blob_connection()
123 return cl
125 @line_magic
126 def blobservice(self, line):
127 """
128 returns the BlobService object
129 """
130 _, bs = self.get_blob_connection()
131 return bs
133 @line_magic
134 def blobcontainer(self, line):
135 """
136 returns the Blob Storage container
137 """
138 cl, _ = self.get_blob_connection()
139 return cl.account_name
141 @staticmethod
142 def blob_open_parser():
143 """
144 defines the way to parse the magic command ``%blob_open``
145 """
146 parser = MagicCommandParser(prog="blob_open",
147 description='open a connection to an Azure blob storage, by default, ' +
148 'the magic command takes blobstorage and blobpassword local variables as default values')
149 parser.add_argument(
150 '-b',
151 '--blobstorage',
152 type=str,
153 default='blobstorage',
154 help='blob storage name')
155 parser.add_argument(
156 '-p',
157 '--blobpassword',
158 type=str,
159 default='blobpassword',
160 help='blob password')
161 return parser
163 @line_magic
164 def blob_open(self, line):
165 """
166 .. nbref::
167 :tag: Azure
168 :title: blob_open
170 Opens a connection to blob service.
171 It returns objects @see cl AzureClient and
172 `BlobService <http://www.xavierdupre.fr/app/azure-sdk-for-python/helpsphinx/storage/blobservice.html?
173 highlight=blobservice#azure.storage.blobservice.BlobService>`_.
175 The code for magic command ``%blob_open`` is equivalent to::
177 from pyenbc.remote import AzureClient
178 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
179 bs = cl.open_blob_service()
181 .. versionchanged:: 1.1
182 """
183 parser = self.get_parser(MagicAzure.blob_open_parser, "blob_open")
184 args = self.get_args(line, parser)
186 if args is not None:
187 server = args.blobstorage
188 password = args.blobpassword
189 if self.shell is None:
190 raise Exception("No detected workspace.")
192 if "remote_azure_blob" in self.shell.user_ns:
193 raise Exception(
194 "a connection is still open, close it first (stored in remote_azure_blob local variable)")
196 cl = self.create_client(server, password)
197 bs = cl.open_blob_service()
198 self.shell.user_ns["remote_azure_blob"] = bs
199 return cl, bs
200 return None
202 @staticmethod
203 def hd_open_parser():
204 """
205 defines the way to parse the magic command ``%hd_open``
206 """
207 parser = MagicCommandParser(prog="hd_open",
208 description='open a connection to an Azure blob storage and a HD Insight cluster, ' +
209 'by default, the magic command takes blobstorage, blobpassword, hadoop_server, ' +
210 'hadoop_password local variables as default values')
211 parser.add_argument(
212 '-b',
213 '--blobstorage',
214 type=str,
215 default='blobstorage',
216 help='blob storage name')
217 parser.add_argument(
218 '-p',
219 '--blobpassword',
220 type=str,
221 default='blobpassword',
222 help='blob password')
223 parser.add_argument(
224 '-s',
225 '--hadoop_server',
226 type=str,
227 default='hadoop_server',
228 help='hadoop server name')
229 parser.add_argument(
230 '-P',
231 '--hadoop_password',
232 type=str,
233 default='hadoop_password',
234 help='hadoop password')
235 parser.add_argument(
236 '-u',
237 '--username',
238 type=str,
239 default='username',
240 help='username (used as a prefix to avoid conflict when multiple users are using the same connection')
241 return parser
243 @line_magic
244 def hd_open(self, line):
245 """
246 Opens a connection to blob service.
248 .. nbref::
249 :tag: Azure
250 :title: hd_open
252 Opens a connection to blob service.
253 It returns objects @see cl AzureClient and
254 `BlobService <http://www.xavierdupre.fr/app/azure-sdk-for-python/helpsphinx/storage/blobservice.html?
255 highlight=blobservice#azure.storage.blobservice.BlobService>`_.
257 The code for magic command ``%hd_open`` is equivalent to::
259 from pyenbc.remote import AzureClient
260 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
261 bs = cl.open_blob_service()
262 """
263 parser = self.get_parser(MagicAzure.hd_open_parser, "hd_open")
264 args = self.get_args(line, parser)
266 if args is not None:
267 server = args.blobstorage
268 password = args.blobpassword
269 hadoop_server = args.hadoop_server
270 hadoop_password = args.hadoop_password
271 username = args.username
273 if self.shell is None:
274 raise Exception("No detected workspace.")
276 if "remote_azure_blob" in self.shell.user_ns:
277 raise Exception(
278 "a connection is still open, close it first (stored in remote_azure_blob local variable)")
280 cl = self.create_client(
281 server,
282 password,
283 hadoop_server,
284 hadoop_password,
285 username=username)
286 bs = cl.open_blob_service()
287 self.shell.user_ns["remote_azure_blob"] = bs
288 return cl, bs
289 return None
291 @line_magic
292 def blob_close(self, line):
293 """
294 close the connection and remove the connection
295 from the notebook workspace
297 .. nbref::
298 :tag: Azure
299 :title: blob_close
301 Does nothing.
302 """
303 _, bs = self.get_blob_connection()
304 # bs.close()
305 del self.shell.user_ns["remote_azure_blob"]
306 return True
308 @line_magic
309 def blob_containers(self, line):
310 """
311 returns the list of containers
312 """
313 if "-h" in line or "--help" in line:
314 print("Usage: %blob_containers")
315 return None
316 else:
317 _, bs = self.get_blob_connection()
318 res = bs.list_containers()
319 return [r.name for r in res]
321 def _interpret_path(self, line, cl, bs, empty_is_value=False):
322 """
323 interpret a path
325 @param line line (see :ref:`l-magic-path-container`)
326 @param cl @see cl AzureClient
327 @param bs blob service
328 @param empty_is_value if True, do not raise an exception
329 @return container, remotepath
330 """
331 line = line.strip()
332 if line.startswith("/"):
333 container = cl.account_name
334 line = line.lstrip("/")
335 remotepath = line
336 else:
337 spl = line.split("/")
338 container = spl[0]
339 remotepath = None if len(spl) == 1 else "/".join(spl[1:])
341 if not empty_is_value and (remotepath is None or len(remotepath) == 0):
342 raise FileNotFoundError("path should not be empty: " + line)
344 return container, remotepath
346 @staticmethod
347 def blob_ls_parser():
348 """
349 defines the way to parse the magic command ``%blob_ls``
350 """
351 parser = MagicCommandParser(prog="blob_ls",
352 description='describes the content of folder in a blob storage')
353 parser.add_argument(
354 'path',
355 type=str,
356 help='path to look into, </path> or <container>/<path>')
357 return parser
359 @line_magic
360 def blob_ls(self, line):
361 """
362 defines command %blob_ls, see :ref:`l-magic-path-container`
364 .. nbref::
365 :tag: Azure
366 :title: blob_ls
368 The code for magic command ``%blob_ls`` is equivalent to::
370 from pyenbc.remote import AzureClient
371 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
372 bs = cl.open_blob_service()
373 df = cl.ls(bs, container, remotepath)
374 """
375 parser = self.get_parser(MagicAzure.blob_ls_parser, "blob_ls")
376 args = self.get_args(line, parser)
378 if args is not None:
379 cl, bs = self.get_blob_connection()
380 container, remotepath = self._interpret_path(
381 args.path, cl, bs, True)
382 df = cl.ls(bs, container, remotepath)
383 if len(df) > 0:
384 return df[["name", "last_modified",
385 "content_type", "content_length", "blob_type"]]
386 else:
387 return df
388 return None
390 @staticmethod
391 def blob_lsl_parser():
392 """
393 defines the way to parse the magic command ``%blob_lsl``
394 """
395 parser = MagicCommandParser(prog="blob_lsl",
396 description='describes the content of folder in a blob storage + metadata')
397 parser.add_argument(
398 'path',
399 type=str,
400 help='path to look into, </path> or <container>/<path>')
401 return parser
403 @line_magic
404 def blob_lsl(self, line):
405 """
406 defines command %blob_lsl (extended version of blob_lsl),
407 see :ref:`l-magic-path-container`
409 .. nbref::
410 :tag: Azure
411 :title: blob_lsl
413 The code for magic command ``%blob_lsl`` is equivalent to::
415 from pyenbc.remote import AzureClient
416 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
417 bs = cl.open_blob_service()
418 df = cl.ls(bs, container, remotepath, add_metadata=True)
419 """
420 parser = self.get_parser(MagicAzure.blob_lsl_parser, "blob_lsl")
421 args = self.get_args(line, parser)
423 if args is not None:
424 cl, bs = self.get_blob_connection()
425 container, remotepath = self._interpret_path(
426 args.path, cl, bs, True)
427 return cl.ls(bs, container, remotepath, add_metadata=True)
428 return None
430 @staticmethod
431 def blob_up_parser():
432 """
433 Defines the way to parse the magic command ``%blob_up``.
434 """
435 parser = MagicCommandParser(prog="blob_up", description='upload a file on a blob storage, ' +
436 'we assume the container is the first element to the remote path')
437 parser.add_argument(
438 'localfile',
439 type=str,
440 help='local file to upload')
441 parser.add_argument(
442 'remotepath',
443 type=str,
444 help='remote path of the uploaded file')
445 return parser
447 @line_magic
448 def blob_up(self, line):
449 """
450 upload a file to the blob storage,
451 we assume the container is the first element of the path,
452 see :ref:`l-magic-path-container`
454 Example::
456 %blob_up localfile remotepath
458 the command does not allow spaces in files
460 .. nbref::
461 :tag: Azure
462 :title: blob_up
464 The code for magic command ``%blob_up`` is equivalent to::
466 from pyenbc.remote import AzureClient
467 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
468 bs = cl.open_blob_service()
469 cl.upload(bs, container, remotepath, localfile)
470 """
471 parser = self.get_parser(MagicAzure.blob_up_parser, "blob_up")
472 args = self.get_args(line, parser)
474 if args is not None:
475 localfile, remotepath = args.localfile, args.remotepath
476 if not os.path.exists(localfile):
477 raise FileNotFoundError(localfile)
479 cl, bs = self.get_blob_connection()
480 container, remotepath = self._interpret_path(remotepath, cl, bs)
481 cl.upload(bs, container, remotepath, localfile)
482 return remotepath
483 return None
485 @staticmethod
486 def blob_down_parser():
487 """
488 Defines the way to parse the magic command ``%blob_down``.
489 """
490 parser = MagicCommandParser(prog="blob_down", description='download a file from a blob storage, we assume the container ' +
491 'is the first element to the remote path')
492 parser.add_argument(
493 'remotepath',
494 type=str,
495 help='remote path of the file to download')
496 parser.add_argument(
497 'localfile',
498 type=str,
499 help='local name for the downloaded file')
500 parser.add_argument(
501 '-o',
502 '--overwrite',
503 action='store_true',
504 default=False,
505 help='overwrite the local file')
506 return parser
508 @line_magic
509 def blob_down(self, line):
510 """
511 download a file from the blob storage,
512 see :ref:`l-magic-path-container`
514 Example::
516 %blob_down remotepath localfile
518 the command does not allow spaces in file names
520 .. nbref::
521 :tag: Azure
522 :title: blob_down
524 The code for magic command ``%blob_down`` is equivalent to::
526 from pyenbc.remote import AzureClient
527 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
528 bs = cl.open_blob_service()
529 cl.download(bs, container, remotepath, localfile)
530 """
531 parser = self.get_parser(MagicAzure.blob_down_parser, "blob_down")
532 args = self.get_args(line, parser)
534 if args is not None:
535 localfile, remotepath = args.localfile, args.remotepath
536 if os.path.exists(localfile):
537 if args.overwrite:
538 os.remove(localfile)
539 else:
540 raise Exception(
541 "file {0} cannot be overwritten".format(localfile))
542 cl, bs = self.get_blob_connection()
543 container, remotepath = self._interpret_path(remotepath, cl, bs)
544 cl.download(bs, container, remotepath, localfile)
545 return localfile
546 return None
548 @staticmethod
549 def blob_downmerge_parser():
550 """
551 defines the way to parse the magic command ``%blob_downmerge``
552 """
553 parser = MagicCommandParser(prog="blob_downmerge",
554 description='download a set of files from a blob storage folder, files will ' +
555 'be merged, we assume the container is the first element to the remote path')
556 parser.add_argument(
557 'remotepath',
558 type=str,
559 help='remote path of the folder to download')
560 parser.add_argument(
561 'localfile',
562 type=str,
563 help='local name for the downloaded merged file')
564 parser.add_argument(
565 '-o',
566 '--overwrite',
567 action='store_true',
568 default=False,
569 help='overwrite the local file')
570 return parser
572 @line_magic
573 def blob_downmerge(self, line):
574 """
575 download all files from a folder,
576 see :ref:`l-magic-path-container`
578 Example::
580 %blob_downmerge remotepath localfile
582 the command does not allow spaces in file names
584 .. nbref::
585 :tag: Azure
586 :title: blob_downmerge
588 The code for magic command ``%blob_downmerge`` is equivalent to::
590 from pyenbc.remote import AzureClient
591 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
592 bs = cl.open_blob_service()
593 cl.download_merge(bs, container, remotepath, localfile)
595 .. versionadded:: 1.1
596 """
597 parser = self.get_parser(
598 MagicAzure.blob_downmerge_parser, "blob_downmerge")
599 args = self.get_args(line, parser)
601 if args is not None:
602 localfile, remotepath = args.localfile, args.remotepath
603 if os.path.exists(localfile):
604 if args.overwrite:
605 os.remove(localfile)
606 else:
607 raise Exception(
608 "file {0} cannot be overwritten".format(localfile))
610 cl, bs = self.get_blob_connection()
611 container, remotepath = self._interpret_path(remotepath, cl, bs)
612 cl.download_merge(bs, container, remotepath, localfile)
613 return localfile
614 return None
616 @line_magic
617 def blob_rm(self, line):
618 """
619 calls @see me blob_delete
621 .. versionadded:: 1.1
622 """
623 return self.blob_delete(line)
625 @staticmethod
626 def blob_delete_parser():
627 """
628 defines the way to parse the magic command ``%blob_delete``
629 """
630 parser = MagicCommandParser(prog="blob_delete",
631 description='remove a remote path')
632 parser.add_argument(
633 'remotepath',
634 type=str,
635 help='remote path to remove')
636 return parser
638 @line_magic
639 def blob_delete(self, line):
640 """
641 deletes a blob,
642 see :ref:`l-magic-path-container`
644 .. nbref::
645 :tag: Azure
646 :title: blob_delete
648 The code for magic command ``%blob_delete`` is equivalent to::
650 from pyenbc.remote import AzureClient
651 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
652 bs = cl.open_blob_service()
653 cl.delete_blob(bs, container, remotepath)
654 """
655 parser = self.get_parser(MagicAzure.blob_delete_parser, "blob_delete")
656 args = self.get_args(line, parser)
658 if args is not None:
659 cl, bs = self.get_blob_connection()
660 container, remotepath = self._interpret_path(
661 args.remotepath, cl, bs)
662 cl.delete_blob(bs, container, remotepath)
663 return True
664 return None
666 @staticmethod
667 def blob_rmr_parser():
668 """
669 defines the way to parse the magic command ``%blob_rmr``
670 """
671 parser = MagicCommandParser(prog="blob_rmr",
672 description='remove a remote folder')
673 parser.add_argument(
674 'remotepath',
675 type=str,
676 help='remote path to remove')
677 return parser
679 @line_magic
680 def blob_rmr(self, line):
681 """
682 deletes a folder,
683 see :ref:`l-magic-path-container`
685 .. nbref::
686 :tag: Azure
687 :title: blob_rmr
689 The code for magic command ``%blob_rmr`` is equivalent to::
691 from pyenbc.remote import AzureClient
692 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
693 bs = cl.open_blob_service()
694 cl.delete_folder(bs, container, remotepath)
695 """
696 parser = self.get_parser(MagicAzure.blob_rmr_parser, "blob_rmr")
697 args = self.get_args(line, parser)
699 if args is not None:
700 cl, bs = self.get_blob_connection()
701 container, remotepath = self._interpret_path(
702 args.remotepath, cl, bs)
703 return cl.delete_folder(bs, container, remotepath)
704 return None
706 @staticmethod
707 def blob_copy_parser():
708 """
709 defines the way to parse the magic command ``%blob_copy``
710 """
711 parser = MagicCommandParser(prog="blob_copy",
712 description='copy a blob folder')
713 parser.add_argument(
714 'remotepath',
715 type=str,
716 help='remote path to remove')
717 parser.add_argument(
718 'remotedest',
719 type=str,
720 help='remote destination')
721 return parser
723 @line_magic
724 def blob_copy(self, line):
725 """
726 copy a blob storage,
727 see :ref:`l-magic-path-container`
729 .. nbref::
730 :tag: Azure
731 :title: blob_copy
733 The code for magic command ``%blob_copy`` is equivalent to::
735 from pyenbc.remote import AzureClient
736 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
737 bs = cl.open_blob_service()
738 cl.copy_blob(bs, container, dest, src)
739 """
740 parser = self.get_parser(MagicAzure.blob_copy_parser, "blob_copy")
741 args = self.get_args(line, parser)
743 if args is not None:
744 src, dest = args.remotepath, args.remotedest
745 cl, bs = self.get_blob_connection()
746 container, src = self._interpret_path(src, cl, bs)
747 container_, dest = self._interpret_path(dest, cl, bs)
748 if container != container_:
749 raise AzureException(
750 "containers should be the same: {0} != {1}".format(
751 container,
752 container_),
753 None)
754 cl.copy_blob(bs, container, dest, src)
755 return True
756 return None
758 @staticmethod
759 def hd_queue_parser():
760 """
761 defines the way to parse the magic command ``%hd_queue``
762 """
763 parser = MagicCommandParser(prog="hd_queue",
764 description='displays the job queue')
765 parser.add_argument(
766 '-s',
767 '--showall',
768 action="store_true",
769 default=False,
770 help="show all jobs, only users'")
771 return parser
773 @line_magic
774 def hd_queue(self, line):
775 """
776 defines ``%hd_queue``
778 .. nbref::
779 :tag: Azure
780 :title: hd_queue
782 The code for magic command ``%hd_queue`` is equivalent to::
784 from pyenbc.remote import AzureClient
785 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
786 bs = cl.open_blob_service()
787 cl.job_queue(showall=showall)
788 """
789 parser = self.get_parser(MagicAzure.hd_queue_parser, "hd_queue")
790 args = self.get_args(line, parser)
792 if args is not None:
793 showall = args.showall
794 cl, _ = self.get_blob_connection()
795 return cl.job_queue(showall=showall)
796 return None
798 @staticmethod
799 def hd_job_status_parser():
800 """
801 defines the way to parse the magic command ``%hd_job_status``
802 """
803 parser = MagicCommandParser(prog="hd_job_status",
804 description='get the status of the job')
805 parser.add_argument(
806 'jobid',
807 type=str,
808 help='job id')
809 return parser
811 @line_magic
812 def hd_job_status(self, line):
813 """
814 defines ``%hd_job_status``
816 .. nbref::
817 :tag: Azure
818 :title: hd_job_status
820 The code for magic command ``%hd_job_status`` is equivalent to::
822 from pyenbc.remote import AzureClient
823 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
824 bs = cl.open_blob_service()
825 cl.job_status(jobid)
826 """
827 parser = self.get_parser(
828 MagicAzure.hd_job_status_parser, "hd_job_status")
829 args = self.get_args(line, parser)
831 if args is not None:
832 jobid = args.jobid
833 cl, _ = self.get_blob_connection()
834 return cl.job_status(jobid)
835 return None
837 @staticmethod
838 def hd_job_kill_parser():
839 """
840 defines the way to parse the magic command ``%hd_job_kill``
841 """
842 parser = MagicCommandParser(prog="hd_job_kill",
843 description='kill a job')
844 parser.add_argument(
845 'jobid',
846 type=str,
847 help='job id')
848 return parser
850 @line_magic
851 def hd_job_kill(self, line):
852 """
853 defines ``%hd_job_kill``
855 .. nbref::
856 :tag: Azure
857 :title: hd_job_kill
859 The code for magic command ``%hd_job_kill`` is equivalent to::
861 from pyenbc.remote import AzureClient
862 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
863 bs = cl.open_blob_service()
864 cl.job_kill(jobid)
865 """
866 parser = self.get_parser(MagicAzure.hd_job_kill_parser, "hd_job_kill")
867 args = self.get_args(line, parser)
869 if args is not None:
870 jobid = args.jobid
871 cl, _ = self.get_blob_connection()
872 return cl.job_kill(jobid)
873 return None
875 @line_magic
876 def hd_wasb_prefix(self, line):
877 """
878 defines ``%hd_wasb_prefix``, returns the prefix used to connect to the blob storage,
879 it includes the *container* name
880 """
881 cl, _ = self.get_blob_connection()
882 return cl.wasb_to_file(cl.account_name, "")
884 @staticmethod
885 def PIG_azure_parser():
886 """
887 defines the way to parse the magic command ``%%PIG_azure``
888 """
889 parser = MagicCommandParser(prog="PIG_azure",
890 description='The command store the content of the cell as a local file.')
891 parser.add_argument(
892 'file',
893 type=str,
894 help='file name')
895 return parser
897 @cell_magic
898 def PIG_azure(self, line, cell=None):
899 """
900 defines command ``%%PIG_azure``
902 .. nbref::
903 :tag: Azure
904 :title: PIG_azure
906 The code for magic command ``%PIG_azure`` is equivalent to::
908 with open(filename, "w", encoding="utf8") as f:
909 f.write(script)
912 """
913 parser = self.get_parser(MagicAzure.PIG_azure_parser, "PIG_azure")
914 args = self.get_args(line, parser)
916 if args is not None:
917 filename = args.file
918 script = cell.replace("\r", "")
919 with open(filename, "w", encoding="utf8") as f:
920 f.write(script)
922 @staticmethod
923 def HIVE_azure_parser():
924 """
925 defines the way to parse the magic command ``%HIVE_azure``
926 """
927 parser = MagicCommandParser(prog="HIVE_azure",
928 description='The command store the content of the cell as a local file.')
929 parser.add_argument(
930 'file',
931 type=str,
932 help='file name')
933 return parser
935 @cell_magic
936 def HIVE_azure(self, line, cell=None):
937 """
938 defines command ``%%HIVE_azure``
940 .. nbref::
941 :tag: Azure
942 :title: HIVE_azure
944 The code for magic command ``%HIVE_azure`` is equivalent to::
946 with open(filename, "w", encoding="utf8") as f:
947 f.write(script)
950 """
951 parser = self.get_parser(MagicAzure.HIVE_azure_parser, "HIVE_azure")
952 args = self.get_args(line, parser)
954 if args is not None:
955 filename = args.file
956 script = cell.replace("\r", "")
957 with open(filename, "w", encoding="utf8") as f:
958 f.write(script)
960 @staticmethod
961 def HIVE_azure_submit_parser():
962 """
963 Defines the way to parse the magic command ``%HIVE_azure_submit``.
964 """
965 parser = MagicCommandParser(prog="HIVE_azure_submit",
966 description='Submits a job to the cluster, the job is local, the job is ' +
967 'first uploaded to the cluster. The magic command populates the local ' +
968 'variable last_job with the submitted job id.')
969 parser.add_argument(
970 'file',
971 type=str,
972 help='file name')
973 parser.add_argument(
974 '-d',
975 '--dependency',
976 nargs="*",
977 type=str,
978 help='dependency of the job, the python script')
979 parser.add_argument(
980 '-s',
981 '--stop-on-failure',
982 action='store_true',
983 default=False,
984 help='if true, the job stops on failure right away')
985 parser.add_argument(
986 '-o',
987 '--options',
988 nargs='*',
989 type=str,
990 help='list of options for the job')
991 return parser
993 @line_magic
994 def HIVE_azure_submit(self, line):
995 """
996 Defines command ``%HIVE_azure_submit``.
998 .. nbref::
999 :tag: Azure
1000 :title: HIVE_azure_submit
1002 The code for magic command ``%HIVE_azure_submit`` is equivalent to::
1004 from pyenbc.remote import AzureClient
1005 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
1006 bs = cl.open_blob_service()
1007 cl.hive_submit(bs, cl.account_name, hive_file_name, dependencies, **options)
1008 """
1009 parser = self.get_parser(
1010 MagicAzure.HIVE_azure_submit_parser, "HIVE_azure_submit")
1011 args = self.get_args(line, parser)
1013 if args is not None:
1014 pig = args.file
1015 pys = [_ for _ in args.dependency if _.endswith(
1016 ".py")] if args.dependency is not None else []
1018 if not os.path.exists(pig):
1019 raise FileNotFoundError(pig)
1021 options = {"stop_on_failure": False}
1022 if args.options is not None:
1023 options.update({k: True for k in args.options})
1025 cl, bs = self.get_blob_connection()
1026 r = cl.HIVE_submit(bs, cl.account_name, pig, pys, **options)
1028 self.shell.user_ns["last_job"] = r
1029 return r
1030 return None
1032 @staticmethod
1033 def hd_pig_submit_parser():
1034 """
1035 Defines the way to parse the magic command ``%hd_pig_submit``.
1036 """
1037 parser = MagicCommandParser(prog="hd_pig_submit",
1038 description='Submits a job to the cluster, the job is local, the job is ' +
1039 'first uploaded to the cluster. The magic command populates the local ' +
1040 'variable last_job with the submitted job id.')
1041 parser.add_argument(
1042 'file',
1043 type=str,
1044 help='file name')
1045 parser.add_argument(
1046 '-d',
1047 '--dependency',
1048 nargs="*",
1049 type=str,
1050 help='dependency of the job, the python script')
1051 parser.add_argument(
1052 '-s',
1053 '--stop-on-failure',
1054 action='store_true',
1055 default=False,
1056 help='if true, the job stops on failure right away')
1057 parser.add_argument(
1058 '-o',
1059 '--options',
1060 nargs='*',
1061 type=str,
1062 help='list of options for the job')
1063 return parser
1065 @line_magic
1066 def hd_pig_submit(self, line):
1067 """
1068 Defines command ``%hd_pig_submit``.
1070 .. nbref::
1071 :tag: Azure
1072 :title: hd_pig_submit
1074 The code for magic command ``%hd_pig_submit`` is equivalent to::
1076 from pyenbc.remote import AzureClient
1077 cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
1078 bs = cl.open_blob_service()
1079 cl.pig_submit(bs, cl.account_name, pig_file_name, dependencies, **options)
1080 """
1081 parser = self.get_parser(
1082 MagicAzure.hd_pig_submit_parser, "hd_pig_submit")
1083 args = self.get_args(line, parser)
1085 if args is not None:
1086 pig = args.file
1087 pys = [_ for _ in args.dependency if _.endswith(
1088 ".py")] if args.dependency is not None else []
1090 if not os.path.exists(pig):
1091 raise FileNotFoundError(pig)
1093 options = {"stop_on_failure": False}
1094 if args.options is not None:
1095 options.update({k: True for k in args.options})
1097 cl, bs = self.get_blob_connection()
1098 r = cl.pig_submit(bs, cl.account_name, pig, pys, **options)
1100 self.shell.user_ns["last_job"] = r
1101 return r
1102 return None
1104 @staticmethod
1105 def hd_tail_stderr_parser():
1106 """
1107 defines the way to parse the magic command ``%hd_tail_stderr``
1108 """
1109 parser = MagicCommandParser(prog="hd_tail_stderr",
1110 description='Submits a job to the cluster, the job is local, the job is first ' +
1111 'uploaded to the cluster. The magic command populates the local variable ' +
1112 'last_job with the submitted job id.')
1113 parser.add_argument(
1114 'jobid',
1115 type=str,
1116 help='job id')
1117 parser.add_argument(
1118 '-n',
1119 '--nblines',
1120 type=int,
1121 default=20,
1122 help='number of lines to display')
1123 parser.add_argument(
1124 '--raw-output',
1125 default=False,
1126 action='store_true',
1127 help='display raw text instead of HTML')
1128 return parser
@line_magic
def hd_tail_stderr(self, line):
    """
    Defines ``%hd_tail_stderr``: displays the last lines of the standard
    error of a job, followed by the last lines of its standard output
    when it is not empty.

    @param      line        command line, parsed with the parser built by
                            ``hd_tail_stderr_parser``
    @return                 a string with ``--raw-output``, an ``HTML``
                            object otherwise, None if ``get_args`` returns None

    With ``--raw-output``, the returned text is the tail of the standard
    output if it is not empty, the tail of the standard error otherwise.

    An exception is raised if the job id is empty and no variable
    ``last_job`` can be found in the notebook workspace.

    @warning This function gets the status of the job to get the script name.
    But the redirection uses the script name and not the job id. As a consequence,
    if the same script name is run multiple times, the redirection will contain
    the output of multiple jobs.

    .. nbref::
        :tag: Azure
        :title: hd_tail_stderr

        The code for magic command ``%hd_tail_stderr`` is equivalent to::

            from pyenbc.remote import AzureClient
            cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
            bs = cl.open_blob_service()
            cl.standard_outputs(job_id, bs, cl.account_name, ".")
    """
    # local import: this command is the only one which needs it
    from html import escape

    parser = self.get_parser(
        MagicAzure.hd_tail_stderr_parser, "hd_tail_stderr")
    args = self.get_args(line, parser)
    if args is None:
        return None

    job = args.jobid
    nbline = args.nblines
    if len(job) == 0:
        # no explicit job id, falls back on the last job stored in the workspace
        if self.shell is None or "last_job" not in self.shell.user_ns:
            raise Exception("no submitted jobs found in the workspace")
        job = self.shell.user_ns["last_job"]["jid"]

    cl, bs = self.get_blob_connection()
    out, err = cl.standard_outputs(job, bs, cl.account_name, ".")

    def tail(text):
        # keeps the last *nbline* lines, removes remaining end of line characters
        return "\n".join(_.strip("\n\r") for _ in text.split("\n")[-nbline:])

    tail_err = tail(err)
    tail_out = tail(out) if len(out) > 0 else None

    if args.raw_output:
        # raw text must not contain any HTML markup
        return tail_err if tail_out is None else tail_out

    # The logs are escaped before being inserted into <pre>, otherwise
    # tokens such as <init> in stack traces are read as tags and hidden.
    # ERROR is highlighted after the escaping to keep the markup intact.
    html_err = escape(tail_err, quote=False).replace(
        "ERROR",
        '<b><font color="#DD0000">ERROR</font></b>')
    if tail_out is None:
        return HTML("<pre>\n%s\n</pre><br />" % html_err)
    return HTML(
        "<pre>\n%s\n</pre><br /><b>OUT:</b><br /><pre>\n%s\n</pre>" % (
            html_err, escape(tail_out, quote=False)))
def _run_jython(self, cell, filename, func_name, args, true_jython=None):
    """
    Runs a function defined in a python script on every line of a cell.

    The function reads *filename*, writes a second script next to it
    (same name with ``.temp`` inserted before the extension) made of:

    * fake decorators ``outputSchema``, ``outputSchemaFunction``,
      ``schemaFunction``, defined when ``__name__ != '__lib__'``,
    * the content of *filename*,
    * a loop which calls *func_name* on every stripped row of the
      standard input and writes the result on the standard output.

    The second script is then run with the cell as standard input,
    both kinds of run are called with ``timeout=10``.

    @param      cell            content of the cell, sent to the standard input
    @param      filename        script which defines the function (read, not modified)
    @param      func_name       function name, a string or an object with ``__name__``
    @param      args            list of arguments appended to the command line,
                                only used when *true_jython* is false
    @param      true_jython     jython (True) or this Python (False or None)
    @return                     out, err
    """
    with open(filename, 'r', encoding="utf8") as pyf:
        content = pyf.read()

    # The temporary name is built from the extension only.
    # ``filename.replace(".py", ".temp.py")`` returned *filename* itself
    # when it did not contain ".py" (the source was then overwritten)
    # and also altered folders whose name contains ".py".
    root, ext = os.path.splitext(filename)
    temp = root + ".temp" + (ext if ext else ".py")

    # fake decorators, the text starts and ends with an empty line
    prologue = "\n".join([
        "",
        "# -*- coding: utf8 -*-",
        "if __name__ != '__lib__':",
        "    def outputSchema(dont_care):",
        "        def wrapper(func):",
        "            def inner(*args, **kwargs):",
        "                return func(*args, **kwargs)",
        "            return inner",
        "        return wrapper",
        "    def outputSchemaFunction(schema_def):",
        "        def decorator(func):",
        "            func.outputSchemaFunction = schema_def",
        "            return func",
        "        return decorator",
        "    def schemaFunction(schema_def):",
        "        def decorator(func):",
        "            func.schemaFunction = schema_def",
        "            return func",
        "        return decorator",
        "",
    ])

    s_func_name = func_name if isinstance(
        func_name, str) else func_name.__name__.split(".")[-1]

    # loop over the standard input, the generated code contains
    # the two characters \n inside the call to sys.stdout.write
    epilogue = "\n".join([
        "",
        "if __name__ != '__lib__':",
        "    import sys",
        "    for row in sys.stdin:",
        "        row = row.strip()",
        "        res = {0}(row)",
        "        sys.stdout.write(str(res))",
        '        sys.stdout.write("\\n")',
        "        sys.stdout.flush()",
        "",
    ]).format(s_func_name)

    with open(temp, "w", encoding="utf8") as pyf:
        pyf.write(prologue)
        # rewrites ``except Exception, e`` into ``except Exception as e``
        pyf.write(
            content.replace(
                "except Exception,",
                "except Exception as "))
        pyf.write(epilogue)

    if true_jython:
        download_java_standalone()
        out, err = run_jython(temp, sin=cell, timeout=10)
    else:
        # pythonw does not expose the standard streams the script relies on
        cmd = sys.executable.replace(
            "pythonw",
            "python") + " " + temp + " " + " ".join("{}".format(_) for _ in args)
        out, err = run_cmd(
            cmd, wait=True, sin=cell, communicate=True, timeout=10, shell=False)
    return out, err
@staticmethod
def runjython_parser():
    """
    Builds the parser used by the cell magic running a jython script
    with this version of Python.

    @return     parser with the positional arguments ``file``,
                ``function_name``, ``args`` (any number of values)
                and the flag ``--raw-output`` (default False)
    """
    summary = ('run a jython script used for streaming in HDInsight, ' +
               'the function appends fake decorator a timeout is set up at 10s')
    parser = MagicCommandParser(prog="runjython", description=summary)

    # arguments are declared in this order, it fixes the order
    # of the positional arguments on the command line
    declarations = [
        (('file',), {'type': str, 'help': 'file name'}),
        (('function_name',), {'type': str, 'help': 'function name'}),
        (('--raw-output',), {'default': False, 'action': 'store_true',
                             'help': 'display raw text instead of HTML'}),
        (('args',), {'type': str, 'nargs': "*", 'help': 'arguments'}),
    ]
    for names, settings in declarations:
        parser.add_argument(*names, **settings)
    return parser
@cell_magic
def runjpython(self, line, cell=None):
    """
    Defines command ``%%runjpython``.

    .. nbref::
        :tag: Azure
        :title: runjpython

        Run a jython script used for streaming in HDInsight,
        the function appends fake decorators,
        a timeout is set up at 10s.

        The magic function creates another file which includes the decorators.
        It runs the script with this version of Python.

        See `In a python script how can I ignore Apache Pig's Python Decorators for standalone unit testing
        <http://stackoverflow.com/questions/18223898/in-a-python-script-how-can-i-ignore-apache-pigs-python-decorators-for-standalon>`_

        See @see me _run_jython to see the code.

    @param      line        command line: file, function name, ``--raw-output``, arguments
    @param      cell        content of the cell, sent to the standard input of the script
    @return                 with ``--raw-output``, the standard error if it is
                            not empty, the standard output otherwise; without it,
                            the same text wrapped into an ``HTML`` object;
                            None if ``get_args`` returns None

    .. versionadded:: 1.1
    """
    parser = self.get_parser(MagicAzure.runjython_parser, "runjpython")
    args = self.get_args(line, parser)
    if args is None:
        return None

    filename = args.file
    func_name = args.function_name
    # Every option is read before the script runs. The previous code
    # replaced ``args`` by the list ``args.args`` and then read
    # ``args.raw_output`` from that list, which raised AttributeError.
    raw_output = args.raw_output
    script_args = args.args

    out, err = self._run_jython(cell, filename, func_name, script_args, False)

    if raw_output:
        return err if len(err) > 0 else out
    if len(err) > 0:
        return HTML(
            '<font color="#DD0000">Error</font><br /><pre>\n%s\n</pre>' % err)
    return HTML('<pre>\n%s\n</pre>' % out)
@staticmethod
def jython_parser():
    """
    Builds the parser of the magic command ``%%jython``.

    @return     parser with the positional arguments ``file``,
                ``function_name``, ``args`` (any number of values)
                and the flag ``--raw-output`` (default False)
    """
    parser = MagicCommandParser(
        prog="jython",
        description='run a jython script used for streaming in HDInsight, it does it using Jython')

    # the two mandatory positional arguments share the same settings
    for name, text in (('file', 'file name'),
                       ('function_name', 'function name')):
        parser.add_argument(name, type=str, help=text)

    parser.add_argument(
        '--raw-output', default=False, action='store_true',
        help='display raw text instead of HTML')
    parser.add_argument('args', type=str, nargs="*", help='arguments')
    return parser
@cell_magic
def jython(self, line, cell=None):
    """
    Defines command ``%%jython``.

    Runs a jython script used for streaming in HDInsight,
    fake decorators are added to the script,
    a timeout is set up at 10s.

    The magic function creates another file which includes the decorators.
    It runs the script with Jython (see the default version).

    See `In a python script how can I ignore Apache Pig's Python Decorators for standalone unit testing
    <http://stackoverflow.com/questions/18223898/in-a-python-script-how-can-i-ignore-apache-pigs-python-decorators-for-standalon>`_.

    @param      line        command line: file, function name, ``--raw-output``, arguments
    @param      cell        content of the cell, sent to the standard input of the script
    @return                 with ``--raw-output``, the standard error if it is
                            not empty, the standard output otherwise; without it,
                            the same text wrapped into an ``HTML`` object;
                            None if ``get_args`` returns None

    .. versionadded:: 1.1
    """
    parser = self.get_parser(MagicAzure.jython_parser, "jpython")
    parsed = self.get_args(line, parser)
    if parsed is None:
        return None

    as_text = parsed.raw_output
    out, err = self._run_jython(
        cell, parsed.file, parsed.function_name, parsed.args, True)

    # a non empty standard error takes precedence over the standard output
    failed = len(err) > 0
    if as_text:
        return err if failed else out
    if failed:
        return HTML(
            '<font color="#DD0000">Error</font><br /><pre>\n%s\n</pre>' % err)
    return HTML('<pre>\n%s\n</pre>' % out)
@staticmethod
def blob_head_parser():
    """
    Builds the parser of the magic command ``%blob_head``.

    @return     parser with a positional ``remotepath`` and the options
                ``-m/--merge``, ``-d/--df``, ``-s/--size`` (default ``2 ** 20``),
                ``-e/--encoding`` (default ``utf8``), ``--sep`` (default tabulation),
                ``--header`` (default ``infer``)
    """
    parser = MagicCommandParser(prog="blob_head",
                                description='get the head of stream in a dataframe')
    parser.add_argument(
        'remotepath',
        type=str,
        help='remote path of the file to download')
    parser.add_argument(
        '-m',
        '--merge',
        action='store_true',
        default=False,
        help='merges files in a folder')
    # NOTE(review): store_true with default=True means this value is always
    # True, the flag cannot be switched off from the command line.
    # It is left unchanged to keep the command line backward compatible.
    parser.add_argument(
        '-d',
        '--df',
        action='store_true',
        default=True,
        help='results as a dataframe')
    parser.add_argument(
        '-s',
        '--size',
        type=int,
        default=2 ** 20,
        help='size of data to get')
    parser.add_argument(
        '-e',
        '--encoding',
        type=str,
        default="utf8",
        help='encoding')
    parser.add_argument(
        '--sep',
        type=str,
        default="\t",
        help='column separator')
    # the help used to be a copy of the one of --df
    parser.add_argument(
        '--header',
        default='infer',
        help="value of parameter header used to read the data (default: 'infer')")
    return parser
@line_magic
def blob_head(self, line):
    """
    Downloads the beginning of a file from the blob storage
    and returns it, see :ref:`l-magic-path-container`.

    Example::

        %blob_head remotepath

    The command does not allow spaces in file names.

    @param      line        command line, parsed with the parser built by
                            ``blob_head_parser``
    @return                 what ``df_head`` returns, None if ``get_args`` returns None

    .. nbref::
        :tag: Azure
        :title: blob_head

        The code for magic command ``%blob_head`` is equivalent to::

            from pyenbc.remote import AzureClient
            cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username)
            bs = cl.open_blob_service()
            df = cl.df_head(bs, container, remotepath, localfile)
    """
    parser = self.get_parser(MagicAzure.blob_head_parser, "blob_head")
    opts = self.get_args(line, parser)
    if opts is None:
        return None

    client, service = self.get_blob_connection()
    # splits the path given by the user into a container and a path
    container, path = self._interpret_path(opts.remotepath, client, service)
    return client.df_head(
        service, container, path,
        stop_at=opts.size, encoding=opts.encoding,
        as_df=opts.df, merge=opts.merge, sep=opts.sep,
        header=opts.header)
@staticmethod
def blob_path_parser():
    """
    Builds the parser of the magic command ``%blob_path``,
    which checks the path used in commands ``blob_down``, ``blob_up``.

    @return     parser with a single positional argument ``remotepath``
    """
    # The previous description ('remove a remote path') was copied from
    # another command, this one only interprets a path.
    parser = MagicCommandParser(prog="blob_path",
                                description='interpret a remote path, returns the container and the path inside it')
    parser.add_argument(
        'remotepath',
        type=str,
        help='remote path to interpret')
    return parser
@line_magic
def blob_path(self, line):
    """
    Checks the path used in commands ``blob_down``, ``blob_up``,
    see @see me _interpret_path, :ref:`l-magic-path-container`.

    @param      line        command line, a single remote path
    @return                 tuple *(container, remotepath)*,
                            None if ``get_args`` returns None

    .. nbref::
        :tag: Azure
        :title: blob_path

        The code for magic command ``%blob_path`` is equivalent to::

            if line.startswith("/"):
                container = account_name
                remotepath = remotepath.lstrip("/")
            else:
                spl = line.split("/")
                container = spl[0]
                remotepath = None if len(spl) == 1 else "/".join(spl[1:])

            result = container, remotepath
    """
    # The command uses its own parser. The previous code requested the
    # parser of ``blob_delete`` under the name "blob_delete", so the
    # help displayed for %blob_path was the one of another command.
    parser = self.get_parser(MagicAzure.blob_path_parser, "blob_path")
    args = self.get_args(line, parser)
    if args is None:
        return None

    cl, bs = self.get_blob_connection()
    container, remotepath = self._interpret_path(
        args.remotepath, cl, bs)
    return container, remotepath
def register_azure_magics(ip=None):
    """
    Registers the magic commands of class ``MagicAzure``,
    can be called from a notebook.

    @param      ip      object returned by ``get_ipython()``,
                        if None, the function calls ``get_ipython()`` itself
    """
    shell = ip
    if shell is None:
        # imported only when needed, as the caller did not give any shell
        from IPython import get_ipython as current_shell
        shell = current_shell()
    shell.register_magics(MagicAzure)