Coverage for mlinsights/plotting/visualize.py: 97%
195 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-28 08:46 +0100
1"""
2@file
3@brief Helpers to visualize a pipeline.
4"""
5import pprint
6from collections import OrderedDict
7import numpy
8import pandas
9from sklearn.base import TransformerMixin, ClassifierMixin, RegressorMixin
10from sklearn.pipeline import Pipeline, FeatureUnion
11from sklearn.compose import ColumnTransformer, TransformedTargetRegressor
12from ..helpers.pipeline import enumerate_pipeline_models
def _pipeline_info(pipe, data, context, former_data=None):
    """
    Internal function to convert a pipeline into
    some graph.

    @param pipe *scikit-learn* object: a :epkg:`Pipeline`,
        :epkg:`ColumnTransformer`, :epkg:`FeatureUnion`,
        transformer, classifier, regressor or the
        string ``'passthrough'``
    @param data current input names: an :epkg:`OrderedDict`
        ``{column name or index: variable name}``, or a list of names
        (outputs of the previous step)
    @param context dictionary with keys ``'n'`` (integer counter used
        to build unique variable names) and ``'names'`` (mapping of
        every name already generated)
    @param former_data previous inputs, used to resolve integer column
        indices into actual column names
    @return list of dictionaries, one per graph node, each with keys
        ``'name'``, ``'type'`` (``'transform'``, ``'classifier'`` or
        ``'regressor'``), ``'inputs'``, ``'outputs'``
    """
    def _get_name(context, prefix='-v-', info=None, data=None):
        # Builds a unique variable name starting with *prefix* and
        # registers it in context['names'] so it is never reused.
        if info is None:
            raise RuntimeError("info should not be None")  # pragma: no cover
        if isinstance(prefix, list):
            return [_get_name(context, el, info, data) for el in prefix]
        if isinstance(prefix, int):
            # integer column index: resolve it into a column name
            prefix = former_data[prefix]
        if isinstance(prefix, int):
            raise TypeError(  # pragma: no cover
                f"prefix must be a string.\ninfo={info}")
        sug = "%s%d" % (prefix, context['n'])
        while sug in context['names']:
            context['n'] += 1
            sug = "%s%d" % (prefix, context['n'])
        context['names'][sug] = info
        return sug

    def _get_name_simple(name, data):
        # Maps an integer column index to its name, leaves strings as is.
        if isinstance(name, str):
            return name
        res = data[name]
        if isinstance(res, int):
            raise RuntimeError(  # pragma: no cover
                f"Column name is still a number and not a name: {name} and {data}.")
        return res

    if isinstance(pipe, Pipeline):
        # A pipeline is flattened: the outputs of every step become
        # the inputs of the next one.
        infos = []
        for _, model in pipe.steps:
            info = _pipeline_info(model, data, context)
            data = info[-1]["outputs"]
            infos.extend(info)
        return infos

    if isinstance(pipe, ColumnTransformer):
        infos = []
        outputs = []
        for _, model, vs in pipe.transformers:
            if all(map(lambda o: isinstance(o, int), vs)):
                # columns are selected by position
                new_data = []
                if isinstance(data, OrderedDict):
                    new_data = [_[1] for _ in data.items()]
                else:
                    mx = max(vs)
                    while len(new_data) < mx:
                        if len(data) > len(new_data):
                            new_data.append(data[len(new_data)])
                        else:
                            new_data.append(data[-1])
            else:
                # columns are selected by name
                new_data = OrderedDict()
                for v in vs:
                    new_data[v] = data.get(v, v)
            info = _pipeline_info(
                model, new_data, context, former_data=new_data)
            outputs.extend(info[-1]['outputs'])
            infos.extend(info)

        final_hat = False
        if pipe.remainder == "passthrough":
            # Columns not consumed by the declared transformers go
            # through an identity node.
            # NOTE(review): *info* only holds the graph of the last
            # transformer; presumably the inputs of all transformers
            # should be excluded (use *infos*) -- kept as is to
            # preserve the current behaviour, to be confirmed.
            done = [set(d['inputs']) for d in info]
            merged = done[0]
            for d in done[1:]:
                # fixed: was `merged.union(d)`, whose result was
                # discarded (set.union does not modify in place)
                merged |= d
            new_data = OrderedDict(
                [(k, v) for k, v in data.items() if k not in merged])
            info = _pipeline_info(
                "passthrough", new_data, context, former_data=new_data)
            outputs.extend(info[-1]['outputs'])
            infos.extend(info)
            final_hat = True

        if len(pipe.transformers) > 1 or final_hat:
            # concatenates all produced columns into a single variable
            info = {'name': 'union', 'inputs': outputs, 'type': 'transform'}
            info['outputs'] = [_get_name(context, info=info)]
            infos.append(info)
        return infos

    if isinstance(pipe, FeatureUnion):
        infos = []
        outputs = []
        for _, model in pipe.transformer_list:
            info = _pipeline_info(model, data, context)
            # outputs are renamed so that parallel branches do not clash
            new_outputs = []
            for o in info[-1]['outputs']:
                add = _get_name(context, prefix=o, info=info)
                outputs.append(add)
                new_outputs.append(add)
            info[-1]['outputs'] = new_outputs
            infos.extend(info)
        if len(pipe.transformer_list) > 1:
            info = {'name': 'union', 'inputs': outputs, 'type': 'transform'}
            info['outputs'] = [_get_name(context, info=info)]
            infos.append(info)
        return infos

    if isinstance(pipe, TransformedTargetRegressor):
        raise NotImplementedError(  # pragma: no cover
            "Not yet implemented for TransformedTargetRegressor.")

    if isinstance(pipe, TransformerMixin):
        info = {'name': pipe.__class__.__name__, 'type': 'transform'}
        if len(data) == 1:
            info['outputs'] = data
            info['inputs'] = data
            info = [info]
        else:
            # several inputs: an intermediate union node merges them
            info['inputs'] = [_get_name(context, info=info)]
            info['outputs'] = [_get_name(context, info=info)]
            info = [{'name': 'union', 'outputs': info['inputs'],
                     'inputs': data, 'type': 'transform'}, info]
        return info

    if isinstance(pipe, ClassifierMixin):
        info = {'name': pipe.__class__.__name__, 'type': 'classifier'}
        exp = ['PredictedLabel', 'Probabilities']
        if len(data) == 1:
            info['outputs'] = exp
            info['inputs'] = data
            info = [info]
        else:
            info['outputs'] = exp
            info['inputs'] = [_get_name(context, info=info)]
            info = [{'name': 'union', 'outputs': info['inputs'], 'inputs': data,
                     'type': 'transform'}, info]
        return info

    if isinstance(pipe, RegressorMixin):
        info = {'name': pipe.__class__.__name__, 'type': 'regressor'}
        exp = ['Prediction']
        if len(data) == 1:
            info['outputs'] = exp
            info['inputs'] = data
            info = [info]
        else:
            info['outputs'] = exp
            info['inputs'] = [_get_name(context, info=info)]
            info = [{'name': 'union', 'outputs': info['inputs'], 'inputs': data,
                     'type': 'transform'}, info]
        return info

    if isinstance(pipe, str):
        if pipe == "passthrough":
            # identity node keeping the remaining columns unchanged
            info = {'name': 'Identity', 'type': 'transform'}
            info['inputs'] = [_get_name_simple(n, former_data) for n in data]
            if isinstance(data, (OrderedDict, dict)) and len(data) > 1:
                info['outputs'] = [
                    _get_name(context, data=k, info=info)
                    for k in data]
            else:
                info['outputs'] = _get_name(context, data=data, info=info)
            info = [info]
        else:
            raise NotImplementedError(  # pragma: no cover
                f"Not yet implemented for keyword '{type(pipe)}'.")
        return info

    raise NotImplementedError(  # pragma: no cover
        f"Not yet implemented for {type(pipe)}.")
def pipeline2dot(pipe, data, **params):
    """
    Exports a *scikit-learn* pipeline to
    :epkg:`DOT` language. See :ref:`visualizepipelinerst`
    for an example.

    @param pipe *scikit-learn* pipeline
    @param data training data as a dataframe or a numpy array,
        or just a list with the variable names
    @param params additional params to draw the graph
    @return string

    Default options for the graph are:

    ::

        options = {
            'orientation': 'portrait',
            'ranksep': '0.25',
            'nodesep': '0.05',
            'width': '0.5',
            'height': '0.1',
        }
    """
    # Builds the mapping {column name: 'sch0:f<i>'} pointing into the
    # record node which represents the input schema.
    raw_data = data
    data = OrderedDict()
    if isinstance(raw_data, pandas.DataFrame):
        for k, c in enumerate(raw_data.columns):
            data[c] = 'sch0:f%d' % k
    elif isinstance(raw_data, numpy.ndarray):
        if len(raw_data.shape) != 2:
            raise NotImplementedError(  # pragma: no cover
                f"Unexpected training data dimension {raw_data.shape}.")
        for i in range(raw_data.shape[1]):
            data['X%d' % i] = 'sch0:f%d' % i
    elif isinstance(raw_data, list):
        # fixed: a plain list of variable names is documented as valid
        # input but previously left *data* empty, producing an empty
        # input schema in the graph
        for i, c in enumerate(raw_data):
            data[c] = 'sch0:f%d' % i
    else:
        raise TypeError(  # pragma: no cover
            f"Unexpected data type: {type(raw_data)}.")

    options = {
        'orientation': 'portrait',
        'ranksep': '0.25',
        'nodesep': '0.05',
        'width': '0.5',
        'height': '0.1',
    }
    options.update(params)

    exp = ["digraph{"]
    for opt in ['orientation', 'pad', 'nodesep', 'ranksep']:
        if opt in options:
            exp.append(f"  {opt}={options[opt]};")
    fontsize = 8

    # info[0] holds the input schema, the following items are the
    # nodes returned by _pipeline_info.
    info = [dict(schema_after=data)]
    names = OrderedDict()
    for d in data:
        names[d] = info
    info.extend(_pipeline_info(pipe, data, context=dict(n=0, names=names)))
    # columns maps every known variable name to its DOT port 'sch<i>:f<j>'
    columns = OrderedDict()

    for i, line in enumerate(info):
        if i == 0:
            # record node for the input schema
            schema = line['schema_after']
            labs = []
            for c, col in enumerate(schema):
                columns[col] = f'sch0:f{c}'
                labs.append(f"<f{c}> {col}")
            node = '  sch0[label="{0}",shape=record,fontsize={1}];'.format(
                "|".join(labs), params.get('fontsize', fontsize))
            exp.append(node)
        else:
            exp.append('')
            # operator node: cyan for transforms, yellow for predictors
            if line['type'] == 'transform':
                node = '  node{0}[label="{1}",shape=box,style="filled' \
                       ',rounded",color=cyan,fontsize={2}];'.format(
                           i, line['name'],
                           int(params.get('fontsize', fontsize) * 1.5))
            else:
                node = '  node{0}[label="{1}",shape=box,style="filled,' \
                       'rounded",color=yellow,fontsize={2}];'.format(
                           i, line['name'],
                           int(params.get('fontsize', fontsize) * 1.5))
            exp.append(node)

            # edges from every input port to the operator node
            for inp in line['inputs']:
                if isinstance(inp, int):
                    raise IndexError(  # pragma: no cover
                        "Unable to guess columns {} in\n{}\n---\n{}".format(
                            inp, pprint.pformat(columns), '\n'.join(exp)))
                else:
                    nc = columns.get(inp, inp)
                    edge = f'  {nc} -> node{i};'
                    exp.append(edge)

            # record node for the operator outputs
            labs = []
            for c, out in enumerate(line['outputs']):
                columns[out] = f'sch{i}:f{c}'
                labs.append(f"<f{c}> {out}")
            node = '  sch{0}[label="{1}",shape=record,fontsize={2}];'.format(
                i, "|".join(labs), params.get('fontsize', fontsize))
            exp.append(node)

            for out in line['outputs']:
                nc = columns[out]
                edge = f'  node{i} -> {nc};'
                if edge not in exp:
                    exp.append(edge)

    exp.append('}')
    return "\n".join(exp)
def pipeline2str(pipe, indent=3):
    """
    Exports a *scikit-learn* pipeline to text, one line per model,
    indented according to its depth in the pipeline.

    @param pipe *scikit-learn* pipeline
    @param indent number of spaces added per nesting level
    @return str
    """
    lines = []
    for coor, model, vs in enumerate_pipeline_models(pipe):
        # depth of the model in the pipeline drives the indentation
        prefix = " " * indent * (len(coor) - 1)
        label = model.__class__.__name__
        if vs is None:
            lines.append(prefix + label)
        else:
            # vs lists the columns the model applies to
            cols = ','.join(map(str, vs))
            lines.append(f"{prefix}{label}({cols})")
    return "\n".join(lines)