
API Reference

Complete API documentation for all Nonconform modules and classes.

Core Modules

Estimation

nonconform.estimation

Conformal anomaly detection estimators.

This module provides the core conformal anomaly detection classes that wrap PyOD detectors with uncertainty quantification capabilities.

BaseConformalDetector

Bases: ABC

Abstract base class for all conformal anomaly detectors.

Defines the core interface that all conformal anomaly detection implementations must provide. This ensures consistent behavior across different conformal methods (standard, weighted, etc.) while maintaining flexibility.

Design Pattern: All conformal detectors follow a two-phase workflow:
1. Calibration Phase: fit() trains the detector(s) and computes calibration scores.
2. Inference Phase: predict() converts scores for new data into valid p-values.

Implementation Requirements: Subclasses must implement both abstract methods:
- Training/calibration logic in fit()
- P-value generation logic in predict()

Note

This is an abstract class and cannot be instantiated directly. Use a concrete implementation such as ConformalDetector.

fit abstractmethod
fit(x: DataFrame | ndarray) -> None

Fit the detector model(s) and compute calibration scores.

Parameters:

- x (DataFrame | ndarray): The dataset used for fitting the model(s) and determining calibration scores. Required.
Source code in nonconform/estimation/base.py
@_ensure_numpy_array
@abstractmethod
def fit(self, x: pd.DataFrame | np.ndarray) -> None:
    """Fit the detector model(s) and compute calibration scores.

    Args:
        x (pd.DataFrame | np.ndarray): The dataset used for
            fitting the model(s) and determining calibration scores.
    """
    pass
predict abstractmethod
predict(
    x: DataFrame | ndarray, raw: bool = False
) -> np.ndarray

Generate anomaly estimates or p-values for new data.

Parameters:

- x (DataFrame | ndarray): The new data instances for which to make anomaly estimates. Required.
- raw (bool): Whether to return raw anomaly scores or processed anomaly estimates (e.g., p-values). Defaults to False.

Returns:

- np.ndarray: An array containing the anomaly estimates.

Source code in nonconform/estimation/base.py
@_ensure_numpy_array
@abstractmethod
def predict(
    self,
    x: pd.DataFrame | np.ndarray,
    raw: bool = False,
) -> np.ndarray:
    """Generate anomaly estimates or p-values for new data.

    Args:
        x (pd.DataFrame | np.ndarray): The new data instances
            for which to make anomaly estimates.
        raw (bool, optional): Whether to return raw anomaly scores or
            processed anomaly estimates (e.g., p-values). Defaults to False.

    Returns:
        np.ndarray: An array containing the anomaly estimates.
    """
    pass
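For illustration, a minimal hypothetical subclass is sketched below. SplitConformalKNN is not part of nonconform, the scoring logic is only a sketch, and the base-class import path is assumed from the source layout shown above (nonconform/estimation/base.py):

import numpy as np
import pandas as pd
from pyod.models.knn import KNN

from nonconform.estimation.base import BaseConformalDetector


class SplitConformalKNN(BaseConformalDetector):
    """Illustrative split-conformal wrapper around a single KNN detector."""

    def __init__(self, n_calib: int = 200):
        self.n_calib = n_calib
        self.model = KNN()
        self.calibration_scores = None

    def fit(self, x: pd.DataFrame | np.ndarray) -> None:
        x = np.asarray(x)
        # Train on one part of the data, score the held-out part for calibration.
        train, calib = x[: -self.n_calib], x[-self.n_calib :]
        self.model.fit(train)
        self.calibration_scores = self.model.decision_function(calib)

    def predict(self, x: pd.DataFrame | np.ndarray, raw: bool = False) -> np.ndarray:
        scores = self.model.decision_function(np.asarray(x))
        if raw:
            return scores
        # Conformal p-value: (1 + #{calibration scores >= test score}) / (n + 1).
        n = len(self.calibration_scores)
        return np.array(
            [(1 + np.sum(self.calibration_scores >= s)) / (n + 1) for s in scores]
        )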

ConformalDetector

ConformalDetector(
    detector: BaseDetector,
    strategy: BaseStrategy,
    weight_estimator: BaseWeightEstimator | None = None,
    aggregation: Aggregation = Aggregation.MEDIAN,
    seed: int | None = None,
)

Bases: BaseConformalDetector

Unified conformal anomaly detector with optional covariate shift handling.

Provides distribution-free anomaly detection with valid p-values and False Discovery Rate (FDR) control by wrapping any PyOD detector with conformal inference. Optionally handles covariate shift through importance weighting when a weight estimator is specified.

When no weight estimator is provided (standard conformal prediction):
- Uses classical conformal inference for exchangeable data
- Provides optimal performance and memory usage
- Suitable when training and test data come from the same distribution

When a weight estimator is provided (weighted conformal prediction):
- Handles distribution shift between calibration and test data
- Estimates importance weights to maintain statistical validity
- Slightly higher computational cost but robust to covariate shift

Examples:

Standard conformal prediction (no distribution shift):

from pyod.models.iforest import IForest
from nonconform.estimation import ConformalDetector
from nonconform.strategy import Split

# Create standard conformal detector
detector = ConformalDetector(
    detector=IForest(), strategy=Split(n_calib=0.2), seed=42
)

# Fit on normal training data
detector.fit(X_train)

# Get p-values for test data
p_values = detector.predict(X_test)

Weighted conformal prediction (with distribution shift):

from nonconform.estimation.weight import LogisticWeightEstimator

# Create weighted conformal detector
detector = ConformalDetector(
    detector=IForest(),
    strategy=Split(n_calib=0.2),
    weight_estimator=LogisticWeightEstimator(seed=42),
    seed=42,
)

# Same usage as standard conformal
detector.fit(X_train)
p_values = detector.predict(X_test)
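Because the returned p-values are valid, any standard multiple-testing procedure can be applied for FDR control. The helper below is a plain-NumPy Benjamini-Hochberg sketch written for this example; it is not part of nonconform:

import numpy as np


def benjamini_hochberg(p_values: np.ndarray, alpha: float = 0.1) -> np.ndarray:
    """Return a boolean mask of discoveries at FDR level alpha."""
    p = np.asarray(p_values)
    n = p.size
    order = np.argsort(p)
    ranked = p[order]
    # Largest k such that p_(k) <= (k / n) * alpha; reject hypotheses 1..k.
    thresholds = np.arange(1, n + 1) / n * alpha
    passed = np.nonzero(ranked <= thresholds)[0]
    mask = np.zeros(n, dtype=bool)
    if passed.size > 0:
        mask[order[: passed[-1] + 1]] = True
    return mask


# Flag anomalies among the test points while controlling the FDR at 10%.
anomalies = benjamini_hochberg(p_values, alpha=0.1)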

Attributes:

- detector (BaseDetector): The underlying PyOD anomaly detection model.
- strategy (BaseStrategy): The calibration strategy for computing p-values.
- weight_estimator (BaseWeightEstimator | None): Optional weight estimator for handling covariate shift.
- aggregation (Aggregation): Method for combining scores from multiple models.
- seed (int | None): Random seed for reproducible results.
- detector_set (list[BaseDetector]): List of trained detector models (populated after fit()).
- calibration_set (ndarray): Calibration scores for p-value computation (populated after fit()).
- is_fitted (bool): Whether the detector has been fitted.
- calibration_samples (ndarray): Data instances used for calibration (weighted mode only).

Parameters:

- detector (BaseDetector): The base anomaly detection model to be used (e.g., an instance of a PyOD detector). Required.
- strategy (BaseStrategy): The conformal strategy to apply for fitting and calibration. Required.
- weight_estimator (BaseWeightEstimator | None): Weight estimator for handling covariate shift. If None, uses standard conformal prediction (equivalent to IdentityWeightEstimator). Defaults to None.
- aggregation (Aggregation): Method used for aggregating scores from multiple detector models. Defaults to Aggregation.MEDIAN.
- seed (int | None): Random seed for reproducibility. Defaults to None.

Raises:

- ValueError: If seed is negative.
- TypeError: If aggregation is not an Aggregation enum.

Source code in nonconform/estimation/conformal.py
def __init__(
    self,
    detector: PyODBaseDetector,
    strategy: BaseStrategy,
    weight_estimator: BaseWeightEstimator | None = None,
    aggregation: Aggregation = Aggregation.MEDIAN,
    seed: int | None = None,
):
    """Initialize the ConformalDetector.

    Args:
        detector (PyODBaseDetector): The base anomaly detection model to be
            used (e.g., an instance of a PyOD detector).
        strategy (BaseStrategy): The conformal strategy to apply for fitting
            and calibration.
        weight_estimator (BaseWeightEstimator | None, optional): Weight estimator
            for handling covariate shift. If None, uses standard conformal
            prediction (equivalent to IdentityWeightEstimator). Defaults to None.
        aggregation (Aggregation, optional): Method used for aggregating
            scores from multiple detector models. Defaults to Aggregation.MEDIAN.
        seed (int | None, optional): Random seed for reproducibility.
            Defaults to None.

    Raises:
        ValueError: If seed is negative.
        TypeError: If aggregation is not an Aggregation enum.
    """
    if seed is not None and seed < 0:
        raise ValueError(f"seed must be a non-negative integer or None, got {seed}")
    if not isinstance(aggregation, Aggregation):
        valid_methods = ", ".join([f"Aggregation.{a.name}" for a in Aggregation])
        raise TypeError(
            f"aggregation must be an Aggregation enum, "
            f"got {type(aggregation).__name__}. "
            f"Valid options: {valid_methods}. "
            f"Example: ConformalDetector(detector=model, "
            f"strategy=strategy, aggregation=Aggregation.MEDIAN)"
        )

    self.detector: PyODBaseDetector = _set_params(detector, seed)
    self.strategy: BaseStrategy = strategy
    self.weight_estimator: BaseWeightEstimator | None = weight_estimator
    self.aggregation: Aggregation = aggregation
    self.seed: int | None = seed

    # Determine if we're in weighted mode
    self._is_weighted_mode = weight_estimator is not None and not isinstance(
        weight_estimator, IdentityWeightEstimator
    )

    self._detector_set: list[PyODBaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_samples: np.ndarray = np.array([])
detector_set property
detector_set: list[BaseDetector]

Returns a copy of the list of trained detector models.

Returns:

- list[PyODBaseDetector]: Copy of trained detectors, populated after fit().

Note

Returns a defensive copy to prevent external modification of internal state.

calibration_set property
calibration_set: ndarray

Returns a copy of the calibration scores.

Returns:

- numpy.ndarray: Copy of calibration scores, populated after fit().

Note

Returns a defensive copy to prevent external modification of internal state.

calibration_samples property
calibration_samples: ndarray

Returns a copy of the calibration samples used for weight computation.

Only available when using weighted conformal prediction (non-identity weight estimator). For standard conformal prediction, returns an empty array.

Returns:

- np.ndarray: Copy of data instances used for calibration, or an empty array if using standard conformal prediction.

Note

Returns a defensive copy to prevent external modification of internal state.

is_fitted property
is_fitted: bool

Returns whether the detector has been fitted.

Returns:

- bool: True if fit() has been called and models are trained.

fit
fit(
    x: DataFrame | ndarray, iteration_callback=None
) -> None

Fits the detector model(s) and computes calibration scores.

This method uses the specified strategy to train the base detector(s) on parts of the provided data and then calculates non-conformity scores on other parts (calibration set) to establish a baseline for typical behavior. The resulting trained models and calibration scores are stored in self._detector_set and self._calibration_set.

For weighted conformal prediction, calibration samples are also stored for weight computation during prediction.

Parameters:

- x (DataFrame | ndarray): The dataset used for fitting the model(s) and determining calibration scores. The strategy dictates how this data is split or used. Required.
- iteration_callback (callable | None): Optional callback for strategies that support iteration tracking (e.g., Bootstrap). Called after each iteration with (iteration, scores). Defaults to None.
Source code in nonconform/estimation/conformal.py
@_ensure_numpy_array
def fit(self, x: pd.DataFrame | np.ndarray, iteration_callback=None) -> None:
    """Fits the detector model(s) and computes calibration scores.

    This method uses the specified strategy to train the base detector(s)
    on parts of the provided data and then calculates non-conformity
    scores on other parts (calibration set) to establish a baseline for
    typical behavior. The resulting trained models and calibration scores
    are stored in `self._detector_set` and `self._calibration_set`.

    For weighted conformal prediction, calibration samples are also stored
    for weight computation during prediction.

    Args:
        x (pd.DataFrame | np.ndarray): The dataset used for
            fitting the model(s) and determining calibration scores.
            The strategy will dictate how this data is split or used.
        iteration_callback (callable | None): Optional callback function
            for strategies that support iteration tracking (e.g., Bootstrap).
            Called after each iteration with (iteration, scores). Defaults to None.
    """
    # Pass weighted flag only when using non-identity weight estimator
    self._detector_set, self._calibration_set = self.strategy.fit_calibrate(
        x=x,
        detector=self.detector,
        weighted=self._is_weighted_mode,
        seed=self.seed,
        iteration_callback=iteration_callback,
    )

    # Store calibration samples only for weighted mode
    if self._is_weighted_mode:
        if (
            self.strategy.calibration_ids is not None
            and len(self.strategy.calibration_ids) > 0
        ):
            self._calibration_samples = x[self.strategy.calibration_ids]
        else:
            # Handle case where calibration_ids might be empty or None
            self._calibration_samples = np.array([])
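The iteration_callback hook lends itself to simple progress reporting with iteration-aware strategies. The callback signature below follows the (iteration, scores) convention documented above; the surrounding detector and X_train are the placeholders from the earlier examples:

def log_iteration(iteration: int, scores) -> None:
    # Called by iteration-aware strategies (e.g., Bootstrap) after each iteration.
    print(f"iteration {iteration}: {len(scores)} calibration scores")


detector.fit(X_train, iteration_callback=log_iteration)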
predict
predict(
    x: DataFrame | ndarray, raw: bool = False
) -> np.ndarray

Generate anomaly estimates (p-values or raw scores) for new data.

Based on the fitted models and calibration scores, this method evaluates new data points. For standard conformal prediction, returns p-values based on the calibration distribution. For weighted conformal prediction, incorporates importance weights to handle covariate shift.

Parameters:

- x (DataFrame | ndarray): The new data instances for which to generate anomaly estimates. Required.
- raw (bool): Whether to return raw anomaly scores or p-values. Defaults to False.
  - If True: returns the aggregated anomaly scores (non-conformity estimates) from the detector set for each data point.
  - If False: returns the p-values for each data point based on the calibration set, optionally weighted for distribution shift.

Returns:

- np.ndarray: An array containing the anomaly estimates. If raw=True, an array of anomaly scores (float); if raw=False, an array of p-values (float).
Source code in nonconform/estimation/conformal.py
@_ensure_numpy_array
def predict(
    self,
    x: pd.DataFrame | np.ndarray,
    raw: bool = False,
) -> np.ndarray:
    """Generate anomaly estimates (p-values or raw scores) for new data.

    Based on the fitted models and calibration scores, this method evaluates
    new data points. For standard conformal prediction, returns p-values based
    on the calibration distribution. For weighted conformal prediction,
    incorporates importance weights to handle covariate shift.

    Args:
        x (pd.DataFrame | np.ndarray): The new data instances
            for which to generate anomaly estimates.
        raw (bool, optional): Whether to return raw anomaly scores or
            p-values. Defaults to False.
            * If True: Returns the aggregated anomaly scores (non-conformity
              estimates) from the detector set for each data point.
            * If False: Returns the p-values for each data point based on
              the calibration set, optionally weighted for distribution shift.

    Returns:
        np.ndarray: An array containing the anomaly estimates. The content of the
        array depends on the `raw` argument:
        - If raw=True, an array of anomaly scores (float).
        - If raw=False, an array of p-values (float).
    """
    logger = get_logger("estimation.conformal")
    iterable = (
        tqdm(
            self._detector_set,
            total=len(self._detector_set),
            desc=f"Aggregating {len(self._detector_set)} models",
        )
        if logger.isEnabledFor(logging.DEBUG)
        else self._detector_set
    )
    scores_list = [model.decision_function(x) for model in iterable]

    estimates = aggregate(method=self.aggregation, scores=scores_list)

    if raw:
        return estimates

    # Choose p-value calculation method based on weight estimator
    if self._is_weighted_mode and self.weight_estimator is not None:
        # Weighted p-value calculation
        self.weight_estimator.fit(self._calibration_samples, x)
        w_cal, w_x = self.weight_estimator.get_weights()
        return calculate_weighted_p_val(
            np.array(estimates),
            self._calibration_set,
            np.array(w_x),
            np.array(w_cal),
        )
    else:
        # Standard p-value calculation (faster path)
        return calculate_p_val(
            scores=estimates, calibration_set=self._calibration_set
        )
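Both output modes can be obtained from the same fitted detector; the 0.05 threshold below is purely illustrative and applies no multiplicity correction:

# Aggregated non-conformity scores (for PyOD detectors, higher typically means more anomalous).
scores = detector.predict(X_test, raw=True)

# Conformal p-values (small values indicate observations unlike the calibration data).
p_values = detector.predict(X_test)

# Naive per-instance decision at a 5% significance level.
flagged = p_values <= 0.05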

weight

Weight estimators for covariate shift in conformal prediction.

This module provides various weight estimation strategies for handling distribution shift between calibration and test data in weighted conformal prediction.

BaseWeightEstimator

Bases: ABC

Abstract base class for weight estimators in weighted conformal prediction.

Weight estimators compute importance weights to correct for covariate shift between calibration and test distributions. They estimate density ratios w(x) = p_test(x) / p_calib(x) which are used to reweight conformal scores for better coverage guarantees under distribution shift.

Subclasses must implement the fit() and get_weights() methods to provide specific weight estimation strategies (e.g., logistic regression, random forest).

fit abstractmethod
fit(calibration_samples: ndarray, test_samples: ndarray)

Estimate density ratio weights

Source code in nonconform/estimation/weight/base.py
@abstractmethod
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray):
    """Estimate density ratio weights"""
    pass
get_weights abstractmethod
get_weights() -> tuple[np.ndarray, np.ndarray]

Return (calib_weights, test_weights)

Source code in nonconform/estimation/weight/base.py
@abstractmethod
def get_weights(self) -> tuple[np.ndarray, np.ndarray]:
    """Return (calib_weights, test_weights)"""
    pass
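As an illustration of this interface, a hypothetical kernel-density-based estimator is sketched below. KDEWeightEstimator is not part of the library, and the base-class import path is assumed from the source layout shown above (nonconform/estimation/weight/base.py):

import numpy as np
from sklearn.neighbors import KernelDensity

from nonconform.estimation.weight.base import BaseWeightEstimator


class KDEWeightEstimator(BaseWeightEstimator):
    """Illustrative estimator of w(x) = p_test(x) / p_calib(x) via two KDEs."""

    def __init__(self, bandwidth: float = 1.0, clip: tuple[float, float] = (0.05, 20.0)):
        self.bandwidth = bandwidth
        self.clip = clip
        self._w_calib = None
        self._w_test = None

    def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
        kde_calib = KernelDensity(bandwidth=self.bandwidth).fit(calibration_samples)
        kde_test = KernelDensity(bandwidth=self.bandwidth).fit(test_samples)

        def ratio(x: np.ndarray) -> np.ndarray:
            # score_samples returns log-densities; subtract before exponentiating.
            log_ratio = kde_test.score_samples(x) - kde_calib.score_samples(x)
            return np.clip(np.exp(log_ratio), *self.clip)

        self._w_calib = ratio(calibration_samples)
        self._w_test = ratio(test_samples)

    def get_weights(self) -> tuple[np.ndarray, np.ndarray]:
        if self._w_calib is None:
            raise RuntimeError("Must call fit() before get_weights()")
        return self._w_calib.copy(), self._w_test.copy()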
ForestWeightEstimator
ForestWeightEstimator(
    n_estimators: int = 100,
    max_depth: int | None = 5,
    min_samples_leaf: int = 10,
    clip_quantile: float = 0.05,
    seed: int | None = None,
)

Bases: BaseWeightEstimator

Random Forest-based weight estimator for covariate shift.

Uses a Random Forest classifier to estimate density ratios between calibration and test distributions. A Random Forest can capture non-linear relationships and complex feature interactions, making it suitable for more complex covariate shift patterns than logistic regression.

The Random Forest is trained to distinguish between calibration and test samples, and the predicted probabilities are used to compute importance weights w(x) = p_test(x) / p_calib(x).

Parameters:

- n_estimators (int): Number of trees in the forest. Defaults to 100.
- max_depth (int | None): Maximum depth of trees. If None, nodes are expanded until all leaves are pure. Defaults to 5 to prevent overfitting.
- min_samples_leaf (int): Minimum number of samples required at a leaf node. Defaults to 10 to prevent overfitting.
- clip_quantile (float | None): Quantile for weight clipping. If 0.05, clips to the 5th and 95th percentiles. If None, uses the fixed [0.35, 45.0] range. Defaults to 0.05.
- seed (int | None): Random seed for reproducible results. Defaults to None.
Source code in nonconform/estimation/weight/forest.py
def __init__(
    self,
    n_estimators: int = 100,
    max_depth: int | None = 5,
    min_samples_leaf: int = 10,
    clip_quantile: float = 0.05,
    seed: int | None = None,
):
    self.n_estimators = n_estimators
    self.max_depth = max_depth
    self.min_samples_leaf = min_samples_leaf
    self.clip_quantile = clip_quantile
    self.seed = seed
    self._w_calib = None
    self._w_test = None
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the Random Forest weight estimator on calibration and test samples.

Parameters:

- calibration_samples (ndarray): Array of calibration data samples. Required.
- test_samples (ndarray): Array of test data samples. Required.

Raises:

- ValueError: If calibration_samples is empty.

Source code in nonconform/estimation/weight/forest.py
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the Random Forest weight estimator on calibration and test samples.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.

    Raises:
        ValueError: If calibration_samples is empty.
    """
    if calibration_samples.shape[0] == 0:
        raise ValueError("Calibration samples are empty. Cannot compute weights.")

    # Label calibration samples as 0, test samples as 1
    calib_labeled = np.hstack(
        (
            calibration_samples,
            np.zeros((calibration_samples.shape[0], 1)),
        )
    )
    test_labeled = np.hstack((test_samples, np.ones((test_samples.shape[0], 1))))

    # Combine and shuffle
    joint_labeled = np.vstack((calib_labeled, test_labeled))
    rng = np.random.default_rng(seed=self.seed)
    rng.shuffle(joint_labeled)

    x_joint = joint_labeled[:, :-1]
    y_joint = joint_labeled[:, -1]

    # Build Random Forest classifier
    model = RandomForestClassifier(
        n_estimators=self.n_estimators,
        max_depth=self.max_depth,
        min_samples_leaf=self.min_samples_leaf,
        random_state=self.seed,
        class_weight="balanced",
        n_jobs=-1,  # Use all available cores
    )
    model.fit(x_joint, y_joint)

    # Compute probabilities
    calib_prob = model.predict_proba(calibration_samples)
    test_prob = model.predict_proba(test_samples)

    # Compute density ratios w(z) = p_test(z) / p_calib(z)
    # p_calib(z) = P(label=0 | z) ; p_test(z) = P(label=1 | z)
    w_calib = calib_prob[:, 1] / (calib_prob[:, 0] + 1e-9)
    w_test = test_prob[:, 1] / (test_prob[:, 0] + 1e-9)

    # Apply clipping
    if self.clip_quantile is not None:
        # Adaptive clipping based on percentiles
        all_weights = np.concatenate([w_calib, w_test])
        lower_bound = np.percentile(all_weights, self.clip_quantile * 100)
        upper_bound = np.percentile(all_weights, (1 - self.clip_quantile) * 100)

        self._w_calib = np.clip(w_calib, lower_bound, upper_bound)
        self._w_test = np.clip(w_test, lower_bound, upper_bound)
    else:
        # Fixed clipping (original behavior)
        self._w_calib = np.clip(w_calib, 0.35, 45.0)
        self._w_test = np.clip(w_test, 0.35, 45.0)

    self._is_fitted = True
get_weights
get_weights() -> tuple[np.ndarray, np.ndarray]

Return computed weights.

Returns:

- tuple[ndarray, ndarray]: Tuple of (calibration_weights, test_weights).

Raises:

- RuntimeError: If fit() has not been called.

Source code in nonconform/estimation/weight/forest.py
def get_weights(self) -> tuple[np.ndarray, np.ndarray]:
    """Return computed weights.

    Returns:
        Tuple of (calibration_weights, test_weights).

    Raises:
        RuntimeError: If fit() has not been called.
    """
    if not self._is_fitted:
        raise RuntimeError("Must call fit() before get_weights()")

    return self._w_calib.copy(), self._w_test.copy()
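In practice the estimator is usually passed to ConformalDetector rather than called directly. A short sketch mirroring the estimation examples above; the ForestWeightEstimator import path is assumed to match LogisticWeightEstimator's, and X_train / X_test_shifted are placeholders:

from pyod.models.iforest import IForest
from nonconform.estimation import ConformalDetector
from nonconform.estimation.weight import ForestWeightEstimator
from nonconform.strategy import Split

detector = ConformalDetector(
    detector=IForest(),
    strategy=Split(n_calib=0.2),
    weight_estimator=ForestWeightEstimator(n_estimators=200, seed=42),
    seed=42,
)

detector.fit(X_train)                        # training data without known anomalies
p_values = detector.predict(X_test_shifted)  # test data under covariate shift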
IdentityWeightEstimator
IdentityWeightEstimator()

Bases: BaseWeightEstimator

Identity weight estimator that returns uniform weights.

This estimator assumes no covariate shift and returns weights of 1.0 for all samples. Useful as a baseline or when covariate shift is known to be minimal.

This effectively makes weighted conformal prediction equivalent to standard conformal prediction.

Source code in nonconform/estimation/weight/identity.py
def __init__(self):
    self._n_calib = 0
    self._n_test = 0
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the identity weight estimator.

Parameters:

- calibration_samples (ndarray): Array of calibration data samples. Required.
- test_samples (ndarray): Array of test data samples. Required.
Source code in nonconform/estimation/weight/identity.py
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the identity weight estimator.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.
    """
    self._n_calib = calibration_samples.shape[0]
    self._n_test = test_samples.shape[0]
    self._is_fitted = True
get_weights
get_weights() -> tuple[np.ndarray, np.ndarray]

Return uniform weights of 1.0 for all samples.

Returns:

- tuple[ndarray, ndarray]: Tuple of (calibration_weights, test_weights) with all values equal to 1.0.

Raises:

- RuntimeError: If fit() has not been called.

Source code in nonconform/estimation/weight/identity.py
def get_weights(self) -> tuple[np.ndarray, np.ndarray]:
    """Return uniform weights of 1.0 for all samples.

    Returns:
        Tuple of (calibration_weights, test_weights) with all values = 1.0.

    Raises:
        RuntimeError: If fit() has not been called.
    """
    if not self._is_fitted:
        raise RuntimeError("Must call fit() before get_weights()")

    calib_weights = np.ones(self._n_calib, dtype=np.float64)
    test_weights = np.ones(self._n_test, dtype=np.float64)

    return calib_weights, test_weights
LogisticWeightEstimator
LogisticWeightEstimator(
    regularization="auto",
    clip_quantile=0.05,
    seed=None,
    class_weight="balanced",
    max_iter=1000,
)

Bases: BaseWeightEstimator

Logistic regression-based weight estimator for covariate shift.

Uses logistic regression to estimate density ratios between calibration and test distributions by training a classifier to distinguish between the two samples. The predicted probabilities are used to compute importance weights w(x) = p_test(x) / p_calib(x).

Parameters:

- regularization (str | float): Regularization parameter for logistic regression. If 'auto', uses the default sklearn parameter; if a float, it is used as the C parameter. Defaults to 'auto'.
- clip_quantile (float): Quantile for weight clipping. If 0.05, clips to the 5th and 95th percentiles. If None, uses the fixed [0.35, 45.0] range. Defaults to 0.05.
- seed (int | None): Random seed for reproducible results. Defaults to None.
- class_weight (str | dict): Weights associated with classes in the form {class_label: weight}. If 'balanced', uses n_samples / (n_classes * np.bincount(y)). Defaults to 'balanced'.
- max_iter (int): Maximum number of iterations for the solver to converge. Defaults to 1000.
Source code in nonconform/estimation/weight/logistic.py
def __init__(
    self,
    regularization="auto",
    clip_quantile=0.05,
    seed=None,
    class_weight="balanced",
    max_iter=1_000,
):
    self.regularization = regularization
    self.clip_quantile = clip_quantile
    self.seed = seed
    self.class_weight = class_weight
    self.max_iter = max_iter
    self._w_calib = None
    self._w_test = None
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the weight estimator on calibration and test samples.

Parameters:

- calibration_samples (ndarray): Array of calibration data samples. Required.
- test_samples (ndarray): Array of test data samples. Required.

Raises:

- ValueError: If calibration_samples is empty.

Source code in nonconform/estimation/weight/logistic.py
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the weight estimator on calibration and test samples.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.

    Raises:
        ValueError: If calibration_samples is empty.
    """
    if calibration_samples.shape[0] == 0:
        raise ValueError("Calibration samples are empty. Cannot compute weights.")

    # Label calibration samples as 0, test samples as 1
    calib_labeled = np.hstack(
        (
            calibration_samples,
            np.zeros((calibration_samples.shape[0], 1)),
        )
    )
    test_labeled = np.hstack((test_samples, np.ones((test_samples.shape[0], 1))))

    # Combine and shuffle
    joint_labeled = np.vstack((calib_labeled, test_labeled))
    rng = np.random.default_rng(seed=self.seed)
    rng.shuffle(joint_labeled)

    x_joint = joint_labeled[:, :-1]
    y_joint = joint_labeled[:, -1]

    # Build logistic regression pipeline
    c_param = 1.0 if self.regularization == "auto" else float(self.regularization)

    model = make_pipeline(
        StandardScaler(),
        LogisticRegression(
            C=c_param,
            max_iter=self.max_iter,
            random_state=self.seed,
            verbose=0,
            class_weight=self.class_weight,
        ),
        memory=None,
    )
    model.fit(x_joint, y_joint)

    # Compute probabilities
    calib_prob = model.predict_proba(calibration_samples)
    test_prob = model.predict_proba(test_samples)

    # Compute density ratios w(z) = p_test(z) / p_calib(z)
    # p_calib(z) = P(label=0 | z) ; p_test(z) = P(label=1 | z)
    w_calib = calib_prob[:, 1] / (calib_prob[:, 0] + 1e-9)
    w_test = test_prob[:, 1] / (test_prob[:, 0] + 1e-9)

    # Adaptive clipping based on percentiles
    all_weights = np.concatenate([w_calib, w_test])
    lower_bound = np.percentile(all_weights, self.clip_quantile * 100)
    upper_bound = np.percentile(all_weights, (1 - self.clip_quantile) * 100)

    self._w_calib = np.clip(w_calib, lower_bound, upper_bound)
    self._w_test = np.clip(w_test, lower_bound, upper_bound)

    self._is_fitted = True
get_weights
get_weights() -> tuple[np.ndarray, np.ndarray]

Return computed weights.

Returns:

- tuple[ndarray, ndarray]: Tuple of (calibration_weights, test_weights).

Raises:

- RuntimeError: If fit() has not been called.

Source code in nonconform/estimation/weight/logistic.py
def get_weights(self) -> tuple[np.ndarray, np.ndarray]:
    """Return computed weights.

    Returns:
        Tuple of (calibration_weights, test_weights).

    Raises:
        RuntimeError: If fit() has not been called.
    """
    if not self._is_fitted:
        raise RuntimeError("Must call fit() before get_weights()")

    return self._w_calib.copy(), self._w_test.copy()
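The estimator can also be exercised on its own to inspect the estimated weights; the synthetic data below (a mean shift in the first feature) is purely illustrative:

import numpy as np
from nonconform.estimation.weight import LogisticWeightEstimator

rng = np.random.default_rng(0)
calibration = rng.normal(size=(500, 5))
test = rng.normal(size=(200, 5))
test[:, 0] += 1.0  # covariate shift in the first feature

estimator = LogisticWeightEstimator(seed=0)
estimator.fit(calibration, test)
w_calib, w_test = estimator.get_weights()

# Calibration points resembling the shifted test data receive larger weights.
print(w_calib.mean(), w_test.mean())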
base
BaseWeightEstimator

Bases: ABC

Abstract base class for weight estimators in weighted conformal prediction.

Weight estimators compute importance weights to correct for covariate shift between calibration and test distributions. They estimate density ratios w(x) = p_test(x) / p_calib(x) which are used to reweight conformal scores for better coverage guarantees under distribution shift.

Subclasses must implement the fit() and get_weights() methods to provide specific weight estimation strategies (e.g., logistic regression, random forest).

fit abstractmethod
fit(calibration_samples: ndarray, test_samples: ndarray)

Estimate density ratio weights

Source code in nonconform/estimation/weight/base.py
@abstractmethod
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray):
    """Estimate density ratio weights"""
    pass
get_weights abstractmethod
get_weights() -> tuple[np.ndarray, np.ndarray]

Return (calib_weights, test_weights)

Source code in nonconform/estimation/weight/base.py
@abstractmethod
def get_weights(self) -> tuple[np.ndarray, np.ndarray]:
    """Return (calib_weights, test_weights)"""
    pass
forest
ForestWeightEstimator
ForestWeightEstimator(
    n_estimators: int = 100,
    max_depth: int | None = 5,
    min_samples_leaf: int = 10,
    clip_quantile: float = 0.05,
    seed: int | None = None,
)

Bases: BaseWeightEstimator

Random Forest-based weight estimator for covariate shift.

Uses Random Forest classifier to estimate density ratios between calibration and test distributions. Random Forest can capture non-linear relationships and complex interactions between features, making it suitable for handling more complex covariate shift patterns than logistic regression.

The Random Forest is trained to distinguish between calibration and test samples, and the predicted probabilities are used to compute importance weights w(x) = p_test(x) / p_calib(x).

Parameters:

Name Type Description Default
n_estimators int

Number of trees in the forest. Defaults to 100.

100
max_depth int

Maximum depth of trees. If None, nodes are expanded until all leaves are pure. Defaults to 5 to prevent overfitting.

5
min_samples_leaf int

Minimum number of samples required to be at a leaf node. Defaults to 10 to prevent overfitting.

10
clip_quantile float

Quantile for weight clipping. If 0.05, clips to 5th and 95th percentiles. If None, uses fixed [0.35, 45.0] range.

0.05
seed int

Random seed for reproducible results.

None
Source code in nonconform/estimation/weight/forest.py
def __init__(
    self,
    n_estimators: int = 100,
    max_depth: int | None = 5,
    min_samples_leaf: int = 10,
    clip_quantile: float = 0.05,
    seed: int | None = None,
):
    self.n_estimators = n_estimators
    self.max_depth = max_depth
    self.min_samples_leaf = min_samples_leaf
    self.clip_quantile = clip_quantile
    self.seed = seed
    self._w_calib = None
    self._w_test = None
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the Random Forest weight estimator on calibration and test samples.

Parameters:

Name Type Description Default
calibration_samples ndarray

Array of calibration data samples.

required
test_samples ndarray

Array of test data samples.

required

Raises:

Type Description
ValueError

If calibration_samples is empty.

Source code in nonconform/estimation/weight/forest.py
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the Random Forest weight estimator on calibration and test samples.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.

    Raises:
        ValueError: If calibration_samples is empty.
    """
    if calibration_samples.shape[0] == 0:
        raise ValueError("Calibration samples are empty. Cannot compute weights.")

    # Label calibration samples as 0, test samples as 1
    calib_labeled = np.hstack(
        (
            calibration_samples,
            np.zeros((calibration_samples.shape[0], 1)),
        )
    )
    test_labeled = np.hstack((test_samples, np.ones((test_samples.shape[0], 1))))

    # Combine and shuffle
    joint_labeled = np.vstack((calib_labeled, test_labeled))
    rng = np.random.default_rng(seed=self.seed)
    rng.shuffle(joint_labeled)

    x_joint = joint_labeled[:, :-1]
    y_joint = joint_labeled[:, -1]

    # Build Random Forest classifier
    model = RandomForestClassifier(
        n_estimators=self.n_estimators,
        max_depth=self.max_depth,
        min_samples_leaf=self.min_samples_leaf,
        random_state=self.seed,
        class_weight="balanced",
        n_jobs=-1,  # Use all available cores
    )
    model.fit(x_joint, y_joint)

    # Compute probabilities
    calib_prob = model.predict_proba(calibration_samples)
    test_prob = model.predict_proba(test_samples)

    # Compute density ratios w(z) = p_test(z) / p_calib(z)
    # p_calib(z) = P(label=0 | z) ; p_test(z) = P(label=1 | z)
    w_calib = calib_prob[:, 1] / (calib_prob[:, 0] + 1e-9)
    w_test = test_prob[:, 1] / (test_prob[:, 0] + 1e-9)

    # Apply clipping
    if self.clip_quantile is not None:
        # Adaptive clipping based on percentiles
        all_weights = np.concatenate([w_calib, w_test])
        lower_bound = np.percentile(all_weights, self.clip_quantile * 100)
        upper_bound = np.percentile(all_weights, (1 - self.clip_quantile) * 100)

        self._w_calib = np.clip(w_calib, lower_bound, upper_bound)
        self._w_test = np.clip(w_test, lower_bound, upper_bound)
    else:
        # Fixed clipping (original behavior)
        self._w_calib = np.clip(w_calib, 0.35, 45.0)
        self._w_test = np.clip(w_test, 0.35, 45.0)

    self._is_fitted = True
get_weights
get_weights() -> tuple[np.ndarray, np.ndarray]

Return computed weights.

Returns:

Type Description
tuple[ndarray, ndarray]

Tuple of (calibration_weights, test_weights).

Raises:

Type Description
RuntimeError

If fit() has not been called.

Source code in nonconform/estimation/weight/forest.py
def get_weights(self) -> tuple[np.ndarray, np.ndarray]:
    """Return computed weights.

    Returns:
        Tuple of (calibration_weights, test_weights).

    Raises:
        RuntimeError: If fit() has not been called.
    """
    if not self._is_fitted:
        raise RuntimeError("Must call fit() before get_weights()")

    return self._w_calib.copy(), self._w_test.copy()
identity
IdentityWeightEstimator
IdentityWeightEstimator()

Bases: BaseWeightEstimator

Identity weight estimator that returns uniform weights.

This estimator assumes no covariate shift and returns weights of 1.0 for all samples. Useful as a baseline or when covariate shift is known to be minimal.

This effectively makes weighted conformal prediction equivalent to standard conformal prediction.

Source code in nonconform/estimation/weight/identity.py
def __init__(self):
    self._n_calib = 0
    self._n_test = 0
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the identity weight estimator.

Parameters:

Name Type Description Default
calibration_samples ndarray

Array of calibration data samples.

required
test_samples ndarray

Array of test data samples.

required
Source code in nonconform/estimation/weight/identity.py
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the identity weight estimator.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.
    """
    self._n_calib = calibration_samples.shape[0]
    self._n_test = test_samples.shape[0]
    self._is_fitted = True
get_weights
get_weights() -> tuple[np.ndarray, np.ndarray]

Return uniform weights of 1.0 for all samples.

Returns:

Type Description
tuple[ndarray, ndarray]

Tuple of (calibration_weights, test_weights) with all values = 1.0.

Raises:

Type Description
RuntimeError

If fit() has not been called.

Source code in nonconform/estimation/weight/identity.py
def get_weights(self) -> tuple[np.ndarray, np.ndarray]:
    """Return uniform weights of 1.0 for all samples.

    Returns:
        Tuple of (calibration_weights, test_weights) with all values = 1.0.

    Raises:
        RuntimeError: If fit() has not been called.
    """
    if not self._is_fitted:
        raise RuntimeError("Must call fit() before get_weights()")

    calib_weights = np.ones(self._n_calib, dtype=np.float64)
    test_weights = np.ones(self._n_test, dtype=np.float64)

    return calib_weights, test_weights
logistic
LogisticWeightEstimator
LogisticWeightEstimator(
    regularization="auto",
    clip_quantile=0.05,
    seed=None,
    class_weight="balanced",
    max_iter=1000,
)

Bases: BaseWeightEstimator

Logistic regression-based weight estimator for covariate shift.

Uses logistic regression to estimate density ratios between calibration and test distributions by training a classifier to distinguish between the two samples. The predicted probabilities are used to compute importance weights w(x) = p_test(x) / p_calib(x).

Parameters:

Name Type Description Default
regularization str or float

Regularization parameter for logistic regression. If 'auto', uses default sklearn parameter. If float, uses as C parameter.

'auto'
clip_quantile float

Quantile for weight clipping. If 0.05, clips to 5th and 95th percentiles. If None, uses fixed [0.35, 45.0] range.

0.05
seed int

Random seed for reproducible results.

None
class_weight str or dict

Weights associated with classes like {class_label: weight}. If 'balanced', uses n_samples / (n_classes * np.bincount(y)). Defaults to 'balanced'.

'balanced'
max_iter int

Max. number of iterations for the solver to converge. Defaults to 1000.

1000
Source code in nonconform/estimation/weight/logistic.py
def __init__(
    self,
    regularization="auto",
    clip_quantile=0.05,
    seed=None,
    class_weight="balanced",
    max_iter=1_000,
):
    self.regularization = regularization
    self.clip_quantile = clip_quantile
    self.seed = seed
    self.class_weight = class_weight
    self.max_iter = max_iter
    self._w_calib = None
    self._w_test = None
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the weight estimator on calibration and test samples.

Parameters:

Name Type Description Default
calibration_samples ndarray

Array of calibration data samples.

required
test_samples ndarray

Array of test data samples.

required

Raises:

Type Description
ValueError

If calibration_samples is empty.

Source code in nonconform/estimation/weight/logistic.py
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the weight estimator on calibration and test samples.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.

    Raises:
        ValueError: If calibration_samples is empty.
    """
    if calibration_samples.shape[0] == 0:
        raise ValueError("Calibration samples are empty. Cannot compute weights.")

    # Label calibration samples as 0, test samples as 1
    calib_labeled = np.hstack(
        (
            calibration_samples,
            np.zeros((calibration_samples.shape[0], 1)),
        )
    )
    test_labeled = np.hstack((test_samples, np.ones((test_samples.shape[0], 1))))

    # Combine and shuffle
    joint_labeled = np.vstack((calib_labeled, test_labeled))
    rng = np.random.default_rng(seed=self.seed)
    rng.shuffle(joint_labeled)

    x_joint = joint_labeled[:, :-1]
    y_joint = joint_labeled[:, -1]

    # Build logistic regression pipeline
    c_param = 1.0 if self.regularization == "auto" else float(self.regularization)

    model = make_pipeline(
        StandardScaler(),
        LogisticRegression(
            C=c_param,
            max_iter=self.max_iter,
            random_state=self.seed,
            verbose=0,
            class_weight=self.class_weight,
        ),
        memory=None,
    )
    model.fit(x_joint, y_joint)

    # Compute probabilities
    calib_prob = model.predict_proba(calibration_samples)
    test_prob = model.predict_proba(test_samples)

    # Compute density ratios w(z) = p_test(z) / p_calib(z)
    # p_calib(z) = P(label=0 | z) ; p_test(z) = P(label=1 | z)
    w_calib = calib_prob[:, 1] / (calib_prob[:, 0] + 1e-9)
    w_test = test_prob[:, 1] / (test_prob[:, 0] + 1e-9)

    # Adaptive clipping based on percentiles
    all_weights = np.concatenate([w_calib, w_test])
    lower_bound = np.percentile(all_weights, self.clip_quantile * 100)
    upper_bound = np.percentile(all_weights, (1 - self.clip_quantile) * 100)

    self._w_calib = np.clip(w_calib, lower_bound, upper_bound)
    self._w_test = np.clip(w_test, lower_bound, upper_bound)

    self._is_fitted = True
get_weights
get_weights() -> tuple[np.ndarray, np.ndarray]

Return computed weights.

Returns:

Type Description
tuple[ndarray, ndarray]

Tuple of (calibration_weights, test_weights).

Raises:

Type Description
RuntimeError

If fit() has not been called.

Source code in nonconform/estimation/weight/logistic.py
def get_weights(self) -> tuple[np.ndarray, np.ndarray]:
    """Return computed weights.

    Returns:
        Tuple of (calibration_weights, test_weights).

    Raises:
        RuntimeError: If fit() has not been called.
    """
    if not self._is_fitted:
        raise RuntimeError("Must call fit() before get_weights()")

    return self._w_calib.copy(), self._w_test.copy()
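Example (illustrative)

A hedged usage sketch under a simple mean shift between calibration and test data. The import path is inferred from the source file location shown above and may differ in your installation:

import numpy as np
from nonconform.estimation.weight.logistic import LogisticWeightEstimator  # assumed import path

rng = np.random.default_rng(42)
calib = rng.normal(loc=0.0, scale=1.0, size=(500, 5))  # calibration distribution
test = rng.normal(loc=0.5, scale=1.0, size=(200, 5))   # shifted test distribution

estimator = LogisticWeightEstimator(clip_quantile=0.05, seed=42)
estimator.fit(calib, test)
w_calib, w_test = estimator.get_weights()

# Test points from the shifted region should receive larger weights on average,
# reflecting w(x) = p_test(x) / p_calib(x).
print(w_calib.mean(), w_test.mean())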

Strategy

nonconform.strategy

Conformal calibration strategies.

This module provides different strategies for conformal calibration including split conformal, cross-validation, bootstrap, and jackknife methods.

Bootstrap

Bootstrap(
    resampling_ratio: float | None = None,
    n_bootstraps: int | None = None,
    n_calib: int | None = None,
    plus: bool = True,
)

Bases: BaseStrategy

Implements bootstrap-based conformal anomaly detection.

This strategy uses bootstrap resampling to create multiple training sets and calibration sets. For each bootstrap iteration:

1. A random subset of the data is sampled with replacement for training
2. The remaining samples are used for calibration
3. Optionally, a fixed number of calibration samples can be selected

The strategy can operate in two modes:

1. Standard mode: Uses a single model trained on all data for prediction
2. Plus mode: Uses an ensemble of models, each trained on a bootstrap sample

Attributes:

Name Type Description
_resampling_ratio float

Proportion of data to use for training in each bootstrap iteration

_n_bootstraps int

Number of bootstrap iterations

_n_calib int | None

Optional fixed number of calibration samples to use

_plus bool

Whether to use the plus variant (ensemble of models)

_detector_list list[BaseDetector]

List of trained detectors

_calibration_set list[float]

List of calibration scores

_calibration_ids list[int]

Indices of samples used for calibration

Exactly two of resampling_ratio, n_bootstraps, and n_calib should be provided. The third will be calculated by _configure.

Parameters:

Name Type Description Default
resampling_ratio float | None

The proportion of data to use for training in each bootstrap. Defaults to None.

None
n_bootstraps int | None

The number of bootstrap iterations. Defaults to None.

None
n_calib int | None

The desired size of the final calibration set. If set, collected scores/IDs might be subsampled. Defaults to None.

None
plus bool

If True, appends each bootstrapped model to _detector_list. If False, _detector_list will contain one model trained on all data after calibration scores are collected. Defaults to True.

True
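Example (illustrative)

Because exactly two of the three sizing parameters must be given, all of the following constructions are valid (the import path is inferred from the source location shown below and may differ):

from nonconform.strategy.experimental.bootstrap import Bootstrap  # assumed import path

# Fix the training fraction and the number of bootstraps;
# the calibration size follows from the out-of-bag samples.
s1 = Bootstrap(resampling_ratio=0.8, n_bootstraps=50)

# Fix the number of bootstraps and the target calibration size;
# the resampling ratio is derived by _configure.
s2 = Bootstrap(n_bootstraps=50, n_calib=5_000)

# Fix the training fraction and the target calibration size;
# the number of bootstraps is derived by _configure.
s3 = Bootstrap(resampling_ratio=0.8, n_calib=5_000)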
Source code in nonconform/strategy/experimental/bootstrap.py
def __init__(
    self,
    resampling_ratio: float | None = None,
    n_bootstraps: int | None = None,
    n_calib: int | None = None,
    plus: bool = True,
):
    """Initialize the Bootstrap strategy.

    Exactly two of `resampling_ratio`, `n_bootstraps`, and `n_calib`
    should be provided. The third will be calculated by `_configure`.

    Args:
        resampling_ratio (float | None): The proportion of
            data to use for training in each bootstrap. Defaults to ``None``.
        n_bootstraps (int | None): The number of bootstrap
            iterations. Defaults to ``None``.
        n_calib (int | None): The desired size of the final
            calibration set. If set, collected scores/IDs might be
            subsampled. Defaults to ``None``.
        plus (bool, optional): If ``True``, appends each bootstrapped model
            to `_detector_list`. If ``False``, `_detector_list` will contain
            one model trained on all data after calibration scores are
            collected. Defaults to ``True``.
    """
    super().__init__(plus)
    self._resampling_ratio: float | None = resampling_ratio
    self._n_bootstraps: int | None = n_bootstraps
    self._n_calib: int | None = n_calib
    self._plus: bool = plus

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        logger = get_logger("strategy.bootstrap")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended for validity guarantees."
        )

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []
calibration_ids property
calibration_ids: list[int]

Returns a copy of the list of indices used for calibration.

These are indices relative to the original input data x provided to fit_calibrate. The list contains indices of all out-of-bag samples encountered during bootstrap iterations. If _n_calib was set and weighted was True in fit_calibrate, this list might be a subsample of all encountered IDs, corresponding to the subsampled _calibration_set.

Returns:

Type Description
list[int]

List[int]: A copy of integer indices.

Note

Returns a defensive copy to prevent external modification of internal state.

resampling_ratio property
resampling_ratio: float

Returns the resampling ratio.

Returns:

Name Type Description
float float

Proportion of data used for training in each bootstrap iteration.

n_bootstraps property
n_bootstraps: int

Returns the number of bootstrap iterations.

Returns:

Name Type Description
int int

Number of bootstrap iterations.

n_calib property
n_calib: int

Returns the target calibration set size.

Returns:

Name Type Description
int int

Target number of calibration samples.

plus property
plus: bool

Returns whether the plus variant is enabled.

Returns:

Name Type Description
bool bool

True if using ensemble mode, False if using single model.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, ndarray], None]
    | None = None,
) -> tuple[list[BaseDetector], np.ndarray]

Fit and calibrate the detector using bootstrap resampling.

This method implements the bootstrap strategy by:

1. Creating multiple bootstrap samples of the data
2. For each bootstrap iteration:
   - Train the detector on the bootstrap sample
   - Use the out-of-bootstrap samples for calibration
   - Store calibration scores and optionally the trained model
3. If not in plus mode, train a final model on all data
4. Optionally subsample the calibration set to a fixed size

The method provides robust calibration scores by using multiple bootstrap iterations, which helps account for the variability in the data and model training.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix of shape (n_samples, n_features).

required
detector BaseDetector

The base anomaly detector to be used.

required
weighted bool

Whether to use weighted calibration. If True, calibration scores are weighted by their sample indices. Defaults to False.

False
seed int | None

Random seed for reproducibility. Defaults to None.

None
iteration_callback Callable[[int, ndarray], None]

Optional callback function that gets called after each bootstrap iteration with the iteration number and calibration scores. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing:

* List of trained detectors (either n_bootstraps models in plus mode or a single model in standard mode)
* Array of calibration scores from all bootstrap iterations

Raises:

Type Description
ValueError

If resampling_ratio is not between 0 and 1, or if n_bootstraps is less than 1, or if n_calib is less than 1 when specified.

Source code in nonconform/strategy/experimental/bootstrap.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, np.ndarray], None] | None = None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fit and calibrate the detector using bootstrap resampling.

    This method implements the bootstrap strategy by:
    1. Creating multiple bootstrap samples of the data
    2. For each bootstrap iteration:
       - Train the detector on the bootstrap sample
       - Use the out-of-bootstrap samples for calibration
       - Store calibration scores and optionally the trained model
    3. If not in plus mode, train a final model on all data
    4. Optionally subsample the calibration set to a fixed size

    The method provides robust calibration scores by using multiple
    bootstrap iterations, which helps account for the variability in
    the data and model training.

    Args:
        x (pd.DataFrame | np.ndarray): Input data matrix of shape
            (n_samples, n_features).
        detector (BaseDetector): The base anomaly detector to be used.
        weighted (bool, optional): Whether to use weighted calibration.
            If True, calibration scores are weighted by their sample
            indices. Defaults to False.
        seed (int | None, optional): Random seed for reproducibility.
            Defaults to None.
        iteration_callback (Callable[[int, np.ndarray], None], optional):
            Optional callback function that gets called after each bootstrap
            iteration with the iteration number and calibration scores.
            Defaults to None.

    Returns:
        tuple[list[BaseDetector], list[float]]: A tuple containing:
            * List of trained detectors (either n_bootstraps models in plus
              mode or a single model in standard mode)
            * Array of calibration scores from all bootstrap iterations

    Raises:
        ValueError: If resampling_ratio is not between 0 and 1, or if
            n_bootstraps is less than 1, or if n_calib is less than 1
            when specified.
    """
    self._configure(len(x))

    _detector = detector
    _generator = np.random.default_rng(seed)

    folds = ShuffleSplit(
        n_splits=self._n_bootstraps,
        train_size=self._resampling_ratio,
        random_state=seed,
    )

    n_folds = folds.get_n_splits()
    last_iteration_index = (
        0  # To ensure unique iteration for final model if not _plus
    )
    logger = get_logger("strategy.bootstrap")
    fold_iterator = (
        tqdm(
            folds.split(x),
            total=n_folds,
            desc=f"Bootstrap training ({n_folds} folds)",
        )
        if logger.isEnabledFor(logging.INFO)
        else folds.split(x)
    )
    for i, (train_idx, calib_idx) in enumerate(fold_iterator):
        last_iteration_index = i
        self._calibration_ids.extend(calib_idx.tolist())

        model = copy(_detector)
        model = _set_params(model, seed=seed, random_iteration=True, iteration=i)
        model.fit(x[train_idx])

        current_scores = model.decision_function(x[calib_idx])

        # Call iteration callback if provided
        if iteration_callback is not None:
            iteration_callback(i, current_scores)

        if self._plus:
            self._detector_list.append(deepcopy(model))

        # Concatenate calibration scores
        if len(self._calibration_set) == 0:
            self._calibration_set = current_scores
        else:
            self._calibration_set = np.concatenate(
                [self._calibration_set, current_scores]
            )

    if not self._plus:
        model = copy(_detector)
        model = _set_params(
            model,
            seed=seed,
            random_iteration=True,
            iteration=(last_iteration_index + 1),
        )
        model.fit(x)
        self._detector_list.append(deepcopy(model))

    if self._n_calib is not None and self._n_calib < len(self._calibration_set):
        ids = _generator.choice(
            len(self._calibration_set), size=self._n_calib, replace=False
        )
        self._calibration_set = self._calibration_set[ids]
        if weighted:
            self._calibration_ids = [self._calibration_ids[i] for i in ids]

    return self._detector_list, self._calibration_set
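Example (illustrative)

A hedged end-to-end sketch with a PyOD detector; the Bootstrap import path is inferred from the source location shown above and may differ:

import numpy as np
from pyod.models.iforest import IForest
from nonconform.strategy.experimental.bootstrap import Bootstrap  # assumed import path

rng = np.random.default_rng(0)
X = rng.normal(size=(1_000, 8))

strategy = Bootstrap(resampling_ratio=0.75, n_bootstraps=20, plus=True)
detectors, calib_scores = strategy.fit_calibrate(X, IForest(), seed=0)

print(len(detectors))                   # 20 models in plus mode
print(calib_scores.shape)               # out-of-bag calibration scores
print(len(strategy.calibration_ids))    # indices of the calibration samples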

Randomized

Randomized(
    n_iterations: int | None = None,
    n_calib: int | None = None,
    sampling_distr: Distribution = Distribution.UNIFORM,
    holdout_size_range: tuple[float, float] | None = None,
    beta_params: tuple[float, float] | None = None,
    grid_probs: tuple[list[int], list[float]] | None = None,
    plus: bool = True,
)

Bases: BaseStrategy

Implements randomized leave-p-out (rLpO) conformal anomaly detection.

This strategy uses randomized leave-p-out resampling where on each iteration a validation set size p is drawn at random, then a size-p validation set is sampled without replacement, the detector is trained on the rest, and calibration scores are computed. This approach smoothly interpolates between leave-one-out (p=1) and larger holdout strategies.

The strategy can operate in two modes:

1. Standard mode: Uses a single model trained on all data for prediction
2. Plus mode: Uses an ensemble of models, each trained on a different subset

Attributes:

Name Type Description
_sampling_distr Distribution

Distribution type for drawing holdout sizes

_n_iterations int | None

Number of rLpO iterations

_holdout_size_range tuple

Range of holdout sizes (relative or absolute)

_beta_params tuple

Alpha and beta parameters for beta distribution

_grid_probs tuple

Holdout sizes and probabilities for grid distribution

_n_calib int | None

Target number of calibration samples

_use_n_calib_mode bool

Whether to use n_calib mode vs n_iterations mode

_plus bool

Whether to use the plus variant (ensemble of models)

_detector_list list[BaseDetector]

List of trained detectors

_calibration_set list[float]

List of calibration scores

_calibration_ids list[int]

Indices of samples used for calibration

Parameters:

Name Type Description Default
n_iterations int | None

Number of rLpO iterations to perform. Cannot be used together with n_calib. Defaults to None.

None
n_calib int | None

Target number of calibration samples. Iterations will stop when this target is reached or exceeded, then subsample to exactly this size. Cannot be used with n_iterations. Defaults to None.

None
sampling_distr Distribution

Distribution for drawing holdout set sizes. Options: Distribution.BETA_BINOMIAL, Distribution.UNIFORM, Distribution.GRID. Defaults to Distribution.UNIFORM.

UNIFORM
holdout_size_range tuple[float, float]

Min and max holdout set sizes. Values in ]0, 1[ are interpreted as fractions of dataset size. Values >= 1 are interpreted as absolute sample counts. If None, defaults to (0.1, 0.5) for relative sizing. Defaults to None.

None
beta_params tuple[float, float]

Alpha and beta parameters for Beta distribution used to draw holdout size fractions. If None and sampling_distr is BETA_BINOMIAL, defaults to (2.0, 5.0). Common parameterizations:

- (1.0, 1.0): Uniform sampling (equivalent to UNIFORM distribution)
- (2.0, 5.0): Right-skewed, favors smaller holdout sizes [DEFAULT]
- (5.0, 2.0): Left-skewed, favors larger holdout sizes
- (2.0, 2.0): Bell-shaped, concentrated around middle sizes
- (0.5, 0.5): U-shaped, concentrated at extremes

Defaults to None.

None
grid_probs tuple[list[int], list[float]]

Holdout sizes and corresponding probabilities for grid distribution. Required if sampling_distr is Distribution.GRID. Defaults to None.

None
plus bool

If True, uses ensemble of models trained on different subsets. If False, uses single model trained on all data. Defaults to True.

True

Raises:

Type Description
ValueError

If required parameters for the chosen distribution are missing, if both n_iterations and n_calib are specified, or neither.

Source code in nonconform/strategy/experimental/randomized.py
def __init__(
    self,
    n_iterations: int | None = None,
    n_calib: int | None = None,
    sampling_distr: Distribution = Distribution.UNIFORM,
    holdout_size_range: tuple[float, float] | None = None,
    beta_params: tuple[float, float] | None = None,
    grid_probs: tuple[list[int], list[float]] | None = None,
    plus: bool = True,
):
    """Initialize the RandomizedLeaveOut strategy.

    Args:
        n_iterations (int | None, optional): Number of rLpO iterations to perform.
            Cannot be used together with n_calib. Defaults to None.
        n_calib (int | None, optional): Target number of calibration samples.
            Iterations will stop when this target is reached or exceeded, then
            subsample to exactly this size. Cannot be used with n_iterations.
            Defaults to None.
        sampling_distr (Distribution, optional): Distribution for drawing holdout
            set sizes. Options: Distribution.BETA_BINOMIAL, Distribution.UNIFORM,
            Distribution.GRID. Defaults to Distribution.UNIFORM.
        holdout_size_range (tuple[float, float], optional): Min and max holdout
            set sizes. Values in ]0, 1[ are interpreted as fractions of dataset
            size. Values >= 1 are interpreted as absolute sample counts.
            If None, defaults to (0.1, 0.5) for relative sizing. Defaults to None.
        beta_params (tuple[float, float], optional): Alpha and beta parameters
            for Beta distribution used to draw holdout size fractions. If None and
            sampling_distr is BETA_BINOMIAL, defaults to (2.0, 5.0).
            Common parameterizations:
            - (1.0, 1.0): Uniform sampling (equivalent to UNIFORM distribution)
            - (2.0, 5.0): Right-skewed, favors smaller holdout sizes [DEFAULT]
            - (5.0, 2.0): Left-skewed, favors larger holdout sizes
            - (2.0, 2.0): Bell-shaped, concentrated around middle sizes
            - (0.5, 0.5): U-shaped, concentrated at extremes
            Defaults to None.
        grid_probs (tuple[list[int], list[float]], optional): Holdout sizes and
            corresponding probabilities for grid distribution. Required if
            sampling_distr is Distribution.GRID. Defaults to None.
        plus (bool, optional): If True, uses ensemble of models trained on
            different subsets. If False, uses single model trained on all data.
            Defaults to True.

    Raises:
        ValueError: If required parameters for the chosen distribution are missing,
            if both n_iterations and n_calib are specified, or neither.
    """
    super().__init__(plus)

    # Validate that exactly one of n_iterations or n_calib is specified
    if n_iterations is not None and n_calib is not None:
        logger = get_logger("strategy.randomized")
        logger.warning(
            "Both n_iterations and n_calib specified. "
            "Using n_calib and ignoring n_iterations."
        )
        n_iterations = None
    elif n_iterations is None and n_calib is None:
        raise ValueError(
            "Must specify either n_iterations or n_calib. "
            "n_iterations controls the number of random leave-p-out iterations, "
            "while n_calib sets a target number of calibration samples to collect. "
            "Example: Randomized(n_iterations=1000) or Randomized(n_calib=5000)"
        )

    if n_iterations is not None and n_iterations < 1:
        raise ValueError(
            f"n_iterations must be at least 1, got {n_iterations}. "
            f"Typical values are 100-10000 depending on dataset size."
        )
    if n_calib is not None and n_calib < 1:
        raise ValueError(
            f"n_calib must be at least 1, got {n_calib}. "
            f"Typical values are 1000-100000 depending on desired precision."
        )

    self._n_iterations: int | None = n_iterations
    self._sampling_distr: Distribution = sampling_distr
    self._holdout_size_range: tuple[float, float] | None = holdout_size_range
    self._beta_params: tuple[float, float] | None = beta_params
    self._grid_probs: tuple[list[int], list[float]] | None = grid_probs
    self._n_calib: int | None = n_calib
    self._plus: bool = plus
    self._use_n_calib_mode: bool = n_calib is not None

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        logger = get_logger("strategy.randomized")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended for validity guarantees."
        )

    # Validate distribution-specific parameters
    self._validate_distribution_params()

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []
    self._n_data: int = 0
    self._holdout_sizes: list[int] = []
    self._iteration_scores: list[list[float]] = []
    # Will be set in _configure_holdout_size_range
    self._holdout_size_range_abs: tuple[int, int] = (1, 1)
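Example (illustrative)

A hedged construction sketch showing the two mutually exclusive sizing modes and a beta-distributed holdout size. The Randomized import path is inferred from the source location shown above and may differ; the Distribution import matches the example further below:

from nonconform.strategy.experimental.randomized import Randomized  # assumed import path
from nonconform.utils.func.enums import Distribution

# Fixed number of random leave-p-out iterations, uniform holdout sizes
# drawn between 10% and 50% of the dataset.
s_iter = Randomized(n_iterations=500, holdout_size_range=(0.1, 0.5))

# Target calibration size: iterate until at least 5,000 scores are collected,
# then subsample to exactly 5,000.
s_calib = Randomized(n_calib=5_000)

# Beta-binomial holdout sizes skewed toward smaller holdouts.
s_beta = Randomized(
    n_calib=5_000,
    sampling_distr=Distribution.BETA_BINOMIAL,
    beta_params=(2.0, 5.0),
)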
calibration_ids property
calibration_ids: list[int]

Returns a copy of the list of indices used for calibration.

These are indices relative to the original input data x provided to fit_calibrate. The list contains indices of all holdout samples encountered during rLpO iterations.

Returns:

Type Description
list[int]

list[int]: A copy of integer indices for calibration samples.

Note

Returns a defensive copy to prevent external modification of internal state.

n_iterations property
n_iterations: int | None

Returns the number of iterations.

Returns:

Type Description
int | None

int | None: Number of iterations, or None if using n_calib mode.

n_calib property
n_calib: int | None

Returns the target calibration set size.

Returns:

Type Description
int | None

int | None: Target number of calibration samples, or None if using n_iterations mode.

sampling_distr property
sampling_distr: Distribution

Returns the sampling distribution type.

Returns:

Name Type Description
Distribution Distribution

Distribution used for drawing holdout sizes.

holdout_size_range property
holdout_size_range: tuple[float, float]

Returns the holdout size range.

Returns:

Type Description
tuple[float, float]

tuple[float, float]: Min and max holdout set sizes.

beta_params property
beta_params: tuple[float, float] | None

Returns the beta distribution parameters.

Returns:

Type Description
tuple[float, float] | None

tuple[float, float] | None: Alpha and beta parameters, or None if not using beta distribution.

grid_probs property
grid_probs: tuple[list[int], list[float]] | None

Returns the grid probabilities.

Returns:

Type Description
tuple[list[int], list[float]] | None

tuple[list[int], list[float]] | None: Holdout sizes and probabilities, or None if not using grid distribution.

plus property
plus: bool

Returns whether the plus variant is enabled.

Returns:

Name Type Description
bool bool

True if using ensemble mode, False if using single model.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, ndarray], None]
    | None = None,
    track_p_values: bool = False,
) -> tuple[list[BaseDetector], np.ndarray]

Fit and calibrate the detector using randomized leave-p-out resampling.

This method implements the rLpO strategy by:

1. For each iteration, drawing a random holdout set size
2. Sampling a holdout set of that size without replacement
3. Training the detector on the remaining samples
4. Computing calibration scores on the holdout set
5. Optionally storing the trained model (in plus mode)
6. If using n_calib mode, stopping when target calibration size is reached

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix of shape (n_samples, n_features).

required
detector BaseDetector

The base anomaly detector to be used.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Whether to store calibration sample indices. Defaults to False.

False
iteration_callback Callable[[int, ndarray], None]

Optional callback function called after each iteration with the iteration number and calibration scores. Defaults to None.

None
track_p_values bool

If True, stores the holdout sizes and per-iteration scores for performance analysis. Can be accessed via get_iteration_info(). Defaults to False.

False

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing:

* List of trained detectors (either multiple models in plus mode or a single model in standard mode)
* Array of calibration scores from all iterations

Raises:

Type Description
ValueError

If holdout set size would leave insufficient training data.
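Example (illustrative)

A hedged sketch of an iteration_callback that records how many calibration scores each iteration contributes. The data, detector, and import path are placeholders/assumptions:

import numpy as np
from pyod.models.lof import LOF
from nonconform.strategy.experimental.randomized import Randomized  # assumed import path

rng = np.random.default_rng(1)
X = rng.normal(size=(800, 6))

scores_per_iteration: list[int] = []

def log_iteration(i: int, scores: np.ndarray) -> None:
    # Called after each iteration with the iteration index and its calibration scores.
    scores_per_iteration.append(len(scores))

strategy = Randomized(n_calib=2_000)
detectors, calib_scores = strategy.fit_calibrate(
    X, LOF(), seed=1, iteration_callback=log_iteration
)
print(sum(scores_per_iteration), calib_scores.shape)  # >= 2000 collected, then subsampled to (2000,)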

Source code in nonconform/strategy/experimental/randomized.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, np.ndarray], None] | None = None,
    track_p_values: bool = False,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fit and calibrate the detector using randomized leave-p-out resampling.

    This method implements the rLpO strategy by:
    1. For each iteration, drawing a random holdout set size
    2. Sampling a holdout set of that size without replacement
    3. Training the detector on the remaining samples
    4. Computing calibration scores on the holdout set
    5. Optionally storing the trained model (in plus mode)
    6. If using n_calib mode, stopping when target calibration size is reached

    Args:
        x (pd.DataFrame | np.ndarray): Input data matrix of shape
            (n_samples, n_features).
        detector (BaseDetector): The base anomaly detector to be used.
        seed (int | None, optional): Random seed for reproducibility.
            Defaults to None.
        weighted (bool, optional): Whether to store calibration sample indices.
            Defaults to False.
        iteration_callback (Callable[[int, np.ndarray], None], optional):
            Optional callback function called after each iteration with the
            iteration number and calibration scores. Defaults to None.
        track_p_values (bool, optional): If True, stores the holdout sizes and
            per-iteration scores for performance analysis. Can be accessed
            via get_iteration_info(). Defaults to False.

    Returns:
        tuple[list[BaseDetector], list[float]]: A tuple containing:
            * List of trained detectors (either multiple models in plus
              mode or a single model in standard mode)
            * Array of calibration scores from all iterations

    Raises:
        ValueError: If holdout set size would leave insufficient training data.
    """
    self._configure_holdout_size_range(len(x))
    self._log_configuration()

    _detector = detector
    generator = np.random.default_rng(seed)

    logger = get_logger("strategy.randomized")

    # Determine iteration strategy and progress bar setup
    if self._use_n_calib_mode:
        # Use a high iteration limit but stop when n_calib is reached
        max_iterations = 10000  # Reasonable upper bound
        base_desc = f"Randomized (target: {self._n_calib})"
        total_for_progress = self._n_calib
    else:
        max_iterations = self._n_iterations
        base_desc = f"Randomized ({self._n_iterations} iterations)"
        total_for_progress = self._n_iterations

    actual_iterations = 0
    running_holdout_sum = 0
    progress_context = (
        tqdm(total=total_for_progress, desc=base_desc)
        if logger.isEnabledFor(logging.INFO)
        else contextlib.nullcontext()
    )
    with progress_context as pbar:
        while True:
            # Check stopping condition
            if self._use_n_calib_mode:
                if len(self._calibration_set) >= self._n_calib:
                    break
                if actual_iterations >= max_iterations:
                    logger.warning(
                        f"Reached maximum iterations ({max_iterations}) "
                        f"with only {len(self._calibration_set)} samples. "
                        f"Target was {self._n_calib}."
                    )
                    break
            else:
                if actual_iterations >= self._n_iterations:
                    break

            # Draw holdout set size for this iteration
            holdout_size = self._draw_holdout_size(generator)

            # Sample holdout set without replacement
            all_indices = np.arange(self._n_data)
            calib_idx = generator.choice(
                all_indices, size=holdout_size, replace=False
            )
            train_idx = np.setdiff1d(all_indices, calib_idx)

            if len(train_idx) < 1:
                raise ValueError(
                    f"No training samples left with holdout_size={holdout_size} "
                    f"for n={self._n_data}"
                )

            # Store calibration indices
            self._calibration_ids.extend(calib_idx.tolist())

            # Train model on training set
            model = copy(_detector)
            model = _set_params(
                model, seed=seed, random_iteration=True, iteration=actual_iterations
            )
            model.fit(x[train_idx])

            # Compute calibration scores on holdout set
            current_scores = model.decision_function(x[calib_idx])

            # Call iteration callback if provided
            if iteration_callback is not None:
                iteration_callback(actual_iterations, current_scores)

            # Store model if in plus mode
            if self._plus:
                self._detector_list.append(deepcopy(model))

            # Store calibration scores
            if len(self._calibration_set) == 0:
                self._calibration_set = current_scores
            else:
                self._calibration_set = np.concatenate(
                    [self._calibration_set, current_scores]
                )

            # Track holdout sizes and per-iteration scores if requested
            if track_p_values:
                self._holdout_sizes.append(holdout_size)
                self._iteration_scores.append(current_scores.tolist())

            actual_iterations += 1
            running_holdout_sum += holdout_size
            avg_holdout = running_holdout_sum / actual_iterations

            # Update progress bar based on mode
            if pbar is not None:
                if self._use_n_calib_mode:
                    # Update progress to show current calibration samples
                    pbar.n = min(len(self._calibration_set), self._n_calib)
                    pbar.desc = (
                        f"{base_desc} | iter: {actual_iterations}, "
                        f"avg_holdout: {avg_holdout:.1f}"
                    )
                    pbar.refresh()
                else:
                    pbar.update(1)
                    pbar.desc = f"{base_desc} | avg_holdout: {avg_holdout:.1f}"

    # If not in plus mode, train final model on all data
    if not self._plus:
        final_model = copy(_detector)
        final_model = _set_params(
            final_model,
            seed=seed,
            random_iteration=True,
            iteration=actual_iterations,
        )
        final_model.fit(x)
        self._detector_list.append(deepcopy(final_model))

    # Always subsample to exact n_calib in n_calib mode
    if self._use_n_calib_mode and len(self._calibration_set) != self._n_calib:
        generator = np.random.default_rng(seed)
        if len(self._calibration_set) > self._n_calib:
            # Subsample to exact target
            ids = generator.choice(
                len(self._calibration_set), size=self._n_calib, replace=False
            )
        else:
            # We have fewer than target - use all available
            ids = list(range(len(self._calibration_set)))
            logger.warning(
                f"Only collected {len(self._calibration_set)} calibration samples, "
                f"less than target {self._n_calib}"
            )

        self._calibration_set = self._calibration_set[ids]
        if weighted:
            self._calibration_ids = [self._calibration_ids[i] for i in ids]

        # Also subsample tracking data if enabled
        if track_p_values and self._holdout_sizes:
            # For simplicity, subsample proportionally across iterations
            selected_iterations = (
                set(ids % actual_iterations) if actual_iterations > 0 else set()
            )
            self._holdout_sizes = [
                size
                for i, size in enumerate(self._holdout_sizes)
                if i in selected_iterations
            ]
            self._iteration_scores = [
                scores
                for i, scores in enumerate(self._iteration_scores)
                if i in selected_iterations
            ]

    # Log final results - only for n_iterations mode
    if not self._use_n_calib_mode:
        final_calib_size = len(self._calibration_set)
        logger.info(f"Final calibration scores: {final_calib_size:,}")

    return self._detector_list, self._calibration_set
get_iteration_info
get_iteration_info() -> (
    tuple[list[int], list[list[float]]] | None
)

Get holdout sizes and per-iteration scores if tracking was enabled.

This method provides access to the holdout set sizes used in each iteration and the corresponding anomaly scores. This information can be used for performance analysis, plotting vs. holdout size, or understanding the distribution of holdout set sizes used.

Returns:

Type Description
tuple[list[int], list[list[float]]] | None

tuple[list[int], list[list[float]]] | None: A tuple containing:

* List of holdout sizes for each iteration
* List of score arrays, one per iteration

Returns None if track_p_values was False during fit_calibrate.

Example

from nonconform.utils.func.enums import Distribution
strategy = Randomized(n_calib=1000)
strategy.fit_calibrate(X, detector, track_p_values=True)
holdout_sizes, scores = strategy.get_iteration_info()
# holdout_sizes[i] is the holdout set size for iteration i
# scores[i] are the anomaly scores for iteration i
Source code in nonconform/strategy/experimental/randomized.py
def get_iteration_info(self) -> tuple[list[int], list[list[float]]] | None:
    """Get holdout sizes and per-iteration scores if tracking was enabled.

    This method provides access to the holdout set sizes used in each
    iteration and the corresponding anomaly scores. This information can be
    used for performance analysis, plotting vs. holdout size, or understanding
    the distribution of holdout set sizes used.

    Returns:
        tuple[list[int], list[list[float]]] | None: A tuple containing:
            * List of holdout sizes for each iteration
            * List of score arrays, one per iteration
            Returns None if track_p_values was False during fit_calibrate.

    Example:
        >>> from nonconform.utils.func.enums import Distribution
        >>> strategy = Randomized(n_calib=1000)
        >>> strategy.fit_calibrate(X, detector, track_p_values=True)
        >>> holdout_sizes, scores = strategy.get_iteration_info()
        >>> # holdout_sizes[i] is the holdout set size for iteration i
        >>> # scores[i] are the anomaly scores for iteration i
    """
    if not self._holdout_sizes:  # Empty list means tracking was not enabled
        return None
    return (
        self._holdout_sizes.copy(),
        [scores.copy() for scores in self._iteration_scores],
    )

BaseStrategy

BaseStrategy(plus: bool = True)

Bases: ABC

Abstract base class for anomaly detection calibration strategies.

This class provides a common interface for various calibration strategies applied to anomaly detectors. Subclasses must implement the core calibration logic and define how calibration data is identified and used.

Attributes:

Name Type Description
_plus bool

A flag, typically set during initialization, that may influence calibration behavior in subclasses (e.g., by applying an adjustment).

Parameters:

Name Type Description Default
plus bool

A flag that enables the "plus" variant which maintains statistical validity by retaining calibration models for inference. Strongly recommended for proper conformal guarantees. Defaults to True.

True
Source code in nonconform/strategy/base.py
def __init__(self, plus: bool = True):
    """Initialize the base calibration strategy.

    Args:
        plus (bool, optional): A flag that enables the "plus" variant which
            maintains statistical validity by retaining calibration models for
            inference. Strongly recommended for proper conformal guarantees.
            Defaults to ``True``.
    """
    self._plus: bool = plus
    self._calibration_ids: list[int]
calibration_ids abstractmethod property
calibration_ids: list[int]

Provides the indices of the data points used for calibration.

This abstract property must be implemented by subclasses. It should return a list of integer indices identifying which samples from the original input data (provided to fit_calibrate) were selected or designated as the calibration set.

Returns:

Type Description
list[int]

List[int]: A list of integer indices for the calibration data.

Raises:

Type Description
NotImplementedError

If the subclass does not implement this property.

fit_calibrate abstractmethod
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]

Fits the detector and performs calibration.

This abstract method must be implemented by subclasses to define the specific procedure for fitting the anomaly detector (if necessary) and then calibrating it using data derived from x. Calibration often involves determining thresholds or adjusting scores.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data, which may be used for both fitting the detector and deriving calibration data.

required
detector BaseDetector

The PyOD anomaly detection model to be fitted and/or calibrated.

required
weighted bool | None

A flag indicating whether a weighted approach should be used during calibration, if applicable to the subclass implementation.

False
seed int | None

A random seed for ensuring reproducibility in stochastic parts of the fitting or calibration process. Defaults to None.

None
iteration_callback callable | None

Optional callback function for strategies that support iteration tracking. Defaults to None.

None

Raises:

Type Description
NotImplementedError

If the subclass does not implement this method.

Source code in nonconform/strategy/base.py
@abc.abstractmethod
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fits the detector and performs calibration.

    This abstract method must be implemented by subclasses to define the
    specific procedure for fitting the anomaly detector (if necessary)
    and then calibrating it using data derived from `x`. Calibration often
    involves determining thresholds or adjusting scores.

    Args:
        x (pd.DataFrame | np.ndarray): The input data, which
            may be used for both fitting the detector and deriving
            calibration data.
        detector (BaseDetector): The PyOD anomaly detection model to be
            fitted and/or calibrated.
        weighted (bool | None): A flag indicating whether a weighted
            approach should be used during calibration, if applicable to
            the subclass implementation.
        seed (int | None): A random seed for ensuring reproducibility
            in stochastic parts of the fitting or calibration process.
            Defaults to None.
        iteration_callback (callable | None): Optional callback function
            for strategies that support iteration tracking. Defaults to None.

    Raises:
        NotImplementedError: If the subclass does not implement this method.
    """
    raise NotImplementedError(
        "The fit_calibrate() method must be implemented by subclasses."
    )
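Example (illustrative)

To make the contract concrete, here is a minimal, illustrative subclass that calibrates on a single random split. It is a sketch only, not one of the library's shipped strategies, and the base-class import path is inferred from the source location shown above:

import numpy as np
import pandas as pd
from pyod.models.base import BaseDetector
from nonconform.strategy.base import BaseStrategy  # assumed import path


class SingleSplit(BaseStrategy):
    """Train on a random (1 - calib_fraction) share of the data, calibrate on the rest."""

    def __init__(self, calib_fraction: float = 0.2, plus: bool = True):
        super().__init__(plus)
        self._calib_fraction = calib_fraction
        self._calibration_ids: list[int] = []

    @property
    def calibration_ids(self) -> list[int]:
        return list(self._calibration_ids)

    def fit_calibrate(
        self,
        x: pd.DataFrame | np.ndarray,
        detector: BaseDetector,
        seed: int | None = None,
        weighted: bool = False,
        iteration_callback=None,
    ) -> tuple[list[BaseDetector], np.ndarray]:
        x = np.asarray(x)
        rng = np.random.default_rng(seed)
        n_calib = int(len(x) * self._calib_fraction)
        calib_idx = rng.choice(len(x), size=n_calib, replace=False)
        train_idx = np.setdiff1d(np.arange(len(x)), calib_idx)

        detector.fit(x[train_idx])                    # train on the remaining samples
        self._calibration_ids = calib_idx.tolist()    # record which samples were held out
        calibration_set = detector.decision_function(x[calib_idx])
        return [detector], calibration_set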

CrossValidation

CrossValidation(k: int, plus: bool = True)

Bases: BaseStrategy

Implements k-fold cross-validation for conformal anomaly detection.

This strategy splits the data into k folds and uses each fold as a calibration set while training on the remaining folds. This approach provides more robust calibration scores by utilizing all available data. The strategy can operate in two modes:

1. Standard mode: Uses a single model trained on all data for prediction
2. Plus mode: Uses an ensemble of k models, each trained on k-1 folds

Attributes:

Name Type Description
_k int

Number of folds for cross-validation

_plus bool

Whether to use the plus variant (ensemble of models)

_detector_list list[BaseDetector]

List of trained detectors

_calibration_set list[float]

List of calibration scores

_calibration_ids list[int]

Indices of samples used for calibration

Parameters:

Name Type Description Default
k int

The number of folds for cross-validation. Must be at least 2. Higher values provide more robust calibration but increase computational cost.

required
plus bool

If True, appends each fold-trained model to _detector_list, creating an ensemble. If False, _detector_list will contain one model trained on all data after calibration scores are collected. The plus variant maintains statistical validity and is strongly recommended. Defaults to True.

True
Source code in nonconform/strategy/cross_val.py
def __init__(self, k: int, plus: bool = True):
    """Initialize the CrossValidation strategy.

    Args:
        k (int): The number of folds for cross-validation. Must be at
            least 2. Higher values provide more robust calibration but
            increase computational cost.
        plus (bool, optional): If ``True``, appends each fold-trained model
            to `_detector_list`, creating an ensemble. If ``False``,
            `_detector_list` will contain one model trained on all data
            after calibration scores are collected. The plus variant
            maintains statistical validity and is strongly recommended.
            Defaults to ``True``.
    """
    super().__init__(plus)
    self._k: int = k
    self._plus: bool = plus

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        from nonconform.utils.func.logger import get_logger

        logger = get_logger("strategy.cross_val")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended "
            "for statistical guarantees."
        )

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []
calibration_ids property
calibration_ids: list[int]

Returns a copy of the list of indices from x used for calibration.

In k-fold cross-validation, every sample in the input data x is used exactly once as part of a calibration set (when its fold is the hold-out set). This property returns a list of all these indices, typically covering all indices from 0 to len(x)-1, but ordered by fold processing.

Returns:

Type Description
list[int]

list[int]: A copy of integer indices.

Note

Returns a defensive copy to prevent external modification of internal state.

k property
k: int

Returns the number of folds for cross-validation.

Returns:

Name Type Description
int int

Number of folds specified during initialization.

plus property
plus: bool

Returns whether the plus variant is enabled.

Returns:

Name Type Description
bool bool

True if using ensemble mode, False if using single model.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]

Fit and calibrate the detector using k-fold cross-validation.

This method implements the cross-validation strategy by:

1. Splitting the data into k folds
2. For each fold:
   - Train the detector on k-1 folds
   - Use the remaining fold for calibration
   - Store calibration scores and optionally the trained model
3. If not in plus mode, train a final model on all data

The method ensures that each sample is used exactly once for calibration, providing a more robust estimate of the calibration scores.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix of shape (n_samples, n_features).

required
detector BaseDetector

The base anomaly detector to be used.

required
weighted bool

Whether to use weighted calibration. Currently not implemented for cross-validation. Defaults to False.

False
seed int | None

Random seed for reproducibility. Defaults to None.

None
iteration_callback callable

Not used in CrossValidation strategy. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing:

* List of trained detectors (either k models in plus mode or a single model in standard mode)
* Array of calibration scores from all folds

Raises:

Type Description
ValueError

If k is less than 2 or if the data size is too small for the specified number of folds.

Source code in nonconform/strategy/cross_val.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fit and calibrate the detector using k-fold cross-validation.

    This method implements the cross-validation strategy by:
    1. Splitting the data into k folds
    2. For each fold:
       - Train the detector on k-1 folds
       - Use the remaining fold for calibration
       - Store calibration scores and optionally the trained model
    3. If not in plus mode, train a final model on all data

    The method ensures that each sample is used exactly once for calibration,
    providing a more robust estimate of the calibration scores.

    Args:
        x (pd.DataFrame | np.ndarray): Input data matrix of shape
            (n_samples, n_features).
        detector (BaseDetector): The base anomaly detector to be used.
        weighted (bool, optional): Whether to use weighted calibration.
            Currently not implemented for cross-validation. Defaults to False.
        seed (int | None, optional): Random seed for reproducibility.
            Defaults to None.
        iteration_callback (callable, optional): Not used in CrossValidation
            strategy.
            Defaults to None.

    Returns:
        tuple[list[BaseDetector], list[float]]: A tuple containing:
            * List of trained detectors (either k models in plus mode or
              a single model in standard mode)
            * Array of calibration scores from all folds

    Raises:
        ValueError: If k is less than 2 or if the data size is too small
            for the specified number of folds.
    """
    _detector = detector
    n_samples = len(x)

    # Validate k before creating KFold
    if self._k < 2:
        exc = ValueError(
            f"k must be at least 2 for k-fold cross-validation, got {self._k}"
        )
        exc.add_note(f"Received k={self._k}, which is invalid.")
        exc.add_note(
            "Cross-validation requires at least one split"
            " for training and one for calibration."
        )
        exc.add_note(
            f"With {n_samples} samples, consider k=min(10,"
            f" {n_samples // 10}) for balanced folds."
        )
        raise exc

    if n_samples < self._k:
        exc = ValueError(
            f"Not enough samples ({n_samples}) for "
            f"k-fold cross-validation with k={self._k}"
        )
        exc.add_note(
            f"Each fold needs at least 1 sample, but {n_samples} < {self._k}."
        )
        exc.add_note(
            f"Either increase your dataset size or reduce k to at most {n_samples}."
        )
        raise exc

    # Pre-allocate calibration array for efficiency
    self._calibration_set = np.empty(n_samples, dtype=np.float64)
    calibration_offset = 0

    folds = KFold(
        n_splits=self._k,
        shuffle=True,
        random_state=seed,
    )

    last_iteration_index = 0
    logger = get_logger("strategy.cross_val")
    fold_iterator = (
        tqdm(
            folds.split(x),
            total=self._k,
            desc=f"CV fold training ({self._k} folds)",
        )
        if logger.isEnabledFor(logging.INFO)
        else folds.split(x)
    )
    for i, (train_idx, calib_idx) in enumerate(fold_iterator):
        last_iteration_index = i
        self._calibration_ids.extend(calib_idx.tolist())

        model = copy(_detector)
        model = _set_params(model, seed=seed, random_iteration=True, iteration=i)
        model.fit(x[train_idx])

        if self._plus:
            self._detector_list.append(deepcopy(model))

        # Store calibration scores efficiently using pre-allocated array
        fold_scores = model.decision_function(x[calib_idx])
        n_fold_samples = len(fold_scores)
        end_idx = calibration_offset + n_fold_samples
        self._calibration_set[calibration_offset:end_idx] = fold_scores
        calibration_offset += n_fold_samples

    if not self._plus:
        model = copy(_detector)
        model = _set_params(
            model,
            seed=seed,
            random_iteration=True,
            iteration=(last_iteration_index + 1),
        )
        model.fit(x)
        self._detector_list.append(deepcopy(model))

    return self._detector_list, self._calibration_set
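Example (illustrative)

A hedged usage sketch with 5 folds; the import path is inferred from the source location shown above and may differ:

import numpy as np
from pyod.models.iforest import IForest
from nonconform.strategy.cross_val import CrossValidation  # assumed import path

rng = np.random.default_rng(0)
X = rng.normal(size=(500, 4))

strategy = CrossValidation(k=5, plus=True)
detectors, calib_scores = strategy.fit_calibrate(X, IForest(), seed=0)

print(len(detectors))      # 5 fold models in plus mode
print(calib_scores.shape)  # (500,): every sample is scored exactly once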

Jackknife

Jackknife(plus: bool = True)

Bases: BaseStrategy

Jackknife (leave-one-out) conformal anomaly detection strategy.

This strategy implements conformal prediction using the jackknife method, which is a special case of k-fold cross-validation where k equals the number of samples in the dataset (leave-one-out). For each sample, a model is trained on all other samples, and the left-out sample is used for calibration.

It internally uses a CrossValidation strategy (nonconform.strategy.cross_val), dynamically setting its _k parameter to the dataset size.

Attributes:

Name Type Description
_plus bool

If True, each model trained (one for each left-out sample) is retained. If False, a single model trained on the full dataset (after leave-one-out calibration) is retained. This behavior is delegated to the internal CrossValidation strategy.

_strategy CrossValidation

An instance of the CrossValidation strategy (nonconform.strategy.cross_val), configured for leave-one-out behavior.

_calibration_ids list[int] | None

Indices of the samples from the input data x used for calibration. Populated after fit_calibrate and accessible via calibration_ids. Initially None.

_detector_list List[BaseDetector]

A list of trained detector models, populated by fit_calibrate via the internal strategy.

_calibration_set ndarray

An array of calibration scores, one for each sample, populated by fit_calibrate via the internal strategy.

Parameters:

Name Type Description Default
plus bool

If True, instructs the internal cross-validation strategy to retain all models trained during the leave-one-out process. Strongly recommended for statistical validity. Defaults to True.

True
Source code in nonconform/strategy/jackknife.py
def __init__(self, plus: bool = True):
    """Initialize the Jackknife strategy.

    Args:
        plus (bool, optional): If ``True``, instructs the internal
            cross-validation strategy to retain all models trained during
            the leave-one-out process. Strongly recommended for statistical
            validity. Defaults to ``True``.
    """
    super().__init__(plus)
    self._plus: bool = plus

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        from nonconform.utils.func.logger import get_logger

        logger = get_logger("strategy.jackknife")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended "
            "for statistical guarantees."
        )

    self._strategy: CrossValidation = CrossValidation(k=1, plus=plus)
    self._calibration_ids: list[int] | None = None

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
calibration_ids property
calibration_ids: list[int] | None

Returns a copy of indices from x used for calibration via jackknife.

These are the indices of samples used to obtain calibration scores. In jackknife (leave-one-out), each sample is used once for calibration. The list is populated after fit_calibrate is called.

Returns:

Type Description
list[int] | None

list[int] | None: A copy of integer indices, or None if fit_calibrate has not been called.

Note

Returns a defensive copy to prevent external modification of internal state.

plus property
plus: bool

Returns whether the plus variant is enabled.

Returns:

Name Type Description
bool bool

True if using ensemble mode, False if using single model.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    weighted: bool = False,
    seed: int | None = None,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]

Fits detector(s) and gets calibration scores using jackknife.

This method configures the internal CrossValidation strategy (nonconform.strategy.cross_val) to perform leave-one-out cross-validation by setting its number of folds (_k) to the total number of samples in x. It then delegates the fitting and calibration process to this internal strategy.

The results (trained models and calibration scores) and calibration sample IDs are retrieved from the internal strategy.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data.

required
detector BaseDetector

The PyOD base detector instance.

required
weighted bool

Passed to the internal CrossValidation strategy's fit_calibrate method. Its effect depends on the CrossValidation implementation. Defaults to False.

False
seed int | None

Random seed, passed to the internal CrossValidation strategy for reproducibility. Defaults to None.

None
iteration_callback callable

Not used in Jackknife strategy. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing:

* A list of trained PyOD detector models.
* An array of calibration scores (one per sample in x).

Source code in nonconform/strategy/jackknife.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    weighted: bool = False,  # Parameter passed to internal strategy
    seed: int | None = None,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fits detector(s) and gets calibration scores using jackknife.

    This method configures the internal
    :class:`~nonconform.strategy.cross_val.CrossValidation` strategy to
    perform leave-one-out cross-validation by setting its number of
    folds (`_k`) to the total number of samples in `x`. It then delegates
    the fitting and calibration process to this internal strategy.

    The results (trained models and calibration scores) and calibration
    sample IDs are retrieved from the internal strategy.

    Args:
        x (pd.DataFrame | np.ndarray): The input data.
        detector (BaseDetector): The PyOD base detector instance.
        weighted (bool, optional): Passed to the internal `CrossValidation`
            strategy's `fit_calibrate` method. Its effect depends on the
            `CrossValidation` implementation. Defaults to ``False``.
        seed (int | None, optional): Random seed, passed to the internal
            `CrossValidation` strategy for reproducibility. Defaults to None.
        iteration_callback (callable, optional): Not used in Jackknife strategy.
            Defaults to None.

    Returns:
        tuple[list[BaseDetector], np.ndarray]: A tuple containing:
            * A list of trained PyOD detector models.
            * An array of calibration scores (one per sample in `x`).
    """
    self._strategy._k = len(x)
    (
        self._detector_list,
        self._calibration_set,
    ) = self._strategy.fit_calibrate(
        x, detector, weighted, seed, iteration_callback
    )
    self._calibration_ids = self._strategy.calibration_ids
    return self._detector_list, self._calibration_set
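
Example

An illustrative usage sketch (not taken from the library docs): it assumes Jackknife is importable from nonconform.strategy, mirroring the other strategy exports, and wraps a PyOD IsolationForest in a ConformalDetector.

import numpy as np
from pyod.models.iforest import IForest

from nonconform.estimation import ConformalDetector
from nonconform.strategy import Jackknife  # assumed export path

rng = np.random.default_rng(0)
x_train = rng.normal(size=(200, 5))   # nominal training data
x_test = rng.normal(size=(20, 5))     # new data to score

detector = ConformalDetector(
    detector=IForest(),
    strategy=Jackknife(plus=True),    # leave-one-out calibration, ensemble retained
    seed=42,
)
detector.fit(x_train)                 # trains one model per left-out sample
p_values = detector.predict(x_test)   # conformal p-values, one per test row

Because leave-one-out trains len(x_train) models, this strategy is best suited to small datasets.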

JackknifeBootstrap

JackknifeBootstrap(
    n_bootstraps: int = 100,
    aggregation_method: Aggregation = Aggregation.MEAN,
    plus: bool = True,
)

Bases: BaseStrategy

Implements Jackknife+-after-Bootstrap (JaB+) conformal anomaly detection.

This strategy implements the JaB+ method which provides predictive inference for ensemble models trained on bootstrap samples. The key insight is that JaB+ uses the out-of-bag (OOB) samples from bootstrap iterations to compute calibration scores without requiring additional model training.

The strategy can operate in two modes:

1. Plus mode (plus=True): Uses an ensemble of models for prediction (recommended)
2. Standard mode (plus=False): Uses a single model trained on all data

Attributes:

Name Type Description
_n_bootstraps int

Number of bootstrap iterations

_aggregation_method Aggregation

How to aggregate OOB predictions

_plus bool

Whether to use the plus variant (ensemble of models)

_detector_list list[BaseDetector]

List of trained detectors (ensemble/single)

_calibration_set ndarray

Array of calibration scores from the JaB+ procedure

_calibration_ids list[int]

Indices of samples used for calibration

_bootstrap_models list[BaseDetector]

Models trained on each bootstrap sample

_oob_mask ndarray

Boolean matrix of shape (n_bootstraps, n_samples) indicating out-of-bag status

Parameters:

Name Type Description Default
n_bootstraps int

Number of bootstrap iterations. Defaults to 100.

100
aggregation_method Aggregation

Method to aggregate out-of-bag predictions. Options are Aggregation.MEAN or Aggregation.MEDIAN. Defaults to Aggregation.MEAN.

MEAN
plus bool

If True, uses ensemble of bootstrap models for prediction (maintains statistical validity). If False, uses single model trained on all data. Strongly recommended to use True. Defaults to True.

True

Raises:

Type Description
ValueError

If aggregation_method is not a valid Aggregation enum value.

ValueError

If n_bootstraps is less than 1.

Source code in nonconform/strategy/jackknife_bootstrap.py
def __init__(
    self,
    n_bootstraps: int = 100,
    aggregation_method: Aggregation = Aggregation.MEAN,
    plus: bool = True,
):
    """Initialize the Bootstrap (JaB+) strategy.

    Args:
        n_bootstraps (int, optional): Number of bootstrap iterations.
            Defaults to 100.
        aggregation_method (Aggregation, optional): Method to aggregate out-of-bag
            predictions. Options are Aggregation.MEAN or Aggregation.MEDIAN.
            Defaults to Aggregation.MEAN.
        plus (bool, optional): If True, uses ensemble of bootstrap models for
            prediction (maintains statistical validity). If False, uses single
            model trained on all data. Strongly recommended to use True.
            Defaults to True.

    Raises:
        ValueError: If aggregation_method is not a valid Aggregation enum value.
        ValueError: If n_bootstraps is less than 1.
    """
    super().__init__(plus=plus)

    if n_bootstraps < 1:
        exc = ValueError(
            f"Number of bootstraps must be at least 1, got {n_bootstraps}. "
            f"Typical values are 50-200 for jackknife-after-bootstrap."
        )
        exc.add_note(f"Received n_bootstraps={n_bootstraps}, which is invalid.")
        exc.add_note(
            "Jackknife-after-Bootstrap requires at least one bootstrap iteration."
        )
        exc.add_note("Consider using n_bootstraps=100 as a balanced default.")
        raise exc
    if aggregation_method not in [Aggregation.MEAN, Aggregation.MEDIAN]:
        exc = ValueError(
            f"aggregation_method must be Aggregation.MEAN or Aggregation.MEDIAN, "
            f"got {aggregation_method}. These are the only statistically valid "
            f"methods for combining out-of-bag predictions in JackknifeBootstrap()."
        )
        exc.add_note(f"Received aggregation_method={aggregation_method}")
        exc.add_note("Valid options are: Aggregation.MEAN, Aggregation.MEDIAN")
        exc.add_note(
            "These methods ensure statistical validity of the JaB+ procedure."
        )
        raise exc

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        logger = get_logger("strategy.jackknife_bootstrap")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended "
            "for statistical guarantees."
        )

    self._n_bootstraps: int = n_bootstraps
    self._aggregation_method: Aggregation = aggregation_method

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []

    # Internal state for JaB+ computation
    self._bootstrap_models: list[BaseDetector] = []
    self._oob_mask: np.ndarray = np.array([])
calibration_ids property
calibration_ids: list[int]

Returns a copy of the list of indices used for calibration.

In JaB+, all original training samples contribute to calibration through the out-of-bag mechanism.

Returns:

Type Description
list[int]

list[int]: Copy of integer indices (0 to n_samples-1).

Note

Returns a defensive copy to prevent external modification of internal state.

n_bootstraps property
n_bootstraps: int

Returns the number of bootstrap iterations.

aggregation_method property
aggregation_method: Aggregation

Returns the aggregation method used for OOB predictions.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, ndarray], None]
    | None = None,
    n_jobs: int | None = None,
) -> tuple[list[BaseDetector], np.ndarray]

Fit and calibrate using Jackknife+-after-Bootstrap method.

This method implements the JaB+ algorithm:

1. Generate bootstrap samples and train models
2. For each sample, compute out-of-bag predictions
3. Aggregate OOB predictions to get calibration scores
4. Train final model on all data

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix of shape (n_samples, n_features).

required
detector BaseDetector

The base anomaly detector to be used.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Not used in JaB+ method. Defaults to False.

False
iteration_callback Callable[[int, ndarray], None]

Optional callback function that gets called after each bootstrap iteration with the iteration number and current calibration scores. Defaults to None.

None
n_jobs int

Number of parallel jobs for bootstrap training. If None, uses sequential processing. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing:

* A list of trained detector models (the bootstrap ensemble if plus=True, a single model if plus=False).
* An array of calibration scores from the JaB+ procedure.

Source code in nonconform/strategy/jackknife_bootstrap.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, np.ndarray], None] | None = None,
    n_jobs: int | None = None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fit and calibrate using Jackknife+-after-Bootstrap method.

    This method implements the JaB+ algorithm:
    1. Generate bootstrap samples and train models
    2. For each sample, compute out-of-bag predictions
    3. Aggregate OOB predictions to get calibration scores
    4. Train final model on all data

    Args:
        x (pd.DataFrame | np.ndarray): Input data matrix of shape
            (n_samples, n_features).
        detector (BaseDetector): The base anomaly detector to be used.
        seed (int | None, optional): Random seed for reproducibility.
            Defaults to None.
        weighted (bool, optional): Not used in JaB+ method. Defaults to False.
        iteration_callback (Callable[[int, np.ndarray], None], optional):
            Optional callback function that gets called after each bootstrap
            iteration with the iteration number and current calibration scores.
            Defaults to None.
        n_jobs (int, optional): Number of parallel jobs for bootstrap
            training. If None, uses sequential processing. Defaults to None.

    Returns:
        tuple[list[BaseDetector], np.ndarray]: A tuple containing:
            * List of trained detector models (if plus=True, single if plus=False)
            * Array of calibration scores from JaB+ procedure
    """
    n_samples = len(x)
    logger = get_logger("strategy.bootstrap")
    generator = np.random.default_rng(seed)

    logger.info(
        f"Bootstrap (JaB+) Configuration:\n"
        f"  • Data: {n_samples:,} total samples\n"
        f"  • Bootstrap iterations: {self._n_bootstraps:,}\n"
        f"  • Aggregation method: {self._aggregation_method}"
    )

    # Step 1: Pre-allocate data structures and generate bootstrap samples
    self._bootstrap_models = [None] * self._n_bootstraps
    self._oob_mask = np.zeros((self._n_bootstraps, n_samples), dtype=bool)

    # Generate all bootstrap indices at once for better memory locality
    all_bootstrap_indices = generator.choice(
        n_samples, size=(self._n_bootstraps, n_samples), replace=True
    )

    # Pre-compute OOB mask efficiently
    for i in range(self._n_bootstraps):
        bootstrap_indices = all_bootstrap_indices[i]
        in_bag_mask = np.zeros(n_samples, dtype=bool)
        in_bag_mask[bootstrap_indices] = True
        self._oob_mask[i] = ~in_bag_mask

    # Train models (with optional parallelization)
    if n_jobs is None or n_jobs == 1:
        # Sequential training
        bootstrap_iterator = (
            tqdm(
                range(self._n_bootstraps),
                desc=f"Bootstrap training ({self._n_bootstraps} iterations)",
            )
            if logger.isEnabledFor(logging.INFO)
            else range(self._n_bootstraps)
        )
        for i in bootstrap_iterator:
            bootstrap_indices = all_bootstrap_indices[i]
            model = self._train_single_model(
                detector, x, bootstrap_indices, seed, i
            )
            self._bootstrap_models[i] = model
    else:
        # Parallel training
        self._train_models_parallel(
            detector, x, all_bootstrap_indices, seed, n_jobs, logger
        )

    # Step 2: Compute out-of-bag calibration scores
    oob_scores = self._compute_oob_scores(x)

    # Call iteration callback if provided
    if iteration_callback is not None:
        iteration_callback(self._n_bootstraps, oob_scores)

    self._calibration_set = oob_scores
    self._calibration_ids = list(range(n_samples))

    # Step 3: Handle plus variant
    if self._plus:
        # Plus variant: Use ensemble of bootstrap models for prediction
        self._detector_list = self._bootstrap_models.copy()
        logger.info(
            f"JaB+ calibration completed with {len(self._calibration_set)} scores "
            f"using ensemble of {len(self._bootstrap_models)} models"
        )
    else:
        # Standard variant: Train final model on all data
        final_model = deepcopy(detector)
        final_model = _set_params(
            final_model,
            seed=seed,
            random_iteration=True,
            iteration=self._n_bootstraps,
        )
        final_model.fit(x)
        self._detector_list = [final_model]
        logger.info(
            f"JaB+ calibration completed with {len(self._calibration_set)} scores "
            f"using single model trained on all data"
        )

    return self._detector_list, self._calibration_set
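
Example

A minimal sketch that calls fit_calibrate directly to inspect the out-of-bag calibration scores; it assumes JackknifeBootstrap is importable from nonconform.strategy.

import numpy as np
from pyod.models.iforest import IForest

from nonconform.strategy import JackknifeBootstrap  # assumed export path

x = np.random.default_rng(0).normal(size=(500, 8))

strategy = JackknifeBootstrap(n_bootstraps=50, plus=True)
models, calib_scores = strategy.fit_calibrate(x, IForest(), seed=0, n_jobs=2)

print(len(models))          # 50 bootstrap models retained in plus mode
print(calib_scores.shape)   # one out-of-bag calibration score per training sample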

Split

Split(n_calib: float | int = 0.1)

Bases: BaseStrategy

Split conformal strategy for fast anomaly detection with statistical guarantees.

Implements the classical split conformal approach by dividing training data into separate fitting and calibration sets. This provides the fastest conformal inference at the cost of using less data for calibration compared to other strategies.

Example
from nonconform.strategy import Split

# Use 20% of data for calibration
strategy = Split(n_calib=0.2)

# Use exactly 1000 samples for calibration
strategy = Split(n_calib=1000)

Attributes:

Name Type Description
_calib_size float | int

Size or proportion of data used for calibration.

_calibration_ids list[int] | None

Indices of calibration samples (for weighted conformal).

Parameters:

Name Type Description Default
n_calib float | int

The size or proportion of the dataset to use for the calibration set. If a float, it must be strictly between 0.0 and 1.0 (train_test_split excludes the endpoints). If an int, it is the absolute number of samples. Defaults to 0.1 (10%).

0.1
Source code in nonconform/strategy/split.py
def __init__(self, n_calib: float | int = 0.1) -> None:
    """Initialize the Split strategy.

    Args:
        n_calib (float | int): The size or proportion
            of the dataset to use for the calibration set. If a float,
            it must be between 0.0 and 1.0 (exclusive of 0.0 and 1.0
            in practice for `train_test_split`). If an int, it's the
            absolute number of samples. Defaults to ``0.1`` (10%).
    """
    super().__init__()  # `plus` is not relevant for a single split
    self._calib_size: float | int = n_calib
    self._calibration_ids: list[int] | None = None
calibration_ids property
calibration_ids: list[int] | None

Returns a copy of indices from x used for the calibration set.

This property provides the list of indices corresponding to the samples that were allocated to the calibration set during the fit_calibrate method. It will be None if fit_calibrate was called with weighted=False or if fit_calibrate has not yet been called.

Returns:

Type Description
list[int] | None

list[int] | None: A copy of integer indices, or None.

Note

Returns a defensive copy to prevent external modification of internal state.

calib_size property
calib_size: float | int

Returns the calibration size or proportion.

Returns:

Type Description
float | int

float | int: The calibration size as specified during initialization. If float (0.0-1.0), represents proportion of data. If int, represents absolute number of samples.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    weighted: bool = False,
    seed: int | None = None,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]

Fits a detector and generates calibration scores using a data split.

The input data x is split into a training set and a calibration set according to _calib_size. The provided detector is trained on the training set. Non-conformity scores are then computed using the trained detector on the calibration set.

If weighted is True, the indices of the calibration samples are stored in _calibration_ids. Otherwise, _calibration_ids remains None.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data.

required
detector BaseDetector

The PyOD base detector instance to train. This instance is modified in place by fitting.

required
weighted bool

If True, the indices of the calibration samples are stored. Defaults to False.

False
seed int | None

Random seed for reproducibility of the train-test split. Defaults to None.

None
iteration_callback callable

Not used in Split strategy. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing:

* A list containing the single trained PyOD detector instance.
* An array of calibration scores from the calibration set.

Source code in nonconform/strategy/split.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    weighted: bool = False,
    seed: int | None = None,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fits a detector and generates calibration scores using a data split.

    The input data `x` is split into a training set and a calibration
    set according to `_calib_size`. The provided `detector` is trained
    on the training set. Non-conformity scores are then computed using
    the trained detector on the calibration set.

    If `weighted` is ``True``, the indices of the calibration samples
    are stored in `_calibration_ids`. Otherwise, `_calibration_ids`
    remains ``None``.

    Args:
        x (pd.DataFrame | np.ndarray): The input data.
        detector (BaseDetector): The PyOD base detector instance to train.
            This instance is modified in place by fitting.
        weighted (bool, optional): If ``True``, the indices of the
            calibration samples are stored. Defaults to ``False``.
        seed (int | None, optional): Random seed for reproducibility of the
            train-test split. Defaults to None.
        iteration_callback (callable, optional): Not used in Split strategy.
            Defaults to None.

    Returns:
        tuple[list[BaseDetector], np.ndarray]: A tuple containing:
            * A list containing the single trained PyOD detector instance.
            * An array of calibration scores from the calibration set.
    """
    x_id = np.arange(len(x))
    train_id, calib_id = train_test_split(
        x_id, test_size=self._calib_size, shuffle=True, random_state=seed
    )

    detector.fit(x[train_id])
    calibration_set = detector.decision_function(x[calib_id])

    if weighted:
        self._calibration_ids = calib_id.tolist()  # Ensure it's a list
    else:
        self._calibration_ids = None
    return [detector], calibration_set  # Return numpy array directly
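
Example

In typical use, Split is passed to a ConformalDetector rather than called directly; a sketch under that assumption, using a PyOD LOF detector.

import numpy as np
from pyod.models.lof import LOF

from nonconform.estimation import ConformalDetector
from nonconform.strategy import Split

rng = np.random.default_rng(0)
x_train = rng.normal(size=(1000, 4))
x_test = rng.normal(size=(100, 4))

det = ConformalDetector(detector=LOF(), strategy=Split(n_calib=0.2), seed=7)
det.fit(x_train)                            # 80% fits the detector, 20% is held out for calibration
p_values = det.predict(x_test)              # p-values computed against the calibration scores
raw_scores = det.predict(x_test, raw=True)  # raw anomaly scores instead of p-values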

base

BaseStrategy
BaseStrategy(plus: bool = True)

Bases: ABC

Abstract base class for anomaly detection calibration strategies.

This class provides a common interface for various calibration strategies applied to anomaly detectors. Subclasses must implement the core calibration logic and define how calibration data is identified and used.

Attributes:

Name Type Description
_plus bool

A flag set during initialization that enables the "plus" variant; subclasses use it to decide whether the models trained during calibration are retained for inference.

Parameters:

Name Type Description Default
plus bool

A flag that enables the "plus" variant which maintains statistical validity by retaining calibration models for inference. Strongly recommended for proper conformal guarantees. Defaults to True.

True
Source code in nonconform/strategy/base.py
def __init__(self, plus: bool = True):
    """Initialize the base calibration strategy.

    Args:
        plus (bool, optional): A flag that enables the "plus" variant which
            maintains statistical validity by retaining calibration models for
            inference. Strongly recommended for proper conformal guarantees.
            Defaults to ``True``.
    """
    self._plus: bool = plus
    self._calibration_ids: list[int]
calibration_ids abstractmethod property
calibration_ids: list[int]

Provides the indices of the data points used for calibration.

This abstract property must be implemented by subclasses. It should return a list of integer indices identifying which samples from the original input data (provided to fit_calibrate) were selected or designated as the calibration set.

Returns:

Type Description
list[int]

List[int]: A list of integer indices for the calibration data.

Raises:

Type Description
NotImplementedError

If the subclass does not implement this property.

fit_calibrate abstractmethod
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]

Fits the detector and performs calibration.

This abstract method must be implemented by subclasses to define the specific procedure for fitting the anomaly detector (if necessary) and then calibrating it using data derived from x. Calibration often involves determining thresholds or adjusting scores.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data, which may be used for both fitting the detector and deriving calibration data.

required
detector BaseDetector

The PyOD anomaly detection model to be fitted and/or calibrated.

required
weighted bool | None

A flag indicating whether a weighted approach should be used during calibration, if applicable to the subclass implementation.

False
seed int | None

A random seed for ensuring reproducibility in stochastic parts of the fitting or calibration process. Defaults to None.

None
iteration_callback callable | None

Optional callback function for strategies that support iteration tracking. Defaults to None.

None

Raises:

Type Description
NotImplementedError

If the subclass does not implement this method.

Source code in nonconform/strategy/base.py
@abc.abstractmethod
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fits the detector and performs calibration.

    This abstract method must be implemented by subclasses to define the
    specific procedure for fitting the anomaly detector (if necessary)
    and then calibrating it using data derived from `x`. Calibration often
    involves determining thresholds or adjusting scores.

    Args:
        x (pd.DataFrame | np.ndarray): The input data, which
            may be used for both fitting the detector and deriving
            calibration data.
        detector (BaseDetector): The PyOD anomaly detection model to be
            fitted and/or calibrated.
        weighted (bool | None): A flag indicating whether a weighted
            approach should be used during calibration, if applicable to
            the subclass implementation.
        seed (int | None): A random seed for ensuring reproducibility
            in stochastic parts of the fitting or calibration process.
            Defaults to None.
        iteration_callback (callable | None): Optional callback function
            for strategies that support iteration tracking. Defaults to None.

    Raises:
        NotImplementedError: If the subclass does not implement this method.
    """
    raise NotImplementedError(
        "The fit_calibrate() method must be implemented by subclasses."
    )
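
Example

A minimal sketch of a custom strategy implementing the abstract interface above. HoldoutTail is a hypothetical illustration, not part of the library, and assumes BaseStrategy is importable from nonconform.strategy.base (the source path shown above).

import numpy as np
import pandas as pd
from pyod.models.base import BaseDetector

from nonconform.strategy.base import BaseStrategy  # path as shown above


class HoldoutTail(BaseStrategy):
    """Toy strategy: fit on the leading rows, calibrate on the last n_calib rows."""

    def __init__(self, n_calib: int = 100, plus: bool = True):
        super().__init__(plus)
        self._n_calib = n_calib
        self._calibration_ids: list[int] = []

    @property
    def calibration_ids(self) -> list[int]:
        return list(self._calibration_ids)

    def fit_calibrate(
        self,
        x: pd.DataFrame | np.ndarray,
        detector: BaseDetector,
        seed: int | None = None,
        weighted: bool = False,
        iteration_callback=None,
    ) -> tuple[list[BaseDetector], np.ndarray]:
        x = np.asarray(x)
        train, calib = x[: -self._n_calib], x[-self._n_calib :]
        detector.fit(train)  # fit on the head of the data
        self._calibration_ids = list(range(len(x) - self._n_calib, len(x)))
        return [detector], detector.decision_function(calib)  # tail scores form the calibration set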

cross_val

CrossValidation
CrossValidation(k: int, plus: bool = True)

Bases: BaseStrategy

Implements k-fold cross-validation for conformal anomaly detection.

This strategy splits the data into k folds and uses each fold as a calibration set while training on the remaining folds. This approach provides more robust calibration scores by utilizing all available data. The strategy can operate in two modes:

1. Standard mode: Uses a single model trained on all data for prediction
2. Plus mode: Uses an ensemble of k models, each trained on k-1 folds

Attributes:

Name Type Description
_k int

Number of folds for cross-validation

_plus bool

Whether to use the plus variant (ensemble of models)

_detector_list list[BaseDetector]

List of trained detectors

_calibration_set ndarray

Array of calibration scores

_calibration_ids list[int]

Indices of samples used for calibration

Parameters:

Name Type Description Default
k int

The number of folds for cross-validation. Must be at least 2. Higher values provide more robust calibration but increase computational cost.

required
plus bool

If True, appends each fold-trained model to _detector_list, creating an ensemble. If False, _detector_list will contain one model trained on all data after calibration scores are collected. The plus variant maintains statistical validity and is strongly recommended. Defaults to True.

True
Source code in nonconform/strategy/cross_val.py
def __init__(self, k: int, plus: bool = True):
    """Initialize the CrossValidation strategy.

    Args:
        k (int): The number of folds for cross-validation. Must be at
            least 2. Higher values provide more robust calibration but
            increase computational cost.
        plus (bool, optional): If ``True``, appends each fold-trained model
            to `_detector_list`, creating an ensemble. If ``False``,
            `_detector_list` will contain one model trained on all data
            after calibration scores are collected. The plus variant
            maintains statistical validity and is strongly recommended.
            Defaults to ``True``.
    """
    super().__init__(plus)
    self._k: int = k
    self._plus: bool = plus

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        from nonconform.utils.func.logger import get_logger

        logger = get_logger("strategy.cross_val")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended "
            "for statistical guarantees."
        )

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []
calibration_ids property
calibration_ids: list[int]

Returns a copy of the list of indices from x used for calibration.

In k-fold cross-validation, every sample in the input data x is used exactly once as part of a calibration set (when its fold is the hold-out set). This property returns a list of all these indices, typically covering all indices from 0 to len(x)-1, but ordered by fold processing.

Returns:

Type Description
list[int]

list[int]: A copy of integer indices.

Note

Returns a defensive copy to prevent external modification of internal state.

k property
k: int

Returns the number of folds for cross-validation.

Returns:

Name Type Description
int int

Number of folds specified during initialization.

plus property
plus: bool

Returns whether the plus variant is enabled.

Returns:

Name Type Description
bool bool

True if using ensemble mode, False if using single model.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]

Fit and calibrate the detector using k-fold cross-validation.

This method implements the cross-validation strategy by:

1. Splitting the data into k folds
2. For each fold:
   - Train the detector on k-1 folds
   - Use the remaining fold for calibration
   - Store calibration scores and optionally the trained model
3. If not in plus mode, train a final model on all data

The method ensures that each sample is used exactly once for calibration, providing a more robust estimate of the calibration scores.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix of shape (n_samples, n_features).

required
detector BaseDetector

The base anomaly detector to be used.

required
weighted bool

Whether to use weighted calibration. Currently not implemented for cross-validation. Defaults to False.

False
seed int | None

Random seed for reproducibility. Defaults to None.

None
iteration_callback callable

Not used in CrossValidation strategy. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing:

* A list of trained detectors (either k models in plus mode or a single model in standard mode).
* An array of calibration scores from all folds.

Raises:

Type Description
ValueError

If k is less than 2 or if the data size is too small for the specified number of folds.

Source code in nonconform/strategy/cross_val.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fit and calibrate the detector using k-fold cross-validation.

    This method implements the cross-validation strategy by:
    1. Splitting the data into k folds
    2. For each fold:
       - Train the detector on k-1 folds
       - Use the remaining fold for calibration
       - Store calibration scores and optionally the trained model
    3. If not in plus mode, train a final model on all data

    The method ensures that each sample is used exactly once for calibration,
    providing a more robust estimate of the calibration scores.

    Args:
        x (pd.DataFrame | np.ndarray): Input data matrix of shape
            (n_samples, n_features).
        detector (BaseDetector): The base anomaly detector to be used.
        weighted (bool, optional): Whether to use weighted calibration.
            Currently not implemented for cross-validation. Defaults to False.
        seed (int | None, optional): Random seed for reproducibility.
            Defaults to None.
        iteration_callback (callable, optional): Not used in CrossValidation
            strategy.
            Defaults to None.

    Returns:
        tuple[list[BaseDetector], np.ndarray]: A tuple containing:
            * List of trained detectors (either k models in plus mode or
              a single model in standard mode)
            * Array of calibration scores from all folds

    Raises:
        ValueError: If k is less than 2 or if the data size is too small
            for the specified number of folds.
    """
    _detector = detector
    n_samples = len(x)

    # Validate k before creating KFold
    if self._k < 2:
        exc = ValueError(
            f"k must be at least 2 for k-fold cross-validation, got {self._k}"
        )
        exc.add_note(f"Received k={self._k}, which is invalid.")
        exc.add_note(
            "Cross-validation requires at least one split"
            " for training and one for calibration."
        )
        exc.add_note(
            f"With {n_samples} samples, consider k=min(10,"
            f" {n_samples // 10}) for balanced folds."
        )
        raise exc

    if n_samples < self._k:
        exc = ValueError(
            f"Not enough samples ({n_samples}) for "
            f"k-fold cross-validation with k={self._k}"
        )
        exc.add_note(
            f"Each fold needs at least 1 sample, but {n_samples} < {self._k}."
        )
        exc.add_note(
            f"Either increase your dataset size or reduce k to at most {n_samples}."
        )
        raise exc

    # Pre-allocate calibration array for efficiency
    self._calibration_set = np.empty(n_samples, dtype=np.float64)
    calibration_offset = 0

    folds = KFold(
        n_splits=self._k,
        shuffle=True,
        random_state=seed,
    )

    last_iteration_index = 0
    logger = get_logger("strategy.cross_val")
    fold_iterator = (
        tqdm(
            folds.split(x),
            total=self._k,
            desc=f"CV fold training ({self._k} folds)",
        )
        if logger.isEnabledFor(logging.INFO)
        else folds.split(x)
    )
    for i, (train_idx, calib_idx) in enumerate(fold_iterator):
        last_iteration_index = i
        self._calibration_ids.extend(calib_idx.tolist())

        model = copy(_detector)
        model = _set_params(model, seed=seed, random_iteration=True, iteration=i)
        model.fit(x[train_idx])

        if self._plus:
            self._detector_list.append(deepcopy(model))

        # Store calibration scores efficiently using pre-allocated array
        fold_scores = model.decision_function(x[calib_idx])
        n_fold_samples = len(fold_scores)
        end_idx = calibration_offset + n_fold_samples
        self._calibration_set[calibration_offset:end_idx] = fold_scores
        calibration_offset += n_fold_samples

    if not self._plus:
        model = copy(_detector)
        model = _set_params(
            model,
            seed=seed,
            random_iteration=True,
            iteration=(last_iteration_index + 1),
        )
        model.fit(x)
        self._detector_list.append(deepcopy(model))

    return self._detector_list, self._calibration_set
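
Example

A direct-use sketch, assuming CrossValidation is importable from nonconform.strategy; it shows that every sample contributes exactly one calibration score.

import numpy as np
from pyod.models.knn import KNN

from nonconform.strategy import CrossValidation  # assumed export path

x = np.random.default_rng(0).normal(size=(300, 6))

strategy = CrossValidation(k=5, plus=True)
models, calib_scores = strategy.fit_calibrate(x, KNN(), seed=0)

print(len(models))          # 5 fold models retained in plus mode
print(calib_scores.shape)   # (300,): each sample scored once by its held-out fold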

experimental

bootstrap
Bootstrap
Bootstrap(
    resampling_ratio: float | None = None,
    n_bootstraps: int | None = None,
    n_calib: int | None = None,
    plus: bool = True,
)

Bases: BaseStrategy

Implements bootstrap-based conformal anomaly detection.

This strategy uses bootstrap resampling to create multiple training sets and calibration sets. For each bootstrap iteration:

1. A random subset of the data is sampled with replacement for training
2. The remaining samples are used for calibration
3. Optionally, a fixed number of calibration samples can be selected

The strategy can operate in two modes:

1. Standard mode: Uses a single model trained on all data for prediction
2. Plus mode: Uses an ensemble of models, each trained on a bootstrap sample

Attributes:

Name Type Description
_resampling_ratio float

Proportion of data to use for training in each bootstrap iteration

_n_bootstraps int

Number of bootstrap iterations

_n_calib int | None

Optional fixed number of calibration samples to use

_plus bool

Whether to use the plus variant (ensemble of models)

_detector_list list[BaseDetector]

List of trained detectors

_calibration_set ndarray

Array of calibration scores

_calibration_ids list[int]

Indices of samples used for calibration

Exactly two of resampling_ratio, n_bootstraps, and n_calib should be provided. The third will be calculated by _configure.

Parameters:

Name Type Description Default
resampling_ratio float | None

The proportion of data to use for training in each bootstrap. Defaults to None.

None
n_bootstraps int | None

The number of bootstrap iterations. Defaults to None.

None
n_calib int | None

The desired size of the final calibration set. If set, collected scores/IDs might be subsampled. Defaults to None.

None
plus bool

If True, appends each bootstrapped model to _detector_list. If False, _detector_list will contain one model trained on all data after calibration scores are collected. Defaults to True.

True
Source code in nonconform/strategy/experimental/bootstrap.py
def __init__(
    self,
    resampling_ratio: float | None = None,
    n_bootstraps: int | None = None,
    n_calib: int | None = None,
    plus: bool = True,
):
    """Initialize the Bootstrap strategy.

    Exactly two of `resampling_ratio`, `n_bootstraps`, and `n_calib`
    should be provided. The third will be calculated by `_configure`.

    Args:
        resampling_ratio (float | None): The proportion of
            data to use for training in each bootstrap. Defaults to ``None``.
        n_bootstraps (int | None): The number of bootstrap
            iterations. Defaults to ``None``.
        n_calib (int | None): The desired size of the final
            calibration set. If set, collected scores/IDs might be
            subsampled. Defaults to ``None``.
        plus (bool, optional): If ``True``, appends each bootstrapped model
            to `_detector_list`. If ``False``, `_detector_list` will contain
            one model trained on all data after calibration scores are
            collected. Defaults to ``True``.
    """
    super().__init__(plus)
    self._resampling_ratio: float | None = resampling_ratio
    self._n_bootstraps: int | None = n_bootstraps
    self._n_calib: int | None = n_calib
    self._plus: bool = plus

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        logger = get_logger("strategy.bootstrap")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended for validity guarantees."
        )

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []
calibration_ids property
calibration_ids: list[int]

Returns a copy of the list of indices used for calibration.

These are indices relative to the original input data x provided to fit_calibrate. The list contains indices of all out-of-bag samples encountered during bootstrap iterations. If _n_calib was set and weighted was True in fit_calibrate, this list might be a subsample of all encountered IDs, corresponding to the subsampled _calibration_set.

Returns:

Type Description
list[int]

List[int]: A copy of integer indices.

Note

Returns a defensive copy to prevent external modification of internal state.

resampling_ratio property
resampling_ratio: float

Returns the resampling ratio.

Returns:

Name Type Description
float float

Proportion of data used for training in each bootstrap iteration.

n_bootstraps property
n_bootstraps: int

Returns the number of bootstrap iterations.

Returns:

Name Type Description
int int

Number of bootstrap iterations.

n_calib property
n_calib: int

Returns the target calibration set size.

Returns:

Name Type Description
int int

Target number of calibration samples.

plus property
plus: bool

Returns whether the plus variant is enabled.

Returns:

Name Type Description
bool bool

True if using ensemble mode, False if using single model.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, ndarray], None]
    | None = None,
) -> tuple[list[BaseDetector], np.ndarray]

Fit and calibrate the detector using bootstrap resampling.

This method implements the bootstrap strategy by:

1. Creating multiple bootstrap samples of the data
2. For each bootstrap iteration:
   - Train the detector on the bootstrap sample
   - Use the out-of-bootstrap samples for calibration
   - Store calibration scores and optionally the trained model
3. If not in plus mode, train a final model on all data
4. Optionally subsample the calibration set to a fixed size

The method provides robust calibration scores by using multiple bootstrap iterations, which helps account for the variability in the data and model training.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix of shape (n_samples, n_features).

required
detector BaseDetector

The base anomaly detector to be used.

required
weighted bool

Whether to use weighted calibration. If True, the stored calibration sample indices are kept aligned with the (possibly subsampled) calibration scores, as required for weighted conformal prediction. Defaults to False.

False
seed int | None

Random seed for reproducibility. Defaults to None.

None
iteration_callback Callable[[int, ndarray], None]

Optional callback function that gets called after each bootstrap iteration with the iteration number and calibration scores. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing:

* A list of trained detectors (either n_bootstraps models in plus mode or a single model in standard mode).
* An array of calibration scores from all bootstrap iterations.

Raises:

Type Description
ValueError

If resampling_ratio is not between 0 and 1, or if n_bootstraps is less than 1, or if n_calib is less than 1 when specified.

Source code in nonconform/strategy/experimental/bootstrap.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, np.ndarray], None] | None = None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fit and calibrate the detector using bootstrap resampling.

    This method implements the bootstrap strategy by:
    1. Creating multiple bootstrap samples of the data
    2. For each bootstrap iteration:
       - Train the detector on the bootstrap sample
       - Use the out-of-bootstrap samples for calibration
       - Store calibration scores and optionally the trained model
    3. If not in plus mode, train a final model on all data
    4. Optionally subsample the calibration set to a fixed size

    The method provides robust calibration scores by using multiple
    bootstrap iterations, which helps account for the variability in
    the data and model training.

    Args:
        x (pd.DataFrame | np.ndarray): Input data matrix of shape
            (n_samples, n_features).
        detector (BaseDetector): The base anomaly detector to be used.
        weighted (bool, optional): Whether to use weighted calibration.
            If True, calibration scores are weighted by their sample
            indices. Defaults to False.
        seed (int | None, optional): Random seed for reproducibility.
            Defaults to None.
        iteration_callback (Callable[[int, np.ndarray], None], optional):
            Optional callback function that gets called after each bootstrap
            iteration with the iteration number and calibration scores.
            Defaults to None.

    Returns:
        tuple[list[BaseDetector], np.ndarray]: A tuple containing:
            * List of trained detectors (either n_bootstraps models in plus
              mode or a single model in standard mode)
            * Array of calibration scores from all bootstrap iterations

    Raises:
        ValueError: If resampling_ratio is not between 0 and 1, or if
            n_bootstraps is less than 1, or if n_calib is less than 1
            when specified.
    """
    self._configure(len(x))

    _detector = detector
    _generator = np.random.default_rng(seed)

    folds = ShuffleSplit(
        n_splits=self._n_bootstraps,
        train_size=self._resampling_ratio,
        random_state=seed,
    )

    n_folds = folds.get_n_splits()
    last_iteration_index = (
        0  # To ensure unique iteration for final model if not _plus
    )
    logger = get_logger("strategy.bootstrap")
    fold_iterator = (
        tqdm(
            folds.split(x),
            total=n_folds,
            desc=f"Bootstrap training ({n_folds} folds)",
        )
        if logger.isEnabledFor(logging.INFO)
        else folds.split(x)
    )
    for i, (train_idx, calib_idx) in enumerate(fold_iterator):
        last_iteration_index = i
        self._calibration_ids.extend(calib_idx.tolist())

        model = copy(_detector)
        model = _set_params(model, seed=seed, random_iteration=True, iteration=i)
        model.fit(x[train_idx])

        current_scores = model.decision_function(x[calib_idx])

        # Call iteration callback if provided
        if iteration_callback is not None:
            iteration_callback(i, current_scores)

        if self._plus:
            self._detector_list.append(deepcopy(model))

        # Concatenate calibration scores
        if len(self._calibration_set) == 0:
            self._calibration_set = current_scores
        else:
            self._calibration_set = np.concatenate(
                [self._calibration_set, current_scores]
            )

    if not self._plus:
        model = copy(_detector)
        model = _set_params(
            model,
            seed=seed,
            random_iteration=True,
            iteration=(last_iteration_index + 1),
        )
        model.fit(x)
        self._detector_list.append(deepcopy(model))

    if self._n_calib is not None and self._n_calib < len(self._calibration_set):
        ids = _generator.choice(
            len(self._calibration_set), size=self._n_calib, replace=False
        )
        self._calibration_set = self._calibration_set[ids]
        if weighted:
            self._calibration_ids = [self._calibration_ids[i] for i in ids]

    return self._detector_list, self._calibration_set
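
Example

A configuration sketch (module path assumed from the source location shown above): exactly two of the three sizing parameters are given, and the third is derived by _configure.

from nonconform.strategy.experimental.bootstrap import Bootstrap  # assumed module path

s1 = Bootstrap(resampling_ratio=0.8, n_bootstraps=25)  # n_calib derived
s2 = Bootstrap(n_bootstraps=25, n_calib=1000)          # resampling_ratio derived
s3 = Bootstrap(resampling_ratio=0.8, n_calib=1000)     # n_bootstraps derived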
randomized
Randomized
Randomized(
    n_iterations: int | None = None,
    n_calib: int | None = None,
    sampling_distr: Distribution = Distribution.UNIFORM,
    holdout_size_range: tuple[float, float] | None = None,
    beta_params: tuple[float, float] | None = None,
    grid_probs: tuple[list[int], list[float]] | None = None,
    plus: bool = True,
)

Bases: BaseStrategy

Implements randomized leave-p-out (rLpO) conformal anomaly detection.

This strategy uses randomized leave-p-out resampling where on each iteration a validation set size p is drawn at random, then a size-p validation set is sampled without replacement, the detector is trained on the rest, and calibration scores are computed. This approach smoothly interpolates between leave-one-out (p=1) and larger holdout strategies.

The strategy can operate in two modes:

1. Standard mode: Uses a single model trained on all data for prediction
2. Plus mode: Uses an ensemble of models, each trained on a different subset

Attributes:

Name Type Description
_sampling_distr Distribution

Distribution type for drawing holdout sizes

_n_iterations int | None

Number of rLpO iterations

_holdout_size_range tuple

Range of holdout sizes (relative or absolute)

_beta_params tuple

Alpha and beta parameters for beta distribution

_grid_probs tuple

Holdout sizes and probabilities for grid distribution

_n_calib int | None

Target number of calibration samples

_use_n_calib_mode bool

Whether to use n_calib mode vs n_iterations mode

_plus bool

Whether to use the plus variant (ensemble of models)

_detector_list list[BaseDetector]

List of trained detectors

_calibration_set ndarray

Array of calibration scores

_calibration_ids list[int]

Indices of samples used for calibration

Parameters:

Name Type Description Default
n_iterations int | None

Number of rLpO iterations to perform. Cannot be used together with n_calib. Defaults to None.

None
n_calib int | None

Target number of calibration samples. Iterations will stop when this target is reached or exceeded, then subsample to exactly this size. Cannot be used with n_iterations. Defaults to None.

None
sampling_distr Distribution

Distribution for drawing holdout set sizes. Options: Distribution.BETA_BINOMIAL, Distribution.UNIFORM, Distribution.GRID. Defaults to Distribution.UNIFORM.

UNIFORM
holdout_size_range tuple[float, float]

Min and max holdout set sizes. Values in ]0, 1[ are interpreted as fractions of dataset size. Values >= 1 are interpreted as absolute sample counts. If None, defaults to (0.1, 0.5) for relative sizing. Defaults to None.

None
beta_params tuple[float, float]

Alpha and beta parameters for the Beta distribution used to draw holdout size fractions. If None and sampling_distr is BETA_BINOMIAL, defaults to (2.0, 5.0). Common parameterizations:

- (1.0, 1.0): Uniform sampling (equivalent to UNIFORM distribution)
- (2.0, 5.0): Right-skewed, favors smaller holdout sizes [DEFAULT]
- (5.0, 2.0): Left-skewed, favors larger holdout sizes
- (2.0, 2.0): Bell-shaped, concentrated around middle sizes
- (0.5, 0.5): U-shaped, concentrated at extremes

Defaults to None.

None
grid_probs tuple[list[int], list[float]]

Holdout sizes and corresponding probabilities for grid distribution. Required if sampling_distr is Distribution.GRID. Defaults to None.

None
plus bool

If True, uses ensemble of models trained on different subsets. If False, uses single model trained on all data. Defaults to True.

True

Raises:

Type Description
ValueError

If required parameters for the chosen distribution are missing, or if neither n_iterations nor n_calib is specified. (If both are specified, a warning is logged and n_calib takes precedence.)

Source code in nonconform/strategy/experimental/randomized.py
def __init__(
    self,
    n_iterations: int | None = None,
    n_calib: int | None = None,
    sampling_distr: Distribution = Distribution.UNIFORM,
    holdout_size_range: tuple[float, float] | None = None,
    beta_params: tuple[float, float] | None = None,
    grid_probs: tuple[list[int], list[float]] | None = None,
    plus: bool = True,
):
    """Initialize the RandomizedLeaveOut strategy.

    Args:
        n_iterations (int | None, optional): Number of rLpO iterations to perform.
            Cannot be used together with n_calib. Defaults to None.
        n_calib (int | None, optional): Target number of calibration samples.
            Iterations will stop when this target is reached or exceeded, then
            subsample to exactly this size. Cannot be used with n_iterations.
            Defaults to None.
        sampling_distr (Distribution, optional): Distribution for drawing holdout
            set sizes. Options: Distribution.BETA_BINOMIAL, Distribution.UNIFORM,
            Distribution.GRID. Defaults to Distribution.UNIFORM.
        holdout_size_range (tuple[float, float], optional): Min and max holdout
            set sizes. Values in ]0, 1[ are interpreted as fractions of dataset
            size. Values >= 1 are interpreted as absolute sample counts.
            If None, defaults to (0.1, 0.5) for relative sizing. Defaults to None.
        beta_params (tuple[float, float], optional): Alpha and beta parameters
            for Beta distribution used to draw holdout size fractions. If None and
            sampling_distr is BETA_BINOMIAL, defaults to (2.0, 5.0).
            Common parameterizations:
            - (1.0, 1.0): Uniform sampling (equivalent to UNIFORM distribution)
            - (2.0, 5.0): Right-skewed, favors smaller holdout sizes [DEFAULT]
            - (5.0, 2.0): Left-skewed, favors larger holdout sizes
            - (2.0, 2.0): Bell-shaped, concentrated around middle sizes
            - (0.5, 0.5): U-shaped, concentrated at extremes
            Defaults to None.
        grid_probs (tuple[list[int], list[float]], optional): Holdout sizes and
            corresponding probabilities for grid distribution. Required if
            sampling_distr is Distribution.GRID. Defaults to None.
        plus (bool, optional): If True, uses ensemble of models trained on
            different subsets. If False, uses single model trained on all data.
            Defaults to True.

    Raises:
        ValueError: If required parameters for the chosen distribution are missing,
            if both n_iterations and n_calib are specified, or neither.
    """
    super().__init__(plus)

    # Validate that exactly one of n_iterations or n_calib is specified
    if n_iterations is not None and n_calib is not None:
        logger = get_logger("strategy.randomized")
        logger.warning(
            "Both n_iterations and n_calib specified. "
            "Using n_calib and ignoring n_iterations."
        )
        n_iterations = None
    elif n_iterations is None and n_calib is None:
        raise ValueError(
            "Must specify either n_iterations or n_calib. "
            "n_iterations controls the number of random leave-p-out iterations, "
            "while n_calib sets a target number of calibration samples to collect. "
            "Example: Randomized(n_iterations=1000) or Randomized(n_calib=5000)"
        )

    if n_iterations is not None and n_iterations < 1:
        raise ValueError(
            f"n_iterations must be at least 1, got {n_iterations}. "
            f"Typical values are 100-10000 depending on dataset size."
        )
    if n_calib is not None and n_calib < 1:
        raise ValueError(
            f"n_calib must be at least 1, got {n_calib}. "
            f"Typical values are 1000-100000 depending on desired precision."
        )

    self._n_iterations: int | None = n_iterations
    self._sampling_distr: Distribution = sampling_distr
    self._holdout_size_range: tuple[float, float] | None = holdout_size_range
    self._beta_params: tuple[float, float] | None = beta_params
    self._grid_probs: tuple[list[int], list[float]] | None = grid_probs
    self._n_calib: int | None = n_calib
    self._plus: bool = plus
    self._use_n_calib_mode: bool = n_calib is not None

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        logger = get_logger("strategy.randomized")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended for validity guarantees."
        )

    # Validate distribution-specific parameters
    self._validate_distribution_params()

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []
    self._n_data: int = 0
    self._holdout_sizes: list[int] = []
    self._iteration_scores: list[list[float]] = []
    # Will be set in _configure_holdout_size_range
    self._holdout_size_range_abs: tuple[int, int] = (1, 1)
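
Example

A configuration sketch (module path assumed from the source location shown above) contrasting the two mutually exclusive sizing modes.

from nonconform.strategy.experimental.randomized import Randomized  # assumed module path

# n_calib mode: iterate until about 5000 calibration scores are collected,
# then subsample to exactly that size (holdout sizes drawn uniformly by default).
by_target = Randomized(n_calib=5000)

# n_iterations mode: run a fixed number of random leave-p-out iterations instead.
by_iterations = Randomized(n_iterations=1000)

# Passing both logs a warning and n_calib takes precedence; passing neither raises ValueError.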
calibration_ids property
calibration_ids: list[int]

Returns a copy of the list of indices used for calibration.

These are indices relative to the original input data x provided to fit_calibrate. The list contains indices of all holdout samples encountered during rLpO iterations.

Returns:

Type Description
list[int]

list[int]: A copy of integer indices for calibration samples.

Note

Returns a defensive copy to prevent external modification of internal state.

n_iterations property
n_iterations: int | None

Returns the number of iterations.

Returns:

Type Description
int | None

int | None: Number of iterations, or None if using n_calib mode.

n_calib property
n_calib: int | None

Returns the target calibration set size.

Returns:

Type Description
int | None

int | None: Target number of calibration samples, or None if using n_iterations mode.

sampling_distr property
sampling_distr: Distribution

Returns the sampling distribution type.

Returns:

Name Type Description
Distribution Distribution

Distribution used for drawing holdout sizes.

holdout_size_range property
holdout_size_range: tuple[float, float]

Returns the holdout size range.

Returns:

Type Description
tuple[float, float]

tuple[float, float]: Min and max holdout set sizes.

beta_params property
beta_params: tuple[float, float] | None

Returns the beta distribution parameters.

Returns:

Type Description
tuple[float, float] | None

tuple[float, float] | None: Alpha and beta parameters, or None if not using beta distribution.

grid_probs property
grid_probs: tuple[list[int], list[float]] | None

Returns the grid probabilities.

Returns:

Type Description
tuple[list[int], list[float]] | None

tuple[list[int], list[float]] | None: Holdout sizes and probabilities, or None if not using grid distribution.

plus property
plus: bool

Returns whether the plus variant is enabled.

Returns:

Name Type Description
bool bool

True if using ensemble mode, False if using single model.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, ndarray], None]
    | None = None,
    track_p_values: bool = False,
) -> tuple[list[BaseDetector], np.ndarray]

Fit and calibrate the detector using randomized leave-p-out resampling.

This method implements the rLpO strategy by: 1. For each iteration, drawing a random holdout set size 2. Sampling a holdout set of that size without replacement 3. Training the detector on the remaining samples 4. Computing calibration scores on the holdout set 5. Optionally storing the trained model (in plus mode) 6. If using n_calib mode, stopping when target calibration size is reached

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix of shape (n_samples, n_features).

required
detector BaseDetector

The base anomaly detector to be used.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Whether to store calibration sample indices. Defaults to False.

False
iteration_callback Callable[[int, ndarray], None]

Optional callback function called after each iteration with the iteration number and calibration scores. Defaults to None.

None
track_p_values bool

If True, stores the holdout sizes and per-iteration scores for performance analysis. Can be accessed via get_iteration_info(). Defaults to False.

False

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing: * List of trained detectors (either multiple models in plus mode or a single model in standard mode) * Array of calibration scores from all iterations

Raises:

Type Description
ValueError

If holdout set size would leave insufficient training data.

Source code in nonconform/strategy/experimental/randomized.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, np.ndarray], None] | None = None,
    track_p_values: bool = False,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fit and calibrate the detector using randomized leave-p-out resampling.

    This method implements the rLpO strategy by:
    1. For each iteration, drawing a random holdout set size
    2. Sampling a holdout set of that size without replacement
    3. Training the detector on the remaining samples
    4. Computing calibration scores on the holdout set
    5. Optionally storing the trained model (in plus mode)
    6. If using n_calib mode, stopping when target calibration size is reached

    Args:
        x (pd.DataFrame | np.ndarray): Input data matrix of shape
            (n_samples, n_features).
        detector (BaseDetector): The base anomaly detector to be used.
        seed (int | None, optional): Random seed for reproducibility.
            Defaults to None.
        weighted (bool, optional): Whether to store calibration sample indices.
            Defaults to False.
        iteration_callback (Callable[[int, np.ndarray], None], optional):
            Optional callback function called after each iteration with the
            iteration number and calibration scores. Defaults to None.
        track_p_values (bool, optional): If True, stores the holdout sizes and
            per-iteration scores for performance analysis. Can be accessed
            via get_iteration_info(). Defaults to False.

    Returns:
        tuple[list[BaseDetector], np.ndarray]: A tuple containing:
            * List of trained detectors (either multiple models in plus
              mode or a single model in standard mode)
            * Array of calibration scores from all iterations

    Raises:
        ValueError: If holdout set size would leave insufficient training data.
    """
    self._configure_holdout_size_range(len(x))
    self._log_configuration()

    _detector = detector
    generator = np.random.default_rng(seed)

    logger = get_logger("strategy.randomized")

    # Determine iteration strategy and progress bar setup
    if self._use_n_calib_mode:
        # Use a high iteration limit but stop when n_calib is reached
        max_iterations = 10000  # Reasonable upper bound
        base_desc = f"Randomized (target: {self._n_calib})"
        total_for_progress = self._n_calib
    else:
        max_iterations = self._n_iterations
        base_desc = f"Randomized ({self._n_iterations} iterations)"
        total_for_progress = self._n_iterations

    actual_iterations = 0
    running_holdout_sum = 0
    progress_context = (
        tqdm(total=total_for_progress, desc=base_desc)
        if logger.isEnabledFor(logging.INFO)
        else contextlib.nullcontext()
    )
    with progress_context as pbar:
        while True:
            # Check stopping condition
            if self._use_n_calib_mode:
                if len(self._calibration_set) >= self._n_calib:
                    break
                if actual_iterations >= max_iterations:
                    logger.warning(
                        f"Reached maximum iterations ({max_iterations}) "
                        f"with only {len(self._calibration_set)} samples. "
                        f"Target was {self._n_calib}."
                    )
                    break
            else:
                if actual_iterations >= self._n_iterations:
                    break

            # Draw holdout set size for this iteration
            holdout_size = self._draw_holdout_size(generator)

            # Sample holdout set without replacement
            all_indices = np.arange(self._n_data)
            calib_idx = generator.choice(
                all_indices, size=holdout_size, replace=False
            )
            train_idx = np.setdiff1d(all_indices, calib_idx)

            if len(train_idx) < 1:
                raise ValueError(
                    f"No training samples left with holdout_size={holdout_size} "
                    f"for n={self._n_data}"
                )

            # Store calibration indices
            self._calibration_ids.extend(calib_idx.tolist())

            # Train model on training set
            model = copy(_detector)
            model = _set_params(
                model, seed=seed, random_iteration=True, iteration=actual_iterations
            )
            model.fit(x[train_idx])

            # Compute calibration scores on holdout set
            current_scores = model.decision_function(x[calib_idx])

            # Call iteration callback if provided
            if iteration_callback is not None:
                iteration_callback(actual_iterations, current_scores)

            # Store model if in plus mode
            if self._plus:
                self._detector_list.append(deepcopy(model))

            # Store calibration scores
            if len(self._calibration_set) == 0:
                self._calibration_set = current_scores
            else:
                self._calibration_set = np.concatenate(
                    [self._calibration_set, current_scores]
                )

            # Track holdout sizes and per-iteration scores if requested
            if track_p_values:
                self._holdout_sizes.append(holdout_size)
                self._iteration_scores.append(current_scores.tolist())

            actual_iterations += 1
            running_holdout_sum += holdout_size
            avg_holdout = running_holdout_sum / actual_iterations

            # Update progress bar based on mode
            if pbar is not None:
                if self._use_n_calib_mode:
                    # Update progress to show current calibration samples
                    pbar.n = min(len(self._calibration_set), self._n_calib)
                    pbar.desc = (
                        f"{base_desc} | iter: {actual_iterations}, "
                        f"avg_holdout: {avg_holdout:.1f}"
                    )
                    pbar.refresh()
                else:
                    pbar.update(1)
                    pbar.desc = f"{base_desc} | avg_holdout: {avg_holdout:.1f}"

    # If not in plus mode, train final model on all data
    if not self._plus:
        final_model = copy(_detector)
        final_model = _set_params(
            final_model,
            seed=seed,
            random_iteration=True,
            iteration=actual_iterations,
        )
        final_model.fit(x)
        self._detector_list.append(deepcopy(final_model))

    # Always subsample to exact n_calib in n_calib mode
    if self._use_n_calib_mode and len(self._calibration_set) != self._n_calib:
        generator = np.random.default_rng(seed)
        if len(self._calibration_set) > self._n_calib:
            # Subsample to exact target
            ids = generator.choice(
                len(self._calibration_set), size=self._n_calib, replace=False
            )
        else:
            # We have fewer than target - use all available
            # (np.arange keeps ids an array so the modulo-based subsetting below works)
            ids = np.arange(len(self._calibration_set))
            logger.warning(
                f"Only collected {len(self._calibration_set)} calibration samples, "
                f"less than target {self._n_calib}"
            )

        self._calibration_set = self._calibration_set[ids]
        if weighted:
            self._calibration_ids = [self._calibration_ids[i] for i in ids]

        # Also subsample tracking data if enabled
        if track_p_values and self._holdout_sizes:
            # For simplicity, subsample proportionally across iterations
            selected_iterations = (
                set(ids % actual_iterations) if actual_iterations > 0 else set()
            )
            self._holdout_sizes = [
                size
                for i, size in enumerate(self._holdout_sizes)
                if i in selected_iterations
            ]
            self._iteration_scores = [
                scores
                for i, scores in enumerate(self._iteration_scores)
                if i in selected_iterations
            ]

    # Log final results - only for n_iterations mode
    if not self._use_n_calib_mode:
        final_calib_size = len(self._calibration_set)
        logger.info(f"Final calibration scores: {final_calib_size:,}")

    return self._detector_list, self._calibration_set
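
For orientation, an illustrative end-to-end sketch (the public import path for Randomized is assumed from the source location shown above, the data arrays are placeholders, and PyOD's IForest is used only as an example detector):

import numpy as np
from pyod.models.iforest import IForest

from nonconform.estimation import ConformalDetector
from nonconform.strategy.experimental import Randomized  # assumed export path

x_train = np.random.randn(1000, 8)   # placeholder training data
x_test = np.random.randn(100, 8)     # placeholder test data

# Stop once roughly 5,000 calibration scores have been collected
strategy = Randomized(n_calib=5000)
detector = ConformalDetector(detector=IForest(), strategy=strategy, seed=42)

detector.fit(x_train)                # trains models and gathers calibration scores
p_values = detector.predict(x_test)  # conformal p-values for the test points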
get_iteration_info
get_iteration_info() -> (
    tuple[list[int], list[list[float]]] | None
)

Get holdout sizes and per-iteration scores if tracking was enabled.

This method provides access to the holdout set sizes used in each iteration and the corresponding anomaly scores. This information can be used for performance analysis, plotting vs. holdout size, or understanding the distribution of holdout set sizes used.

Returns:

Type Description
tuple[list[int], list[list[float]]] | None

tuple[list[int], list[list[float]]] | None: A tuple containing: * List of holdout sizes for each iteration * List of score arrays, one per iteration Returns None if track_p_values was False during fit_calibrate.

Example

from nonconform.utils.func.enums import Distribution
strategy = Randomized(n_calib=1000)
strategy.fit_calibrate(X, detector, track_p_values=True)
holdout_sizes, scores = strategy.get_iteration_info()

# holdout_sizes[i] is the holdout set size for iteration i
# scores[i] are the anomaly scores for iteration i
Source code in nonconform/strategy/experimental/randomized.py
def get_iteration_info(self) -> tuple[list[int], list[list[float]]] | None:
    """Get holdout sizes and per-iteration scores if tracking was enabled.

    This method provides access to the holdout set sizes used in each
    iteration and the corresponding anomaly scores. This information can be
    used for performance analysis, plotting vs. holdout size, or understanding
    the distribution of holdout set sizes used.

    Returns:
        tuple[list[int], list[list[float]]] | None: A tuple containing:
            * List of holdout sizes for each iteration
            * List of score arrays, one per iteration
            Returns None if track_p_values was False during fit_calibrate.

    Example:
        >>> from nonconform.utils.func.enums import Distribution
        >>> strategy = Randomized(n_calib=1000)
        >>> strategy.fit_calibrate(X, detector, track_p_values=True)
        >>> holdout_sizes, scores = strategy.get_iteration_info()
        >>> # holdout_sizes[i] is the holdout set size for iteration i
        >>> # scores[i] are the anomaly scores for iteration i
    """
    if not self._holdout_sizes:  # Empty list means tracking was not enabled
        return None
    return (
        self._holdout_sizes.copy(),
        [scores.copy() for scores in self._iteration_scores],
    )
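
As a follow-up, a small illustrative sketch (assuming strategy was fitted with track_p_values=True, as in the example above) that summarises the tracked information with NumPy:

import numpy as np

info = strategy.get_iteration_info()
if info is not None:
    holdout_sizes, scores = info
    per_iter_mean = np.array([np.mean(s) for s in scores])
    print(f"iterations tracked: {len(holdout_sizes)}")
    print(f"average holdout size: {np.mean(holdout_sizes):.1f}")
    print(f"overall mean calibration score: {per_iter_mean.mean():.3f}")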

jackknife

Jackknife
Jackknife(plus: bool = True)

Bases: BaseStrategy

Jackknife (leave-one-out) conformal anomaly detection strategy.

This strategy implements conformal prediction using the jackknife method, which is a special case of k-fold cross-validation where k equals the number of samples in the dataset (leave-one-out). For each sample, a model is trained on all other samples, and the left-out sample is used for calibration.

It internally uses a :class:~nonconform.strategy.cross_val.CrossValidation strategy, dynamically setting its _k parameter to the dataset size.

Attributes:

Name Type Description
_plus bool

If True, each model trained (one for each left-out sample) is retained. If False, a single model trained on the full dataset (after leave-one-out calibration) is retained. This behavior is delegated to the internal CrossValidation strategy.

_strategy CrossValidation

An instance of the :class:~nonconform.strategy.cross_val.CrossValidation strategy, configured for leave-one-out behavior.

_calibration_ids list[int] | None

Indices of the samples from the input data x used for calibration. Populated after :meth:fit_calibrate and accessible via :attr:calibration_ids. Initially None.

_detector_list List[BaseDetector]

A list of trained detector models, populated by :meth:fit_calibrate via the internal strategy.

_calibration_set ndarray

An array of calibration scores, one for each sample, populated by :meth:fit_calibrate via the internal strategy.

Parameters:

Name Type Description Default
plus bool

If True, instructs the internal cross-validation strategy to retain all models trained during the leave-one-out process. Strongly recommended for statistical validity. Defaults to True.

True
Source code in nonconform/strategy/jackknife.py
def __init__(self, plus: bool = True):
    """Initialize the Jackknife strategy.

    Args:
        plus (bool, optional): If ``True``, instructs the internal
            cross-validation strategy to retain all models trained during
            the leave-one-out process. Strongly recommended for statistical
            validity. Defaults to ``True``.
    """
    super().__init__(plus)
    self._plus: bool = plus

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        from nonconform.utils.func.logger import get_logger

        logger = get_logger("strategy.jackknife")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended "
            "for statistical guarantees."
        )

    self._strategy: CrossValidation = CrossValidation(k=1, plus=plus)
    self._calibration_ids: list[int] | None = None

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
calibration_ids property
calibration_ids: list[int] | None

Returns a copy of indices from x used for calibration via jackknife.

These are the indices of samples used to obtain calibration scores. In jackknife (leave-one-out), each sample is used once for calibration. The list is populated after fit_calibrate is called.

Returns:

Type Description
list[int] | None

list[int] | None: A copy of integer indices, or None if fit_calibrate has not been called.

Note

Returns a defensive copy to prevent external modification of internal state.

plus property
plus: bool

Returns whether the plus variant is enabled.

Returns:

Name Type Description
bool bool

True if using ensemble mode, False if using single model.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    weighted: bool = False,
    seed: int | None = None,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]

Fits detector(s) and gets calibration scores using jackknife.

This method configures the internal :class:~nonconform.strategy.cross_val.CrossValidation strategy to perform leave-one-out cross-validation by setting its number of folds (_k) to the total number of samples in x. It then delegates the fitting and calibration process to this internal strategy.

The results (trained models and calibration scores) and calibration sample IDs are retrieved from the internal strategy.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data.

required
detector BaseDetector

The PyOD base detector instance.

required
weighted bool

Passed to the internal CrossValidation strategy's fit_calibrate method. Its effect depends on the CrossValidation implementation. Defaults to False.

False
seed int | None

Random seed, passed to the internal CrossValidation strategy for reproducibility. Defaults to None.

None
iteration_callback callable

Not used in Jackknife strategy. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing: * A list of trained PyOD detector models. * An array of calibration scores (one per sample in x).

Source code in nonconform/strategy/jackknife.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    weighted: bool = False,  # Parameter passed to internal strategy
    seed: int | None = None,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fits detector(s) and gets calibration scores using jackknife.

    This method configures the internal
    :class:`~nonconform.strategy.cross_val.CrossValidation` strategy to
    perform leave-one-out cross-validation by setting its number of
    folds (`_k`) to the total number of samples in `x`. It then delegates
    the fitting and calibration process to this internal strategy.

    The results (trained models and calibration scores) and calibration
    sample IDs are retrieved from the internal strategy.

    Args:
        x (pd.DataFrame | np.ndarray): The input data.
        detector (BaseDetector): The PyOD base detector instance.
        weighted (bool, optional): Passed to the internal `CrossValidation`
            strategy's `fit_calibrate` method. Its effect depends on the
            `CrossValidation` implementation. Defaults to ``False``.
        seed (int | None, optional): Random seed, passed to the internal
            `CrossValidation` strategy for reproducibility. Defaults to None.
        iteration_callback (callable, optional): Not used in Jackknife strategy.
            Defaults to None.

    Returns:
        tuple[list[BaseDetector], np.ndarray]: A tuple containing:
            * A list of trained PyOD detector models.
            * An array of calibration scores (one per sample in `x`).
    """
    self._strategy._k = len(x)
    (
        self._detector_list,
        self._calibration_set,
    ) = self._strategy.fit_calibrate(
        x, detector, weighted, seed, iteration_callback
    )
    self._calibration_ids = self._strategy.calibration_ids
    return self._detector_list, self._calibration_set
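
A brief usage sketch (the import path for Jackknife and the data arrays are assumptions; leave-one-out trains one model per sample, so keep the dataset small):

import numpy as np
from pyod.models.knn import KNN

from nonconform.estimation import ConformalDetector
from nonconform.strategy import Jackknife  # assumed export path

x_train = np.random.randn(200, 5)   # small placeholder dataset (200 models are trained)
x_test = np.random.randn(20, 5)

detector = ConformalDetector(detector=KNN(), strategy=Jackknife(plus=True), seed=0)
detector.fit(x_train)
p_values = detector.predict(x_test)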

jackknife_bootstrap

JackknifeBootstrap
JackknifeBootstrap(
    n_bootstraps: int = 100,
    aggregation_method: Aggregation = Aggregation.MEAN,
    plus: bool = True,
)

Bases: BaseStrategy

Implements Jackknife+-after-Bootstrap (JaB+) conformal anomaly detection.

This strategy implements the JaB+ method which provides predictive inference for ensemble models trained on bootstrap samples. The key insight is that JaB+ uses the out-of-bag (OOB) samples from bootstrap iterations to compute calibration scores without requiring additional model training.

The strategy can operate in two modes: 1. Plus mode (plus=True): Uses ensemble of models for prediction (recommended) 2. Standard mode (plus=False): Uses single model trained on all data

Attributes:

Name Type Description
_n_bootstraps int

Number of bootstrap iterations

_aggregation_method Aggregation

How to aggregate OOB predictions

_plus bool

Whether to use the plus variant (ensemble of models)

_detector_list list[BaseDetector]

List of trained detectors (ensemble/single)

_calibration_set list[float]

List of calibration scores from JaB+ procedure

_calibration_ids list[int]

Indices of samples used for calibration

_bootstrap_models list[BaseDetector]

Models trained on each bootstrap sample

_oob_mask ndarray

Boolean matrix of shape (n_bootstraps, n_samples) indicating out-of-bag status

Parameters:

Name Type Description Default
n_bootstraps int

Number of bootstrap iterations. Defaults to 100.

100
aggregation_method Aggregation

Method to aggregate out-of-bag predictions. Options are Aggregation.MEAN or Aggregation.MEDIAN. Defaults to Aggregation.MEAN.

MEAN
plus bool

If True, uses ensemble of bootstrap models for prediction (maintains statistical validity). If False, uses single model trained on all data. Strongly recommended to use True. Defaults to True.

True

Raises:

Type Description
ValueError

If aggregation_method is not a valid Aggregation enum value.

ValueError

If n_bootstraps is less than 1.

Source code in nonconform/strategy/jackknife_bootstrap.py
def __init__(
    self,
    n_bootstraps: int = 100,
    aggregation_method: Aggregation = Aggregation.MEAN,
    plus: bool = True,
):
    """Initialize the Bootstrap (JaB+) strategy.

    Args:
        n_bootstraps (int, optional): Number of bootstrap iterations.
            Defaults to 100.
        aggregation_method (Aggregation, optional): Method to aggregate out-of-bag
            predictions. Options are Aggregation.MEAN or Aggregation.MEDIAN.
            Defaults to Aggregation.MEAN.
        plus (bool, optional): If True, uses ensemble of bootstrap models for
            prediction (maintains statistical validity). If False, uses single
            model trained on all data. Strongly recommended to use True.
            Defaults to True.

    Raises:
        ValueError: If aggregation_method is not a valid Aggregation enum value.
        ValueError: If n_bootstraps is less than 1.
    """
    super().__init__(plus=plus)

    if n_bootstraps < 1:
        exc = ValueError(
            f"Number of bootstraps must be at least 1, got {n_bootstraps}. "
            f"Typical values are 50-200 for jackknife-after-bootstrap."
        )
        exc.add_note(f"Received n_bootstraps={n_bootstraps}, which is invalid.")
        exc.add_note(
            "Jackknife-after-Bootstrap requires at least one bootstrap iteration."
        )
        exc.add_note("Consider using n_bootstraps=100 as a balanced default.")
        raise exc
    if aggregation_method not in [Aggregation.MEAN, Aggregation.MEDIAN]:
        exc = ValueError(
            f"aggregation_method must be Aggregation.MEAN or Aggregation.MEDIAN, "
            f"got {aggregation_method}. These are the only statistically valid "
            f"methods for combining out-of-bag predictions in JackknifeBootstrap()."
        )
        exc.add_note(f"Received aggregation_method={aggregation_method}")
        exc.add_note("Valid options are: Aggregation.MEAN, Aggregation.MEDIAN")
        exc.add_note(
            "These methods ensure statistical validity of the JaB+ procedure."
        )
        raise exc

    # Warn if plus=False to alert about potential validity issues
    if not plus:
        logger = get_logger("strategy.jackknife_bootstrap")
        logger.warning(
            "Setting plus=False may compromise conformal validity. "
            "The plus variant (plus=True) is recommended "
            "for statistical guarantees."
        )

    self._n_bootstraps: int = n_bootstraps
    self._aggregation_method: Aggregation = aggregation_method

    self._detector_list: list[BaseDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []

    # Internal state for JaB+ computation
    self._bootstrap_models: list[BaseDetector] = []
    self._oob_mask: np.ndarray = np.array([])
calibration_ids property
calibration_ids: list[int]

Returns a copy of the list of indices used for calibration.

In JaB+, all original training samples contribute to calibration through the out-of-bag mechanism.

Returns:

Type Description
list[int]

list[int]: Copy of integer indices (0 to n_samples-1).

Note

Returns a defensive copy to prevent external modification of internal state.

n_bootstraps property
n_bootstraps: int

Returns the number of bootstrap iterations.

aggregation_method property
aggregation_method: Aggregation

Returns the aggregation method used for OOB predictions.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, ndarray], None]
    | None = None,
    n_jobs: int | None = None,
) -> tuple[list[BaseDetector], np.ndarray]

Fit and calibrate using Jackknife+-after-Bootstrap method.

This method implements the JaB+ algorithm: 1. Generate bootstrap samples and train models 2. For each sample, compute out-of-bag predictions 3. Aggregate OOB predictions to get calibration scores 4. Train final model on all data

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix of shape (n_samples, n_features).

required
detector BaseDetector

The base anomaly detector to be used.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Not used in JaB+ method. Defaults to False.

False
iteration_callback Callable[[int, ndarray], None]

Optional callback function that gets called after each bootstrap iteration with the iteration number and current calibration scores. Defaults to None.

None
n_jobs int

Number of parallel jobs for bootstrap training. If None, uses sequential processing. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing: * List of trained detector models (ensemble if plus=True, single model if plus=False) * Array of calibration scores from the JaB+ procedure

Source code in nonconform/strategy/jackknife_bootstrap.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    seed: int | None = None,
    weighted: bool = False,
    iteration_callback: Callable[[int, np.ndarray], None] | None = None,
    n_jobs: int | None = None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fit and calibrate using Jackknife+-after-Bootstrap method.

    This method implements the JaB+ algorithm:
    1. Generate bootstrap samples and train models
    2. For each sample, compute out-of-bag predictions
    3. Aggregate OOB predictions to get calibration scores
    4. Train final model on all data

    Args:
        x (pd.DataFrame | np.ndarray): Input data matrix of shape
            (n_samples, n_features).
        detector (BaseDetector): The base anomaly detector to be used.
        seed (int | None, optional): Random seed for reproducibility.
            Defaults to None.
        weighted (bool, optional): Not used in JaB+ method. Defaults to False.
        iteration_callback (Callable[[int, np.ndarray], None], optional):
            Optional callback function that gets called after each bootstrap
            iteration with the iteration number and current calibration scores.
            Defaults to None.
        n_jobs (int, optional): Number of parallel jobs for bootstrap
            training. If None, uses sequential processing. Defaults to None.

    Returns:
        tuple[list[BaseDetector], np.ndarray]: A tuple containing:
            * List of trained detector models (ensemble if plus=True,
              single model if plus=False)
            * Array of calibration scores from JaB+ procedure
    """
    n_samples = len(x)
    logger = get_logger("strategy.bootstrap")
    generator = np.random.default_rng(seed)

    logger.info(
        f"Bootstrap (JaB+) Configuration:\n"
        f"  • Data: {n_samples:,} total samples\n"
        f"  • Bootstrap iterations: {self._n_bootstraps:,}\n"
        f"  • Aggregation method: {self._aggregation_method}"
    )

    # Step 1: Pre-allocate data structures and generate bootstrap samples
    self._bootstrap_models = [None] * self._n_bootstraps
    self._oob_mask = np.zeros((self._n_bootstraps, n_samples), dtype=bool)

    # Generate all bootstrap indices at once for better memory locality
    all_bootstrap_indices = generator.choice(
        n_samples, size=(self._n_bootstraps, n_samples), replace=True
    )

    # Pre-compute OOB mask efficiently
    for i in range(self._n_bootstraps):
        bootstrap_indices = all_bootstrap_indices[i]
        in_bag_mask = np.zeros(n_samples, dtype=bool)
        in_bag_mask[bootstrap_indices] = True
        self._oob_mask[i] = ~in_bag_mask

    # Train models (with optional parallelization)
    if n_jobs is None or n_jobs == 1:
        # Sequential training
        bootstrap_iterator = (
            tqdm(
                range(self._n_bootstraps),
                desc=f"Bootstrap training ({self._n_bootstraps} iterations)",
            )
            if logger.isEnabledFor(logging.INFO)
            else range(self._n_bootstraps)
        )
        for i in bootstrap_iterator:
            bootstrap_indices = all_bootstrap_indices[i]
            model = self._train_single_model(
                detector, x, bootstrap_indices, seed, i
            )
            self._bootstrap_models[i] = model
    else:
        # Parallel training
        self._train_models_parallel(
            detector, x, all_bootstrap_indices, seed, n_jobs, logger
        )

    # Step 2: Compute out-of-bag calibration scores
    oob_scores = self._compute_oob_scores(x)

    # Call iteration callback if provided
    if iteration_callback is not None:
        iteration_callback(self._n_bootstraps, oob_scores)

    self._calibration_set = oob_scores
    self._calibration_ids = list(range(n_samples))

    # Step 3: Handle plus variant
    if self._plus:
        # Plus variant: Use ensemble of bootstrap models for prediction
        self._detector_list = self._bootstrap_models.copy()
        logger.info(
            f"JaB+ calibration completed with {len(self._calibration_set)} scores "
            f"using ensemble of {len(self._bootstrap_models)} models"
        )
    else:
        # Standard variant: Train final model on all data
        final_model = deepcopy(detector)
        final_model = _set_params(
            final_model,
            seed=seed,
            random_iteration=True,
            iteration=self._n_bootstraps,
        )
        final_model.fit(x)
        self._detector_list = [final_model]
        logger.info(
            f"JaB+ calibration completed with {len(self._calibration_set)} scores "
            f"using single model trained on all data"
        )

    return self._detector_list, self._calibration_set
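
A short sketch (the import paths for JackknifeBootstrap and Aggregation are assumptions based on the modules shown in these docs; data arrays are placeholders):

import numpy as np
from pyod.models.lof import LOF

from nonconform.estimation import ConformalDetector
from nonconform.strategy import JackknifeBootstrap    # assumed export path
from nonconform.utils.func.enums import Aggregation   # assumed, as for Distribution

x_train = np.random.randn(1000, 6)
x_test = np.random.randn(50, 6)

strategy = JackknifeBootstrap(
    n_bootstraps=100, aggregation_method=Aggregation.MEAN, plus=True
)
detector = ConformalDetector(detector=LOF(), strategy=strategy, seed=1)
detector.fit(x_train)
p_values = detector.predict(x_test)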

split

Split
Split(n_calib: float | int = 0.1)

Bases: BaseStrategy

Split conformal strategy for fast anomaly detection with statistical guarantees.

Implements the classical split conformal approach by dividing training data into separate fitting and calibration sets. This provides the fastest conformal inference at the cost of using less data for calibration compared to other strategies.

Example
from nonconform.strategy import Split

# Use 20% of data for calibration
strategy = Split(n_calib=0.2)

# Use exactly 1000 samples for calibration
strategy = Split(n_calib=1000)

Attributes:

Name Type Description
_calib_size float | int

Size or proportion of data used for calibration.

_calibration_ids list[int] | None

Indices of calibration samples (for weighted conformal).

Parameters:

Name Type Description Default
n_calib float | int

The size or proportion of the dataset to use for the calibration set. If a float, it must lie strictly between 0.0 and 1.0 and is passed to train_test_split as the calibration proportion. If an int, it is the absolute number of calibration samples. Defaults to 0.1 (10%).

0.1
Source code in nonconform/strategy/split.py
def __init__(self, n_calib: float | int = 0.1) -> None:
    """Initialize the Split strategy.

    Args:
        n_calib (float | int): The size or proportion
            of the dataset to use for the calibration set. If a float,
            it must be between 0.0 and 1.0 (exclusive of 0.0 and 1.0
            in practice for `train_test_split`). If an int, it's the
            absolute number of samples. Defaults to ``0.1`` (10%).
    """
    super().__init__()  # `plus` is not relevant for a single split
    self._calib_size: float | int = n_calib
    self._calibration_ids: list[int] | None = None
calibration_ids property
calibration_ids: list[int] | None

Returns a copy of indices from x used for the calibration set.

This property provides the list of indices corresponding to the samples that were allocated to the calibration set during the fit_calibrate method. It will be None if fit_calibrate was called with weighted=False or if fit_calibrate has not yet been called.

Returns:

Type Description
list[int] | None

list[int] | None: A copy of integer indices, or None.

Note

Returns a defensive copy to prevent external modification of internal state.

calib_size property
calib_size: float | int

Returns the calibration size or proportion.

Returns:

Type Description
float | int

float | int: The calibration size as specified during initialization. If float (0.0-1.0), represents proportion of data. If int, represents absolute number of samples.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: BaseDetector,
    weighted: bool = False,
    seed: int | None = None,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]

Fits a detector and generates calibration scores using a data split.

The input data x is split into a training set and a calibration set according to _calib_size. The provided detector is trained on the training set. Non-conformity scores are then computed using the trained detector on the calibration set.

If weighted is True, the indices of the calibration samples are stored in _calibration_ids. Otherwise, _calibration_ids remains None.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data.

required
detector BaseDetector

The PyOD base detector instance to train. This instance is modified in place by fitting.

required
weighted bool

If True, the indices of the calibration samples are stored. Defaults to False.

False
seed int | None

Random seed for reproducibility of the train-test split. Defaults to None.

None
iteration_callback callable

Not used in Split strategy. Defaults to None.

None

Returns:

Type Description
tuple[list[BaseDetector], ndarray]

tuple[list[BaseDetector], np.ndarray]: A tuple containing: * A list containing the single trained PyOD detector instance. * An array of calibration scores from the calibration set.

Source code in nonconform/strategy/split.py
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: BaseDetector,
    weighted: bool = False,
    seed: int | None = None,
    iteration_callback=None,
) -> tuple[list[BaseDetector], np.ndarray]:
    """Fits a detector and generates calibration scores using a data split.

    The input data `x` is split into a training set and a calibration
    set according to `_calib_size`. The provided `detector` is trained
    on the training set. Non-conformity scores are then computed using
    the trained detector on the calibration set.

    If `weighted` is ``True``, the indices of the calibration samples
    are stored in `_calibration_ids`. Otherwise, `_calibration_ids`
    remains ``None``.

    Args:
        x (pd.DataFrame | np.ndarray): The input data.
        detector (BaseDetector): The PyOD base detector instance to train.
            This instance is modified in place by fitting.
        weighted (bool, optional): If ``True``, the indices of the
            calibration samples are stored. Defaults to ``False``.
        seed (int | None, optional): Random seed for reproducibility of the
            train-test split. Defaults to None.
        iteration_callback (callable, optional): Not used in Split strategy.
            Defaults to None.

    Returns:
        tuple[list[BaseDetector], np.ndarray]: A tuple containing:
            * A list containing the single trained PyOD detector instance.
            * An array of calibration scores from the calibration set.
    """
    x_id = np.arange(len(x))
    train_id, calib_id = train_test_split(
        x_id, test_size=self._calib_size, shuffle=True, random_state=seed
    )

    detector.fit(x[train_id])
    calibration_set = detector.decision_function(x[calib_id])

    if weighted:
        self._calibration_ids = calib_id.tolist()  # Ensure it's a list
    else:
        self._calibration_ids = None
    return [detector], calibration_set  # Return numpy array directly
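
To round off, a compact sketch wiring Split into the conformal detector wrapper (placeholder data; the Split import matches the Example above, while the ConformalDetector import path is an assumption):

import numpy as np
from pyod.models.iforest import IForest

from nonconform.estimation import ConformalDetector
from nonconform.strategy import Split

x_train = np.random.randn(2000, 10)
x_test = np.random.randn(100, 10)

detector = ConformalDetector(detector=IForest(), strategy=Split(n_calib=0.2), seed=7)
detector.fit(x_train)                        # 80% fits the model, 20% calibrates it
p_values = detector.predict(x_test)          # conformal p-values (default)
scores = detector.predict(x_test, raw=True)  # raw anomaly scores instead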

Utils

nonconform.utils

Utility modules for nonconform.

This module provides data handling, functional programming utilities, and statistical functions used throughout the nonconform package.

data

Data utilities for nonconform.

Dataset

Bases: Enum

Available datasets for anomaly detection experiments.

This enumeration provides all built-in datasets that can be loaded using the load() function. Each dataset is preprocessed for anomaly detection tasks with normal and anomalous samples.

Usage

from nonconform.utils.data import load, Dataset
df = load(Dataset.FRAUD, setup=True, seed=42)

DatasetInfo dataclass
DatasetInfo(
    name: str,
    description: str,
    filename: str,
    samples: int,
    features: int,
    anomaly_rate: float,
)

Metadata for a dataset.

clear_cache
clear_cache(
    dataset: str | None = None, all_versions: bool = False
) -> None

Clear dataset cache.

Parameters:

Name Type Description Default
dataset str | None

Specific dataset name to clear. If None, clears all.

None
all_versions bool

If True, clears cache for all dataset versions.

False

Examples:

>>> clear_cache("breast")  # Clear specific dataset
>>> clear_cache()  # Clear all datasets
>>> clear_cache(all_versions=True)  # Clear all versions
Source code in nonconform/utils/data/load.py
def clear_cache(dataset: str | None = None, all_versions: bool = False) -> None:
    """
    Clear dataset cache.

    Args:
        dataset: Specific dataset name to clear. If None, clears all.
        all_versions: If True, clears cache for all dataset versions.

    Examples:
        >>> clear_cache("breast")  # Clear specific dataset
        >>> clear_cache()  # Clear all datasets
        >>> clear_cache(all_versions=True)  # Clear all versions
    """
    _manager.clear_cache(dataset=dataset, all_versions=all_versions)
get_cache_location
get_cache_location() -> str

Get the cache directory path.

Returns:

Type Description
str

String path to the cache directory.

Examples:

>>> location = get_cache_location()
>>> print(f"Cache stored at: {location}")
Source code in nonconform/utils/data/load.py
def get_cache_location() -> str:
    """
    Get the cache directory path.

    Returns:
        String path to the cache directory.

    Examples:
        >>> location = get_cache_location()
        >>> print(f"Cache stored at: {location}")
    """
    return _manager.get_cache_location()
get_info
get_info(dataset: Dataset) -> DatasetInfo

Get detailed metadata for a specific dataset.

Parameters:

Name Type Description Default
dataset Dataset

The dataset to get info for (use Dataset enum values).

required

Returns:

Type Description
DatasetInfo

DatasetInfo object with dataset metadata.

Examples:

>>> from nonconform.utils.data import Dataset
>>> info = get_info(Dataset.BREAST)
>>> print(info.description)
Source code in nonconform/utils/data/load.py
def get_info(dataset: Dataset) -> DatasetInfo:
    """
    Get detailed metadata for a specific dataset.

    Args:
        dataset: The dataset to get info for (use Dataset enum values).

    Returns:
        DatasetInfo object with dataset metadata.

    Examples:
        >>> from nonconform.utils.data import Dataset
        >>> info = get_info(Dataset.BREAST)
        >>> print(info.description)
    """
    return _manager.get_info(dataset)
list_available
list_available() -> list[str]

Get a list of all available dataset names.

Returns:

Type Description
list[str]

Sorted list of dataset names.

Examples:

>>> datasets = list_available()
>>> print(datasets)
['breast', 'fraud', 'ionosphere', ...]
Source code in nonconform/utils/data/load.py
def list_available() -> list[str]:
    """
    Get a list of all available dataset names.

    Returns:
        Sorted list of dataset names.

    Examples:
        >>> datasets = list_available()
        >>> print(datasets)
        ['breast', 'fraud', 'ionosphere', ...]
    """
    return _manager.list_available()
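
A combined sketch of the loading helpers above (it assumes list_available, get_info, load, and Dataset are all exported from nonconform.utils.data, as the individual examples suggest):

from nonconform.utils.data import Dataset, get_info, list_available, load

print(list_available())                         # e.g. ['breast', 'fraud', 'ionosphere', ...]
print(get_info(Dataset.BREAST).description)     # dataset metadata

df = load(Dataset.BREAST, setup=True, seed=42)  # preprocessed DataFrame for experiments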
generator

Data generators for conformal anomaly detection.

This module provides batch and online data generators for streaming and batch processing scenarios in conformal anomaly detection.

BaseDataGenerator
BaseDataGenerator(
    load_data_func: Callable[[], DataFrame],
    anomaly_proportion: float,
    anomaly_mode: Literal[
        "proportional", "probabilistic"
    ] = "proportional",
    n_batches: int | None = None,
    train_size: float = 0.5,
    seed: int | None = None,
)

Bases: ABC

Abstract base class for data generators with anomaly contamination.

This class defines the interface for generating data with controlled anomaly contamination. It supports both batch and online generation modes with different anomaly proportion control strategies.

Parameters:

load_data_func : Callable[[], pd.DataFrame]
    Function from nonconform.utils.data.load (e.g., load_shuttle, load_breast).
anomaly_proportion : float
    Target proportion of anomalies (0.0 to 1.0).
anomaly_mode : {"proportional", "probabilistic"}, default="proportional"
    How to control anomaly proportions:
    - "proportional": Fixed proportion per batch/instance
    - "probabilistic": Probabilistic with global target over all items
n_batches : int, optional
    Number of batches/instances for "probabilistic" mode. Required when
    anomaly_mode="probabilistic".
train_size : float, default=0.5
    Proportion of normal instances to use for training.
seed : int, optional
    Seed for random number generator.

Attributes:

x_train : pd.DataFrame
    Training data (normal instances only).
x_normal : pd.DataFrame
    Normal instances for generation.
x_anomaly : pd.DataFrame
    Anomalous instances for generation.
n_normal : int
    Number of normal instances available.
n_anomaly : int
    Number of anomalous instances available.
rng : np.random.Generator
    Random number generator.

Source code in nonconform/utils/data/generator/base.py
def __init__(
    self,
    load_data_func: Callable[[], pd.DataFrame],
    anomaly_proportion: float,
    anomaly_mode: Literal["proportional", "probabilistic"] = "proportional",
    n_batches: int | None = None,
    train_size: float = 0.5,
    seed: int | None = None,
) -> None:
    """Initialize the base data generator."""
    self.load_data_func = load_data_func
    self.anomaly_proportion = anomaly_proportion
    self.anomaly_mode = anomaly_mode
    self.n_batches = n_batches
    self.train_size = train_size
    self.seed = seed

    # Initialize random number generator
    self.rng = np.random.default_rng(seed)

    # Validate configuration
    self._validate_config()

    # Load and prepare data
    self._prepare_data()

    # Initialize anomaly tracking for probabilistic mode
    if anomaly_mode == "probabilistic":
        self._init_probabilistic_tracking()
get_training_data
get_training_data() -> pd.DataFrame

Get training data (normal instances only).

Returns:

pd.DataFrame
    Training data without anomalies.

Source code in nonconform/utils/data/generator/base.py
def get_training_data(self) -> pd.DataFrame:
    """Get training data (normal instances only).

    Returns:
    pd.DataFrame
        Training data without anomalies.
    """
    return self.x_train
reset
reset() -> None

Reset the generator to initial state.

Source code in nonconform/utils/data/generator/base.py
def reset(self) -> None:
    """Reset the generator to initial state."""
    self.rng = np.random.default_rng(self.seed)
    if self.anomaly_mode == "probabilistic":
        self._current_anomalies = 0
        self._items_generated = 0
generate abstractmethod
generate(**kwargs) -> Iterator[Any]

Generate data items.

This method must be implemented by subclasses to define the specific generation behavior (batch vs online).

Source code in nonconform/utils/data/generator/base.py
@abstractmethod
def generate(self, **kwargs) -> Iterator[Any]:
    """Generate data items.

    This method must be implemented by subclasses to define
    the specific generation behavior (batch vs online).
    """
    pass
BatchGenerator
BatchGenerator(
    load_data_func: Callable[[], DataFrame],
    batch_size: int,
    anomaly_proportion: float,
    anomaly_mode: Literal[
        "proportional", "probabilistic"
    ] = "proportional",
    n_batches: int | None = None,
    train_size: float = 0.5,
    seed: int | None = None,
)

Bases: BaseDataGenerator

Generate batches with configurable anomaly contamination.

Parameters:

load_data_func : Callable[[], pd.DataFrame]
    Function from nonconform.utils.data.load (e.g., load_shuttle).
batch_size : int
    Number of instances per batch.
anomaly_proportion : float
    Target proportion of anomalies (0.0 to 1.0).
anomaly_mode : {"proportional", "probabilistic"}, default="proportional"
    How to control anomaly proportions.
n_batches : int, optional
    Number of batches to generate.
    - Required for "probabilistic" mode
    - Optional for "proportional" mode (if None, generates indefinitely)
train_size : float, default=0.5
    Proportion of normal instances to use for training.
seed : int, optional
    Seed for random number generator.

Examples:

from nonconform.utils.data.load import load_shuttle
from nonconform.utils.data.generator import BatchGenerator

# Proportional mode - 10% anomalies per batch
batch_gen = BatchGenerator(
    load_data_func=load_shuttle, batch_size=100, anomaly_proportion=0.1, seed=42
)

# Proportional mode with limited batches - 10% anomalies for exactly 5 batches
batch_gen = BatchGenerator(
    load_data_func=load_shuttle,
    batch_size=100,
    anomaly_proportion=0.1,
    anomaly_mode="proportional",
    n_batches=5,
    seed=42,
)

# Probabilistic mode - 5% anomalies across 10 batches
batch_gen = BatchGenerator(
    load_data_func=load_shuttle,
    batch_size=100,
    anomaly_proportion=0.05,
    anomaly_mode="probabilistic",
    n_batches=10,
    seed=42,
)

# Get training data
x_train = batch_gen.get_training_data()

# Generate batches (infinite for proportional mode)
for i, (x_batch, y_batch) in enumerate(batch_gen.generate()):
    print(f"Batch: {x_batch.shape}, Anomalies: {y_batch.sum()}")
    if i >= 4:  # Stop after 5 batches
        break

# Proportional mode with n_batches - automatic stopping after 5 batches
for x_batch, y_batch in batch_gen.generate():
    print(f"Batch: {x_batch.shape}, Anomalies: {y_batch.sum()}")

# Probabilistic mode - automatic stopping after n_batches
for x_batch, y_batch in batch_gen.generate():
    print(f"Batch: {x_batch.shape}, Anomalies: {y_batch.sum()}")

Source code in nonconform/utils/data/generator/batch.py
def __init__(
    self,
    load_data_func: Callable[[], pd.DataFrame],
    batch_size: int,
    anomaly_proportion: float,
    anomaly_mode: Literal["proportional", "probabilistic"] = "proportional",
    n_batches: int | None = None,
    train_size: float = 0.5,
    seed: int | None = None,
) -> None:
    """Initialize the batch generator."""
    self.batch_size = batch_size

    # Validate batch size
    if batch_size <= 0:
        exc = ValueError(
            f"batch_size must be positive, got {batch_size}. "
            f"Typical values are 32-1000 depending on dataset size"
            f" and memory constraints."
        )
        exc.add_note(f"Received batch_size={batch_size}, which is invalid.")
        exc.add_note("Batch size must be a positive integer (≥ 1).")
        exc.add_note(
            "Common values: batch_size=32 (small),"
            " batch_size=100 (medium), batch_size=1000 (large)."
        )
        raise exc

    # Initialize base class
    super().__init__(
        load_data_func=load_data_func,
        anomaly_proportion=anomaly_proportion,
        anomaly_mode=anomaly_mode,
        n_batches=n_batches,
        train_size=train_size,
        seed=seed,
    )

    # Calculate anomaly count per batch for proportional mode
    if anomaly_mode == "proportional":
        self.n_anomaly_per_batch = int(batch_size * anomaly_proportion)
        self.n_normal_per_batch = batch_size - self.n_anomaly_per_batch

        # Warn if anomaly proportion truncates to zero
        if anomaly_proportion > 0 and self.n_anomaly_per_batch == 0:
            min_batch_size = int(1 / anomaly_proportion)
            logger.warning(
                f"Batch size {batch_size} with proportion {anomaly_proportion:.2%} "
                f"results in 0 anomalies per batch due to truncation. "
                f"Consider using batch_size >= {min_batch_size} "
                f"or use anomaly_mode='probabilistic' for exact global proportion."
            )

        self._validate_batch_config()
generate
generate() -> Iterator[tuple[pd.DataFrame, pd.Series]]

Generate batches with mixed normal and anomalous instances.

  • For proportional mode: generates batches indefinitely if n_batches=None, or exactly n_batches batches if specified in constructor
  • For probabilistic mode: generates exactly n_batches batches (required in constructor)

Yields:

x_batch : pd.DataFrame
    Feature matrix for the batch.
y_batch : pd.Series
    Labels for the batch (0=normal, 1=anomaly).

Source code in nonconform/utils/data/generator/batch.py
def generate(self) -> Iterator[tuple[pd.DataFrame, pd.Series]]:
    """Generate batches with mixed normal and anomalous instances.

    - For proportional mode: generates batches indefinitely if n_batches=None,
      or exactly n_batches batches if specified in constructor
    - For probabilistic mode: generates exactly n_batches batches
      (required in constructor)

    Yields:
    x_batch : pd.DataFrame
        Feature matrix for the batch.
    y_batch : pd.Series
        Labels for the batch (0=normal, 1=anomaly).
    """
    batch_count = 0

    # Determine stopping condition based on mode and n_batches
    def _should_continue() -> bool:
        if self.anomaly_mode == "proportional":
            # Proportional: stop when n_batches reached (if specified),
            # otherwise infinite
            return self.n_batches is None or batch_count < self.n_batches
        else:
            # Probabilistic: always stop at n_batches (required)
            return batch_count < self.n_batches

    while _should_continue():
        match self.anomaly_mode:
            case "proportional":
                # Proportional mode: exact number of anomalies per batch
                batch_data = []
                batch_labels = []

                # Generate exact number of normal instances
                for _ in range(self.n_normal_per_batch):
                    instance, label = self._sample_instance(False)
                    batch_data.append(instance)
                    batch_labels.append(label)

                # Generate exact number of anomaly instances
                for _ in range(self.n_anomaly_per_batch):
                    instance, label = self._sample_instance(True)
                    batch_data.append(instance)
                    batch_labels.append(label)

                # Combine and shuffle
                x_batch = pd.concat(batch_data, axis=0, ignore_index=True)
                y_batch = pd.Series(batch_labels, dtype=int)

                # Shuffle the batch to mix normal and anomalous instances
                shuffle_idx = self.rng.permutation(self.batch_size)
                x_batch = x_batch.iloc[shuffle_idx].reset_index(drop=True)
                y_batch = y_batch.iloc[shuffle_idx].reset_index(drop=True)

            case "probabilistic":
                # Probabilistic mode: use global tracking to ensure exact proportion
                batch_data = []
                batch_labels = []

                # Generate instances for this batch
                for _ in range(self.batch_size):
                    is_anomaly = self._should_generate_anomaly()
                    instance, label = self._sample_instance(is_anomaly)

                    batch_data.append(instance)
                    batch_labels.append(label)

                    # Update tracking
                    self._current_anomalies += label
                    self._items_generated += 1

                # Combine into batch
                x_batch = pd.concat(batch_data, axis=0, ignore_index=True)
                y_batch = pd.Series(batch_labels, dtype=int)

            case _:
                raise ValueError(f"Unknown anomaly_mode: {self.anomaly_mode}")

        yield x_batch, y_batch
        batch_count += 1
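
One way to combine the batch generator with a conformal detector, shown as a sketch (the ConformalDetector and Split import paths are assumptions; the generator imports follow the class example above):

from pyod.models.iforest import IForest

from nonconform.estimation import ConformalDetector   # assumed export path
from nonconform.strategy import Split
from nonconform.utils.data.generator import BatchGenerator
from nonconform.utils.data.load import load_shuttle

batch_gen = BatchGenerator(
    load_data_func=load_shuttle,
    batch_size=100,
    anomaly_proportion=0.1,
    n_batches=5,
    seed=42,
)

detector = ConformalDetector(detector=IForest(), strategy=Split(n_calib=0.2), seed=42)
detector.fit(batch_gen.get_training_data())   # train on normal instances only

for x_batch, y_batch in batch_gen.generate():
    p_values = detector.predict(x_batch)
    print(f"true anomalies: {int(y_batch.sum())}, smallest p-value: {p_values.min():.4f}")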
OnlineGenerator
OnlineGenerator(
    load_data_func: Callable[[], DataFrame],
    anomaly_proportion: float,
    n_instances: int,
    train_size: float = 0.5,
    seed: int | None = None,
)

Bases: BaseDataGenerator

Generate single instances with probabilistic anomaly contamination for streaming.

Online generators use probabilistic anomaly control to ensure exact global proportion over a specified number of instances.

Parameters:

load_data_func : Callable[[], pd.DataFrame]
    Function from nonconform.utils.data.load (e.g., load_shuttle).
anomaly_proportion : float
    Target proportion of anomalies (0.0 to 1.0).
n_instances : int
    Number of instances over which the exact global proportion is ensured.
train_size : float, default=0.5
    Proportion of normal instances to use for training.
seed : int, optional
    Seed for random number generator.

Examples:

from nonconform.utils.data.load import load_shuttle
from nonconform.utils.data.generator import OnlineGenerator

# Exactly 1% anomalies over 1000 instances
online_gen = OnlineGenerator(
    load_data_func=load_shuttle,
    anomaly_proportion=0.01,
    n_instances=1000,
    seed=42,
)

# Get training data
x_train = online_gen.get_training_data()

# Generate instances - exactly 10 anomalies in 1000 instances
for x_instance, y_label in online_gen.generate(n_instances=1000):
    print(f"Instance: {x_instance.shape}, Label: {y_label}")

Source code in nonconform/utils/data/generator/online.py
def __init__(
    self,
    load_data_func: Callable[[], pd.DataFrame],
    anomaly_proportion: float,
    n_instances: int,
    train_size: float = 0.5,
    seed: int | None = None,
) -> None:
    """Initialize the online generator."""
    # Initialize base class with probabilistic mode
    super().__init__(
        load_data_func=load_data_func,
        anomaly_proportion=anomaly_proportion,
        anomaly_mode="probabilistic",
        n_batches=n_instances,
        train_size=train_size,
        seed=seed,
    )
generate
generate(
    n_instances: int | None = None,
) -> Iterator[tuple[pd.DataFrame, int]]

Generate stream of single instances with exact anomaly proportion.

Parameters:

n_instances : int, optional
    Number of instances to generate. If None, generates the n_instances
    specified at construction.

Yields:

x_instance : pd.DataFrame
    Single instance feature vector.
y_label : int
    Label for the instance (0=normal, 1=anomaly).

Source code in nonconform/utils/data/generator/online.py
def generate(
    self, n_instances: int | None = None
) -> Iterator[tuple[pd.DataFrame, int]]:
    """Generate stream of single instances with exact anomaly proportion.

    Parameters:
    n_instances : int, optional
        Number of instances to generate. If None, generates the n_instances specified at construction.

    Yields:
    x_instance : pd.DataFrame
        Single instance feature vector.
    y_label : int
        Label for the instance (0=normal, 1=anomaly).
    """
    # Default to n_instances if not specified
    if n_instances is None:
        n_instances = self.n_batches

    # Validate we don't exceed n_instances
    if n_instances > self.n_batches:
        raise ValueError(
            f"Requested {n_instances} instances exceeds n_instances "
            f"({self.n_batches}). Global proportion cannot be guaranteed."
        )

    instance_count = 0

    while instance_count < n_instances:
        # Determine if this instance should be anomaly using global tracking
        is_anomaly = self._should_generate_anomaly()

        # Sample instance
        instance, label = self._sample_instance(is_anomaly)

        # Update tracking
        self._current_anomalies += label
        self._items_generated += 1

        yield instance, label
        instance_count += 1
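
As a quick sanity check of the probabilistic control, here is a minimal sketch (reusing the load_shuttle loader from the example above) that counts the anomalies actually emitted:

from nonconform.utils.data.load import load_shuttle
from nonconform.utils.data.generator import OnlineGenerator

online_gen = OnlineGenerator(
    load_data_func=load_shuttle,
    anomaly_proportion=0.01,
    n_instances=1000,
    seed=42,
)

x_train = online_gen.get_training_data()                # normal instances only
labels = [label for _, label in online_gen.generate()]  # defaults to n_instances=1000

print(len(labels), sum(labels))  # 1000 instances, 10 of them anomalous (1%)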
base

Abstract base class for data generators with anomaly contamination control.

BaseDataGenerator
BaseDataGenerator(
    load_data_func: Callable[[], DataFrame],
    anomaly_proportion: float,
    anomaly_mode: Literal[
        "proportional", "probabilistic"
    ] = "proportional",
    n_batches: int | None = None,
    train_size: float = 0.5,
    seed: int | None = None,
)

Bases: ABC

Abstract base class for data generators with anomaly contamination.

This class defines the interface for generating data with controlled anomaly contamination. It supports both batch and online generation modes with different anomaly proportion control strategies.

Parameters:
load_data_func : Callable[[], pd.DataFrame]
    Function from nonconform.utils.data.load (e.g., load_shuttle, load_breast).
anomaly_proportion : float
    Target proportion of anomalies (0.0 to 1.0).
anomaly_mode : {"proportional", "probabilistic"}, default="proportional"
    How to control anomaly proportions:
    - "proportional": Fixed proportion per batch/instance
    - "probabilistic": Probabilistic with global target over all items
n_batches : int, optional
    Number of batches/instances for "probabilistic" mode. Required when anomaly_mode="probabilistic".
train_size : float, default=0.5
    Proportion of normal instances to use for training.
seed : int, optional
    Seed for random number generator.

Attributes:
x_train : pd.DataFrame
    Training data (normal instances only).
x_normal : pd.DataFrame
    Normal instances for generation.
x_anomaly : pd.DataFrame
    Anomalous instances for generation.
n_normal : int
    Number of normal instances available.
n_anomaly : int
    Number of anomalous instances available.
rng : np.random.Generator
    Random number generator.
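
To make the two anomaly modes concrete, the following is a minimal sketch using the BatchGenerator subclass documented below: proportional mode places the same number of anomalies in every batch, while probabilistic mode lets the per-batch count vary and targets the proportion globally over n_batches.

from nonconform.utils.data.load import load_shuttle
from nonconform.utils.data.generator import BatchGenerator

common = dict(
    load_data_func=load_shuttle,
    batch_size=100,
    anomaly_proportion=0.1,
    n_batches=5,
    seed=0,
)

# Proportional: exactly 10 anomalies in every batch of 100
prop_gen = BatchGenerator(anomaly_mode="proportional", **common)
print([int(y.sum()) for _, y in prop_gen.generate()])     # [10, 10, 10, 10, 10]

# Probabilistic: per-batch counts vary, but the total targets 10% of 500
prob_gen = BatchGenerator(anomaly_mode="probabilistic", **common)
print(sum(int(y.sum()) for _, y in prob_gen.generate()))  # expected 50 overall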

Source code in nonconform/utils/data/generator/base.py
def __init__(
    self,
    load_data_func: Callable[[], pd.DataFrame],
    anomaly_proportion: float,
    anomaly_mode: Literal["proportional", "probabilistic"] = "proportional",
    n_batches: int | None = None,
    train_size: float = 0.5,
    seed: int | None = None,
) -> None:
    """Initialize the base data generator."""
    self.load_data_func = load_data_func
    self.anomaly_proportion = anomaly_proportion
    self.anomaly_mode = anomaly_mode
    self.n_batches = n_batches
    self.train_size = train_size
    self.seed = seed

    # Initialize random number generator
    self.rng = np.random.default_rng(seed)

    # Validate configuration
    self._validate_config()

    # Load and prepare data
    self._prepare_data()

    # Initialize anomaly tracking for probabilistic mode
    if anomaly_mode == "probabilistic":
        self._init_probabilistic_tracking()
get_training_data
get_training_data() -> pd.DataFrame

Get training data (normal instances only).

Returns:
pd.DataFrame
    Training data without anomalies.

Source code in nonconform/utils/data/generator/base.py
def get_training_data(self) -> pd.DataFrame:
    """Get training data (normal instances only).

    Returns:
    pd.DataFrame
        Training data without anomalies.
    """
    return self.x_train
reset
reset() -> None

Reset the generator to initial state.

Source code in nonconform/utils/data/generator/base.py
def reset(self) -> None:
    """Reset the generator to initial state."""
    self.rng = np.random.default_rng(self.seed)
    if self.anomaly_mode == "probabilistic":
        self._current_anomalies = 0
        self._items_generated = 0
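
A short usage sketch of reset(): re-seeding the generator replays the same stream, which is useful for reproducible experiments (shown here with the OnlineGenerator subclass documented below).

from nonconform.utils.data.load import load_shuttle
from nonconform.utils.data.generator import OnlineGenerator

gen = OnlineGenerator(
    load_data_func=load_shuttle,
    anomaly_proportion=0.05,
    n_instances=200,
    seed=7,
)

first = [label for _, label in gen.generate()]

gen.reset()  # restore the seeded rng and the probabilistic counters
second = [label for _, label in gen.generate()]

print(first == second)  # expected True: the same seed reproduces the same stream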
generate abstractmethod
generate(**kwargs) -> Iterator[Any]

Generate data items.

This method must be implemented by subclasses to define the specific generation behavior (batch vs online).

Source code in nonconform/utils/data/generator/base.py
@abstractmethod
def generate(self, **kwargs) -> Iterator[Any]:
    """Generate data items.

    This method must be implemented by subclasses to define
    the specific generation behavior (batch vs online).
    """
    pass
batch
BatchGenerator
BatchGenerator(
    load_data_func: Callable[[], DataFrame],
    batch_size: int,
    anomaly_proportion: float,
    anomaly_mode: Literal[
        "proportional", "probabilistic"
    ] = "proportional",
    n_batches: int | None = None,
    train_size: float = 0.5,
    seed: int | None = None,
)

Bases: BaseDataGenerator

Generate batches with configurable anomaly contamination.

Parameters:
load_data_func : Callable[[], pd.DataFrame]
    Function from nonconform.utils.data.load (e.g., load_shuttle).
batch_size : int
    Number of instances per batch.
anomaly_proportion : float
    Target proportion of anomalies (0.0 to 1.0).
anomaly_mode : {"proportional", "probabilistic"}, default="proportional"
    How to control anomaly proportions.
n_batches : int, optional
    Number of batches to generate.
    - Required for "probabilistic" mode
    - Optional for "proportional" mode (if None, generates indefinitely)
train_size : float, default=0.5
    Proportion of normal instances to use for training.
seed : int, optional
    Seed for random number generator.

Examples:

from nonconform.utils.data.load import load_shuttle
from nonconform.utils.data.generator import BatchGenerator

Proportional mode - 10% anomalies per batch

batch_gen = BatchGenerator(
    load_data_func=load_shuttle, batch_size=100, anomaly_proportion=0.1, seed=42
)

Proportional mode with limited batches - 10% anomalies for exactly 5 batches

batch_gen = BatchGenerator(
    load_data_func=load_shuttle,
    batch_size=100,
    anomaly_proportion=0.1,
    anomaly_mode="proportional",
    n_batches=5,
    seed=42,
)

Probabilistic mode - 5% anomalies across 10 batches

batch_gen = BatchGenerator(
    load_data_func=load_shuttle,
    batch_size=100,
    anomaly_proportion=0.05,
    anomaly_mode="probabilistic",
    n_batches=10,
    seed=42,
)

Get training data

x_train = batch_gen.get_training_data()

Generate batches (infinite for proportional mode)

for i, (x_batch, y_batch) in enumerate(batch_gen.generate()):
    print(f"Batch: {x_batch.shape}, Anomalies: {y_batch.sum()}")
    if i >= 4:  # Stop after 5 batches
        break

Proportional mode with n_batches - automatic stopping after 5 batches

for x_batch, y_batch in batch_gen.generate():
    print(f"Batch: {x_batch.shape}, Anomalies: {y_batch.sum()}")

Probabilistic mode - automatic stopping after n_batches

for x_batch, y_batch in batch_gen.generate():
    print(f"Batch: {x_batch.shape}, Anomalies: {y_batch.sum()}")

Source code in nonconform/utils/data/generator/batch.py
def __init__(
    self,
    load_data_func: Callable[[], pd.DataFrame],
    batch_size: int,
    anomaly_proportion: float,
    anomaly_mode: Literal["proportional", "probabilistic"] = "proportional",
    n_batches: int | None = None,
    train_size: float = 0.5,
    seed: int | None = None,
) -> None:
    """Initialize the batch generator."""
    self.batch_size = batch_size

    # Validate batch size
    if batch_size <= 0:
        exc = ValueError(
            f"batch_size must be positive, got {batch_size}. "
            f"Typical values are 32-1000 depending on dataset size"
            f" and memory constraints."
        )
        exc.add_note(f"Received batch_size={batch_size}, which is invalid.")
        exc.add_note("Batch size must be a positive integer (≥ 1).")
        exc.add_note(
            "Common values: batch_size=32 (small),"
            " batch_size=100 (medium), batch_size=1000 (large)."
        )
        raise exc

    # Initialize base class
    super().__init__(
        load_data_func=load_data_func,
        anomaly_proportion=anomaly_proportion,
        anomaly_mode=anomaly_mode,
        n_batches=n_batches,
        train_size=train_size,
        seed=seed,
    )

    # Calculate anomaly count per batch for proportional mode
    if anomaly_mode == "proportional":
        self.n_anomaly_per_batch = int(batch_size * anomaly_proportion)
        self.n_normal_per_batch = batch_size - self.n_anomaly_per_batch

        # Warn if anomaly proportion truncates to zero
        if anomaly_proportion > 0 and self.n_anomaly_per_batch == 0:
            min_batch_size = int(1 / anomaly_proportion)
            logger.warning(
                f"Batch size {batch_size} with proportion {anomaly_proportion:.2%} "
                f"results in 0 anomalies per batch due to truncation. "
                f"Consider using batch_size >= {min_batch_size} "
                f"or use anomaly_mode='probabilistic' for exact global proportion."
            )

        self._validate_batch_config()
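
To illustrate the truncation warning above with concrete numbers:

batch_size, anomaly_proportion = 50, 0.01

int(batch_size * anomaly_proportion)  # 0 -> every batch would contain only normal instances
int(1 / anomaly_proportion)           # 100 -> smallest batch_size that yields >= 1 anomaly per batch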
generate
generate() -> Iterator[tuple[pd.DataFrame, pd.Series]]

Generate batches with mixed normal and anomalous instances.

  • For proportional mode: generates batches indefinitely if n_batches=None, or exactly n_batches batches if specified in constructor
  • For probabilistic mode: generates exactly n_batches batches (required in constructor)

Yields:
x_batch : pd.DataFrame
    Feature matrix for the batch.
y_batch : pd.Series
    Labels for the batch (0=normal, 1=anomaly).

Source code in nonconform/utils/data/generator/batch.py
def generate(self) -> Iterator[tuple[pd.DataFrame, pd.Series]]:
    """Generate batches with mixed normal and anomalous instances.

    - For proportional mode: generates batches indefinitely if n_batches=None,
      or exactly n_batches batches if specified in constructor
    - For probabilistic mode: generates exactly n_batches batches
      (required in constructor)

    Yields:
    x_batch : pd.DataFrame
        Feature matrix for the batch.
    y_batch : pd.Series
        Labels for the batch (0=normal, 1=anomaly).
    """
    batch_count = 0

    # Determine stopping condition based on mode and n_batches
    def _should_continue() -> bool:
        if self.anomaly_mode == "proportional":
            # Proportional: stop when n_batches reached (if specified),
            # otherwise infinite
            return self.n_batches is None or batch_count < self.n_batches
        else:
            # Probabilistic: always stop at n_batches (required)
            return batch_count < self.n_batches

    while _should_continue():
        match self.anomaly_mode:
            case "proportional":
                # Proportional mode: exact number of anomalies per batch
                batch_data = []
                batch_labels = []

                # Generate exact number of normal instances
                for _ in range(self.n_normal_per_batch):
                    instance, label = self._sample_instance(False)
                    batch_data.append(instance)
                    batch_labels.append(label)

                # Generate exact number of anomaly instances
                for _ in range(self.n_anomaly_per_batch):
                    instance, label = self._sample_instance(True)
                    batch_data.append(instance)
                    batch_labels.append(label)

                # Combine and shuffle
                x_batch = pd.concat(batch_data, axis=0, ignore_index=True)
                y_batch = pd.Series(batch_labels, dtype=int)

                # Shuffle the batch to mix normal and anomalous instances
                shuffle_idx = self.rng.permutation(self.batch_size)
                x_batch = x_batch.iloc[shuffle_idx].reset_index(drop=True)
                y_batch = y_batch.iloc[shuffle_idx].reset_index(drop=True)

            case "probabilistic":
                # Probabilistic mode: use global tracking to ensure exact proportion
                batch_data = []
                batch_labels = []

                # Generate instances for this batch
                for _ in range(self.batch_size):
                    is_anomaly = self._should_generate_anomaly()
                    instance, label = self._sample_instance(is_anomaly)

                    batch_data.append(instance)
                    batch_labels.append(label)

                    # Update tracking
                    self._current_anomalies += label
                    self._items_generated += 1

                # Combine into batch
                x_batch = pd.concat(batch_data, axis=0, ignore_index=True)
                y_batch = pd.Series(batch_labels, dtype=int)

            case _:
                raise ValueError(f"Unknown anomaly_mode: {self.anomaly_mode}")

        yield x_batch, y_batch
        batch_count += 1
online
OnlineGenerator
OnlineGenerator(
    load_data_func: Callable[[], DataFrame],
    anomaly_proportion: float,
    n_instances: int,
    train_size: float = 0.5,
    seed: int | None = None,
)

Bases: BaseDataGenerator

Generate single instances with probabilistic anomaly contamination for streaming.

Online generators use probabilistic anomaly control to ensure exact global proportion over a specified number of instances.

Parameters:
load_data_func : Callable[[], pd.DataFrame]
    Function from nonconform.utils.data.load (e.g., load_shuttle).
anomaly_proportion : float
    Target proportion of anomalies (0.0 to 1.0).
n_instances : int
    Number of instances over which the exact global proportion is ensured.
train_size : float, default=0.5
    Proportion of normal instances to use for training.
seed : int, optional
    Seed for random number generator.

Examples:

from nonconform.utils.data.load import load_shuttle
from nonconform.utils.data.generator import OnlineGenerator

Exactly 1% anomalies over 1000 instances

online_gen = OnlineGenerator(
    load_data_func=load_shuttle,
    anomaly_proportion=0.01,
    n_instances=1000,
    seed=42,
)

Get training data

x_train = online_gen.get_training_data()

Generate instances - exactly 10 anomalies in 1000 instances

for x_instance, y_label in online_gen.generate(n_instances=1000):
    print(f"Instance: {x_instance.shape}, Label: {y_label}")

Source code in nonconform/utils/data/generator/online.py
def __init__(
    self,
    load_data_func: Callable[[], pd.DataFrame],
    anomaly_proportion: float,
    n_instances: int,
    train_size: float = 0.5,
    seed: int | None = None,
) -> None:
    """Initialize the online generator."""
    # Initialize base class with probabilistic mode
    super().__init__(
        load_data_func=load_data_func,
        anomaly_proportion=anomaly_proportion,
        anomaly_mode="probabilistic",
        n_batches=n_instances,
        train_size=train_size,
        seed=seed,
    )
generate
generate(
    n_instances: int | None = None,
) -> Iterator[tuple[pd.DataFrame, int]]

Generate stream of single instances with exact anomaly proportion.

Parameters:
n_instances : int, optional
    Number of instances to generate. If None, generates all n_instances configured at construction.

Yields:
x_instance : pd.DataFrame
    Single instance feature vector.
y_label : int
    Label for the instance (0=normal, 1=anomaly).

Source code in nonconform/utils/data/generator/online.py
def generate(
    self, n_instances: int | None = None
) -> Iterator[tuple[pd.DataFrame, int]]:
    """Generate stream of single instances with exact anomaly proportion.

    Parameters:
    n_instances : int, optional
        Number of instances to generate. If None, generates up to max_instances.

    Yields:
    x_instance : pd.DataFrame
        Single instance feature vector.
    y_label : int
        Label for the instance (0=normal, 1=anomaly).
    """
    # Default to n_instances if not specified
    if n_instances is None:
        n_instances = self.n_batches

    # Validate we don't exceed n_instances
    if n_instances > self.n_batches:
        raise ValueError(
            f"Requested {n_instances} instances exceeds n_instances "
            f"({self.n_batches}). Global proportion cannot be guaranteed."
        )

    instance_count = 0

    while instance_count < n_instances:
        # Determine if this instance should be anomaly using global tracking
        is_anomaly = self._should_generate_anomaly()

        # Sample instance
        instance, label = self._sample_instance(is_anomaly)

        # Update tracking
        self._current_anomalies += label
        self._items_generated += 1

        yield instance, label
        instance_count += 1
load

Modern dataset loading module with DatasetManager architecture.

DatasetManager
DatasetManager()

Manages dataset loading, caching, and metadata.

Source code in nonconform/utils/data/load.py
def __init__(self) -> None:
    """Initialize the DatasetManager with configuration."""
    self.version: str = os.environ.get("UNQUAD_DATASET_VERSION", "v0.9.17-datasets")
    base_repo_url = (
        "https://github.com/OliverHennhoefer/nonconform/releases/download/"
    )
    self.base_url: str = os.environ.get(
        "UNQUAD_DATASET_URL",
        urljoin(base_repo_url, quote(self.version, safe="") + "/"),
    )
    self.suffix: str = ".npz"
    self._memory_cache: OrderedDict[str, bytes] = OrderedDict()
    self.max_cache_size: int = 16  # Limit memory cache to 16 datasets
    self._cache_dir: Path | None = None
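
Because the constructor reads its version and base URL from the environment, both can be overridden before the manager is created; the mirror URL below is a placeholder, not a real endpoint.

import os

# Pin a dataset release and/or redirect downloads to a mirror (placeholder URL)
os.environ["UNQUAD_DATASET_VERSION"] = "v0.9.17-datasets"
os.environ["UNQUAD_DATASET_URL"] = "https://example.org/nonconform-datasets/"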
cache_dir property
cache_dir: Path

Get cache directory, creating it lazily.

memory_cache_size property
memory_cache_size: int

Returns the number of datasets cached in memory.

Returns:

Name Type Description
int int

Number of datasets currently in memory cache.

is_cache_enabled property
is_cache_enabled: bool

Returns whether disk caching is enabled.

Returns:

Name Type Description
bool bool

True if cache directory exists and is writable.

load
load(
    dataset: Dataset,
    setup: bool = False,
    seed: int | None = None,
) -> (
    pd.DataFrame
    | tuple[pd.DataFrame, pd.DataFrame, pd.Series]
)

Load a dataset by enum value.

Parameters:

Name Type Description Default
dataset Dataset

The dataset to load (use Dataset enum values).

required
setup bool

If True, splits the data into training and testing sets for anomaly detection tasks.

False
seed int | None

Random seed for data splitting if setup is True.

None

Returns:

Type Description
DataFrame | tuple[DataFrame, DataFrame, Series]

If setup is False, returns the complete dataset as a DataFrame.

DataFrame | tuple[DataFrame, DataFrame, Series]

If setup is True, returns a tuple: (x_train, x_test, y_test).

Raises:

Type Description
ValueError

If the dataset is not found in the registry.

URLError

If dataset download fails.

Source code in nonconform/utils/data/load.py
def load(
    self, dataset: Dataset, setup: bool = False, seed: int | None = None
) -> pd.DataFrame | tuple[pd.DataFrame, pd.DataFrame, pd.Series]:
    """
    Load a dataset by enum value.

    Args:
        dataset: The dataset to load (use Dataset enum values).
        setup: If True, splits the data into training and testing sets
               for anomaly detection tasks.
        seed: Random seed for data splitting if setup is True.

    Returns:
        If setup is False, returns the complete dataset as a DataFrame.
        If setup is True, returns a tuple: (x_train, x_test, y_test).

    Raises:
        ValueError: If the dataset is not found in the registry.
        URLError: If dataset download fails.
    """
    name = dataset.value  # Extract string value from enum

    if name not in DATASET_REGISTRY:
        available = ", ".join(sorted(DATASET_REGISTRY.keys()))
        raise ValueError(
            f"Dataset '{name}' not found. Available datasets: {available}"
        )

    filename = DATASET_REGISTRY[name].filename

    # Download or retrieve from cache
    data_bytes = self._download(filename)

    # Load NPZ file from bytes
    buffer = io.BytesIO(data_bytes)
    npz_file = np.load(buffer)

    # Extract data and labels
    data = npz_file["X"]
    labels = npz_file["y"]

    # Convert integer types to float32 for PyOD compatibility
    if data.dtype in [
        np.int8,
        np.int16,
        np.int32,
        np.int64,
        np.uint8,
        np.uint16,
        np.uint32,
        np.uint64,
    ]:
        data = data.astype(np.float32)

    # Create DataFrame with programmatic column names
    column_names = [f"V{i + 1}" for i in range(data.shape[1])]
    df = pd.DataFrame(data, columns=column_names)
    df["Class"] = labels

    if setup:
        return self._create_setup(df, seed)

    return df
clear_cache
clear_cache(
    dataset: str | None = None, all_versions: bool = False
) -> None

Clear dataset cache.

Parameters:

Name Type Description Default
dataset str | None

Specific dataset name to clear. If None, clears all.

None
all_versions bool

If True, clears cache for all dataset versions.

False
Source code in nonconform/utils/data/load.py
def clear_cache(
    self, dataset: str | None = None, all_versions: bool = False
) -> None:
    """
    Clear dataset cache.

    Args:
        dataset: Specific dataset name to clear. If None, clears all.
        all_versions: If True, clears cache for all dataset versions.
    """
    if all_versions:
        # Clear entire cache directory (all versions)
        cache_root = self.cache_dir.parent
        if cache_root.exists():
            try:
                shutil.rmtree(cache_root)
                logger.info("Cleared all dataset cache (all versions)")
            except PermissionError:
                logger.warning("Could not clear all cache due to file permissions")
        self._memory_cache.clear()
        return

    if dataset is not None:
        # Clear specific dataset
        filename = f"{dataset}{self.suffix}"

        # Remove from memory cache
        self._memory_cache.pop(filename, None)

        # Remove from disk cache
        cache_file = self.cache_dir / filename
        if cache_file.exists():
            cache_file.unlink()
            logger.info(f"Cleared cache for dataset: {dataset}")
        else:
            logger.info(f"No cache found for dataset: {dataset}")
    else:
        # Clear all datasets for current version
        if self.cache_dir.exists():
            try:
                shutil.rmtree(self.cache_dir)
                logger.info(f"Cleared all dataset cache (v{self.version})")
            except PermissionError:
                logger.warning(
                    f"Could not clear cache directory (v{self.version}) "
                    f"due to file permissions"
                )
        self._memory_cache.clear()
list_available
list_available() -> list[str]

Get a list of all available dataset names.

Returns:

Type Description
list[str]

Sorted list of dataset names.

Source code in nonconform/utils/data/load.py
def list_available(self) -> list[str]:
    """
    Get a list of all available dataset names.

    Returns:
        Sorted list of dataset names.
    """
    return sorted(DATASET_REGISTRY.keys())
get_info
get_info(dataset: Dataset) -> DatasetInfo

Get metadata for a specific dataset.

Parameters:

Name Type Description Default
dataset Dataset

The dataset to get info for (use Dataset enum values).

required

Returns:

Type Description
DatasetInfo

DatasetInfo object with dataset metadata.

Raises:

Type Description
ValueError

If the dataset is not found.

Source code in nonconform/utils/data/load.py
def get_info(self, dataset: Dataset) -> DatasetInfo:
    """
    Get metadata for a specific dataset.

    Args:
        dataset: The dataset to get info for (use Dataset enum values).

    Returns:
        DatasetInfo object with dataset metadata.

    Raises:
        ValueError: If the dataset is not found.
    """
    name = dataset.value  # Extract string value from enum

    if name not in DATASET_REGISTRY:
        available = ", ".join(sorted(DATASET_REGISTRY.keys()))
        raise ValueError(
            f"Dataset '{name}' not found. Available datasets: {available}"
        )
    return DATASET_REGISTRY[name]
get_cache_location
get_cache_location() -> str

Get the cache directory path.

Returns:

Type Description
str

String path to the cache directory.

Source code in nonconform/utils/data/load.py
def get_cache_location(self) -> str:
    """
    Get the cache directory path.

    Returns:
        String path to the cache directory.
    """
    return str(self.cache_dir)
load
load(
    dataset: Dataset,
    setup: bool = False,
    seed: int | None = None,
) -> (
    pd.DataFrame
    | tuple[pd.DataFrame, pd.DataFrame, pd.Series]
)

Load a benchmark anomaly detection dataset.

Provides access to curated datasets commonly used for anomaly detection research. Datasets are automatically downloaded and cached locally for efficient reuse.

Parameters:

Name Type Description Default
dataset Dataset

Dataset to load using Dataset enum (e.g., Dataset.SHUTTLE, ...).

required
setup bool

If True, automatically splits data for anomaly detection workflow. Returns (x_train, x_test, y_test), x_train contains only normal samples.

False
seed int | None

Random seed for reproducible train/test splitting when setup=True.

None

Returns:

Type Description
DataFrame | tuple[DataFrame, DataFrame, Series]
  • If setup=False: Complete dataset as pd.DataFrame with 'label' column
DataFrame | tuple[DataFrame, DataFrame, Series]
  • If setup=True: Tuple of (x_train, x_test, y_test) where:
  • x_train: Normal samples for training (features only)
  • x_test: Mixed test samples (features only)
  • y_test: True labels for test samples (0=normal, 1=anomaly)

Examples:

Load complete dataset for exploration:

from nonconform.utils.data import load, Dataset

# Load full dataset with labels
df = load(Dataset.MAMMOGRAPHY)
print(f"Dataset shape: {df.shape}")
print(f"Anomaly rate: {df['label'].mean():.1%}")

Load split data ready for conformal detection:

# Get training/test split for anomaly detection
x_train, x_test, y_test = load(Dataset.SHUTTLE, setup=True, seed=42)

# x_train contains only normal samples for detector training
print(f"Training samples: {len(x_train)} (all normal)")
print(f"Test samples: {len(x_test)} ({np.sum(y_test)} anomalies)")

Available Datasets

Use list_available() to see all available datasets, or check enum values: Dataset.MAMMOGRAPHY, Dataset.SHUTTLE, Dataset.FRAUD, etc.

Source code in nonconform/utils/data/load.py
def load(
    dataset: Dataset, setup: bool = False, seed: int | None = None
) -> pd.DataFrame | tuple[pd.DataFrame, pd.DataFrame, pd.Series]:
    """Load a benchmark anomaly detection dataset.

    Provides access to curated datasets commonly used for anomaly detection research.
    Datasets are automatically downloaded and cached locally for efficient reuse.

    Args:
        dataset: Dataset to load using Dataset enum (e.g., Dataset.SHUTTLE, ...).
        setup: If True, automatically splits data for anomaly detection workflow.
               Returns (x_train, x_test, y_test), x_train contains only normal samples.
        seed: Random seed for reproducible train/test splitting when setup=True.

    Returns:
        - If setup=False: Complete dataset as pd.DataFrame with 'label' column
        - If setup=True: Tuple of (x_train, x_test, y_test) where:
            - x_train: Normal samples for training (features only)
            - x_test: Mixed test samples (features only)
            - y_test: True labels for test samples (0=normal, 1=anomaly)

    Examples:
        Load complete dataset for exploration:
        ```python
        from nonconform.utils.data import load, Dataset

        # Load full dataset with labels
        df = load(Dataset.MAMMOGRAPHY)
        print(f"Dataset shape: {df.shape}")
        print(f"Anomaly rate: {df['label'].mean():.1%}")
        ```

        Load split data ready for conformal detection:
        ```python
        # Get training/test split for anomaly detection
        x_train, x_test, y_test = load(Dataset.SHUTTLE, setup=True, seed=42)

        # x_train contains only normal samples for detector training
        print(f"Training samples: {len(x_train)} (all normal)")
        print(f"Test samples: {len(x_test)} ({np.sum(y_test)} anomalies)")
        ```

    Available Datasets:
        Use `list_available()` to see all available datasets, or check enum values:
        Dataset.MAMMOGRAPHY, Dataset.SHUTTLE, Dataset.FRAUD, etc.
    """
    return _manager.load(dataset, setup=setup, seed=seed)
list_available
list_available() -> list[str]

Get a list of all available dataset names.

Returns:

Type Description
list[str]

Sorted list of dataset names.

Examples:

>>> datasets = list_available()
>>> print(datasets)
['breast', 'fraud', 'ionosphere', ...]
Source code in nonconform/utils/data/load.py
def list_available() -> list[str]:
    """
    Get a list of all available dataset names.

    Returns:
        Sorted list of dataset names.

    Examples:
        >>> datasets = list_available()
        >>> print(datasets)
        ['breast', 'fraud', 'ionosphere', ...]
    """
    return _manager.list_available()
get_info
get_info(dataset: Dataset) -> DatasetInfo

Get detailed metadata for a specific dataset.

Parameters:

Name Type Description Default
dataset Dataset

The dataset to get info for (use Dataset enum values).

required

Returns:

Type Description
DatasetInfo

DatasetInfo object with dataset metadata.

Examples:

>>> from nonconform.utils.data import Dataset
>>> info = get_info(Dataset.BREAST)
>>> print(info.description)
Source code in nonconform/utils/data/load.py
def get_info(dataset: Dataset) -> DatasetInfo:
    """
    Get detailed metadata for a specific dataset.

    Args:
        dataset: The dataset to get info for (use Dataset enum values).

    Returns:
        DatasetInfo object with dataset metadata.

    Examples:
        >>> from nonconform.utils.data import Dataset
        >>> info = get_info(Dataset.BREAST)
        >>> print(info.description)
    """
    return _manager.get_info(dataset)
clear_cache
clear_cache(
    dataset: str | None = None, all_versions: bool = False
) -> None

Clear dataset cache.

Parameters:

Name Type Description Default
dataset str | None

Specific dataset name to clear. If None, clears all.

None
all_versions bool

If True, clears cache for all dataset versions.

False

Examples:

>>> clear_cache("breast")  # Clear specific dataset
>>> clear_cache()  # Clear all datasets
>>> clear_cache(all_versions=True)  # Clear all versions
Source code in nonconform/utils/data/load.py
def clear_cache(dataset: str | None = None, all_versions: bool = False) -> None:
    """
    Clear dataset cache.

    Args:
        dataset: Specific dataset name to clear. If None, clears all.
        all_versions: If True, clears cache for all dataset versions.

    Examples:
        >>> clear_cache("breast")  # Clear specific dataset
        >>> clear_cache()  # Clear all datasets
        >>> clear_cache(all_versions=True)  # Clear all versions
    """
    _manager.clear_cache(dataset=dataset, all_versions=all_versions)
get_cache_location
get_cache_location() -> str

Get the cache directory path.

Returns:

Type Description
str

String path to the cache directory.

Examples:

>>> location = get_cache_location()
>>> print(f"Cache stored at: {location}")
Source code in nonconform/utils/data/load.py
def get_cache_location() -> str:
    """
    Get the cache directory path.

    Returns:
        String path to the cache directory.

    Examples:
        >>> location = get_cache_location()
        >>> print(f"Cache stored at: {location}")
    """
    return _manager.get_cache_location()
registry

Dataset registry with metadata for all available datasets.

DatasetInfo dataclass
DatasetInfo(
    name: str,
    description: str,
    filename: str,
    samples: int,
    features: int,
    anomaly_rate: float,
)

Metadata for a dataset.

func

Functional programming utilities for nonconform.

This module provides decorators, enumerations, and parameter utilities used throughout the nonconform package.

Aggregation

Bases: Enum

Aggregation functions for combining multiple model outputs or scores.

This enumeration lists strategies for aggregating data, commonly employed in ensemble methods to combine predictions or scores from several models.

Attributes:

Name Type Description
MEAN

Represents aggregation by calculating the arithmetic mean. The underlying value is typically "mean".

MEDIAN

Represents aggregation by calculating the median. The underlying value is typically "median".

MINIMUM

Represents aggregation by selecting the minimum value. The underlying value is typically "minimum".

MAXIMUM

Represents aggregation by selecting the maximum value. The underlying value is typically "maximum".

decorator
enums
Distribution

Bases: Enum

Probability distributions for validation set sizes in randomized strategies.

This enumeration defines the available distribution types for selecting validation set sizes in randomized leave-p-out conformal prediction strategies.

Attributes:

Name Type Description
BETA_BINOMIAL

Beta-binomial distribution for drawing validation fractions. Allows tunable mean and variance through alpha/beta parameters.

UNIFORM

Discrete uniform distribution over a specified range. Simple and controlled selection within [p_min, p_max].

GRID

Discrete distribution over a specified set of values. Targeted control with custom probabilities for each p value.

Aggregation

Bases: Enum

Aggregation functions for combining multiple model outputs or scores.

This enumeration lists strategies for aggregating data, commonly employed in ensemble methods to combine predictions or scores from several models.

Attributes:

Name Type Description
MEAN

Represents aggregation by calculating the arithmetic mean. The underlying value is typically "mean".

MEDIAN

Represents aggregation by calculating the median. The underlying value is typically "median".

MINIMUM

Represents aggregation by selecting the minimum value. The underlying value is typically "minimum".

MAXIMUM

Represents aggregation by selecting the maximum value. The underlying value is typically "maximum".

Dataset

Bases: Enum

Available datasets for anomaly detection experiments.

This enumeration provides all built-in datasets that can be loaded using the load() function. Each dataset is preprocessed for anomaly detection tasks with normal and anomalous samples.

Usage

from nonconform.utils.data import load, Dataset
df = load(Dataset.FRAUD, setup=True, seed=42)

logger

Logging utilities for the nonconform package.

get_logger
get_logger(name: str) -> logging.Logger

Get a logger for the nonconform package.

Parameters:
name : str
    The name of the logger, typically the module name.

Returns:
logging.Logger
    A logger instance for the nonconform package.

Notes: This function creates loggers with the naming convention "nonconform.{name}". By default, shows INFO level and above (INFO, WARNING, ERROR, CRITICAL). Users can control verbosity with standard logging: logging.getLogger("nonconform").setLevel(level).

Examples:

logger = get_logger("estimation.standard_conformal")
logger.info("Calibration completed successfully")

To silence warnings:

logging.getLogger("nonconform").setLevel(logging.ERROR)

To enable debug:

logging.getLogger("nonconform").setLevel(logging.DEBUG)

Source code in nonconform/utils/func/logger.py
def get_logger(name: str) -> logging.Logger:
    """Get a logger for the nonconform package.

    Parameters:
    name : str
        The name of the logger, typically the module name.

    Returns:
    logging.Logger
        A logger instance for the nonconform package.

    Notes:
    This function creates loggers with the naming convention "nonconform.{name}".
    By default, shows INFO level and above (INFO, WARNING, ERROR, CRITICAL).
    Users can control verbosity with standard logging:
    logging.getLogger("nonconform").setLevel(level).

    Examples:
    >>> logger = get_logger("estimation.standard_conformal")
    >>> logger.info("Calibration completed successfully")

    >>> # To silence warnings:
    >>> logging.getLogger("nonconform").setLevel(logging.ERROR)

    >>> # To enable debug:
    >>> logging.getLogger("nonconform").setLevel(logging.DEBUG)
    """
    logger = logging.getLogger(f"nonconform.{name}")

    # Configure root nonconform logger if not already done
    root_logger = logging.getLogger("nonconform")
    if not root_logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(levelname)s:%(name)s:%(message)s")
        handler.setFormatter(formatter)
        root_logger.addHandler(handler)
        root_logger.setLevel(logging.INFO)  # Show INFO and above by default
        root_logger.propagate = False

    return logger
params

Manages and configures anomaly detection models from the PyOD library.

This module provides utilities for setting up PyOD detector models, including handling a list of models that are restricted or unsupported for use with conformal anomaly detection.

Attributes:

Name Type Description
forbidden_model_list list[type[BaseDetector]]

A list of PyOD detector classes that are considered unsupported or restricted for use by the set_params function. These models are not suitable for conformal anomaly detection due to their specific design requirements.

stat

Statistical utilities for conformal anomaly detection.

This module provides statistical functions including aggregation methods, extreme value theory functions, evaluation metrics, and general statistical operations used in conformal prediction.

false_discovery_rate
false_discovery_rate(y: ndarray, y_hat: ndarray) -> float

Calculate the False Discovery Rate (FDR) for binary classification.

The False Discovery Rate is the proportion of false positives among all instances predicted as positive. It is calculated as: FDR = FP / (FP + TP), where FP is false positives and TP is true positives. If the total number of predicted positives (FP + TP) is zero, FDR is defined as 0.0.

Parameters:

Name Type Description Default
y ndarray

True binary labels, where 1 indicates an actual positive (e.g., anomaly) and 0 indicates an actual negative (e.g., normal).

required
y_hat ndarray

Predicted binary labels, where 1 indicates a predicted positive and 0 indicates a predicted negative.

required

Returns:

Name Type Description
float float

The calculated False Discovery Rate.

Source code in nonconform/utils/stat/metrics.py
def false_discovery_rate(y: np.ndarray, y_hat: np.ndarray) -> float:
    """Calculate the False Discovery Rate (FDR) for binary classification.

    The False Discovery Rate is the proportion of false positives among all
    instances predicted as positive. It is calculated as:
    FDR = FP / (FP + TP), where FP is false positives and TP is true positives.
    If the total number of predicted positives (FP + TP) is zero, FDR is
    defined as 0.0.

    Args:
        y (numpy.ndarray): True binary labels, where 1 indicates an actual
            positive (e.g., anomaly) and 0 indicates an actual negative
            (e.g., normal).
        y_hat (numpy.ndarray): Predicted binary labels, where 1 indicates a
            predicted positive and 0 indicates a predicted negative.

    Returns:
        float: The calculated False Discovery Rate.
    """
    y_true = y.astype(bool)
    y_pred = y_hat.astype(bool)

    true_positives = np.sum(y_pred & y_true)
    false_positives = np.sum(y_pred & ~y_true)

    total_predicted_positives = true_positives + false_positives

    if total_predicted_positives == 0:
        fdr = 0.0
    else:
        fdr = false_positives / total_predicted_positives

    return fdr
statistical_power
statistical_power(y: ndarray, y_hat: ndarray) -> float

Calculate statistical power (recall or true positive rate).

Statistical power, also known as recall or true positive rate (TPR), measures the proportion of actual positives that are correctly identified by the classifier. It is calculated as: Power (TPR) = TP / (TP + FN), where TP is true positives and FN is false negatives. If the total number of actual positives (TP + FN) is zero, power is defined as 0.0.

Parameters:

Name Type Description Default
y ndarray

True binary labels, where 1 indicates an actual positive (e.g., anomaly) and 0 indicates an actual negative (e.g., normal).

required
y_hat ndarray

Predicted binary labels, where 1 indicates a predicted positive and 0 indicates a predicted negative.

required

Returns:

Name Type Description
float float

The calculated statistical power.

Source code in nonconform/utils/stat/metrics.py
def statistical_power(y: np.ndarray, y_hat: np.ndarray) -> float:
    """Calculate statistical power (recall or true positive rate).

    Statistical power, also known as recall or true positive rate (TPR),
    measures the proportion of actual positives that are correctly identified
    by the classifier. It is calculated as:
    Power (TPR) = TP / (TP + FN), where TP is true positives and FN is
    false negatives.
    If the total number of actual positives (TP + FN) is zero, power is
    defined as 0.0.

    Args:
        y (numpy.ndarray): True binary labels, where 1 indicates an actual
            positive (e.g., anomaly) and 0 indicates an actual negative
            (e.g., normal).
        y_hat (numpy.ndarray): Predicted binary labels, where 1 indicates a
            predicted positive and 0 indicates a predicted negative.

    Returns:
        float: The calculated statistical power.
    """
    y_bool = y.astype(bool)  # Or y == 1
    y_hat_bool = y_hat.astype(bool)  # Or y_hat == 1

    true_positives = np.sum(y_bool & y_hat_bool)
    false_negatives = np.sum(y_bool & ~y_hat_bool)
    total_actual_positives = true_positives + false_negatives

    if total_actual_positives == 0:
        power = 0.0
    else:
        power = true_positives / total_actual_positives

    return power
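
A small worked example covering both metrics (import path taken from the source locations shown above):

import numpy as np
from nonconform.utils.stat.metrics import false_discovery_rate, statistical_power

y_true = np.array([0, 0, 1, 1, 1])  # three actual anomalies
y_pred = np.array([1, 0, 1, 1, 0])  # three discoveries: one false, two true

print(false_discovery_rate(y_true, y_pred))  # FP / (FP + TP) = 1/3 ≈ 0.333
print(statistical_power(y_true, y_pred))     # TP / (TP + FN) = 2/3 ≈ 0.667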
aggregate
aggregate(
    method: Aggregation, scores: ndarray
) -> np.ndarray

Aggregate anomaly scores using a specified method.

This function applies a chosen aggregation technique to a 2D array of anomaly scores, where each row typically represents scores from a different model or source, and each column corresponds to a data sample.

Parameters:

Name Type Description Default
method Aggregation

The aggregation method to apply. Must be a member of the :class:~nonconform.utils.enums.Aggregation enum (e.g., Aggregation.MEAN, Aggregation.MEDIAN).

required
scores ndarray

A 2D NumPy array of anomaly scores. It is expected that scores are arranged such that rows correspond to different sets of scores (e.g., from different models) and columns correspond to individual data points/samples. Aggregation is performed along axis=0.

required

Returns:

Type Description
ndarray

numpy.ndarray: An array of aggregated anomaly scores. The length of the array will correspond to the number of columns in the input scores array.

Raises:

Type Description
ValueError

If the method is not a supported aggregation type defined in the internal mapping.

Source code in nonconform/utils/stat/aggregation.py
def aggregate(method: Aggregation, scores: np.ndarray) -> np.ndarray:
    """Aggregate anomaly scores using a specified method.

    This function applies a chosen aggregation technique to a 2D array of
    anomaly scores, where each row typically represents scores from a different
    model or source, and each column corresponds to a data sample.

    Args:
        method (Aggregation): The aggregation method to apply. Must be a
            member of the :class:`~nonconform.utils.enums.Aggregation` enum (e.g.,
            ``Aggregation.MEAN``, ``Aggregation.MEDIAN``).
        scores (numpy.ndarray): A 2D NumPy array of anomaly scores.
            It is expected that scores are arranged such that rows correspond
            to different sets of scores (e.g., from different models) and
            columns correspond to individual data points/samples.
            Aggregation is performed along ``axis=0``.

    Returns:
        numpy.ndarray: An array of aggregated anomaly scores. The length of the
            array will correspond to the number of columns in the input `scores` array.

    Raises:
        ValueError: If the `method` is not a supported aggregation type
            defined in the internal mapping.
    """
    match method:
        case Aggregation.MEAN:
            return np.mean(scores, axis=0)
        case Aggregation.MEDIAN:
            return np.median(scores, axis=0)
        case Aggregation.MINIMUM:
            return np.min(scores, axis=0)
        case Aggregation.MAXIMUM:
            return np.max(scores, axis=0)
        case _:
            valid_methods = ", ".join([f"Aggregation.{a.name}" for a in Aggregation])
            raise ValueError(
                f"Unsupported aggregation method: {method}. "
                f"Valid methods are: {valid_methods}. "
                f"Example: aggregate(Aggregation.MEAN, scores)"
            )
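
For illustration, aggregating scores from three models over four samples; the Aggregation import path below is an assumption based on the func module layout documented above.

import numpy as np
from nonconform.utils.func.enums import Aggregation  # assumed import path
from nonconform.utils.stat.aggregation import aggregate

# Rows = models, columns = samples
scores = np.array([
    [0.1, 0.9, 0.4, 0.2],
    [0.2, 0.8, 0.5, 0.1],
    [0.3, 0.7, 0.6, 0.3],
])

print(aggregate(Aggregation.MEDIAN, scores))  # [0.2 0.8 0.5 0.2]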
calculate_p_val
calculate_p_val(
    scores: ndarray, calibration_set: ndarray
) -> np.ndarray

Calculate p-values for scores based on a calibration set.

This function computes a p-value for each score in the scores array by comparing it against the distribution of scores in the calibration_set. The p-value represents the proportion of calibration scores that are greater than or equal to the given score, with a small adjustment.

Parameters:

Name Type Description Default
scores ndarray

A 1D array of test scores for which p-values are to be calculated.

required
calibration_set ndarray

A 1D array of calibration scores used as the reference distribution.

required

Returns:

Type Description
ndarray

numpy.ndarray: An array of p-values, each corresponding to an input score from scores.

Notes

The p-value for each score is computed using the formula: p_value = (1 + count(calibration_score >= score)) / (1 + N_calibration) where N_calibration is the total number of scores in calibration_set.

Source code in nonconform/utils/stat/statistical.py
def calculate_p_val(scores: np.ndarray, calibration_set: np.ndarray) -> np.ndarray:
    """Calculate p-values for scores based on a calibration set.

    This function computes a p-value for each score in the `scores` array by
    comparing it against the distribution of scores in the `calibration_set`.
    The p-value represents the proportion of calibration scores that are
    greater than or equal to the given score, with a small adjustment.

    Args:
        scores (numpy.ndarray): A 1D array of test scores for which p-values
            are to be calculated.
        calibration_set (numpy.ndarray): A 1D array of calibration scores
            used as the reference distribution.

    Returns:
        numpy.ndarray: An array of p-values, each corresponding to an input score
            from `scores`.

    Notes:
        The p-value for each score is computed using the formula:
        p_value = (1 + count(calibration_score >= score)) / (1 + N_calibration)
        where N_calibration is the total number of scores in `calibration_set`.
    """
    # sum_smaller counts how many calibration_set values are >= each score
    sum_smaller = np.sum(calibration_set >= scores[:, np.newaxis], axis=1)
    return (1.0 + sum_smaller) / (1.0 + len(calibration_set))
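
A short numerical check of the formula above:

import numpy as np
from nonconform.utils.stat.statistical import calculate_p_val

calibration = np.array([0.1, 0.2, 0.3, 0.4, 0.5])
test_scores = np.array([0.35, 0.9])

print(calculate_p_val(test_scores, calibration))
# 0.35 -> (1 + 2) / (1 + 5) = 0.5    (two calibration scores are >= 0.35)
# 0.90 -> (1 + 0) / (1 + 5) ≈ 0.167  (no calibration score is >= 0.9)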
aggregation
aggregate
aggregate(
    method: Aggregation, scores: ndarray
) -> np.ndarray

Aggregate anomaly scores using a specified method.

This function applies a chosen aggregation technique to a 2D array of anomaly scores, where each row typically represents scores from a different model or source, and each column corresponds to a data sample.

Parameters:

Name Type Description Default
method Aggregation

The aggregation method to apply. Must be a member of the :class:~nonconform.utils.enums.Aggregation enum (e.g., Aggregation.MEAN, Aggregation.MEDIAN).

required
scores ndarray

A 2D NumPy array of anomaly scores. It is expected that scores are arranged such that rows correspond to different sets of scores (e.g., from different models) and columns correspond to individual data points/samples. Aggregation is performed along axis=0.

required

Returns:

Type Description
ndarray

numpy.ndarray: An array of aggregated anomaly scores. The length of the array will correspond to the number of columns in the input scores array.

Raises:

Type Description
ValueError

If the method is not a supported aggregation type defined in the internal mapping.

Source code in nonconform/utils/stat/aggregation.py
def aggregate(method: Aggregation, scores: np.ndarray) -> np.ndarray:
    """Aggregate anomaly scores using a specified method.

    This function applies a chosen aggregation technique to a 2D array of
    anomaly scores, where each row typically represents scores from a different
    model or source, and each column corresponds to a data sample.

    Args:
        method (Aggregation): The aggregation method to apply. Must be a
            member of the :class:`~nonconform.utils.enums.Aggregation` enum (e.g.,
            ``Aggregation.MEAN``, ``Aggregation.MEDIAN``).
        scores (numpy.ndarray): A 2D NumPy array of anomaly scores.
            It is expected that scores are arranged such that rows correspond
            to different sets of scores (e.g., from different models) and
            columns correspond to individual data points/samples.
            Aggregation is performed along ``axis=0``.

    Returns:
        numpy.ndarray: An array of aggregated anomaly scores. The length of the
            array will correspond to the number of columns in the input `scores` array.

    Raises:
        ValueError: If the `method` is not a supported aggregation type
            defined in the internal mapping.
    """
    match method:
        case Aggregation.MEAN:
            return np.mean(scores, axis=0)
        case Aggregation.MEDIAN:
            return np.median(scores, axis=0)
        case Aggregation.MINIMUM:
            return np.min(scores, axis=0)
        case Aggregation.MAXIMUM:
            return np.max(scores, axis=0)
        case _:
            valid_methods = ", ".join([f"Aggregation.{a.name}" for a in Aggregation])
            raise ValueError(
                f"Unsupported aggregation method: {method}. "
                f"Valid methods are: {valid_methods}. "
                f"Example: aggregate(Aggregation.MEAN, scores)"
            )
metrics
false_discovery_rate
false_discovery_rate(y: ndarray, y_hat: ndarray) -> float

Calculate the False Discovery Rate (FDR) for binary classification.

The False Discovery Rate is the proportion of false positives among all instances predicted as positive. It is calculated as: FDR = FP / (FP + TP), where FP is false positives and TP is true positives. If the total number of predicted positives (FP + TP) is zero, FDR is defined as 0.0.

Parameters:

Name Type Description Default
y ndarray

True binary labels, where 1 indicates an actual positive (e.g., anomaly) and 0 indicates an actual negative (e.g., normal).

required
y_hat ndarray

Predicted binary labels, where 1 indicates a predicted positive and 0 indicates a predicted negative.

required

Returns:

Name Type Description
float float

The calculated False Discovery Rate.

Source code in nonconform/utils/stat/metrics.py
def false_discovery_rate(y: np.ndarray, y_hat: np.ndarray) -> float:
    """Calculate the False Discovery Rate (FDR) for binary classification.

    The False Discovery Rate is the proportion of false positives among all
    instances predicted as positive. It is calculated as:
    FDR = FP / (FP + TP), where FP is false positives and TP is true positives.
    If the total number of predicted positives (FP + TP) is zero, FDR is
    defined as 0.0.

    Args:
        y (numpy.ndarray): True binary labels, where 1 indicates an actual
            positive (e.g., anomaly) and 0 indicates an actual negative
            (e.g., normal).
        y_hat (numpy.ndarray): Predicted binary labels, where 1 indicates a
            predicted positive and 0 indicates a predicted negative.

    Returns:
        float: The calculated False Discovery Rate.
    """
    y_true = y.astype(bool)
    y_pred = y_hat.astype(bool)

    true_positives = np.sum(y_pred & y_true)
    false_positives = np.sum(y_pred & ~y_true)

    total_predicted_positives = true_positives + false_positives

    if total_predicted_positives == 0:
        fdr = 0.0
    else:
        fdr = false_positives / total_predicted_positives

    return fdr
statistical_power
statistical_power(y: ndarray, y_hat: ndarray) -> float

Calculate statistical power (recall or true positive rate).

Statistical power, also known as recall or true positive rate (TPR), measures the proportion of actual positives that are correctly identified by the classifier. It is calculated as: Power (TPR) = TP / (TP + FN), where TP is true positives and FN is false negatives. If the total number of actual positives (TP + FN) is zero, power is defined as 0.0.

Parameters:

Name Type Description Default
y ndarray

True binary labels, where 1 indicates an actual positive (e.g., anomaly) and 0 indicates an actual negative (e.g., normal).

required
y_hat ndarray

Predicted binary labels, where 1 indicates a predicted positive and 0 indicates a predicted negative.

required

Returns:

Name Type Description
float float

The calculated statistical power.

Source code in nonconform/utils/stat/metrics.py
def statistical_power(y: np.ndarray, y_hat: np.ndarray) -> float:
    """Calculate statistical power (recall or true positive rate).

    Statistical power, also known as recall or true positive rate (TPR),
    measures the proportion of actual positives that are correctly identified
    by the classifier. It is calculated as:
    Power (TPR) = TP / (TP + FN), where TP is true positives and FN is
    false negatives.
    If the total number of actual positives (TP + FN) is zero, power is
    defined as 0.0.

    Args:
        y (numpy.ndarray): True binary labels, where 1 indicates an actual
            positive (e.g., anomaly) and 0 indicates an actual negative
            (e.g., normal).
        y_hat (numpy.ndarray): Predicted binary labels, where 1 indicates a
            predicted positive and 0 indicates a predicted negative.

    Returns:
        float: The calculated statistical power.
    """
    y_bool = y.astype(bool)  # Or y == 1
    y_hat_bool = y_hat.astype(bool)  # Or y_hat == 1

    true_positives = np.sum(y_bool & y_hat_bool)
    false_negatives = np.sum(y_bool & ~y_hat_bool)
    total_actual_positives = true_positives + false_negatives

    if total_actual_positives == 0:
        power = 0.0
    else:
        power = true_positives / total_actual_positives

    return power
statistical
calculate_p_val
calculate_p_val(
    scores: ndarray, calibration_set: ndarray
) -> np.ndarray

Calculate p-values for scores based on a calibration set.

This function computes a p-value for each score in the scores array by comparing it against the distribution of scores in the calibration_set. The p-value represents the proportion of calibration scores that are greater than or equal to the given score, with a small adjustment.

Parameters:

Name Type Description Default
scores ndarray

A 1D array of test scores for which p-values are to be calculated.

required
calibration_set ndarray

A 1D array of calibration scores used as the reference distribution.

required

Returns:

Type Description
ndarray

numpy.ndarray: An array of p-values, each corresponding to an input score from scores.

Notes

The p-value for each score is computed using the formula: p_value = (1 + count(calibration_score >= score)) / (1 + N_calibration) where N_calibration is the total number of scores in calibration_set.

Source code in nonconform/utils/stat/statistical.py
def calculate_p_val(scores: np.ndarray, calibration_set: np.ndarray) -> np.ndarray:
    """Calculate p-values for scores based on a calibration set.

    This function computes a p-value for each score in the `scores` array by
    comparing it against the distribution of scores in the `calibration_set`.
    The p-value represents the proportion of calibration scores that are
    greater than or equal to the given score, with a small adjustment.

    Args:
        scores (numpy.ndarray): A 1D array of test scores for which p-values
            are to be calculated.
        calibration_set (numpy.ndarray): A 1D array of calibration scores
            used as the reference distribution.

    Returns:
        numpy.ndarray: An array of p-values, each corresponding to an input score
            from `scores`.

    Notes:
        The p-value for each score is computed using the formula:
        p_value = (1 + count(calibration_score >= score)) / (1 + N_calibration)
        where N_calibration is the total number of scores in `calibration_set`.
    """
    # sum_smaller counts how many calibration_set values are >= each score
    sum_smaller = np.sum(calibration_set >= scores[:, np.newaxis], axis=1)
    return (1.0 + sum_smaller) / (1.0 + len(calibration_set))
calculate_weighted_p_val
calculate_weighted_p_val(
    scores: ndarray,
    calibration_set: ndarray,
    w_scores: ndarray,
    w_calib: ndarray,
) -> np.ndarray

Calculate weighted p-values for scores using a weighted calibration set.

This function computes p-values by comparing input scores (with corresponding w_scores weights) against a calibration_set (with w_calib weights). The calculation involves a weighted count of calibration scores exceeding each test score, incorporating the weights of both the test scores and calibration scores.

Parameters:

Name Type Description Default
scores ndarray

A 1D array of test scores.

required
calibration_set ndarray

A 1D array of calibration scores.

required
w_scores ndarray

A 1D array of weights corresponding to each score in scores.

required
w_calib ndarray

A 1D array of weights corresponding to each score in calibration_set.

required

Returns:

Type Description
ndarray

numpy.ndarray: An array of weighted p-values corresponding to the input scores.

Source code in nonconform/utils/stat/statistical.py
def calculate_weighted_p_val(
    scores: np.ndarray,
    calibration_set: np.ndarray,
    w_scores: np.ndarray,
    w_calib: np.ndarray,
) -> np.ndarray:
    """Calculate weighted p-values for scores using a weighted calibration set.

    This function computes p-values by comparing input `scores` (with
    corresponding `w_scores` weights) against a `calibration_set` (with
    `w_calib` weights). The calculation involves a weighted count of
    calibration scores exceeding each test score, incorporating the weights
    of both the test scores and calibration scores.

    Args:
        scores (numpy.ndarray): A 1D array of test scores.
        calibration_set (numpy.ndarray): A 1D array of calibration scores.
        w_scores (numpy.ndarray): A 1D array of weights corresponding to each
            score in `scores`.
        w_calib (numpy.ndarray): A 1D array of weights corresponding to each
            score in `calibration_set`.

    Returns:
        numpy.ndarray: An array of weighted p-values corresponding to the input
            `scores`.
    """
    # Create comparison matrix: True where calibration_set[j] >= scores[i]
    comparison_matrix = calibration_set >= scores[:, np.newaxis]

    # Weighted sum of calibration scores >= test score
    weighted_sum_calib_ge_score = np.sum(comparison_matrix * w_calib, axis=1)

    # Sum of weights of higher-scoring calibration items + self weight
    numerator = weighted_sum_calib_ge_score + w_scores

    # Total calibration weight + test instance weight
    denominator = np.sum(w_calib) + w_scores

    # Handle division by zero
    return np.divide(
        numerator, denominator, out=np.zeros_like(numerator), where=denominator != 0
    )
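
A brief numerical sketch: with unit weights the weighted formula reduces to the unweighted p-value, while up-weighting the largest calibration scores raises the p-value for the same test score.

import numpy as np
from nonconform.utils.stat.statistical import calculate_weighted_p_val

calibration = np.array([0.1, 0.2, 0.3, 0.4, 0.5])
test_scores = np.array([0.35])

# Unit weights: (2 + 1) / (5 + 1) = 0.5, matching calculate_p_val
print(calculate_weighted_p_val(
    test_scores, calibration, np.ones_like(test_scores), np.ones_like(calibration)
))

# Heavier weights on the two largest calibration scores: (4 + 1) / (5.5 + 1) ≈ 0.769
w_calib = np.array([0.5, 0.5, 0.5, 2.0, 2.0])
print(calculate_weighted_p_val(
    test_scores, calibration, np.ones_like(test_scores), w_calib
))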