Coverage for src/pyenbc/remote/azure_connection.py: 10%
412 statements
# -*- coding: utf-8 -*-
"""
@file
@brief A class to help connect with a remote machine and send command line.
"""
import os
import time
import io
import warnings
import requests
from pyquickhelper.loghelper import noLOG
from .azure_exception import AzureException


class AzureClient():

    """
    A simple class to access and communicate with `Azure <http://azure.microsoft.com/>`_.
    It requires modules:

    * `azure <https://github.com/Azure/azure-sdk-for-python>`_
    * `requests <http://docs.python-requests.org/en/latest/>`_

    .. index: blob, Azure

    Main functionalities related to blob:

    * list_containers, create_container, list_blobs, put_blob, put_block_blob_from_bytes
    * put_block_blob_from_text, put_page_blob_from_file, get_blob

    See `How to use Blob storage from Python
    <https://azure.microsoft.com/en-us/documentation/articles/storage-python-how-to-use-blob-storage/>`_.

    .. exref::
        :title: Get the list of containers and files from a blob storage?
        :tag: Azure

        The functionalities of a ``BlobService`` are described in
        `blockblobservice.py <https://github.com/Azure/azure-storage-python/blob/master/azure/storage/blob/blockblobservice.py>`_.

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>",
                             "<primary_key>")
            bs = cl.open_blob_service()
            res = cl.ls(bs)
            for r in res:
                print(r["name"])

    .. exref::
        :title: Upload and download files on a blob storage
        :tag: Azure

        The following example uploads and downloads a file on a Blob Storage.

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>",
                             "<primary_key>")
            bs = cl.open_blob_service()
            cl.upload(bs, "<container>", "myremotefolder/remotename.txt",
                      "local_filename.txt")

            res = cl.ls(bs, "<container>")
            for r in res:
                if "local_filename" in r["name"]:
                    print(r)

            cl.download(bs, "<container>", "myremotefolder/remotename.txt",
                        "another_local_filename.txt")

    Many functions use the
    `WebHCat API <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference>`_.
    The error codes can be found here:
    `Error Codes and Responses
    <https://cwiki.apache.org/confluence/display/Hive/WebHCat+UsingWebHCat#WebHCatUsingWebHCat-ErrorCodesandResponses>`_.

    .. versionchanged:: 1.1
        PSEUDO, CONTAINER, SCRIPT will be passed to the script as parameters.
    """

    _blob_properties = [
        "copy_completion_time",
        "content_encoding",
        "content_language",
        "blob_type",
        "copy_status_description",
        "copy_id",
        "content_md5",
        "lease_duration",
        "copy_source",
        "content_type",
        "content_length",
        "lease_state",
        "copy_progress",
        "copy_status",
        "xms_blob_sequence_number",
        "lease_status",
        "etag",
        "last_modified",
    ]

    def __init__(self, blob_name,
                 blob_key,
                 hadoop_name=None,
                 hadoop_key=None,
                 hadoop_user_name="admin",
                 pseudo="any",
                 fLOG=None):
        """
        constructor

        @param blob_name blob storage name
        @param blob_key account key for the blob storage
        @param hadoop_name hadoop server name (can be None if HDInsight is not used)
        @param hadoop_key hadoop key (can be None if HDInsight is not used)
        @param hadoop_user_name hadoop user name
        @param pseudo sometimes, the same identification is used to connect to HDInsight,
            the pseudo is meant to avoid collisions
        @param fLOG logging function
        """
        self.account_name = blob_name
        self.account_key = blob_key
        self.hadoop_name = hadoop_name
        self.hadoop_key = hadoop_key
        self.hadoop_user_name = hadoop_user_name
        self.pseudo = pseudo
        if fLOG is None:
            def _log_(*args, **kwargs):
                pass
            self.LOG = _log_
        else:
            self.LOG = fLOG

        if pseudo is None:
            raise ValueError("pseudo cannot be None")

        self.default_parameters = dict(
            SCRIPTPIG=self.pseudo + "/scripts/pig",
            SCRIPTHIVE=self.pseudo + "/scripts/hive",
            PSEUDO=self.pseudo,
            CONTAINER="")

    def _interpret_path(self, blob_path):
        """
        Replaces variables such as ``$PSEUDO`` or ``$CONTAINER``
        by their values (see *default_parameters*).

        @param blob_path path
        @return modified path
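
        A minimal sketch of the substitution, for illustration only
        (credentials are placeholders and the client is assumed to be
        created with the default ``pseudo="any"``):

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            print(cl._interpret_path("$PSEUDO/scripts/pig/job.pig"))
            # prints 'any/scripts/pig/job.pig'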

        .. versionadded:: 1.1
        """
        if blob_path is None:
            return None
        if "$" in blob_path:
            for k, v in self.default_parameters.items():
                blob_path = blob_path.replace("$" + k, v)
        return blob_path

    @staticmethod
    def mask_string(s):
        """
        Returns an empty string or as many ``*`` as the length of the string.
        """
        if s is None:
            return ""
        else:
            return "*" * len(s)

    def __str__(self):
        """
        usual
        """
        mes = "AzureClient [blob:({0},{1}), hadoop:({2},{3},{4})]".format(
            AzureClient.mask_string(self.account_name),
            AzureClient.mask_string(self.account_key),
            AzureClient.mask_string(self.hadoop_name),
            AzureClient.mask_string(self.hadoop_key),
            AzureClient.mask_string(self.hadoop_user_name))
        return mes

    def open_blob_service(self):
        """
        Opens a blob service.
        """
        try:
            from azure.storage.blob import BlobService
        except ImportError:
            # newer versions of the module azure renamed BlobService
            # into BlockBlobService
            from azure.storage.blob import BlockBlobService as BlobService
        return BlobService(self.account_name, self.account_key)

    def exists(self, blob_service, container_name, path):
        """
        Tests the existence of a path on the blob storage.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name None for all, its name otherwise
        @param path path in the container
        @return boolean
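
        A minimal usage sketch (credentials, container and path are
        placeholders):

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            bs = cl.open_blob_service()
            if cl.exists(bs, "<container>", "myremotefolder/remotename.txt"):
                print("the blob exists")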

        .. versionadded:: 1.1
        """
        path = self._interpret_path(path)
        df = self.ls(blob_service, container_name, path, as_df=False)
        return len(df) > 0

    def ls(self, blob_service, container_name=None,
           path=None, add_metadata=False, as_df=True):
        """
        Returns the content of a blob storage.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name None for all, its name otherwise
        @param path path in the container
        @param add_metadata add the metadata to the blob
        @param as_df if True, returns a DataFrame
        @return list of dictionaries or DataFrame (see *as_df*)

        .. versionchanged:: 1.1
            Parameters *add_metadata* and *as_df* were added and the function
            now returns the property *last_modified*.
        """
        res = []
        if container_name is None:
            for cn in blob_service.list_containers():
                self.LOG("exploring ", cn.name)
                r = self.ls(
                    blob_service,
                    cn.name,
                    path=path,
                    add_metadata=add_metadata,
                    as_df=False)
                res.extend(r)
            if as_df:
                import pandas  # pylint: disable=C0415
                return pandas.DataFrame(res)
            else:
                return res
        else:
            path = self._interpret_path(path)
            res = []
            for b in blob_service.list_blobs(container_name, prefix=path,
                                             include="metadata" if add_metadata else None):
                obs = {}
                obs["name"] = b.name
                if hasattr(b, "url"):
                    obs["url"] = b.url
                else:
                    obs["url"] = blob_service.make_blob_url(
                        container_name, b.name)
                for p in AzureClient._blob_properties:
                    if hasattr(b.properties, p):
                        obs[p] = getattr(b.properties, p)
                    else:
                        obs[p] = None

                if b.metadata is not None:
                    for k, v in b.metadata.items():
                        obs["meta_%s" % k] = v

                res.append(obs)
            if as_df:
                import pandas  # pylint: disable=C0415
                if len(res) > 0:
                    return pandas.DataFrame(res)
                else:
                    return pandas.DataFrame(columns=["name", "url"])
            else:
                return res

    _chunk_size = 4 * 1024 * 1024

    def upload(self,
               blob_service,
               container_name,
               blob_name,
               file_path):
        """
        Uploads data from a file to a blob storage.
        No more than 64Mb can be uploaded in a single call, so the file
        is split into chunks.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name container name
        @param blob_name blob name (remote file name)
        @param file_path local file path (or list of local file paths)
        @return blob name (or list of uploaded blob names)
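
        A minimal usage sketch (names are placeholders), uploading a single
        file and then a list of files into the same remote folder:

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            bs = cl.open_blob_service()
            cl.upload(bs, "<container>", "remote/folder/data.txt", "data.txt")
            cl.upload(bs, "<container>", "remote/folder",
                      ["data1.txt", "data2.txt"])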

        The code comes from
        `How to use Blob storage from Python
        <http://azure.microsoft.com/fr-fr/documentation/articles/storage-python-how-to-use-blob-storage/>`_.
        """
        if isinstance(file_path, list):
            res = []
            for filename in file_path:
                only = os.path.split(filename)[-1]
                bn = blob_name.rstrip("/") + "/" + only
                r = self.upload(blob_service, container_name, bn, filename)
                res.append(r)
            return res
        else:
            blob_name = self._interpret_path(blob_name)
            if hasattr(blob_service, "put_blob"):
                # this code should disappear as it relies on an old version of
                # the module azure
                blob_service.create_container(
                    container_name, None, None, False)
                blob_service.put_blob(
                    container_name, blob_name, None, 'BlockBlob')

                block_ids = []
                index = 0
                with open(file_path, 'rb') as f:
                    while True:
                        data = f.read(AzureClient._chunk_size)
                        if data:
                            block_id = '{0:08d}'.format(index)
                            blob_service.put_block(
                                container_name,
                                blob_name,
                                data,
                                block_id)
                            block_ids.append(block_id)
                            index += 1
                            self.LOG("uploaded block", index,
                                     " from ", file_path)
                        else:
                            break

                blob_service.put_block_list(
                    container_name, blob_name, block_ids)
            else:
                blob_service.create_blob_from_path(
                    container_name, blob_name, file_path)

            return blob_name

    def upload_data(self,
                    blob_service,
                    container_name,
                    blob_name,
                    data):
        """
        Uploads data (bytes) to a blob storage.
        No more than 64Mb can be uploaded in a single call, so the data
        is split into chunks.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name container name
        @param blob_name blob name (remote file name)
        @param data bytes
        @return blob name
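
        A minimal usage sketch (names are placeholders):

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            bs = cl.open_blob_service()
            cl.upload_data(bs, "<container>", "remote/folder/data.txt",
                           b"some bytes")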

        The code comes from
        `How to use Blob storage from Python
        <http://azure.microsoft.com/fr-fr/documentation/articles/storage-python-how-to-use-blob-storage/>`_.

        .. versionadded:: 1.1
        """
        blob_name = self._interpret_path(blob_name)
        blob_service.create_container(container_name, None, None, False)
        if hasattr(blob_service, "put_blob"):
            # this code should disappear as it relies on an old version of the
            # module azure
            blob_service.put_blob(container_name, blob_name, None, 'BlockBlob')

            block_ids = []
            index = 0
            while True:
                if len(data) > AzureClient._chunk_size:
                    da = data[:AzureClient._chunk_size]
                    data = data[AzureClient._chunk_size:]
                else:
                    da = data
                    data = None
                block_id = '{0:08d}'.format(index)
                blob_service.put_block(
                    container_name,
                    blob_name,
                    da,
                    block_id)
                block_ids.append(block_id)
                index += 1
                if not data:
                    break

            blob_service.put_block_list(container_name, blob_name, block_ids)
        else:
            blob_service.create_blob_from_bytes(
                container_name, blob_name, data)
        return blob_name

    def download(self,
                 blob_service,
                 container_name,
                 blob_name,
                 file_path=None,
                 append=False,
                 chunk_size=None,
                 stop_at=None):
        """
        Downloads data from a blob storage to a file.
        No more than 64Mb can be downloaded in a single call, so the
        download is split into chunks.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name container name
        @param blob_name blob name (or list of blob names) (remote file name)
        @param file_path local file path
        @param append if True, append the content to an existing file
        @param chunk_size download by chunk
        @param stop_at stop at a given size (None to avoid stopping)
        @return local file or bytes if *file_path* is None
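
        A minimal usage sketch (names are placeholders), retrieving only
        the first megabyte of a blob as bytes:

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            bs = cl.open_blob_service()
            head = cl.download(bs, "<container>",
                               "myremotefolder/remotename.txt",
                               stop_at=2 ** 20)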

        The code comes from
        `How to use Blob storage from Python
        <http://azure.microsoft.com/fr-fr/documentation/articles/storage-python-how-to-use-blob-storage/>`_.

        .. versionchanged:: 1.1
            Parameters *append*, *chunk_size* were added.
            If *file_path* is None (default value now), the function
            returns bytes.
        """
        if not isinstance(blob_name, str):
            res = []
            for blob in blob_name:
                dest = os.path.join(file_path, os.path.split(blob)[-1])
                r = self.download(
                    blob_service,
                    container_name,
                    blob,
                    dest,
                    append=append,
                    chunk_size=chunk_size,
                    stop_at=stop_at)
                res.append(r)
                if stop_at is not None:
                    if file_path is None:
                        stop_at -= len(r)
                    else:
                        stop_at -= os.stat(r).st_size
                    if stop_at <= 0:
                        break
            if file_path is None:
                st = io.BytesIO()
                for r in res:
                    st.write(r)
                return st.getvalue()
            else:
                return res
        else:
            blob_name = self._interpret_path(blob_name)

            if hasattr(blob_service, "get_blob"):
                # this code should disappear as it relies on an old version of
                # the module azure
                props = blob_service.get_blob_properties(
                    container_name, blob_name)
                if hasattr(props, "properties"):
                    blob_size = props.properties.content_length
                else:
                    blob_size = int(props['content-length'])
                if chunk_size is None:
                    chunk_size = AzureClient._chunk_size
                if stop_at is not None and stop_at < chunk_size:
                    chunk_size = max(stop_at, 0)

                def iterations(f, chunk_size, container_name, blob_name,
                               file_path, stop_at):
                    "local function"
                    index = 0

                    while index < blob_size:
                        chunk_range = 'bytes={}-{}'.format(index,
                                                           index + chunk_size - 1)
                        data = blob_service.get_blob(
                            container_name,
                            blob_name,
                            x_ms_range=chunk_range)
                        length = len(data)
                        index += length
                        self.LOG("downloaded ", index,
                                 "bytes from ", file_path)
                        if length > 0:
                            f.write(data)
                            if length < chunk_size:
                                return False
                        else:
                            return False
                        if stop_at is not None and stop_at <= index:
                            return False
                    return True

                if file_path is None:
                    f = io.BytesIO()
                    iterations(f, chunk_size,
                               container_name, blob_name, file_path, stop_at)
                    return f.getvalue()
                else:
                    mode = 'ab' if append else 'wb'
                    with open(file_path, mode) as f:
                        iterations(f, chunk_size,
                                   container_name, blob_name, file_path, stop_at)
                    return file_path
            else:
                bl = blob_service.get_blob_to_bytes(container_name, blob_name)
                if file_path is None:
                    return bl.content
                else:
                    with open(file_path, "wb") as f:
                        f.write(bl.content)
                    return file_path

    def download_data(self, blob_service, container_name, blob_name,
                      chunk_size=None, stop_at=None):
        """
        Downloads data from a blob storage and returns bytes.
        No more than 64Mb can be downloaded in a single call, so the
        download is split into chunks.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name container name
        @param blob_name blob name (or list of blob names) (remote file name)
        @param chunk_size download by chunk
        @param stop_at stop at a given size (None to avoid stopping)
        @return bytes

        .. versionadded:: 1.1
        """
        return self.download(blob_service=blob_service, container_name=container_name,
                             blob_name=blob_name, chunk_size=chunk_size, stop_at=stop_at)

    def df_head(self, blob_service, container_name, blob_name, stop_at=2 ** 20,
                encoding="utf-8", as_df=True, merge=False, **options):
        """
        Downloads the beginning of a stream and displays it as a DataFrame.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name container name
        @param blob_name blob name (or list of blob names) (remote file name)
        @param stop_at stop at a given size (None to avoid stopping)
        @param encoding encoding
        @param as_df result as a dataframe or a string
        @param merge if True, *blob_name* is a folder, method @see me download_merge is called
        @param options see `read_csv <http://pandas.pydata.org/pandas-docs/version/0.17.0/generated/pandas.read_csv.html?
            highlight=read_csv#pandas.read_csv>`_
        @return a DataFrame if *as_df* is True, a string otherwise
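
        A minimal usage sketch (names are placeholders), displaying the
        beginning of a CSV file stored on the blob storage:

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            bs = cl.open_blob_service()
            df = cl.df_head(bs, "<container>", "myfolder/data.csv", sep=",")
            print(df)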

        .. versionadded:: 1.1
        """
        if merge:
            do = self.download_merge(blob_service=blob_service,
                                     container_name=container_name,
                                     blob_folder=blob_name,
                                     stop_at=stop_at)
        else:
            do = self.download(blob_service=blob_service,
                               container_name=container_name,
                               blob_name=blob_name,
                               stop_at=stop_at)
        text = do.decode(encoding)
        if as_df:
            pos = text.rfind("\n")
            if pos > 0:
                st = io.StringIO(text[:pos])
            else:
                st = io.StringIO(text)
            import pandas  # pylint: disable=C0415
            return pandas.read_csv(st, **options)
        else:
            return text

    def download_merge(self,
                       blob_service,
                       container_name,
                       blob_folder,
                       file_path=None,
                       chunk_size=None,
                       stop_at=None):
        """
        Downloads all files from a folder in a blob storage to a single local file.
        Files will be merged.
        No more than 64Mb can be downloaded in a single call, so the
        download is split into chunks.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name container name
        @param blob_folder blob folder (remote folder name)
        @param file_path local file path
        @param chunk_size download by chunk
        @param stop_at stop at a given size (None to avoid stopping)
        @return local file or bytes if *file_path* is None
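
        A minimal usage sketch (names are placeholders), merging the files
        produced by a job into a single local file:

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            bs = cl.open_blob_service()
            cl.download_merge(bs, "<container>", "myjob/output",
                              "merged_output.txt")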

        .. versionchanged:: 1.1
            Parameter *chunk_size* was added.
            If *file_path* is None (default value now), the function
            returns bytes.
        """
        blob_folder = self._interpret_path(blob_folder)
        content = self.ls(
            blob_service,
            container_name,
            blob_folder,
            as_df=False)
        first = True
        store = io.BytesIO()
        for cont in content:
            if cont["content_length"] > 0:
                by = self.download(
                    blob_service,
                    container_name,
                    cont["name"],
                    file_path=file_path,
                    chunk_size=chunk_size,
                    stop_at=stop_at,
                    append=not first)
                if first:
                    first = False
                if file_path is None:
                    store.write(by)
                    if stop_at is not None:
                        stop_at -= len(by)
                else:
                    if stop_at is not None:
                        stop_at -= os.stat(file_path).st_size
                if stop_at is not None and stop_at <= 0:
                    break

        if file_path is None:
            return store.getvalue()
        else:
            return file_path

    def delete_blob(self, blob_service, container_name, blob_name):
        """
        Deletes a blob.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name container name
        @param blob_name blob name (remote file name)
        @return blob name
        """
        blob_name = self._interpret_path(blob_name)
        blob_service.delete_blob(container_name, blob_name)
        return blob_name

    def delete_folder(self, blob_service, container_name, blob_folder):
        """
        Deletes a folder and its content.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name container name
        @param blob_folder blob folder (remote folder name)
        @return list of deleted blob names
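
        A minimal usage sketch (names are placeholders):

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            bs = cl.open_blob_service()
            cl.delete_folder(bs, "<container>", "myjob/output")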

        .. versionadded:: 1.1
        """
        blob_folder = self._interpret_path(blob_folder)
        df = self.ls(blob_service, container_name, blob_folder)
        rem = []
        for name in df["name"]:
            r = self.delete_blob(blob_service, container_name, name)
            rem.append(r)
        return rem

    def url_blob(self, blob_service, container, blob_name):
        """
        Returns an url for a blob file name.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container container
        @param blob_name blob_name
        @return url
        """
        blob_name = self._interpret_path(blob_name)
        src = blob_service.make_blob_url(container, blob_name)
        return src

    def copy_blob(self, blob_service, container, blob_name, source):
        """
        Copies a blob.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container container name
        @param blob_name destination
        @param source source
        @return result of *copy_blob*
        """
        blob_name = self._interpret_path(blob_name)
        url = self.url_blob(blob_service, container, source)
        res = blob_service.copy_blob(container, blob_name, url)
        return res

    def url_webHCatUrl(self, cmd):
        """
        Returns an url to the cluster.

        @param cmd something like ``pig``, ``status``
        @return url
        """
        if self.hadoop_name is None:
            raise AttributeError(
                "no hadoop server was given to the constructor for cmd: {0}".format(cmd))
        webHCatUrl = 'https://' + self.hadoop_name + \
            '.azurehdinsight.net/templeton/v1/' + cmd
        return webHCatUrl

    def wasb_to_file(self, container_name, blob_file):
        """
        Returns something like ``wasb://demo@myblobstorage.blob...``.

        @param container_name name of a container
        @param blob_file path to a file
        @return a url to the blob file (pig script for example)
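
        A minimal sketch of the result, assuming the account name given
        to the constructor is ``myblobstorage``:

        ::

            cl = AzureClient("myblobstorage", "<primary_key>")
            cl.wasb_to_file("demo", "scripts/pig/job.pig")
            # 'wasb://demo@myblobstorage.blob.core.windows.net/scripts/pig/job.pig'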
715 """
716 blob_file = self._interpret_path(blob_file)
717 return 'wasb://{1}@{0}.blob.core.windows.net/{2}'.format(container_name,
718 self.account_name, blob_file)

    def wasb_prefix(self, container_name):
        """
        When using an instruction ``LOAD`` in a PIG script,
        the file blob name must be referenced using the wasb syntax.
        This method returns the prefix to add.

        @param container_name name of a container
        @return wasb prefix
        """
        return self.wasb_to_file(container_name, "")

    def get_status(self):
        """
        Returns the status of the WebHCat server.

        @return json
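
        A minimal usage sketch (credentials are placeholders):

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>",
                             "<hadoop_server>", "<hadoop_password>")
            print(cl.get_status())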
735 """
736 if self.hadoop_user_name is None:
737 raise AttributeError(
738 "no hadoop user name was given to the constructor")
739 if self.hadoop_key is None:
740 raise AttributeError(
741 "no hadoop password was given to the constructor")
743 webHCatUrl = self.url_webHCatUrl("status")
745 r = requests.get(webHCatUrl,
746 auth=(self.hadoop_user_name, self.hadoop_key))
747 if r.status_code != 200:
748 raise AzureException(
749 "unable to the status of server: " +
750 webHCatUrl,
751 r)
752 return r.json()

    def get_version(self):
        """
        Returns the WebHCat version of the server.

        @return json
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        webHCatUrl = self.url_webHCatUrl("version/hive")

        r = requests.get(webHCatUrl,
                         auth=(self.hadoop_user_name, self.hadoop_key))
        if r.status_code != 200:
            raise AzureException(
                "unable to get the version of the server: " +
                webHCatUrl,
                r)
        return r.json()

    def pig_submit(self, blob_service, container_name, pig_file, dependencies=None, status_dir=None,
                   stop_on_failure=True, params=None):
        """
        Submits a :epkg:`PIG` job, the function uploads it to the cluster
        as well as the dependencies.

        The code comes from `How to use HDInsight from Linux
        <http://blogs.msdn.com/b/benjguin/archive/2014/02/18/how-to-use-hdinsight-from-linux.aspx>`_
        and `start a Pig + Jython job in HDInsight thru WebHCat
        <http://blogs.msdn.com/b/benjguin/archive/2014/03/21/start-a-pig-jython-job-in-hdinsight-thru-webhcat.aspx>`_.
        The API is described at `Pig Job — POST pig
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Pig>`_.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name name of a container
        @param pig_file path to the job in the blob storage
        @param dependencies dependencies
        @param status_dir folder used by Hadoop to store the job's progress, it should contain
            your alias if you want to avoid collision with others' jobs
        @param stop_on_failure if True, the job stops at the first failure
            (pig option ``-stop_on_failure``)
        @param params additional parameters passed to the script
        @return json

        .. exref::
            :title: Submit a PIG job
            :tag: Azure

            The PIG script must include an instruction ``LOAD``.
            This instruction uses file names defined with the
            `wasb syntax <http://azure.microsoft.com/en-us/documentation/articles/hdinsight-use-blob-storage/>`_.

            If you place the string ``$CONTAINER`` before a stream name,
            it should be replaced by the corresponding wasb syntax associated
            to the container name defined by ``container_name``.
            The function will then load your script,
            modify it and save another one by adding the extension
            ``.wasb.pig``.
            Other constants you could use:

            * ``$PSEUDO``
            * ``$CONTAINER``
            * ``$SCRIPTPIG``

            However, this replacement is not done by this class, but your code
            could look like:

            ::

                blobstorage = "****"
                blobpassword = "*********************"
                hadoop_name = "*********"
                hadoop_password = "********"
                username = "********"
                cl = AzureClient(blobstorage,
                                 blobpassword,
                                 hadoop_name,
                                 hadoop_password,
                                 username)
                script = '''
                    myinput = LOAD '$CONTAINER/input.csv'
                        using PigStorage(',')
                        AS (index:long, sequence, tag, timestamp:long, dateformat,
                            x:double, y:double, z:double, activity) ;
                    filt = FILTER myinput BY activity == 'walking' ;
                    STORE filt INTO '$PSEUDO/output.csv' USING PigStorage() ;
                    '''

                with open("script_walking.pig", "w") as f:
                    f.write(script)

                bs = cl.open_blob_service()
                js = cl.pig_submit(bs, blobstorage, "testensae/script_walking.pig")
                print(js)

                js = cl.job_status('job_1414863995725_0013')

        .. versionadded:: 1.1
            Parameter *stop_on_failure*.
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        # upload
        scripts = self.default_parameters["SCRIPTPIG"]
        toup = [pig_file]
        if dependencies is not None:
            toup.extend(dependencies)
        res = self.upload(blob_service, container_name, scripts, toup)

        # path modification
        wasb = self.wasb_to_file(container_name, res[0])
        if dependencies is not None:
            wasbdep = ",".join(
                self.wasb_to_file(container_name, _) for _ in res[1:])
        else:
            wasbdep = None

        # parameters
        args = ['-v']
        for k, v in sorted(self.default_parameters.items()):
            if k == "CONTAINER":
                args.extend(["-param", '%s=%s' %
                             (k, self.wasb_to_file(container_name, v))])
            else:
                args.extend(["-param", '%s=%s' % (k, v.replace('"', '\\"'))])
        if params is not None:
            for k, v in sorted(params.items()):
                args.extend(["-param", '%s=%s' % (k, v.replace('"', '\\"'))])

        if stop_on_failure:
            args.append("-stop_on_failure")

        # params
        params = {'user.name': self.hadoop_user_name,
                  'file': wasb,
                  'arg': args}

        if wasbdep is not None:
            params["files"] = wasbdep

        if status_dir is not None:
            status_dir = self._interpret_path(status_dir)
            params['statusdir'] = self.wasb_to_file(
                container_name, status_dir + "/" + os.path.split(pig_file)[-1] + ".log")
        else:
            status_dir = self.default_parameters["SCRIPTPIG"]
            params['statusdir'] = self.wasb_to_file(
                container_name,
                self.default_parameters["SCRIPTPIG"] + "/" +
                os.path.split(pig_file)[-1] + ".log")

        webHCatUrl = self.url_webHCatUrl("pig")

        # submit the job
        r = requests.post(webHCatUrl,
                          auth=(self.hadoop_user_name, self.hadoop_key),
                          data=params)

        if r.status_code != 200:
            raise AzureException(
                "unable to submit job: {0}\n---\nWITH PARAMS\n---\n{1}".format(
                    pig_file, params),
                r)
        return r.json()

    def hive_submit(self, blob_service, container_name, hive_file, dependencies=None,
                    status_dir=None, stop_on_failure=True, params=None):
        """
        Submits a :epkg:`HIVE` job, the function uploads it to the cluster
        as well as the dependencies.

        The code comes from `How to use HDInsight from Linux
        <http://blogs.msdn.com/b/benjguin/archive/2014/02/18/how-to-use-hdinsight-from-linux.aspx>`_
        and `start a Pig + Jython job in HDInsight thru WebHCat
        <http://blogs.msdn.com/b/benjguin/archive/2014/03/21/start-a-pig-jython-job-in-hdinsight-thru-webhcat.aspx>`_.
        The API is described at `Hive Job — POST hive
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Hive>`_.

        @param blob_service blob service, returned by @see me open_blob_service
        @param container_name name of a container
        @param hive_file path to the job in the blob storage
        @param dependencies dependencies
        @param status_dir folder used by Hadoop to store the job's progress, it should contain
            your alias if you want to avoid collision with others' jobs
        @param stop_on_failure if True, the job stops at the first failure
        @param params additional parameters passed to the script
        @return json
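
        A minimal usage sketch, mirroring the example given for
        @see me pig_submit (names are placeholders):

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>",
                             "<hadoop_server>", "<hadoop_password>")
            bs = cl.open_blob_service()
            js = cl.hive_submit(bs, "<container>", "myscripts/query.hive")
            print(js)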

        .. versionadded:: 1.1
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        # upload
        scripts = self.default_parameters["SCRIPTHIVE"]
        toup = [hive_file]
        if dependencies is not None:
            toup.extend(dependencies)
        res = self.upload(blob_service, container_name, scripts, toup)

        # path modification
        wasb = self.wasb_to_file(container_name, res[0])
        if dependencies is not None:
            wasbdep = ",".join(
                self.wasb_to_file(container_name, _) for _ in res[1:])
        else:
            wasbdep = None

        # parameters
        args = ['-v']
        for k, v in sorted(self.default_parameters.items()):
            if k == "CONTAINER":
                args.extend(["-param", '%s=%s' %
                             (k, self.wasb_to_file(container_name, v))])
            else:
                args.extend(["-param", '%s=%s' % (k, v.replace('"', '\\"'))])
        if params is not None:
            for k, v in sorted(params.items()):
                args.extend(["-param", '%s=%s' % (k, v.replace('"', '\\"'))])

        if stop_on_failure:
            args.append("-stop_on_failure")

        # params
        params = {'user.name': self.hadoop_user_name,
                  'file': wasb,
                  'arg': args}

        if wasbdep is not None:
            params["files"] = wasbdep

        if status_dir is not None:
            status_dir = self._interpret_path(status_dir)
            params['statusdir'] = self.wasb_to_file(
                container_name, status_dir + "/" + os.path.split(hive_file)[-1] + ".log")
        else:
            status_dir = self.default_parameters["SCRIPTHIVE"]
            params['statusdir'] = self.wasb_to_file(
                container_name,
                self.default_parameters["SCRIPTHIVE"] + "/" +
                os.path.split(hive_file)[-1] + ".log")

        webHCatUrl = self.url_webHCatUrl("hive")

        warnings.warn("Hive submission is not tested. It will probably fail.")

        # submit the job
        r = requests.post(webHCatUrl,
                          auth=(self.hadoop_user_name, self.hadoop_key),
                          data=params)

        if r.status_code != 200:
            raise AzureException(
                "unable to submit job: {0}\n---\nWITH PARAMS\n---\n{1}".format(
                    hive_file, params),
                r)
        return r.json()

    def job_queue(self, showall=False, fields=None):
        """
        Returns the list of jobs.

        It uses the API `List Jobs — GET jobs
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Jobs>`_.

        @param showall if True, show all jobs (not only yours)
        @param fields to add fields to the request, only ``"*"`` is allowed
        @return list of jobs

        .. exref::
            :title: List job queue
            :tag: Azure

            Most of the time, a job remains stuck in the job queue because
            it is full. Here is a code to check whether that is the case on
            an Azure cluster. It should be executed from a notebook.

            Connection::

                blobstorage = "..."
                blobpassword = "..."
                hadoop_server = "..."
                hadoop_password = "..."
                username = "..."

                %load_ext pyenbc
                client, bs = %hd_open

            Job queue::

                res = client.job_queue()
                res.reverse()  # last submitted jobs first

            Displays the first 20 jobs::

                from datetime import datetime
                for i, r in enumerate(res[:20]):
                    st = client.job_status(r["id"])
                    print(i, r, st["status"]["state"],
                          datetime.fromtimestamp(float(st["status"]["startTime"]) / 1000),
                          st["status"]["jobName"])
                    print(st["userargs"].get("file", None),
                          st["profile"].get("jobName", None))

            It gives::

                0 {'detail': None, 'id': 'job_1451961118663_3126'} PREP 2016-01-26 21:57:28.756000 TempletonControllerJob
                wasb://..../scripts/pig/titi.pig TempletonControllerJob
                1 {'detail': None, 'id': 'job_1451961118663_3125'} PREP 2016-01-26 21:57:28.517999 TempletonControllerJob
                wasb://..../scripts/pig/pre_processing.pig TempletonControllerJob
                2 {'detail': None, 'id': 'job_1451961118663_3124'} PREP 2016-01-26 21:50:32.742000 TempletonControllerJob
                wasb://..../scripts/pig/titi.pig TempletonControllerJob
                3 {'detail': None, 'id': 'job_1451961118663_3123'} RUNNING 2016-01-26 21:46:57.219000 TempletonControllerJob
                wasb://..../scripts/pig/alg1.pig TempletonControllerJob
                4 {'detail': None, 'id': 'job_1451961118663_3122'} SUCCEEDED 2016-01-26 21:40:34.687999 PigLatin:pre_processing.pig
                None PigLatin:pre_processing.pig
                5 {'detail': None, 'id': 'job_1451961118663_3121'} RUNNING 2016-01-26 21:41:29.657000 TempletonControllerJob
                wasb://..../scripts/pig/Algo_LDA2.pig TempletonControllerJob
                6 {'detail': None, 'id': 'job_1451961118663_3120'} SUCCEEDED 2016-01-26 21:40:06.859999 TempletonControllerJob
                wasb://..../scripts/pig/alg1.pig TempletonControllerJob

            To kill a job::

                client.job_kill("id")
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        webHCatUrl = self.url_webHCatUrl("jobs")

        params = {"user.name": self.hadoop_user_name}
        if showall:
            params["showall"] = "true"
        if fields:
            if fields != "*":
                raise ValueError("fields can only be *")
            params["fields"] = fields

        r = requests.get(webHCatUrl,
                         auth=(self.hadoop_user_name, self.hadoop_key),
                         params=params)

        if r.status_code != 200:
            raise AzureException("unable to get the job queue", r)
        return r.json()

    def job_status(self, jobid):
        """
        Returns the status of a job.

        See `Job Information — GET jobs/:jobid
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Job>`_
        for the outcome.

        @param jobid jobid
        @return json

        You can extract the *startTime* by doing::

            from datetime import datetime
            st = client.job_status(<job_id>)
            datetime.fromtimestamp(float(st["status"]["startTime"]) / 1000)
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        params = {"user.name": self.hadoop_user_name}
        webHCatUrl = self.url_webHCatUrl("jobs/" + jobid)

        r = requests.get(webHCatUrl,
                         auth=(self.hadoop_user_name, self.hadoop_key),
                         params=params)
        if r.status_code != 200:
            raise AzureException(
                "unable to get the status of the job: " +
                webHCatUrl,
                r)
        return r.json()

    def wait_job(self, job_id, delay=5, fLOG=noLOG):
        """
        Waits until a job has run or failed.

        @param job_id job_id
        @param delay check every N seconds
        @param fLOG logging function
        @return status
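
        A minimal usage sketch (names are placeholders), submitting a job
        and waiting for its completion, assuming the WebHCat response
        contains the job id under the key ``id``:

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>",
                             "<hadoop_server>", "<hadoop_password>")
            bs = cl.open_blob_service()
            js = cl.pig_submit(bs, "<container>", "myscripts/script.pig")
            status = cl.wait_job(js["id"])
            print(status["status"]["state"])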

        .. versionadded:: 1.1
        """
        status = self.job_status(job_id)
        while status["status"]["state"] in ["PREP", "RUNNING"]:
            fLOG("job_id", job_id, ":", status["status"]["state"])
            time.sleep(delay)
            status = self.job_status(job_id)
        return status

    def standard_outputs(self, job_id, blob_service, container, folder):
        """
        Returns the standard output and error for a specific job id.

        @param job_id job_id or status
        @param blob_service blob service, returned by @see me open_blob_service
        @param container name of a container
        @param folder folder where to download them
        @return out, err
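
        A minimal usage sketch (names are placeholders), downloading the
        logs of a finished job into the current folder:

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>",
                             "<hadoop_server>", "<hadoop_password>")
            bs = cl.open_blob_service()
            out, err = cl.standard_outputs("<job_id>", bs, "<container>", ".")
            print(err)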
1171 """
1172 if isinstance(job_id, str):
1173 status = self.job_status(job_id)
1174 else:
1175 status = job_id
1177 status_dir = status["userargs"]["statusdir"]
1178 spl = status_dir.split("core.windows.net/") # to change
1179 path = spl[-1]
1180 self.download(
1181 blob_service, container, [path + "/" + _ for _ in ["stderr", "stdout"]], folder)
1183 with open(os.path.join(folder, "stdout"), "r", encoding="utf8") as f:
1184 out = f.read()
1185 with open(os.path.join(folder, "stderr"), "r", encoding="utf8") as f:
1186 err = f.read()
1187 return out, err

    def job_kill(self, jobid):
        """
        Kills a job.

        See `Delete Job — DELETE queue/:jobid
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+DeleteJob>`_
        for the outcome.

        @param jobid jobid
        @return json
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        params = {"user.name": self.hadoop_user_name}
        webHCatUrl = self.url_webHCatUrl("jobs/" + jobid)

        r = requests.delete(webHCatUrl,
                            auth=(self.hadoop_user_name, self.hadoop_key),
                            params=params)
        if r.status_code != 200:
            raise AzureException(
                "unable to kill the job: " +
                webHCatUrl,
                r)
        return r.json()