Coverage for src/ensae_projects/datainc/croix_rouge.py: 41%
85 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-07-20 04:37 +0200
« prev ^ index » next coverage.py v7.1.0, created at 2023-07-20 04:37 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Data related to La Croix-Rouge (Hackathon Microsoft / ENSAE / Croix-Rouge / 2015)
5"""
6import os
7import io
8import pandas
9from pyquickhelper.filehelper import encrypt_stream, decrypt_stream
10from pyquickhelper.pandashelper import df2rst, df2html
11from pyquickhelper.loghelper import get_password
12from .data_exception import ProjectDataException, PasswordException
def get_password_from_keyring_or_env(pwd):
    """
    Resolves the hackathon password and returns it as bytes.

    The lookup order is: the value given by the caller, then `keyring
    <https://pypi.python.org/pypi/keyring>`_ (through ``get_password``),
    then the environment variable ``PWDCROIXROUGE``.

    @param      pwd     password to use or None to get it from `keyring
                        <https://pypi.python.org/pypi/keyring>`_
                        or as ``os.environ["PWDCROIXROUGE"]``
    @return             password as ``bytes`` (a string is encoded as ascii)

    A ``PasswordException`` is raised when no password was given and neither
    source provides one.

    To set the password for `keyring
    <https://pypi.python.org/pypi/keyring>`_:

    ::

        from pyquickhelper.loghelper import set_password
        set_password("HACKATHON2015", "PWDCROIXROUGE", "value")
    """
    if pwd is None:
        # first fallback: keyring, without prompting the user
        pwd = get_password("HACKATHON2015", "PWDCROIXROUGE",
                           ask=False)  # pylint: disable=E1123
        if pwd is None:
            # second fallback: the environment variable
            pwd = os.environ.get("PWDCROIXROUGE")
            if pwd is None:
                raise PasswordException(
                    "password not found in environment variables: "
                    "PWDCROIXROUGE is not set")
    # bytes are returned untouched, anything else is encoded
    return pwd if isinstance(pwd, bytes) else bytes(pwd, encoding="ascii")
def encrypt_file(infile, outfile, password=None):
    """
    Encrypts a file with a specific password.

    @param      infile      input file
    @param      outfile     output file
    @param      password    password for the hackathon, if None, it is resolved
                            by @see fn get_password_from_keyring_or_env
                            (keyring, then ``os.environ["PWDCROIXROUGE"]``)
    @return                 value returned by ``encrypt_stream``
    """
    key = get_password_from_keyring_or_env(password)
    encrypted = encrypt_stream(key, infile, outfile)
    return encrypted
def decrypt_dataframe(infile, password=None, sep="\t", encoding="utf8", **kwargs):
    """
    Reads an encrypted dataframe.

    @param      infile      filename
    @param      password    password, if None, it is resolved by
                            @see fn get_password_from_keyring_or_env
    @param      sep         separator
    @param      encoding    encoding of the decrypted content
    @param      kwargs      others options for :epkg:`pandas:read_csv`
    @return                 dataframe
    """
    password = get_password_from_keyring_or_env(password)
    data = decrypt_stream(password, infile)
    # the decrypted content is parsed from memory
    # (assumes decrypt_stream returns bytes, as io.BytesIO requires)
    st = io.BytesIO(data)
    # the caller's encoding is forwarded, it used to be ignored
    # in favour of a hard-coded "utf8"
    return pandas.read_csv(st, sep=sep, encoding=encoding, **kwargs)
def get_meaning(table="invoice", password=None):
    """
    Retrieves data related to the meaning of a table.

    @param      table       ``invoice``, ``ITMMASTER``, ``SINVOICE``,
                            ``SINVOICE_V`` or ``stojou``
    @param      password    password, see @see fn get_password_from_keyring_or_env
    @return                 DataFrame

    A ``ProjectDataException`` is raised for any other table name.
    """
    fold = os.path.abspath(os.path.dirname(__file__))
    if table == "invoice":
        name = os.path.join(
            fold, "hackathon_2015_croix_rouge", "meaning_invoice.enc")
        df = decrypt_dataframe(name, password=password)
        df.columns = [_.strip() for _ in df.columns]
        # every cell is expected to be a string, spaces around it are removed
        for c in df.columns:
            df[c] = df[c].apply(lambda s: s.strip())  # pylint: disable=E1101,E1137,E1136
        # the first column is renamed into Zone, the others are kept
        df.columns = ["Zone"] + list(df.columns[1:])
        return df
    if table in {"ITMMASTER", "SINVOICE", "SINVOICE_V", "stojou"}:
        name = os.path.join(
            fold, "hackathon_2015_croix_rouge", "%s.schema.enc" % table)
        # the schema of stojou is comma separated, the others use tabs
        df = decrypt_dataframe(name, password=password,
                               sep="," if "stojou" in table else "\t")
        # equality and not ``in``: ``table in "ITMMASTER"`` is a substring test
        if table == "ITMMASTER":
            # column Zone is built from the row position: ITM_001, ITM_002, ...
            df["Zone"] = df.index + 1
            df["Zone"] = df.Zone.apply(lambda x: "ITM_%03d" % x)  # pylint: disable=E1101,E1137
        df.columns = [_.strip() for _ in df.columns]
        # we remove column always null
        df = df.dropna(axis=1, how='all')  # pylint: disable=E1101
        return df
    raise ProjectDataException(
        "unable to find information about table {0}".format(table))
def merge_schema(tables=None, password=None):
    """
    Merges schemas of various databases.

    @param      tables      list of tables or None for all
                            (see @see fn get_meaning for the known names)
    @param      password    password
    @return                 dataframe with all columns

    The schemas are joined on column ``Zone``. Every other column receives
    a suffix made of the first and last letters of the table name
    (``invoice`` becomes ``_ie``, ``ITMMASTER`` becomes ``_IR``, ...).
    Columns sharing one of the known prefixes are then merged into a single
    column holding the distinct string values separated by a comma.
    """
    if tables is None:
        tables = ["invoice", "ITMMASTER", "SINVOICE", "SINVOICE_V", "stojou"]

    dfs = [get_meaning(tbl, password=password) for tbl in tables]
    for df, name in zip(dfs, tables):
        df["name"] = name

    join = None
    # sorted on the name only: sorting the pairs would compare the dataframes
    # when two names are equal and fail
    for name, df in sorted(zip(tables, dfs), key=lambda pair: pair[0]):
        nickname = name[0] + name[-1]
        df = df.copy()
        df.columns = [c if c == "Zone" else c + "_" + nickname
                      for c in df.columns]
        if join is None:
            join = df
        else:
            join = join.merge(df, on="Zone", how="outer",
                              suffixes=("", "_" + nickname))

    # we merge what can be merged
    def merge_values(row, cs):
        # dict.fromkeys removes duplicates and keeps the order of the columns,
        # a set made the order of the merged values change between two runs
        return ", ".join(dict.fromkeys(
            row[c] for c in cs if isinstance(row[c], str)))

    prefixes = ("Typ,Menu,Long,Dim,Act,Intitulé long,Table liée,"
                "Expression de lien,Vérification,Obligatoire,RAZ").split(",")
    for prf in prefixes:
        cs = [c for c in join.columns if c.startswith(prf)]
        # Description_ie only exists when table invoice is part of the merge
        if prf == "Intitulé long" and "Description_ie" in join.columns:
            cs.append("Description_ie")
        new_col = join.apply(lambda row, cs=cs: merge_values(row, cs), axis=1)
        # explicit copy: the new column is added to an independent dataframe
        join = join[[c for c in join.columns if c not in cs]].copy()
        join[prf] = new_col

    cols = "Zone,name_ie,name_IR,name_SE,name_SV,name_su,Intitulé long" + \
           ",Typ,Menu,Long,Act,Dim,Table liée,Expression de lien,Vérification,Obligatoire,RAZ"
    # columns name_* of tables which were not requested do not exist
    cols = [c for c in cols.split(",") if c in join.columns]
    join = join.sort_values("Zone")
    join = join[cols]
    # the first reset drops the index left by the sort,
    # the second one stores the new position in a column
    join = join.reset_index(drop=True)
    join = join.reset_index(drop=False)
    return join
def df2rsthtml(df, format="html", fillna=""):  # pylint: disable=W0622
    """
    Converts a dataframe into a string, RST or HTML.

    @param      df          dataframe
    @param      format      ``"html"`` or ``"rst"``
    @param      fillna      value which replaces the missing values
    @return                 string

    A ``ValueError`` is raised for any other format.
    """
    # missing values are replaced before the format is checked
    filled = df.fillna(fillna)
    if format not in ("html", "rst"):
        raise ValueError("Unknown format '{0}'".format(format))
    converter = df2html if format == "html" else df2rst
    return converter(filled)