Coverage for src/pyensae/sql/file_text_binary.py: 76%

425 statements  


"""
@file
@brief contains a class which opens a text file as a binary file.
"""
import re
import os
import math
import time
import decimal
from pyquickhelper.loghelper import noLOG
from .type_helpers import guess_type_value


class TextFile:
    """
    This class opens a text file as if it were a binary file.
    It can deal with null characters which the built-in ``open`` function misses.

    @var    filename        file name
    @var    errors          decoding in utf-8 can raise some errors,
                            see `str <https://docs.python.org/3.4/library/stdtypes.html?highlight=str#str>`_
                            to understand the meaning of this parameter
    @var    LOG             logging function
    @var    _buffer_size    read the text file _buffer_size bytes at a time
    @var    _filter         filter function, None or a function which returns True or False
                            to tell whether a line should be considered or not
    @var    _encoding       encoding

    Example:

    ::

        f = TextFile(filename)
        f.open()
        for line in f:
            print(line)
        f.close()
    """

    _split_expr = re.compile("\\r?\\t", re.U)
    _sep_available = "\t;,| "

    def __init__(self, filename, errors=None, fLOG=noLOG, buffer_size=2 ** 20,
                 filter=None, separated=False, encoding="utf-8"):
        """
        @param      filename        filename
        @param      errors          see ``str(errors=...)``
        @param      fLOG            LOG function, see `fLOG <http://www.xavierdupre.fr/app/pyquickhelper/
                                    helpsphinx/pyquickhelper/loghelper/flog.html#pyquickhelper.loghelper.flog.fLOG>`_
        @param      buffer_size     buffer size (mostly used to test the reading function)
        @param      filter          None if there is no filter, otherwise a function which takes a line
                                    and returns a boolean telling whether the line must be considered or not
        @param      separated       if True, the lines returned by the iterator are split by the most probable separator
        @param      encoding        encoding
        """
        self.filename = filename
        self._encoding = encoding
        self.errors = errors
        self.LOG = fLOG
        self._buffer_size = buffer_size
        self._filter = filter
        self._separated = separated
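    # A minimal usage sketch for the constructor parameters above. The file name
    # "data.txt" and the filter are hypothetical; the filter drops empty lines and
    # ``separated=True`` makes the iterator yield lists of column values.
    #
    #   f = TextFile("data.txt", filter=lambda line: len(line) > 0, separated=True)
    #   f.open()
    #   for row in f:
    #       print(row)          # a list of column values
    #   f.close()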

    def open(self):
        """
        Opens the file in reading mode.
        """
        self.LOG(" TextFile: opening file ", self.filename)
        if self._separated:
            res = self.guess_columns()
            self.LOG(" TextFile: guessed: ", res)
            sep = res[2]
            self._separated_value = sep

        self._f = open(self.filename, "r", encoding=self._encoding)
        self._nbline = 0
        self._read = 0

    def close(self):
        """
        Closes the file.
        """
        self._f.close()
        self.LOG(" TextFile: closing file ", self.filename)
        del self.__dict__["_f"]

    def get_nb_readlines(self):
        """
        Returns the number of read lines.
        """
        return self._nbline

    def get_nb_readbytes(self):
        """
        Returns the number of read bytes.
        """
        return self._read

    def readlines(self):
        """
        Extracts all the lines. The file must not be opened through method open
        beforehand, ``\\n`` characters are removed.
        """
        self.open()
        res = []
        for line in self:
            li = line.strip("\r\n")
            res.append(li)
        self.close()
        return res
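    # A minimal sketch of readlines, which handles open/close internally
    # (the file name "data.txt" is hypothetical):
    #
    #   lines = TextFile("data.txt").readlines()
    #   print(len(lines), "lines")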

    def __iter__(self):
        """
        Iterator on the lines of the file.

        ::

            f = TextFile('...')
            f.open()
            for line in f:
                ...
            f.close()

        @return a string (or a list of strings if *separated* is True)
        """
        if "_f" not in self.__dict__:
            raise Exception("file %s is not opened." % self.filename)
        filesize = os.stat(self.filename).st_size
        size = self._buffer_size
        blin = self._f.read(size)
        self._read = len(blin)

        if blin.startswith("\xef\xbb\xbf"):
            self.LOG(" removing the three first characters u'\\xef\\xbb\\xbf'")
            blin = blin[3:]
        if blin.startswith("\ufeff"):
            self.LOG(" removing the first character u'\\ufeff'")
            blin = blin[len("\ufeff"):]

        endline = "\n"
        endchar = "\r"

        begin = 0
        sel = 0
        tim = time.perf_counter()
        while len(blin) > 0:

            pos = blin.find(endline, begin)
            if pos == -1:
                pos = blin.find(endchar, begin)
                if begin == 0 and pos != -1:
                    self.LOG(" problem in file ", self.filename)
                    self.LOG(" the line separator is not \\n but \\r")

            while pos == -1:
                if begin > 0:
                    blin = blin[begin:]
                    begin = 0
                temp = self._f.read(size)
                self._read += len(temp)
                blin += temp
                pos = blin.find("\n")
                if pos == -1:
                    pos = blin.find("\r", begin)
                if len(temp) == 0 and pos == -1:
                    pos = len(blin)

            temp = blin[begin:pos]
            line = str(temp)
            begin = pos + 1

            tim2 = time.perf_counter()
            if tim2 - tim > 60:
                tim = tim2
                ratio = float(self._read) / filesize * 100
                self.LOG(" processing line ", self._nbline,
                         " read bytes ", self._read, " sel ", sel,
                         " ratio %2.2f" % ratio, "%")

            r = line
            if self._encoding == "utf-8":
                r = r.rstrip(endchar)
            if self._filter is None or self._filter(r):
                if self._separated:
                    yield r.split(self._separated_value)
                else:
                    yield r

            self._nbline += 1
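    # A minimal iteration sketch (hypothetical file "data.txt"): the filter is
    # applied to each line before it is yielded, while get_nb_readlines() counts
    # every line read, whether or not the filter kept it.
    #
    #   f = TextFile("data.txt", filter=lambda line: "ERROR" in line)
    #   f.open()
    #   errors = list(f)
    #   print(f.get_nb_readlines(), "lines read,", len(errors), "kept")
    #   f.close()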

    def _load(self, filename, this_column, file_column, prefix, **param):
        """
        Loads a file to be joined: reads its header, then indexes every
        following line (as a list of values) by the value found in column
        *file_column*.

        @return     content, columns, this_column, file_column, prefix
        """
        f = TextFile(filename, fLOG=self.LOG, encoding=self._encoding, **param)
        f.open()
        cont = {}
        for line in f:
            if f.get_nb_readlines() == 0:
                columns = self._interpret_columns(line)
            else:
                col = self._interpret(line)
                key = col[columns[file_column]]
                cont[key] = col
        f.close()
        return cont, columns, this_column, file_column, prefix

    def _interpret_columns(self, line):
        """
        Interprets the first line which contains the column names.

        @param      line    string
        @return             dictionary { name: position }
        """
        col = self._interpret(line)
        res = {}
        for i in range(0, len(col)):
            res[col[i]] = i
        return res

    def _interpret(self, line):
        """
        Splits a line into a list, separator ``\\t``.

        @param      line    string
        @return             list
        """
        col = TextFile._split_expr.split(line.strip(" \r\n"))
        return col
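    # A small sketch of what _interpret does: the precompiled class regex
    # ``\r?\t`` splits a tab-separated record into its fields.
    #
    #   TextFile._split_expr.split("a\tb\tc")    # -> ['a', 'b', 'c']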

    def join(self, definition, output, missing_value="", unique=None, **param):
        """
        Joins several files together.

        @param      definition      list of tuples (2, 3 or 4 elements):
                                    filename, this_column, file_column, prefix
        @param      output          if None, returns the results as a list, otherwise saves them into output
        @param      param           parameters used to open the files
        @param      missing_value   specifies a value for the missing values
        @param      unique          if unique is a column name, a line whose value has already been processed is skipped, None otherwise
        @return                     the matrix of joined rows (header included) if *output* is None,
                                    otherwise the number of missing values

        We assume that every file starts with a header giving the column names.
        The function associates the *this_column* value to *file_column* and
        appends all the columns from filename with a prefix.
        We also assume the values in file_column are unique.
        """
        if output is not None:
            output = open(output, "w", encoding=self._encoding)

        files = []
        for i, tu in enumerate(definition):
            if len(tu) == 2:
                a, b = tu
                c = b
                d = "f%d_" % (i + 1)
            elif len(tu) == 3:
                a, b, c = tu
                d = "f%d_" % (i + 1)
            elif len(tu) == 4:
                a, b, c, d = tu
            else:
                raise ValueError(
                    "definition must contain tuples of size 2, 3 or 4, not {0}".format(tu))
            files.append(self._load(a, b, c, d, **param))

        res = []
        miss = 0
        uniquekey = {}

        self.open()
        for line in self:
            if self.get_nb_readlines() == 0:
                columns = self._interpret_columns(line)
                oldnb = len(columns)
                last = max(columns.values()) + 1
                for file in files:
                    col = file[1]
                    pre = file[-1]
                    for k, v in col.items():
                        columns[pre + k] = last + v
                    last += len(col)
                linecol = ["" for c in columns]
                for k, v in columns.items():
                    linecol[v] = k

                if output is None:
                    res.append(linecol)
                else:
                    output.write("\t".join(linecol) + "\n")

                s1 = len(linecol)
                s2 = oldnb
                for f in files:
                    s2 += len(f[1])
                if s1 != s2:
                    mes = "size problem %d != " % (s1)
                    mes += " + ".join([str(x)
                                       for x in [oldnb, ] + [len(f[1]) for f in files]])
                    raise Exception(mes)

            else:
                col = self._interpret(line)

                if unique is not None:
                    key = columns[unique]
                    val = col[key]
                    if val in uniquekey:
                        uniquekey[val] += 1
                        continue
                    uniquekey[val] = 1

                if len(col) != oldnb:
                    col.extend(["" for i in range(0, oldnb - len(col))])
                    if len(col) != oldnb:
                        mes = "line %d: problem len(col) = %d and oldnb = %d\n%s" % (
                            self.get_nb_readlines(), len(col), oldnb, repr(line))
                        raise Exception(mes)

                for file in files:
                    cont = file[0]
                    c = file[1]
                    this_key = col[columns[file[2]]]
                    if this_key in cont:
                        val = cont[this_key]
                        if len(val) == 0 or (len(val) == 1 and len(val[0]) == 0):
                            # empty line
                            continue
                        if len(val) != len(c):
                            ll = self.get_nb_readlines()
                            mes = "line %d: problem len(val) = %d and len(c) = %d\n\"%s\"" % (
                                ll, len(val), len(c), file)
                            raise Exception(mes)
                    else:
                        val = [missing_value for k in c]
                        miss += len(val)
                    col.extend(val)

                if len(col) != len(columns):
                    vals = list(set(col))
                    if vals == ['']:
                        continue
                    mes = "problem 1 with line %d\n" % self.get_nb_readlines()
                    mes += "len(col) = %d len(columns) = %d" % (len(col),
                                                                len(columns))
                    raise Exception(mes)

                if len(("\t".join(col)).split("\t")) != len(col):
                    mes = "problem 2 with line %d\n" % self.get_nb_readlines()
                    mes += "len(col) = %d len(columns) = %d" % (
                        len(("\t".join(col)).split("\t")), len(columns))
                    raise Exception(mes)

                if output is None:
                    res.append(col)
                else:
                    output.write("\t".join(col) + "\n")

        if output is None:
            return res
        else:
            output.close()
            return miss
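    # A hedged sketch of join (file and column names are hypothetical):
    # "main.txt" has columns id/name, "cities.txt" has columns id/city, both
    # tab-separated with a header line. Every column of "cities.txt" is appended
    # to the matching row of "main.txt" with the prefix "geo_".
    #
    #   f = TextFile("main.txt")
    #   result = f.join([("cities.txt", "id", "id", "geo_")], output=None)
    #   # result[0] is the merged header, e.g. ['id', 'name', 'geo_id', 'geo_city'],
    #   # the following rows contain the joined values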

    def _count_s(self, car):
        """
        Returns the number of occurrences of every character in car.
        """
        res = {}
        for c in car:
            if c in res:
                res[c] += 1
            else:
                res[c] = 1
        return res

    def _get_type(self, s):
        """
        Guesses the type of value s.
        """
        return guess_type_value(s)

    def guess_columns(self, nb=100, force_header=False, changes=None, force_noheader=False,
                      fields=None, regex=None, force_sep=None, mistake=3):
        """
        Guesses the column types.

        @param      nb              number of lines to look at in order to find all the necessary elements
        @param      force_header    impose a header whether it is detected or not
        @param      changes         modify some column names, example ``{ "query": "query___" }``
        @param      force_noheader  there is no header at all
        @param      fields          names of the columns if there is no header (instead of c000, c001...)
        @param      regex           if the default expression for a field is not the expected one, change it by looking into regex
        @param      force_sep       force the separator to be the one chosen by the user (None by default)
        @param      mistake         no more than *mistake* failed conversions into numbers are allowed
        @return                     4-tuple, see below

        The returned result is a 4-tuple:

        - True or False: presence of a header (detection relies on the numerical
          columns: a header is assumed when the first line does not match their types)
        - column definition ``{ position : (name, type) }`` or
          ``{ position : (name, (str, max_length*2)) }``
        - separator
        - regex which allows the user to extract information from the file

        The column separator is searched among ``, | ; \\t``.
        @warning The file must not be opened, it will be opened several times.
        """
        if changes is None:
            changes = {}
        if regex is None:
            regex = {}
        self.LOG(" TextFile.guess_columns: processing file ", self.filename)

        endlinechar = "\n "

        # reads the first nb lines
        temp = TextFile(self.filename, encoding=self._encoding, fLOG=self.LOG)
        lines = []

        temp.open()
        for line in temp:
            line = line.strip(endlinechar)
            if len(line) == 0:
                continue
            lines.append(line)
            if len(lines) > nb:
                break
        self.LOG(" TextFile.guess_columns: using ", len(lines), " lines")
        temp.close()

        # guesses the separator
        sep = TextFile._sep_available
        if force_sep is not None and force_sep not in sep:
            # adds a user-specified separator missing from the default list
            sep += force_sep
        h = {}
        mx = 0
        for line in lines:
            co = self._count_s(line)
            for s in sep:
                n = co.get(s, 0)
                if n == 0:
                    continue
                k = s, n
                if k not in h:
                    h[k] = 1
                else:
                    h[k] += 1
                mx = max(n, mx)

        mx += 1
        best = None
        iner = None
        for c in sep:
            m = {}
            z = 0
            for k in range(mx):
                if (c, k) in h:
                    m[k] = h[c, k]
                    z += k * m[k]

            if len(m) == 0:
                continue
            g = max(sum(m.values()), len(lines))
            if z < max(len(lines) * 9 / 10, 1):
                continue

            for k in m:
                m[k] = float(m[k]) / g
            s = 0.0
            for k in m:
                s += m[k] * math.log(m[k])
            if iner is None or s > iner:
                iner = s
                best = c

        bestsep = best

        if force_sep is not None and bestsep != force_sep:
            self.LOG(" TextFile.guess_columns: changes the separator",
                     repr(force_sep))
            bestsep = force_sep

        bestcol = 0
        bestnb = 0
        for k in range(mx):
            if (bestsep, k) in h:
                if bestnb < h[bestsep, k]:
                    bestnb = h[bestsep, k]
                    bestcol = k + 1

        self.LOG(" TextFile.guess_columns: sep ", repr(bestsep), "nb cols", bestcol, " bestnb ",
                 bestnb, " more ", h)

        # determines the type of every column
        h = {}
        for line in lines:
            cols = line.split(bestsep)
            for i in range(len(cols)):
                ty = self._get_type(cols[i])
                k = i, ty
                if k not in h:
                    h[k] = 1
                else:
                    h[k] += 1

        columns = {}
        for a in h:
            k, t = a
            if k >= bestcol:
                continue
            if k not in columns:
                columns[k] = (t, h[a])
            elif h[a] > columns[k][1]:
                columns[k] = (t, h[a])

        for pos in columns:
            # int and float corrections
            if columns[pos][0] == int and h.get((pos, float), 0) > 0:
                self.LOG(" changing column type ", pos, columns[pos], " into ", float)
                columns[pos] = (float, h[pos, float] + h[pos, int])
            su = h.get((pos, str), 0)
            if columns[pos][0] in (int, float, decimal.Decimal) and su > mistake:
                self.LOG(" changing column type ", pos, columns[pos], " into ", str,
                         " mistakes ", su, " > ", mistake)
                columns[pos] = (str, columns[pos][1] + su)

        # header or not
        mat = 0
        no = 0
        cols = lines[0].split(bestsep)
        for i, c in enumerate(cols):
            t = self._get_type(c)
            e = columns.get(i, (str, 0))[0]
            if e != str:
                if t == e:
                    mat += 1
                else:
                    no += 1
        header = not force_noheader and (force_header or (no > mat))

        # determines the column names
        if header:
            names = lines[0].split(bestsep)
            del lines[0]
            if len(names) != bestcol:
                raise Exception(
                    "unable to continue: the header does not contain the same number of columns %s != %s" %
                    (len(names), bestcol))
        elif fields is not None:
            if len(fields) != bestcol:
                raise Exception(
                    "the number of fields (%d) is different from the number of columns found in the file %d" %
                    (len(fields), bestcol))
            names = fields
        else:
            # pads the default column names (c000, c001, ...) with the number
            # of digits required by bestcol
            hhhh, _ = 0, bestcol
            while _ > 0:
                hhhh, _ = hhhh + 1, _ // 10
            format = "c%0" + str(hhhh) + "d"
            names = [format % i for i in range(bestcol)]

        for k in columns:
            if k >= len(names):
                raise Exception(
                    "incoherence in the file being read: %d >= %d: " %
                    (k, len(names)) + repr(names) + "\n" + repr(columns))
            columns[k] = (changes.get(names[k], names[k]), columns[k][0])

        self.LOG(" TextFile.guess_columns: header ", header, " columns ", columns)
        coy = columns.copy()

        # builds the regular expression
        exp = self._build_regex(bestsep, columns, regex=regex)
        self.LOG(" TextFile.guess_columns: regex ", exp)

        # determines the length of columns
        length = {}
        no = 0
        for line in lines:
            spl = line.split(bestsep)
            if len(spl) != len(columns):
                continue
            no += 1
            for i, c in enumerate(spl):
                vl = length.get(i, 0)
                if vl < len(c):
                    length[i] = len(c)

        for c in columns:
            v = columns[c]
            if v[1] == str and c in length and length[c] > 0:
                v = (v[0], (v[1], length[c] * 2))
                columns[c] = v

        if coy != columns:
            self.LOG(" TextFile.guess_columns: header ", header, " columns ", columns)

        return header, columns, bestsep, exp
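    # A hedged sketch of guess_columns on a hypothetical comma-separated file
    # "sales.csv" whose first line is the header "id,price,label":
    #
    #   f = TextFile("sales.csv")
    #   header, columns, sep, exp = f.guess_columns()
    #   # header  -> True (the first line is a header)
    #   # columns -> something like {0: ('id', int), 1: ('price', float),
    #   #            2: ('label', (str, 2 * max_length))}
    #   # sep     -> ","
    #   # exp     -> a regular expression matching one full line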

    def count_rejected_lines(self, header, exp, output=None):
        """
        Counts the number of lines rejected by regular expression exp.

        @param      header      header or not in the first line
        @param      exp         regular expression
        @param      output      if not None, output is a stream which receives the unrecognized lines (see below)
        @return                 nb accepted, nb rejected

        Format of the file containing the unrecognized lines:
        @code
        line number \t line
        @endcode
        """
        if isinstance(exp, str):
            exp = re.compile(exp, re.U)
        acc, rej = 0., 0.
        temp = TextFile(self.filename, fLOG=self.LOG, encoding=self._encoding)
        temp.open()
        nb = 0
        for line in temp:
            nb += 1
            if header and acc + rej == 0:
                header = False
                continue
            if len(line) == 0:
                continue
            r = exp.search(line)
            if r:
                acc += 1
            else:
                rej += 1
                if output is not None:
                    output.write("%d\t%s\n" % (nb - 1, line))
        temp.close()
        return acc, rej
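    # A hedged sketch combining guess_columns and count_rejected_lines on a
    # hypothetical file "sales.csv": the guessed regular expression is applied
    # to every line and the rejected ones are written to "rejected.txt".
    #
    #   f = TextFile("sales.csv")
    #   header, columns, sep, exp = f.guess_columns()
    #   with open("rejected.txt", "w", encoding="utf-8") as out:
    #       acc, rej = f.count_rejected_lines(header, exp, output=out)
    #   print(acc, "accepted,", rej, "rejected")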

    _build_regex_default_value_types = {int: "([-]?[1-9][0-9]*?)|(0?)",
                                        decimal.Decimal: "([-]?[1-9][0-9]*?L?)|(0?)",
                                        float: "[-]?[0-9]*?([.][0-9]*?)?([eE][-]?[0-9]{0,4})?",
                                        str: ".*"}

    def _build_regex(self, sep, columns,  # pylint: disable=W0102
                     exp=_build_regex_default_value_types,  # pylint: disable=W0102
                     nomore=False, regex=None):
        """
        Builds a regular expression.

        @param      sep         separator
        @param      columns     columns definition
        @param      exp         regular expression associated to each type (see below for the default value)
        @param      nomore      private argument, no more tries, not possible to simplify
        @param      regex       if the default expression for a field is not the expected one, looks into regex if there is one
        @return                 regex

        Default value for ``exp``:

        @code
        {
            int: "([-]?[1-9][0-9]*?)|(0?)",
            decimal.Decimal: "([-]?[1-9][0-9]*?L?)|(0?)",
            float: "[-]?[0-9]*?([.][0-9]*?)?([eE][-]?[0-9]{0,4})?",
            str: ".*"
        }
        @endcode
        """
        if regex is None:
            regex = {}
        mx = max(columns.keys()) + 1
        res = [None for i in range(mx)]
        for k, v in columns.items():
            t = v[1]
            if t not in exp:
                raise Exception("unknown type %s" % str(t))
            nv0 = v[0].strip()
            if nv0 in regex:
                res[k] = (nv0, regex[nv0])
            else:
                res[k] = (nv0, exp[t])
        for c in res:
            if " " in c[0]:
                raise ValueError(
                    "Spaces are not allowed in column names: {0}".format(c))
        res = ["(?P<%s>%s)" % c for c in res]
        if sep == "\t":
            sep = "\\t"
        final = "^%s$" % sep.join(res)

        try:
            self.LOG(" compiling", final)
            exp = re.compile(final)
            return final
        except Exception as e:
            if "but this version only supports 100 named groups" in str(e):
                self.LOG(" problem with expression (more than 100 groups) ", final)
            if nomore:
                if "bad character in group name" in str(e):
                    reg = re.compile("\\?P<(.*?)>")
                    all = reg.findall(final)
                    s = ",".join(all)
                    raise RuntimeError(  # pylint: disable=W0707
                        "this expression does not compile (%r), pattern %r, columns %r" %
                        (str(e), final, s))
                raise RuntimeError(  # pylint: disable=W0707
                    "This expression does not compile (%r), pattern %r" %
                    (str(e), final))

            exp = {int: "[-]?[0-9]*?",
                   float: "[0-9.eE]*?",
                   str: ".*"}
            return self._build_regex(sep, columns, exp, True)
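    # A hedged sketch of what the regular expression built above can be used for
    # (the column definition is hypothetical): each field becomes a named group,
    # so a matching line can be unpacked with groupdict().
    #
    #   import re
    #   pattern = TextFile("sales.csv")._build_regex(
    #       ",", {0: ("id", int), 1: ("label", str)})
    #   m = re.compile(pattern).match("42,beer")
    #   # m.groupdict() -> {'id': '42', 'label': 'beer'}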