# Coverage report for mlinsights/mlmodel/decision_tree_logreg.py: 99%
# (189 statements), created at 2022-08-09 08:45 +0200

"""
@file
@brief Builds a tree of logistic regressions.
"""

5import numpy

6import scipy.sparse as sparse # pylint: disable=R0402

7from sklearn.linear_model import LogisticRegression

8from sklearn.base import BaseEstimator, ClassifierMixin, clone

9from sklearn.linear_model._base import LinearClassifierMixin

def logistic(x):
    """
    Computes :math:`\\frac{1}{1 + e^{-x}}`.

    @param  x   a float or a :epkg:`numpy:array`
    @return     sigmoid of *x*, same shape as the input
    """
    return 1. / (1. + numpy.exp(-x))

def likelihood(x, y, theta=1., th=0.):
    """
    Computes :math:`\\sum_i y_i f(\\theta (x_i - x_0)) + (1 - y_i) (1 - f(\\theta (x_i - x_0)))`
    where :math:`f(x_i)` is :math:`\\frac{1}{1 + e^{-x_i}}`.

    @param  x       scores
    @param  y       expected binary labels (0 or 1)
    @param  theta   slope applied to the score
    @param  th      threshold :math:`x_0` subtracted from the score
    @return         elementwise likelihood; despite the formula above, the
                    sum is left to the caller (see *fit_improve*)
    """
    lr = logistic((x - th) * theta)
    return y * lr + (1. - y) * (1 - lr)

28class _DecisionTreeLogisticRegressionNode:

29 """

30 Describes the tree structure hold by class

31 @see cl DecisionTreeLogisticRegression.

32 See also notebook :ref:decisiontreelogregrst.

33 """

35 def __init__(self, estimator, threshold=0.5, depth=1, index=0):

36 """

37 constructor

39 @param estimator binary estimator

40 """

41 self.index = index

42 self.estimator = estimator

43 self.above = None

44 self.below = None

45 self.threshold = threshold

46 self.depth = depth

48 def predict(self, X):

49 """

50 Predicts

51 """

52 prob = self.predict_proba(X)

53 return (prob[:, 1] >= 0.5).astype(numpy.int32)

55 def predict_proba(self, X):

56 """

57 Returns the classification probabilities.

59 @param X features

60 @return probabilties

61 """

62 prob = self.estimator.predict_proba(X)

63 above = prob[:, 1] > self.threshold

64 below = ~ above

65 n_above = above.sum()

66 n_below = below.sum()

67 if self.above is not None and n_above > 0:

68 prob_above = self.above.predict_proba(X[above])

69 prob[above] = prob_above

70 if self.below is not None and n_below > 0:

71 prob_below = self.below.predict_proba(X[below])

72 prob[below] = prob_below

73 return prob

75 def decision_path(self, X, mat, indices):

76 """

77 Returns the classification probabilities.

79 @param X features

80 @param mat decision path (allocated matrix)

81 """

82 mat[indices, self.index] = 1

83 prob = self.estimator.predict_proba(X)

84 above = prob[:, 1] > self.threshold

85 below = ~ above

86 n_above = above.sum()

87 n_below = below.sum()

88 indices_above = indices[above]

89 indices_below = indices[below]

90 if self.above is not None and n_above > 0:

91 self.above.decision_path(X[above], mat, indices_above)

92 if self.below is not None and n_below > 0:

93 self.below.decision_path(X[below], mat, indices_below)

95 def fit(self, X, y, sample_weight, dtlr, total_N):

96 """

97 Fits a logistic regression, then splits the sample into

98 positive and negative examples, finally tries to fit

99 logistic regressions on both subsamples. This method only

100 works on a linear classifier.

102 @param X features

103 @param y binary labels

104 @param sample_weight weights of every sample

105 @param dtlr @see cl DecisionTreeLogisticRegression

106 @param total_N total number of observation

107 @return last index

108 """

109 self.estimator.fit(X, y, sample_weight=sample_weight)

110 if dtlr.verbose >= 1:

111 print("[DTLR ] %s trained acc %1.2f N=%d" % ( # pragma: no cover

112 " " * self.depth, self.estimator.score(X, y), X.shape))

113 prob = self.fit_improve(dtlr, total_N, X, y,

114 sample_weight=sample_weight)

116 if self.depth + 1 > dtlr.max_depth:

117 return self.index

118 if X.shape < dtlr.min_samples_split:

119 return self.index

121 above = prob[:, 1] > self.threshold

122 below = ~ above

123 n_above = above.sum()

124 n_below = below.sum()

125 y_above = set(y[above])

126 y_below = set(y[below])

128 def _fit_side(index, y_above_below, above_below, n_above_below, side):

129 if dtlr.verbose >= 1:

130 print("[DTLR*] %s%s: n_class=%d N=%d - %d/%d" % ( # pragma: no cover

131 " " * self.depth, side,

132 len(y_above_below), above_below.shape,

133 n_above_below, total_N))

134 if (len(y_above_below) > 1 and

135 above_below.shape > dtlr.min_samples_leaf * 2 and

136 (float(n_above_below) / total_N >=

137 dtlr.min_weight_fraction_leaf * 2) and

138 n_above_below < total_N):

139 estimator = clone(dtlr.estimator)

140 sw = sample_weight[above_below] if sample_weight is not None else None

141 node = _DecisionTreeLogisticRegressionNode(

142 estimator, self.threshold, depth=self.depth + 1, index=index)

143 last_index = node.fit(

144 X[above_below], y[above_below], sw, dtlr, total_N)

145 return node, last_index

146 return None, index

148 self.above, last = _fit_side(

149 self.index + 1, y_above, above, n_above, "above")

150 self.below, last = _fit_side(

151 last + 1, y_below, below, n_below, "below")

152 return last

154 @property

155 def tree_depth_(self):

156 """

157 Returns the maximum depth of the tree.

158 """

159 dt = self.depth

160 if self.above is not None:

161 dt = max(dt, self.above.tree_depth_)

162 if self.below is not None:

163 dt = max(dt, self.below.tree_depth_)

164 return dt

166 def fit_improve(self, dtlr, total_N, X, y, sample_weight):

167 """

168 The method only works on a linear classifier, it changes

169 the intercept in order to be within the constraints

170 imposed by the *min_samples_leaf* and *min_weight_fraction_leaf*.

171 The algorithm has a significant cost as it sorts every observation

172 and chooses the best intercept.

174 @param dtlr @see cl DecisionTreeLogisticRegression

175 @param total_N total number of observations

176 @param X features

177 @param y labels

178 @param sample_weight sample weight

179 @return probabilities

180 """

181 if self.estimator is None:

182 raise RuntimeError(

183 "Estimator was not trained.") # pragma: no cover

184 prob = self.estimator.predict_proba(X)

185 if dtlr.fit_improve_algo in (None, 'none'):

186 return prob

188 if not isinstance(self.estimator, LinearClassifierMixin):

189 # The classifier is not linear and cannot be improved.

190 if dtlr.fit_improve_algo == 'intercept_sort_always': # pragma: no cover

191 raise RuntimeError(

192 f"The model is not linear "

193 f"({self.estimator.__class__.__name__!r}), "

194 f"intercept cannot be improved.")

195 return prob

197 above = prob[:, 1] > self.threshold

198 below = ~ above

199 n_above = above.sum()

200 n_below = below.sum()

201 n_min = min(n_above, n_below)

202 p1p2 = float(n_above * n_below) / X.shape ** 2

203 if dtlr.verbose >= 2:

204 print("[DTLRI] %s imp %d <> %d, p1p2=%1.3f <> %1.3f" % ( # pragma: no cover

205 " " * self.depth, n_min, dtlr.min_samples_leaf,

206 p1p2, dtlr.p1p2))

207 if (n_min >= dtlr.min_samples_leaf and

208 float(n_min) / total_N >= dtlr.min_weight_fraction_leaf and

209 p1p2 > dtlr.p1p2 and

210 dtlr.fit_improve_algo != 'intercept_sort_always'):

211 return prob

213 coef = self.estimator.coef_

214 decision_function = (X @ coef.T).ravel()

215 order = numpy.argsort(decision_function, axis=0)

216 begin = dtlr.min_samples_leaf

218 sorted_df = decision_function[order]

219 sorted_y = decision_function[order]

220 N = sorted_y.shape

222 best = None

223 besti = None

224 beta_best = None

225 for i in range(begin, N - begin):

226 beta = - sorted_df[i]

227 like = numpy.sum(likelihood(decision_function + beta, y)) / N

228 w = float(i * (N - i)) / N**2

229 like += w * dtlr.gamma

230 if besti is None or like > best:

231 best = like

232 besti = i

233 beta_best = beta

235 if beta_best is not None:

236 if dtlr.verbose >= 1:

237 print("[DTLRI] %s change intercept %f --> %f in [%f, %f]" % ( # pragma: no cover

238 " " * self.depth, self.estimator.intercept_, beta_best,

239 - sorted_df[-1], - sorted_df))

240 self.estimator.intercept_ = beta_best

241 prob = self.estimator.predict_proba(X)

242 return prob

244 def enumerate_leaves_index(self):

245 """

246 Returns the leaves index.

247 """

248 if self.above is None or self.below is None:

249 yield self.index

250 if self.above is not None:

251 for index in self.above.enumerate_leaves_index():

252 yield index

253 if self.below is not None:

254 for index in self.below.enumerate_leaves_index():

255 yield index

class DecisionTreeLogisticRegression(BaseEstimator, ClassifierMixin):
    """
    Fits a logistic regression, then fits two other
    logistic regression for every observation on both sides
    of the border. It goes on until a tree is built.
    It only handles a binary classification.
    The built tree cannot be deeper than the maximum recursion.

    :param estimator: binary classification estimator,
        if empty, use a logistic regression, the theoretical
        model defined with a logistic regression but it could be
        any binary classifier
    :param max_depth: int, default=None
        The maximum depth of the tree. If None, then nodes are expanded until
        all leaves are pure or until all leaves contain less than
        min_samples_split samples. It must be below the maximum
        allowed recursion by python.
    :param min_samples_split: int or float, default=2
        The minimum number of samples required to split an internal node:
        - If int, then consider min_samples_split as the minimum number.
        - If float, then min_samples_split is a fraction and
          ceil(min_samples_split * n_samples) are the minimum
          number of samples for each split.
    :param min_samples_leaf: int or float, default=1
        The minimum number of samples required to be at a leaf node.
        A split point at any depth will only be considered if it leaves at
        least min_samples_leaf training samples in each of the left and
        right branches. This may have the effect of smoothing the model,
        especially in regression.
        - If int, then consider min_samples_leaf as the minimum number.
        - If float, then min_samples_leaf is a fraction and
          ceil(min_samples_leaf * n_samples) are the minimum
          number of samples for each node.
    :param min_weight_fraction_leaf: float, default=0.0
        The minimum weighted fraction of the sum total of weights (of all
        the input samples) required to be at a leaf node. Samples have
        equal weight when sample_weight is not provided.
    :param fit_improve_algo: string, one of the following value:
        - 'auto': chooses the best option below, 'none' for
          every non linear model, 'intercept_sort' for linear models
        - 'none': does nothing once the binary classifier is fit
        - 'intercept_sort': if one side of the classifier is too small,
          the method changes the best intercept possible verifying
          the constraints
        - 'intercept_sort_always': always chooses the best intercept
          possible
    :param p1p2: threshold in [0, 1]
        for every split, we can define probabilities :math:`p_1 p_2`
        which define the ratio of samples in both splits,
        if :math:`p_1 p_2` is below the threshold,
        method *fit_improve* is called
    :param gamma: weight before the coefficient :math:`p (1-p)`.
        When the model tries to improve the linear classifier,
        it looks a better intercept which maximizes the
        likelihood and verifies the constraints.
        In order to force the classifier to choose a value
        which splits the dataset into 2 almost equal folds,
        the function maximizes :math:`likelihood + \\gamma p (1 - p)`
        where *p* is the proportion of samples falling in the first
        fold.
    :param verbose: prints out information about the training
    :param strategy: 'parallel' or 'perpendicular',
        see below

    Fitted attributes:

    * classes_: ndarray of shape (n_classes,) or list of ndarray
        The classes labels (single output problem),
        or a list of arrays of class labels (multi-output problem).
    * tree_: Tree
        The underlying Tree object.

    The class implements two strategies to build the tree.
    The first one 'parallel' splits the feature space using
    the hyperplan defined by a logistic regression, the second
    strategy 'perpendicular' splits the feature space based on
    a hyperplan perpendicular to a logistic regression. By doing
    this, two logistic regression fit on both sub parts must
    necessarily decrease the training error.
    """

    # accepted values for parameter fit_improve_algo
    _fit_improve_algo_values = (
        None, 'none', 'auto', 'intercept_sort', 'intercept_sort_always')

    def __init__(self, estimator=None,
                 max_depth=20, min_samples_split=2,
                 min_samples_leaf=2, min_weight_fraction_leaf=0.0,
                 fit_improve_algo='auto', p1p2=0.09,
                 gamma=1., verbose=0, strategy='parallel'):
        "constructor"
        ClassifierMixin.__init__(self)
        BaseEstimator.__init__(self)
        # logistic regression is the default estimator
        if estimator is None:
            self.estimator = LogisticRegression()
        else:
            self.estimator = estimator
        if max_depth is None:
            raise ValueError("'max_depth' cannot be None.")  # pragma: no cover
        if max_depth > 1024:
            raise ValueError(
                "'max_depth' must be <= 1024.")  # pragma: no cover
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.min_weight_fraction_leaf = min_weight_fraction_leaf
        self.fit_improve_algo = fit_improve_algo
        self.p1p2 = p1p2
        self.gamma = gamma
        self.verbose = verbose
        self.strategy = strategy

        if self.fit_improve_algo not in DecisionTreeLogisticRegression._fit_improve_algo_values:
            raise ValueError(
                f"fit_improve_algo={self.fit_improve_algo!r} "
                f"not in {DecisionTreeLogisticRegression._fit_improve_algo_values}.")

    def fit(self, X, y, sample_weight=None):
        """
        Builds the tree model.

        :param X: numpy array or sparse matrix of shape [n_samples,n_features]
            Training data
        :param y: numpy array of shape [n_samples, n_targets]
            Target values. Will be cast to X's dtype if necessary
        :param sample_weight: numpy array of shape [n_samples]
            Individual weights for each sample
        :return: self : returns an instance of self.

        Fitted attributes:

        * classes_: classes
        * tree_: tree structure, see @see cl _DecisionTreeLogisticRegressionNode
        * n_nodes_: number of nodes
        """
        if not isinstance(X, numpy.ndarray):
            if hasattr(X, 'values'):
                X = X.values
            if not isinstance(X, numpy.ndarray):
                raise TypeError("'X' must be an array.")
        if (sample_weight is not None and
                not isinstance(sample_weight, numpy.ndarray)):
            raise TypeError(
                "'sample_weight' must be an array.")  # pragma: no cover
        self.classes_ = numpy.array(sorted(set(y)))
        if len(self.classes_) != 2:
            raise RuntimeError(
                f"The model only supports binary classification but labels are "
                f"{self.classes_}.")

        if self.strategy == 'parallel':
            return self._fit_parallel(X, y, sample_weight)
        if self.strategy == 'perpendicular':
            return self._fit_perpendicular(X, y, sample_weight)
        raise ValueError(
            f"Unknown strategy '{self.strategy}'.")

    def _fit_parallel(self, X, y, sample_weight):
        "Implements the parallel strategy."
        # maps the two labels to {0, 1}, classes_[1] is the positive class
        cls = (y == self.classes_[1]).astype(numpy.int32)
        estimator = clone(self.estimator)
        self.tree_ = _DecisionTreeLogisticRegressionNode(estimator, 0.5)
        self.n_nodes_ = self.tree_.fit(
            X, cls, sample_weight, self, X.shape[0]) + 1
        return self

    def _fit_perpendicular(self, X, y, sample_weight):
        "Implements the perpendicular strategy."
        raise NotImplementedError()  # pragma: no cover

    def predict(self, X):
        """
        Runs the predictions.
        """
        labels = self.tree_.predict(X)
        return numpy.take(self.classes_, labels)

    def predict_proba(self, X):
        """
        Converts predictions into probabilities.
        """
        return self.tree_.predict_proba(X)

    def decision_function(self, X):
        """
        Calls *decision_function*.
        """
        raise NotImplementedError(  # pragma: no cover
            "Decision function is not available for this model.")

    @property
    def tree_depth_(self):
        """
        Returns the maximum depth of the tree.
        """
        return self.tree_.tree_depth_

    def decision_path(self, X, check_input=True):
        """
        Returns the decision path.

        @param  X           inputs
        @param  check_input unused
        @return             sparse matrix
        """
        # one row per sample, one column per node of the tree
        mat = sparse.lil_matrix((X.shape[0], self.n_nodes_), dtype=numpy.int32)
        self.tree_.decision_path(X, mat, numpy.arange(X.shape[0]))
        return sparse.csr_matrix(mat)

    def get_leaves_index(self):
        """
        Returns the index of every leave.
        """
        indices = self.tree_.enumerate_leaves_index()
        return numpy.array(sorted(indices))