Coverage for src/pyenbc/remote/azure_connection.py: 10%

412 statements  

coverage.py v6.4.2, created at 2022-07-20 05:47 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief A class to help connect with a remote machine and send command line. 

5""" 

6import os 

7import time 

8import io 

9import warnings 

10import requests 

11from pyquickhelper.loghelper import noLOG 

12from .azure_exception import AzureException 

13 

14 

15class AzureClient(): 

16 

17 """ 

18 

19 A simple class to access and communicate with `Azure <http://azure.microsoft.com/>`_. 

20 It requires modules: 

21 

22 * `azure <https://github.com/Azure/azure-sdk-for-python>`_ 

23 * `requests <http://docs.python-requests.org/en/latest/>`_ 

24 

25 .. index: blob, Azure 

26 

27 Main functionalities related to blob: 

28 

29 * list_containers, create_container, list_blobs, put_blob, put_block_blob_from_bytes 

30 * put_block_blob_from_text, put_page_blob_from_file, get_blob 

31 

32 See `How to use Blob storage from Python 

33 <https://azure.microsoft.com/en-us/documentation/articles/storage-python-how-to-use-blob-storage/>`_. 

34 

35 .. exref:: 

36 :title: Get the list of containers and files from a blob storage 

37 :tag: Azure 

38 

39 The functionalities of a ``BlobService`` are described in 

40 `blockblobservice.py <https://github.com/Azure/azure-storage-python/blob/master/azure/storage/blob/blockblobservice.py>`_. 

41 

42 :: 

43 

44 from pyenbc.remote.azure_connection import AzureClient 

45 cl = AzureClient("<blob_storage_service>", 

46 "<primary_key>") 

47 bs = cl.open_blob_service() 

48 res = cl.ls(bs) 

49 for r in res: 

50 print(r["name"]) 

51 

52 .. exref:: 

53 :title: Upload and download files on a blob storage 

54 :tag: Azure 

55 

56 The following example uploads and downloads a file on a Blob Storage. 

57 

58 :: 

59 

60 from pyenbc.remote.azure_connection import AzureClient 

61 cl = AzureClient("<blob_storage_service>", 

62 "<primary_key>") 

63 bs = cl.open_blob_service() 

64 cl.upload(bs, "<container>", "myremotefolder/remotename.txt", 

65 "local_filename.txt") 

66 

67 res = cl.ls(bs,"<container>") 

68 for r in res: 

69 if "local_filename" in r["name"]: 

70 print(r) 

71 

72 cl.download(bs, "<container>", "myremotefolder/remotename.txt", 

73 "another_local_filename.txt") 

74 

75 Many functions use the 

76 `WebHCat API <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference>`_. 

77 The error codes can be found here: 

78 `Error Codes and Responses 

79 <https://cwiki.apache.org/confluence/display/Hive/WebHCat+UsingWebHCat#WebHCatUsingWebHCat-ErrorCodesandResponses>`_. 

80 

81 .. versionchanged:: 

82 PSEUDO, CONTAINER, SCRIPT will be passed to the script as parameters 

83 

84 """ 

85 

86 _blob_properties = [ 

87 "copy_completion_time", 

88 "content_encoding", 

89 "content_language", 

90 "blob_type", 

91 "copy_status_description", 

92 "copy_id", 

93 "content_md5", 

94 "lease_duration", 

95 "copy_source", 

96 "content_type", 

97 "content_length", 

98 "lease_state", 

99 "copy_progress", 

100 "copy_status", 

101 "xms_blob_sequence_number", 

102 "lease_status", 

103 "etag", 

104 "last_modified", 

105 ] 

106 

107 def __init__(self, blob_name, 

108 blob_key, 

109 hadoop_name=None, 

110 hadoop_key=None, 

111 hadoop_user_name="admin", 

112 pseudo="any", 

113 fLOG=None): 

114 """ 

115 constructor 

116 

117 @param blob_name blob storage name 

118 @param blob_key account key for the blob storage 

119 @param hadoop_name hadoop server name (can be None if HDInsight is not used) 

120 @param hadoop_key hadoop key (can be None if HDInsight is not used) 

@param hadoop_user_name hadoop user name (defaults to ``admin``) 
121 @param pseudo the same credentials may be used to connect to HDInsight, 
122 the pseudo is meant to avoid collisions between users 

123 @param fLOG logging function 

124 

125 """ 

126 self.account_name = blob_name 

127 self.account_key = blob_key 

128 self.hadoop_name = hadoop_name 

129 self.hadoop_key = hadoop_key 

130 self.hadoop_user_name = hadoop_user_name 

131 self.pseudo = pseudo 

132 if fLOG is None: 

133 def _log_(*args, **kwargs): 

134 pass 

135 self.LOG = _log_ 

136 else: 

137 self.LOG = fLOG 

138 

139 if pseudo is None: 

140 raise ValueError("pseudo cannot be None") 

141 

142 self.default_parameters = dict( 

143 SCRIPTPIG=self.pseudo + "/scripts/pig", 

144 SCRIPTHIVE=self.pseudo + "/scripts/hive", 

145 PSEUDO=self.pseudo, 

146 CONTAINER="") 

147 

148 def _interpret_path(self, blob_path): 

149 """ 

150 replaces variables such as ``$PSEUDO``, ``$USERNAME`` 

151 

152 @param blob_path path 

153 @return modified path 

154 

155 .. versionadded:: 1.1 
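
A minimal sketch (the storage name, key and pseudo below are placeholders)::

    cl = AzureClient("<blob_storage_service>", "<primary_key>", pseudo="alias")
    cl._interpret_path("$PSEUDO/scripts/pig/job.pig")
    # -> 'alias/scripts/pig/job.pig'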

156 """ 

157 if blob_path is None: 

158 return None 

159 if "$" in blob_path: 

160 for k, v in self.default_parameters.items(): 

161 blob_path = blob_path.replace("$" + k, v) 

162 return blob_path 

163 

164 @staticmethod 

165 def mask_string(s): 

166 """ 

167 returns an empty string or as many ``*`` as the length of the string 

168 """ 

169 if s is None: 

170 return "" 

171 else: 

172 return "*" * len(s) 

173 

174 def __str__(self): 

175 """ 

176 usual 

177 """ 

178 mes = "AzureClient [blob:({0},{1}), hadoop:({2},{3},{4})]".format( 
179 AzureClient.mask_string(self.account_name), 
180 AzureClient.mask_string(self.account_key), 
181 AzureClient.mask_string(self.hadoop_name), 
182 AzureClient.mask_string(self.hadoop_key), AzureClient.mask_string(self.hadoop_user_name)) 

183 return mes 

184 

185 def open_blob_service(self): 

186 """ 

187 open a blob service 

188 """ 

189 try: 

190 from azure.storage.blob import BlobService 

191 except ImportError: 

192 from azure.storage.blob import BlockBlobService as BlobService 

193 return BlobService(self.account_name, self.account_key) 

194 

195 def exists(self, blob_service, container_name, path): 

196 """ 

197 test the existence of a path on the blob storage 

198 

199 @param blob_service blob service, returned by @see me open_blob_service 

200 @param container_name None for all, its name otherwise 

201 @param path path in the container 

202 @return boolean 

203 

204 .. versionadded:: 1.1 
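
A usage sketch (the container and path below are placeholders)::

    cl = AzureClient("<blob_storage_service>", "<primary_key>")
    bs = cl.open_blob_service()
    if cl.exists(bs, "<container>", "myremotefolder/remotename.txt"):
        print("the blob exists")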

205 """ 

206 path = self._interpret_path(path) 

207 df = self.ls(blob_service, container_name, path, as_df=False) 

208 return len(df) > 0 

209 

210 def ls(self, blob_service, container_name=None, 

211 path=None, add_metadata=False, as_df=True): 

212 """ 

213 return the content of a blob storage 

214 

215 @param blob_service blob service, returned by @see me open_blob_service 

216 @param container_name None for all, its name otherwise 

217 @param path path in the container 

218 @param add_metadata add the metadata to the blob 

219 @param as_df if True, returns a DataFrame 

220 @return list of dictionaries (or a DataFrame if *as_df* is True) 

221 

222 .. versionchanged:: 1.1 

223 Parameters *add_metadata* and *as_df* were added; the function now 
224 returns the property *last_modified*. 

225 

226 """ 

227 res = [] 

228 if container_name is None: 

229 for cn in blob_service.list_containers(): 

230 self.LOG("exploring ", cn.name) 

231 r = self.ls( 

232 blob_service, 

233 cn.name, 

234 path=path, 

235 add_metadata=add_metadata, 

236 as_df=False) 

237 res.extend(r) 

238 if as_df: 

239 import pandas # pylint: disable=C0415 

240 return pandas.DataFrame(res) 

241 else: 

242 return res 

243 else: 

244 path = self._interpret_path(path) 

245 res = [] 

246 for b in blob_service.list_blobs(container_name, prefix=path, 

247 include="metadata" if add_metadata else None): 

248 obs = {} 

249 obs["name"] = b.name 

250 if hasattr(b, "url"): 

251 obs["url"] = b.url 

252 else: 

253 obs["url"] = blob_service.make_blob_url( 

254 container_name, b.name) 

255 for p in AzureClient._blob_properties: 

256 if hasattr(b.properties, p): 

257 obs[p] = getattr(b.properties, p) 

258 else: 

259 obs[p] = None 

260 

261 if b.metadata is not None: 

262 for k, v in b.metadata.items(): 

263 obs["meta_%s" % k] = v 

264 

265 res.append(obs) 

266 if as_df: 

267 import pandas # pylint: disable=C0415 

268 if len(res) > 0: 

269 return pandas.DataFrame(res) 

270 else: 

271 return pandas.DataFrame(columns=["name", "url"]) 

272 else: 

273 return res 

274 

275 _chunk_size = 4 * 1024 * 1024 

276 

277 def upload(self, 

278 blob_service, 

279 container_name, 

280 blob_name, 

281 file_path): 

282 """ 

283 Uploads data from a file to a blob storage. 

284 No more than 64Mb can be uploaded at the same time, so the data is split into 

285 pieces. 

286 

287 @param blob_service blob service returned by @see me open_blob_service 

288 @param container_name container name 

289 @param blob_name blob name (remote file name) 

290 @param file_path local file path 

291 @return uploaded blob name (or the list of names if *file_path* is a list) 

292 

293 The code comes from 

294 `Utilisation du service de stockage d'objets blob à partir de Python 

295 <http://azure.microsoft.com/fr-fr/documentation/articles/storage-python-how-to-use-blob-storage/>`_. 
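
The method also accepts a list of local files; they are then uploaded under
*blob_name*, used as a remote folder. A minimal sketch, reusing ``cl`` and
``bs`` from the examples above (names are placeholders)::

    cl.upload(bs, "<container>", "myremotefolder",
              ["local_file1.txt", "local_file2.txt"])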

296 """ 

297 if isinstance(file_path, list): 

298 res = [] 

299 for filename in file_path: 

300 only = os.path.split(filename)[-1] 

301 bn = blob_name.rstrip("/") + "/" + only 

302 r = self.upload(blob_service, container_name, bn, filename) 

303 res.append(r) 

304 return res 

305 else: 

306 blob_name = self._interpret_path(blob_name) 

307 if hasattr(blob_service, "put_blob"): 

308 # this code should disappear as it relies on an old version of 

309 # the module azure 

310 blob_service.create_container( 

311 container_name, None, None, False) 

312 blob_service.put_blob( 

313 container_name, blob_name, None, 'BlockBlob') 

314 

315 block_ids = [] 

316 index = 0 

317 with open(file_path, 'rb') as f: 

318 while True: 

319 data = f.read(AzureClient._chunk_size) 

320 if data: 

321 block_id = '{0:08d}'.format(index) 

322 blob_service.put_block( 

323 container_name, 

324 blob_name, 

325 data, 

326 block_id) 

327 block_ids.append(block_id) 

328 index += 1 

329 self.LOG("uploaded", index, 

330 " bytes from ", file_path) 

331 else: 

332 break 

333 

334 blob_service.put_block_list( 

335 container_name, blob_name, block_ids) 

336 else: 

337 blob_service.create_blob_from_path( 

338 container_name, blob_name, file_path) 

339 

340 return blob_name 

341 

342 def upload_data(self, 

343 blob_service, 

344 container_name, 

345 blob_name, 

346 data): 

347 """ 

348 Uploads data (bytes) to a blob storage. 

349 No more than 64Mb can be uploaded at the same time, so the data is split into 

350 pieces. 

351 

352 @param blob_service blob service returned by @see me open_blob_service 

353 @param container_name container name 

354 @param blob_name blob name (remote file name) 

355 @param data bytes 

356 @return uploaded blob name 

357 

358 The code comes from 

359 `Utilisation du service de stockage d'objets blob à partir de Python 

360 <http://azure.microsoft.com/fr-fr/documentation/articles/storage-python-how-to-use-blob-storage/>`_. 

361 

362 .. versionadded:: 1.1 
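
A usage sketch (names are placeholders)::

    cl.upload_data(bs, "<container>", "myremotefolder/data.bin",
                   b"some raw bytes to store")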

363 """ 

364 blob_name = self._interpret_path(blob_name) 

365 blob_service.create_container(container_name, None, None, False) 

366 if hasattr(blob_service, "put_blob"): 

367 # this code should disappear as it relies on an old version of the 

368 # module azure 

369 blob_service.put_blob(container_name, blob_name, None, 'BlockBlob') 

370 

371 block_ids = [] 

372 index = 0 

373 while True: 

374 if len(data) > AzureClient._chunk_size: 

375 da = data[:AzureClient._chunk_size] 

376 data = data[AzureClient._chunk_size:] 

377 else: 

378 da = data 

379 data = None 

380 block_id = '{0:08d}'.format(index) 

381 blob_service.put_block( 

382 container_name, 

383 blob_name, 

384 da, 

385 block_id) 

386 block_ids.append(block_id) 

387 index += 1 

388 if not data: 

389 break 

390 

391 blob_service.put_block_list(container_name, blob_name, block_ids) 

392 else: 

393 blob_service.create_blob_from_bytes( 

394 container_name, blob_name, data) 

395 return blob_name 

396 

397 def download(self, 

398 blob_service, 

399 container_name, 

400 blob_name, 

401 file_path=None, 

402 append=False, 

403 chunk_size=None, 

404 stop_at=None): 

405 """ 

406 Downloads data from a blob storage to a file. 

407 No more than 64Mb can be downloaded at the same time, so the data is split into 

408 pieces. 

409 

410 @param blob_service blob service returned by @see me open_blob_service 

411 @param container_name container name 

412 @param blob_name blob name (or list of blob names) (remote file name) 

413 @param file_path local file path 

414 @param append if True, append the content to an existing file 

415 @param chunk_size download by chunk 

416 @param stop_at stop at a given size (None to avoid stopping) 

417 @return local file or bytes if *file_path* is None 

418 

419 The code comes from 

420 `Utilisation du service de stockage d'objets blob à partir de Python 

421 <http://azure.microsoft.com/fr-fr/documentation/articles/storage-python-how-to-use-blob-storage/>`_. 

422 

423 .. versionchanged:: 1.1 

424 Parameters *append*, *chunk_size* were added. 

425 If *file_path* is None (default value now), the function 

426 returns bytes. 
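
For instance, to retrieve a blob directly as bytes (placeholder names)::

    data = cl.download(bs, "<container>", "myremotefolder/remotename.txt")
    text = data.decode("utf-8")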

427 """ 

428 if not isinstance(blob_name, str): 

429 res = [] 

430 for blob in blob_name: 

431 dest = os.path.join(file_path, os.path.split(blob)[-1]) 

432 r = self.download( 

433 blob_service, 

434 container_name, 

435 blob, 

436 dest, 

437 append=append, 

438 chunk_size=chunk_size, 

439 stop_at=stop_at) 

440 res.append(r) 

441 if stop_at is not None: 

442 if file_path is None: 

443 stop_at -= len(r) 

444 else: 

445 stop_at -= os.stat(r).st_size 

446 if stop_at <= 0: 

447 break 

448 if file_path is None: 

449 st = io.BytesIO() 

450 for r in res: 

451 st.write(r) 

452 return st.getvalue() 

453 else: 

454 return res 

455 else: 

456 blob_name = self._interpret_path(blob_name) 

457 

458 if hasattr(blob_service, "get_blob"): 

459 # this code should disappear as it relies on an old version of 

460 # the module azure 

461 props = blob_service.get_blob_properties( 

462 container_name, blob_name) 

463 if hasattr(props, "properties"): 

464 blob_size = props.properties.content_length 

465 else: 

466 blob_size = int(props['content-length']) 

467 if chunk_size is None: 

468 chunk_size = AzureClient._chunk_size 

469 if stop_at is not None and stop_at < chunk_size: 

470 chunk_size = max(stop_at, 0) 

471 

472 def iterations(f, chunk_size, container_name, blob_name, 

473 file_path, stop_at): 

474 "local function" 

475 index = 0 

476 

477 while index < blob_size: 

478 chunk_range = 'bytes={}-{}'.format(index, 

479 index + chunk_size - 1) 

480 data = blob_service.get_blob( 

481 container_name, 

482 blob_name, 

483 x_ms_range=chunk_range) 

484 length = len(data) 

485 index += length 

486 self.LOG("downloaded ", index, 

487 "bytes from ", file_path) 

488 if length > 0: 

489 f.write(data) 

490 if length < chunk_size: 

491 return False 

492 else: 

493 return False 

494 if stop_at is not None and stop_at <= index: 

495 return False 

496 return True 

497 

498 if file_path is None: 

499 f = io.BytesIO() 

500 iterations(f, chunk_size, 

501 container_name, blob_name, file_path, stop_at) 

502 return f.getvalue() 

503 else: 

504 mode = 'ab' if append else 'wb' 

505 with open(file_path, mode) as f: 

506 iterations(f, chunk_size, 

507 container_name, blob_name, file_path, stop_at) 

508 return file_path 

509 else: 

510 bl = blob_service.get_blob_to_bytes(container_name, blob_name) 

511 if file_path is None: 

512 return bl.content 

513 else: 

514 with open(file_path, "wb") as f: 

515 f.write(bl.content) 

516 return file_path 

517 

518 def download_data(self, blob_service, container_name, blob_name, 

519 chunk_size=None, stop_at=None): 

520 """ 

521 Downloads data from a blob storage and return bytes. 

522 No more than 64Mb can be downloaded at the same time, so the data is split into 

523 pieces. 

524 

525 @param blob_service blob service returned by @see me open_blob_service 

526 @param container_name container name 

527 @param blob_name blob name (or list of blob names) (remote file name) 

528 @param chunk_size download by chunk 

529 @param stop_at stop at a given size (None to avoid stopping) 

530 @return bytes 

531 

532 .. versionadded:: 1.1 
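
A usage sketch (placeholder names), downloading only the first megabyte::

    head = cl.download_data(bs, "<container>",
                            "myremotefolder/remotename.txt", stop_at=2**20)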

533 """ 

534 return self.download(blob_service=blob_service, container_name=container_name, 

535 blob_name=blob_name, chunk_size=chunk_size, stop_at=stop_at) 

536 

537 def df_head(self, blob_service, container_name, blob_name, stop_at=2 ** 20, 

538 encoding="utf-8", as_df=True, merge=False, **options): 

539 """ 

540 Downloads the beginning of a stream and displays as a DataFrame. 

541 

542 @param blob_service blob service returned by @see me open_blob_service 

543 @param container_name container name 

544 @param blob_name blob name (or list of blob names) (remote file name) 

545 @param stop_at stop at a given size (None to avoid stopping) 

546 @param encoding encoding 

547 @param as_df result as a dataframe or a string 

548 @param merge if True, *blob_name* is a folder, method @see me download_merge is called 

549 @param options see `read_csv <http://pandas.pydata.org/pandas-docs/version/0.17.0/generated/pandas.read_csv.html? 

550 highlight=read_csv#pandas.read_csv>`_ 

551 @return a DataFrame (or a string if *as_df* is False) 

552 

553 .. versionadded:: 1.1 
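
A usage sketch (placeholder names; extra keyword arguments such as *sep*
are forwarded to pandas ``read_csv``)::

    df = cl.df_head(bs, "<container>", "myremotefolder/data.csv", sep=",")
    print(df.head())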

554 """ 

555 if merge: 

556 do = self.download_merge(blob_service=blob_service, 

557 container_name=container_name, 

558 blob_folder=blob_name, 

559 stop_at=stop_at) 

560 else: 

561 do = self.download(blob_service=blob_service, 

562 container_name=container_name, 

563 blob_name=blob_name, 

564 stop_at=stop_at) 

565 text = do.decode(encoding) 

566 if as_df: 

567 pos = text.rfind("\n") 

568 if pos > 0: 

569 st = io.StringIO(text[:pos]) 

570 else: 

571 st = io.StringIO(text) 

572 import pandas # pylint: disable=C0415 

573 return pandas.read_csv(st, **options) 

574 else: 

575 return text 

576 

577 def download_merge(self, 

578 blob_service, 

579 container_name, 

580 blob_folder, 

581 file_path=None, 

582 chunk_size=None, 

583 stop_at=None): 

584 """ 

585 Downloads all files from a folder in a blob storage to a single local file. 

586 Files will be merged. 

587 No more than 64Mb can be downloaded at the same time, so the data is split into 

588 pieces. 

589 

590 @param blob_service blob service returned by @see me open_blob_service 

591 @param container_name container name 

592 @param blob_folder blob folder (remote folder name) 

593 @param file_path local file path 

594 @param chunk_size download by chunk 

595 @param stop_at stop at a given size (None to avoid stopping) 

596 @return local file 

597 

598 .. versionchanged:: 1.1 

599 Parameters *chunk_size* and *stop_at* were added. 

600 If *file_path* is None (default value now), the function 

601 returns bytes. 
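
A usage sketch (placeholder names), merging every blob of a remote folder
into memory::

    data = cl.download_merge(bs, "<container>", "myremotefolder")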

602 """ 

603 blob_folder = self._interpret_path(blob_folder) 

604 content = self.ls( 

605 blob_service, 

606 container_name, 

607 blob_folder, 

608 as_df=False) 

609 first = True 

610 store = io.BytesIO() 

611 for cont in content: 

612 if cont["content_length"] > 0: 

613 by = self.download( 

614 blob_service, 

615 container_name, 

616 cont["name"], 

617 file_path=file_path, 

618 chunk_size=chunk_size, 

619 stop_at=stop_at, 

620 append=not first) 

621 if first: 

622 first = False 

623 if file_path is None: 

624 store.write(by) 

625 if stop_at is not None: 

626 stop_at -= len(by) 

627 else: 

628 if stop_at is not None: 

629 stop_at -= os.stat(file_path).st_size 

630 if stop_at is not None and stop_at <= 0: 

631 break 

632 

633 if file_path is None: 

634 return store.getvalue() 

635 else: 

636 return file_path 

637 

638 def delete_blob(self, blob_service, container_name, blob_name): 

639 """ 

640 delete a blob 

641 

642 @param blob_service blob service returned by @see me open_blob_service 

643 @param container_name container name 

644 @param blob_name blob name (remote file name) 

645 """ 

646 blob_name = self._interpret_path(blob_name) 

647 blob_service.delete_blob(container_name, blob_name) 

648 return blob_name 

649 

650 def delete_folder(self, blob_service, container_name, blob_folder): 

651 """ 

652 delete a folder and its content 

653 

654 @param blob_service blob service returned by @see me open_blob_service 

655 @param container_name container name 

656 @param blob_folder blob folder (remote folder name) 

657 

658 .. versionadded:: 1.1 
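
A usage sketch (placeholder names)::

    removed = cl.delete_folder(bs, "<container>", "myremotefolder")
    print(removed)  # list of deleted blob names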

659 """ 

660 blob_folder = self._interpret_path(blob_folder) 

661 df = self.ls(blob_service, container_name, blob_folder) 

662 rem = [] 

663 for name in df["name"]: 

664 r = self.delete_blob(blob_service, container_name, name) 

665 rem.append(r) 

666 return rem 

667 

668 def url_blob(self, blob_service, container, blob_name): 

669 """ 

670 returns a URL for a blob file name 

671 

@param blob_service blob service returned by @see me open_blob_service 
672 @param container container name 

673 @param blob_name blob_name 

674 @return url 

675 """ 

676 blob_name = self._interpret_path(blob_name) 

677 src = blob_service.make_blob_url(container, blob_name) 

678 return src 

679 

680 def copy_blob(self, blob_service, container, blob_name, source): 

681 """ 

682 copy a blob 

683 

684 @param blob_service blob service returned by @see me open_blob_service 

685 @param container container name 

686 @param blob_name destination 

687 @param source source 
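
A usage sketch (placeholder names, destination first, source second)::

    cl.copy_blob(bs, "<container>", "backup/remotename.txt",
                 "myremotefolder/remotename.txt")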

688 """ 

689 blob_name = self._interpret_path(blob_name) 

690 url = self.url_blob(blob_service, container, source) 

691 res = blob_service.copy_blob(container, blob_name, url) 

692 return res 

693 

694 def url_webHCatUrl(self, cmd): 

695 """ 

696 returns a URL to the cluster 

697 

698 @param cmd something like ``pig``, ``status`` 

699 @return url 

700 """ 

701 if self.hadoop_name is None: 

702 raise AttributeError( 

703 "no hadoop server was given to the constructor for cmd: {0}".format(cmd)) 

704 webHCatUrl = 'https://' + self.hadoop_name + \ 

705 '.azurehdinsight.net/templeton/v1/' + cmd 

706 return webHCatUrl 

707 

708 def wasb_to_file(self, container_name, blob_file): 

709 """ 

710 return something like ``wasb://demo@myblobstorage.blob...`` 

711 

712 @param container_name name of a container 

713 @param blob_file path to a file 

714 @return URL to the blob file (a pig script for example) 
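
For example, assuming the account name is ``myblobstorage`` and following
the ``wasb://<container>@<account>`` convention shown above::

    cl.wasb_to_file("demo", "input.csv")
    # 'wasb://demo@myblobstorage.blob.core.windows.net/input.csv'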

715 """ 

716 blob_file = self._interpret_path(blob_file) 

717 return 'wasb://{0}@{1}.blob.core.windows.net/{2}'.format(container_name, 

718 self.account_name, blob_file) 

719 

720 def wasb_prefix(self, container_name): 

721 """ 

722 when using a ``LOAD`` instruction in a PIG script, 
723 blob file names must be referenced using the wasb syntax. 

724 This method returns the prefix to add. 

725 

726 @return wasb prefix 

727 """ 

728 return self.wasb_to_file(container_name, "") 

729 

730 def get_status(self): 

731 """ 

732 returns the status of the WebHCat server 

733 

734 @return json 
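
A usage sketch (credentials are placeholders)::

    cl = AzureClient("<blob_storage_service>", "<primary_key>",
                     "<hadoop_server>", "<hadoop_password>", "<hadoop_user>")
    st = cl.get_status()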

735 """ 

736 if self.hadoop_user_name is None: 

737 raise AttributeError( 

738 "no hadoop user name was given to the constructor") 

739 if self.hadoop_key is None: 

740 raise AttributeError( 

741 "no hadoop password was given to the constructor") 

742 

743 webHCatUrl = self.url_webHCatUrl("status") 

744 

745 r = requests.get(webHCatUrl, 

746 auth=(self.hadoop_user_name, self.hadoop_key)) 

747 if r.status_code != 200: 

748 raise AzureException( 

749 "unable to the status of server: " + 

750 webHCatUrl, 

751 r) 

752 return r.json() 

753 

754 def get_version(self): 

755 """ 

756 returns the Hive version reported by the WebHCat server 

757 

758 @return json 

759 """ 

760 if self.hadoop_user_name is None: 

761 raise AttributeError( 

762 "no hadoop user name was given to the constructor") 

763 if self.hadoop_key is None: 

764 raise AttributeError( 

765 "no hadoop password was given to the constructor") 

766 

767 webHCatUrl = self.url_webHCatUrl("version/hive") 

768 

769 r = requests.get(webHCatUrl, 

770 auth=(self.hadoop_user_name, self.hadoop_key)) 

771 if r.status_code != 200: 

772 raise AzureException( 

773 "unable to the version of server: " + 

774 webHCatUrl, 

775 r) 

776 return r.json() 

777 

778 def pig_submit(self, blob_service, container_name, pig_file, dependencies=None, status_dir=None, 

779 stop_on_failure=True, params=None): 

780 """ 

781 Submits a :epkg:`PIG` job; the function uploads it to the cluster 
782 as well as its dependencies. 

783 

784 The code comes from `How to use HDInsight from Linux 

785 <http://blogs.msdn.com/b/benjguin/archive/2014/02/18/how-to-use-hdinsight-from-linux.aspx>`_ 

786 and `start a Pig + Jython job in HDInsight thru WebHCat 

787 <http://blogs.msdn.com/b/benjguin/archive/2014/03/21/start-a-pig-jython-job-in-hdinsight-thru-webhcat.aspx>`_. 

788 The API is described at `Pig Job — POST pig 

789 <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Pig>`_. 

790 

791 @param blob_service blob service returned by @see me open_blob_service 

792 @param container_name name of a container 

793 @param pig_file path to the job in the blob storage 

794 @param dependencies dependencies 

795 @param status_dir folder used by Hadoop to store job's progress, it should contain 

796 your alias if you want to avoid collision with others' jobs 

797 @param stop_on_failure if True, stop the job at the first failure instead of running as long as possible 

798 @param params additional parameters sent to the script as ``-param`` arguments 

799 @return json 

800 

801 .. exref:: 

802 :title: Submit a job PIG 

803 :tag: Azure 

804 

805 The PIG script must include a ``LOAD`` instruction. 
806 This instruction uses file names defined with the 

807 `wasb syntax <http://azure.microsoft.com/en-us/documentation/articles/hdinsight-use-blob-storage/>`_. 

808 

809 If you place the string ``$CONTAINER`` before a stream name, 

810 it should be replaced by the corresponding wasb syntax associated 

811 to the container name defined by ``container_name``. 

812 The function will then load your script, 

813 modify it and save another one by adding the extension 

814 ``.wasb.pig``. 

815 Other constants you can use: 

816 

817 * ``$PSEUDO`` 

818 * ``$CONTAINER`` 

819 * ``$SCRIPTPIG`` 

820 

821 However, this replacement is not done by this class; your code could 
822 look like this: 

823 

824 :: 

825 

826 blobstorage = "****" 

827 blobpassword = "*********************" 

828 hadoop_name = "*********" 

829 hadoop_password = "********" 

830 username = "********" 

831 cl = AzureClient(blobstorage, 

832 blobpassword, 

833 hadoop_name, 

834 hadoop_password, 

835 username) 

836 script = ''' 

837 myinput = LOAD '$CONTAINER/input.csv' 

838 using PigStorage(',') 

839 AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ; 

840 filt = FILTER myinput BY activity == 'walking' ; 

841 STORE filt INTO '$PSEUDO/output.csv' USING PigStorage() ; 

842 ''' 

843 

844 with open("script_walking.pig","w") as f : 

845 f.write(script) 

846 

847 bs = cl.open_blob_service() 

848 js = cl.pig_submit(bs, blobstorage, "testensae/script_walking.pig") 

849 print(js) 

850 

851 js = cl.job_status('job_1414863995725_0013') 

852 

853 .. versionadded:: 1.1 

854 parameter *stop_on_failure* 

855 """ 

856 if self.hadoop_user_name is None: 

857 raise AttributeError( 

858 "no hadoop user name was given to the constructor") 

859 if self.hadoop_key is None: 

860 raise AttributeError( 

861 "no hadoop password was given to the constructor") 

862 

863 # upload 

864 scripts = self.default_parameters["SCRIPTPIG"] 

865 toup = [pig_file] 

866 if dependencies is not None: 

867 toup.extend(dependencies) 

868 res = self.upload(blob_service, container_name, scripts, toup) 

869 

870 # path modification 

871 wasb = self.wasb_to_file(container_name, res[0]) 

872 if dependencies is not None: 

873 wasbdep = ",".join( 
874 self.wasb_to_file(container_name, _) for _ in res[1:]) 

878 else: 

879 wasbdep = None 

880 

881 # parameter 

882 args = ['-v'] 

883 for k, v in sorted(self.default_parameters.items()): 

884 if k == "CONTAINER": 

885 args.extend(["-param", '%s=%s' % 

886 (k, self.wasb_to_file(container_name, v))]) 

887 else: 

888 args.extend(["-param", '%s=%s' % (k, v.replace('"', '\\"'))]) 

889 if params is not None: 

890 for k, v in sorted(params.items()): 

891 args.extend(["-param", '%s=%s' % (k, v.replace('"', '\\"'))]) 

892 

893 if stop_on_failure: 

894 args.append("-stop_on_failure") 

895 

896 # params 

897 params = {'user.name': self.hadoop_user_name, 

898 'file': wasb, 

899 'arg': args} 

900 

901 if wasbdep is not None: 

902 params["files"] = wasbdep 

903 

904 if status_dir is not None: 

905 status_dir = self._interpret_path(status_dir) 

906 params['statusdir'] = self.wasb_to_file( 

907 container_name, status_dir + "/" + os.path.split(pig_file)[-1] + ".log") 

908 else: 

909 status_dir = self.default_parameters["SCRIPTPIG"] 

910 params['statusdir'] = self.wasb_to_file(container_name, self.default_parameters[ 

911 "SCRIPTPIG"] + "/" + os.path.split(pig_file)[-1] + ".log") 

912 

913 webHCatUrl = self.url_webHCatUrl("pig") 

914 

915 # submit the job 

916 r = requests.post(webHCatUrl, 

917 auth=(self.hadoop_user_name, self.hadoop_key), 

918 data=params) 

919 

920 if r.status_code != 200: 

921 raise AzureException( 

922 "unable to submit job: {0}\n---\nWITH PARAMS\n---\n{1}".format(pig_file, params), r) 

923 return r.json() 

924 

925 def hive_submit(self, blob_service, container_name, hive_file, dependencies=None, 

926 status_dir=None, stop_on_failure=True, params=None): 

927 """ 

928 Submits a :epkg:`HIVE` job; the function uploads it to the cluster 
929 as well as its dependencies. 

930 

931 The code comes from `How to use HDInsight from Linux 

932 <http://blogs.msdn.com/b/benjguin/archive/2014/02/18/how-to-use-hdinsight-from-linux.aspx>`_ 

933 and `start a Pig + Jython job in HDInsight thru WebHCat 

934 <http://blogs.msdn.com/b/benjguin/archive/2014/03/21/start-a-pig-jython-job-in-hdinsight-thru-webhcat.aspx>`_. 

935 The API is described at `Pig Job — POST pig 

936 <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Pig>`_. 

937 

938 @param blob_service blob service returned by @see me open_blob_service 

939 @param container_name name of a container 

940 @param hive_file path to the job in the blob storage 

941 @param dependencies dependencies 

942 @param status_dir folder used by Hadoop to store job's progress, it should contain 

943 your alias if you want to avoid collision with others' jobs 

944 @param stop_on_failure if True, stop the job at the first failure instead of running as long as possible 

945 @param params additional parameters sent to the script as ``-param`` arguments 

946 @return json 

947 

948 .. versionadded:: 1.1 
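
A usage sketch (placeholder names; note the warning raised by the method,
hive submission is not tested)::

    js = cl.hive_submit(bs, "<container>", "myscript.hive")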

949 """ 

950 if self.hadoop_user_name is None: 

951 raise AttributeError( 

952 "no hadoop user name was given to the constructor") 

953 if self.hadoop_key is None: 

954 raise AttributeError( 

955 "no hadoop password was given to the constructor") 

956 

957 # upload 

958 scripts = self.default_parameters["SCRIPTHIVE"] 

959 toup = [hive_file] 

960 if dependencies is not None: 

961 toup.extend(dependencies) 

962 res = self.upload(blob_service, container_name, scripts, toup) 

963 

964 # path modification 

965 wasb = self.wasb_to_file(container_name, res[0]) 

966 if dependencies is not None: 

967 wasbdep = ",".join( 
968 self.wasb_to_file(container_name, _) for _ in res[1:]) 

972 else: 

973 wasbdep = None 

974 

975 # parameter 

976 args = ['-v'] 

977 for k, v in sorted(self.default_parameters.items()): 

978 if k == "CONTAINER": 

979 args.extend(["-param", '%s=%s' % 

980 (k, self.wasb_to_file(container_name, v))]) 

981 else: 

982 args.extend(["-param", '%s=%s' % (k, v.replace('"', '\\"'))]) 

983 if params is not None: 

984 for k, v in sorted(params.items()): 

985 args.extend(["-param", '%s=%s' % (k, v.replace('"', '\\"'))]) 

986 

987 if stop_on_failure: 

988 args.append("-stop_on_failure") 

989 

990 # params 

991 params = {'user.name': self.hadoop_user_name, 

992 'file': wasb, 

993 'arg': args} 

994 

995 if wasbdep is not None: 

996 params["files"] = wasbdep 

997 

998 if status_dir is not None: 

999 status_dir = self._interpret_path(status_dir) 

1000 params['statusdir'] = self.wasb_to_file( 

1001 container_name, status_dir + "/" + os.path.split(hive_file)[-1] + ".log") 

1002 else: 

1003 status_dir = self.default_parameters["SCRIPTHIVE"] 

1004 params['statusdir'] = self.wasb_to_file(container_name, self.default_parameters[ 

1005 "SCRIPTHIVE"] + "/" + os.path.split(hive_file)[-1] + ".log") 

1006 

1007 webHCatUrl = self.url_webHCatUrl("hive") 

1008 

1009 warnings.warn("Hive submission is not tested. It will probably fail.") 

1010 

1011 # submit the job 

1012 r = requests.post(webHCatUrl, 

1013 auth=(self.hadoop_user_name, self.hadoop_key), 

1014 data=params) 

1015 

1016 if r.status_code != 200: 

1017 raise AzureException( 

1018 "unable to submit job: {0}\n---\nWITH PARAMS\n---\n{1}".format(hive_file, params), r) 

1019 return r.json() 

1020 

1021 def job_queue(self, showall=False, fields=None): 

1022 """ 

1023 returns the list of jobs 

1024 

1025 It uses the API `Job Information — GET queue/:jobid <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Jobs>`_. 

1026 

1027 @param showall if True, show all jobs (not only yours) 

1028 @param fields additional fields to add to the request (only ``*`` is allowed) 

1029 @return list of jobs 

1030 

1031 .. exref:: 

1032 :title: List job queue 

1033 :tag: Azure 

1034 

1035 Most of the time, a job remains stuck in the job queue because 

1036 it is full. Here is some code to check whether that is the case on 
1037 an Azure cluster. It should be executed from a notebook. 

1038 

1039 Connection :: 

1040 

1041 blobstorage = "..." 

1042 blobpassword = "..." 

1043 hadoop_server = "..." 

1044 hadoop_password = "..." 

1045 username = "..." 

1046 

1047 %load_ext pyenbc 

1048 client, bs = %hd_open 

1049 

1050 Job queue :: 

1051 

1052 res = client.job_queue() 

1053 res.reverse() # last submitted jobs first 

1054 

1055 Displays the first 20 jobs:: 

1056 

1057 for i, r in enumerate(res[:20]): 

1058 st = client.job_status(r["id"]) 

1059 print(i, r, st["status"]["state"],datetime.fromtimestamp(float(st["status"]["startTime"])/1000), st["status"]["jobName"]) 

1060 print(st["userargs"].get("file", None), st["profile"].get("jobName", None)) 

1061 

1062 It gives:: 

1063 

1064 0 {'detail': None, 'id': 'job_1451961118663_3126'} PREP 2016-01-26 21:57:28.756000 TempletonControllerJob 

1065 wasb://..../scripts/pig/titi.pig TempletonControllerJob 

1066 1 {'detail': None, 'id': 'job_1451961118663_3125'} PREP 2016-01-26 21:57:28.517999 TempletonControllerJob 

1067 wasb://..../scripts/pig/pre_processing.pig TempletonControllerJob 

1068 2 {'detail': None, 'id': 'job_1451961118663_3124'} PREP 2016-01-26 21:50:32.742000 TempletonControllerJob 

1069 wasb://..../scripts/pig/titi.pig TempletonControllerJob 

1070 3 {'detail': None, 'id': 'job_1451961118663_3123'} RUNNING 2016-01-26 21:46:57.219000 TempletonControllerJob 

1071 wasb://..../scripts/pig/alg1.pig TempletonControllerJob 

1072 4 {'detail': None, 'id': 'job_1451961118663_3122'} SUCCEEDED 2016-01-26 21:40:34.687999 PigLatin:pre_processing.pig 

1073 None PigLatin:pre_processing.pig 

1074 5 {'detail': None, 'id': 'job_1451961118663_3121'} RUNNING 2016-01-26 21:41:29.657000 TempletonControllerJob 

1075 wasb://..../scripts/pig/Algo_LDA2.pig TempletonControllerJob 

1076 6 {'detail': None, 'id': 'job_1451961118663_3120'} SUCCEEDED 2016-01-26 21:40:06.859999 TempletonControllerJob 

1077 wasb://..../scripts/pig/alg1.pig TempletonControllerJob 

1078 

1079 To kill a job:: 

1080 

1081 client.job_kill("id") 

1082 """ 

1083 if self.hadoop_user_name is None: 

1084 raise AttributeError( 

1085 "no hadoop user name was given to the constructor") 

1086 if self.hadoop_key is None: 

1087 raise AttributeError( 

1088 "no hadoop password was given to the constructor") 

1089 

1090 webHCatUrl = self.url_webHCatUrl("jobs") 

1091 

1092 params = {"user.name": self.hadoop_user_name} 

1093 if showall: 

1094 params["showall"] = "true" 

1095 if fields: 

1096 if fields != "*": 

1097 raise ValueError("fields can only be *") 

1098 params["fields"] = fields 

1099 

1100 r = requests.get(webHCatUrl, 

1101 auth=(self.hadoop_user_name, self.hadoop_key), 

1102 params=params) 

1103 

1104 if r.status_code != 200: 

1105 raise AzureException("unable to get job queue", r) 

1106 return r.json() 

1107 

1108 def job_status(self, jobid): 

1109 """ 

1110 return the status of a job 

1111 

1112 see `Job Information — GET queue/:jobid <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Job>`_ 

1113 for the outcome 

1114 

1115 @param jobid jobid 

1116 @return json 

1117 

1118 You can extract the *startTime* by doing:: 

1119 

1120 from datetime import datetime 

1121 st = client.job_status(<job_id>) 

1122 datetime.fromtimestamp(float(st["status"]["startTime"])/1000) 

1123 """ 

1124 if self.hadoop_user_name is None: 

1125 raise AttributeError( 

1126 "no hadoop user name was given to the constructor") 

1127 if self.hadoop_key is None: 

1128 raise AttributeError( 

1129 "no hadoop password was given to the constructor") 

1130 

1131 params = {"user.name": self.hadoop_user_name} 

1132 webHCatUrl = self.url_webHCatUrl("jobs/" + jobid) 

1133 

1134 r = requests.get(webHCatUrl, 

1135 auth=(self.hadoop_user_name, self.hadoop_key), 

1136 params=params) 

1137 if r.status_code != 200: 

1138 raise AzureException( 

1139 "unable to the version of server: " + 

1140 webHCatUrl, 

1141 r) 

1142 return r.json() 

1143 

1144 def wait_job(self, job_id, delay=5, fLOG=noLOG): 

1145 """ 

1146 wait until a job has run or failed 

1147 

1148 @param job_id job_id 

1149 @param delay check every N seconds 
@param fLOG logging function 

1150 @return status 

1151 

1152 .. versionadded:: 1.1 
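
A usage sketch, reusing a job id such as the one returned by @see me pig_submit
(the id below is a placeholder)::

    status = cl.wait_job("job_1414863995725_0013")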

1153 

1154 """ 

1155 status = self.job_status(job_id) 

1156 while status["status"]["state"] in ["PREP", "RUNNING"]: 

1157 fLOG("job_id", job_id, ":", status["status"]["state"]) 

1158 time.sleep(delay) 

1159 status = self.job_status(job_id) 

1160 return status 

1161 

1162 def standard_outputs(self, job_id, blob_service, container, folder): 

1163 """ 

1164 returns the standard output and error for a specific job id 

1165 

1166 @param job_id job_id or status 

1167 @param blob_service blob service returned by @see me open_blob_service 

1168 @param container name of a container 

1169 @param folder local folder where to download them 

1170 @return out, err 
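
A usage sketch (the job id and the local folder are placeholders)::

    out, err = cl.standard_outputs("job_1414863995725_0013",
                                   bs, "<container>", ".")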

1171 """ 

1172 if isinstance(job_id, str): 

1173 status = self.job_status(job_id) 

1174 else: 

1175 status = job_id 

1176 

1177 status_dir = status["userargs"]["statusdir"] 

1178 spl = status_dir.split("core.windows.net/") # to change 

1179 path = spl[-1] 

1180 self.download( 

1181 blob_service, container, [path + "/" + _ for _ in ["stderr", "stdout"]], folder) 

1182 

1183 with open(os.path.join(folder, "stdout"), "r", encoding="utf8") as f: 

1184 out = f.read() 

1185 with open(os.path.join(folder, "stderr"), "r", encoding="utf8") as f: 

1186 err = f.read() 

1187 return out, err 

1188 

1189 def job_kill(self, jobid): 

1190 """ 

1191 kills a job 

1192 

1193 see `Delete Job — DELETE queue/:jobid <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+DeleteJob>`_ 

1194 for the outcome 

1195 

1196 @param jobid jobid 

1197 @return json 

1198 """ 

1199 if self.hadoop_user_name is None: 

1200 raise AttributeError( 

1201 "no hadoop user name was given to the constructor") 

1202 if self.hadoop_key is None: 

1203 raise AttributeError( 

1204 "no hadoop password was given to the constructor") 

1205 

1206 params = {"user.name": self.hadoop_user_name} 

1207 webHCatUrl = self.url_webHCatUrl("jobs/" + jobid) 

1208 

1209 r = requests.delete(webHCatUrl, 

1210 auth=(self.hadoop_user_name, self.hadoop_key), 

1211 params=params) 

1212 if r.status_code != 200: 

1213 raise AzureException( 

1214 "unable to the version of server: " + 

1215 webHCatUrl, 

1216 r) 

1217 return r.json()