Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2@file
3@brief Dig into pipelines.
4"""
5import textwrap
6import warnings
7from types import MethodType
8from sklearn.base import TransformerMixin, ClassifierMixin, RegressorMixin, BaseEstimator
9from sklearn.pipeline import Pipeline, FeatureUnion
10from sklearn.compose import ColumnTransformer, TransformedTargetRegressor
def enumerate_pipeline_models(pipe, coor=None, vs=None):
    """
    Enumerates all the models within a pipeline.

    @param      pipe        *scikit-learn* pipeline
    @param      coor        current coordinate, a tuple of indices,
                            ``(0,)`` if None
    @param      vs          subset of variables for the model, None for all
    @return                 iterator on models
                            ``tuple(coordinate, model, vs)``

    The function first yields *pipe* itself (or a placeholder instance
    of a local class ``PassThrough`` if *pipe* is the string
    ``"passthrough"``), then recursively walks through the models it
    contains, appending the index of every child to the coordinate.
    It raises *NotImplementedError* for the containers it does not
    support and *TypeError* if *pipe* is not a recognized object.

    See notebook :ref:`visualizepipelinerst`.
    """
    if coor is None:
        coor = (0,)

    # The isinstance guard avoids evaluating ``==`` on objects
    # whose comparison does not return a boolean.
    if isinstance(pipe, str) and pipe == "passthrough":
        class PassThrough:
            "dummy class to help display"
            pass
        yield coor, PassThrough(), vs
        return

    yield coor, pipe, vs

    # Attribute-based checks come first: these containers are
    # recognized by duck typing and not by their type.
    if (hasattr(pipe, 'transformer_and_mapper_list') and
            len(pipe.transformer_and_mapper_list)):
        # azureml DataTransformer
        raise NotImplementedError(  # pragma: no cover
            "Unable to handle this specific case.")
    elif hasattr(pipe, 'mapper') and pipe.mapper:
        # azureml DataTransformer
        yield from enumerate_pipeline_models(  # pragma: no cover
            pipe.mapper, coor + (0,))
    elif hasattr(pipe, 'built_features'):  # pragma: no cover
        # sklearn_pandas.dataframe_mapper.DataFrameMapper
        for i, (columns, transformers, _) in enumerate(pipe.built_features):
            if isinstance(columns, str):
                columns = (columns,)
            if transformers is None:
                # no transformer is attached to these columns
                yield (coor + (i,)), None, columns
            else:
                yield from enumerate_pipeline_models(
                    transformers, coor + (i,), columns)
    elif isinstance(pipe, Pipeline):
        for i, (_, model) in enumerate(pipe.steps):
            yield from enumerate_pipeline_models(model, coor + (i,))
    elif isinstance(pipe, ColumnTransformer):
        for i, (_, transformer, column) in enumerate(pipe.transformers):
            yield from enumerate_pipeline_models(
                transformer, coor + (i,), column)
    elif isinstance(pipe, FeatureUnion):
        for i, (_, model) in enumerate(pipe.transformer_list):
            yield from enumerate_pipeline_models(model, coor + (i,))
    elif isinstance(pipe, TransformedTargetRegressor):
        raise NotImplementedError(  # pragma: no cover
            "Not yet implemented for TransformedTargetRegressor.")
    elif isinstance(pipe, (TransformerMixin, ClassifierMixin, RegressorMixin)):
        # a leaf, nothing else to enumerate
        pass
    elif isinstance(pipe, BaseEstimator):  # pragma: no cover
        # a leaf, nothing else to enumerate
        pass
    else:
        raise TypeError(  # pragma: no cover
            "pipe is not a scikit-learn object: {}\n{}".format(type(pipe), pipe))
class BaseEstimatorDebugInformation:
    """
    Stores information when the outputs of a pipeline
    are computed. It is added by function
    @see fct alter_pipeline_for_debugging.

    Attributes:

    * *model*: the instrumented model
    * *inputs*: last input seen by every recorded method
    * *outputs*: last output produced by every recorded method
    * *methods*: callables ``f(model, X, *args, **kwargs)`` calling
      the original method saved on the model as ``_debug_<name>``
    """

    # Names of the methods which can be recorded,
    # the order defines the order of dictionary *methods*.
    _method_names = ("transform", "predict", "predict_proba",
                     "decision_function")

    def __init__(self, model):
        """
        @param      model       model to instrument, for every supported
                                method it exposes, the current method is
                                saved on the model as ``_debug_<name>``
        """
        self.model = model
        self.inputs = {}
        self.outputs = {}
        self.methods = {}
        for name in self._method_names:
            if callable(getattr(model, name, None)):
                saved = "_debug_" + name
                setattr(model, saved, getattr(model, name))
                self.methods[name] = self._forward_to(saved)

    @staticmethod
    def _forward_to(attr):
        """
        Builds a function calling the method saved under
        attribute *attr*.

        @param      attr        attribute name, ``_debug_<name>``
        @return                 function ``f(model, X, *args, **kwargs)``

        Additional arguments are forwarded as well so that the saved
        method can be called with optional parameters.
        """
        def call(model, X, *args, **kwargs):
            # The attribute is retrieved at call time, on the model
            # given to the function.
            return getattr(model, attr)(X, *args, **kwargs)
        return call

    def __repr__(self):
        """
        usual
        """
        return self.to_str()

    def to_str(self, nrows=5):
        """
        Tries to produce a readable message.

        @param      nrows       maximum number of text rows displayed
                                for every input or output
        @return                 string

        It raises *KeyError* if an input was recorded for a method
        but no output.
        """
        rows = ['BaseEstimatorDebugInformation({})'.format(
            self.model.__class__.__name__)]
        for k in sorted(self.inputs):
            if k in self.outputs:
                rows.append(' ' + k + '(')
                rows.append(textwrap.indent(
                    self.display(self.inputs[k], nrows), ' '))
                rows.append(' ) -> (')
                rows.append(textwrap.indent(
                    self.display(self.outputs[k], nrows), ' '))
                rows.append(' )')
            else:
                raise KeyError(  # pragma: no cover
                    "Unable to find output for method '{}'.".format(k))
        return "\n".join(rows)

    def display(self, data, nrows):
        """
        Displays the first *nrows* rows of the string
        representation of *data*.

        @param      data        any object
        @param      nrows       maximum number of rows to keep, ``'...'``
                                is appended if the text is truncated
        @return                 string, the first row gives the type of
                                *data* and its shape if it has one
        """
        text = str(data)
        rows = text.split('\n')
        if len(rows) > nrows:
            rows = rows[:nrows]
            rows.append('...')
        if hasattr(data, 'shape'):
            rows.insert(0, "shape=%r type=%r" % (data.shape, type(data)))
        else:
            rows.insert(0, "type=%r" % type(data))  # pragma: no cover
        return "\n".join(rows)
def alter_pipeline_for_debugging(pipe):
    """
    Overwrite methods *transform*, *predict*, *predict_proba*
    or *decision_function* to collect the last inputs and outputs
    seen in these methods.

    @param      pipe        *scikit-learn* pipeline

    Every model enumerated by @see fct enumerate_pipeline_models
    receives an attribute ``_debug`` of type
    @see cl BaseEstimatorDebugInformation which stores the last
    inputs and outputs of the overwritten methods.
    The function raises *RuntimeError* if one of the models
    already has an attribute ``_debug``: the same object is used
    twice in the pipeline or the function was already called.
    Overwriting the methods of the same object a second time
    would make them call themselves.

    The object *pipe* is modified, it should be copied
    before calling this function if you need the object
    untouched after that. The prediction is slower.
    See notebook :ref:`visualizepipelinerst`.
    """

    def _recording_method(name):
        "builds the function replacing method *name*"

        def wrapper(self, X, *args, **kwargs):
            self._debug.inputs[name] = X
            y = self._debug.methods[name](self, X, *args, **kwargs)
            self._debug.outputs[name] = y
            return y

        wrapper.__name__ = name
        return wrapper

    new_methods = {
        name: _recording_method(name)
        for name in ('decision_function', 'transform',
                     'predict', 'predict_proba')
    }

    for model_ in enumerate_pipeline_models(pipe):
        model = model_[1]
        if model is None:
            # columns with no transformer, nothing to overwrite
            continue
        if hasattr(model, '_debug'):
            raise RuntimeError(  # pragma: no cover
                "The same operator cannot be used twice in "
                "the same pipeline or this method was called "
                "a second time.")
        model._debug = BaseEstimatorDebugInformation(model)
        for k in model._debug.methods:
            try:
                setattr(model, k, MethodType(new_methods[k], model))
            except AttributeError:  # pragma: no cover
                warnings.warn("Unable to overwrite method '{}' for class "
                              "{}.".format(k, type(model)))