.. _azurepigrst:

==============
HDInsight, PIG
==============

.. only:: html

    **Links:** :download:`notebook `, :downloadlink:`html `,
    :download:`PDF `, :download:`python `, :downloadlink:`slides `,
    :githublink:`GitHub|_doc/notebooks/2016/centrale/azure_pig.ipynb|*`

Short examples on how to connect to a cluster from a notebook and submit
a job (Azure + PIG).

.. code:: ipython3

    from jyquickhelper import add_notebook_menu
    add_notebook_menu()

.. contents::
    :local:

Download the data
-----------------

.. code:: ipython3

    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00222/"
    file = "bank.zip"
    import pyensae
    data = pyensae.download_data(file, website=url)

.. code:: ipython3

    import pandas
    df = pandas.read_csv("bank-full.csv", sep=";")

.. code:: ipython3

    df.head()

.. parsed-literal::

       age  job           marital  education  default  balance  housing  loan  contact  day  month  duration  campaign  pdays  previous  poutcome  y
    0  58   management    married  tertiary   no       2143     yes      no    unknown  5    may    261       1         -1     0         unknown   no
    1  44   technician    single   secondary  no       29       yes      no    unknown  5    may    151       1         -1     0         unknown   no
    2  33   entrepreneur  married  secondary  no       2        yes      yes   unknown  5    may    76        1         -1     0         unknown   no
    3  47   blue-collar   married  unknown    no       1506     yes      no    unknown  5    may    92        1         -1     0         unknown   no
    4  33   unknown       single   unknown    no       1        no       no    unknown  5    may    198       1         -1     0         unknown   no
.. code:: ipython3

    df.to_csv("bank_full_tab_no.txt", sep="\t", index=False, header=None)

Connect to the cluster
----------------------

.. code:: ipython3

    import pyensae
    blobstorage = blobpassword = hadoop_server = hadoop_password = username = "centrale"
    client, bs = %hd_open
    client, bs

.. parsed-literal::

    (, )

Upload the data
---------------

.. code:: ipython3

    %blob_up bank_full_tab_no.txt hdblobstorage/centrale2/bank_full_tab_no.txt

.. parsed-literal::

    'centrale2/bank_full_tab_no.txt'

.. code:: ipython3

    %blob_ls hdblobstorage/centrale2

.. parsed-literal::

       name                            last_modified              content_type  content_length  blob_type
    0  centrale2/bank_full_tab_no.txt  2016-06-16 10:18:58+00:00  None          3751188         BlockBlob
Submit a PIG query
------------------

.. code:: ipython3

    # Map pandas dtypes to PIG types to build the schema of the LOAD instruction.
    mapping = {'int64': 'double', 'float': 'double', 'object': 'chararray'}
    schema = ["%s:%s" % (_[0], mapping.get(str(_[1]), _[1]))
              for _ in zip(df.columns, df.dtypes)]
    schema = ", ".join(schema)
    schema

.. parsed-literal::

    'age:double, job:chararray, marital:chararray, education:chararray, default:chararray, balance:double, housing:chararray, loan:chararray, contact:chararray, day:double, month:chararray, duration:double, campaign:double, pdays:double, previous:double, poutcome:chararray, y:chararray'

We add the ``DESCRIBE`` instruction.

.. code:: ipython3

    %%PIG_azure aggage3.pig

    values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt'
             USING PigStorage('\t')
             AS (age:double, job:chararray, marital:chararray, education:chararray,
                 default:chararray, balance:double, housing:chararray, loan:chararray,
                 contact:chararray, day:double, month:chararray, duration:double,
                 campaign:double, pdays:double, previous:double, poutcome:chararray,
                 y:chararray);
    DESCRIBE values;

    gr = GROUP values BY loan;
    DESCRIBE gr;

    agg = FOREACH gr GENERATE group, AVG(age) AS avg_age;
    DESCRIBE agg;

    STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg.txt' USING PigStorage('\t');

.. code:: ipython3

    jid = %hd_pig_submit aggage3.pig

.. code:: ipython3

    jid

.. parsed-literal::

    {'id': 'job_1466069083851_0005'}

.. code:: ipython3

    %hd_queue

.. parsed-literal::

    [{'detail': None, 'id': 'job_1466069083851_0005'},
     {'detail': None, 'id': 'job_1466069083851_0004'},
     {'detail': None, 'id': 'job_1466069083851_0003'},
     {'detail': None, 'id': 'job_1466069083851_0002'},
     {'detail': None, 'id': 'job_1466069083851_0001'}]

.. code:: ipython3

    df = %hd_job_status jid['id']
    df["status"]["state"]

.. parsed-literal::

    'RUNNING'

.. code:: ipython3

    %hd_tail_stderr -n 100 jid['id']

.. parsed-literal::
    16/06/16 21:05:43 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
    16/06/16 21:05:43 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
    16/06/16 21:05:43 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
    2016-06-16 21:05:43,576 [main] INFO  org.apache.pig.Main - Apache Pig version 0.15.0.2.3.3.1-21 (r: unknown) compiled May 04 2016, 20:06:44
    2016-06-16 21:05:43,576 [main] INFO  org.apache.pig.Main - Logging error messages to: C:\apps\dist\hadoop-2.7.1.2.3.3.1-21\logs\pig_1466111143557.log
    2016-06-16 21:05:45,088 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file D:\Users\hdp/.pigbootup not found
    2016-06-16 21:05:45,498 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
    2016-06-16 21:05:45,498 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
    2016-06-16 21:05:45,498 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-3@hdblobstorage.blob.core.windows.net
    2016-06-16 21:05:47,452 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
    2016-06-16 21:05:49,057 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1025: 
     Invalid field projection. Projected field [age] does not exist in schema: group:chararray,values:bag{:tuple(age:double,job:chararray,marital:chararray,education:chararray,default:chararray,balance:double,housing:chararray,loan:chararray,contact:chararray,day:double,month:chararray,duration:double,campaign:double,pdays:double,previous:double,poutcome:chararray,y:chararray)}.
    2016-06-16 21:05:49,057 [main] ERROR org.apache.pig.tools.grunt.Grunt - org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1001: Unable to describe schema for alias agg
    	at org.apache.pig.PigServer.dumpSchema(PigServer.java:823)
    	at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:321)
    	at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:416)
    	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
    	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
    	at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
    	at org.apache.pig.Main.run(Main.java:502)
    	at org.apache.pig.Main.main(Main.java:177)
    Caused by: org.apache.pig.impl.plan.PlanValidationException: ERROR 1025: 
     Invalid field projection. Projected field [age] does not exist in schema: group:chararray,values:bag{:tuple(age:double,job:chararray,marital:chararray,education:chararray,default:chararray,balance:double,housing:chararray,loan:chararray,contact:chararray,day:double,month:chararray,duration:double,campaign:double,pdays:double,previous:double,poutcome:chararray,y:chararray)}.
    	at org.apache.pig.newplan.logical.expression.ProjectExpression.findColNum(ProjectExpression.java:191)
    	at org.apache.pig.newplan.logical.expression.ProjectExpression.setColumnNumberFromAlias(ProjectExpression.java:174)
    	at org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor$1.visit(ColumnAliasConversionVisitor.java:53)
    	at org.apache.pig.newplan.logical.expression.ProjectExpression.accept(ProjectExpression.java:215)
    	at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
    	at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
    	at org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor.visit(AllExpressionVisitor.java:142)
    	at org.apache.pig.newplan.logical.relational.LOInnerLoad.accept(LOInnerLoad.java:128)
    	at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
    	at org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor.visit(AllExpressionVisitor.java:124)
    	at org.apache.pig.newplan.logical.relational.LOForEach.accept(LOForEach.java:87)
    	at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
    	at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
    	at org.apache.pig.newplan.logical.relational.LogicalPlan.validate(LogicalPlan.java:175)
    	at org.apache.pig.PigServer$Graph.compile(PigServer.java:1767)
    	at org.apache.pig.PigServer$Graph.access$300(PigServer.java:1443)
    	at org.apache.pig.PigServer.buildStorePlan(PigServer.java:1339)
    	at org.apache.pig.PigServer.getOperatorForAlias(PigServer.java:1418)
    	at org.apache.pig.PigServer.dumpSchema(PigServer.java:806)
    	... 7 more

    Details also at logfile: C:\apps\dist\hadoop-2.7.1.2.3.3.1-21\logs\pig_1466111143557.log
    2016-06-16 21:05:49,119 [main] INFO  org.apache.pig.Main - Pig script completed in 5 seconds and 954 milliseconds (5954 ms)

    

OUT:

.. parsed-literal::

    values: {age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray}
    gr: {group: chararray,values: {(age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray)}}
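
The error is informative: after a ``GROUP``, the original columns no longer
exist at the top level; they are nested in a bag named after the grouped
alias (here ``values``), so the aggregate must be written
``AVG(values.age)`` rather than ``AVG(age)``. A rough pandas analogy of
this grouped-bag semantics (toy data, purely illustrative):

.. code:: ipython3

    import pandas as pd

    # Toy frame mimicking the bank data.
    df = pd.DataFrame({"loan": ["no", "yes", "no"],
                       "age": [40.0, 30.0, 50.0]})

    # Pig's GROUP yields (group, bag-of-full-rows); the aggregate has to
    # reach into the bag, hence AVG(values.age) and not AVG(age).
    agg = {loan: bag["age"].mean() for loan, bag in df.groupby("loan")}
    agg
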

    
.. code:: ipython3

    %%PIG_azure aggage4.pig

    values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt'
             USING PigStorage('\t')
             AS (age:double, job:chararray, marital:chararray, education:chararray,
                 default:chararray, balance:double, housing:chararray, loan:chararray,
                 contact:chararray, day:double, month:chararray, duration:double,
                 campaign:double, pdays:double, previous:double, poutcome:chararray,
                 y:chararray);
    DESCRIBE values;

    gr = GROUP values BY loan;
    DESCRIBE gr;

    -- After GROUP, the columns are nested in the bag "values",
    -- hence AVG(values.age) and not AVG(age).
    agg = FOREACH gr GENERATE group, AVG(values.age) AS avg_age;
    DESCRIBE agg;

    STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg2.txt' USING PigStorage('\t');

.. code:: ipython3

    jid = %hd_pig_submit aggage4.pig

.. code:: ipython3

    jid

.. parsed-literal::

    {'id': 'job_1466069083851_0008'}

.. code:: ipython3

    %hd_queue

.. parsed-literal::

    [{'detail': None, 'id': 'job_1466069083851_0009'},
     {'detail': None, 'id': 'job_1466069083851_0008'},
     {'detail': None, 'id': 'job_1466069083851_0007'},
     {'detail': None, 'id': 'job_1466069083851_0006'},
     {'detail': None, 'id': 'job_1466069083851_0005'},
     {'detail': None, 'id': 'job_1466069083851_0004'},
     {'detail': None, 'id': 'job_1466069083851_0003'},
     {'detail': None, 'id': 'job_1466069083851_0002'},
     {'detail': None, 'id': 'job_1466069083851_0001'}]

.. code:: ipython3

    df = %hd_job_status jid['id']
    df["status"]["state"]

.. parsed-literal::

    'RUNNING'

.. code:: ipython3

    %hd_tail_stderr -n 50 jid['id']

.. parsed-literal::
    2016-06-16 21:13:19,066 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2016-06-16 21:13:19,410 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
    2016-06-16 21:13:19,410 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
    2016-06-16 21:13:19,410 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
    2016-06-16 21:13:19,504 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2016-06-16 21:13:19,629 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
    2016-06-16 21:13:19,629 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics: 

    HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
    2.7.1.2.3.3.1-21	0.15.0.2.3.3.1-21	hdp	2016-06-16 21:12:27	2016-06-16 21:13:19	GROUP_BY

    Success!

    Job Stats (time in seconds):
    JobId	Maps	Reduces	MaxMapTime	MinMapTime	AvgMapTime	MedianMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	MedianReducetime	Alias	Feature	Outputs
    job_1466069083851_0009	1	1	12	12	12	12	9	9	9	9	agg,gr,values	GROUP_BY,COMBINER	wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no_agg2.txt,

    Input(s):
    Successfully read 45212 records from: "wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no.txt"

    Output(s):
    Successfully stored 3 records in: "wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no_agg2.txt"

    Counters:
    Total records written : 3
    Total bytes written : 0
    Spillable Memory Manager spill count : 0
    Total bags proactively spilled: 0
    Total records proactively spilled: 0

    Job DAG:
    job_1466069083851_0009


    2016-06-16 21:13:19,848 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
    2016-06-16 21:13:19,848 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
    2016-06-16 21:13:19,848 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
    2016-06-16 21:13:19,926 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2016-06-16 21:13:20,160 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
    2016-06-16 21:13:20,160 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
    2016-06-16 21:13:20,160 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
    2016-06-16 21:13:20,238 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2016-06-16 21:13:20,506 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
    2016-06-16 21:13:20,506 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
    2016-06-16 21:13:20,506 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
    2016-06-16 21:13:20,582 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2016-06-16 21:13:20,646 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 7 time(s).
    2016-06-16 21:13:20,646 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
    2016-06-16 21:13:20,725 [main] INFO  org.apache.pig.Main - Pig script completed in 1 minute, 2 seconds and 364 milliseconds (62364 ms)

    

OUT:

.. parsed-literal::

    values: {age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray}
    gr: {group: chararray,values: {(age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray)}}
    agg: {group: chararray,avg_age: double}

    
.. code:: ipython3

    %blob_ls /centrale

.. parsed-literal::

        name                                             last_modified              content_type  content_length  blob_type
    0   centrale/bank_full.csv                           2016-06-15 22:17:59+00:00  None          4610348         BlockBlob
    1   centrale/bank_full_tab.txt                       2016-06-15 22:19:46+00:00  None          3751306         BlockBlob
    2   centrale/bank_full_tab_no.txt                    2016-06-15 23:00:52+00:00  None          3751306         BlockBlob
    3   centrale/bank_full_tab_no_agg.txt                2016-06-16 10:32:11+00:00  None          0               BlockBlob
    4   centrale/bank_full_tab_no_agg.txt/_SUCCESS       2016-06-16 10:32:11+00:00  None          0               BlockBlob
    5   centrale/bank_full_tab_no_agg.txt/part-r-00000   2016-06-16 10:32:11+00:00  None          49              BlockBlob
    6   centrale/bank_full_tab_no_agg2.txt               2016-06-16 21:13:14+00:00  None          0               BlockBlob
    7   centrale/bank_full_tab_no_agg2.txt/_SUCCESS      2016-06-16 21:13:14+00:00  None          0               BlockBlob
    8   centrale/bank_full_tab_no_agg2.txt/part-r-00000  2016-06-16 21:13:13+00:00  None          49              BlockBlob
    9   centrale/scripts/pig/aggage.pig                  2016-06-15 23:15:54+00:00  None          782             BlockBlob
    10  centrale/scripts/pig/aggage.pig.log              2016-06-15 23:16:40+00:00  None          0               BlockBlob
    11  centrale/scripts/pig/aggage.pig.log/exit         2016-06-15 23:16:40+00:00  None          3               BlockBlob
    12  centrale/scripts/pig/aggage.pig.log/stderr       2016-06-15 23:16:30+00:00  None          4060            BlockBlob
    13  centrale/scripts/pig/aggage.pig.log/stdout       2016-06-15 23:16:30+00:00  None          0               BlockBlob
    14  centrale/scripts/pig/aggage2.pig                 2016-06-16 10:28:16+00:00  None          853             BlockBlob
    15  centrale/scripts/pig/aggage2.pig.log             2016-06-16 10:29:04+00:00  None          0               BlockBlob
    16  centrale/scripts/pig/aggage2.pig.log/exit        2016-06-16 10:29:04+00:00  None          3               BlockBlob
    17  centrale/scripts/pig/aggage2.pig.log/stderr      2016-06-16 10:28:54+00:00  None          4883            BlockBlob
    18  centrale/scripts/pig/aggage2.pig.log/stdout      2016-06-16 10:28:54+00:00  None          613             BlockBlob
    19  centrale/scripts/pig/aggage3.pig                 2016-06-16 21:05:11+00:00  None          853             BlockBlob
    20  centrale/scripts/pig/aggage3.pig.log             2016-06-16 21:05:59+00:00  None          0               BlockBlob
    21  centrale/scripts/pig/aggage3.pig.log/exit        2016-06-16 21:05:59+00:00  None          3               BlockBlob
    22  centrale/scripts/pig/aggage3.pig.log/stderr      2016-06-16 21:05:49+00:00  None          4883            BlockBlob
    23  centrale/scripts/pig/aggage3.pig.log/stdout      2016-06-16 21:05:49+00:00  None          613             BlockBlob
    24  centrale/scripts/pig/aggage4.pig                 2016-06-16 21:11:47+00:00  None          861             BlockBlob
    25  centrale/scripts/pig/aggage4.pig.log             2016-06-16 21:13:31+00:00  None          0               BlockBlob
    26  centrale/scripts/pig/aggage4.pig.log/exit        2016-06-16 21:13:31+00:00  None          3               BlockBlob
    27  centrale/scripts/pig/aggage4.pig.log/stderr      2016-06-16 21:13:21+00:00  None          16643           BlockBlob
    28  centrale/scripts/pig/aggage4.pig.log/stdout      2016-06-16 21:13:21+00:00  None          654             BlockBlob
    29  centrale2/bank_full_tab_no.txt                   2016-06-16 10:18:58+00:00  None          3751188         BlockBlob
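
As the listing shows, ``STORE`` produces a directory rather than a single
file: an empty ``_SUCCESS`` marker plus one ``part-r-*`` file per reducer.
Only the part files carry data, which is what ``%blob_down`` fetches below.
A small sketch of picking them out, with blob names copied from the table
above:

.. code:: ipython3

    # Blob names as returned by %blob_ls for the job output.
    blobs = [
        "centrale/bank_full_tab_no_agg2.txt",
        "centrale/bank_full_tab_no_agg2.txt/_SUCCESS",
        "centrale/bank_full_tab_no_agg2.txt/part-r-00000",
    ]

    # Keep only the entries that actually contain records.
    parts = [b for b in blobs if "/part-r-" in b]
    parts
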
.. code:: ipython3

    %blob_downmerge --help

.. parsed-literal::

    usage: blob_downmerge [-h] [-o] remotepath localfile

    download a set of files from a blob storage folder, files will be merged,
    we assume the container is the first element to the remote path

    positional arguments:
      remotepath       remote path of the folder to download
      localfile        local name for the downloaded merged file

    optional arguments:
      -h, --help       show this help message and exit
      -o, --overwrite  overwrite the local file

.. code:: ipython3

    %blob_down /centrale/bank_full_tab_no_agg2.txt/part-r-00000 agg_hadoop3.txt

.. parsed-literal::

    'agg_hadoop3.txt'

.. code:: ipython3

    import pandas
    # header=None: the part file has no header row.
    df = pandas.read_csv("agg_hadoop3.txt", sep="\t", header=None)
    df
.. parsed-literal::

          0          1
    0    no  41.008823
    1   yes  40.555632
    2  loan        NaN
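
The ``loan``/``NaN`` row is the header of the original file, which ended up
in the data. A minimal sketch of dropping it after reading (the file
contents are inlined here for illustration):

.. code:: ipython3

    import io
    import pandas as pd

    # Same shape as agg_hadoop3.txt: two data rows plus a stray header row.
    raw = "no\t41.008823\nyes\t40.555632\nloan\t\n"
    df2 = pd.read_csv(io.StringIO(raw), sep="\t", header=None,
                      names=["loan", "avg_age"])

    # Filter out the row that repeats the column name.
    df2 = df2[df2["loan"] != "loan"].reset_index(drop=True)
    df2
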
I forgot to remove the header. Let us check that the averages are correct
by redoing the computation locally.

.. code:: ipython3

    df = pandas.read_csv("bank-full.csv", sep=";")
    df.head()
.. parsed-literal::

       age  job           marital  education  default  balance  housing  loan  contact  day  month  duration  campaign  pdays  previous  poutcome  y
    0  58   management    married  tertiary   no       2143     yes      no    unknown  5    may    261       1         -1     0         unknown   no
    1  44   technician    single   secondary  no       29       yes      no    unknown  5    may    151       1         -1     0         unknown   no
    2  33   entrepreneur  married  secondary  no       2        yes      yes   unknown  5    may    76        1         -1     0         unknown   no
    3  47   blue-collar   married  unknown    no       1506     yes      no    unknown  5    may    92        1         -1     0         unknown   no
    4  33   unknown       single   unknown    no       1        no       no    unknown  5    may    198       1         -1     0         unknown   no
.. code:: ipython3

    df[["loan", "age"]].groupby("loan").mean()
.. parsed-literal::

                age
    loan
    no    41.008823
    yes   40.555632
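
The local averages match the cluster output read back above. As a trivial
programmatic check of that agreement (numbers copied from the two result
tables):

.. code:: ipython3

    # Cluster output vs. local pandas aggregate, per value of "loan".
    hadoop = {"no": 41.008823, "yes": 40.555632}
    local = {"no": 41.008823, "yes": 40.555632}

    match = all(abs(hadoop[k] - local[k]) < 1e-4 for k in hadoop)
    match
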