Coverage for mlinsights/plotting/visualize.py: 97%

195 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-02-28 08:46 +0100

1""" 

2@file 

3@brief Helpers to visualize a pipeline. 

4""" 

5import pprint 

6from collections import OrderedDict 

7import numpy 

8import pandas 

9from sklearn.base import TransformerMixin, ClassifierMixin, RegressorMixin 

10from sklearn.pipeline import Pipeline, FeatureUnion 

11from sklearn.compose import ColumnTransformer, TransformedTargetRegressor 

12from ..helpers.pipeline import enumerate_pipeline_models 

13 

14 

def _pipeline_info(pipe, data, context, former_data=None):
    """
    Internal function to convert a pipeline into some graph.

    @param      pipe            *scikit-learn* object (pipeline, transformer,
                                classifier, regressor) or the string
                                ``"passthrough"``
    @param      data            current input names (list or
                                :class:`OrderedDict` mapping column to name)
    @param      context         mutable dict with keys ``'n'`` (counter) and
                                ``'names'`` (already allocated names)
    @param      former_data     data seen by the parent transformer, used to
                                resolve integer column indices into names
    @return                     list of dictionaries, one per node, with keys
                                ``'name'``, ``'type'``, ``'inputs'``,
                                ``'outputs'``
    """
    def _get_name(context, prefix='-v-', info=None, data=None):
        # Allocates a unique variable name based on *prefix*,
        # registering it in context['names'].
        if info is None:
            raise RuntimeError("info should not be None")  # pragma: no cover
        if isinstance(prefix, list):
            return [_get_name(context, el, info, data) for el in prefix]
        if isinstance(prefix, int):
            # integer prefixes are column indices into the data seen by
            # the parent transformer; resolve them into a name
            prefix = former_data[prefix]
        if isinstance(prefix, int):
            raise TypeError(  # pragma: no cover
                f"prefix must be a string.\ninfo={info}")
        sug = "%s%d" % (prefix, context['n'])
        while sug in context['names']:
            context['n'] += 1
            sug = "%s%d" % (prefix, context['n'])
        context['names'][sug] = info
        return sug

    def _get_name_simple(name, data):
        # Resolves an integer column index into its name; strings pass through.
        if isinstance(name, str):
            return name
        res = data[name]
        if isinstance(res, int):
            raise RuntimeError(  # pragma: no cover
                f"Column name is still a number and not a name: {name} and {data}.")
        return res

    if isinstance(pipe, Pipeline):
        # Chain the steps: the outputs of one step feed the next one.
        infos = []
        for _, model in pipe.steps:
            info = _pipeline_info(model, data, context)
            data = info[-1]["outputs"]
            infos.extend(info)
        return infos

    if isinstance(pipe, ColumnTransformer):
        infos = []
        outputs = []
        for _, model, vs in pipe.transformers:
            if all(map(lambda o: isinstance(o, int), vs)):
                # columns are selected by integer position
                new_data = []
                if isinstance(data, OrderedDict):
                    new_data = [_[1] for _ in data.items()]
                else:
                    mx = max(vs)
                    # NOTE(review): this builds only *mx* entries, so index
                    # mx itself is not materialized — looks off by one;
                    # confirm against callers before changing.
                    while len(new_data) < mx:
                        if len(data) > len(new_data):
                            new_data.append(data[len(new_data)])
                        else:
                            new_data.append(data[-1])
            else:
                # columns are selected by name
                new_data = OrderedDict()
                for v in vs:
                    new_data[v] = data.get(v, v)

            info = _pipeline_info(
                model, new_data, context, former_data=new_data)
            outputs.extend(info[-1]['outputs'])
            infos.extend(info)

        final_hat = False
        if pipe.remainder == "passthrough":
            # Collect the columns already consumed by the transformers and
            # pass the remaining ones through unchanged.
            # NOTE(review): 'info' here is the result of the *last*
            # transformer only — verify this is intended when the
            # ColumnTransformer has several transformers.
            done = [set(d['inputs']) for d in info]
            merged = done[0]
            for d in done[1:]:
                # bug fix: set.union returns a new set and does not modify
                # 'merged' in place; the result was previously discarded.
                merged |= d
            new_data = OrderedDict(
                [(k, v) for k, v in data.items() if k not in merged])

            info = _pipeline_info(
                "passthrough", new_data, context, former_data=new_data)
            outputs.extend(info[-1]['outputs'])
            infos.extend(info)
            final_hat = True

        if len(pipe.transformers) > 1 or final_hat:
            # merge the branches into a single union node
            info = {'name': 'union', 'inputs': outputs, 'type': 'transform'}
            info['outputs'] = [_get_name(context, info=info)]
            infos.append(info)
        return infos

    if isinstance(pipe, FeatureUnion):
        infos = []
        outputs = []
        for _, model in pipe.transformer_list:
            info = _pipeline_info(model, data, context)
            new_outputs = []
            for o in info[-1]['outputs']:
                add = _get_name(context, prefix=o, info=info)
                outputs.append(add)
                new_outputs.append(add)
            info[-1]['outputs'] = new_outputs
            infos.extend(info)
        if len(pipe.transformer_list) > 1:
            info = {'name': 'union', 'inputs': outputs, 'type': 'transform'}
            info['outputs'] = [_get_name(context, info=info)]
            infos.append(info)
        return infos

    if isinstance(pipe, TransformedTargetRegressor):
        raise NotImplementedError(  # pragma: no cover
            "Not yet implemented for TransformedTargetRegressor.")

    if isinstance(pipe, TransformerMixin):
        info = {'name': pipe.__class__.__name__, 'type': 'transform'}
        if len(data) == 1:
            info['outputs'] = data
            info['inputs'] = data
            info = [info]
        else:
            # several inputs: insert a union node in front of the transformer
            info['inputs'] = [_get_name(context, info=info)]
            info['outputs'] = [_get_name(context, info=info)]
            info = [{'name': 'union', 'outputs': info['inputs'],
                     'inputs': data, 'type': 'transform'}, info]
        return info

    if isinstance(pipe, ClassifierMixin):
        info = {'name': pipe.__class__.__name__, 'type': 'classifier'}
        exp = ['PredictedLabel', 'Probabilities']
        if len(data) == 1:
            info['outputs'] = exp
            info['inputs'] = data
            info = [info]
        else:
            # several inputs: insert a union node in front of the classifier
            info['outputs'] = exp
            info['inputs'] = [_get_name(context, info=info)]
            info = [{'name': 'union', 'outputs': info['inputs'], 'inputs': data,
                     'type': 'transform'}, info]
        return info

    if isinstance(pipe, RegressorMixin):
        info = {'name': pipe.__class__.__name__, 'type': 'regressor'}
        exp = ['Prediction']
        if len(data) == 1:
            info['outputs'] = exp
            info['inputs'] = data
            info = [info]
        else:
            # several inputs: insert a union node in front of the regressor
            info['outputs'] = exp
            info['inputs'] = [_get_name(context, info=info)]
            info = [{'name': 'union', 'outputs': info['inputs'], 'inputs': data,
                     'type': 'transform'}, info]
        return info

    if isinstance(pipe, str):
        if pipe == "passthrough":
            # identity node forwarding the remaining columns
            info = {'name': 'Identity', 'type': 'transform'}
            info['inputs'] = [_get_name_simple(n, former_data) for n in data]
            if isinstance(data, (OrderedDict, dict)) and len(data) > 1:
                info['outputs'] = [
                    _get_name(context, data=k, info=info)
                    for k in data]
            else:
                info['outputs'] = _get_name(context, data=data, info=info)
            info = [info]
        else:
            raise NotImplementedError(  # pragma: no cover
                f"Not yet implemented for keyword '{type(pipe)}'.")
        return info

    raise NotImplementedError(  # pragma: no cover
        f"Not yet implemented for {type(pipe)}.")

188 

189 

def pipeline2dot(pipe, data, **params):
    """
    Exports a *scikit-learn* pipeline to
    :epkg:`DOT` language. See :ref:`visualizepipelinerst`
    for an example.

    @param      pipe        *scikit-learn* pipeline
    @param      data        training data as a dataframe or a numpy array,
                            or just a list with the variable names
    @param      params      additional params to draw the graph
    @return                 string

    Default options for the graph are:

    ::

        options = {
            'orientation': 'portrait',
            'ranksep': '0.25',
            'nodesep': '0.05',
            'width': '0.5',
            'height': '0.1',
        }
    """
    # Normalize *data* into an OrderedDict {column name: dot port}.
    raw_data = data
    data = OrderedDict()
    if isinstance(raw_data, pandas.DataFrame):
        for k, c in enumerate(raw_data.columns):
            data[c] = 'sch0:f%d' % k
    elif isinstance(raw_data, numpy.ndarray):
        if len(raw_data.shape) != 2:
            raise NotImplementedError(  # pragma: no cover
                f"Unexpected training data dimension {raw_data.shape}.")
        for i in range(raw_data.shape[1]):
            data['X%d' % i] = 'sch0:f%d' % i
    elif isinstance(raw_data, list):
        # bug fix: a plain list of variable names previously left *data*
        # empty, producing a graph with no input schema; map names to ports
        # the same way as for a dataframe.
        for i, c in enumerate(raw_data):
            data[c] = 'sch0:f%d' % i
    else:
        raise TypeError(  # pragma: no cover
            f"Unexpected data type: {type(raw_data)}.")

    options = {
        'orientation': 'portrait',
        'ranksep': '0.25',
        'nodesep': '0.05',
        'width': '0.5',
        'height': '0.1',
    }
    options.update(params)

    exp = ["digraph{"]
    for opt in ['orientation', 'pad', 'nodesep', 'ranksep']:
        if opt in options:
            exp.append(f"  {opt}={options[opt]};")
    fontsize = 8
    # info[0] holds the input schema; _pipeline_info appends one dict per node.
    info = [dict(schema_after=data)]
    names = OrderedDict()
    for d in data:
        names[d] = info
    info.extend(_pipeline_info(pipe, data, context=dict(n=0, names=names)))
    columns = OrderedDict()

    for i, line in enumerate(info):
        if i == 0:
            # first entry: emit the input schema as a record node
            schema = line['schema_after']
            labs = []
            for c, col in enumerate(schema):
                columns[col] = f'sch0:f{c}'
                labs.append(f"<f{c}> {col}")
            node = '  sch0[label="{0}",shape=record,fontsize={1}];'.format(
                "|".join(labs), params.get('fontsize', fontsize))
            exp.append(node)
        else:
            exp.append('')
            # transformers are cyan boxes, predictors yellow boxes
            if line['type'] == 'transform':
                node = '  node{0}[label="{1}",shape=box,style="filled' \
                       ',rounded",color=cyan,fontsize={2}];'.format(
                           i, line['name'],
                           int(params.get('fontsize', fontsize) * 1.5))
            else:
                node = '  node{0}[label="{1}",shape=box,style="filled,' \
                       'rounded",color=yellow,fontsize={2}];'.format(
                           i, line['name'],
                           int(params.get('fontsize', fontsize) * 1.5))
            exp.append(node)

            # edges from every input port to this node
            for inp in line['inputs']:
                if isinstance(inp, int):
                    raise IndexError(  # pragma: no cover
                        "Unable to guess columns {} in\n{}\n---\n{}".format(
                            inp, pprint.pformat(columns), '\n'.join(exp)))
                else:
                    nc = columns.get(inp, inp)
                    edge = f'  {nc} -> node{i};'
                    exp.append(edge)

            # output schema of this node as a record node
            labs = []
            for c, out in enumerate(line['outputs']):
                columns[out] = f'sch{i}:f{c}'
                labs.append(f"<f{c}> {out}")
            node = '  sch{0}[label="{1}",shape=record,fontsize={2}];'.format(
                i, "|".join(labs), params.get('fontsize', fontsize))
            exp.append(node)

            for out in line['outputs']:
                nc = columns[out]
                edge = f'  node{i} -> {nc};'
                if edge not in exp:
                    exp.append(edge)

    exp.append('}')
    return "\n".join(exp)

300 

301 

def pipeline2str(pipe, indent=3):
    """
    Exports a *scikit-learn* pipeline to text.

    @param      pipe        *scikit-learn* pipeline
    @param      indent      number of spaces per nesting level
    @return                 str

    .. runpython::
        :showcode:

        from sklearn.linear_model import LogisticRegression
        from sklearn.impute import SimpleImputer
        from sklearn.preprocessing import OneHotEncoder
        from sklearn.preprocessing import StandardScaler, MinMaxScaler
        from sklearn.compose import ColumnTransformer
        from sklearn.pipeline import Pipeline

        from mlinsights.plotting import pipeline2str

        numeric_features = ['age', 'fare']
        numeric_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())])

        categorical_features = ['embarked', 'sex', 'pclass']
        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('onehot', OneHotEncoder(handle_unknown='ignore'))])

        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features),
            ])

        clf = Pipeline(steps=[('preprocessor', preprocessor),
                              ('classifier', LogisticRegression(solver='lbfgs'))])
        text = pipeline2str(clf)
        print(text)
    """
    lines = []
    # One line per model; the depth of *coor* drives the indentation.
    for coor, model, vs in enumerate_pipeline_models(pipe):
        prefix = " " * (indent * (len(coor) - 1))
        label = model.__class__.__name__
        if vs is None:
            lines.append(f"{prefix}{label}")
        else:
            selected = ','.join(str(v) for v in vs)
            lines.append(f"{prefix}{label}({selected})")
    return "\n".join(lines)