Coverage for src/pyenbc/filehelper/pig_helper.py: 11%

124 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-20 05:47 +0200

1""" 

2@file 

3@brief Hadoop uses a java implementation of Python: Jython. 

4This module provides helpers around that. 

5 

6.. versionadded:: 1.1 

7""" 

8 

9import os 

10import glob 

11from pyquickhelper.loghelper import run_cmd, noLOG 

12from pyquickhelper.filehelper import change_file_status 

13from pyensae.datasource.http_retrieve import download_data 

14from .jython_helper import get_java_cmd, get_java_path 

15 

16PIG_VERSION = "0.17.0" 

17HADOOP_VERSION = "3.3.0" 

18 

19 

def download_pig_standalone(pig_version=PIG_VERSION,
                            hadoop_version=HADOOP_VERSION,
                            fLOG=noLOG):
    """
    Downloads what is needed to run :epkg:`PIG` locally:
    ``winutils.zip``, the :epkg:`Hadoop` archive and the :epkg:`PIG` archive.
    Each of them is stored in its own subfolder (``winutils``,
    ``hadoopjar``, ``pigjar``) created next to this file.
    Version ``HADOOP_VERSION`` is used by default and should be changed
    in order to fit the cluster's version.

    @param      pig_version         version of PIG to download
    @param      hadoop_version      version of Hadoop to download
    @param      fLOG                logging function
    @return                         list of the values returned by
                                    *download_data*, in the order
                                    winutils, hadoop, pig

    This function might need to be run twice if the first try
    fails, it might be due to a very long path when unzipping the
    downloaded file.

    :epkg:`Hadoop` is downloaded from one of the websites
    referenced at
    `Apache Software Foundation <http://www.apache.org/dyn/closer.cgi/hadoop/common/>`_.
    Check the source to see which one was chosen.
    """
    here = os.path.abspath(os.path.dirname(__file__))

    def _local_folder(name):
        "creates (if needed) and returns a subfolder located next to this file"
        folder = os.path.join(here, name)
        # exist_ok=True avoids the check-then-create race of
        # os.path.exists followed by os.mkdir
        os.makedirs(folder, exist_ok=True)
        return folder

    fbs = []

    # download winutils.exe
    # NOTE(review): "xd" is presumably a site alias understood by download_data
    d = _local_folder("winutils")
    exe = download_data(name="winutils.zip",
                        whereTo=d,
                        website="xd",
                        fLOG=fLOG)
    fbs.append(exe)
    change_file_status(d)

    # download hadoop
    fLOG("download hadoop", hadoop_version)
    d = _local_folder("hadoopjar")
    fn = download_data(name="hadoop-%s.tar.gz" % hadoop_version,
                       whereTo=d,
                       website="http://apache.crihan.fr/dist/hadoop/common/hadoop-%s/" % hadoop_version,
                       fLOG=fLOG)
    fbs.append(fn)
    change_file_status(d)

    # download pig
    fLOG("download pig", pig_version)
    d = _local_folder("pigjar")
    fn = download_data(name="pig-%s.tar.gz" % pig_version,
                       whereTo=d, silent=True,
                       website="http://apache.crihan.fr/dist/pig/pig-%s/" % pig_version,
                       fLOG=fLOG)
    fbs.append(fn)
    change_file_status(d)
    return fbs

79 

80 

def get_pig_path():
    """
    Returns the only subfolder of this file's directory whose name
    contains ``pig`` (typically ``pigjar``).

    @return     absolute path
    @raises     FileNotFoundError if no such folder exists
                or if more than one folder matches
    """
    this = os.path.abspath(os.path.dirname(__file__))
    # The filter applies to the folder name only: testing the joined path
    # would accept every subfolder as soon as one of the parent
    # directories contains "pig".
    candidates = [os.path.join(this, name)
                  for name in os.listdir(this) if "pig" in name]
    files = [path for path in candidates if os.path.isdir(path)]
    if not files:
        raise FileNotFoundError("no pig folder found in " + this)
    if len(files) != 1:
        raise FileNotFoundError(
            "more than one folder found in " +
            this +
            "\n:" +
            "\n".join(files))
    return files[0]

100 

101 

def get_hadoop_path():
    """
    Returns the only subfolder of this file's directory whose name
    contains ``hadoop`` (typically ``hadoopjar``).

    @return     absolute path
    @raises     FileNotFoundError if no such folder exists
                or if more than one folder matches
    """
    this = os.path.abspath(os.path.dirname(__file__))
    # The filter applies to the folder name only: testing the joined path
    # would accept every subfolder as soon as one of the parent
    # directories contains "hadoop".
    candidates = [os.path.join(this, name)
                  for name in os.listdir(this) if "hadoop" in name]
    files = [path for path in candidates if os.path.isdir(path)]
    if not files:
        raise FileNotFoundError("no hadoop folder found in " + this)
    if len(files) != 1:
        raise FileNotFoundError(
            "more than one folder found in " +
            this +
            "\n:" +
            "\n".join(files))
    return files[0]

121 

122 

def get_pig_jars():
    """
    Walks the folder returned by *get_pig_path* and collects the
    jars to put on the command line in order to run :epkg:`PIG`.
    Only jars stored in a folder whose path contains ``lib`` are kept,
    jars related to Hadoop 1 (``h1``, ``hadoop1-runtime``), to tests
    or whose name contains ``h2`` are skipped.

    @return     list of jars
    """
    top = get_pig_path()
    # substrings which disqualify a folder / a file name
    banned_in_folder = ("h1", "hadoop1-runtime", "test")
    banned_in_name = ("h1", "hadoop1-runtime", "h2",
                      "pig-" + PIG_VERSION + "-withouthadoop-h2")
    selected = []
    for folder, _, filenames in os.walk(top):
        if "lib" not in folder:
            continue
        if any(tag in folder for tag in banned_in_folder):
            continue
        for filename in filenames:
            if os.path.splitext(filename)[-1] != ".jar":
                continue
            if any(tag in filename for tag in banned_in_name):
                continue
            selected.append(os.path.join(folder, filename))
    return selected

143 

144 

def get_hadoop_jars():
    """
    Walks the folder returned by *get_hadoop_path* and collects the
    jars to put on the command line in order to run :epkg:`HADOOP`.
    Jars holding sources or tests are skipped.

    @return     list of jars
    """
    # substrings which disqualify a jar
    rejected = ("sources.jar", "-test-sources", "tests.jar")
    return [
        os.path.join(folder, filename)
        for folder, _, filenames in os.walk(get_hadoop_path())
        for filename in filenames
        if os.path.splitext(filename)[-1] == ".jar"
        and not any(tag in filename for tag in rejected)
    ]

161 

162 

def run_pig(pigfile, argv=None, pig_path=None, hadoop_path=None,
            jython_path=None, timeout=None, logpath="logs",
            pig_version=PIG_VERSION, hadoop_version=HADOOP_VERSION,
            jar_no_hadoop=True, fLOG=noLOG):
    """
    Runs a :epkg:`pig` script in local mode and returns the
    standard output and error.

    @param      pigfile         pig file
    @param      argv            arguments to send to the command line
                                (currently not used by the function)
    @param      pig_path        path to pig 0.XX.0
    @param      hadoop_path     path to hadoop
    @param      jython_path     not used by the function
    @param      timeout         timeout
    @param      logpath         path to the logs, also used as temporary
                                folder for hadoop and java
    @param      pig_version     PIG version (if *pig_path* is not defined)
    @param      hadoop_version  Hadoop version (if *hadoop_path* is not defined)
    @param      jar_no_hadoop   use :epkg:`pig` without :epkg:`hadoop`
    @param      fLOG            logging function
    @return                     out, err, *out* starts with the values of
                                ``PIG_CONF_DIR``, ``JAVA_HOME``, ``HADOOP_HOME``
                                and the list of jars found on disk

    If *pig_path* is None, the function looks into this directory.
    The function sets environment variables ``JAVA_HOME``,
    ``PIG_CONF_DIR``, ``HADOOP_HOME``, ``HADOOP_CLIENT_OPTS``
    if they are not already defined.
    """
    if pig_path is None:
        pig_path = os.path.join(get_pig_path(), "pig-%s" % pig_version)

    if hadoop_path is None:
        hadoop_path = get_hadoop_path()

    java = get_java_path()
    if "JAVA_HOME" not in os.environ:
        os.environ["JAVA_HOME"] = java

    if "PIG_CONF_DIR" not in os.environ:
        os.environ["PIG_CONF_DIR"] = os.path.normpath(
            os.path.join(pig_path, "conf"))
        if not os.path.exists(os.environ["PIG_CONF_DIR"]):
            raise FileNotFoundError(os.environ["PIG_CONF_DIR"])

    if "HADOOP_HOME" not in os.environ:
        os.environ["HADOOP_HOME"] = hadoop_path
        if not os.path.exists(os.environ["HADOOP_HOME"]):
            raise FileNotFoundError(os.environ["HADOOP_HOME"])

    if "HADOOP_CLIENT_OPTS" not in os.environ:
        os.environ["HADOOP_CLIENT_OPTS"] = "-Xmx1024m"

    fLOG("PIG_CONF_DIR=", os.environ["PIG_CONF_DIR"])

    def clean(i, p):
        "escapes and quotes an argument, the executable (i == 0) is left as is"
        if i == 0:
            return p
        if '"' in p:
            p = p.replace('"', '\\"')
        if " " in p:
            p = '"{0}"'.format(p)
        return p

    jars = []
    if not jar_no_hadoop:
        # one pattern per folder holding pig jars,
        # sorted to get a reproducible classpath
        folds = sorted(set(os.path.split(j)[0] for j in get_pig_jars()))
        jars = [os.path.join(f, "*.jar") for f in folds]

        # every entry is relative to <hadoop_path>/hadoop-<version>/share/hadoop
        share = os.path.join(
            hadoop_path, "hadoop-%s" % hadoop_version, "share", "hadoop")
        hadoop_entries = [
            ("common", "lib", "*.jar"),
            ("hdfs", "lib", "*.jar"),
            ("mapreduce", "lib", "*.jar"),
            ("httpfs", "tomcat", "lib", "*.jar"),
            ("tools", "lib", "*.jar"),
            ("yarn", "lib", "*.jar"),
            ("common", "hadoop-common-%s.jar" % hadoop_version),
            # the extension .jar was missing for this entry
            ("common", "hadoop-nfs-%s.jar" % hadoop_version),
            ("hdfs", "hadoop-hdfs-%s.jar" % hadoop_version),
            ("mapreduce", "*.jar"),
            ("yarn", "*.jar"),
        ]
        jars.extend(os.path.join(share, *parts) for parts in hadoop_entries)

        jars.append(os.path.join(pig_path, "pig-%s-core-h2.jar" % pig_version))
    else:
        jars.append(
            os.path.join(
                pig_path,
                "pig-%s" % pig_version,
                "legacy",
                "pig-%s-withouthadoop-h2.jar" % pig_version))

    # jars really found on disk, only used to build the returned output
    jarsall = []
    for j in jars:
        jarsall.extend(glob.glob(j))
    jarsall.sort()

    # os.pathsep is ';' on Windows and ':' elsewhere,
    # a hard-coded ';' produces an invalid classpath outside Windows
    # NOTE(review): java expands 'folder/*' but, as far as documented,
    # not 'folder/*.jar'; the patterns are given unexpanded -- confirm
    jars = os.pathsep.join(jars)
    fLOG("jars", jars)

    cmd = [get_java_cmd(), "-Xmx1024m",
           "-classpath", jars,
           "-Dpig.log.dir=" + logpath,
           "-Dhadoop.log.dir=" + logpath,
           "-Dhadoop.tmp.dir=" + logpath,
           "-Dpig.log.file=pid.log",
           "-Djava.io.tmpdir=" + logpath,
           "-Dpig.home.dir=" + pig_path,
           "org.apache.pig.Main",
           "-x", "local", pigfile,
           "-stop_on_failure"
           ]

    cmd = " ".join(clean(i, _) for i, _ in enumerate(cmd))
    out, err = run_cmd(
        cmd, wait=True, sin=None, communicate=True, timeout=timeout, shell=False)
    out = "PIG_CONF_DIR={0}\nJAVA_HOME={1}\nHADOOP_HOME={2}\n{3}\n{4}".format(
        os.environ.get('PIG_CONF_DIR', ''), os.environ.get('JAVA_HOME', ''),
        os.environ.get('HADOOP_HOME', ''),
        "\n".join("add jar: '{0}'".format(j) for j in jarsall),
        out)
    return out, err