Correction.
from jyquickhelper import add_notebook_menu
add_notebook_menu()
On considère le jeu de données suivant : Localization Data for Person Activity Data Set qui ont déjà été récupéré avec le notebook de l'énoncé.
import os
if "CRTERALAB" in os.environ:
spl = os.environ["CRTERALAB"].split("**")
params=dict(server=spl[0], password=spl[1], username=spl[2])
r = dict
else:
from pyquickhelper.ipythonhelper import open_html_form
params={"server":"ws...fr", "username":"x...e", "password":""}
r = open_html_form(params=params,title="server + credentials", key_save="params")
r
password = params["password"]
server = params["server"]
username = params["username"]
%load_ext pyensae
%load_ext pyenbc
%remote_open
<pyensae.remote.ssh_remote_connection.ASSHClient at 0xa36fc88>
import pandas, sqlite3
con = sqlite3.connect("ConfLongDemo_JSI.db3")
df = pandas.read_sql("""SELECT activity, count(*) as nb FROM person GROUP BY activity""", con)
con.close()
df.head()
activity | nb | |
---|---|---|
0 | falling | 2973 |
1 | lying | 54480 |
2 | lying down | 6168 |
3 | on all fours | 5210 |
4 | sitting | 27244 |
Il faut maintenant le faire avec PIG.
%%PIG solution_groupby.pig
myinput = LOAD 'ConfLongDemo_JSI.small.example.txt'
using PigStorage(',')
AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
gr = GROUP myinput BY activity ;
avgact = FOREACH gr GENERATE group, COUNT(myinput) ;
STORE avgact INTO 'ConfLongDemo_JSI.small.group.txt' USING PigStorage() ;
%pig_submit solution_groupby.pig -r groupby.redirection
%remote_cmd tail groupby.redirection.err
Total bytes written : 89 Spillable Memory Manager spill count : 0 Total bags proactively spilled: 0 Total records proactively spilled: 0 Job DAG: job_1444669880271_0038 2015-10-29 01:10:53,383 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
%remote_cmd hdfs dfs -ls ConfLongDemo_JSI.small.group.txt
Found 2 items -rw-r--r-- 3 xavierdupre xavierdupre 0 2015-10-29 01:10 ConfLongDemo_JSI.small.group.txt/_SUCCESS -rw-r--r-- 3 xavierdupre xavierdupre 89 2015-10-29 01:10 ConfLongDemo_JSI.small.group.txt/part-r-00000
%remote_cmd hdfs dfs -tail ConfLongDemo_JSI.small.group.txt/part-r-00000
lying 267 falling 30 sitting 435 walking 170 sitting down 56 standing up from sitting 42
con = sqlite3.connect("ConfLongDemo_JSI.db3")
df = pandas.read_sql("""SELECT person.*, A.nb FROM person INNER JOIN (
SELECT activity, count(*) as nb FROM person GROUP BY activity) AS A
ON person.activity == A.activity""", con)
con.close()
df.head()
index | sequence | tag | timestamp | dateformat | x | y | z | activity | nb | |
---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | A01 | 010-000-024-033 | 633790226051280329 | 27.05.2009 14:03:25:127 | 4.062931 | 1.892434 | 0.507425 | walking | 32710 |
1 | 1 | A01 | 020-000-033-111 | 633790226051820913 | 27.05.2009 14:03:25:183 | 4.291954 | 1.781140 | 1.344495 | walking | 32710 |
2 | 2 | A01 | 020-000-032-221 | 633790226052091205 | 27.05.2009 14:03:25:210 | 4.359101 | 1.826456 | 0.968821 | walking | 32710 |
3 | 3 | A01 | 010-000-024-033 | 633790226052361498 | 27.05.2009 14:03:25:237 | 4.087835 | 1.879999 | 0.466983 | walking | 32710 |
4 | 4 | A01 | 010-000-030-096 | 633790226052631792 | 27.05.2009 14:03:25:263 | 4.324462 | 2.072460 | 0.488065 | walking | 32710 |
Idem, maintenant il faut le faire avec PIG.
%%PIG solution_groupby_join.pig
myinput = LOAD 'ConfLongDemo_JSI.small.example.txt'
using PigStorage(',')
AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
gr = GROUP myinput BY activity ;
avgact = FOREACH gr GENERATE group, COUNT(myinput) ;
joined = JOIN myinput BY activity, avgact BY group ;
STORE joined INTO 'ConfLongDemo_JSI.small.group.join.txt' USING PigStorage() ;
%pig_submit solution_groupby_join.pig -r groupby.join.redirection
%remote_cmd tail groupby.join.redirection.err
2015-10-29 01:15:15,416 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address 2015-10-29 01:15:15,416 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS 2015-10-29 01:15:15,416 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://nameservice1 2015-10-29 01:15:17,285 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: HASH_JOIN,GROUP_BY 2015-10-29 01:15:17,348 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]} 2015-10-29 01:15:17,404 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.textoutputformat.separator is deprecated. Instead, use mapreduce.output.textoutputformat.separator 2015-10-29 01:15:17,426 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 6000:Output Location Validation Failed for: 'hdfs://nameservice1/user/xavierdupre/ConfLongDemo_JSI.small.group.join.txt More info to follow: Output directory hdfs://nameservice1/user/xavierdupre/ConfLongDemo_JSI.small.group.join.txt already exists Details at logfile: /home/xavierdupre/pig_1446077714461.log
%remote_cmd hdfs dfs -ls ConfLongDemo_JSI.small.group.join.txt
Found 2 items -rw-r--r-- 3 xavierdupre xavierdupre 0 2015-10-29 01:13 ConfLongDemo_JSI.small.group.join.txt/_SUCCESS -rw-r--r-- 3 xavierdupre xavierdupre 144059 2015-10-29 01:13 ConfLongDemo_JSI.small.group.join.txt/part-r-00000
%remote_cmd hdfs dfs -tail ConfLongDemo_JSI.small.group.join.txt/part-r-00000
26262834000 27.05.2009 14:03:46:283 3.3038318157196045 1.938292145729065 0.7622964978218079 standing up from sitting standing up from sitting 42 652 A01 020-000-033-111 633790226262563704 27.05.2009 14:03:46:257 3.2363295555114746 2.00623106956482 1.1472841501235962 standing up from sitting standing up from sitting 42 651 A01 010-000-030-096 633790226262293413 27.05.2009 14:03:46:230 3.275949239730835 1.7746492624282837 0.3117055296897888 standing up from sitting standing up from sitting 42 650 A01 010-000-024-033 633790226262023117 27.05.2009 14:03:46:203 3.2498104572296143 1.878917098045349 0.13854867219924927 standing up from sitting standing up from sitting 42 649 A01 020-000-032-221 633790226261752823 27.05.2009 14:03:46:177 3.352446317672729 1.950886845588684 0.8281049728393555 standing up from sitting standing up from sitting 42 648 A01 020-000-033-111 633790226261482530 27.05.2009 14:03:46:147 3.2220029830932617 2.0042579174041752 1.032345414161682 standing up from sitting standing up from sitting 42
PIG n'est pas la seule façon d'exécuter des jobs Map/Reduce. Hive est un langage dont la syntaxe est très proche de celle du SQL. L'article Comparing Pig Latin and SQL for Constructing Data Processing Pipelines explicite les différences des deux approches.
langage haut niveau
Ce qu'il faut retenir est que le langage PIG est un langage haut niveau. Le programme est compilé en une séquence d'opérations Map/Reduce transparente pour l'utilisateur. Le temps de développement est très réduit lorsqu'on le compare au même programme écrit en Java. Le compilateur construit un plan d'exécution (quelques exemples ici) et infère le nombre de machines requises pour distribuer le job. Cela suffit pour la plupart des besoins, cela nécessite.
petits jeux
Certains jobs peuvent durer des heures, il est conseillée de les essayer sur des petits jeux de données avant de les faire tourner sur les vrais données. Il est toujours frustrant de s'apercevoir qu'un job a planté au bout de deux heures car une chaîne de caractères est vide et que ce cas n'a pas été prévu.
Avec ces petits jeux, il est possible de faire tourner et conseillé de tester le job d'abord sur la passerelle (exécution local) avant de le lancer sur le cluster. Avec pyensae, il faut ajouter l'option -local
à la commande pig_submit.
concaténer les fichiers divisés
Un programme PIG ne produit pas un fichier mais plusieurs fichiers dans un répertoire. La commande getmerge télécharge ces fichiers sur la passerelle et les fusionne en un seul.
ordre des lignes
Les jobs sont distribués, même en faisant rien (LOAD + STORE), il n'est pas garanti que l'ordre des lignes soit préservé. La probabilié que ce soit le cas est quasi nulle.