Coverage for src/pyensae/sql/database_import_export.py: 53%

179 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-03 02:16 +0200

1""" 

2@file 

3 

4@brief @see cl Database 

5""" 

6 

7import os 

8import collections 

9try: 

10 from collections.abc import Iterable 

11except ImportError: 

12 from collections import Iterable 

13from .file_text_binary import TextFile 

14from .file_text_binary_columns import TextFileColumns 

15from .database_exception import DBException 

16 

17 

class DatabaseImportExport:
    """
    Import and export functions for a database, in various formats.

    This class is not meant to be working alone: its methods rely on
    members it does not define itself (``LOG``, ``execute``, ``commit``,
    ``get_table_list``, ``create_table``, ``_check_connection``, ...),
    which have to be provided by the class it is combined with.
    """

    # control characters replaced by their escaped, printable form so that
    # an exported value never breaks the tab separated, one row per line layout
    _FLAT_FILE_ESCAPES = str.maketrans({"\t": "\\t", "\n": "\\n", "\r": "\\r"})

    ##########################################################################
    # exporting functions
    ##########################################################################

    def export_table_into_flat_file(self, table, filename, header=False, columns=None,
                                    post_process=None, encoding="utf8"):
        """
        Exports a table into a flat file.

        @param      table           table name
        @param      filename        filename
        @param      header          add a header on the first line
        @param      columns         export only columns in this list (if None, export all)
        @param      post_process    post_process a line:
                                        - input:  list, dictionary (for your own use, same one all the time)
                                        - output: list
        @param      encoding        encoding

        The table name and the column names are inserted as they are in the
        SQL query, they must come from a trusted source.

        .. exref::
            :title: Export the results of a SQL query into a flat file
            :tag: SQL

            ::

                from pyensae.sql.database_main import Database
                dbfile = "filename.db3"
                filetxt = "fileview.txt"
                sql = "..."
                db = Database(dbfile)
                db.connect()
                db.export_view_into_flat_file (sql, filetxt, header = True)
                db.close()
        """
        if columns is None:
            sql = "SELECT * FROM " + table + ";"
        else:
            sql = "SELECT %s FROM %s ;" % (",".join(columns), table)

        self.export_view_into_flat_file(
            sql, filename, header, post_process, encoding=encoding)

    def _clean_string(self, s):
        """
        Cleans string.

        @param      s       string
        @return             same string where \\r, \\t, \\n are replaced by
                            a backslash followed by the letter r, t or n
        """
        return s.translate(self._FLAT_FILE_ESCAPES)

    def export_view_into_flat_file(self, view_sql, filename, header=False, post_process=None,
                                   encoding="utf8"):
        """
        Exports the results of a SQL query into a flat file
        (one row per line, values separated by tabulations).

        @param      view_sql        SQL request
        @param      filename        filename
        @param      header          header of the file:
                                        - evaluates to False: no header
                                        - list or tuple: its items joined by a tabulation
                                        - True: the columns returned by ``get_sql_columns(view_sql)``
                                        - anything else: written as it is (expected to be a string)
        @param      post_process    if not None, function called on every row with two parameters,
                                    the row and a dictionary (the same one for all rows),
                                    it returns the values to write
        @param      encoding        encoding used to open *filename*

        The file and the cursor are closed even if an exception is raised
        while writing.
        """
        sepline = "\n"

        self._check_connection()

        if not header:
            header_line = ""
        elif isinstance(header, (list, tuple)):
            header_line = "\t".join(header) + sepline
        elif isinstance(header, bool):
            header_line = "\t".join(self.get_sql_columns(view_sql)) + sepline
        else:
            header_line = header + sepline

        cur = self.execute(view_sql)
        try:
            with open(filename, "w", encoding=encoding) as f:
                f.write(header_line)
                # shared between all the calls to post_process
                memo = {}
                for nbline, row in enumerate(cur, start=1):
                    line = row if post_process is None else post_process(row, memo)
                    f.write("\t".join(self._clean_string(str(x))
                                      for x in line) + sepline)
                    if nbline % 100000 == 0:
                        self.LOG(" exporting from view, line ", nbline)
        finally:
            cur.close()

    ##########################################################################
    # importing functions
    ##########################################################################

    @staticmethod
    def _import_source_is_matrix(file):
        """
        Tells if the parameter *file* of the importing functions holds
        the rows themselves rather than a file name.

        @param      file        parameter *file* of an importing function
        @return                 True for a list or any iterable which is not a string
        """
        return isinstance(file, list) or (
            isinstance(file, Iterable) and not isinstance(file, str))

    def append_values(self, values, tablename, schema, cursor=None,
                      skip_exception=False, encoding="utf-8"):
        """
        Uses @see me _append_table to fill a table with the
        values contained in values (as list).

        @param      values          list of list (each cell is a value)
        @param      tablename       name of the table to fill
        @param      schema          schema of the database, it must be present in case one of the columns
                                    includes the tag "PRIMARYKEY", in that case, the value for this field
                                    will be automatically set up.
        @param      cursor          if None, create a new one
        @param      skip_exception  skip exception while inserting an element
        @param      encoding        encoding
        @return                     number of inserted elements
        """
        return self._append_table(
            values,
            tablename,
            schema,
            cursor=cursor,
            skip_exception=skip_exception,
            encoding=encoding)

    def _append_table(self, file, table, columns, format="tsv", header=False,
                      stop=-1, lower_case=False, cursor=None, fill_missing=0,
                      unique=None, filter_case=None, strict_separator=False,
                      skip_exception=False, changes=None, encoding="utf-8",
                      **params):
        """
        Appends element to a database.

        @param      file                file name or a matrix (this matrix can be an iterator)
        @param      table               table name
        @param      columns             columns definition (see below)
        @param      format              tsv, the only one accepted for the time being, it can be a function (line, **params)
        @param      header              the file has a header or not, if True, skip the first line
        @param      stop                if -1, insert every line, otherwise stop when the number of inserted lines is stop
        @param      lower_case          put every str string in lower_case before inserting it
        @param      cursor              if None, create a new one, otherwise it is closed once the rows are inserted
        @param      fill_missing        fill the missing values by a default value, at least not more than fill_missing values
        @param      unique              if unique is a column number,
                                        the function will not take into account another containing a value already seen on this column
        @param      filter_case         process every case information (used to replace space for example)
        @param      strict_separator    strict number of columns, it assumes there is no separator in the content of every column
        @param      params              see format
        @param      skip_exception      skip exception while inserting an element
        @param      changes             to rewrite column names
        @param      encoding            encoding
        @return                         number of inserted elements

        The columns definition must follow the schema:

        - dictionary ``{ key:(column_name,python_type) }``
        - or ``{ key:(column_name,python_type,preprocessing_function) }``

        ``preprocessing_function`` is a function whose prototype is for example:

        ::

            def preprocessing_score (s) :
                return s.replace (",",".")

        And:

        - if ``PRIMARYKEY`` is added, the key is considered as the primary key
        - if ``AUTOINCREMENT`` is added, the key will automatically filled (like an id)

        The changes are committed (``self.commit()``) only if every row
        was processed without raising an exception.
        """
        if changes is None:
            changes = {}
        if stop != -1:
            self.LOG("SQL append table stop is ", stop)
        self._check_connection()

        # if several columns are tagged, the last one is kept
        primarykey = None
        for definition in columns.values():
            if "PRIMARYKEY" in definition:
                primarykey = definition[0]

        table_list = self.get_table_list()
        if table not in table_list:
            raise DBException(
                "unable to find table " + table +
                " in [" + ",".join(table_list) + "]")

        if self._import_source_is_matrix(file):
            nbinsert = self._append_table_from_matrix(
                file, table, columns, primarykey, fmt=format, stop=stop,
                lower_case=lower_case, cursor=cursor, unique=unique,
                filter_case=filter_case, strict_separator=strict_separator,
                skip_exception=skip_exception)
        else:
            nbinsert = self._append_table_from_file(
                file, table, columns, primarykey, fmt=format, header=header,
                stop=stop, lower_case=lower_case, cursor=cursor,
                fill_missing=fill_missing, unique=unique,
                filter_case=filter_case, strict_separator=strict_separator,
                skip_exception=skip_exception, changes=changes,
                encoding=encoding, params=params)

        if cursor is not None:
            cursor.close()
        self.commit()
        return nbinsert

    def _append_table_from_matrix(self, rows, table, columns, primarykey, fmt,
                                  stop, lower_case, cursor, unique, filter_case,
                                  strict_separator, skip_exception):
        """
        Inserts the rows of an in-memory matrix (or an iterator on rows),
        helper of @see me _append_table which describes the parameters.

        @return         number of inserted elements
        """
        nbinsert = 0
        num_line = 0
        seen = set()
        for line in rows:
            if stop != -1 and nbinsert >= stop:
                break
            dic = self._process_text_line(
                line, columns, format=fmt, lower_case=lower_case,
                num_line=num_line, filter_case=filter_case,
                strict_separator=strict_separator)

            # a rejected row (None) has no value to look at
            if unique is not None and dic is not None:
                if dic[unique] in seen:
                    continue
                seen.add(dic[unique])

            num_line += 1
            if dic is None:
                continue
            self._get_insert_request(dic, table, True, primarykey, cursor=cursor,
                                     skip_exception=skip_exception)
            nbinsert += 1
            if nbinsert % 100000 == 0:
                self.LOG(
                    "adding %d lines into table %s" %
                    (nbinsert, table))
        return nbinsert

    def _append_table_from_file(self, file, table, columns, primarykey, fmt,
                                header, stop, lower_case, cursor, fill_missing,
                                unique, filter_case, strict_separator,
                                skip_exception, changes, encoding, params):
        """
        Inserts the rows read from a text file,
        helper of @see me _append_table which describes the parameters
        (*params* is the dictionary of extra parameters sent to *fmt*
        when it is a function).

        @return         number of inserted elements

        The file is closed even if an exception is raised.
        """
        column_names = [definition[0] for definition in columns.values()]
        column_has_space = any(' ' in name for name in column_names)
        self.LOG(" column_has_space", column_has_space, column_names)

        if strict_separator or column_has_space:
            # raw lines, they are split in the loop below
            reader = TextFile(file, errors='ignore',
                              fLOG=self.LOG, encoding=encoding)
            preparsed = False
        else:
            # the reader is given the columns definition and
            # its items are inserted as they are
            self.LOG(" changes", changes)
            reader = TextFileColumns(file, errors='ignore', fLOG=self.LOG,
                                     regex=columns, changes=changes, encoding=encoding)
            preparsed = True

        tsv = fmt == "tsv"
        every = 100000
        nbinsert = 0
        num_line = 0
        seen = set()

        reader.open()
        try:
            for line in reader:
                if stop != -1 and nbinsert >= stop:
                    break
                num_line += 1
                if preparsed:
                    dic = line
                else:
                    if header and num_line == 1:
                        continue
                    if not line.strip("\r\n"):
                        continue
                    if tsv:
                        dic = self._process_text_line(
                            line, columns, fmt, lower_case=lower_case,
                            num_line=num_line - 1, fill_missing=fill_missing,
                            filter_case=filter_case, strict_separator=strict_separator)
                    else:
                        dic = fmt(line, **params)
                if dic is None:
                    continue

                if unique is not None:
                    if dic[unique] in seen:
                        continue
                    seen.add(dic[unique])

                self._get_insert_request(
                    dic, table, True, primarykey, cursor=cursor,
                    skip_exception=skip_exception)
                nbinsert += 1
                if nbinsert % every == 0:
                    self.LOG(
                        "adding %d lines into table %s" %
                        (nbinsert, table))
        finally:
            reader.close()
        return nbinsert

    def import_table_from_flat_file(self, file, table, columns, format="tsv", header=False,
                                    display=False, lower_case=False, table_exists=False,
                                    temporary=False, fill_missing=False, indexes=None,
                                    filter_case=None, change_to_text=None, strict_separator=False,
                                    add_key=None, encoding="utf-8", **params):
        """
        Adds a table to database from a file.

        @param      file                file name or matrix (a list or any iterable on rows)
        @param      table               table name
        @param      columns             columns definition (see below)
                                        if None: columns are guessed,
                                        if it is a list: columns are guessed and the number of
                                        guessed columns must be the length of the list
        @param      format              tsv, the only one accepted for the time being,
                                        it can be a function whose parameter are a line and **params
        @param      header              the file has a header or not, if True, skip the first line
        @param      lower_case          put every string in lower case before inserting it
        @param      table_exists        if True, do not create the table
        @param      temporary           adding a temporary table
        @param      fill_missing        fill the missing values
        @param      indexes             indexes to create (a column name or a sequence of column names
                                        for each of them) once the rows are inserted,
                                        an index already there is not created again
        @param      filter_case         process every case information (used to replace space for example)
        @param      encoding            encoding
        @param      params              see format
        @param      change_to_text      names of the columns whose type is replaced by ``(str, 1000000)``
        @param      display             if True, logs more information
        @param      strict_separator    strict number of columns, it assumes there is no separator in the content of every column
        @param      add_key             name of a key to add (or None if nothing to add)
        @return                         the number of added rows

        The columns definition must follow the schema:

        - dictionary ``{ key: (column_name,python_type) }``
        - or ``{ key: (column_name,python_type,preprocessing_function) }``

        ``preprocessing_function`` is a function whose prototype is for example:

        ::

            def preprocessing_score (s) :
                return s.replace (",",".")

        And:

        - if ``PRIMARYKEY`` is added, the key is considered as the primary key
        - if ``AUTOINCREMENT`` is added, the key will automatically filled (like an id)

        The dictionary *columns* is modified by the parameters
        *add_key* and *change_to_text*.

        @warning The function does not react well when a column name
                 includes a space.
        """
        if indexes is None:
            indexes = []
        if change_to_text is None:
            change_to_text = []
        is_matrix = self._import_source_is_matrix(file)
        if display:
            if isinstance(file, list):
                self.LOG("processing file ", file[:10])
            else:
                self.LOG("processing file ", file)

        self._check_connection()

        # stays None when the columns are given as a dictionary,
        # _append_table then uses its own default
        changes = None
        if columns is None:
            # here, some spaces might have been replaced by "_", we need to get
            # them back
            columns, changes = self._guess_columns(
                file, format, columns, filter_case=filter_case, header=header, encoding=encoding)
        elif isinstance(columns, list):
            columns_, changes = self._guess_columns(
                file, format, columns, filter_case=filter_case, header=header, encoding=encoding)
            if len(columns_) != len(columns):
                raise DBException(
                    "different number of columns:\ncolumns={0}\nguessed={1}".format(
                        str(columns), str(columns_)))
            columns = columns_

        if add_key is not None:
            columns[len(columns)] = (
                add_key, int, "PRIMARYKEY", "AUTOINCREMENT")

        # only existing keys are reassigned, the dictionary size does not
        # change while it is iterated
        for key in columns:
            definition = columns[key]
            if definition[0] in change_to_text:
                columns[key] = (definition[0], (str, 1000000)) + \
                    tuple(definition[2:])

        if display:
            self.LOG(" columns ", columns)

        # only a file name can be looked for on disk
        if not is_matrix and not os.path.exists(file):
            raise DBException("unable to find file " + str(file))

        if not table_exists:
            cursor = self.create_table(table, columns, temporary=temporary)
        elif table not in self.get_table_list():
            raise DBException("unable to find table " + table + " (1)")
        else:
            cursor = None

        if table not in self.get_table_list():
            raise DBException("unable to find table " + table + " (2)")
        nb = self._append_table(file, table, columns, format=format, header=header,
                                lower_case=lower_case, cursor=cursor, fill_missing=fill_missing,
                                filter_case=filter_case, strict_separator=strict_separator,
                                changes=changes, encoding=encoding, **params)

        self.LOG(nb, " lines imported")

        for ind in indexes:
            if isinstance(ind, str):
                indexname = table + "_" + ind
            else:
                indexname = table + "_" + "_".join(ind)
            if not self.has_index(indexname):
                self.create_index(indexname, table, ind)

        return nb

435 

436 return nb