Source code for coniferest.aadforest

from numbers import Real
from typing import Callable

import clarabel
import numpy as np
from scipy import sparse

from .calc_trees import calc_paths_sum  # noqa
from .coniferest import Coniferest, ConiferestEvaluator
from .label import Label

__all__ = ["AADForest"]


class AADEvaluator(ConiferestEvaluator):
    def __init__(self, aad):
        super().__init__(aad, map_value=aad.map_value)
        self.C_a = aad.C_a
        self.budget = aad.budget
        self.n_jobs = aad.n_jobs
        self.prior_influence = aad.prior_influence
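        # Initial leaf weights are uniform with unit L2 norm; they also act as
        # the default prior w0 on the first call to fit_known.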
        self.weights = np.full(shape=(self.n_leaves,), fill_value=np.reciprocal(np.sqrt(self.n_leaves)))

        leaf_mask = self.selectors["feature"] < 0
        self.leaf_values = self.selectors["value"][leaf_mask]

    def _q_tau(self, scores):
        # Returns the score threshold for the given budget, or None when the
        # threshold has to be optimized as an additional variable of the
        # quadratic program.
        if self.budget == "auto":
            # When regularization is disabled, the problem is degenerate
            if self.prior_influence == 0:
                return 1.0

            return None
        elif isinstance(self.budget, int):
            if self.budget >= len(scores):
                return np.max(scores)

            return np.partition(scores, self.budget)[self.budget]
        elif isinstance(self.budget, float):
            return np.quantile(scores, self.budget)

        raise ValueError('self.budget must be an int or float or "auto"')

    def score_samples(self, x, weights=None):
        """
        Perform the computations.

        Parameters
        ----------
        x
            Features to calculate scores of. Should be C-contiguous for performance.
        weights
            Specific leaf weights to use instead of self.weights

        Returns
        -------
        Array of scores.
        """
        if not x.flags["C_CONTIGUOUS"]:
            x = np.ascontiguousarray(x)

        if weights is None:
            weights = self.weights

        return calc_paths_sum(
            self.selectors,
            self.node_offsets,
            x,
            weights,
            num_threads=self.num_threads,
            batch_size=self.batch_size,
        )

    def fit_known(self, data, known_data, known_labels, prior_weights=None):
        scores = self.score_samples(data)
        q_tau = self._q_tau(scores)
        known_leafs = self.apply(known_data)

        n_anomalies = np.count_nonzero(known_labels == Label.ANOMALY)
        n_nominals = np.count_nonzero(known_labels == Label.REGULAR)
        prior_influence = self.prior_influence(n_anomalies, n_nominals)

        if prior_weights is None:
            prior_weights = self.weights

        n_weights = self.weights.shape[0]
        n_knowns = n_anomalies + n_nominals
        offset_knowns = n_weights
        n_q_tau = int(q_tau is None)
        offset_q_tau = n_weights + n_knowns
        n_variables = n_weights + n_knowns + n_q_tau
        n_constraints = 2 * n_knowns
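        # QP variable layout: x = [w (n_weights leaf weights),
        # u (one hinge slack per known sample), q_tau (only when optimized)].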

        # Problem matrix P: alpha on the diagonal of the weight block,
        # coming from the regularization term alpha/2 * ||w - w0||^2
        data = np.full_like(self.weights, prior_influence)
        ind = np.arange(n_weights)
        P = sparse.csc_matrix((data, (ind, ind)), shape=(n_variables, n_variables))

        # Problem vector q
        q_known = np.zeros_like(known_labels, dtype=self.weights.dtype)
        if n_anomalies > 0:
            q_known[known_labels == Label.ANOMALY] = self.C_a * np.reciprocal(float(n_anomalies))
        if n_nominals > 0:
            q_known[known_labels == Label.REGULAR] = np.reciprocal(float(n_nominals))

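        # Linear term q: -alpha * w0 on the weights (from expanding
        # alpha/2 * ||w - w0||^2), the hinge coefficients C_a/|A| and 1/|N|
        # on the slacks u, and zero for the optional q_tau variable.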
        q = np.concatenate(
            [
                -prior_influence * prior_weights,
                q_known,
                np.zeros(shape=(n_q_tau,), dtype=self.weights.dtype),
            ]
        )

        # Constraints matrix A, one column block per variable group:
        ## - Weight variables: -label_i * leaf value of sample i, per tree
        ## - Cost variables: -1 in both the hinge and the nonnegativity rows
        ## - q_tau variable: label_i (only when q_tau is optimized)
        # The first n_knowns rows encode u_i >= label_i * (q_tau - s(x_i|w)),
        # the last n_knowns rows encode u_i >= 0.
        col_ind = np.concatenate(
            [
                known_leafs.flatten(),
                np.tile(np.arange(offset_knowns, offset_q_tau), 2),
                np.full((n_knowns,), offset_q_tau) if n_q_tau else [],
            ]
        )
        row_ind = np.concatenate(
            [
                np.repeat(np.arange(n_knowns), self.n_trees),
                np.arange(n_constraints),
                np.arange(n_knowns) if n_q_tau else [],
            ]
        )
        data = np.concatenate(
            [
                (-self.leaf_values[known_leafs] * known_labels.reshape(-1, 1)).flatten(),
                np.full((n_constraints,), -1),
                known_labels if n_q_tau else [],
            ]
        )

        A = sparse.csc_matrix((data, (row_ind, col_ind)), shape=(n_constraints, n_variables))

        # Constraints vector b
        b = np.concatenate(
            [
                np.zeros((n_knowns,)) if n_q_tau else -q_tau * known_labels,
                np.zeros((n_knowns,)),
            ]
        )

        # Constraints type: all >= 0
        cones = [
            clarabel.NonnegativeConeT(n_constraints),
        ]
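        # Clarabel's standard form is min 1/2 x^T P x + q^T x subject to
        # A x + s = b with s in the cone, so with the nonnegative cone every
        # row above reads as A x <= b.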

        settings = clarabel.DefaultSettings()
        settings.verbose = False
        # max_threads = 0 means automatic
        settings.max_threads = self.n_jobs if self.n_jobs > 0 else 0

        solver = clarabel.DefaultSolver(P, q, A, b, cones, settings)

        res = solver.solve()

        self.weights = np.asarray(res.x[:n_weights])
        weights_norm = np.linalg.norm(self.weights)
        self.weights /= weights_norm


class AADForest(Coniferest):
    r"""
    Active Anomaly Detection with Isolation Forest.

    See Das et al., 2017: https://arxiv.org/abs/1708.09441

    The method solves the optimization problem:

    .. math::
        \mathbf{w} = \arg\min_{\mathbf{w}} \left(
            \frac{C_a}{\left|\mathcal{A}\right|} \sum_{i \in \mathcal{A}}
                \mathrm{ReLU}\left(s(\mathbf{x_i} | \mathbf{w}) - q_{\tau}\right)
            + \frac{1}{\left|\mathcal{N}\right|} \sum_{i \in \mathcal{N}}
                \mathrm{ReLU}\left(q_{\tau} - s(\mathbf{x_i} | \mathbf{w})\right)
            + \frac{\alpha}{2} \lVert \mathbf{w} - \mathbf{w_0} \rVert^2
        \right),

    where :math:`C_a` is `C_a`, the regularization parameter :math:`\alpha` is
    `prior_influence`, :math:`\mathcal{A}` is the set of known anomalies,
    :math:`\mathcal{N}` is the set of known nominals, and
    :math:`s(\mathbf{x_i} | \mathbf{w})` is the anomaly score of the instance
    with features :math:`\mathbf{x_i}` given weights :math:`\mathbf{w}`.

    This problem is reformulated as an equivalent quadratic programming
    problem:

    .. math::
        \begin{bmatrix} \mathbf{w}\\ \mathbf{u} \end{bmatrix}
        = \arg\min_{\mathbf{w}, \mathbf{u}} \left(
            \frac{C_a}{\left|\mathcal{A}\right|} \sum_{i \in \mathcal{A}} u_i
            + \frac{1}{\left|\mathcal{N}\right|} \sum_{i \in \mathcal{N}} u_i
            + \frac{\alpha}{2} \lVert \mathbf{w} - \mathbf{w_0} \rVert^2
        \right),

    with the following convex constraints:

    .. math::
        u_i &\ge 0 \quad & i \in \mathcal{A} \cup \mathcal{N},\\
        u_i - s(\mathbf{x_i} | \mathbf{w}) &\ge -q_{\tau} \quad & i \in \mathcal{A},\\
        u_i + s(\mathbf{x_i} | \mathbf{w}) &\ge q_{\tau} \quad & i \in \mathcal{N}.

    Parameters
    ----------
    n_trees : int, optional
        Number of trees in the isolation forest.
    n_subsamples : int, optional
        How many subsamples should be used to build every tree.
    max_depth : int or None, optional
        Maximum depth of every tree. If None, `log2(n_subsamples)` is used.
    budget : int or float or "auto", optional
        Budget of anomalies. A float is interpreted as a fraction of the full
        data, an int as a number of items. If "auto", the value is determined
        during training. Default is "auto".
    C_a : float, optional
        Weight of the known anomalies in the loss function. Default is 1.0.
    n_jobs : int, default=-1
        Number of threads to use for scoring. If -1, use all available CPUs.
    random_seed : int or None, optional
        Random seed to use for reproducibility. If None, a random seed is
        used.
    sampletrees_per_batch : int, optional
        Number of sample-tree pairs to process in a single batch.
    prior_influence : float or callable, optional
        Regularization coefficient :math:`\alpha` in the loss function, or a
        callable with signature `(anomaly_count, nominal_count) -> float`.
        Default is 1.0.
    map_value : ["const", "exponential", "linear", "reciprocal"] or callable, optional
        A function applied to the leaf depth before weighting. The named
        variants correspond to 1, 1 - exp(-x), x and -1/x, respectively.
    """
""" def __init__( self, n_trees=100, n_subsamples=256, max_depth=None, budget="auto", C_a=1.0, prior_influence=1.0, n_jobs=-1, random_seed=None, sampletrees_per_batch=1 << 20, map_value=None, ): super().__init__( trees=[], n_subsamples=n_subsamples, max_depth=max_depth, n_jobs=n_jobs, random_seed=random_seed, sampletrees_per_batch=sampletrees_per_batch, ) self.n_trees = n_trees if not (isinstance(budget, (int, float)) or budget == "auto"): raise ValueError('budget must be an int or float or "auto"') self.budget = budget self.C_a = C_a if isinstance(prior_influence, Callable): self.prior_influence = prior_influence elif isinstance(prior_influence, Real): self.prior_influence = lambda anomaly_count, nominal_count: prior_influence else: raise ValueError("prior_influence is neither a callable nor a constant") MAP_VALUES = { "const": np.ones_like, "exponential": lambda x: -np.expm1(-x), "linear": lambda x: x, "reciprocal": lambda x: -np.reciprocal(x), } if map_value is None: map_value = "reciprocal" if isinstance(map_value, Callable): self.map_value = map_value elif map_value in MAP_VALUES: self.map_value = MAP_VALUES[map_value] else: raise ValueError(f"map_value is neither a callable nor one of {', '.join(MAP_VALUES.keys())}.") self.evaluator = None def _build_trees(self, data): if len(self.trees) == 0: self.trees = self.build_trees(data, self.n_trees) self.evaluator = AADEvaluator(self)

    def fit(self, data, labels=None):
        """
        Build the trees with the data `data`.

        Parameters
        ----------
        data
            Array with feature values of objects.
        labels
            Optional. Labels of objects. May be regular, anomalous or unknown.
            See `Label` for details.

        Returns
        -------
        self
        """
        # If no labels were supplied, train without them.
        if labels is None:
            return self.fit_known(data)

        # Otherwise select the known data and train on it.
        labels = np.asarray(labels)
        index = labels != Label.UNKNOWN
        return self.fit_known(data, data[index, :], labels[index])

    def fit_known(self, data, known_data=None, known_labels=None):
        """
        The same as `fit`, but with a slightly different API.

        Known data and labels are kept separate from the training data for
        time and space efficiency: `known_data` is likely much smaller than
        `data`, in which case it is wasteful to hold labels for the whole
        `data`.

        Parameters
        ----------
        data
            Training data (array with feature values) to build trees with.
        known_data
            Feature values of known data.
        known_labels
            Labels of known data.

        Returns
        -------
        self
        """
        known_data, known_labels = self._validate_known_data(known_data, known_labels)

        self._build_trees(data)

        if (
            known_data is None
            or len(known_data) == 0
            or known_labels is None
            or len(known_labels) == 0
            or np.all(known_labels == Label.UNKNOWN)
        ):
            return self

        index = known_labels != Label.UNKNOWN
        known_data, known_labels = known_data[index], known_labels[index]

        self.evaluator.fit_known(data, known_data, known_labels)

        return self

    def score_samples(self, samples):
        """
        Compute scores for the supplied data.

        Parameters
        ----------
        samples
            Feature values to compute scores on.

        Returns
        -------
        Array with computed scores.
        """
        return self.evaluator.score_samples(samples)

    def feature_signature(self, x):
        raise NotImplementedError()

    def feature_importance(self, x):
        raise NotImplementedError()

    def apply(self, x):
        """
        Apply the forest to `x` and return leaf indices.

        Parameters
        ----------
        x : ndarray of shape (n_samples, n_features)
            2-d array with features.

        Returns
        -------
        x_leafs : ndarray of shape (n_samples, n_estimators)
            For each sample in `x` and for each tree in the forest, the index
            of the leaf the sample ends up in.
        """
        return self.evaluator.apply(x)
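

# A minimal usage sketch (illustration only, not part of the library): train
# on unlabeled data, mark a few samples as anomalous or regular with `Label`,
# and refit so the learned leaf weights reflect the feedback. The synthetic
# data and parameter values below are arbitrary assumptions.
#
#     import numpy as np
#     from coniferest.aadforest import AADForest
#     from coniferest.label import Label
#
#     rng = np.random.default_rng(0)
#     data = rng.normal(size=(1000, 2))
#
#     forest = AADForest(n_trees=100, random_seed=0).fit(data)
#     scores = forest.score_samples(data)
#
#     # Pretend an expert inspected the two lowest-scored samples.
#     candidates = np.argsort(scores)[:2]
#     known_data = data[candidates]
#     known_labels = np.array([Label.ANOMALY, Label.REGULAR])
#
#     forest.fit_known(data, known_data, known_labels)
#     scores = forest.score_samples(data)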