Source code for coniferest.pineforest

import numpy as np
from sklearn.tree._tree import DTYPE as TreeDTYPE  # noqa

from .coniferest import Coniferest, ConiferestEvaluator
from .label import Label
from .utils import average_path_length

__all__ = ["PineForest"]


class PineForest(Coniferest):
    """
    Pine Forest for active anomaly detection.

    Pine Forests are filtering isolation forests. They implement a simple
    concept of incorporating prior knowledge about what is anomalous and
    what is not.

    Called with data alone, the standard fit procedure works exactly as the
    isolation forest's one. The behaviour changes when the additional
    `labels` parameter is supplied: in that case fit generates not just
    `n_trees` but `n_trees + n_spare_trees` trees, then filters out
    `n_spare_trees` of them, leaving only the `n_trees` trees that deliver
    the best scores on the data known to be anomalous.

    Parameters
    ----------
    n_trees : int, optional
        Number of trees to keep for estimating anomaly scores.

    n_subsamples : int, optional
        How many subsamples should be used to build every tree.

    max_depth : int or None, optional
        Maximum depth of every tree. If None, `log2(n_subsamples)` is used.

    n_spare_trees : int, optional
        Number of trees to generate additionally for further filtering.

    regenerate_trees : bool, optional
        Should we throw out all the trees during retraining, or should we
        mix the old trees with the fresh ones. False by default, so we mix.

    weight_ratio : float, optional
        Relative weight of false positives with respect to true positives
        (i.e. we are not interested in negatives in anomaly detection,
        right?). The weight is used during the filtering process.

    n_jobs : int, default=-1
        Number of threads to use for scoring. If -1, use all available CPUs.

    random_seed : int or None, optional
        Random seed. If None, a random seed is used.

    sampletrees_per_batch : int, optional
        Number of sample-tree pairs to process in a single batch when
        scoring.
    """

    def __init__(
        self,
        n_trees=100,
        n_subsamples=256,
        max_depth=None,
        n_spare_trees=400,
        regenerate_trees=False,
        weight_ratio=1.0,
        n_jobs=-1,
        random_seed=None,
        sampletrees_per_batch=1 << 20,
    ):
        super().__init__(
            trees=[],
            n_subsamples=n_subsamples,
            max_depth=max_depth,
            n_jobs=n_jobs,
            random_seed=random_seed,
            sampletrees_per_batch=sampletrees_per_batch,
        )
        self.n_trees = n_trees
        self.n_spare_trees = n_spare_trees
        self.weight_ratio = weight_ratio
        self.regenerate_trees = regenerate_trees
        self.evaluator = None

    def _expand_trees(self, data, n_trees):
        """
        Expand the forest. Grow new trees.

        Parameters
        ----------
        data
            Data to build the trees from.

        n_trees
            Target number of trees after expansion.

        Returns
        -------
        None
        """
        n = n_trees - len(self.trees)
        if n > 0:
            self.trees.extend(self.build_trees(data, n))

    def _contract_trees(self, known_data, known_labels, n_trees):
        """
        Contract the forest with a chainsaw, according to the known data.

        Parameters
        ----------
        known_data
            Array with features of objects we know labels about.

        known_labels
            Array with labels of `Label` type.

        n_trees
            Target number of trees after contraction.

        Returns
        -------
        None
        """
        n_filter = len(self.trees) - n_trees
        if n_filter > 0:
            self.trees = self.filter_trees(
                trees=self.trees,
                data=known_data,
                labels=known_labels,
                n_filter=n_filter,
                weight_ratio=self.weight_ratio,
            )
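    # How the counts above play together, with the default parameters: a fit
    # with labels first expands the forest to n_trees + n_spare_trees =
    # 100 + 400 = 500 trees, then contracts it back to n_trees = 100 by
    # dropping the 400 trees that rank worst on the labeled data.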
    def fit(self, data, labels=None):
        """
        Build the trees with the data `data`.

        Parameters
        ----------
        data
            Array with feature values of objects.

        labels
            Optional. Labels of objects. May be regular, anomalous or
            unknown. See `Label` data for details.

        Returns
        -------
        self
        """
        # If no labels were supplied, train without them.
        if labels is None:
            self.fit_known(data)
            return self

        # Otherwise select the known data, and train on it.
        labels = np.asarray(labels)
        index = labels != Label.UNKNOWN
        return self.fit_known(data, data[index, :], labels[index])
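    # A minimal sketch of `fit` with labels (illustrative only; the label
    # values follow the convention stated in filter_trees below, with
    # Label.ANOMALY == -1, Label.REGULAR == 1 and Label.UNKNOWN == 0):
    #
    #     import numpy as np
    #     from coniferest.label import Label
    #
    #     labels = np.full(len(data), Label.UNKNOWN)
    #     labels[42] = Label.ANOMALY   # expert feedback for one object
    #     forest = PineForest().fit(data, labels)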
    def fit_known(self, data, known_data=None, known_labels=None):
        """
        The same as `fit`, but with a slightly different API.

        Known data and labels are separated from the training data for time
        and space efficiency: most likely `known_data` is much smaller than
        `data`, and in that case it is not reasonable to hold labels for the
        whole `data`.

        Parameters
        ----------
        data
            Training data (array with feature values) to build trees with.

        known_data
            Feature values of known data.

        known_labels
            Labels of known data.

        Returns
        -------
        self
        """
        known_data, known_labels = self._validate_known_data(known_data, known_labels)

        if self.regenerate_trees:
            self.trees = []

        if (
            known_data is None
            or len(known_data) == 0
            or known_labels is None
            or len(known_labels) == 0
            or np.all(known_labels == Label.UNKNOWN)
        ):
            self._expand_trees(data, self.n_trees)
        else:
            self._expand_trees(data, self.n_trees + self.n_spare_trees)
            self._contract_trees(known_data, known_labels, self.n_trees)

        self.evaluator = ConiferestEvaluator(self)

        return self
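    # The two entry points are equivalent; `fit` merely strips the unknown
    # labels before delegating here. A sketch of calling `fit_known`
    # directly with pre-separated known data:
    #
    #     index = labels != Label.UNKNOWN
    #     forest.fit_known(data, known_data=data[index],
    #                      known_labels=labels[index])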
    # Made non-static to make sure we always use it in a way that makes
    # inheritance with truly non-static methods possible.
    def filter_trees(self, trees, data, labels, n_filter, weight_ratio=1):
        """
        Filter the trees out.

        Parameters
        ----------
        trees
            Trees to filter.

        data
            The labeled objects themselves.

        labels
            The labels of the objects. -1 is anomaly, 1 is not anomaly,
            0 is uninformative.

        n_filter
            Number of trees to filter out.

        weight_ratio
            Weight of the false positive experience relative to the false
            negative one. Defaults to 1.
        """
        data = np.asarray(data, dtype=TreeDTYPE)
        n_samples, _ = data.shape
        n_trees = len(trees)

        heights = np.empty(shape=(n_samples, n_trees))
        for tree_index in range(n_trees):
            tree = trees[tree_index]
            leaves_index = tree.apply(data)
            n_samples_leaf = tree.n_node_samples[leaves_index]
            n_samples_leaf = n_samples_leaf.astype(dtype=np.float64)
            # Estimated path length of each sample in the tree: depth of
            # the leaf plus the average path length of the subsample that
            # ended up in that leaf.
            heights[:, tree_index] = (
                np.ravel(tree.decision_path(data).sum(axis=1))
                + average_path_length(n_samples_leaf)
                - 1
            )

        labels = np.asarray(labels)
        # Use float weights: with an integer label array, a non-integer
        # weight_ratio would otherwise be truncated on assignment.
        weights = labels.astype(np.float64)
        weights[labels == Label.REGULAR] = weight_ratio * Label.REGULAR
        weighted_paths = (heights * np.reshape(weights, (-1, 1))).sum(axis=0)

        # Keep the trees with the largest weighted path sums: anomalies
        # (negative weight) penalize trees that isolate them late, regulars
        # (positive weight) reward trees that isolate them late.
        indices = weighted_paths.argsort()[n_filter:]
        return [trees[i] for i in indices]
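    # A worked micro-example of the weighting. Suppose one labeled anomaly
    # (weight -1), weight_ratio == 1, and two trees with estimated path
    # lengths 3.0 and 7.0 for that object. The weighted path sums are -3.0
    # and -7.0; ascending argsort puts the 7.0-height tree first, so with
    # n_filter == 1 it is discarded and the tree that isolates the anomaly
    # in three steps is kept.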
    def score_samples(self, samples):
        """
        Compute scores for the supplied data.

        Parameters
        ----------
        samples
            Feature values to compute scores on.

        Returns
        -------
        Array with computed scores.
        """
        return self.evaluator.score_samples(samples)
    def feature_signature(self, x):
        return self.evaluator.feature_signature(x)
    def feature_importance(self, x):
        return self.evaluator.feature_importance(x)
    def apply(self, x):
        """
        Apply the forest to X, return leaf indices.

        Parameters
        ----------
        x : ndarray of shape (n_samples, n_features)
            2-d array with features.

        Returns
        -------
        x_leafs : ndarray of shape (n_samples, n_estimators)
            For each datapoint x in X and for each tree in the forest,
            return the index of the leaf x ends up in.
        """
        return self.evaluator.apply(x)
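# A minimal end-to-end sketch of the active loop this class is built for
# (illustrative only, not part of the module; uses synthetic data and the
# public API defined above, and assumes the sklearn-style convention that
# lower scores are more anomalous):
#
#     import numpy as np
#     from coniferest.label import Label
#     from coniferest.pineforest import PineForest
#
#     rng = np.random.default_rng(0)
#     data = rng.normal(size=(1000, 2))
#
#     forest = PineForest(random_seed=0)
#     forest.fit(data)
#     scores = forest.score_samples(data)
#
#     # An expert labels the top-ranked candidate; refitting with that
#     # feedback triggers the expand-and-filter procedure.
#     candidate = np.argmin(scores)
#     labels = np.full(len(data), Label.UNKNOWN)
#     labels[candidate] = Label.REGULAR  # expert says: not an anomaly
#     forest.fit(data, labels)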