Coverage for src/pyenbc/filehelper/pig_helper.py: 11%
124 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-20 05:47 +0200
1"""
2@file
3@brief Hadoop uses a java implementation of Python: Jython.
This provides helpers around that.
6.. versionadded:: 1.1
7"""
9import os
10import glob
11from pyquickhelper.loghelper import run_cmd, noLOG
12from pyquickhelper.filehelper import change_file_status
13from pyensae.datasource.http_retrieve import download_data
14from .jython_helper import get_java_cmd, get_java_path
# Default tool versions; HADOOP_VERSION should match the cluster's version.
PIG_VERSION = "0.17.0"
HADOOP_VERSION = "3.3.0"
def download_pig_standalone(pig_version=PIG_VERSION,
                            hadoop_version=HADOOP_VERSION,
                            fLOG=noLOG):
    """
    Downloads standalone distributions of *winutils*,
    :epkg:`Hadoop` and :epkg:`PIG` next to this module.
    ``HADOOP_VERSION`` is used by default in order to fit
    the cluster's version.

    @param pig_version pig version to download
    @param hadoop_version hadoop version to download
    @param fLOG logging function
    @return list of downloaded files

    This function might need to be run twice if the first try
    fails; that may be due to a very long path when unzipping the
    downloaded file.

    :epkg:`Hadoop` is downloaded from one of the websites
    referenced at
    `Apache Software Foundation <http://www.apache.org/dyn/closer.cgi/hadoop/common/>`_.
    Check the source to see which one was chosen.
    """
    here = os.path.abspath(os.path.dirname(__file__))
    fbs = []

    def _fetch(subfolder, name, website, **kwargs):
        "Downloads *name* into *subfolder*, makes the files writable."
        d = os.path.join(here, subfolder)
        # race-free replacement for the exists()+mkdir pattern
        os.makedirs(d, exist_ok=True)
        fn = download_data(name=name, whereTo=d, website=website,
                           fLOG=fLOG, **kwargs)
        fbs.append(fn)
        change_file_status(d)
        return fn

    # download winutils.exe
    _fetch("winutils", "winutils.zip", "xd")

    # download hadoop
    fLOG("download hadoop", hadoop_version)
    _fetch("hadoopjar", "hadoop-%s.tar.gz" % hadoop_version,
           "http://apache.crihan.fr/dist/hadoop/common/hadoop-%s/" % hadoop_version)

    # download pig
    fLOG("download pig", pig_version)
    _fetch("pigjar", "pig-%s.tar.gz" % pig_version,
           "http://apache.crihan.fr/dist/pig/pig-%s/" % pig_version,
           silent=True)
    return fbs
def get_pig_path():
    """
    Returns the folder of the local :epkg:`PIG` installation.

    This function assumes a folder ``pigjar`` is present
    in this directory.

    @return absolute path
    """
    here = os.path.abspath(os.path.dirname(__file__))
    candidates = []
    for entry in os.listdir(here):
        full = os.path.join(here, entry)
        # the substring test applies to the full path, as in every lookup here
        if "pig" in full and os.path.isdir(full):
            candidates.append(full)
    if not candidates:
        raise FileNotFoundError("no pig folder found in " + here)
    if len(candidates) > 1:
        raise FileNotFoundError(
            "more than one folder found in {0}\n:{1}".format(
                here, "\n".join(candidates)))
    return candidates[0]
def get_hadoop_path():
    """
    Returns the folder of the local :epkg:`Hadoop` installation.

    This function assumes a folder ``hadoopjar`` is present
    in this directory.

    @return absolute path
    """
    here = os.path.abspath(os.path.dirname(__file__))
    candidates = []
    for entry in os.listdir(here):
        full = os.path.join(here, entry)
        # the substring test applies to the full path, as in every lookup here
        if "hadoop" in full and os.path.isdir(full):
            candidates.append(full)
    if not candidates:
        raise FileNotFoundError("no hadoop folder found in " + here)
    if len(candidates) > 1:
        raise FileNotFoundError(
            "more than one folder found in {0}\n:{1}".format(
                here, "\n".join(candidates)))
    return candidates[0]
def get_pig_jars():
    """
    Returns the list of jars to include into the
    command line in order to run :epkg:`PIG`.

    Only jars located below a ``lib`` folder are kept; hadoop1
    variants, ``h2`` variants and test jars are filtered out.

    @return list of jars
    """
    res = []
    for root, _, files in os.walk(get_pig_path()):
        # folder-level filters hoisted out of the inner loop; the original
        # duplicated the '"h1" not in root' test, collapsed here to one check
        if "lib" not in root or "test" in root or "h1" in root \
                or "hadoop1-runtime" in root:
            continue
        for name in files:
            if os.path.splitext(name)[-1] != ".jar":
                continue
            # '"h2" not in name' already excludes the
            # pig-*-withouthadoop-h2 jar the original tested separately
            if "h1" in name or "h2" in name or "hadoop1-runtime" in name:
                continue
            res.append(os.path.join(root, name))
    return res
def get_hadoop_jars():
    """
    Returns the list of jars to include into the command
    line in order to run :epkg:`HADOOP`.

    Source and test jars are excluded.

    @return list of jars
    """
    excluded = ("sources.jar", "-test-sources", "tests.jar")
    res = []
    for root, _, files in os.walk(get_hadoop_path()):
        for name in files:
            if os.path.splitext(name)[-1] != ".jar":
                continue
            if any(token in name for token in excluded):
                continue
            res.append(os.path.join(root, name))
    return res
def run_pig(pigfile, argv=None, pig_path=None, hadoop_path=None,
            jython_path=None, timeout=None, logpath="logs",
            pig_version=PIG_VERSION, hadoop_version=HADOOP_VERSION,
            jar_no_hadoop=True, fLOG=noLOG):
    """
    Runs a :epkg:`pig` script and returns the
    standard output and error.

    @param pigfile pig file
    @param argv arguments to send to the command line
                (NOTE(review): never referenced in this body)
    @param pig_path path to pig 0.XX.0
    @param hadoop_path path to hadoop
    @param jython_path path to jython
                (NOTE(review): never referenced in this body)
    @param timeout timeout
    @param logpath path to the logs
    @param pig_version PIG version (if *pig_path* is not defined)
    @param hadoop_version Hadoop version (if *hadoop_path* is not defined)
    @param jar_no_hadoop use :epkg:`pig` without :epkg:`hadoop`
    @return out, err

    If *pig_path* is None, the function looks into this directory.
    """
    # resolve default locations relative to this module
    if pig_path is None:
        pig_path = os.path.join(get_pig_path(), "pig-%s" % pig_version)

    if hadoop_path is None:
        hadoop_path = get_hadoop_path()

    # expose JAVA_HOME, PIG_CONF_DIR, HADOOP_HOME, HADOOP_CLIENT_OPTS to the
    # subprocess -- existing values from the environment are never overwritten
    java = get_java_path()
    if "JAVA_HOME" not in os.environ:
        os.environ["JAVA_HOME"] = java

    if "PIG_CONF_DIR" not in os.environ:
        os.environ["PIG_CONF_DIR"] = os.path.normpath(
            os.path.join(
                pig_path,
                "conf"))
        if not os.path.exists(os.environ["PIG_CONF_DIR"]):
            raise FileNotFoundError(os.environ["PIG_CONF_DIR"])

    if "HADOOP_HOME" not in os.environ:
        os.environ["HADOOP_HOME"] = hadoop_path
        if not os.path.exists(os.environ["HADOOP_HOME"]):
            raise FileNotFoundError(os.environ["HADOOP_HOME"])

    if "HADOOP_CLIENT_OPTS" not in os.environ:
        os.environ["HADOOP_CLIENT_OPTS"] = "-Xmx1024m"

    fLOG("PIG_CONF_DIR=", os.environ["PIG_CONF_DIR"])

    def clean(i, p):
        "local function"
        # escapes quotes and wraps arguments containing spaces in quotes;
        # the executable itself (i == 0) is left untouched
        if i == 0:
            return p
        if '"' in p:
            p = p.replace('"', '\\"')
        if " " in p:
            p = '"{0}"'.format(p)
        return p

    # build the classpath entries, mostly as glob patterns
    jars = []
    if not jar_no_hadoop:
        jars.extend(get_pig_jars())  # + get_hadoop_jars()

        # keep one '*.jar' pattern per distinct folder instead of every jar
        folds = set(os.path.split(j)[0] for j in jars)
        jars = [os.path.join(f, "*.jar") for f in folds]

        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "common",
                "lib",
                "*.jar"))
        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "hdfs",
                "lib",
                "*.jar"))
        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "mapreduce",
                "lib",
                "*.jar"))
        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "httpfs",
                "tomcat",
                "lib",
                "*.jar"))
        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "tools",
                "lib",
                "*.jar"))
        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "yarn",
                "lib",
                "*.jar"))

        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "common",
                "hadoop-common-%s.jar" % hadoop_version))
        # NOTE(review): no '.jar' suffix here, unlike the sibling entries --
        # looks like a missing extension; confirm against the distribution.
        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "common",
                "hadoop-nfs-%s" % hadoop_version))
        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "hdfs",
                "hadoop-hdfs-%s.jar" % hadoop_version))
        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "mapreduce",
                "*.jar"))
        jars.append(
            os.path.join(
                hadoop_path,
                "hadoop-%s" % hadoop_version,
                "share",
                "hadoop",
                "yarn",
                "*.jar"))

        jars.append(os.path.join(pig_path, "pig-%s-core-h2.jar" % pig_version))
    else:
        jars.append(
            os.path.join(
                pig_path,
                "pig-%s" % pig_version,
                "legacy",
                "pig-%s-withouthadoop-h2.jar" % pig_version))

    # expand the glob patterns; jarsall is only used for the report
    # appended to *out* below -- the classpath keeps the raw patterns
    jarsall = []
    for j in jars:
        r = glob.glob(j)
        jarsall.extend(r)
    jarsall.sort()

    # NOTE(review): ';' is the Windows classpath separator; on POSIX the JVM
    # expects ':' (os.pathsep) -- confirm the intended platform.
    jars = ";".join(jars)
    fLOG("jars", jars)

    cmd = [get_java_cmd(), "-Xmx1024m",
           "-classpath", jars,
           "-Dpig.log.dir=" + logpath,
           "-Dhadoop.log.dir=" + logpath,
           "-Dhadoop.tmp.dir=" + logpath,
           "-Dpig.log.file=pid.log",
           "-Djava.io.tmpdir=" + logpath,
           "-Dpig.home.dir=" + pig_path,
           # "-Dpig.schematuple=true",
           #"-Dpig.schematuple.local.dir=" + logpath,
           "org.apache.pig.Main",
           "-x", "local", pigfile,
           "-stop_on_failure"
           ]

    # quote each argument, then hand the whole line to run_cmd
    cmd = " ".join(clean(i, _) for i, _ in enumerate(cmd))
    out, err = run_cmd(
        cmd, wait=True, sin=None, communicate=True, timeout=timeout, shell=False)
    # prepend the effective environment and the expanded jar list to stdout
    out = "PIG_CONF_DIR={0}\nJAVA_HOME={1}\nHADOOP_HOME={2}\n{3}\n{4}".format(
        os.environ.get('PIG_CONF_DIR', ''), os.environ.get('JAVA_HOME', ''),
        os.environ.get('HADOOP_HOME', ''),
        "\n".join("add jar: '{0}'".format(j) for j in jarsall),
        out)
    return out, err