Python Hadoop Pig

This notebook shows how to submit a PIG job to a remote Hadoop cluster (tested with Cloudera). It is easier to follow if you already know Hadoop; otherwise I recommend reading Map/Reduce avec PIG (in French) first. We start by downloading the data that we will then upload to the remote cluster.

import pyensae
%load_ext pyensae
%load_ext pyenbc
pyensae.download_data("ConfLongDemo_JSI.txt", website="https://archive.ics.uci.edu/ml/machine-learning-databases/00196/")
'ConfLongDemo_JSI.txt'

The cluster is reached through an SSH connection to the bridge, a machine that can communicate with the cluster. We first enter the credentials:

import pyquickhelper.ipythonhelper as ipy
params={"server":"", "username":"", "password":""}
ipy.open_html_form(params=params,title="credentials",key_save="ssh_remote_hadoop")

The form stores the submitted values in ssh_remote_hadoop; we read them back:

password = ssh_remote_hadoop["password"]
server = ssh_remote_hadoop["server"]
username = ssh_remote_hadoop["username"]

We open the SSH connection:

%remote_open
<pyensae.remote.ssh_remote_connection.ASSHClient at 0xa2422e8>

We check the content of the remote machine:

%remote_cmd ls -l
total 3404
-rw-rw-r-- 1 xavierdupre xavierdupre    1043 Jul 14 23:40 centrer_reduire.pig
-rw-r--r-- 1 xavierdupre xavierdupre       2 Jul 15 00:22 diff_cluster
-rw-rw-r-- 1 xavierdupre xavierdupre       0 Sep 27 00:21 dummy
-rw-rw-r-- 1 xavierdupre xavierdupre     290 Jul 14 23:48 init_random.pig
-rw-rw-r-- 1 xavierdupre xavierdupre    1654 Jul 15 00:20 iteration_complete.pig
-rw-rw-r-- 1 xavierdupre xavierdupre     235 Jul 14 23:37 nb_obervations.pig
-rw-rw-r-- 1 xavierdupre xavierdupre    1778 Jul 14 23:57 pig_1436911046432.log
-rw-rw-r-- 1 xavierdupre xavierdupre    4570 Jul 15 00:45 pig_1436913856496.log
-rw-rw-r-- 1 xavierdupre xavierdupre    4570 Jul 15 23:52 pig_1436997076356.log
-rw-rw-r-- 1 xavierdupre xavierdupre     574 Jul 15 23:51 post_traitement.pig
-rw-rw-r-- 1 xavierdupre xavierdupre     659 Sep 27 00:21 pystream.pig
-rw-rw-r-- 1 xavierdupre xavierdupre     382 Sep 27 00:21 pystream.py
-rw-rw-r-- 1 xavierdupre xavierdupre   26186 Jul 15 23:52 redirection.err
-rw-rw-r-- 1 xavierdupre xavierdupre       0 Jul 15 23:51 redirection.out
-rw-rw-r-- 1 xavierdupre xavierdupre 3400818 Jul 15 23:48 Skin_NonSkin.txt

%remote_ls .
attributes code alias folder size unit name isdir
-rw-rw-r-- 1 xavierdupre xavierdupre 1043 Jul 14 23:40 centrer_reduire.pig False
-rw-r--r-- 1 xavierdupre xavierdupre 2 Jul 15 00:22 diff_cluster False
-rw-rw-r-- 1 xavierdupre xavierdupre 0 Sep 27 00:21 dummy False
-rw-rw-r-- 1 xavierdupre xavierdupre 290 Jul 14 23:48 init_random.pig False
-rw-rw-r-- 1 xavierdupre xavierdupre 1654 Jul 15 00:20 iteration_complete.pig False
-rw-rw-r-- 1 xavierdupre xavierdupre 235 Jul 14 23:37 nb_obervations.pig False
-rw-rw-r-- 1 xavierdupre xavierdupre 1778 Jul 14 23:57 pig_1436911046432.log False
-rw-rw-r-- 1 xavierdupre xavierdupre 4570 Jul 15 00:45 pig_1436913856496.log False
-rw-rw-r-- 1 xavierdupre xavierdupre 4570 Jul 15 23:52 pig_1436997076356.log False
-rw-rw-r-- 1 xavierdupre xavierdupre 574 Jul 15 23:51 post_traitement.pig False
-rw-rw-r-- 1 xavierdupre xavierdupre 659 Sep 27 00:21 pystream.pig False
-rw-rw-r-- 1 xavierdupre xavierdupre 382 Sep 27 00:21 pystream.py False
-rw-rw-r-- 1 xavierdupre xavierdupre 26186 Jul 15 23:52 redirection.err False
-rw-rw-r-- 1 xavierdupre xavierdupre 0 Jul 15 23:51 redirection.out False
-rw-rw-r-- 1 xavierdupre xavierdupre 3400818 Jul 15 23:48 Skin_NonSkin.txt False

We check the content on the cluster:

%remote_cmd hdfs dfs -ls
Found 33 items
drwx------   - xavierdupre xavierdupre          0 2015-09-27 02:00 .Trash
drwx------   - xavierdupre xavierdupre          0 2015-09-27 00:22 .staging
-rw-r--r--   3 xavierdupre xavierdupre     132727 2014-11-16 02:37 ConfLongDemo_JSI.small.example.txt
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-11-16 02:38 ConfLongDemo_JSI.small.example2.walking.txt
-rw-r--r--   3 xavierdupre xavierdupre    3400818 2015-07-14 23:35 Skin_NonSkin.txt
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:22 diff_cluster
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-14 23:44 donnees_normalisees
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-14 23:43 ecartstypes
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-14 23:49 init_random
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-14 23:41 moyennes
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-14 23:38 nb_obervations
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:05 output_iter1
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:22 output_iter10
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:07 output_iter2
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:09 output_iter3
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:11 output_iter4
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:13 output_iter5
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:15 output_iter6
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:17 output_iter7
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:18 output_iter8
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-07-15 00:20 output_iter9
-rw-r--r--   3 xavierdupre xavierdupre     461444 2014-11-20 01:33 paris.2014-11-11_22-00-18.331391.txt
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-11-23 22:03 python_info.txt
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-11-23 22:07 python_info2.txt
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-12-03 22:55 random
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-11-20 23:43 unitest2
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-09-27 00:23 unittest
drwxr-xr-x   - xavierdupre xavierdupre          0 2015-09-27 00:22 unittest2
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-11-20 01:53 velib_1hjs
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-11-21 01:17 velib_py
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-11-23 21:34 velib_py_results
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-11-23 21:51 velib_py_results_3days
drwxr-xr-x   - xavierdupre xavierdupre          0 2014-11-21 11:08 velib_several_days

%dfs_ls .
attributes code alias folder size date time name isdir
0 drwx------ - xavierdupre xavierdupre 0 2015-09-27 02:00 .Trash True
1 drwx------ - xavierdupre xavierdupre 0 2015-09-27 00:22 .staging True
2 -rw-r--r-- 3 xavierdupre xavierdupre 132727 2014-11-16 02:37 ConfLongDemo_JSI.small.example.txt False
3 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-16 02:38 ConfLongDemo_JSI.small.example2.walking.txt True
4 -rw-r--r-- 3 xavierdupre xavierdupre 3400818 2015-07-14 23:35 Skin_NonSkin.txt False
5 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:22 diff_cluster True
6 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:44 donnees_normalisees True
7 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:43 ecartstypes True
8 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:49 init_random True
9 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:41 moyennes True
10 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:38 nb_obervations True
11 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:05 output_iter1 True
12 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:22 output_iter10 True
13 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:07 output_iter2 True
14 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:09 output_iter3 True
15 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:11 output_iter4 True
16 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:13 output_iter5 True
17 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:15 output_iter6 True
18 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:17 output_iter7 True
19 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:18 output_iter8 True
20 drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:20 output_iter9 True
21 -rw-r--r-- 3 xavierdupre xavierdupre 461444 2014-11-20 01:33 paris.2014-11-11_22-00-18.331391.txt False
22 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 22:03 python_info.txt True
23 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 22:07 python_info2.txt True
24 drwxr-xr-x - xavierdupre xavierdupre 0 2014-12-03 22:55 random True
25 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-20 23:43 unitest2 True
26 drwxr-xr-x - xavierdupre xavierdupre 0 2015-09-27 00:23 unittest True
27 drwxr-xr-x - xavierdupre xavierdupre 0 2015-09-27 00:22 unittest2 True
28 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-20 01:53 velib_1hjs True
29 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-21 01:17 velib_py True
30 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 21:34 velib_py_results True
31 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 21:51 velib_py_results_3days True
32 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-21 11:08 velib_several_days True
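
The table above holds the same information as the raw output of hdfs dfs -ls, split into columns. The sketch below shows one way such a table could be built from the raw text. It is an illustration written for this page, not the code behind %dfs_ls; the function name parse_hdfs_ls is hypothetical, and the sample lines are copied from the listing above.

```python
from typing import Dict, List

# Column names mirror the header of the table above.
COLUMNS = ["attributes", "code", "alias", "folder", "size", "date", "time", "name"]


def parse_hdfs_ls(output: str) -> List[Dict[str, object]]:
    """Turn the text printed by `hdfs dfs -ls` into a list of records."""
    rows = []
    for line in output.splitlines():
        # Split into at most 8 fields so that a name containing spaces stays whole.
        fields = line.strip().split(None, len(COLUMNS) - 1)
        # Skip blank lines and the "Found N items" banner.
        if len(fields) < len(COLUMNS):
            continue
        row: Dict[str, object] = dict(zip(COLUMNS, fields))
        row["size"] = int(fields[4])
        # Directory entries start with 'd' in the permission string.
        row["isdir"] = fields[0].startswith("d")
        rows.append(row)
    return rows


sample = """Found 33 items
drwx------   - xavierdupre xavierdupre          0 2015-09-27 02:00 .Trash
-rw-r--r--   3 xavierdupre xavierdupre    3400818 2015-07-14 23:35 Skin_NonSkin.txt
"""

for entry in parse_hdfs_ls(sample):
    print(entry["name"], entry["size"], entry["isdir"])
```

The list of dictionaries can be passed to pandas.DataFrame if a table is preferred.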

We upload the file to the bridge. Compressing it first would reduce the upload time.
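
The compression step is not done in this notebook. A minimal sketch of it, using only the standard library, could look like the following. The helper gzip_file and the small demo file are assumptions made for illustration; the real file would be ConfLongDemo_JSI.txt.

```python
import gzip
import os
import shutil
import tempfile
from typing import Optional


def gzip_file(source: str, target: Optional[str] = None) -> str:
    """Compress `source` into `target` (default: source + '.gz') and return the target path."""
    target = target or source + ".gz"
    with open(source, "rb") as src, gzip.open(target, "wb") as dst:
        # copyfileobj streams the content in chunks instead of loading it all in memory.
        shutil.copyfileobj(src, dst)
    return target


# Demonstration on a small made-up file (not the real dataset).
folder = tempfile.mkdtemp()
demo = os.path.join(folder, "demo.txt")
with open(demo, "w") as f:
    f.write("A01,walking\n" * 10000)

compressed = gzip_file(demo)
print(os.path.getsize(demo), "->", os.path.getsize(compressed), "bytes")
```

The compressed file would then be uploaded instead of the original and decompressed on the bridge (for instance with gunzip through %remote_cmd) before it is put on the cluster.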

%remote_up ConfLongDemo_JSI.txt ConfLongDemo_JSI.txt
'ConfLongDemo_JSI.txt'

We check it got there:

%remote_cmd ls Conf*JSI.txt
ConfLongDemo_JSI.txt

We put it on the cluster:

%remote_cmd hdfs dfs -put ConfLongDemo_JSI.txt ConfLongDemo_JSI.txt

We check it was put on the cluster:

%remote_cmd hdfs dfs -ls Conf*JSI.txt
Found 1 items
-rw-r--r--   3 xavierdupre xavierdupre   21546346 2015-09-27 11:33 ConfLongDemo_JSI.txt

%dfs_ls Conf*JSI.txt
attributes code alias folder size date time name isdir
0 -rw-r--r-- 3 xavierdupre xavierdupre 21546346 2015-09-27 11:33 ConfLongDemo_JSI.txt False

We create a simple PIG program:

%%PIG filter_example.pig

myinput = LOAD 'ConfLongDemo_JSI.txt' USING PigStorage(',') AS
    (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
filt = FILTER myinput BY activity == 'walking' ;
STORE filt INTO 'ConfLongDemo_JSI.walking.txt' USING PigStorage() ;
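
We submit the job; the option -r gives the prefix of the files that receive the job's output (.out) and errors (.err):

%pig_submit filter_example.pig -r=filter_example.redirect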
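
To make clear what the script computes, here is a rough local equivalent in plain Python. It is only a sketch of the LOAD and FILTER steps, under the assumption that a field missing from a short row is treated as null, as PIG does. The function names and the three sample rows are made up for illustration.

```python
from typing import Dict, Iterable, Iterator, List, Optional

# Field names copied from the AS clause of the PIG script above.
SCHEMA = ["index", "sequence", "tag", "timestamp", "dateformat",
          "x", "y", "z", "activity"]


def load(lines: Iterable[str], sep: str = ",") -> Iterator[Dict[str, Optional[str]]]:
    """Mimic LOAD ... USING PigStorage(','): map fields to names by position."""
    for line in lines:
        fields: List[Optional[str]] = list(line.rstrip("\n").split(sep))
        # Assumption: like PIG, a field missing from a short row becomes null (None here).
        fields += [None] * (len(SCHEMA) - len(fields))
        yield dict(zip(SCHEMA, fields))


def filter_walking(lines: Iterable[str]) -> List[Dict[str, Optional[str]]]:
    """Mimic FILTER myinput BY activity == 'walking'."""
    return [row for row in load(lines) if row["activity"] == "walking"]


# Made-up rows for illustration only, not taken from the dataset.
rows = [
    "0,A01,tag1,1,d,0.1,0.2,0.3,walking",
    "1,A01,tag1,2,d,0.1,0.2,0.3,sitting",
    "A01,tag1,3,d,0.1,0.2,0.3,walking",   # one field short: activity is None
]
print(len(filter_walking(rows)))  # prints 1
```

Only the first row is kept: the second has another activity, and the third is one field short, so its activity column is empty and the comparison fails.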

We check the redirected files were created:

%remote_cmd ls f*redirect*
filter_example.redirect.err
filter_example.redirect.out

We check the tail of the error file regularly to follow the job (other commands can be used to monitor jobs, see %remote_cmd mapred --help).

%remote_cmd tail filter_example.redirect.err
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1435583503337_0055


2015-09-27 11:38:56,436 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 164860 time(s).
2015-09-27 11:38:56,436 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
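
Reading the tail by eye works for one job. The sketch below shows how the same information (job identifier, warnings, final status) could be extracted automatically, assuming the log text has already been retrieved as a string. The function summarize_pig_log is hypothetical and the patterns are inferred from the lines shown above only.

```python
import re
from typing import Dict, List

WARNING = re.compile(r"Encountered Warning (\w+) (\d+) time\(s\)")
JOB_ID = re.compile(r"\bjob_\d+_\d+\b")


def summarize_pig_log(text: str) -> Dict[str, object]:
    """Extract job ids, warning counts and the final status from a PIG log."""
    warnings = {name: int(count) for name, count in WARNING.findall(text)}
    jobs: List[str] = sorted(set(JOB_ID.findall(text)))
    return {
        "jobs": jobs,
        "warnings": warnings,
        # The log above ends with "Success!"; its presence is used as the success marker.
        "success": "MapReduceLauncher - Success!" in text,
    }


# Lines copied from the log tail shown above.
log = """Job DAG:
job_1435583503337_0055

2015-09-27 11:38:56,436 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 164860 time(s).
2015-09-27 11:38:56,436 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
"""

print(summarize_pig_log(log))
```

A job can end with "Success!" and still report warnings, so both fields are worth checking.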

%remote_cmd hdfs dfs -ls Conf*JSI.walking.txt
Found 2 items
-rw-r--r--   3 xavierdupre xavierdupre          0 2015-09-27 11:38 ConfLongDemo_JSI.walking.txt/_SUCCESS
-rw-r--r--   3 xavierdupre xavierdupre          0 2015-09-27 11:38 ConfLongDemo_JSI.walking.txt/part-m-00000

%dfs_ls Conf*JSI.walking.txt
attributes code alias folder size date time name isdir
0 -rw-r--r-- 3 xavierdupre xavierdupre 0 2015-09-27 11:38 ConfLongDemo_JSI.walking.txt/_SUCCESS False
1 -rw-r--r-- 3 xavierdupre xavierdupre 0 2015-09-27 11:38 ConfLongDemo_JSI.walking.txt/part-m-00000 False

Both files are empty and the log reports the warning ACCESSING_NON_EXISTENT_FIELD 164860 times: the filter kept no row. This suggests that the schema declared in the script lists more columns than the file contains, so that activity is always null. The schema should be checked against the first lines of the file.

After that, the output has to be downloaded to the bridge and then to the local machine with %remote_down. Finally, we close the connection.

%remote_close
True

END