Coverage for src/manydataapi/parsers/folders.py: 95%
39 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-02 08:38 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-02 08:38 +0200
1# -*- coding:utf-8 -*-
2"""
3@file
4@brief Applies the same parser (format from a paying machine) to many files in a folder.
5"""
6import re
7import os
8from .dataframe_helper import dataframe_to
def read_folder(folder=".", reader="CT1", pattern=".*[.].{1,3}$",
                verbose=False, out=None, fLOG=None):
    """
    Applies the same parser on many files in a folder.

    Files are processed in sorted name order so that the merged
    result does not depend on the arbitrary order returned by
    :func:`os.listdir`.

    :param folder: folder
    :param reader: reader name or function which processes
        a string or a filename, possible read name: `CT1`.
    :param pattern: file pattern, a regular expression searched
        in every name found in *folder*
    :param verbose: to show progress, it requires module :epkg:`tqdm`
    :param out: output the dataframe in a file, only used when
        the reader returns DataFrames
    :param fLOG: logging function, only called if *verbose* is true
    :return: concatenated list or DataFrame
    :raises ValueError: if *reader* is an unknown parser name or if
        the reader raises *ValueError* or *KeyError* on a file
    :raises FileNotFoundError: if no name in *folder* matches *pattern*
    :raises TypeError: if the reader returns neither a list
        nor a DataFrame

    The function is also available through a command line.

    .. cmdref::
        :title: Parses and merges files in a folder with format CT1
        :cmd: -m manydataapi read_folder --help
    """
    if isinstance(reader, str):
        if reader.lower() != 'ct1':
            raise ValueError(  # pragma: no cover
                "Unknown parser '{}'.".format(reader))
        # Imported lazily: only needed when the built-in parser is requested.
        from .ct1 import read_ct1

        def reader_(name):
            return read_ct1(name, as_df=True)

        reader = reader_

    if verbose and fLOG:
        fLOG("look into '%s'." % folder)

    pat = re.compile(pattern)
    # os.listdir returns names in arbitrary order, sorting makes
    # the merged output reproducible.
    names = sorted(name for name in os.listdir(folder) if pat.search(name))
    if not names:
        raise FileNotFoundError(  # pragma: no cover
            "Unable to find file in '{}' following pattern '{}'.".format(
                folder, pattern))

    if verbose:
        from tqdm import tqdm  # pragma: no cover
        loop = tqdm(names)  # pragma: no cover
    else:
        loop = names

    objs = []
    for name in loop:
        try:
            obj = reader(os.path.join(folder, name))
        except (ValueError, KeyError) as e:  # pragma: no cover
            raise ValueError("Unable to parse file '{}'.".format(name)) from e
        objs.append(obj)

    # The type of the first parsed object decides how results are merged.
    first = objs[0]
    if isinstance(first, list):
        return [item for obj in objs for item in obj]

    # pandas is only needed for readers returning DataFrames.
    from pandas import DataFrame, concat
    if isinstance(first, DataFrame):
        df = concat(objs, sort=False)
        if out is not None:
            dataframe_to(df, out)
            if verbose and fLOG:
                fLOG("wrote '%s'." % out)
        return df

    raise TypeError(  # pragma: no cover
        "Unable to merge type {}.".format(type(first)))