Correction.
from jyquickhelper import add_notebook_menu
add_notebook_menu()
with open("sample4.txt", "w", encoding="utf8") as f:
    for i in range(0, 100000):
        f.write("{0}\t{1}{0}\n".format(i, chr(i % 26 + 65)))
    f.write("100001\tAAAAAA")
%load_ext pyensae
%head sample4.txt
0 A0
1 B1
2 C2
3 D3
4 E4
5 F5
6 G6
7 H7
8 I8
9 J9
import os
blobhp = {}
if "HDCREDENTIALS" in os.environ:
    blobhp["blob_storage"], blobhp["password1"], blobhp["hadoop_server"], blobhp["password2"], blobhp["username"] = \
        os.environ["HDCREDENTIALS"].split("**")
    r = type(blobhp)
else:
    from pyquickhelper.ipythonhelper import open_html_form
    params = {"blob_storage": "", "password1": "", "hadoop_server": "", "password2": "", "username": "axavier"}
    r = open_html_form(params=params, title="server + hadoop + credentials", key_save="blobhp")
r
dict
import pyensae
%load_ext pyensae
%load_ext pyenbc
blobstorage = blobhp["blob_storage"]
blobpassword = blobhp["password1"]
hadoop_server = blobhp["hadoop_server"]
hadoop_password = blobhp["password2"]
username = blobhp["username"]
client, bs = %hd_open
client, bs
(<pyensae.remote.azure_connection.AzureClient at 0x942e860>, <azure.storage.blob.blobservice.BlobService at 0x942e898>)
%blob_up sample4.txt /$PSEUDO/sampling/sample4.txt
'$PSEUDO/sampling/sample4.txt'
%blob_ls /$PSEUDO/sampling
 | name | last_modified | content_type | content_length | blob_type
---|---|---|---|---|---
0 | axavier/sampling/datafu-1.2.0.jar | Fri, 13 Nov 2015 00:03:49 GMT | application/octet-stream | 1600826 | BlockBlob |
1 | axavier/sampling/out_sampled_rs4_2015.txt | Fri, 13 Nov 2015 01:08:22 GMT | 0 | BlockBlob | |
2 | axavier/sampling/out_sampled_rs4_2015.txt/_SUC... | Fri, 13 Nov 2015 01:08:22 GMT | application/octet-stream | 0 | BlockBlob |
3 | axavier/sampling/out_sampled_rs4_2015.txt/part... | Fri, 13 Nov 2015 01:08:21 GMT | application/octet-stream | 12785 | BlockBlob |
4 | axavier/sampling/sample.txt | Fri, 13 Nov 2015 00:02:50 GMT | application/octet-stream | 1377780 | BlockBlob |
5 | axavier/sampling/sample2.txt | Fri, 13 Nov 2015 00:35:55 GMT | application/octet-stream | 1377793 | BlockBlob |
6 | axavier/sampling/sample3.txt | Fri, 13 Nov 2015 00:39:40 GMT | application/octet-stream | 1377793 | BlockBlob |
7 | axavier/sampling/sample4.txt | Sun, 15 Nov 2015 12:24:22 GMT | application/octet-stream | 1377793 | BlockBlob |
8 | axavier/sampling/sample4_hash.txt | Fri, 13 Nov 2015 14:50:39 GMT | 0 | BlockBlob | |
9 | axavier/sampling/sample4_hash.txt/_SUCCESS | Fri, 13 Nov 2015 14:50:39 GMT | application/octet-stream | 0 | BlockBlob |
10 | axavier/sampling/sample4_hash.txt/part-r-00000 | Fri, 13 Nov 2015 14:50:38 GMT | application/octet-stream | 4771358 | BlockBlob |
11 | axavier/sampling/sampled4_2015.txt | Fri, 13 Nov 2015 00:50:20 GMT | 0 | BlockBlob | |
12 | axavier/sampling/sampled4_2015.txt/_SUCCESS | Fri, 13 Nov 2015 00:50:20 GMT | application/octet-stream | 0 | BlockBlob |
13 | axavier/sampling/sampled4_2015.txt/part-m-00000 | Fri, 13 Nov 2015 00:50:19 GMT | application/octet-stream | 1277794 | BlockBlob |
14 | axavier/sampling/sampled_rs4_2015.txt | Fri, 13 Nov 2015 01:04:51 GMT | 0 | BlockBlob | |
15 | axavier/sampling/sampled_rs4_2015.txt/_SUCCESS | Fri, 13 Nov 2015 01:04:51 GMT | application/octet-stream | 0 | BlockBlob |
16 | axavier/sampling/sampled_rs4_2015.txt/part-m-0... | Fri, 13 Nov 2015 01:04:50 GMT | application/octet-stream | 1277794 | BlockBlob |
17 | axavier/sampling/sampled_srs4_2015.txt | Fri, 13 Nov 2015 00:56:09 GMT | 0 | BlockBlob | |
18 | axavier/sampling/sampled_srs4_2015.txt/_SUCCESS | Fri, 13 Nov 2015 00:56:09 GMT | application/octet-stream | 0 | BlockBlob |
19 | axavier/sampling/sampled_srs4_2015.txt/part-m-... | Fri, 13 Nov 2015 00:56:09 GMT | application/octet-stream | 1277794 | BlockBlob |
20 | axavier/sampling/sampled_srs_2015.txt | Fri, 13 Nov 2015 00:52:34 GMT | 0 | BlockBlob | |
21 | axavier/sampling/sampled_srs_2015.txt/_SUCCESS | Fri, 13 Nov 2015 00:52:34 GMT | application/octet-stream | 0 | BlockBlob |
22 | axavier/sampling/sampled_srs_2015.txt/part-m-0... | Fri, 13 Nov 2015 00:52:34 GMT | application/octet-stream | 1277794 | BlockBlob |
ensemble = [ "%d%s" % (i, chr(i%26 + 97)) for i in range(0,10000)]
ensemble[:5]
['0a', '1b', '2c', '3d', '4e']
import random

def reservoir_sampling(ensemble, k):
    N = len(ensemble)
    echantillon = []
    for i, e in enumerate(ensemble):
        if len(echantillon) < k:
            echantillon.append(e)
        else:
            j = random.randint(0, i)
            if j < k:
                echantillon[j] = e
    return echantillon
reservoir_sampling(ensemble, 10)
['8681x', '8356k', '5490e', '4405l', '5890o', '2689l', '8672o', '3603p', '8599t', '6086c']
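As a quick sanity check of the algorithm, here is a minimal sketch (standard library only, reusing the reservoir_sampling function defined above): every element should end up in the sample with probability close to k/N.
from collections import Counter

counts = Counter()
k, runs = 10, 5000
small = list(range(100))                       # small population so frequencies converge quickly
for _ in range(runs):
    counts.update(reservoir_sampling(small, k))
freqs = [counts[e] / runs for e in small]      # expected frequency is k/N = 10/100 = 0.1
min(freqs), max(freqs)                         # both should be close to 0.1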
We make sure the previous code also works in Jython (Python compiled to Java), following the jython-udfs documentation.
%%PIG sample_explore.pig
ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
ens_group = GROUP ensemble ALL;
DESCRIBE ens_group;
sampled = FOREACH ens_group GENERATE FLATTEN(ensemble);
DESCRIBE sampled;
--ens10 = LIMIT ensemble 10;
--ens_group10 = LIMIT ens_group 10 ;
--DUMP ens10;
--DUMP ens_group10;
If the following command raises an error::
AzureException: STATUS: 403, JSON: Expecting value: line 1 column 1 (char 0)
<Response [403]>
unable to submit job: sample_explore.pig
Check the credentials used to connect.
jid = %hd_pig_submit sample_explore.pig
jid
{'id': 'job_1446540516812_0185'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0185', None, None, False, 'RUNNING')
The standard error output contains the desired information:
%hd_tail_stderr jid["id"] -n 5
2015-11-15 12:33:06,608 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-11-15 12:33:06,608 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-1@hdblobstorage.blob.core.windows.net
2015-11-15 12:33:08,233 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-11-15 12:33:09,374 [main] INFO org.apache.pig.Main - Pig script completed in 4 seconds and 578 milliseconds (4578 ms)
ensemble: {x: int,v: chararray}
ens_group: {group: chararray,ensemble: {(x: int,v: chararray)}}
sampled: {ensemble::x: int,ensemble::v: chararray}
And the output of the second dump::
(all,{(100001,AAAAAA),(99999,D99999),(99998,C99998)...
import pyensae
%%PYTHON reservoir_sampling.py
import random

@outputSchemaFunction("rsSchema")
def reservoir_sampling(ensemble):
    ensemble = eval(ensemble)
    k = 10
    N = len(ensemble)
    echantillon = []
    for i, e in enumerate(ensemble):
        if len(echantillon) < k:
            echantillon.append(e)
        else:
            j = random.randint(0, i)
            if j < k:
                echantillon[j] = e
    return echantillon

@schemaFunction("rsSchema")
def rsSchema(input):
    return input
%%jython reservoir_sampling.py reservoir_sampling
{(100001,"AAAAAA"),(99999,"D99999"),(99998,"C99998")}
[(99998, 'C99998'), (99999, 'D99999'), (100001, 'AAAAAA')]
We now add the Jython code to the previous script:
%%PIG sample_explore_complete.pig
REGISTER '$CONTAINER/$SCRIPTPIG/reservoir_sampling.py' using jython as myrs;
ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
ens_group = GROUP ensemble ALL;
DESCRIBE ens_group;
sampled = FOREACH ens_group GENERATE FLATTEN(myrs.reservoir_sample(ensemble));
DESCRIBE sampled;
STORE sampled
INTO '$CONTAINER/$PSEUDO/sampling/sample_rs.txt' USING PigStorage();
jid = %hd_pig_submit sample_explore_complete.pig -d reservoir_sampling.py
jid
{'id': 'job_1446540516812_0229'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0229', None, 'done', False, 'RUNNING')
%hd_tail_stderr jid["id"] -n 100
15/11/15 18:43:49 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL 15/11/15 18:43:49 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE 15/11/15 18:43:49 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType 2015-11-15 18:43:49,598 [main] INFO org.apache.pig.Main - Apache Pig version 0.14.0.2.2.7.1-33 (r: unknown) compiled Oct 13 2015, 04:18:06 2015-11-15 18:43:49,598 [main] INFO org.apache.pig.Main - Logging error messages to: C:\apps\dist\hadoop-2.6.0.2.2.7.1-33\logs\pig_1447613029598.log 2015-11-15 18:43:50,848 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file D:\Users\hdp/.pigbootup not found 2015-11-15 18:43:51,145 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address 2015-11-15 18:43:51,145 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS 2015-11-15 18:43:51,145 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-1@hdblobstorage.blob.core.windows.net 2015-11-15 18:43:51,879 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS 2015-11-15 18:43:52,192 [main] INFO org.apache.pig.scripting.jython.JythonScriptEngine - created tmp python.cachedir=D:\Users\hdp\AppData\Local\Temp\pig_jython_3357684506669481882 2015-11-15 18:43:54,817 [main] WARN org.apache.pig.scripting.jython.JythonScriptEngine - pig.cmd.args.remainders is empty. This is not expected unless on testing. 2015-11-15 18:43:57,645 [main] INFO org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myrs.reservoir_sampling 2015-11-15 18:43:58,535 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS 2015-11-15 18:43:59,660 [main] ERROR org.apache.pig.PigServer - exception during parsing: Error during parsing. Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.] Failed to parse: Pig script failed to parse:Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.] at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:199) at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1735) at org.apache.pig.PigServer$Graph.access$000(PigServer.java:1443) at org.apache.pig.PigServer.parseAndBuild(PigServer.java:387) at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:300) at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:412) at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230) at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205) at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81) at org.apache.pig.Main.run(Main.java:495) at org.apache.pig.Main.main(Main.java:170) Caused by: Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.] 
at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1572) at org.apache.pig.parser.LogicalPlanGenerator.func_eval(LogicalPlanGenerator.java:9372) at org.apache.pig.parser.LogicalPlanGenerator.projectable_expr(LogicalPlanGenerator.java:11051) at org.apache.pig.parser.LogicalPlanGenerator.var_expr(LogicalPlanGenerator.java:10810) at org.apache.pig.parser.LogicalPlanGenerator.expr(LogicalPlanGenerator.java:10159) at org.apache.pig.parser.LogicalPlanGenerator.flatten_clause(LogicalPlanGenerator.java:7629) at org.apache.pig.parser.LogicalPlanGenerator.flatten_generated_item(LogicalPlanGenerator.java:7452) at org.apache.pig.parser.LogicalPlanGenerator.generate_clause(LogicalPlanGenerator.java:17590) at org.apache.pig.parser.LogicalPlanGenerator.foreach_plan(LogicalPlanGenerator.java:15982) at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15849) at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933) at org.apache.pig.parser.LogicalPlanGenerator.general_statement(LogicalPlanGenerator.java:1102) at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560) at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421) at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191) ... 10 more Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.] at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:677) at org.apache.pig.impl.PigContext.getClassForAlias(PigContext.java:793) at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1569) ... 24 more 2015-11-15 18:43:59,707 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.] 2015-11-15 18:43:59,707 [main] ERROR org.apache.pig.tools.grunt.Grunt - org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1000: Error during parsing. Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.] at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1748) at org.apache.pig.PigServer$Graph.access$000(PigServer.java:1443) at org.apache.pig.PigServer.parseAndBuild(PigServer.java:387) at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:300) at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:412) at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230) at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205) at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81) at org.apache.pig.Main.run(Main.java:495) at org.apache.pig.Main.main(Main.java:170) Caused by: Failed to parse: Pig script failed to parse: Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.] at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:199) at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1735) ... 9 more Caused by: Failed to generate logical plan. 
Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.] at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1572) at org.apache.pig.parser.LogicalPlanGenerator.func_eval(LogicalPlanGenerator.java:9372) at org.apache.pig.parser.LogicalPlanGenerator.projectable_expr(LogicalPlanGenerator.java:11051) at org.apache.pig.parser.LogicalPlanGenerator.var_expr(LogicalPlanGenerator.java:10810) at org.apache.pig.parser.LogicalPlanGenerator.expr(LogicalPlanGenerator.java:10159) at org.apache.pig.parser.LogicalPlanGenerator.flatten_clause(LogicalPlanGenerator.java:7629) at org.apache.pig.parser.LogicalPlanGenerator.flatten_generated_item(LogicalPlanGenerator.java:7452) at org.apache.pig.parser.LogicalPlanGenerator.generate_clause(LogicalPlanGenerator.java:17590) at org.apache.pig.parser.LogicalPlanGenerator.foreach_plan(LogicalPlanGenerator.java:15982) at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15849) at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933) at org.apache.pig.parser.LogicalPlanGenerator.general_statement(LogicalPlanGenerator.java:1102) at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560) at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421) at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191) ... 10 more Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.] at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:677) at org.apache.pig.impl.PigContext.getClassForAlias(PigContext.java:793) at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1569) ... 24 more Details also at logfile: C:\apps\dist\hadoop-2.6.0.2.2.7.1-33\logs\pig_1447613029598.log 2015-11-15 18:43:59,754 [main] INFO org.apache.pig.Main - Pig script completed in 10 seconds and 453 milliseconds (10453 ms)
ensemble: {x: int,v: chararray}
ens_group: {group: chararray,ensemble: {(x: int,v: chararray)}}
To be fixed later. For now, we use the datafu library instead. If the cluster does not recognize the library, see the java section to understand how to import it. It is declared in the script with the REGISTER statement.
%%PIG sample_explore_datafu.pig
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');
ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
ens_group = GROUP ensemble ALL;
DESCRIBE ens_group;
sampled = FOREACH ens_group GENERATE FLATTEN(RS(ensemble));
DESCRIBE sampled;
STORE sampled
INTO '$CONTAINER/$PSEUDO/sampling/sample_datafu_rs.txt' USING PigStorage();
jid = %hd_pig_submit sample_explore_datafu.pig
jid
{'id': 'job_1446540516812_0193'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0193', '50% complete', None, False, 'RUNNING')
%hd_tail_stderr jid["id"] -n 100
%blob_ls /$PSEUDO/sampling/sample_datafu
 | name | last_modified | content_type | content_length | blob_type
---|---|---|---|---|---
0 | axavier/sampling/sample_datafu_rs.txt | Sun, 15 Nov 2015 13:23:40 GMT | 0 | BlockBlob | |
1 | axavier/sampling/sample_datafu_rs.txt/_SUCCESS | Sun, 15 Nov 2015 13:23:40 GMT | application/octet-stream | 0 | BlockBlob |
2 | axavier/sampling/sample_datafu_rs.txt/part-r-0... | Sun, 15 Nov 2015 13:23:38 GMT | application/octet-stream | 12780 | BlockBlob |
Trick: we distribute the work, then recombine the samples with one last reservoir sampling, this time weighted. How do we distribute? The second sampling is replaced by a classical sampling method because weighted reservoir sampling is not available in datafu version 1.2.0.
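Before writing the PIG version, here is a minimal local sketch of the idea in Python (standard library only; distributed_sample and the round-robin partitioning are illustrative choices, not what the cluster does): one reservoir sample per partition, then a weighted draw proportional to partition sizes to recombine them.
import random

def distributed_sample(ensemble, k, nb_partitions=10):
    # "map" side: split the data into partitions (round-robin here, purely illustrative)
    partitions = [[] for _ in range(nb_partitions)]
    for i, e in enumerate(ensemble):
        partitions[i % nb_partitions].append(e)
    # one reservoir sample per partition, each element tagged with its partition size
    weighted = []
    for part in partitions:
        for e in reservoir_sampling(part, k):
            weighted.append((len(part), e))
    # "reduce" side: weighted draw proportional to partition sizes
    # (random.choices samples with replacement, so this is only an approximation
    # of a weighted reservoir sampling, mirroring the datafu 1.2.0 limitation)
    weights = [w for w, _ in weighted]
    return [e for _, e in random.choices(weighted, weights=weights, k=k)]

distributed_sample(ensemble, 10)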
%%PIG sample_explore_datafu_dist.pig
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');
DEFINE WeightedSample datafu.pig.sampling.WeightedSample();
ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
keys = FOREACH ensemble GENERATE x, v, x%10 AS key;
DESCRIBE keys;
ens_group = GROUP keys BY key ;
DESCRIBE ens_group;
sampled = FOREACH ens_group GENERATE COUNT(keys) AS weigth, FLATTEN(RS(keys));
DESCRIBE sampled;
wsampled = FOREACH (GROUP sampled ALL) GENERATE FLATTEN(WeightedSample(sampled, 0, 1000));
DESCRIBE wsampled;
STORE wsampled
INTO '$CONTAINER/$PSEUDO/sampling/sample_datafu_rs_dist2.txt' USING PigStorage();
jid = %hd_pig_submit sample_explore_datafu_dist.pig
jid
{'id': 'job_1446540516812_0238'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0238', '100% complete', 'done', True, 'SUCCEEDED')
%hd_tail_stderr jid["id"] -n 10
2015-11-15 19:22:17,553 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
2015-11-15 19:22:17,553 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
2015-11-15 19:22:17,615 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-11-15 19:22:17,803 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2015-11-15 19:22:17,803 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
2015-11-15 19:22:17,803 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
2015-11-15 19:22:17,865 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-11-15 19:22:17,943 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2015-11-15 19:22:17,975 [main] INFO org.apache.pig.Main - Pig script completed in 1 minute, 42 seconds and 839 milliseconds (102839 ms)
ensemble: {x: int,v: chararray}
keys: {x: int,v: chararray,key: int}
ens_group: {group: int,keys: {(x: int,v: chararray,key: int)}}
sampled: {weigth: long,datafu.pig.sampling.reservoirsample_keys_4::x: int,datafu.pig.sampling.reservoirsample_keys_4::v: chararray,datafu.pig.sampling.reservoirsample_keys_4::key: int}
wsampled: {datafu.pig.sampling.weightedsample_sampled_12::weigth: long,datafu.pig.sampling.weightedsample_sampled_12::datafu.pig.sampling.reservoirsample_keys_11::x: int,datafu.pig.sampling.weightedsample_sampled_12::datafu.pig.sampling.reservoirsample_keys_11::v: chararray,datafu.pig.sampling.weightedsample_sampled_12::datafu.pig.sampling.reservoirsample_keys_11::key: int}
%blob_ls /$PSEUDO/sampling/sample_datafu_rs_dist2
 | name | last_modified | content_type | content_length | blob_type
---|---|---|---|---|---
0 | axavier/sampling/sample_datafu_rs_dist2.txt | Sun, 15 Nov 2015 19:22:05 GMT | 0 | BlockBlob | |
1 | axavier/sampling/sample_datafu_rs_dist2.txt/_S... | Sun, 15 Nov 2015 19:22:06 GMT | application/octet-stream | 0 | BlockBlob |
2 | axavier/sampling/sample_datafu_rs_dist2.txt/pa... | Sun, 15 Nov 2015 19:22:05 GMT | application/octet-stream | 20770 | BlockBlob |
df = %blob_head /$PSEUDO/sampling/sample_datafu_rs_dist2.txt -m
df.head()
 | 10001 | 21260 | S21260 | 0
---|---|---|---|---
0 | 10000 | 25191 | X25191 | 1 |
1 | 10000 | 73760 | Y73760 | 0 |
2 | 10000 | 90105 | P90105 | 5 |
3 | 10000 | 46070 | Y46070 | 0 |
4 | 10001 | 58590 | M58590 | 0 |
The problem with the previous version: each subset, processed as a single block, uses a sequence of random numbers about which we know very little. If the same seeds are used, the sequences, even though they simulate randomness, may be strongly correlated from one block to the next. This has to be fixed.
We must also make sure that no block is skewed.
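Here is a minimal sketch (standard library only; reservoir_sampling_seeded is an illustrative name, not the method used on the cluster) of one way to decorrelate the blocks: derive a distinct, stable seed for each block from a hash of its key, so that every block draws from its own random stream.
import hashlib
import random

def reservoir_sampling_seeded(block_key, block, k):
    # stable, block-specific seed derived from a hash of the block key
    seed = int(hashlib.md5(str(block_key).encode("utf8")).hexdigest(), 16)
    rnd = random.Random(seed)
    echantillon = []
    for i, e in enumerate(block):
        if len(echantillon) < k:
            echantillon.append(e)
        else:
            j = rnd.randint(0, i)
            if j < k:
                echantillon[j] = e
    return echantillon

reservoir_sampling_seeded("block_3", ensemble[3000:4000], 10)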
%%PIG_azure script_rs.pig
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
DEFINE MD5 datafu.pig.hash.MD5();
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');
DEFINE WeightedSample datafu.pig.sampling.WeightedSample();
ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
ens_group = GROUP ensemble BY (x,v);
DESCRIBE ens_group;
compte_group = FOREACH ens_group
GENERATE group.x AS x,
group.v AS v,
COUNT(ensemble) AS nb_ligne ;
DESCRIBE compte_group;
hash_group = FOREACH compte_group
GENERATE x, v, nb_ligne,
SUBSTRING(MD5(v), 0, 1) AS val;
DESCRIBE hash_group;
group_hash = GROUP hash_group BY val ;
DESCRIBE group_hash;
rs_parall = FOREACH group_hash GENERATE
COUNT(hash_group) AS nb_hash,
FLATTEN(RS(hash_group)) ;
DESCRIBE rs_parall;
wsampled = FOREACH (GROUP rs_parall ALL) GENERATE FLATTEN(WeightedSample(rs_parall, 0, 1000));
DESCRIBE wsampled;
STORE wsampled
INTO '$CONTAINER/$PSEUDO/sampling/sample_distributed_hash.txt' USING PigStorage();
jid=%hd_pig_submit script_rs.pig
jid
{'id': 'job_1446540516812_0244'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0244', '100% complete', None, False, 'RUNNING')
%hd_tail_stderr jid["id"] -n 10
2015-11-15 19:52:05,138 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
2015-11-15 19:52:05,138 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
2015-11-15 19:52:05,200 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-11-15 19:52:05,435 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2015-11-15 19:52:05,435 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
2015-11-15 19:52:05,435 [main] INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
2015-11-15 19:52:05,513 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-11-15 19:52:05,560 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2015-11-15 19:52:05,607 [main] INFO org.apache.pig.Main - Pig script completed in 2 minutes, 29 seconds and 962 milliseconds (149962 ms)
ensemble: {x: int,v: chararray}
ens_group: {group: (x: int,v: chararray),ensemble: {(x: int,v: chararray)}}
compte_group: {x: int,v: chararray,nb_ligne: long}
hash_group: {x: int,v: chararray,nb_ligne: long,val: chararray}
group_hash: {group: chararray,hash_group: {(x: int,v: chararray,nb_ligne: long,val: chararray)}}
rs_parall: {nb_hash: long,datafu.pig.sampling.reservoirsample_hash_group_4::x: int,datafu.pig.sampling.reservoirsample_hash_group_4::v: chararray,datafu.pig.sampling.reservoirsample_hash_group_4::nb_ligne: long,datafu.pig.sampling.reservoirsample_hash_group_4::val: chararray}
wsampled: {datafu.pig.sampling.weightedsample_rs_parall_12::nb_hash: long,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::x: int,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::v: chararray,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::nb_ligne: long,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::val: chararray}
%blob_ls /$PSEUDO/sampling/sample_distributed_hash.txt
 | name | last_modified | content_type | content_length | blob_type
---|---|---|---|---|---
0 | axavier/sampling/sample_distributed_hash.txt | Sun, 15 Nov 2015 19:51:56 GMT | 0 | BlockBlob | |
1 | axavier/sampling/sample_distributed_hash.txt/_... | Sun, 15 Nov 2015 19:51:56 GMT | application/octet-stream | 0 | BlockBlob |
2 | axavier/sampling/sample_distributed_hash.txt/p... | Sun, 15 Nov 2015 19:51:55 GMT | application/octet-stream | 21750 | BlockBlob |
df = %blob_head /$PSEUDO/sampling/sample_distributed_hash.txt -m
df.head()
 | 6693 | 27244 | W27244 | 1 | 6
---|---|---|---|---|---
0 | 6749 | 51104 | O51104 | 1 | 1 |
1 | 6605 | 91527 | H91527 | 1 | 6 |
2 | 6630 | 75027 | R75027 | 1 | 4 |
3 | 6789 | 58148 | M58148 | 1 | 1 |
4 | 6659 | 71659 | D71659 | 1 | 5 |
%blob_downmerge /$PSEUDO/sampling/sample_distributed_hash.txt sample_distributed_hash.txt
'sample_distributed_hash.txt'
%head sample_distributed_hash.txt
6693 27244 W27244 1 6
6749 51104 O51104 1 1
6605 91527 H91527 1 6
6630 75027 R75027 1 4
6789 58148 M58148 1 1
6659 71659 D71659 1 5
6811 74380 U74380 1 9
6749 20125 B20125 1 2
6587 33466 E33466 1 5
6587 21645 N21645 1 5
We follow the Sampling example. We download datafu 1.2 from Maven. It is not the latest version; for that one, follow the instructions to build datafu (see the documentation). In particular, the weighted version of reservoir sampling is not available (see the history; version 1.2.0 was released in December 2013).
The Java implementation does not seem to handle a problem that can occur when the requested sample size is too large. See the next section.
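For reference, a minimal sketch (standard library only, not part of datafu) of weighted reservoir sampling following the Efraimidis-Spirakis idea: keep the k items with the largest key r**(1/w). When the requested sample size exceeds the population, it simply returns everything, which is the corner case mentioned above.
import heapq
import random

def weighted_reservoir_sampling(weighted_items, k):
    # weighted_items: iterable of (item, weight) pairs with weight > 0
    heap = []                                  # min-heap of (key, item)
    for item, w in weighted_items:
        key = random.random() ** (1.0 / w)     # Efraimidis-Spirakis key
        if len(heap) < k:
            heapq.heappush(heap, (key, item))
        elif key > heap[0][0]:
            heapq.heapreplace(heap, (key, item))
    return [item for _, item in heap]

weighted_reservoir_sampling([(e, 1.0 + (i % 5)) for i, e in enumerate(ensemble)], 10)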
import pyensae.datasource
pyensae.datasource.download_data("datafu-1.2.0.jar", url="http://central.maven.org/maven2/com/linkedin/datafu/datafu/1.2.0/")
'datafu-1.2.0.jar'
%blob_up datafu-1.2.0.jar /$PSEUDO/sampling/datafu-1.2.0.jar
'$PSEUDO/sampling/datafu-1.2.0.jar'
%blob_ls /$PSEUDO/sampling
 | name | last_modified | content_type | content_length | blob_type
---|---|---|---|---|---
0 | axavier/sampling/datafu-1.2.0.jar | Fri, 13 Nov 2015 00:03:49 GMT | application/octet-stream | 1600826 | BlockBlob |
1 | axavier/sampling/sample.txt | Fri, 13 Nov 2015 00:02:50 GMT | application/octet-stream | 1377780 | BlockBlob |
2 | axavier/sampling/sample2.txt | Fri, 13 Nov 2015 00:35:55 GMT | application/octet-stream | 1377793 | BlockBlob |
3 | axavier/sampling/sample3.txt | Fri, 13 Nov 2015 00:39:40 GMT | application/octet-stream | 1377793 | BlockBlob |
4 | axavier/sampling/sample4.txt | Fri, 13 Nov 2015 00:41:49 GMT | application/octet-stream | 1377793 | BlockBlob |
%%PIG_azure sample.pig
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');
dset = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
USING PigStorage('\t') AS (x:int, v:chararray) ;
sampled = FOREACH (GROUP dset ALL) GENERATE FLATTEN(RS(dset));
STORE sampled INTO '$CONTAINER/$PSEUDO/sampling/out_sampled_rs4_2015.txt' USING PigStorage() ;
jid = %hd_pig_submit sample.pig
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["completed"],st["status"]["jobComplete"],st["status"]["state"]
('job_1446540516812_0136', None, None, False, 'RUNNING')
%hd_tail_stderr jid["id"] -n 10
%blob_ls /$PSEUDO/sampling
 | name | last_modified | content_type | content_length | blob_type
---|---|---|---|---|---
0 | axavier/sampling/datafu-1.2.0.jar | Fri, 13 Nov 2015 00:03:49 GMT | application/octet-stream | 1600826 | BlockBlob |
1 | axavier/sampling/out_sampled_rs4_2015.txt | Fri, 13 Nov 2015 01:08:22 GMT | 0 | BlockBlob | |
2 | axavier/sampling/out_sampled_rs4_2015.txt/_SUC... | Fri, 13 Nov 2015 01:08:22 GMT | application/octet-stream | 0 | BlockBlob |
3 | axavier/sampling/out_sampled_rs4_2015.txt/part... | Fri, 13 Nov 2015 01:08:21 GMT | application/octet-stream | 12785 | BlockBlob |
4 | axavier/sampling/sample.txt | Fri, 13 Nov 2015 00:02:50 GMT | application/octet-stream | 1377780 | BlockBlob |
5 | axavier/sampling/sample2.txt | Fri, 13 Nov 2015 00:35:55 GMT | application/octet-stream | 1377793 | BlockBlob |
6 | axavier/sampling/sample3.txt | Fri, 13 Nov 2015 00:39:40 GMT | application/octet-stream | 1377793 | BlockBlob |
7 | axavier/sampling/sample4.txt | Fri, 13 Nov 2015 00:41:49 GMT | application/octet-stream | 1377793 | BlockBlob |
8 | axavier/sampling/sampled4_2015.txt | Fri, 13 Nov 2015 00:50:20 GMT | 0 | BlockBlob | |
9 | axavier/sampling/sampled4_2015.txt/_SUCCESS | Fri, 13 Nov 2015 00:50:20 GMT | application/octet-stream | 0 | BlockBlob |
10 | axavier/sampling/sampled4_2015.txt/part-m-00000 | Fri, 13 Nov 2015 00:50:19 GMT | application/octet-stream | 1277794 | BlockBlob |
11 | axavier/sampling/sampled_rs4_2015.txt | Fri, 13 Nov 2015 01:04:51 GMT | 0 | BlockBlob | |
12 | axavier/sampling/sampled_rs4_2015.txt/_SUCCESS | Fri, 13 Nov 2015 01:04:51 GMT | application/octet-stream | 0 | BlockBlob |
13 | axavier/sampling/sampled_rs4_2015.txt/part-m-0... | Fri, 13 Nov 2015 01:04:50 GMT | application/octet-stream | 1277794 | BlockBlob |
14 | axavier/sampling/sampled_srs4_2015.txt | Fri, 13 Nov 2015 00:56:09 GMT | 0 | BlockBlob | |
15 | axavier/sampling/sampled_srs4_2015.txt/_SUCCESS | Fri, 13 Nov 2015 00:56:09 GMT | application/octet-stream | 0 | BlockBlob |
16 | axavier/sampling/sampled_srs4_2015.txt/part-m-... | Fri, 13 Nov 2015 00:56:09 GMT | application/octet-stream | 1277794 | BlockBlob |
17 | axavier/sampling/sampled_srs_2015.txt | Fri, 13 Nov 2015 00:52:34 GMT | 0 | BlockBlob | |
18 | axavier/sampling/sampled_srs_2015.txt/_SUCCESS | Fri, 13 Nov 2015 00:52:34 GMT | application/octet-stream | 0 | BlockBlob |
19 | axavier/sampling/sampled_srs_2015.txt/part-m-0... | Fri, 13 Nov 2015 00:52:34 GMT | application/octet-stream | 1277794 | BlockBlob |
%blob_downmerge /$PSEUDO/sampling/out_sampled_rs4_2015.txt out_sampled_rs4_2015.txt -o
'out_sampled_rs4_2015.txt'
%head out_sampled_rs4_2015.txt
90648 M90648
49678 S49678
41434 Q41434
30149 P30149
15836 C15836
61110 K61110
3838 Q3838
81515 F81515
48052 E48052
16332 E16332
%blob_close
True