{"cells": [{"cell_type": "markdown", "metadata": {}, "source": ["# Map/Reduce avec PIG sur cloudera - correction\n", "\n", "Correction."]}, {"cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [{"data": {"text/html": ["Plan\n", "
run previous cell, wait for 2 seconds
\n", ""], "text/plain": [""]}, "execution_count": 2, "metadata": {}, "output_type": "execute_result"}], "source": ["from jyquickhelper import add_notebook_menu\n", "add_notebook_menu()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Donn\u00e9es\n", "\n", "On consid\u00e8re le jeu de donn\u00e9es suivant : [Localization Data for Person Activity Data Set](https://archive.ics.uci.edu/ml/datasets/Localization+Data+for+Person+Activity) qui ont d\u00e9j\u00e0 \u00e9t\u00e9 r\u00e9cup\u00e9r\u00e9 avec le notebook de l'\u00e9nonc\u00e9."]}, {"cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [{"data": {"text/html": ["
server + credentials\n", "
password \n", "
server \n", "
username \n", "
\n", ""], "text/plain": [""]}, "execution_count": 3, "metadata": {}, "output_type": "execute_result"}], "source": ["import os\n", "if \"CRTERALAB\" in os.environ:\n", " spl = os.environ[\"CRTERALAB\"].split(\"**\")\n", " params=dict(server=spl[0], password=spl[1], username=spl[2])\n", " r = dict\n", "else:\n", " from pyquickhelper.ipythonhelper import open_html_form\n", " params={\"server\":\"ws...fr\", \"username\":\"x...e\", \"password\":\"\"}\n", " r = open_html_form(params=params,title=\"server + credentials\", key_save=\"params\")\n", "r"]}, {"cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": ["password = params[\"password\"]\n", "server = params[\"server\"]\n", "username = params[\"username\"]"]}, {"cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [{"data": {"text/plain": [""]}, "execution_count": 5, "metadata": {}, "output_type": "execute_result"}], "source": ["%load_ext pyensae\n", "%load_ext pyenbc\n", "%remote_open"]}, {"cell_type": "markdown", "metadata": {}, "source": ["

Exercise 1: GROUP BY

"]}, {"cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
activity | nb
0 | falling | 2973
1 | lying | 54480
2 | lying down | 6168
3 | on all fours | 5210
4 | sitting | 27244
\n", "
"], "text/plain": [" activity nb\n", "0 falling 2973\n", "1 lying 54480\n", "2 lying down 6168\n", "3 on all fours 5210\n", "4 sitting 27244"]}, "execution_count": 6, "metadata": {}, "output_type": "execute_result"}], "source": ["import pandas, sqlite3\n", "con = sqlite3.connect(\"ConfLongDemo_JSI.db3\")\n", "df = pandas.read_sql(\"\"\"SELECT activity, count(*) as nb FROM person GROUP BY activity\"\"\", con)\n", "con.close()\n", "df.head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Il faut maintenant le faire avec PIG."]}, {"cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": ["%%PIG solution_groupby.pig\n", "\n", "myinput = LOAD 'ConfLongDemo_JSI.small.example.txt' \n", " using PigStorage(',') \n", " AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;\n", "\n", "gr = GROUP myinput BY activity ;\n", "avgact = FOREACH gr GENERATE group, COUNT(myinput) ; \n", "\n", "STORE avgact INTO 'ConfLongDemo_JSI.small.group.txt' USING PigStorage() ;"]}, {"cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", "
"], "text/plain": [""]}, "execution_count": 8, "metadata": {}, "output_type": "execute_result"}], "source": ["%pig_submit solution_groupby.pig -r groupby.redirection"]}, {"cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "Total bytes written : 89\n", "Spillable Memory Manager spill count : 0\n", "Total bags proactively spilled: 0\n", "Total records proactively spilled: 0\n", "\n", "Job DAG:\n", "job_1444669880271_0038\n", "\n", "\n", "2015-10-29 01:10:53,383 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!\n", "\n", "
"], "text/plain": [""]}, "execution_count": 9, "metadata": {}, "output_type": "execute_result"}], "source": ["%remote_cmd tail groupby.redirection.err"]}, {"cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "Found 2 items\n", "-rw-r--r--   3 xavierdupre xavierdupre          0 2015-10-29 01:10 ConfLongDemo_JSI.small.group.txt/_SUCCESS\n", "-rw-r--r--   3 xavierdupre xavierdupre         89 2015-10-29 01:10 ConfLongDemo_JSI.small.group.txt/part-r-00000\n", "\n", "
"], "text/plain": [""]}, "execution_count": 10, "metadata": {}, "output_type": "execute_result"}], "source": ["%remote_cmd hdfs dfs -ls ConfLongDemo_JSI.small.group.txt"]}, {"cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "lying\t267\n", "falling\t30\n", "sitting\t435\n", "walking\t170\n", "sitting down\t56\n", "standing up from sitting\t42\n", "\n", "
"], "text/plain": [""]}, "execution_count": 11, "metadata": {}, "output_type": "execute_result"}], "source": ["%remote_cmd hdfs dfs -tail ConfLongDemo_JSI.small.group.txt/part-r-00000"]}, {"cell_type": "markdown", "metadata": {}, "source": ["

Exercise 2: JOIN

"]}, {"cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
index | sequence | tag | timestamp | dateformat | x | y | z | activity | nb
0 | 0 | A01 | 010-000-024-033 | 633790226051280329 | 27.05.2009 14:03:25:127 | 4.062931 | 1.892434 | 0.507425 | walking | 32710
1 | 1 | A01 | 020-000-033-111 | 633790226051820913 | 27.05.2009 14:03:25:183 | 4.291954 | 1.781140 | 1.344495 | walking | 32710
2 | 2 | A01 | 020-000-032-221 | 633790226052091205 | 27.05.2009 14:03:25:210 | 4.359101 | 1.826456 | 0.968821 | walking | 32710
3 | 3 | A01 | 010-000-024-033 | 633790226052361498 | 27.05.2009 14:03:25:237 | 4.087835 | 1.879999 | 0.466983 | walking | 32710
4 | 4 | A01 | 010-000-030-096 | 633790226052631792 | 27.05.2009 14:03:25:263 | 4.324462 | 2.072460 | 0.488065 | walking | 32710
\n", "
"], "text/plain": [" index sequence tag timestamp \\\n", "0 0 A01 010-000-024-033 633790226051280329 \n", "1 1 A01 020-000-033-111 633790226051820913 \n", "2 2 A01 020-000-032-221 633790226052091205 \n", "3 3 A01 010-000-024-033 633790226052361498 \n", "4 4 A01 010-000-030-096 633790226052631792 \n", "\n", " dateformat x y z activity nb \n", "0 27.05.2009 14:03:25:127 4.062931 1.892434 0.507425 walking 32710 \n", "1 27.05.2009 14:03:25:183 4.291954 1.781140 1.344495 walking 32710 \n", "2 27.05.2009 14:03:25:210 4.359101 1.826456 0.968821 walking 32710 \n", "3 27.05.2009 14:03:25:237 4.087835 1.879999 0.466983 walking 32710 \n", "4 27.05.2009 14:03:25:263 4.324462 2.072460 0.488065 walking 32710 "]}, "execution_count": 12, "metadata": {}, "output_type": "execute_result"}], "source": ["con = sqlite3.connect(\"ConfLongDemo_JSI.db3\")\n", "df = pandas.read_sql(\"\"\"SELECT person.*, A.nb FROM person INNER JOIN (\n", " SELECT activity, count(*) as nb FROM person GROUP BY activity) AS A\n", " ON person.activity == A.activity\"\"\", con)\n", "con.close()\n", "df.head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Idem, maintenant il faut le faire avec PIG."]}, {"cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": ["%%PIG solution_groupby_join.pig\n", "\n", "myinput = LOAD 'ConfLongDemo_JSI.small.example.txt' \n", " using PigStorage(',') \n", " AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;\n", "\n", "gr = GROUP myinput BY activity ;\n", "avgact = FOREACH gr GENERATE group, COUNT(myinput) ; \n", "\n", "joined = JOIN myinput BY activity, avgact BY group ;\n", "\n", "STORE joined INTO 'ConfLongDemo_JSI.small.group.join.txt' USING PigStorage() ;"]}, {"cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", "
"], "text/plain": [""]}, "execution_count": 14, "metadata": {}, "output_type": "execute_result"}], "source": ["%pig_submit solution_groupby_join.pig -r groupby.join.redirection"]}, {"cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "2015-10-29 01:15:15,416 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address\n", "2015-10-29 01:15:15,416 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS\n", "2015-10-29 01:15:15,416 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://nameservice1\n", "2015-10-29 01:15:17,285 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: HASH_JOIN,GROUP_BY\n", "2015-10-29 01:15:17,348 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}\n", "2015-10-29 01:15:17,404 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.textoutputformat.separator is deprecated. Instead, use mapreduce.output.textoutputformat.separator\n", "2015-10-29 01:15:17,426 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 6000: \n", " Output Location Validation Failed for: 'hdfs://nameservice1/user/xavierdupre/ConfLongDemo_JSI.small.group.join.txt More info to follow:\n", "Output directory hdfs://nameservice1/user/xavierdupre/ConfLongDemo_JSI.small.group.join.txt already exists\n", "Details at logfile: /home/xavierdupre/pig_1446077714461.log\n", "\n", "
"], "text/plain": [""]}, "execution_count": 15, "metadata": {}, "output_type": "execute_result"}], "source": ["%remote_cmd tail groupby.join.redirection.err"]}, {"cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "Found 2 items\n", "-rw-r--r--   3 xavierdupre xavierdupre          0 2015-10-29 01:13 ConfLongDemo_JSI.small.group.join.txt/_SUCCESS\n", "-rw-r--r--   3 xavierdupre xavierdupre     144059 2015-10-29 01:13 ConfLongDemo_JSI.small.group.join.txt/part-r-00000\n", "\n", "
"], "text/plain": [""]}, "execution_count": 16, "metadata": {}, "output_type": "execute_result"}], "source": ["%remote_cmd hdfs dfs -ls ConfLongDemo_JSI.small.group.join.txt"]}, {"cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "26262834000\t27.05.2009 14:03:46:283\t3.3038318157196045\t1.938292145729065\t0.7622964978218079\tstanding up from sitting\tstanding up from sitting\t42\n", "652\tA01\t020-000-033-111\t633790226262563704\t27.05.2009 14:03:46:257\t3.2363295555114746\t2.00623106956482\t1.1472841501235962\tstanding up from sitting\tstanding up from sitting\t42\n", "651\tA01\t010-000-030-096\t633790226262293413\t27.05.2009 14:03:46:230\t3.275949239730835\t1.7746492624282837\t0.3117055296897888\tstanding up from sitting\tstanding up from sitting\t42\n", "650\tA01\t010-000-024-033\t633790226262023117\t27.05.2009 14:03:46:203\t3.2498104572296143\t1.878917098045349\t0.13854867219924927\tstanding up from sitting\tstanding up from sitting\t42\n", "649\tA01\t020-000-032-221\t633790226261752823\t27.05.2009 14:03:46:177\t3.352446317672729\t1.950886845588684\t0.8281049728393555\tstanding up from sitting\tstanding up from sitting\t42\n", "648\tA01\t020-000-033-111\t633790226261482530\t27.05.2009 14:03:46:147\t3.2220029830932617\t2.0042579174041752\t1.032345414161682\tstanding up from sitting\tstanding up from sitting\t42\n", "\n", "
"], "text/plain": [""]}, "execution_count": 17, "metadata": {}, "output_type": "execute_result"}], "source": ["%remote_cmd hdfs dfs -tail ConfLongDemo_JSI.small.group.join.txt/part-r-00000"]}, {"cell_type": "markdown", "metadata": {}, "source": ["

Going further

\n", "\n", "[PIG](http://pig.apache.org/) n'est pas la seule fa\u00e7on d'ex\u00e9cuter des jobs Map/Reduce. [Hive](https://hive.apache.org/) est un langage dont la syntaxe est tr\u00e8s proche de celle du SQL. L'article [Comparing Pig Latin and SQL for Constructing Data Processing Pipelines](https://developer.yahoo.com/blogs/hadoop/comparing-pig-latin-sql-constructing-data-processing-pipelines-444.html) explicite les diff\u00e9rences des deux approches.\n", "\n", "**langage haut niveau**\n", "\n", "Ce qu'il faut retenir est que le langage PIG est un langage haut niveau. Le programme est compil\u00e9 en une s\u00e9quence d'op\u00e9rations Map/Reduce transparente pour l'utilisateur. Le temps de d\u00e9veloppement est tr\u00e8s r\u00e9duit lorsqu'on le compare au m\u00eame programme \u00e9crit en Java. Le compilateur construit un plan d'ex\u00e9cution ([quelques exemples ici](http://chimera.labs.oreilly.com/books/1234000001811/ch07.html#explain)) et inf\u00e8re le nombre de machines requises pour distribuer le job. Cela suffit pour la plupart des besoins, cela n\u00e9cessite.\n", "\n", "**petits jeux**\n", "\n", "Certains jobs peuvent durer des heures, il est conseill\u00e9e de les essayer sur des petits jeux de donn\u00e9es avant de les faire tourner sur les vrais donn\u00e9es. Il est toujours frustrant de s'apercevoir qu'un job a plant\u00e9 au bout de deux heures car une cha\u00eene de caract\u00e8res est vide et que ce cas n'a pas \u00e9t\u00e9 pr\u00e9vu.\n", "\n", "Avec ces petits jeux, il est possible de faire tourner et conseill\u00e9 de tester le job d'abord sur la passerelle ([ex\u00e9cution local](http://archive.cloudera.com/cdh/3/pig/tutorial.html#Running+the+Pig+Scripts+in+Local+Mode)) avant de le lancer sur le cluster. Avec pyensae, il faut ajouter l'option ``-local`` \u00e0 la commande [pig_submit](http://www.xavierdupre.fr/app/pyensae/helpsphinx/pyensae/remote/magic_remote_ssh.html?highlight=pig_submit#pyensae.remote.magic_remote_ssh.MagicRemoteSSH.pig_submit).\n", "\n", "**concat\u00e9ner les fichiers divis\u00e9s**\n", "\n", "Un programme PIG ne produit pas un fichier mais plusieurs fichiers dans un r\u00e9pertoire. La commande [getmerge](http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-common/FileSystemShell.html) t\u00e9l\u00e9charge ces fichiers sur la passerelle et les fusionne en un seul.\n", "\n", "**ordre des lignes**\n", "\n", "Les jobs sont distribu\u00e9s, m\u00eame en faisant rien (LOAD + STORE), il n'est pas garanti que l'ordre des lignes soit pr\u00e9serv\u00e9. La probabili\u00e9 que ce soit le cas est quasi nulle."]}, {"cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": []}], "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4"}}, "nbformat": 4, "nbformat_minor": 2}