Short examples showing how to connect to a cluster from a notebook and submit a job (Azure + PIG).
from jyquickhelper import add_notebook_menu
add_notebook_menu()
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00222/"
file = "bank.zip"
import pyensae
data = pyensae.download_data(file, website=url)
import pandas
df = pandas.read_csv("bank-full.csv", sep=";")
df.head()
 | age | job | marital | education | default | balance | housing | loan | contact | day | month | duration | campaign | pdays | previous | poutcome | y
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 58 | management | married | tertiary | no | 2143 | yes | no | unknown | 5 | may | 261 | 1 | -1 | 0 | unknown | no
1 | 44 | technician | single | secondary | no | 29 | yes | no | unknown | 5 | may | 151 | 1 | -1 | 0 | unknown | no
2 | 33 | entrepreneur | married | secondary | no | 2 | yes | yes | unknown | 5 | may | 76 | 1 | -1 | 0 | unknown | no
3 | 47 | blue-collar | married | unknown | no | 1506 | yes | no | unknown | 5 | may | 92 | 1 | -1 | 0 | unknown | no
4 | 33 | unknown | single | unknown | no | 1 | no | no | unknown | 5 | may | 198 | 1 | -1 | 0 | unknown | no
df.to_csv("bank_full_tab_no.txt", sep="\t", index=False, header=False)
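A quick way to check that the exported file really has no header line is to write a tiny frame the same way and read it back with `header=None`; this sketch uses illustrative values, not the real dataset.

```python
import pandas

# Small sample with the same kind of layout as bank-full.csv (illustrative values).
df = pandas.DataFrame({"age": [58, 44], "job": ["management", "technician"]})

# header=False drops the column names from the output file.
df.to_csv("bank_sample_tab_no.txt", sep="\t", index=False, header=False)

# Reading back with header=None: the first line is data, and pandas
# numbers the columns 0, 1, ... instead of using names.
back = pandas.read_csv("bank_sample_tab_no.txt", sep="\t", header=None)
print(back.iloc[0, 0])
```

If the first cell comes back as a column name instead of a value, the header is still in the file.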
import pyensae
blobstorage = "****"      # blob storage account name (credentials removed)
blobpassword = "****"     # blob storage key (credentials removed)
hadoop_server = "****"    # HDInsight cluster name (credentials removed)
hadoop_password = "****"  # cluster password (credentials removed)
username = "centrale"
client, bs = %hd_open
client, bs
(<pyensae.remote.azure_connection.AzureClient at 0x1a349a00550>, <azure.storage.blob.blockblobservice.BlockBlobService at 0x1a349a314a8>)
%blob_up bank_full_tab_no.txt hdblobstorage/centrale2/bank_full_tab_no.txt
'centrale2/bank_full_tab_no.txt'
%blob_ls hdblobstorage/centrale2
 | name | last_modified | content_type | content_length | blob_type
---|---|---|---|---|---
0 | centrale2/bank_full_tab_no.txt | 2016-06-16 10:18:58+00:00 | None | 3751188 | BlockBlob
mapping = {'int64': 'double', 'float': 'double', 'object': 'chararray'}
schema = ["%s:%s" % (_[0], mapping.get(str(_[1]), _[1])) for _ in zip(df.columns, df.dtypes)]
schema = ", ".join(schema)
schema
'age:double, job:chararray, marital:chararray, education:chararray, default:chararray, balance:double, housing:chararray, loan:chararray, contact:chararray, day:double, month:chararray, duration:double, campaign:double, pdays:double, previous:double, poutcome:chararray, y:chararray'
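One caveat with the mapping above: pandas reports numpy dtype names, so a float column shows up as `float64`, not `float`, and would fall through to the raw dtype. A sketch of the same schema construction with a `float64` key added (the data here is illustrative):

```python
import pandas

# Illustrative frame with an int64, a float64 and an object column.
df = pandas.DataFrame({"age": [58, 44], "balance": [2143.5, 29.0], "job": ["a", "b"]})

# 'float64' added: pandas reports numpy dtype names, not plain 'float'.
mapping = {"int64": "double", "float64": "double", "object": "chararray"}
schema = ", ".join(
    "%s:%s" % (col, mapping.get(str(dtype), "chararray"))
    for col, dtype in zip(df.columns, df.dtypes)
)
print(schema)
```

Defaulting unknown dtypes to `chararray` is a conservative choice: Pig can always load a field as a string even when the numeric type is unclear.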
We add the DESCRIBE instruction to display the schema of each alias.
%%PIG_azure aggage3.pig
values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt' USING PigStorage('\t') AS (age:double,
job:chararray, marital:chararray, education:chararray,
default:chararray, balance:double, housing:chararray, loan:chararray,
contact:chararray, day:double, month:chararray, duration:double,
campaign:double, pdays:double, previous:double, poutcome:chararray, y:chararray);
DESCRIBE values;
gr = GROUP values BY loan ;
DESCRIBE gr;
agg = FOREACH gr GENERATE group, AVG(age) AS avg_age ;
DESCRIBE agg;
STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg.txt' USING PigStorage('\t') ;
jid = %hd_pig_submit aggage3.pig
jid
{'id': 'job_1466069083851_0005'}
%hd_queue
[{'detail': None, 'id': 'job_1466069083851_0005'}, {'detail': None, 'id': 'job_1466069083851_0004'}, {'detail': None, 'id': 'job_1466069083851_0003'}, {'detail': None, 'id': 'job_1466069083851_0002'}, {'detail': None, 'id': 'job_1466069083851_0001'}]
df = %hd_job_status jid['id']
df["status"]["state"]
'RUNNING'
%hd_tail_stderr -n 100 jid['id']
16/06/16 21:05:43 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL 16/06/16 21:05:43 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE 16/06/16 21:05:43 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType 2016-06-16 21:05:43,576 [main] INFO org.apache.pig.Main - Apache Pig version 0.15.0.2.3.3.1-21 (r: unknown) compiled May 04 2016, 20:06:44 2016-06-16 21:05:43,576 [main] INFO org.apache.pig.Main - Logging error messages to: C:\apps\dist\hadoop-2.7.1.2.3.3.1-21\logs\pig_1466111143557.log 2016-06-16 21:05:45,088 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file D:\Users\hdp/.pigbootup not found 2016-06-16 21:05:45,498 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address 2016-06-16 21:05:45,498 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS 2016-06-16 21:05:45,498 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-3@hdblobstorage.blob.core.windows.net 2016-06-16 21:05:47,452 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS 2016-06-16 21:05:49,057 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1025:Invalid field projection. Projected field [age] does not exist in schema: group:chararray,values:bag{:tuple(age:double,job:chararray,marital:chararray,education:chararray,default:chararray,balance:double,housing:chararray,loan:chararray,contact:chararray,day:double,month:chararray,duration:double,campaign:double,pdays:double,previous:double,poutcome:chararray,y:chararray)}. 
2016-06-16 21:05:49,057 [main] ERROR org.apache.pig.tools.grunt.Grunt - org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1001: Unable to describe schema for alias agg at org.apache.pig.PigServer.dumpSchema(PigServer.java:823) at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:321) at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:416) at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230) at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205) at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81) at org.apache.pig.Main.run(Main.java:502) at org.apache.pig.Main.main(Main.java:177) Caused by: org.apache.pig.impl.plan.PlanValidationException: ERROR 1025: Invalid field projection. Projected field [age] does not exist in schema: group:chararray,values:bag{:tuple(age:double,job:chararray,marital:chararray,education:chararray,default:chararray,balance:double,housing:chararray,loan:chararray,contact:chararray,day:double,month:chararray,duration:double,campaign:double,pdays:double,previous:double,poutcome:chararray,y:chararray)}. 
at org.apache.pig.newplan.logical.expression.ProjectExpression.findColNum(ProjectExpression.java:191) at org.apache.pig.newplan.logical.expression.ProjectExpression.setColumnNumberFromAlias(ProjectExpression.java:174) at org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor$1.visit(ColumnAliasConversionVisitor.java:53) at org.apache.pig.newplan.logical.expression.ProjectExpression.accept(ProjectExpression.java:215) at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75) at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52) at org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor.visit(AllExpressionVisitor.java:142) at org.apache.pig.newplan.logical.relational.LOInnerLoad.accept(LOInnerLoad.java:128) at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75) at org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor.visit(AllExpressionVisitor.java:124) at org.apache.pig.newplan.logical.relational.LOForEach.accept(LOForEach.java:87) at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75) at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52) at org.apache.pig.newplan.logical.relational.LogicalPlan.validate(LogicalPlan.java:175) at org.apache.pig.PigServer$Graph.compile(PigServer.java:1767) at org.apache.pig.PigServer$Graph.access$300(PigServer.java:1443) at org.apache.pig.PigServer.buildStorePlan(PigServer.java:1339) at org.apache.pig.PigServer.getOperatorForAlias(PigServer.java:1418) at org.apache.pig.PigServer.dumpSchema(PigServer.java:806) ... 7 more Details also at logfile: C:\apps\dist\hadoop-2.7.1.2.3.3.1-21\logs\pig_1466111143557.log 2016-06-16 21:05:49,119 [main] INFO org.apache.pig.Main - Pig script completed in 5 seconds and 954 milliseconds (5954 ms)
values: {age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray} gr: {group: chararray,values: {(age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray)}}
The error message is explicit: after the GROUP, the relation only contains the fields group and values, so the age column cannot be projected directly; it has to be reached through the bag, with AVG(values.age).
%%PIG_azure aggage4.pig
values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt' USING PigStorage('\t') AS (age:double,
job:chararray, marital:chararray, education:chararray,
default:chararray, balance:double, housing:chararray, loan:chararray,
contact:chararray, day:double, month:chararray, duration:double,
campaign:double,
pdays:double, previous:double, poutcome:chararray, y:chararray);
DESCRIBE values;
gr = GROUP values BY loan ;
DESCRIBE gr;
agg = FOREACH gr GENERATE group, AVG(values.age) AS avg_age ;
DESCRIBE agg;
STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg2.txt' USING PigStorage('\t') ;
jid = %hd_pig_submit aggage4.pig
jid
{'id': 'job_1466069083851_0008'}
%hd_queue
[{'detail': None, 'id': 'job_1466069083851_0009'}, {'detail': None, 'id': 'job_1466069083851_0008'}, {'detail': None, 'id': 'job_1466069083851_0007'}, {'detail': None, 'id': 'job_1466069083851_0006'}, {'detail': None, 'id': 'job_1466069083851_0005'}, {'detail': None, 'id': 'job_1466069083851_0004'}, {'detail': None, 'id': 'job_1466069083851_0003'}, {'detail': None, 'id': 'job_1466069083851_0002'}, {'detail': None, 'id': 'job_1466069083851_0001'}]
df = %hd_job_status jid['id']
df["status"]["state"]
'RUNNING'
%hd_tail_stderr -n 50 jid['id']
2016-06-16 21:13:19,066 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server 2016-06-16 21:13:19,410 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/ 2016-06-16 21:13:19,410 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010 2016-06-16 21:13:19,410 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200 2016-06-16 21:13:19,504 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server 2016-06-16 21:13:19,629 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete 2016-06-16 21:13:19,629 [main] INFO org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics: HadoopVersion PigVersion UserId StartedAt FinishedAt Features 2.7.1.2.3.3.1-21 0.15.0.2.3.3.1-21 hdp 2016-06-16 21:12:27 2016-06-16 21:13:19 GROUP_BY Success! 
Job Stats (time in seconds): JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs job_1466069083851_0009 1 1 12 12 12 12 9 9 9 9 agg,gr,values GROUP_BY,COMBINER wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no_agg2.txt, Input(s): Successfully read 45212 records from: "wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no.txt" Output(s): Successfully stored 3 records in: "wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no_agg2.txt" Counters: Total records written : 3 Total bytes written : 0 Spillable Memory Manager spill count : 0 Total bags proactively spilled: 0 Total records proactively spilled: 0 Job DAG: job_1466069083851_0009 2016-06-16 21:13:19,848 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/ 2016-06-16 21:13:19,848 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010 2016-06-16 21:13:19,848 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200 2016-06-16 21:13:19,926 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. 
Redirecting to job history server 2016-06-16 21:13:20,160 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/ 2016-06-16 21:13:20,160 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010 2016-06-16 21:13:20,160 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200 2016-06-16 21:13:20,238 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server 2016-06-16 21:13:20,506 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/ 2016-06-16 21:13:20,506 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010 2016-06-16 21:13:20,506 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200 2016-06-16 21:13:20,582 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server 2016-06-16 21:13:20,646 [main] WARN org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 7 time(s). 2016-06-16 21:13:20,646 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success! 2016-06-16 21:13:20,725 [main] INFO org.apache.pig.Main - Pig script completed in 1 minute, 2 seconds and 364 milliseconds (62364 ms)
values: {age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray} gr: {group: chararray,values: {(age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray)}} agg: {group: chararray,avg_age: double}
%blob_ls /centrale
 | name | last_modified | content_type | content_length | blob_type
---|---|---|---|---|---
0 | centrale/bank_full.csv | 2016-06-15 22:17:59+00:00 | None | 4610348 | BlockBlob |
1 | centrale/bank_full_tab.txt | 2016-06-15 22:19:46+00:00 | None | 3751306 | BlockBlob |
2 | centrale/bank_full_tab_no.txt | 2016-06-15 23:00:52+00:00 | None | 3751306 | BlockBlob |
3 | centrale/bank_full_tab_no_agg.txt | 2016-06-16 10:32:11+00:00 | None | 0 | BlockBlob |
4 | centrale/bank_full_tab_no_agg.txt/_SUCCESS | 2016-06-16 10:32:11+00:00 | None | 0 | BlockBlob |
5 | centrale/bank_full_tab_no_agg.txt/part-r-00000 | 2016-06-16 10:32:11+00:00 | None | 49 | BlockBlob |
6 | centrale/bank_full_tab_no_agg2.txt | 2016-06-16 21:13:14+00:00 | None | 0 | BlockBlob |
7 | centrale/bank_full_tab_no_agg2.txt/_SUCCESS | 2016-06-16 21:13:14+00:00 | None | 0 | BlockBlob |
8 | centrale/bank_full_tab_no_agg2.txt/part-r-00000 | 2016-06-16 21:13:13+00:00 | None | 49 | BlockBlob |
9 | centrale/scripts/pig/aggage.pig | 2016-06-15 23:15:54+00:00 | None | 782 | BlockBlob |
10 | centrale/scripts/pig/aggage.pig.log | 2016-06-15 23:16:40+00:00 | None | 0 | BlockBlob |
11 | centrale/scripts/pig/aggage.pig.log/exit | 2016-06-15 23:16:40+00:00 | None | 3 | BlockBlob |
12 | centrale/scripts/pig/aggage.pig.log/stderr | 2016-06-15 23:16:30+00:00 | None | 4060 | BlockBlob |
13 | centrale/scripts/pig/aggage.pig.log/stdout | 2016-06-15 23:16:30+00:00 | None | 0 | BlockBlob |
14 | centrale/scripts/pig/aggage2.pig | 2016-06-16 10:28:16+00:00 | None | 853 | BlockBlob |
15 | centrale/scripts/pig/aggage2.pig.log | 2016-06-16 10:29:04+00:00 | None | 0 | BlockBlob |
16 | centrale/scripts/pig/aggage2.pig.log/exit | 2016-06-16 10:29:04+00:00 | None | 3 | BlockBlob |
17 | centrale/scripts/pig/aggage2.pig.log/stderr | 2016-06-16 10:28:54+00:00 | None | 4883 | BlockBlob |
18 | centrale/scripts/pig/aggage2.pig.log/stdout | 2016-06-16 10:28:54+00:00 | None | 613 | BlockBlob |
19 | centrale/scripts/pig/aggage3.pig | 2016-06-16 21:05:11+00:00 | None | 853 | BlockBlob |
20 | centrale/scripts/pig/aggage3.pig.log | 2016-06-16 21:05:59+00:00 | None | 0 | BlockBlob |
21 | centrale/scripts/pig/aggage3.pig.log/exit | 2016-06-16 21:05:59+00:00 | None | 3 | BlockBlob |
22 | centrale/scripts/pig/aggage3.pig.log/stderr | 2016-06-16 21:05:49+00:00 | None | 4883 | BlockBlob |
23 | centrale/scripts/pig/aggage3.pig.log/stdout | 2016-06-16 21:05:49+00:00 | None | 613 | BlockBlob |
24 | centrale/scripts/pig/aggage4.pig | 2016-06-16 21:11:47+00:00 | None | 861 | BlockBlob |
25 | centrale/scripts/pig/aggage4.pig.log | 2016-06-16 21:13:31+00:00 | None | 0 | BlockBlob |
26 | centrale/scripts/pig/aggage4.pig.log/exit | 2016-06-16 21:13:31+00:00 | None | 3 | BlockBlob |
27 | centrale/scripts/pig/aggage4.pig.log/stderr | 2016-06-16 21:13:21+00:00 | None | 16643 | BlockBlob |
28 | centrale/scripts/pig/aggage4.pig.log/stdout | 2016-06-16 21:13:21+00:00 | None | 654 | BlockBlob |
29 | centrale2/bank_full_tab_no.txt | 2016-06-16 10:18:58+00:00 | None | 3751188 | BlockBlob |
%blob_downmerge --help
usage: blob_downmerge [-h] [-o] remotepath localfile

download a set of files from a blob storage folder; files will be merged.
We assume the container is the first element of the remote path.

positional arguments:
  remotepath       remote path of the folder to download
  localfile        local name for the downloaded merged file

optional arguments:
  -h, --help       show this help message and exit
  -o, --overwrite  overwrite the local file
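What the merge step amounts to, independently of the blob storage API, is concatenating the `part-r-*` files a Hadoop job produces into one local file. A minimal local sketch with made-up file names and contents:

```python
import glob
import os

# Fake a Hadoop output folder with two part files (contents illustrative).
os.makedirs("parts_demo", exist_ok=True)
with open("parts_demo/part-r-00000", "w") as f:
    f.write("no\t41.0\n")
with open("parts_demo/part-r-00001", "w") as f:
    f.write("yes\t40.5\n")

# Merge: concatenate the parts in sorted order into a single file.
with open("merged.txt", "w") as out:
    for name in sorted(glob.glob("parts_demo/part-r-*")):
        with open(name) as part:
            out.write(part.read())

print(open("merged.txt").read())
```

Sorting the names matters if row order is significant, since `glob` does not guarantee any particular order.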
%blob_down /centrale/bank_full_tab_no_agg2.txt/part-r-00000 agg_hadoop3.txt
'agg_hadoop3.txt'
import pandas
df = pandas.read_csv("agg_hadoop3.txt", sep="\t", header=None)
df
 | 0 | 1
---|---|---
0 | no | 41.008823
1 | yes | 40.555632
2 | loan | NaN
I forgot to remove the header, hence the extra ('loan', NaN) row. We check that the computations are correct by redoing them locally.
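Since the file the script read still contained its header line, Pig treated that line as data, which produced the spurious row. One way to clean the result after the fact, sketched on a rebuilt copy of the aggregate (values copied from the output above):

```python
import pandas

# Rebuild the aggregate as it came back from the cluster.
df = pandas.DataFrame({0: ["no", "yes", "loan"], 1: [41.008823, 40.555632, None]})

# Drop the row produced by the stray header line, then make the average numeric.
clean = df[df[0] != "loan"].copy()
clean[1] = clean[1].astype(float)
print(clean)
```

The cleaner fix is upstream: make sure the uploaded file has no header (header=False in to_csv) so the job never sees it.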
df = pandas.read_csv("bank-full.csv", sep=";")
df.head()
 | age | job | marital | education | default | balance | housing | loan | contact | day | month | duration | campaign | pdays | previous | poutcome | y
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 58 | management | married | tertiary | no | 2143 | yes | no | unknown | 5 | may | 261 | 1 | -1 | 0 | unknown | no
1 | 44 | technician | single | secondary | no | 29 | yes | no | unknown | 5 | may | 151 | 1 | -1 | 0 | unknown | no
2 | 33 | entrepreneur | married | secondary | no | 2 | yes | yes | unknown | 5 | may | 76 | 1 | -1 | 0 | unknown | no
3 | 47 | blue-collar | married | unknown | no | 1506 | yes | no | unknown | 5 | may | 92 | 1 | -1 | 0 | unknown | no
4 | 33 | unknown | single | unknown | no | 1 | no | no | unknown | 5 | may | 198 | 1 | -1 | 0 | unknown | no
df[["loan", "age"]].groupby("loan").mean()
loan | age
---|---
no | 41.008823
yes | 40.555632
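The local and cluster averages match. A quick sketch of that comparison, using the values printed above and tolerating floating-point rounding:

```python
import pandas

# Local aggregate computed with pandas (values from the notebook output).
local = pandas.Series({"no": 41.008823, "yes": 40.555632})

# Aggregate read back from the cluster (same values, from the notebook output).
cluster = pandas.Series({"no": 41.008823, "yes": 40.555632})

# The two should agree up to floating-point rounding.
diff = (local - cluster).abs().max()
print(diff < 1e-6)
```

Comparing with a tolerance rather than strict equality is the safer habit: the cluster file only stores a decimal rendering of the averages.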