Map/Reduce with PIG on Azure - solution#
from jyquickhelper import add_notebook_menu
add_notebook_menu()
Data#
We use the following dataset: Localization Data for Person Activity Data Set, retrieved as described in the exercise notebook.
from pyquickhelper.ipythonhelper import open_html_form
params={"blob_storage":"", "password1":"", "hadoop_server":"", "password2":"", "username":"xavierdupre"}
open_html_form(params=params,title="server + hadoop + credentials", key_save="blobhp")
(The form asks for: blob_storage, hadoop_server, password1, password2, username.)
# retrieve the credentials entered in the form above
blobstorage = blobhp["blob_storage"]
blobpassword = blobhp["password1"]
hadoop_server = blobhp["hadoop_server"]
hadoop_password = blobhp["password2"]
username = blobhp["username"]
import pyensae
%load_ext pyensae
%load_ext pyenbc
%hd_open
(<pyensae.remote.azure_connection.AzureClient at 0xafe7e10>,
<azure.storage.blob.blobservice.BlobService at 0xafe7e48>)
Exercise 1: GROUP BY#
import pandas, sqlite3
con = sqlite3.connect("ConfLongDemo_JSI.db3")
df = pandas.read_sql("""SELECT activity, count(*) as nb FROM person GROUP BY activity""", con)
con.close()
df.head()
| | activity | nb |
| --- | --- | --- |
| 0 | falling | 2973 |
| 1 | lying | 54480 |
| 2 | lying down | 6168 |
| 3 | on all fours | 5210 |
| 4 | sitting | 27244 |
We check that the file we want to process is indeed there:
%blob_ls /testensae/ConfLongDemo_JSI.small.txt
| | name | last_modified | content_type | content_length | blob_type |
| --- | --- | --- | --- | --- | --- |
| 0 | testensae/ConfLongDemo_JSI.small.txt | Thu, 29 Oct 2015 00:23:00 GMT | application/octet-stream | 132727 | BlockBlob |
It now has to be done with PIG.
%%PIG_azure solution_groupby.pig
myinput = LOAD '$CONTAINER/testensae/ConfLongDemo_JSI.small.txt'
using PigStorage(',')
AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
gr = GROUP myinput BY activity ;
avgact = FOREACH gr GENERATE group, COUNT(myinput) ;
STORE avgact INTO '$CONTAINER/$PSEUDO/testensae/ConfLongDemo_JSI.small.group.2015.txt' USING PigStorage() ;
We submit the job:
jid = %hd_pig_submit solution_groupby.pig
jid
{'id': 'job_1445989166328_0009'}
We check the job status:
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["completed"],st["status"]["jobComplete"],st["status"]["state"]
('job_1445989166328_0009', '100% complete', None, False, 'RUNNING')
We check that the compilation went well:
%hd_tail_stderr jid["id"]
Job DAG: job_1445989166328_0010
2015-10-29 00:55:14,395 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2015-10-29 00:55:14,395 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.100.164:9010
2015-10-29 00:55:14,395 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.100.164:10200
2015-10-29 00:55:14,473 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-10-29 00:55:14,676 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2015-10-29 00:55:14,676 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.100.164:9010
2015-10-29 00:55:14,676 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.100.164:10200
2015-10-29 00:55:14,754 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-10-29 00:55:14,957 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2015-10-29 00:55:14,957 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.100.164:9010
2015-10-29 00:55:14,957 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.100.164:10200
2015-10-29 00:55:15,020 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-10-29 00:55:15,082 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2015-10-29 00:55:15,113 [main] INFO org.apache.pig.Main - Pig script completed in 49 seconds and 706 milliseconds (49706 ms)
We look at the contents of the directory on the blob storage:
df=%blob_ls /$PSEUDO/testensae
list(df["name"])
['xavierdupre/testensae',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/part-r-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/part-r-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/part-r-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/part-m-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt/part-m-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt/part-m-00000']
import os
# remove any previous local copy before downloading the new results
if os.path.exists("results.group.2015.txt"): os.remove("results.group.2015.txt")
%blob_downmerge /$PSEUDO/testensae/ConfLongDemo_JSI.small.group.2015.txt results.group.2015.txt
'results.group.2015.txt'
%lsr res.*[.]txt
| | directory | last_modified | name | size |
| --- | --- | --- | --- | --- |
| 0 | False | 2015-10-29 01:56:11.025867 | .\results.group.2015.txt | 89 |
| 1 | False | 2015-10-29 01:46:45.425028 | .\results.txt | 21.65 Kb |
| 2 | False | 2015-10-29 01:46:46.705466 | .\results_allfiles.txt | 21.65 Kb |
%head results.group.2015.txt
lying 267
falling 30
sitting 435
walking 170
sitting down 56
standing up from sitting 42
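As a quick sanity check, the merged file can be reloaded with pandas and compared with the GROUP BY computed earlier with sqlite3 (the counts differ because the PIG job ran on the small extract ConfLongDemo_JSI.small.txt). A minimal sketch, assuming the default tab delimiter of PigStorage():

import pandas
# the PIG output is tab-separated: activity, count
pig_counts = pandas.read_csv("results.group.2015.txt", sep="\t", names=["activity", "nb"])
pig_counts.sort_values("activity")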
Exercise 2: JOIN#
con = sqlite3.connect("ConfLongDemo_JSI.db3")
df = pandas.read_sql("""SELECT person.*, A.nb FROM person INNER JOIN (
SELECT activity, count(*) as nb FROM person GROUP BY activity) AS A
ON person.activity == A.activity""", con)
con.close()
df.head()
| | index | sequence | tag | timestamp | dateformat | x | y | z | activity | nb |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 0 | A01 | 010-000-024-033 | 633790226051280329 | 27.05.2009 14:03:25:127 | 4.062931 | 1.892434 | 0.507425 | walking | 32710 |
| 1 | 1 | A01 | 020-000-033-111 | 633790226051820913 | 27.05.2009 14:03:25:183 | 4.291954 | 1.781140 | 1.344495 | walking | 32710 |
| 2 | 2 | A01 | 020-000-032-221 | 633790226052091205 | 27.05.2009 14:03:25:210 | 4.359101 | 1.826456 | 0.968821 | walking | 32710 |
| 3 | 3 | A01 | 010-000-024-033 | 633790226052361498 | 27.05.2009 14:03:25:237 | 4.087835 | 1.879999 | 0.466983 | walking | 32710 |
| 4 | 4 | A01 | 010-000-030-096 | 633790226052631792 | 27.05.2009 14:03:25:263 | 4.324462 | 2.072460 | 0.488065 | walking | 32710 |
Same as before, it now has to be done with PIG.
%%PIG_azure solution_groupby_join.pig
myinput = LOAD '$CONTAINER/testensae/ConfLongDemo_JSI.small.txt'
using PigStorage(',')
AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
gr = GROUP myinput BY activity ;
avgact = FOREACH gr GENERATE group, COUNT(myinput) ;
joined = JOIN myinput BY activity, avgact BY group ;
STORE joined INTO '$CONTAINER/$PSEUDO/testensae/ConfLongDemo_JSI.small.group.join.2015.txt' USING PigStorage() ;
jid = %hd_pig_submit solution_groupby_join.pig
jid
{'id': 'job_1445989166328_0011'}
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["completed"],st["status"]["jobComplete"],st["status"]["state"], st["userargs"]["file"]
('job_1445989166328_0011',
'100% complete',
'done',
True,
'SUCCEEDED',
'wasb://hdblobstorage@hdblobstorage.blob.core.windows.net/xavierdupre/scripts/pig/solution_groupby_join.pig')
df=%blob_ls /$PSEUDO/testensae
df
| | name | last_modified | content_type | content_length | blob_type |
| --- | --- | --- | --- | --- | --- |
| 0 | xavierdupre/testensae | Tue, 25 Nov 2014 00:50:34 GMT | application/octet-stream | 0 | BlockBlob |
| 1 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Thu, 29 Oct 2015 00:55:09 GMT | | 0 | BlockBlob |
| 2 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Thu, 29 Oct 2015 00:55:09 GMT | application/octet-stream | 0 | BlockBlob |
| 3 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Thu, 29 Oct 2015 00:55:08 GMT | application/octet-stream | 89 | BlockBlob |
| 4 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Thu, 29 Oct 2015 00:58:43 GMT | | 0 | BlockBlob |
| 5 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Thu, 29 Oct 2015 00:58:43 GMT | application/octet-stream | 0 | BlockBlob |
| 6 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Thu, 29 Oct 2015 00:58:42 GMT | application/octet-stream | 144059 | BlockBlob |
| 7 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Tue, 25 Nov 2014 01:16:11 GMT | | 0 | BlockBlob |
| 8 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Tue, 25 Nov 2014 01:16:11 GMT | application/octet-stream | 0 | BlockBlob |
| 9 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Tue, 25 Nov 2014 01:16:10 GMT | application/octet-stream | 144059 | BlockBlob |
| 10 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Tue, 25 Nov 2014 01:12:49 GMT | | 0 | BlockBlob |
| 11 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Tue, 25 Nov 2014 01:12:49 GMT | application/octet-stream | 0 | BlockBlob |
| 12 | xavierdupre/testensae/ConfLongDemo_JSI.small.g... | Tue, 25 Nov 2014 01:12:49 GMT | application/octet-stream | 89 | BlockBlob |
| 13 | xavierdupre/testensae/ConfLongDemo_JSI.small.k... | Tue, 25 Nov 2014 00:50:45 GMT | | 0 | BlockBlob |
| 14 | xavierdupre/testensae/ConfLongDemo_JSI.small.k... | Tue, 25 Nov 2014 00:50:46 GMT | application/octet-stream | 0 | BlockBlob |
| 15 | xavierdupre/testensae/ConfLongDemo_JSI.small.k... | Tue, 25 Nov 2014 00:50:45 GMT | application/octet-stream | 22166 | BlockBlob |
| 16 | xavierdupre/testensae/ConfLongDemo_JSI.small.w... | Thu, 29 Oct 2015 00:28:30 GMT | | 0 | BlockBlob |
| 17 | xavierdupre/testensae/ConfLongDemo_JSI.small.w... | Thu, 29 Oct 2015 00:28:30 GMT | application/octet-stream | 0 | BlockBlob |
| 18 | xavierdupre/testensae/ConfLongDemo_JSI.small.w... | Thu, 29 Oct 2015 00:28:30 GMT | application/octet-stream | 22166 | BlockBlob |
| 19 | xavierdupre/testensae/ConfLongDemo_JSI.small.w... | Thu, 29 Oct 2015 00:46:05 GMT | | 0 | BlockBlob |
| 20 | xavierdupre/testensae/ConfLongDemo_JSI.small.w... | Thu, 29 Oct 2015 00:46:05 GMT | application/octet-stream | 0 | BlockBlob |
| 21 | xavierdupre/testensae/ConfLongDemo_JSI.small.w... | Thu, 29 Oct 2015 00:46:04 GMT | application/octet-stream | 22166 | BlockBlob |
set(df.name)
{'xavierdupre/testensae',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/part-r-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.2015.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.2015.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.2015.txt/part-r-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/part-r-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/part-r-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/part-m-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt/part-m-00000',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt/_SUCCESS',
'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt/part-m-00000'}
if os.path.exists("results.join.2015.txt") : os.remove("results.join.2015.txt")
%blob_downmerge /$PSEUDO/testensae/ConfLongDemo_JSI.small.group.join.2015.txt results.join.2015.txt
'results.join.2015.txt'
%head results.join.2015.txt
999 A01 010-000-024-033 633790226379871138 27.05.2009 14:03:57:987 3.198556661605835 1.1257659196853638 0.3567752242088318 lying lying 267
998 A01 020-000-032-221 633790226379600847 27.05.2009 14:03:57:960 4.3730292320251465 1.3821170330047607 0.38861045241355896 lying lying 267
997 A01 020-000-033-111 633790226379330550 27.05.2009 14:03:57:933 4.7574005126953125 1.285519003868103 -0.08946932852268219 lying lying 267
996 A01 010-000-030-096 633790226379060251 27.05.2009 14:03:57:907 3.182415008544922 1.1020996570587158 0.29104289412498474 lying lying 267
995 A01 010-000-024-033 633790226378789954 27.05.2009 14:03:57:880 3.0784008502960205 1.0197675228118896 0.6061218976974487 lying lying 267
994 A01 020-000-032-221 633790226378519655 27.05.2009 14:03:57:853 4.36382532119751 1.4307395219802856 0.3206148743629456 lying lying 267
993 A01 010-000-024-033 633790226377708776 27.05.2009 14:03:57:770 3.0621800422668457 1.0790562629699707 0.6795752048492432 lying lying 267
992 A01 020-000-032-221 633790226377438480 27.05.2009 14:03:57:743 4.371500492095946 1.4781558513641355 0.5384233593940735 lying lying 267
991 A01 020-000-033-111 633790226377168187 27.05.2009 14:03:57:717 4.918898105621338 1.1530661582946775 0.19635945558547974 lying lying 267
990 A01 010-000-030-096 633790226376897895 27.05.2009 14:03:57:690 3.208510637283325 1.1156394481658936 0.3381773829460144 lying lying 267
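Here as well, the merged file can be reloaded locally to check that the join appended the activity count to every row. A minimal sketch, assuming tab-separated output and the column order produced by the script above:

import pandas
cols = ["index", "sequence", "tag", "timestamp", "dateformat",
        "x", "y", "z", "activity", "group_activity", "nb"]
joined = pandas.read_csv("results.join.2015.txt", sep="\t", names=cols)
# one (activity, count) pair per activity present in the small extract
joined[["activity", "nb"]].drop_duplicates()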
Going further
PIG is not the only way to run Map/Reduce jobs. Hive is a language whose syntax is very close to SQL's. The article Comparing Pig Latin and SQL for Constructing Data Processing Pipelines details the differences between the two approaches.
high-level language
What should be kept in mind is that PIG is a high-level language. The program is compiled into a sequence of Map/Reduce operations that remains transparent to the user. Development time is much shorter than for the same program written in Java. The compiler builds an execution plan (a few examples here) and infers the number of machines required to distribute the job. This is sufficient for most needs.
small datasets
Some jobs can run for hours; it is advisable to try them on small datasets before running them on the real data. It is always frustrating to find out that a job crashed after two hours because a string was empty and that case had not been handled.
With these small datasets, it is possible, and advisable, to test the job first on the gateway (local execution) before launching it on the cluster. With pyensae, this means adding the -local option to the hd_pig_submit command, as sketched below.
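For instance, an even smaller extract can be built locally before anything is sent to the cluster. A minimal sketch, assuming the full file ConfLongDemo_JSI.txt is present on the gateway; the submission line is shown as a comment because the exact option syntax depends on the pyensae version:

# build a tiny extract to debug the script logic
with open("ConfLongDemo_JSI.txt", "r") as f, open("ConfLongDemo_JSI.verysmall.txt", "w") as g:
    for i, line in enumerate(f):
        if i >= 1000:        # keep only the first 1000 rows
            break
        g.write(line)

# then test the script on the gateway before submitting it to the cluster
# jid = %hd_pig_submit solution_groupby.pig -local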
merging the split files
A PIG program does not produce a single file but several files in a directory. The getmerge command downloads these files to the gateway and merges them into a single one.
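Once the pieces are on the local disk, the merge itself is just a concatenation. A minimal sketch of what getmerge does, with hypothetical local paths:

import glob
# concatenate every part-* file of a job output into a single local file
with open("merged_output.txt", "w") as out:
    for name in sorted(glob.glob("job_output_dir/part-*")):
        with open(name, "r") as part:
            out.write(part.read())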
row order
Jobs are distributed; even when doing nothing (LOAD + STORE), there is no guarantee that the order of the lines is preserved. The probability that it is preserved is in fact close to zero.
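If the original order matters, it has to be restored explicitly after the job, for instance by sorting on a column that encodes it, such as the index column kept by the scripts above. A minimal pandas sketch on the merged join result:

import pandas
cols = ["index", "sequence", "tag", "timestamp", "dateformat",
        "x", "y", "z", "activity", "group_activity", "nb"]
joined = pandas.read_csv("results.join.2015.txt", sep="\t", names=cols)
joined.sort_values("index").head()   # restore the original row order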