Coverage for mlinsights/helpers/pipeline.py: 100%

100 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2022-09-27 08:44 +0200

1""" 

2@file 

3@brief Dig into pipelines. 

4""" 

5import textwrap 

6import warnings 

7from types import MethodType 

8from sklearn.base import TransformerMixin, ClassifierMixin, RegressorMixin, BaseEstimator 

9from sklearn.pipeline import Pipeline, FeatureUnion 

10from sklearn.compose import ColumnTransformer, TransformedTargetRegressor 

11 

12 

def enumerate_pipeline_models(pipe, coor=None, vs=None):
    """
    Walks through a pipeline and yields every model found in it,
    the container itself first, then its children (recursively).

    @param      pipe        *scikit-learn* pipeline
    @param      coor        current coordinate, a tuple of indices,
                            ``(0,)`` is used when it is None
    @param      vs          subset of variables for the model, None for all
    @return                 iterator on
                            ``tuple(coordinate, model, variables)``

    See notebook :ref:`visualizepipelinerst`.
    """
    if coor is None:
        coor = (0,)

    if pipe == "passthrough":
        class PassThrough:
            "dummy class to help display"
            pass

        # The string is replaced by a placeholder instance, nothing to explore.
        yield coor, PassThrough(), vs
        return

    # The object itself always comes before its children.
    yield coor, pipe, vs

    if hasattr(pipe, 'transformer_and_mapper_list') and len(pipe.transformer_and_mapper_list):
        # azureml DataTransformer
        raise NotImplementedError(  # pragma: no cover
            "Unable to handle this specific case.")

    if hasattr(pipe, 'mapper') and pipe.mapper:
        # azureml DataTransformer
        yield from enumerate_pipeline_models(  # pragma: no cover
            pipe.mapper, coor + (0,))
        return  # pragma: no cover

    if hasattr(pipe, 'built_features'):  # pragma: no cover
        # sklearn_pandas.dataframe_mapper.DataFrameMapper
        for idx, (columns, transformers, _) in enumerate(pipe.built_features):
            if isinstance(columns, str):
                columns = (columns,)
            child = coor + (idx,)
            if transformers is None:
                yield child, None, columns
            else:
                yield from enumerate_pipeline_models(
                    transformers, child, columns)
        return

    if isinstance(pipe, Pipeline):
        for idx, (_, step) in enumerate(pipe.steps):
            yield from enumerate_pipeline_models(step, coor + (idx,))
        return

    if isinstance(pipe, ColumnTransformer):
        for idx, (_, transformer, column) in enumerate(pipe.transformers):
            yield from enumerate_pipeline_models(
                transformer, coor + (idx,), column)
        return

    if isinstance(pipe, FeatureUnion):
        for idx, (_, step) in enumerate(pipe.transformer_list):
            yield from enumerate_pipeline_models(step, coor + (idx,))
        return

    if isinstance(pipe, TransformedTargetRegressor):
        raise NotImplementedError(  # pragma: no cover
            "Not yet implemented for TransformedTargetRegressor.")

    # Leaves: any scikit-learn estimator is accepted and has no children.
    known_leaves = (TransformerMixin, ClassifierMixin,
                    RegressorMixin, BaseEstimator)
    if not isinstance(pipe, known_leaves):
        raise TypeError(  # pragma: no cover
            f"pipe is not a scikit-learn object: {type(pipe)}\n{pipe}")

75 

76 

class BaseEstimatorDebugInformation:
    """
    Stores information when the outputs of a pipeline
    are computed. It is added by function
    @see fct alter_pipeline_for_debugging.

    Attributes:

    * *model*: the model the information is attached to
    * *inputs*: last input seen by every overwritten method
    * *outputs*: last output produced by every overwritten method
    * *methods*: callables ``f(model, X, *args, **kwargs)`` calling
      the original method saved on the model as ``_debug_<name>``
    """

    # Methods which can be intercepted, in the order they are registered.
    _METHOD_NAMES = ("transform", "predict",
                     "predict_proba", "decision_function")

    def __init__(self, model):
        """
        @param      model       model to inspect, every callable method
                                among *transform*, *predict*,
                                *predict_proba*, *decision_function*
                                is saved on the model as ``_debug_<name>``
        """
        self.model = model
        self.inputs = {}
        self.outputs = {}
        self.methods = {}
        for name in self._METHOD_NAMES:
            method = getattr(model, name, None)
            if not callable(method):
                continue
            # The original method is kept on the model itself so that
            # it remains reachable once the public name is overwritten.
            setattr(model, "_debug_" + name, method)
            self.methods[name] = self._make_caller(name)

    @staticmethod
    def _make_caller(name):
        """
        Builds the function calling the method saved as ``_debug_<name>``.
        Extra positional and named arguments are forwarded.
        """
        saved_name = "_debug_" + name

        def caller(model, X, *args, **kwargs):
            return getattr(model, saved_name)(X, *args, **kwargs)

        return caller

    def __repr__(self):
        """
        usual
        """
        return self.to_str()

    def to_str(self, nrows=5):
        """
        Tries to produce a readable message.

        @param      nrows       maximum number of text rows displayed
                                for every input or output
        @return                 string
        @raises     KeyError    if an input was stored without
                                the corresponding output
        """
        rows = [
            f'BaseEstimatorDebugInformation({self.model.__class__.__name__})']
        for k in sorted(self.inputs):
            if k not in self.outputs:
                raise KeyError(  # pragma: no cover
                    f"Unable to find output for method '{k}'.")
            rows.append(' ' + k + '(')
            rows.append(textwrap.indent(
                self.display(self.inputs[k], nrows), ' '))
            rows.append(' ) -> (')
            rows.append(textwrap.indent(
                self.display(self.outputs[k], nrows), ' '))
            rows.append(' )')
        return "\n".join(rows)

    def display(self, data, nrows):
        """
        Displays the first *nrows* rows of the text representation
        of *data* preceded by a header giving its type
        (and its shape if *data* has an attribute *shape*).

        @param      data        object to display
        @param      nrows       maximum number of rows to keep,
                                ``'...'`` is appended if rows were removed
        @return                 string
        """
        text = str(data)
        rows = text.split('\n')
        if len(rows) > nrows:
            rows = rows[:nrows]
            rows.append('...')
        if hasattr(data, 'shape'):
            rows.insert(0, f"shape={data.shape!r} type={type(data)!r}")
        else:
            rows.insert(0, f"type={type(data)!r}")  # pragma: no cover
        return "\n".join(rows)

146 

147 

def alter_pipeline_for_debugging(pipe):
    """
    Overwrite methods *transform*, *predict*, *predict_proba*
    or *decision_function* to collect the last inputs and outputs
    seen in these methods.

    @param      pipe        *scikit-learn* pipeline

    The object *pipe* is modified, it should be copied
    before calling this function if you need the object
    untouched after that. The prediction is slower.
    Every model returned by @see fn enumerate_pipeline_models
    receives an attribute ``_debug``
    (@see cl BaseEstimatorDebugInformation), entries for which
    no model is returned (``None``) are skipped.
    A ``RuntimeError`` is raised if *pipe* already
    has an attribute ``_debug``.
    See notebook :ref:`visualizepipelinerst`.
    """

    def _make_recorder(name):
        # Builds the replacement for method *name*: it stores the input,
        # calls the callable registered in ``_debug.methods`` with every
        # extra argument, stores the output and returns it.
        def recorder(self, X, *args, **kwargs):
            self._debug.inputs[name] = X
            result = self._debug.methods[name](self, X, *args, **kwargs)
            self._debug.outputs[name] = result
            return result

        # The wrapper keeps the name of the method it replaces.
        recorder.__name__ = name
        return recorder

    new_methods = {
        name: _make_recorder(name)
        for name in ('decision_function', 'transform',
                     'predict', 'predict_proba')
    }

    if hasattr(pipe, '_debug'):
        raise RuntimeError(  # pragma: no cover
            "The same operator cannot be used twice in "
            "the same pipeline or this method was called "
            "a second time.")

    for _, model, _ in enumerate_pipeline_models(pipe):
        if model is None:
            # The enumeration yields None when a set of columns has no
            # transformer, there is nothing to overwrite in that case.
            continue
        model._debug = BaseEstimatorDebugInformation(model)
        for k in model._debug.methods:
            try:
                setattr(model, k, MethodType(new_methods[k], model))
            except AttributeError:  # pragma: no cover
                warnings.warn(
                    f"Unable to overwrite method {k!r} for class "
                    f"{type(model)!r}.")

208 f"{type(model)!r}.")