Examples¶
Azure¶
How to get the list of containers and files from a blob storage?
The functionalities of a BlobService are described in blockblobservice.py.
from pyenbc.remote.azure_connection import AzureClient

cl = AzureClient("<blob_storage_service>",
                 "<primary_key>")
bs = cl.open_blob_service()
res = cl.ls(bs)
for r in res:
    print(r["name"])
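The listing returns dictionaries with at least a "name" key. To keep only the blobs under a given prefix, a small helper can filter that list; a minimal sketch (filter_by_prefix is a hypothetical helper, not part of pyenbc):

```python
def filter_by_prefix(entries, prefix):
    # keep only the blobs whose name starts with the given prefix
    return [e for e in entries if e["name"].startswith(prefix)]

# dictionaries shaped like those returned by cl.ls
entries = [{"name": "logs/2016/01.txt"},
           {"name": "scripts/pig/alg1.pig"}]
print(filter_by_prefix(entries, "scripts/"))
```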
(original entry : azure_connection.py:docstring of pyenbc.remote.azure_connection.AzureClient, line 17)
List job queue
Most of the time, a job remains stuck in the job queue because the queue is full. Here is some code to check whether that is the case on an Azure cluster. It should be executed from a notebook.
Connection
blobstorage = "..."
blobpassword = "..."
hadoop_server = "..."
hadoop_password = "..."
username = "..."
%load_ext pyenbc
client, bs = %hd_open
Job queue
res = client.job_queue()
res.reverse() # last submitted jobs first
Displays the first 20 jobs:
from datetime import datetime

for i, r in enumerate(res[:20]):
    st = client.job_status(r["id"])
    print(i, r, st["status"]["state"],
          datetime.fromtimestamp(float(st["status"]["startTime"]) / 1000),
          st["status"]["jobName"])
    print(st["userargs"].get("file", None), st["profile"].get("jobName", None))
It gives:
0 {'detail': None, 'id': 'job_1451961118663_3126'} PREP 2016-01-26 21:57:28.756000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
1 {'detail': None, 'id': 'job_1451961118663_3125'} PREP 2016-01-26 21:57:28.517999 TempletonControllerJob
wasb://..../scripts/pig/pre_processing.pig TempletonControllerJob
2 {'detail': None, 'id': 'job_1451961118663_3124'} PREP 2016-01-26 21:50:32.742000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
3 {'detail': None, 'id': 'job_1451961118663_3123'} RUNNING 2016-01-26 21:46:57.219000 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
4 {'detail': None, 'id': 'job_1451961118663_3122'} SUCCEEDED 2016-01-26 21:40:34.687999 PigLatin:pre_processing.pig
None PigLatin:pre_processing.pig
5 {'detail': None, 'id': 'job_1451961118663_3121'} RUNNING 2016-01-26 21:41:29.657000 TempletonControllerJob
wasb://..../scripts/pig/Algo_LDA2.pig TempletonControllerJob
6 {'detail': None, 'id': 'job_1451961118663_3120'} SUCCEEDED 2016-01-26 21:40:06.859999 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
To kill a job:
client.job_kill("<job_id>")
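Building on the loop above, the jobs to kill can be selected from their state before calling job_kill. A minimal sketch with plain tuples shaped like the output above (select_stuck_jobs is a hypothetical helper, not part of pyenbc):

```python
def select_stuck_jobs(statuses):
    # keep the ids of jobs still in the PREP state,
    # i.e. submitted but never scheduled
    return [job_id for job_id, state in statuses if state == "PREP"]

# (job id, state) pairs shaped like the queue output above
statuses = [("job_1451961118663_3126", "PREP"),
            ("job_1451961118663_3125", "PREP"),
            ("job_1451961118663_3123", "RUNNING"),
            ("job_1451961118663_3122", "SUCCEEDED")]

for job_id in select_stuck_jobs(statuses):
    # on a real cluster: client.job_kill(job_id)
    print(job_id)
```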
(original entry : azure_connection.py:docstring of pyenbc.remote.azure_connection.AzureClient.job_queue, line 9)
Submit a PIG job
The PIG script must include a LOAD instruction. This instruction uses file names defined with the wasb syntax. If you place the string $CONTAINER before a stream name, it should be replaced by the wasb syntax associated with the container name defined by container_name. The function then loads your script, modifies it and saves a new one with .wasb.pig appended to its name.
Other constants you could use:
$PSEUDO
$CONTAINER
$SCRIPTSPIG
However, this replacement is not done by this class; your code could look like this:
from pyenbc.remote.azure_connection import AzureClient

blobstorage = "****"
blobpassword = "*********************"
hadoop_name = "*********"
hadoop_password = "********"
username = "********"

cl = AzureClient(blobstorage,
                 blobpassword,
                 hadoop_name,
                 hadoop_password,
                 username)

script = '''
myinput = LOAD '$CONTAINER/input.csv'
          USING PigStorage(',')
          AS (index:long, sequence, tag, timestamp:long, dateformat,
              x:double, y:double, z:double, activity) ;
filt = FILTER myinput BY activity == 'walking' ;
STORE filt INTO '$PSEUDO/output.csv' USING PigStorage() ;
'''

with open("script_walking.pig", "w") as f:
    f.write(script)

bs = cl.open_blob_service()
js = cl.pig_submit(bs, blobstorage, "testensae/script_walking.pig")
print(js)
js = cl.job_status('job_1414863995725_0013')
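The $CONTAINER substitution mentioned above is a plain string replacement. Assuming the standard Azure wasb URI scheme wasb://<container>@<account>.blob.core.windows.net, it could be sketched as follows (expand_container is a hypothetical helper and the account name is a placeholder):

```python
def expand_container(script, container, account):
    # replace $CONTAINER with the wasb URI of the container
    wasb = "wasb://{0}@{1}.blob.core.windows.net".format(container, account)
    return script.replace("$CONTAINER", wasb)

script = "myinput = LOAD '$CONTAINER/input.csv' USING PigStorage(',') ;"
print(expand_container(script, "testensae", "myaccount"))
```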
(original entry : azure_connection.py:docstring of pyenbc.remote.azure_connection.AzureClient.pig_submit, line 22)
Upload and download files to a blob storage
The following example uploads and downloads a file on a Blob Storage.
from pyenbc.remote.azure_connection import AzureClient

cl = AzureClient("<blob_storage_service>",
                 "<primary_key>")
bs = cl.open_blob_service()
cl.upload(bs, "<container>", "myremotefolder/remotename.txt",
          "local_filename.txt")
res = cl.ls(bs, "<container>")
for r in res:
    if "remotename" in r["name"]:
        print(r)
cl.download(bs, "<container>", "myremotefolder/remotename.txt",
            "another_local_filename.txt")
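To upload a whole local folder, the remote names can be derived from the local paths before calling cl.upload in a loop. A sketch of the path logic only (remote_names is a hypothetical helper; the Azure calls are commented out so the path handling can be run offline):

```python
import os
import posixpath

def remote_names(local_files, remote_folder):
    # blob names always use forward slashes, whatever the local OS
    return [posixpath.join(remote_folder, os.path.basename(f))
            for f in local_files]

files = ["data/local_a.txt", "data/local_b.txt"]
print(remote_names(files, "myremotefolder"))
# for local, remote in zip(files, remote_names(files, "myremotefolder")):
#     cl.upload(bs, "<container>", remote, local)
```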
(original entry : azure_connection.py:docstring of pyenbc.remote.azure_connection.AzureClient, line 34)
Hadoop¶
How to open a remote shell?
from pyenbc.remote.ssh_remote_connection import ASSHClient

ssh = ASSHClient("<server>",
                 "<login>",
                 "<password>")
ssh.connect()
out = ssh.send_recv_session("ls")
print(ssh.send_recv_session("python"))
print(ssh.send_recv_session("print('3')"))
print(ssh.send_recv_session("import sys\nsys.executable"))
print(ssh.send_recv_session("sys.exit()"))
print(ssh.send_recv_session(None))
ssh.close_session()
ssh.close()
The notebook Communication with a remote Linux machine through SSH illustrates the output of these instructions.
(original entry : ssh_remote_connection.py:docstring of pyenbc.remote.ssh_remote_connection.ASSHClient.open_session, line 10)
Submit a HIVE query
client = ASSHClient("<server>", "<login>", "<password>")

hive_sql = '''
DROP TABLE IF EXISTS bikes20;
CREATE TABLE bikes20 (sjson STRING);
LOAD DATA INPATH "/user/__USERNAME__/unittest2/paris*.txt" INTO TABLE bikes20;
SELECT * FROM bikes20 LIMIT 10;
'''.replace("__USERNAME__", "<username>")

out, err = client.hive_submit(hive_sql, redirection=None)
(original entry : ssh_remote_connection.py:docstring of pyenbc.remote.ssh_remote_connection.ASSHClient.hive_submit, line 30)