HDInsight, PIG

Links: notebook, html, PDF, python, slides, GitHub

Short examples on how to connect to a cluster from a notebook and submit a job (Azure + PIG).

from jyquickhelper import add_notebook_menu
add_notebook_menu()

Download the data

url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00222/"
file = "bank.zip"
import pyensae
data = pyensae.download_data(file, website=url)
import pandas
df = pandas.read_csv("bank-full.csv", sep=";")
df.head()
age job marital education default balance housing loan contact day month duration campaign pdays previous poutcome y
0 58 management married tertiary no 2143 yes no unknown 5 may 261 1 -1 0 unknown no
1 44 technician single secondary no 29 yes no unknown 5 may 151 1 -1 0 unknown no
2 33 entrepreneur married secondary no 2 yes yes unknown 5 may 76 1 -1 0 unknown no
3 47 blue-collar married unknown no 1506 yes no unknown 5 may 92 1 -1 0 unknown no
4 33 unknown single unknown no 1 no no unknown 5 may 198 1 -1 0 unknown no
df.to_csv("bank_full_tab_no.txt", sep="\t", index=False, header=None)

Connect to the cluster

import pyensae
blobstorage =
blobpassword =
hadoop_server =
hadoop_password =
username = "centrale"
client, bs =  %hd_open
client, bs
(<pyensae.remote.azure_connection.AzureClient at 0x1a349a00550>,
 <azure.storage.blob.blockblobservice.BlockBlobService at 0x1a349a314a8>)

Upload the data

%blob_up bank_full_tab_no.txt hdblobstorage/centrale2/bank_full_tab_no.txt
'centrale2/bank_full_tab_no.txt'
%blob_ls hdblobstorage/centrale2
name last_modified content_type content_length blob_type
0 centrale2/bank_full_tab_no.txt 2016-06-16 10:18:58+00:00 None 3751188 BlockBlob

Submit a PIG query

mapping = {'int64': 'double', 'float': 'double', 'object': 'chararray'}
schema = ["%s:%s" % (_[0], mapping.get(str(_[1]), _[1])) for _ in zip(df.columns, df.dtypes)]
schema = ", ".join(schema)
schema
'age:double, job:chararray, marital:chararray, education:chararray, default:chararray, balance:double, housing:chararray, loan:chararray, contact:chararray, day:double, month:chararray, duration:double, campaign:double, pdays:double, previous:double, poutcome:chararray, y:chararray'

On ajoute l’instruction DESCRIBE.

%%PIG_azure aggage3.pig
values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt' USING PigStorage('\t') AS (age:double,
                    job:chararray, marital:chararray, education:chararray,
                   default:chararray, balance:double, housing:chararray, loan:chararray,
                   contact:chararray, day:double, month:chararray, duration:double,
                   campaign:double, pdays:double, previous:double, poutcome:chararray, y:chararray);
DESCRIBE values;
gr = GROUP values BY loan ;
DESCRIBE gr;
agg = FOREACH gr GENERATE group, AVG(age) AS avg_age ;
DESCRIBE agg;
STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg.txt' USING PigStorage('\t') ;
jid = %hd_pig_submit aggage3.pig
jid
{'id': 'job_1466069083851_0005'}
%hd_queue
[{'detail': None, 'id': 'job_1466069083851_0005'},
 {'detail': None, 'id': 'job_1466069083851_0004'},
 {'detail': None, 'id': 'job_1466069083851_0003'},
 {'detail': None, 'id': 'job_1466069083851_0002'},
 {'detail': None, 'id': 'job_1466069083851_0001'}]
df = %hd_job_status jid['id']
df["status"]["state"]
'RUNNING'
%hd_tail_stderr -n 100 jid['id']
16/06/16 21:05:43 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
16/06/16 21:05:43 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
16/06/16 21:05:43 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
2016-06-16 21:05:43,576 [main] INFO  org.apache.pig.Main - Apache Pig version 0.15.0.2.3.3.1-21 (r: unknown) compiled May 04 2016, 20:06:44
2016-06-16 21:05:43,576 [main] INFO  org.apache.pig.Main - Logging error messages to: C:\apps\dist\hadoop-2.7.1.2.3.3.1-21\logs\pig_1466111143557.log
2016-06-16 21:05:45,088 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file D:\Users\hdp/.pigbootup not found
2016-06-16 21:05:45,498 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2016-06-16 21:05:45,498 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2016-06-16 21:05:45,498 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-3@hdblobstorage.blob.core.windows.net
2016-06-16 21:05:47,452 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2016-06-16 21:05:49,057 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1025:
 Invalid field projection. Projected field [age] does not exist in schema: group:chararray,values:bag{:tuple(age:double,job:chararray,marital:chararray,education:chararray,default:chararray,balance:double,housing:chararray,loan:chararray,contact:chararray,day:double,month:chararray,duration:double,campaign:double,pdays:double,previous:double,poutcome:chararray,y:chararray)}.
2016-06-16 21:05:49,057 [main] ERROR org.apache.pig.tools.grunt.Grunt - org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1001: Unable to describe schema for alias agg
    at org.apache.pig.PigServer.dumpSchema(PigServer.java:823)
    at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:321)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:416)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
    at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
    at org.apache.pig.Main.run(Main.java:502)
    at org.apache.pig.Main.main(Main.java:177)
Caused by: org.apache.pig.impl.plan.PlanValidationException: ERROR 1025:
 Invalid field projection. Projected field [age] does not exist in schema: group:chararray,values:bag{:tuple(age:double,job:chararray,marital:chararray,education:chararray,default:chararray,balance:double,housing:chararray,loan:chararray,contact:chararray,day:double,month:chararray,duration:double,campaign:double,pdays:double,previous:double,poutcome:chararray,y:chararray)}.
    at org.apache.pig.newplan.logical.expression.ProjectExpression.findColNum(ProjectExpression.java:191)
    at org.apache.pig.newplan.logical.expression.ProjectExpression.setColumnNumberFromAlias(ProjectExpression.java:174)
    at org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor$1.visit(ColumnAliasConversionVisitor.java:53)
    at org.apache.pig.newplan.logical.expression.ProjectExpression.accept(ProjectExpression.java:215)
    at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
    at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
    at org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor.visit(AllExpressionVisitor.java:142)
    at org.apache.pig.newplan.logical.relational.LOInnerLoad.accept(LOInnerLoad.java:128)
    at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
    at org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor.visit(AllExpressionVisitor.java:124)
    at org.apache.pig.newplan.logical.relational.LOForEach.accept(LOForEach.java:87)
    at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
    at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
    at org.apache.pig.newplan.logical.relational.LogicalPlan.validate(LogicalPlan.java:175)
    at org.apache.pig.PigServer$Graph.compile(PigServer.java:1767)
    at org.apache.pig.PigServer$Graph.access$300(PigServer.java:1443)
    at org.apache.pig.PigServer.buildStorePlan(PigServer.java:1339)
    at org.apache.pig.PigServer.getOperatorForAlias(PigServer.java:1418)
    at org.apache.pig.PigServer.dumpSchema(PigServer.java:806)
    ... 7 more

Details also at logfile: C:\apps\dist\hadoop-2.7.1.2.3.3.1-21\logs\pig_1466111143557.log
2016-06-16 21:05:49,119 [main] INFO  org.apache.pig.Main - Pig script completed in 5 seconds and 954 milliseconds (5954 ms)


OUT:
values: {age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray}
gr: {group: chararray,values: {(age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray)}}

%%PIG_azure aggage4.pig
values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt' USING PigStorage('\t') AS (age:double,
                                                    job:chararray, marital:chararray, education:chararray,
                                                   default:chararray, balance:double, housing:chararray, loan:chararray,
                                                   contact:chararray, day:double, month:chararray, duration:double,
                                                   campaign:double,
                                                   pdays:double, previous:double, poutcome:chararray, y:chararray);
DESCRIBE values;
gr = GROUP values BY loan ;
DESCRIBE gr;
agg = FOREACH gr GENERATE group, AVG(values.age) AS avg_age ;
DESCRIBE agg;
STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg2.txt' USING PigStorage('\t') ;
jid = %hd_pig_submit aggage4.pig
jid
{'id': 'job_1466069083851_0008'}
%hd_queue
[{'detail': None, 'id': 'job_1466069083851_0009'},
 {'detail': None, 'id': 'job_1466069083851_0008'},
 {'detail': None, 'id': 'job_1466069083851_0007'},
 {'detail': None, 'id': 'job_1466069083851_0006'},
 {'detail': None, 'id': 'job_1466069083851_0005'},
 {'detail': None, 'id': 'job_1466069083851_0004'},
 {'detail': None, 'id': 'job_1466069083851_0003'},
 {'detail': None, 'id': 'job_1466069083851_0002'},
 {'detail': None, 'id': 'job_1466069083851_0001'}]
df = %hd_job_status jid['id']
df["status"]["state"]
'RUNNING'
hd_tail_stderr -n 50 jid['id']
2016-06-16 21:13:19,066 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:19,410 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2016-06-16 21:13:19,410 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
2016-06-16 21:13:19,410 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
2016-06-16 21:13:19,504 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:19,629 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2016-06-16 21:13:19,629 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics:

HadoopVersion       PigVersion      UserId  StartedAt       FinishedAt      Features
2.7.1.2.3.3.1-21    0.15.0.2.3.3.1-21       hdp     2016-06-16 21:12:27     2016-06-16 21:13:19     GROUP_BY

Success!

Job Stats (time in seconds):
JobId       Maps    Reduces MaxMapTime      MinMapTime      AvgMapTime      MedianMapTime   MaxReduceTime   MinReduceTime   AvgReduceTime   MedianReducetime        Alias   Feature Outputs
job_1466069083851_0009      1       1       12      12      12      12      9       9       9       9       agg,gr,values   GROUP_BY,COMBINER       wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no_agg2.txt,

Input(s):
Successfully read 45212 records from: "wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no.txt"

Output(s):
Successfully stored 3 records in: "wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no_agg2.txt"

Counters:
Total records written : 3
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1466069083851_0009


2016-06-16 21:13:19,848 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2016-06-16 21:13:19,848 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
2016-06-16 21:13:19,848 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
2016-06-16 21:13:19,926 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:20,160 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2016-06-16 21:13:20,160 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
2016-06-16 21:13:20,160 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
2016-06-16 21:13:20,238 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:20,506 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2016-06-16 21:13:20,506 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
2016-06-16 21:13:20,506 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
2016-06-16 21:13:20,582 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:20,646 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 7 time(s).
2016-06-16 21:13:20,646 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2016-06-16 21:13:20,725 [main] INFO  org.apache.pig.Main - Pig script completed in 1 minute, 2 seconds and 364 milliseconds (62364 ms)


OUT:
values: {age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray}
gr: {group: chararray,values: {(age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray)}}
agg: {group: chararray,avg_age: double}

%blob_ls /centrale
name last_modified content_type content_length blob_type
0 centrale/bank_full.csv 2016-06-15 22:17:59+00:00 None 4610348 BlockBlob
1 centrale/bank_full_tab.txt 2016-06-15 22:19:46+00:00 None 3751306 BlockBlob
2 centrale/bank_full_tab_no.txt 2016-06-15 23:00:52+00:00 None 3751306 BlockBlob
3 centrale/bank_full_tab_no_agg.txt 2016-06-16 10:32:11+00:00 None 0 BlockBlob
4 centrale/bank_full_tab_no_agg.txt/_SUCCESS 2016-06-16 10:32:11+00:00 None 0 BlockBlob
5 centrale/bank_full_tab_no_agg.txt/part-r-00000 2016-06-16 10:32:11+00:00 None 49 BlockBlob
6 centrale/bank_full_tab_no_agg2.txt 2016-06-16 21:13:14+00:00 None 0 BlockBlob
7 centrale/bank_full_tab_no_agg2.txt/_SUCCESS 2016-06-16 21:13:14+00:00 None 0 BlockBlob
8 centrale/bank_full_tab_no_agg2.txt/part-r-00000 2016-06-16 21:13:13+00:00 None 49 BlockBlob
9 centrale/scripts/pig/aggage.pig 2016-06-15 23:15:54+00:00 None 782 BlockBlob
10 centrale/scripts/pig/aggage.pig.log 2016-06-15 23:16:40+00:00 None 0 BlockBlob
11 centrale/scripts/pig/aggage.pig.log/exit 2016-06-15 23:16:40+00:00 None 3 BlockBlob
12 centrale/scripts/pig/aggage.pig.log/stderr 2016-06-15 23:16:30+00:00 None 4060 BlockBlob
13 centrale/scripts/pig/aggage.pig.log/stdout 2016-06-15 23:16:30+00:00 None 0 BlockBlob
14 centrale/scripts/pig/aggage2.pig 2016-06-16 10:28:16+00:00 None 853 BlockBlob
15 centrale/scripts/pig/aggage2.pig.log 2016-06-16 10:29:04+00:00 None 0 BlockBlob
16 centrale/scripts/pig/aggage2.pig.log/exit 2016-06-16 10:29:04+00:00 None 3 BlockBlob
17 centrale/scripts/pig/aggage2.pig.log/stderr 2016-06-16 10:28:54+00:00 None 4883 BlockBlob
18 centrale/scripts/pig/aggage2.pig.log/stdout 2016-06-16 10:28:54+00:00 None 613 BlockBlob
19 centrale/scripts/pig/aggage3.pig 2016-06-16 21:05:11+00:00 None 853 BlockBlob
20 centrale/scripts/pig/aggage3.pig.log 2016-06-16 21:05:59+00:00 None 0 BlockBlob
21 centrale/scripts/pig/aggage3.pig.log/exit 2016-06-16 21:05:59+00:00 None 3 BlockBlob
22 centrale/scripts/pig/aggage3.pig.log/stderr 2016-06-16 21:05:49+00:00 None 4883 BlockBlob
23 centrale/scripts/pig/aggage3.pig.log/stdout 2016-06-16 21:05:49+00:00 None 613 BlockBlob
24 centrale/scripts/pig/aggage4.pig 2016-06-16 21:11:47+00:00 None 861 BlockBlob
25 centrale/scripts/pig/aggage4.pig.log 2016-06-16 21:13:31+00:00 None 0 BlockBlob
26 centrale/scripts/pig/aggage4.pig.log/exit 2016-06-16 21:13:31+00:00 None 3 BlockBlob
27 centrale/scripts/pig/aggage4.pig.log/stderr 2016-06-16 21:13:21+00:00 None 16643 BlockBlob
28 centrale/scripts/pig/aggage4.pig.log/stdout 2016-06-16 21:13:21+00:00 None 654 BlockBlob
29 centrale2/bank_full_tab_no.txt 2016-06-16 10:18:58+00:00 None 3751188 BlockBlob
%blob_downmerge --help
usage: blob_downmerge [-h] [-o] remotepath localfile
download a set of files from a blob storage folder, files will be merged, we
assume the container is the first element to the remote path
positional arguments:
  remotepath       remote path of the folder to download
  localfile        local name for the downloaded merged file
optional arguments:
  -h, --help       show this help message and exit
  -o, --overwrite  overwrite the local file
usage: blob_downmerge [-h] [-o] remotepath localfile
%blob_down /centrale/bank_full_tab_no_agg2.txt/part-r-00000 agg_hadoop3.txt
'agg_hadoop3.txt'
import pandas
df = pandas.read_csv("agg_hadoop3.txt", sep="\t", header=-1)
df
0 1
0 no 41.008823
1 yes 40.555632
2 loan NaN

J’ai oublié d’enlever le header. On vérifie que les calcus sont bons en les faisant en local.

df = pandas.read_csv("bank-full.csv", sep=";")
df.head()
age job marital education default balance housing loan contact day month duration campaign pdays previous poutcome y
0 58 management married tertiary no 2143 yes no unknown 5 may 261 1 -1 0 unknown no
1 44 technician single secondary no 29 yes no unknown 5 may 151 1 -1 0 unknown no
2 33 entrepreneur married secondary no 2 yes yes unknown 5 may 76 1 -1 0 unknown no
3 47 blue-collar married unknown no 1506 yes no unknown 5 may 92 1 -1 0 unknown no
4 33 unknown single unknown no 1 no no unknown 5 may 198 1 -1 0 unknown no
df[["loan", "age"]].groupby("loan").mean()
age
loan
no 41.008823
yes 40.555632