API Reference

Complete API documentation for all nonconform modules and classes.

Start Here

If you are looking for task-oriented call sequences, start with Common Workflows.

Detector

nonconform.detector

Core conformal anomaly detector implementation.

This module provides the main ConformalDetector class that wraps any anomaly detector with conformal inference for valid p-values and FDR control.

Classes:

Name Description
BaseConformalDetector

Abstract base class for conformal detectors.

ConformalDetector

Main conformal anomaly detector with optional weighting.

BaseConformalDetector

Bases: ABC

Abstract base class for all conformal anomaly detectors.

Defines the core interface that all conformal anomaly detection implementations must provide. Conformal detectors support two calibration workflows, followed by a shared inference phase:

  1. Integrated calibration: fit() trains the detector(s) and computes calibration scores in one call
  2. Detached calibration: train the detector externally, then call calibrate() on a separate calibration dataset

At inference time, compute_p_values() converts new-data scores to valid p-values, and select() runs the combined p-value + FDR-control workflow.

Subclasses must implement all abstract methods (fit(), compute_p_values(), and score_samples()).

Note

This is an abstract class and cannot be instantiated directly. Use ConformalDetector for the main implementation.
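The quantity that compute_p_values() returns can be illustrated with a minimal numpy sketch. This is a simplified illustration of the empirical conformal p-value, not nonconform's actual implementation, and it assumes higher scores are more anomalous:

```python
import numpy as np

def empirical_p_values(test_scores: np.ndarray, calib_scores: np.ndarray) -> np.ndarray:
    """Empirical conformal p-values, assuming higher scores = more anomalous.

    For each test score s, p = (1 + #{calibration scores >= s}) / (1 + n_calib),
    which is a valid p-value when calibration and test data are exchangeable.
    """
    n = len(calib_scores)
    # For each test score, count calibration scores that are at least as extreme.
    counts = (calib_scores[None, :] >= test_scores[:, None]).sum(axis=1)
    return (1 + counts) / (1 + n)

calib = np.array([0.1, 0.2, 0.3, 0.4, 0.5])
test = np.array([0.45, 0.05])
print(empirical_p_values(test, calib))  # [0.33333333 1.        ]
```

An extreme test score beats most calibration scores and gets a small p-value; an unremarkable one gets a p-value near 1.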

fit abstractmethod
fit(
    x: DataFrame | ndarray,
    y: ndarray | None = None,
    *,
    n_jobs: int | None = None,
) -> Self

Fit the detector model(s) and compute calibration scores.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The dataset used for fitting the model(s) and determining calibration scores.

required
y ndarray | None

Ignored. Present for sklearn API compatibility.

None
n_jobs int | None

Optional strategy-specific parallelism hint. Currently used by strategies that expose an n_jobs parameter (for example, JackknifeBootstrap).

None

Returns:

Type Description
Self

The fitted detector instance.

Source code in nonconform/detector.py
@ensure_numpy_array
@abstractmethod
def fit(
    self,
    x: pd.DataFrame | np.ndarray,
    y: np.ndarray | None = None,
    *,
    n_jobs: int | None = None,
) -> Self:
    """Fit the detector model(s) and compute calibration scores.

    Args:
        x: The dataset used for fitting the model(s) and determining
            calibration scores.
        y: Ignored. Present for sklearn API compatibility.
        n_jobs: Optional strategy-specific parallelism hint.
            Currently used by strategies that expose an ``n_jobs`` parameter
            (for example, ``JackknifeBootstrap``).

    Returns:
        The fitted detector instance.
    """
    raise NotImplementedError("Subclasses must implement fit()")
calibrate
calibrate(
    x: DataFrame | ndarray, y: ndarray | None = None
) -> Self

Calibrate a pre-fitted detector on separate calibration data.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Dataset used only to compute calibration scores.

required
y ndarray | None

Ignored. Present for sklearn API compatibility.

None

Returns:

Type Description
Self

The calibrated detector instance.

Source code in nonconform/detector.py
@ensure_numpy_array
def calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    y: np.ndarray | None = None,
) -> Self:
    """Calibrate a pre-fitted detector on separate calibration data.

    Args:
        x: Dataset used only to compute calibration scores.
        y: Ignored. Present for sklearn API compatibility.

    Returns:
        The calibrated detector instance.
    """
    raise NotImplementedError("Subclasses must implement calibrate()")
compute_p_values abstractmethod
compute_p_values(
    x: DataFrame | Series | ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Return conformal p-values for new data.

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Ignored in standard mode.

True

Returns:

Type Description
ndarray | Series

P-values as ndarray for numpy input, or pandas Series for pandas input.

Source code in nonconform/detector.py
@abstractmethod
def compute_p_values(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Return conformal p-values for new data.

    Args:
        x: New data instances for anomaly estimation.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Ignored in standard mode.

    Returns:
        P-values as ndarray for numpy input, or pandas Series for pandas input.
    """
    raise NotImplementedError("Subclasses must implement compute_p_values()")
score_samples abstractmethod
score_samples(
    x: DataFrame | Series | ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Return aggregated raw anomaly scores for new data.

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Ignored in standard mode.

True

Returns:

Type Description
ndarray | Series

Raw scores as ndarray for numpy input, or pandas Series for pandas input.

Source code in nonconform/detector.py
@abstractmethod
def score_samples(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Return aggregated raw anomaly scores for new data.

    Args:
        x: New data instances for anomaly estimation.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Ignored in standard mode.

    Returns:
        Raw scores as ndarray for numpy input, or pandas Series for pandas input.
    """
    raise NotImplementedError("Subclasses must implement score_samples()")

ConformalDetector

ConformalDetector(
    detector: Any,
    strategy: BaseStrategy,
    estimation: BaseEstimation | None = None,
    weight_estimator: BaseWeightEstimator | None = None,
    aggregation: str = "median",
    score_polarity: ScorePolarity
    | Literal[
        "auto", "higher_is_anomalous", "higher_is_normal"
    ]
    | None = None,
    seed: int | None = None,
    verbose: bool = False,
    verify_prepared_batch_content: bool = True,
)

Bases: BaseConformalDetector

Unified conformal anomaly detector with optional covariate shift handling.

Provides distribution-free anomaly detection with valid p-values and False Discovery Rate (FDR) control by wrapping any anomaly detector with conformal inference. Supports PyOD detectors, sklearn-compatible detectors, and custom detectors implementing the AnomalyDetector protocol.

When no weight estimator is provided (standard conformal prediction):

  • Uses classical conformal inference for exchangeable data
  • Provides optimal performance and memory usage
  • Suitable when training and test data come from the same distribution

When a weight estimator is provided (weighted conformal prediction):

  • Handles distribution shift between calibration and test data
  • Estimates importance weights to maintain statistical validity
  • Slightly higher computational cost but robust to covariate shift
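The difference between the two modes can be sketched in numpy. This is an illustration under given importance weights, not the library's actual weight estimator: in weighted mode, each calibration score contributes its weight rather than a unit count, and with all weights equal the formula reduces to the standard empirical p-value.

```python
import numpy as np

def weighted_p_value(test_score: float, test_weight: float,
                     calib_scores: np.ndarray, calib_weights: np.ndarray) -> float:
    """Weighted conformal p-value for one test point.

    Calibration scores at or above the test score contribute their importance
    weight; the test point contributes its own weight.
    """
    numerator = calib_weights[calib_scores >= test_score].sum() + test_weight
    return numerator / (calib_weights.sum() + test_weight)

calib_scores = np.array([0.1, 0.2, 0.3, 0.4])
uniform = np.ones(4)
# Uniform weights recover the unweighted formula (1 + count) / (1 + n).
print(weighted_p_value(0.35, 1.0, calib_scores, uniform))  # 0.4

# Upweighting the extreme calibration score makes the same test score less surprising.
skewed = np.array([1.0, 1.0, 1.0, 3.0])
print(weighted_p_value(0.35, 1.0, calib_scores, skewed))  # 4/7 ≈ 0.571
```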

Parameters:

Name Type Description Default
detector Any

Anomaly detector (PyOD, sklearn-compatible, or custom).

required
strategy BaseStrategy

The conformal strategy for fitting and calibration.

required
estimation BaseEstimation | None

P-value estimation strategy. Defaults to Empirical().

None
weight_estimator BaseWeightEstimator | None

Weight estimator for covariate shift. Defaults to None.

None
aggregation str

Method for aggregating scores from multiple models. Defaults to "median".

'median'
score_polarity ScorePolarity | Literal['auto', 'higher_is_anomalous', 'higher_is_normal'] | None

Score direction convention. Use "higher_is_anomalous" when higher raw scores indicate more anomalous samples, and "higher_is_normal" when higher scores indicate more normal samples. If omitted (None), nonconform applies an implicit default policy: known sklearn normality detectors resolve to "higher_is_normal", while PyOD and unknown custom detectors resolve to "higher_is_anomalous". Explicit "auto" enables strict inference: known detector families are inferred, and unknown detectors raise an error. Defaults to None.

None
seed int | None

Random seed for reproducibility. Defaults to None.

None
verbose bool

If True, displays progress bars during prediction. Defaults to False.

False
verify_prepared_batch_content bool

If True (default), weighted reuse mode (refit_weights=False) verifies exact batch content identity via hashing. This adds O(n) overhead per checked batch. Set to False to skip content hashing and validate only batch size.

True

Attributes:

Name Type Description
detector

The underlying anomaly detection model.

strategy

The calibration strategy for computing p-values.

weight_estimator

Optional weight estimator for handling covariate shift.

aggregation

Method for combining scores from multiple models.

score_polarity ScorePolarity

Resolved score polarity used internally.

seed

Random seed for reproducible results.

verbose

Whether to display progress bars.

_detector_set

List of trained detector models (populated after fit).

_calibration_set

Calibration scores (populated after fit).

Examples:

Standard conformal prediction — FDR-controlled selection in one call:

from pyod.models.iforest import IForest
from nonconform import ConformalDetector, Split

detector = ConformalDetector(
    detector=IForest(), strategy=Split(n_calib=0.2), seed=42
)
detector.fit(X_train)
mask = detector.select(X_test, alpha=0.05)

Access raw p-values when needed:

detector.fit(X_train)
p_values = detector.compute_p_values(X_test)

Weighted conformal prediction:

from nonconform import logistic_weight_estimator

detector = ConformalDetector(
    detector=IForest(),
    strategy=Split(n_calib=0.2),
    weight_estimator=logistic_weight_estimator(),
    seed=42,
)
detector.fit(X_train)
mask = detector.select(X_test, alpha=0.05)

Detached calibration with a pre-trained model (Split strategy):

base_detector.fit(X_fit)
detector = ConformalDetector(
    detector=base_detector, strategy=Split(n_calib=0.2)
)
detector.calibrate(X_calib)
p_values = detector.compute_p_values(X_test)
Note

Strict inductive conformal/FDR workflows require a fixed training-only score map at inference time. PyOD detectors known to violate this are: CD, COF, COPOD, ECOD, LMDD, LOCI, RGraph, SOD, SOS.

Source code in nonconform/detector.py
def __init__(
    self,
    detector: Any,
    strategy: BaseStrategy,
    estimation: BaseEstimation | None = None,
    weight_estimator: BaseWeightEstimator | None = None,
    aggregation: str = "median",
    score_polarity: ScorePolarity
    | Literal["auto", "higher_is_anomalous", "higher_is_normal"]
    | None = None,
    seed: int | None = None,
    verbose: bool = False,
    verify_prepared_batch_content: bool = True,
) -> None:
    self._configure(
        detector=detector,
        strategy=strategy,
        estimation=estimation,
        weight_estimator=weight_estimator,
        aggregation=aggregation,
        score_polarity=score_polarity,
        seed=seed,
        verbose=verbose,
        verify_prepared_batch_content=verify_prepared_batch_content,
    )
detector_set property
detector_set: list[AnomalyDetector]

Returns a copy of the list of trained detector models.

calibration_set property
calibration_set: ndarray

Returns a copy of the calibration scores.

calibration_samples property
calibration_samples: ndarray

Returns a copy of the calibration samples (weighted mode only).

last_result property
last_result: ConformalResult | None

Return the most recent conformal result snapshot.

score_polarity property
score_polarity: ScorePolarity

Returns the resolved score polarity convention.

is_fitted property
is_fitted: bool

Returns whether the detector has been fitted.

get_params
get_params(deep: bool = True) -> dict[str, Any]

Return estimator parameters following sklearn conventions.

Notes
  • deep=False returns constructor-facing parameters used for sklearn clone compatibility.
  • deep=True also includes nested component__param entries read from the current runtime components (effective/internal state), which may differ from originally passed constructor objects after adaptation/normalization.
Source code in nonconform/detector.py
def get_params(self, deep: bool = True) -> dict[str, Any]:
    """Return estimator parameters following sklearn conventions.

    Notes:
        - ``deep=False`` returns constructor-facing parameters used for
          sklearn clone compatibility.
        - ``deep=True`` also includes nested ``component__param`` entries
          read from the current runtime components (effective/internal state),
          which may differ from originally passed constructor objects after
          adaptation/normalization.
    """
    params: dict[str, Any] = {
        "detector": self._init_detector,
        "strategy": self._init_strategy,
        "estimation": self._init_estimation,
        "weight_estimator": self._init_weight_estimator,
        "aggregation": self._init_aggregation,
        "score_polarity": self._init_score_polarity,
        "seed": self._init_seed,
        "verbose": self._init_verbose,
        "verify_prepared_batch_content": self._init_verify_prepared_batch_content,
    }
    if not deep:
        return params

    for component_name in self._NESTED_COMPONENTS:
        component = getattr(self, component_name)
        if component is None or not hasattr(component, "get_params"):
            continue
        try:
            component_params = component.get_params(deep=True)
        except TypeError:
            component_params = component.get_params()
        for key, value in component_params.items():
            params[f"{component_name}__{key}"] = value
    return params
set_params
set_params(**params: Any) -> Self

Set estimator parameters following sklearn conventions.

Source code in nonconform/detector.py
def set_params(self, **params: Any) -> Self:
    """Set estimator parameters following sklearn conventions."""
    if not params:
        return self

    updated_params = self.get_params(deep=False)
    nested_updates: dict[str, dict[str, Any]] = {}

    for key, value in params.items():
        if "__" in key:
            component_name, nested_key = key.split("__", 1)
            if component_name not in self._NESTED_COMPONENTS:
                raise ValueError(f"Invalid parameter {component_name!r}.")
            nested_updates.setdefault(component_name, {})[nested_key] = value
            continue

        if key not in updated_params:
            raise ValueError(
                f"Invalid parameter {key!r} for estimator {type(self).__name__}."
            )
        updated_params[key] = value

    for component_name, component_params in nested_updates.items():
        component = updated_params[component_name]
        if component is None:
            raise ValueError(
                f"Cannot set nested parameters for {component_name!r}: "
                "component is None."
            )
        if not hasattr(component, "set_params"):
            raise ValueError(
                f"Cannot set nested parameters for {component_name!r}: "
                "component does not implement set_params()."
            )
        component.set_params(**component_params)

    self._configure(**updated_params)
    return self
fit
fit(
    x: DataFrame | ndarray,
    y: ndarray | None = None,
    *,
    n_jobs: int | None = None,
) -> Self

Fit detector model(s) and compute calibration scores.

Uses the specified strategy to train the base detector(s) and calculate non-conformity scores on the calibration set.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The dataset used for fitting and calibration.

required
y ndarray | None

Ignored. Present for sklearn API compatibility.

None
n_jobs int | None

Optional strategy-specific parallelism hint. Supported by strategies whose fit_calibrate signature includes n_jobs (for example, JackknifeBootstrap).

None

Returns:

Type Description
Self

The fitted detector instance (for method chaining).

Source code in nonconform/detector.py
@ensure_numpy_array
def fit(
    self,
    x: pd.DataFrame | np.ndarray,
    y: np.ndarray | None = None,
    *,
    n_jobs: int | None = None,
) -> Self:
    """Fit detector model(s) and compute calibration scores.

    Uses the specified strategy to train the base detector(s) and calculate
    non-conformity scores on the calibration set.

    Args:
        x: The dataset used for fitting and calibration.
        y: Ignored. Present for sklearn API compatibility.
        n_jobs: Optional strategy-specific parallelism hint. Supported by
            strategies whose ``fit_calibrate`` signature includes ``n_jobs``
            (for example, ``JackknifeBootstrap``).

    Returns:
        The fitted detector instance (for method chaining).
    """
    _ = y
    fit_kwargs: dict[str, Any] = {
        "x": x,
        "detector": self.detector,
        "weighted": self._is_weighted_mode,
        "seed": self.seed,
    }
    if n_jobs is not None:
        strategy_params = inspect.signature(self.strategy.fit_calibrate).parameters
        if "n_jobs" not in strategy_params:
            raise ValueError(
                f"Strategy {type(self.strategy).__name__} does not support n_jobs. "
                "Pass n_jobs only when using a strategy that exposes it, "
                "such as JackknifeBootstrap."
            )
        fit_kwargs["n_jobs"] = n_jobs

    self._detector_set, self._calibration_set = self.strategy.fit_calibrate(
        **fit_kwargs
    )

    if (
        self._is_weighted_mode
        and self.strategy.calibration_ids is not None
        and len(self.strategy.calibration_ids) > 0
    ):
        self._calibration_samples = x[self.strategy.calibration_ids]
    else:
        self._calibration_samples = np.array([])

    self._prepared_weight_batch_size = None
    self._prepared_weight_batch_signature = None
    self._last_result = None
    return self
calibrate
calibrate(
    x: DataFrame | ndarray, y: ndarray | None = None
) -> Self

Calibrate a pre-fitted detector on separate calibration data.

This detached workflow is currently supported only for the Split strategy, where a single pre-fitted model is calibrated on a dedicated dataset.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Calibration dataset used to compute calibration scores.

required
y ndarray | None

Ignored. Present for sklearn API compatibility.

None

Returns:

Type Description
Self

The calibrated detector instance (for method chaining).

Raises:

Type Description
ValueError

If strategy is not Split.

NotFittedError

If the base detector appears unfitted.

Source code in nonconform/detector.py
@ensure_numpy_array
def calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    y: np.ndarray | None = None,
) -> Self:
    """Calibrate a pre-fitted detector on separate calibration data.

    This detached workflow is currently supported only for ``Split`` strategy,
    where a single pre-fitted model is calibrated on a dedicated dataset.

    Args:
        x: Calibration dataset used to compute calibration scores.
        y: Ignored. Present for sklearn API compatibility.

    Returns:
        The calibrated detector instance (for method chaining).

    Raises:
        ValueError: If strategy is not ``Split``.
        NotFittedError: If the base detector appears unfitted.
    """
    _ = y
    from nonconform.resampling import Split

    if not isinstance(self.strategy, Split):
        raise ValueError(
            "calibrate() is supported only with Split strategy. "
            f"Got {type(self.strategy).__name__}."
        )

    try:
        calibration_set = np.asarray(
            self.detector.decision_function(x),
            dtype=float,
        ).ravel()
    except Exception as exc:
        message = str(exc).lower()
        if (
            isinstance(exc, NotFittedError)
            or "not fitted" in message
            or (isinstance(exc, AttributeError) and "has no attribute" in message)
        ):
            raise NotFittedError(
                "Base detector is not fitted. Fit the base detector before "
                "calling calibrate()."
            ) from exc
        raise

    if calibration_set.shape[0] != len(x):
        raise ValueError(
            "calibration scores must have one value per calibration sample. "
            f"Got {calibration_set.shape[0]} scores for {len(x)} samples."
        )

    self._detector_set = [self.detector]
    self._calibration_set = calibration_set
    if self._is_weighted_mode:
        self._calibration_samples = x.copy()
    else:
        self._calibration_samples = np.array([])

    self._prepared_weight_batch_size = None
    self._prepared_weight_batch_signature = None
    self._last_result = None
    return self
select
select(
    x: DataFrame | Series | ndarray,
    *,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Compute p-values and apply FDR-controlled selection in one step.

This is the recommended single-call workflow for most use cases. It combines compute_p_values() and the appropriate selection procedure (BH-style FDR selection for standard mode, weighted conformalized selection for weighted mode) into one method, eliminating the need to access last_result manually.
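In standard mode, the selection step is the Benjamini-Hochberg procedure: sort the p-values, find the largest rank k with p_(k) <= k·alpha/n, and select everything at or below that rank. A numpy sketch of the procedure (the library itself delegates to scipy's false_discovery_control; this is only an illustration of what BH selection does):

```python
import numpy as np

def bh_select(p_values: np.ndarray, alpha: float = 0.05) -> np.ndarray:
    """Boolean mask of Benjamini-Hochberg discoveries at target FDR level alpha."""
    n = len(p_values)
    order = np.argsort(p_values)
    ranked = p_values[order]
    # Which 1-based ranks k satisfy p_(k) <= k * alpha / n?
    below = ranked <= alpha * np.arange(1, n + 1) / n
    mask = np.zeros(n, dtype=bool)
    if below.any():
        k = np.max(np.nonzero(below)[0])  # 0-based index of the last clearing rank
        mask[order[: k + 1]] = True       # select all p-values up to that rank
    return mask

p = np.array([0.001, 0.02, 0.8, 0.03])
print(bh_select(p, alpha=0.05))  # [ True  True False  True]
```

Note the step-up behavior: 0.03 is selected even though it exceeds alpha/n, because a larger p-value threshold applies at its rank.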

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
alpha float

Target FDR level in (0, 1). Defaults to 0.05.

0.05
pruning Pruning

Pruning strategy for weighted FDR control. Ignored in standard (unweighted) mode. Defaults to Pruning.DETERMINISTIC.

DETERMINISTIC
seed int | None

Optional random seed for weighted randomized pruning modes. When None, falls back to detector seed. Ignored in standard mode and deterministic pruning mode.

None
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Ignored in standard mode. Defaults to True.

True

Returns:

Type Description
ndarray | Series

Boolean selection mask of shape (n_test,). True entries are the FDR-controlled anomaly discoveries. Returns a pandas Series when the input is a DataFrame or Series.

Examples:

Standard workflow (no weight estimator):

detector.fit(X_train)
mask = detector.select(X_test, alpha=0.05)
print(f"Discoveries: {mask.sum()}")

Weighted workflow:

detector = ConformalDetector(
    detector=IForest(),
    strategy=Split(n_calib=0.2),
    weight_estimator=logistic_weight_estimator(),
)
detector.fit(X_train)
mask = detector.select(
    X_test,
    alpha=0.1,
    pruning=Pruning.HETEROGENEOUS,
    seed=42,
)
Source code in nonconform/detector.py
def select(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Compute p-values and apply FDR-controlled selection in one step.

    This is the recommended single-call workflow for most use cases. It
    combines ``compute_p_values()`` and the appropriate selection procedure
    (BH-style FDR selection for standard mode, weighted conformalized
    selection for weighted mode) into one method, eliminating the need to
    access ``last_result`` manually.

    Args:
        x: New data instances for anomaly estimation.
        alpha: Target FDR level in ``(0, 1)``. Defaults to ``0.05``.
        pruning: Pruning strategy for weighted FDR control. Ignored in
            standard (unweighted) mode. Defaults to
            ``Pruning.DETERMINISTIC``.
        seed: Optional random seed for weighted randomized pruning modes.
            When ``None``, falls back to detector ``seed``. Ignored in
            standard mode and deterministic pruning mode.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Ignored in standard mode. Defaults to True.

    Returns:
        Boolean selection mask of shape ``(n_test,)``. ``True`` entries are
        the FDR-controlled anomaly discoveries. Returns a pandas Series when
        the input is a DataFrame or Series.

    Examples:
        Standard workflow (no weight estimator):

        ```python
        detector.fit(X_train)
        mask = detector.select(X_test, alpha=0.05)
        print(f"Discoveries: {mask.sum()}")
        ```

        Weighted workflow:

        ```python
        detector = ConformalDetector(
            detector=IForest(),
            strategy=Split(n_calib=0.2),
            weight_estimator=logistic_weight_estimator(),
        )
        detector.fit(X_train)
        mask = detector.select(
            X_test,
            alpha=0.1,
            pruning=Pruning.HETEROGENEOUS,
            seed=42,
        )
        ```
    """
    if not (0.0 < alpha < 1.0):
        raise ValueError(f"alpha must be in (0, 1), got {alpha}")

    from nonconform.fdr import weighted_false_discovery_control

    x_array, index = _as_numpy_with_index(x)
    self.compute_p_values(x_array, refit_weights=refit_weights)
    result = self._last_result
    if result is None or result.p_values is None:
        raise RuntimeError(
            "Internal error: select() expected p-values after compute_p_values()."
        )

    if self._is_weighted_mode:
        selection_seed = self.seed if seed is None else seed
        mask = weighted_false_discovery_control(
            result=result,
            alpha=alpha,
            pruning=pruning,
            seed=selection_seed,
        )
    else:
        p_values = np.asarray(result.p_values, dtype=float)
        mask = false_discovery_control(p_values, method="bh") <= alpha

    if index is not None:
        return pd.Series(mask, index=index, name="selected")
    return mask
prepare_weights_for
prepare_weights_for(x: DataFrame | ndarray) -> Self

Prepare weighted conformal state for a specific test batch.

In weighted mode, this fits the weight estimator for the supplied batch without producing predictions. Use this for explicit state transitions in exploratory workflows.
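Reuse mode (refit_weights=False) relies on the next batch being identical in content to the one the weights were prepared for; per the verify_prepared_batch_content parameter, this is verified by hashing the batch. A sketch of that idea (illustrative only; the library's internal _batch_signature helper may differ in detail):

```python
import hashlib
import numpy as np

def batch_signature(x: np.ndarray) -> str:
    """Content hash of a batch: identical bytes yield an identical signature."""
    buf = np.ascontiguousarray(x)
    h = hashlib.sha256()
    h.update(str(buf.shape).encode())  # include shape so (2, 3) != (3, 2)
    h.update(str(buf.dtype).encode())
    h.update(buf.tobytes())
    return h.hexdigest()

a = np.array([[1.0, 2.0], [3.0, 4.0]])
b = a.copy()
c = a + 1e-9  # any change to the content changes the signature
print(batch_signature(a) == batch_signature(b))  # True
print(batch_signature(a) == batch_signature(c))  # False
```

Comparing signatures is O(n) in the batch size, which is the overhead the verify_prepared_batch_content flag lets you opt out of.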

Parameters:

Name Type Description Default
x DataFrame | ndarray

Test batch for which weights should be prepared.

required

Returns:

Type Description
Self

The fitted detector instance (for method chaining).

Raises:

Type Description
NotFittedError

If fit() has not been called.

RuntimeError

If weighted mode is disabled.

Source code in nonconform/detector.py
@ensure_numpy_array
def prepare_weights_for(self, x: pd.DataFrame | np.ndarray) -> Self:
    """Prepare weighted conformal state for a specific test batch.

    In weighted mode, this fits the weight estimator for the supplied batch
    without producing predictions. Use this for explicit state transitions in
    exploratory workflows.

    Args:
        x: Test batch for which weights should be prepared.

    Returns:
        The fitted detector instance (for method chaining).

    Raises:
        NotFittedError: If fit() has not been called.
        RuntimeError: If weighted mode is disabled.
    """
    if not self.is_fitted:
        raise NotFittedError("This ConformalDetector instance is not fitted yet.")
    if not self._is_weighted_mode or self.weight_estimator is None:
        raise RuntimeError(
            "prepare_weights_for() requires weighted mode with a weight_estimator."
        )

    self.weight_estimator.fit(self._calibration_samples, x)
    self._prepared_weight_batch_size = len(x)
    if self.verify_prepared_batch_content:
        self._prepared_weight_batch_signature = _batch_signature(x)
    else:
        self._prepared_weight_batch_signature = None
    return self
score_samples
score_samples(
    x: DataFrame | Series | ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Return aggregated raw anomaly scores for new data.

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Defaults to True.

True

Returns:

Type Description
ndarray | Series

Aggregated raw anomaly scores.

Source code in nonconform/detector.py
def score_samples(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Return aggregated raw anomaly scores for new data.

    Args:
        x: New data instances for anomaly estimation.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Defaults to True.

    Returns:
        Aggregated raw anomaly scores.
    """
    x_array, index = _as_numpy_with_index(x)
    estimates = self._aggregate_scores(x_array)
    weights = self._resolve_weights(x_array, refit_weights=refit_weights)
    calib_weights, test_weights = weights if weights else (None, None)

    self._last_result = ConformalResult(
        p_values=None,
        test_scores=estimates.copy(),
        calib_scores=self._calibration_set.copy(),
        test_weights=_safe_copy(test_weights),
        calib_weights=_safe_copy(calib_weights),
        metadata={},
    )
    if index is not None:
        return pd.Series(estimates, index=index, name="score")
    return estimates
compute_p_values
compute_p_values(
    x: DataFrame | Series | ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Return conformal p-values for new data.

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Defaults to True.

True

Returns:

Type Description
ndarray | Series

Conformal p-values.

Source code in nonconform/detector.py
def compute_p_values(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Return conformal p-values for new data.

    Args:
        x: New data instances for anomaly estimation.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Defaults to True.

    Returns:
        Conformal p-values.
    """
    x_array, index = _as_numpy_with_index(x)
    estimates = self._aggregate_scores(x_array)
    weights = self._resolve_weights(x_array, refit_weights=refit_weights)
    calib_weights, test_weights = weights if weights else (None, None)

    p_values = self.estimation.compute_p_values(
        estimates, self._calibration_set, weights
    )

    metadata: dict[str, Any] = {}
    if hasattr(self.estimation, "get_metadata"):
        meta = self.estimation.get_metadata()
        if meta:
            metadata = dict(meta)

    self._last_result = ConformalResult(
        p_values=p_values.copy(),
        test_scores=estimates.copy(),
        calib_scores=self._calibration_set.copy(),
        test_weights=_safe_copy(test_weights),
        calib_weights=_safe_copy(calib_weights),
        metadata=metadata,
    )
    if index is not None:
        return pd.Series(p_values, index=index, name="p_value")
    return p_values

Resampling Strategies

nonconform.resampling

Calibration strategies for conformal anomaly detection.

This module provides various calibration strategies that define how to split data for training and calibration in conformal prediction.

Classes:

Name Description
BaseStrategy

Abstract base class for calibration strategies.

Split

Simple train-test split strategy.

CrossValidation

K-fold cross-validation strategy (includes Jackknife factory).

JackknifeBootstrap

Jackknife+-after-Bootstrap (JaB+) strategy.

BaseStrategy

BaseStrategy(mode: ConformalModeInput = 'plus')

Bases: ABC

Abstract base class for anomaly detection calibration strategies.

This class provides a common interface for various calibration strategies applied to anomaly detectors. Subclasses must implement the core calibration logic and define how calibration data is identified and used.

Attributes:

Name Type Description
_mode ConformalMode

Model retention mode controlling calibration/inference behavior.

Parameters:

Name Type Description Default
mode ConformalModeInput

Model retention mode ("plus" or "single_model"). Equivalent ConformalMode enum values are also accepted.

'plus'
Source code in nonconform/resampling.py
def __init__(self, mode: ConformalModeInput = "plus") -> None:
    """Initialize the base calibration strategy.

    Args:
        mode: Model retention mode (`"plus"` or `"single_model"`).
            Equivalent ``ConformalMode`` enum values are also accepted.
    """
    self._mode: ConformalMode = _normalize_mode(mode)
    self._calibration_ids: list[int] = []
calibration_ids abstractmethod property
calibration_ids: list[int] | None

Indices of data points used for calibration.

fit_calibrate abstractmethod
fit_calibrate(
    x: DataFrame | ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
) -> tuple[list[AnomalyDetector], np.ndarray]

Fits the detector and performs calibration.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data for fitting and calibration.

required
detector AnomalyDetector

The anomaly detection model to be fitted and calibrated.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Whether to use weighted approach. Defaults to False.

False

Returns:

Type Description
tuple[list[AnomalyDetector], ndarray]

Tuple of (list of trained detectors, calibration scores array).

Source code in nonconform/resampling.py
@abc.abstractmethod
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
) -> tuple[list[AnomalyDetector], np.ndarray]:
    """Fits the detector and performs calibration.

    Args:
        x: The input data for fitting and calibration.
        detector: The anomaly detection model to be fitted and calibrated.
        seed: Random seed for reproducibility. Defaults to None.
        weighted: Whether to use weighted approach. Defaults to False.

    Returns:
        Tuple of (list of trained detectors, calibration scores array).
    """
    raise NotImplementedError(
        "The fit_calibrate() method must be implemented by subclasses."
    )

Split

Split(n_calib: float | int = 0.1)

Bases: BaseStrategy

Split conformal strategy for fast anomaly detection.

Implements the classical split conformal approach by dividing training data into separate fitting and calibration sets.

Parameters:

Name Type Description Default
n_calib float | int

Size or proportion of data used for calibration. If float, must be between 0.0 and 1.0 (proportion). If int, the absolute number of samples. Defaults to 0.1.

0.1

Examples:

# Use 20% of data for calibration
strategy = Split(n_calib=0.2)

# Use exactly 1000 samples for calibration
strategy = Split(n_calib=1000)
Source code in nonconform/resampling.py
def __init__(self, n_calib: float | int = 0.1) -> None:
    super().__init__()
    self._calib_size: float | int = n_calib
    self._calibration_ids: list[int] | None = None
calibration_ids property
calibration_ids: list[int] | None

Indices of calibration samples (None if weighted=False).

calib_size property
calib_size: float | int

Returns the calibration size or proportion.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: AnomalyDetector,
    weighted: bool = False,
    seed: int | None = None,
) -> tuple[list[AnomalyDetector], np.ndarray]

Fits detector and generates calibration scores using a data split.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data.

required
detector AnomalyDetector

The detector instance to train.

required
weighted bool

If True, stores calibration sample indices. Defaults to False.

False
seed int | None

Random seed for reproducibility. Defaults to None.

None

Returns:

Type Description
tuple[list[AnomalyDetector], ndarray]

Tuple of (list with trained detector, calibration scores array).

Source code in nonconform/resampling.py
@ensure_numpy_array
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: AnomalyDetector,
    weighted: bool = False,
    seed: int | None = None,
) -> tuple[list[AnomalyDetector], np.ndarray]:
    """Fits detector and generates calibration scores using a data split.

    Args:
        x: The input data.
        detector: The detector instance to train.
        weighted: If True, stores calibration sample indices. Defaults to False.
        seed: Random seed for reproducibility. Defaults to None.

    Returns:
        Tuple of (list with trained detector, calibration scores array).
    """
    self._validate_n_calib(len(x))
    x_id = np.arange(len(x))
    train_id, calib_id = train_test_split(
        x_id, test_size=self._calib_size, shuffle=True, random_state=seed
    )

    if hasattr(detector, "set_params"):
        try:
            detector.set_params(random_state=seed)
        except (TypeError, ValueError):
            pass  # Detector may not support random_state parameter

    detector.fit(x[train_id])
    calibration_set = detector.decision_function(x[calib_id])

    if weighted:
        self._calibration_ids = calib_id.tolist()
    else:
        self._calibration_ids = None
    return [detector], calibration_set
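The split logic above relies on sklearn's `train_test_split`; the index bookkeeping it performs can be sketched with numpy alone. The helper name below is illustrative, not part of the library API.

```python
import numpy as np

def split_indices(n, n_calib, seed=None):
    """Shuffle indices and carve off a calibration set, mirroring the
    train/calibration split performed by Split.fit_calibrate."""
    rng = np.random.default_rng(seed)
    perm = rng.permutation(n)
    # Float n_calib is a proportion, int n_calib is an absolute count.
    n_cal = int(round(n * n_calib)) if isinstance(n_calib, float) else n_calib
    return perm[n_cal:], perm[:n_cal]  # (train_ids, calib_ids)

train_ids, calib_ids = split_indices(10, 0.2, seed=0)
```

The two index sets are disjoint and cover the data exactly once, so calibration scores come only from samples the detector never saw during fitting.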

CrossValidation

CrossValidation(
    k: int | None = 5,
    mode: ConformalModeInput = "plus",
    shuffle: bool = True,
)

Bases: BaseStrategy

K-fold cross-validation strategy for conformal anomaly detection.

Splits data into k folds and uses each fold as a calibration set while training on the remaining folds.

Parameters:

Name Type Description Default
k int | None

Number of folds. If None, uses leave-one-out (k=n at fit time).

5
mode ConformalModeInput

Model retention mode ("plus" or "single_model"). Equivalent ConformalMode values are accepted. Defaults to "plus".

'plus'
shuffle bool

Whether to shuffle data before splitting. Defaults to True. Set to False for deterministic leave-one-out (Jackknife).

True

Examples:

# 5-fold cross-validation
strategy = CrossValidation(k=5)

# Leave-one-out (Jackknife) via factory
strategy = CrossValidation.jackknife()
Source code in nonconform/resampling.py
def __init__(
    self,
    k: int | None = 5,
    mode: ConformalModeInput = "plus",
    shuffle: bool = True,
) -> None:
    super().__init__(mode)
    if not isinstance(shuffle, bool):
        raise TypeError(
            f"shuffle must be a boolean value, got {type(shuffle).__name__}."
        )
    self._k: int | None = k
    self._shuffle: bool = shuffle
    self._is_jackknife = k is None

    # Warn if using single-model mode
    if self._mode is ConformalMode.SINGLE_MODEL:
        _crossval_logger.warning(
            "Setting mode=ConformalMode.SINGLE_MODEL may compromise conformal "
            "validity. mode=ConformalMode.PLUS is recommended."
        )

    self._detector_list: list[AnomalyDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []
calibration_ids property
calibration_ids: list[int]

Indices of samples used for calibration.

k property
k: int | None

Number of folds (None for jackknife mode).

mode property
mode: Literal['plus', 'single_model']

User-facing model retention mode.

jackknife classmethod
jackknife(
    mode: ConformalModeInput = "plus",
) -> CrossValidation

Create Leave-One-Out cross-validation (deterministic, no shuffle).

This factory method creates a Jackknife strategy, which is a special case of k-fold CV where k equals n (the dataset size). Each sample is left out exactly once for calibration.

Parameters:

Name Type Description Default
mode ConformalModeInput

Model retention mode ("plus" or "single_model").

'plus'

Returns:

Type Description
CrossValidation

CrossValidation configured for leave-one-out.

Examples:

strategy = CrossValidation.jackknife()
detector_list, calib_scores = strategy.fit_calibrate(X, detector)
Source code in nonconform/resampling.py
@classmethod
def jackknife(cls, mode: ConformalModeInput = "plus") -> CrossValidation:
    """Create Leave-One-Out cross-validation (deterministic, no shuffle).

    This factory method creates a Jackknife strategy, which is a special
    case of k-fold CV where k equals n (the dataset size). Each sample is
    left out exactly once for calibration.

    Args:
        mode: Model retention mode (`"plus"` or `"single_model"`).

    Returns:
        CrossValidation configured for leave-one-out.

    Examples:
        ```python
        strategy = CrossValidation.jackknife()
        detector_list, calib_scores = strategy.fit_calibrate(X, detector)
        ```
    """
    return cls(k=None, mode=mode, shuffle=False)
fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
) -> tuple[list[AnomalyDetector], np.ndarray]

Fit and calibrate using k-fold cross-validation.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix.

required
detector AnomalyDetector

The base anomaly detector.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Whether to use weighted calibration. Defaults to False.

False

Returns:

Type Description
tuple[list[AnomalyDetector], ndarray]

Tuple of (list of trained detectors, calibration scores array).

Raises:

Type Description
ValueError

If k < 2 or not enough samples for specified k.

Source code in nonconform/resampling.py
@ensure_numpy_array
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
) -> tuple[list[AnomalyDetector], np.ndarray]:
    """Fit and calibrate using k-fold cross-validation.

    Args:
        x: Input data matrix.
        detector: The base anomaly detector.
        seed: Random seed for reproducibility. Defaults to None.
        weighted: Whether to use weighted calibration. Defaults to False.

    Returns:
        Tuple of (list of trained detectors, calibration scores array).

    Raises:
        ValueError: If k < 2 or not enough samples for specified k.
    """
    self._detector_list.clear()
    self._calibration_ids = []

    detector_ = detector
    n_samples = len(x)

    # Determine k (for jackknife mode, k=n)
    k = n_samples if self._is_jackknife else self._k

    if k < 2:
        exc = ValueError(
            f"k must be at least 2 for k-fold cross-validation, got {k}"
        )
        exc.add_note(f"Received k={k}, which is invalid.")
        exc.add_note(
            "Cross-validation requires at least one split for training "
            "and one for calibration."
        )
        raise exc

    if n_samples < k:
        exc = ValueError(
            f"Not enough samples ({n_samples}) for "
            f"k-fold cross-validation with k={k}"
        )
        exc.add_note(f"Each fold needs at least 1 sample, but {n_samples} < {k}.")
        raise exc

    self._calibration_set = np.empty(n_samples, dtype=np.float64)
    calibration_offset = 0

    folds = KFold(
        n_splits=k,
        shuffle=self._shuffle,
        random_state=seed if self._shuffle else None,
    )

    fold_iterator = (
        tqdm(folds.split(x), total=k, desc="Calibration")
        if _crossval_logger.isEnabledFor(logging.INFO)
        else folds.split(x)
    )

    for i, (train_idx, calib_idx) in enumerate(fold_iterator):
        self._calibration_ids.extend(calib_idx.tolist())

        model = copy(detector_)
        if hasattr(model, "set_params"):
            try:
                model.set_params(random_state=seed)
            except (TypeError, ValueError):
                pass  # Detector may not support random_state parameter
        model.fit(x[train_idx])

        if self._mode is ConformalMode.PLUS:
            self._detector_list.append(deepcopy(model))

        fold_scores = model.decision_function(x[calib_idx])
        n_fold_samples = len(fold_scores)
        end_idx = calibration_offset + n_fold_samples
        self._calibration_set[calibration_offset:end_idx] = fold_scores
        calibration_offset += n_fold_samples

    if self._mode is ConformalMode.SINGLE_MODEL:
        model = copy(detector_)
        if hasattr(model, "set_params"):
            try:
                model.set_params(random_state=seed)
            except (TypeError, ValueError):
                pass  # Detector may not support random_state parameter
        model.fit(x)
        self._detector_list.append(deepcopy(model))

    return self._detector_list, self._calibration_set
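The fold loop above can be condensed into a numpy-only sketch, with `fit_score` standing in for the detector's `fit` plus `decision_function` pair. Names here are illustrative, not part of the library API.

```python
import numpy as np

def kfold_calibration_scores(x, k, fit_score, seed=None):
    """For each fold, train on the remaining folds and score the held-out
    fold, so every sample receives exactly one calibration score."""
    rng = np.random.default_rng(seed)
    idx = rng.permutation(len(x))
    scores = np.empty(len(x))
    for fold in np.array_split(idx, k):
        train = np.setdiff1d(idx, fold)  # all indices outside the fold
        scores[fold] = fit_score(x[train], x[fold])
    return scores

# Toy scorer: distance of each held-out point from the training mean.
x = np.arange(12, dtype=float)
scores = kfold_calibration_scores(
    x, k=4, fit_score=lambda tr, te: np.abs(te - tr.mean()), seed=0
)
```

Unlike the split strategy, every sample contributes a calibration score, at the cost of training k models (all of which are retained in `"plus"` mode).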

JackknifeBootstrap

JackknifeBootstrap(
    n_bootstraps: int = 100,
    aggregation_method: BootstrapAggregationMethod = "mean",
    mode: ConformalModeInput = "plus",
)

Bases: BaseStrategy

Jackknife+-after-Bootstrap (JaB+) conformal anomaly detection.

Implements the JaB+ method which provides predictive inference for ensemble models trained on bootstrap samples. Uses out-of-bag samples for calibration.

Parameters:

Name Type Description Default
n_bootstraps int

Number of bootstrap iterations. Defaults to 100.

100
aggregation_method BootstrapAggregationMethod

How to aggregate OOB predictions ("mean" or "median"). Defaults to "mean".

'mean'
mode ConformalModeInput

Model retention mode ("plus" or "single_model"). Equivalent ConformalMode values are accepted. Defaults to "plus".

'plus'
References

Jin, Ying, and Emmanuel J. Candès. "Selection by Prediction with Conformal p-values." Journal of Machine Learning Research 24.244 (2023): 1-41.

Source code in nonconform/resampling.py
def __init__(
    self,
    n_bootstraps: int = 100,
    aggregation_method: BootstrapAggregationMethod = "mean",
    mode: ConformalModeInput = "plus",
) -> None:
    super().__init__(mode=mode)

    if n_bootstraps < 2:
        exc = ValueError(
            f"Number of bootstraps must be at least 2, got {n_bootstraps}. "
            f"Typical values are 50-200 for jackknife-after-bootstrap."
        )
        exc.add_note(f"Received n_bootstraps={n_bootstraps}, which is invalid.")
        raise exc

    normalized_aggregation_method = normalize_bootstrap_aggregation_method(
        aggregation_method
    )

    if self._mode is ConformalMode.SINGLE_MODEL:
        _bootstrap_logger.warning(
            "Setting mode=ConformalMode.SINGLE_MODEL may compromise conformal "
            "validity. mode=ConformalMode.PLUS is recommended."
        )

    self._n_bootstraps: int = n_bootstraps
    self._aggregation_method: BootstrapAggregationMethod = (
        normalized_aggregation_method
    )

    self._detector_list: list[AnomalyDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []

    # Internal state
    self._bootstrap_models: list[AnomalyDetector | None] = []
    self._oob_mask: np.ndarray = np.array([])
calibration_ids property
calibration_ids: list[int]

Indices used for calibration (all samples in JaB+).

n_bootstraps property
n_bootstraps: int

Number of bootstrap iterations.

aggregation_method property
aggregation_method: BootstrapAggregationMethod

Aggregation method for OOB predictions.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
    n_jobs: int | None = None,
) -> tuple[list[AnomalyDetector], np.ndarray]

Fit and calibrate using JaB+ method.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix.

required
detector AnomalyDetector

The base anomaly detector.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Not used in JaB+. Defaults to False.

False
n_jobs int | None

Number of parallel jobs. Use -1 for all available cores. Defaults to None (sequential).

None

Returns:

Type Description
tuple[list[AnomalyDetector], ndarray]

Tuple of (list of trained detectors, calibration scores array).

Source code in nonconform/resampling.py
@ensure_numpy_array
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
    n_jobs: int | None = None,
) -> tuple[list[AnomalyDetector], np.ndarray]:
    """Fit and calibrate using JaB+ method.

    Args:
        x: Input data matrix.
        detector: The base anomaly detector.
        seed: Random seed for reproducibility. Defaults to None.
        weighted: Not used in JaB+. Defaults to False.
        n_jobs: Number of parallel jobs. Use -1 for all available cores.
            Defaults to None (sequential).

    Returns:
        Tuple of (list of trained detectors, calibration scores array).
    """
    n_samples = len(x)
    generator = np.random.default_rng(seed)

    _bootstrap_logger.info(
        f"Bootstrap (JaB+): {n_samples:,} samples, "
        f"{self._n_bootstraps:,} iterations"
    )

    self._bootstrap_models = [None] * self._n_bootstraps
    all_bootstrap_indices, self._oob_mask = self._generate_bootstrap_indices(
        generator, n_samples
    )

    if n_jobs == -1:
        n_jobs = os.cpu_count() or 1
    elif n_jobs is not None and n_jobs < 1:
        raise ValueError(
            f"n_jobs must be None, -1, or a positive integer; got {n_jobs}."
        )

    if n_jobs is None or n_jobs == 1:
        bootstrap_iterator = (
            tqdm(range(self._n_bootstraps), desc="Calibration")
            if _bootstrap_logger.isEnabledFor(logging.INFO)
            else range(self._n_bootstraps)
        )
        for i in bootstrap_iterator:
            bootstrap_indices = all_bootstrap_indices[i]
            model = _train_bootstrap_model(detector, x, bootstrap_indices, seed)
            self._bootstrap_models[i] = model
    else:
        self._train_models_parallel(
            detector, x, all_bootstrap_indices, seed, n_jobs
        )

    oob_scores = self._compute_oob_scores(x)

    self._calibration_set = oob_scores
    self._calibration_ids = list(range(n_samples))

    if self._mode is ConformalMode.PLUS:
        self._detector_list = self._bootstrap_models.copy()
    else:
        final_model = deepcopy(detector)
        if hasattr(final_model, "set_params"):
            try:
                final_model.set_params(random_state=seed)
            except (TypeError, ValueError):
                pass  # Detector may not support random_state parameter
        final_model.fit(x)
        self._detector_list = [final_model]

    return self._detector_list, self._calibration_set
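The bootstrap bookkeeping that JaB+ depends on, drawing index sets and recording which samples stay out-of-bag per iteration, can be sketched as follows. The helper name is illustrative; the library additionally aggregates per-sample OOB scores with the configured `aggregation_method`.

```python
import numpy as np

def bootstrap_oob_mask(n_samples, n_bootstraps, seed=None):
    """Draw bootstrap index sets with replacement and record, for each
    iteration, which samples were never drawn (out-of-bag)."""
    rng = np.random.default_rng(seed)
    indices = rng.integers(0, n_samples, size=(n_bootstraps, n_samples))
    oob = np.ones((n_bootstraps, n_samples), dtype=bool)
    for b in range(n_bootstraps):
        oob[b, indices[b]] = False  # drawn samples are in-bag
    return indices, oob

idx, oob = bootstrap_oob_mask(n_samples=50, n_bootstraps=20, seed=0)
```

Each sample's calibration score is then an aggregate (mean or median) of the scores assigned by the models for which that sample was out-of-bag, which is why `calibration_ids` covers all samples.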

P-Value Estimation

nonconform.scoring

P-value estimation strategies for conformal prediction.

This module provides strategies for computing p-values from calibration scores.

Classes:

Name Description
BaseEstimation

Abstract base class for p-value estimation.

Empirical

Classical empirical p-value estimation using discrete CDF.

ConditionalEmpirical

Conditionally calibrated empirical p-values.

Probabilistic

KDE-based probabilistic p-value estimation.

Kernel

Bases: Enum

Kernel functions for KDE-based p-value computation.

Attributes:

Name Type Description
GAUSSIAN

Gaussian (normal) kernel.

EXPONENTIAL

Exponential kernel.

BOX

Box (uniform) kernel.

TRIANGULAR

Triangular kernel.

EPANECHNIKOV

Epanechnikov kernel.

BIWEIGHT

Biweight (quartic) kernel.

TRIWEIGHT

Triweight kernel.

TRICUBE

Tricube kernel.

COSINE

Cosine kernel.

BaseEstimation

Bases: ABC

Abstract base for p-value estimation strategies.

compute_p_values abstractmethod
compute_p_values(
    scores: ndarray,
    calibration_set: ndarray,
    weights: tuple[ndarray, ndarray] | None = None,
) -> np.ndarray

Compute p-values for test scores.

Parameters:

Name Type Description Default
scores ndarray

Test instance anomaly scores (1D array).

required
calibration_set ndarray

Calibration anomaly scores (1D array).

required
weights tuple[ndarray, ndarray] | None

Optional (w_calib, w_test) tuple for weighted conformal.

None

Returns:

Type Description
ndarray

Array of p-values for each test instance.

Source code in nonconform/scoring.py
@abstractmethod
def compute_p_values(
    self,
    scores: np.ndarray,
    calibration_set: np.ndarray,
    weights: tuple[np.ndarray, np.ndarray] | None = None,
) -> np.ndarray:
    """Compute p-values for test scores.

    Args:
        scores: Test instance anomaly scores (1D array).
        calibration_set: Calibration anomaly scores (1D array).
        weights: Optional (w_calib, w_test) tuple for weighted conformal.

    Returns:
        Array of p-values for each test instance.
    """
    pass
get_metadata
get_metadata() -> dict[str, Any]

Optional auxiliary data exposed after compute_p_values.

Source code in nonconform/scoring.py
def get_metadata(self) -> dict[str, Any]:
    """Optional auxiliary data exposed after compute_p_values."""
    return {}
set_seed
set_seed(seed: int | None) -> None

Set random seed for reproducibility.

Parameters:

Name Type Description Default
seed int | None

Random seed value or None.

required
Source code in nonconform/scoring.py
def set_seed(self, seed: int | None) -> None:
    """Set random seed for reproducibility.

    Args:
        seed: Random seed value or None.
    """
    if hasattr(self, "_seed"):
        self._seed = seed

Empirical

Empirical(tie_break: TieBreakModeInput = 'classical')

Bases: BaseEstimation

Classical empirical p-value estimation using discrete CDF.

Computes p-values using deterministic tie handling by default. Optionally supports randomized smoothing to eliminate the resolution floor caused by discrete ties (Jin & Candès, 2023).

Parameters:

Name Type Description Default
tie_break TieBreakModeInput

Tie-breaking strategy ("classical" or "randomized"). Equivalent TieBreakMode enum values are also accepted.

'classical'

Examples:

estimation = Empirical()  # tie_break="classical" by default
p_values = estimation.compute_p_values(test_scores, calib_scores)

# For randomized smoothing:
estimation = Empirical(tie_break="randomized")
Source code in nonconform/scoring.py
def __init__(self, tie_break: TieBreakModeInput = "classical") -> None:
    self._tie_break = _normalize_tie_break_mode(tie_break)
    self._seed: int | None = None
set_seed
set_seed(seed: int | None) -> None

Set random seed for reproducibility.

Source code in nonconform/scoring.py
def set_seed(self, seed: int | None) -> None:
    """Set random seed for reproducibility."""
    self._seed = seed
compute_p_values
compute_p_values(
    scores: ndarray,
    calibration_set: ndarray,
    weights: tuple[ndarray, ndarray] | None = None,
) -> np.ndarray

Compute empirical p-values from calibration set.

Source code in nonconform/scoring.py
def compute_p_values(
    self,
    scores: np.ndarray,
    calibration_set: np.ndarray,
    weights: tuple[np.ndarray, np.ndarray] | None = None,
) -> np.ndarray:
    """Compute empirical p-values from calibration set."""
    randomized = self._tie_break is TieBreakMode.RANDOMIZED
    rng = np.random.default_rng(self._seed) if randomized else None
    if weights is not None:
        return self._compute_weighted(scores, calibration_set, weights, rng)
    return self._compute_standard(scores, calibration_set, rng)
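The difference between the two tie-breaking modes is easiest to see on a score that ties with calibration scores. This sketch uses the standard deterministic and randomized empirical formulas; the helper name is illustrative, not the library API.

```python
import numpy as np

def empirical_p(score, calib, tie_break="classical", rng=None):
    """Classical mode counts tied calibration scores fully; randomized
    mode breaks ties with a uniform draw, removing the resolution floor."""
    calib = np.asarray(calib)
    n = calib.size
    n_greater = np.sum(calib > score)
    n_ties = np.sum(calib == score)
    if tie_break == "classical":
        return (1 + n_greater + n_ties) / (n + 1)
    u = (rng or np.random.default_rng()).uniform()
    return (n_greater + u * (1 + n_ties)) / (n + 1)

calib = np.array([1.0, 2.0, 2.0, 3.0])
p_classical = empirical_p(2.0, calib)  # -> 0.8 (both ties count)
```

In randomized mode the same test score receives a p-value anywhere in `(0.2, 0.8)` depending on the uniform draw, which yields exactly uniform p-values under exchangeability.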

ConditionalEmpirical

ConditionalEmpirical(
    *,
    delta: float = 0.05,
    method: str | ConditionalCalibrationMethod = "mc",
    tie_break: TieBreakModeInput = "classical",
    simes_kden: int = 2,
    mc_num_simulations: int = 10000,
)

Bases: Empirical

Conditionally calibrated empirical conformal p-values (CCCPV).

This estimator first computes classical empirical conformal p-values and then applies a finite-sample calibration map:

$$p_j = \frac{1 + \sum_{i=1}^{n_{\text{cal}}} \mathbf{1}[s_i \ge s_j]}{n_{\text{cal}} + 1}, \qquad \tilde p_j = C_{n_{\text{cal}},\delta}(p_j).$$

Supported calibration maps are "mc", "simes", "dkwm", and "asymptotic".

References

Bates et al. (2023), Testing for outliers with conformal p-values. Reference implementation: https://github.com/msesia/conditional-conformal-pvalues

Note

Weighted conformal p-values are intentionally not supported in this first release of ConditionalEmpirical.

Parameters:

Name Type Description Default
delta float

Confidence level used by the conditional calibration map. Must be in (0, 1). Defaults to 0.05.

0.05
method str | ConditionalCalibrationMethod

Conditional calibration method. One of {"mc", "simes", "dkwm", "asymptotic"}. Defaults to "mc".

'mc'
tie_break TieBreakModeInput

Tie-breaking strategy used for base empirical p-values ("classical" or "randomized").

'classical'
simes_kden int

Denominator used to derive k = floor(n_cal / simes_kden) for the Simes calibration map. Must be a positive integer. Defaults to 2.

2
mc_num_simulations int

Monte Carlo sample size used to estimate the finite-sample correction for method="mc". Defaults to 10,000.

10000
Source code in nonconform/scoring.py
def __init__(
    self,
    *,
    delta: float = 0.05,
    method: str | ConditionalCalibrationMethod = "mc",
    tie_break: TieBreakModeInput = "classical",
    simes_kden: int = 2,
    mc_num_simulations: int = 10_000,
) -> None:
    super().__init__(tie_break=tie_break)
    try:
        delta_float = float(delta)
    except (TypeError, ValueError) as exc:
        raise ValueError("delta must be a float in (0, 1).") from exc
    if not np.isfinite(delta_float) or not (0.0 < delta_float < 1.0):
        raise ValueError(f"delta must be in (0, 1), got {delta!r}.")
    if (
        isinstance(simes_kden, bool)
        or not isinstance(simes_kden, int)
        or simes_kden < 1
    ):
        raise ValueError("simes_kden must be a positive integer.")
    if (
        isinstance(mc_num_simulations, bool)
        or not isinstance(mc_num_simulations, int)
        or mc_num_simulations < 100
    ):
        raise ValueError("mc_num_simulations must be an integer >= 100.")

    self._delta = delta_float
    self._method = normalize_conditional_calibration_method(method)
    self._simes_kden = simes_kden
    self._mc_num_simulations = mc_num_simulations
    self._mc_correction_cache: dict[tuple[int, float], float] = {}
set_seed
set_seed(seed: int | None) -> None

Set random seed for reproducibility.

Source code in nonconform/scoring.py
def set_seed(self, seed: int | None) -> None:
    """Set random seed for reproducibility."""
    super().set_seed(seed)
    # MC correction estimation depends on RNG; invalidate cached estimates.
    self._mc_correction_cache.clear()
compute_p_values
compute_p_values(
    scores: ndarray,
    calibration_set: ndarray,
    weights: tuple[ndarray, ndarray] | None = None,
) -> np.ndarray

Compute conditionally calibrated conformal p-values.

Source code in nonconform/scoring.py
def compute_p_values(
    self,
    scores: np.ndarray,
    calibration_set: np.ndarray,
    weights: tuple[np.ndarray, np.ndarray] | None = None,
) -> np.ndarray:
    """Compute conditionally calibrated conformal p-values."""
    if weights is not None:
        raise ValueError(
            "ConditionalEmpirical does not support weighted p-values. "
            "Use Empirical or Probabilistic for weighted conformal mode."
        )

    base_p = super().compute_p_values(scores, calibration_set, weights=None)
    n_cal = len(np.asarray(calibration_set).ravel())

    cache_key = (n_cal, self._delta)
    cached_fs = (
        self._mc_correction_cache.get(cache_key) if self._method == "mc" else None
    )
    rng = np.random.default_rng(self._seed) if self._seed is not None else None
    calibrated, fs_correction = calibrate_conditional_p_values(
        base_p,
        n_calibration=n_cal,
        delta=self._delta,
        method=self._method,
        simes_kden=self._simes_kden,
        fs_correction=cached_fs,
        rng=rng,
        mc_num_simulations=self._mc_num_simulations,
    )
    if self._method == "mc" and fs_correction is not None:
        self._mc_correction_cache[cache_key] = fs_correction
    return calibrated
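To give a feel for what a calibration map does, here is an illustrative DKW-style inflation: empirical p-values are shifted up by the DKW band half-width so the resulting p-values hold with probability 1 − delta over the calibration draw. This is a sketch of the general idea only; the library's actual `"mc"`, `"simes"`, `"dkwm"`, and `"asymptotic"` maps follow Bates et al. (2023) and differ in detail.

```python
import numpy as np

def dkwm_inflate(p_values, n_calibration, delta=0.05):
    """Illustrative DKW-style calibration map: add the two-sided DKW
    confidence-band half-width to each empirical p-value, clipped at 1.
    Not the library's exact implementation."""
    eps = np.sqrt(np.log(2.0 / delta) / (2.0 * n_calibration))
    return np.minimum(1.0, p_values + eps)

p_tilde = dkwm_inflate(np.array([0.01, 0.2]), n_calibration=200)
```

The inflation shrinks as the calibration set grows (at rate `1/sqrt(n_cal)`), so conditional calibration costs little power with large calibration sets.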

Probabilistic

Probabilistic(
    kernel: Kernel | Sequence[Kernel] = Kernel.GAUSSIAN,
    n_trials: int = 100,
    cv_folds: int = -1,
)

Bases: BaseEstimation

KDE-based probabilistic p-value estimation with continuous values.

Provides smooth p-values in [0,1] via kernel density estimation. Supports automatic hyperparameter tuning and weighted conformal prediction. In weighted mode, only calibration weights are applied to the KDE; test weights are intentionally not injected into the survival calculation so p-values can reach 0. This avoids the lower bound w_test / (sum_calib_weight + w_test) that the discrete weighted formula would impose.

Parameters:

Name Type Description Default
kernel Kernel | Sequence[Kernel]

Kernel function or list (list triggers kernel tuning). Bandwidth is always auto-tuned. Defaults to Kernel.GAUSSIAN.

GAUSSIAN
n_trials int

Number of Optuna trials for tuning. Defaults to 100.

100
cv_folds int

CV folds for tuning (-1 for leave-one-out). Defaults to -1.

-1

Examples:

# Basic usage
estimation = Probabilistic()
p_values = estimation.compute_p_values(test_scores, calib_scores)

# With custom kernel
estimation = Probabilistic(kernel=Kernel.EPANECHNIKOV)
Source code in nonconform/scoring.py
def __init__(
    self,
    kernel: Kernel | Sequence[Kernel] = Kernel.GAUSSIAN,
    n_trials: int = 100,
    cv_folds: int = -1,
) -> None:
    self._kernel = kernel
    self._n_trials = n_trials
    self._cv_folds = cv_folds
    self._seed = None

    self._tuned_params: dict | None = None
    self._kde_model = None
    self._calibration_hash: int | None = None
    self._kde_eval_grid: np.ndarray | None = None
    self._kde_cdf_values: np.ndarray | None = None
    self._kde_total_weight: float | None = None
compute_p_values
compute_p_values(
    scores: ndarray,
    calibration_set: ndarray,
    weights: tuple[ndarray, ndarray] | None = None,
) -> np.ndarray

Compute continuous p-values using KDE.

Lazy fitting: tunes and fits KDE on first call or when calibration changes. Note: When weights are provided, this estimator uses only calibration weights to shape the KDE. Test weights are accepted for API parity but do not set a positive lower bound on p-values.

Source code in nonconform/scoring.py
def compute_p_values(
    self,
    scores: np.ndarray,
    calibration_set: np.ndarray,
    weights: tuple[np.ndarray, np.ndarray] | None = None,
) -> np.ndarray:
    """Compute continuous p-values using KDE.

    Lazy fitting: tunes and fits KDE on first call or when calibration changes.
    Note: When weights are provided, this estimator uses only calibration
    weights to shape the KDE. Test weights are accepted for API parity but
    do not set a positive lower bound on p-values.
    """
    if weights is not None:
        w_calib, _w_test = weights
    else:
        w_calib, _w_test = None, None

    if weights is None:
        current_hash = hash(calibration_set.tobytes())
    else:
        current_hash = hash((calibration_set.tobytes(), w_calib.tobytes()))

    if self._kde_model is None or self._calibration_hash != current_hash:
        self._fit_kde(calibration_set, w_calib)
        self._calibration_hash = current_hash

    sum_calib_weight = (
        float(np.sum(w_calib))
        if w_calib is not None
        else float(len(calibration_set))
    )

    return self._compute_p_values_from_kde(scores, sum_calib_weight)
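The tuned, optionally weighted KDE that the estimator fits internally is more involved, but the core idea can be sketched with a minimal unweighted Gaussian-kernel KDE: the continuous p-value of a test score `s` is the estimated upper-tail mass `P(S >= s)` under a density fitted to the calibration scores (bandwidth choice here is Silverman's rule of thumb, an illustrative assumption):

```python
import numpy as np
from math import erf, sqrt

rng = np.random.default_rng(0)
calib = rng.normal(size=500)  # calibration anomaly scores

# Silverman's rule-of-thumb bandwidth for a Gaussian kernel
h = 1.06 * calib.std() * len(calib) ** (-1 / 5)

def kde_p_value(s: float) -> float:
    """Upper-tail mass P(S >= s) under a Gaussian KDE of the calibration scores."""
    # Each kernel contributes its own Gaussian upper tail; average them.
    tails = [0.5 * (1.0 - erf((s - x) / (h * sqrt(2.0)))) for x in calib]
    return float(np.mean(tails))
```

Unlike the empirical p-value, this function is smooth and strictly decreasing in `s`, so it has no discrete resolution floor.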
get_metadata
get_metadata() -> dict[str, Any]

Return KDE metadata after p-value computation.

Source code in nonconform/scoring.py
def get_metadata(self) -> dict[str, Any]:
    """Return KDE metadata after p-value computation."""
    if (
        self._kde_eval_grid is None
        or self._kde_cdf_values is None
        or self._kde_total_weight is None
    ):
        return {}
    return {
        "kde": {
            "eval_grid": self._kde_eval_grid.copy(),
            "cdf_values": self._kde_cdf_values.copy(),
            "total_weight": float(self._kde_total_weight),
        }
    }

calculate_p_val

calculate_p_val(
    scores: ndarray,
    calibration_set: ndarray,
    tie_break: TieBreakModeInput = "classical",
    rng: Generator | None = None,
) -> np.ndarray

Calculate empirical p-values (standalone function).

Uses classical deterministic tie handling by default. Optionally supports randomized smoothing to eliminate the resolution floor caused by discrete ties (Jin & Candès, 2023).

Parameters:

Name Type Description Default
scores ndarray

Test instance anomaly scores (1D array).

required
calibration_set ndarray

Calibration anomaly scores (1D array).

required
tie_break TieBreakModeInput

Tie-breaking strategy for equal scores ("classical" or "randomized"). Equivalent TieBreakMode values are accepted.

'classical'
rng Generator | None

Optional random number generator for reproducibility.

None

Returns:

Type Description
ndarray

Array of p-values for each test instance.

Source code in nonconform/scoring.py
def calculate_p_val(
    scores: np.ndarray,
    calibration_set: np.ndarray,
    tie_break: TieBreakModeInput = "classical",
    rng: np.random.Generator | None = None,
) -> np.ndarray:
    """Calculate empirical p-values (standalone function).

    Uses classical deterministic tie handling by default. Optionally supports
    randomized smoothing to eliminate the resolution floor caused by discrete
    ties (Jin & Candes 2023).

    Args:
        scores: Test instance anomaly scores (1D array).
        calibration_set: Calibration anomaly scores (1D array).
        tie_break: Tie-breaking strategy for equal scores (`"classical"` or
            `"randomized"`). Equivalent `TieBreakMode` values are accepted.
        rng: Optional random number generator for reproducibility.

    Returns:
        Array of p-values for each test instance.
    """
    mode = _normalize_tie_break_mode(tie_break)

    sorted_cal = np.sort(calibration_set)
    n_cal = len(calibration_set)

    if mode is TieBreakMode.CLASSICAL:
        # Old formula: count >= (at or above)
        ranks = n_cal - np.searchsorted(sorted_cal, scores, side="left")
        return (1.0 + ranks) / (1.0 + n_cal)

    # Randomized tie handling: separate strictly greater and ties
    pos_right = np.searchsorted(sorted_cal, scores, side="right")
    pos_left = np.searchsorted(sorted_cal, scores, side="left")
    n_greater = n_cal - pos_right  # strictly greater
    n_equal = pos_right - pos_left  # ties

    if rng is None:
        rng = np.random.default_rng()
    u = rng.uniform(size=len(scores))

    return (n_greater + (n_equal + 1) * u) / (1.0 + n_cal)
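The two tie-handling modes can be illustrated with a standalone re-implementation of the formulas above (not a call into nonconform):

```python
import numpy as np

calib = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
scores = np.array([3.0, 6.0])
n = len(calib)
sorted_cal = np.sort(calib)

# Classical: p = (1 + #{calibration scores >= s}) / (1 + n)
ranks = n - np.searchsorted(sorted_cal, scores, side="left")
p_classical = (1.0 + ranks) / (1.0 + n)
# For s=3.0 the tie counts as "at or above": p = (1+3)/6; for s=6.0: p = (1+0)/6

# Randomized: ties (and the test point itself) are broken by a uniform draw,
# which removes the 1/(1+n) resolution floor.
rng = np.random.default_rng(0)
pos_right = np.searchsorted(sorted_cal, scores, side="right")
pos_left = np.searchsorted(sorted_cal, scores, side="left")
n_greater = n - pos_right
n_equal = pos_right - pos_left
u = rng.uniform(size=len(scores))
p_randomized = (n_greater + (n_equal + 1) * u) / (1.0 + n)
```

For the extreme score 6.0 the classical p-value can never fall below 1/6, while the randomized p-value can, which matters when applying FDR procedures at small alpha with few calibration points.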

calculate_weighted_p_val

calculate_weighted_p_val(
    scores: ndarray,
    calibration_set: ndarray,
    test_weights: ndarray,
    calib_weights: ndarray,
    tie_break: TieBreakModeInput = "classical",
    rng: Generator | None = None,
) -> np.ndarray

Calculate weighted empirical p-values (standalone function).

Uses classical deterministic tie handling by default. Optionally supports randomized smoothing to eliminate the resolution floor caused by discrete ties (Jin & Candès, 2023).

Parameters:

Name Type Description Default
scores ndarray

Test instance anomaly scores (1D array).

required
calibration_set ndarray

Calibration anomaly scores (1D array).

required
test_weights ndarray

Test instance weights (1D array).

required
calib_weights ndarray

Calibration weights (1D array).

required
tie_break TieBreakModeInput

Tie-breaking strategy for equal scores ("classical" or "randomized"). Equivalent TieBreakMode values are accepted.

'classical'
rng Generator | None

Optional random number generator for reproducibility.

None

Returns:

Type Description
ndarray

Array of weighted p-values for each test instance.

Note

Including test_weights in the numerator/denominator implies a positive lower bound of test_weights / (sum(calib_weights) + test_weights) when there is no calibration mass above the test score.

Source code in nonconform/scoring.py
def calculate_weighted_p_val(
    scores: np.ndarray,
    calibration_set: np.ndarray,
    test_weights: np.ndarray,
    calib_weights: np.ndarray,
    tie_break: TieBreakModeInput = "classical",
    rng: np.random.Generator | None = None,
) -> np.ndarray:
    """Calculate weighted empirical p-values (standalone function).

    Uses classical deterministic tie handling by default. Optionally supports
    randomized smoothing to eliminate the resolution floor caused by discrete
    ties (Jin & Candes 2023).

    Args:
        scores: Test instance anomaly scores (1D array).
        calibration_set: Calibration anomaly scores (1D array).
        test_weights: Test instance weights (1D array).
        calib_weights: Calibration weights (1D array).
        tie_break: Tie-breaking strategy for equal scores (`"classical"` or
            `"randomized"`). Equivalent `TieBreakMode` values are accepted.
        rng: Optional random number generator for reproducibility.

    Returns:
        Array of weighted p-values for each test instance.

    Note:
        Including test_weights in the numerator/denominator implies a positive
        lower bound of test_weights / (sum(calib_weights) + test_weights) when
        there is no calibration mass above the test score.
    """
    mode = _normalize_tie_break_mode(tie_break)

    try:
        scores = _as_1d("scores", scores).astype(float, copy=False)
        calibration_set = _as_1d("calibration_set", calibration_set).astype(
            float, copy=False
        )
        w_scores = _as_1d("test_weights", test_weights).astype(float, copy=False)
        w_calib = _as_1d("calib_weights", calib_weights).astype(float, copy=False)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            "scores, calibration_set, test_weights, and calib_weights must be numeric."
        ) from exc

    if len(scores) != len(w_scores):
        raise ValueError(
            "scores and test_weights must have the same length. "
            f"Got {len(scores)} and {len(w_scores)}."
        )
    if len(calibration_set) != len(w_calib):
        raise ValueError(
            "calibration_set and calib_weights must have the same length. "
            f"Got {len(calibration_set)} and {len(w_calib)}."
        )
    _validate_finite("scores", scores)
    _validate_finite("calibration_set", calibration_set)
    _validate_finite("test_weights", w_scores)
    _validate_finite("calib_weights", w_calib)
    if np.any(w_scores < 0):
        raise ValueError("test_weights must be non-negative.")
    if np.any(w_calib < 0):
        raise ValueError("calib_weights must be non-negative.")

    sort_idx = np.argsort(calibration_set)
    sorted_scores = calibration_set[sort_idx]
    sorted_weights = w_calib[sort_idx]

    cumulative_weights = np.concatenate(([0.0], np.cumsum(sorted_weights)))
    total_weight = float(cumulative_weights[-1])
    if total_weight <= 0:
        raise ValueError("calib_weights must sum to a positive value.")

    left_idx = np.searchsorted(sorted_scores, scores, side="left")
    right_idx = np.searchsorted(sorted_scores, scores, side="right")

    if mode is TieBreakMode.CLASSICAL:
        weighted_greater = total_weight - cumulative_weights[right_idx]
        numerator = weighted_greater + w_scores
    else:
        weighted_greater = total_weight - cumulative_weights[right_idx]
        weighted_equal = cumulative_weights[right_idx] - cumulative_weights[left_idx]

        if rng is None:
            rng = np.random.default_rng()
        u = rng.uniform(size=len(scores))

        numerator = weighted_greater + (weighted_equal + w_scores) * u

    denominator = total_weight + w_scores
    return numerator / denominator
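A minimal sketch of the classical weighted formula (re-implemented from the source above, validation omitted). With uniform weights it recovers the unweighted p-values, and a score above all calibration mass hits the documented lower bound:

```python
import numpy as np

def weighted_p(scores, calib, w_test, w_calib):
    """Classical weighted p-values: (weight strictly above s + w_test) / (total + w_test)."""
    order = np.argsort(calib)
    sorted_cal, sorted_w = calib[order], w_calib[order]
    cum = np.concatenate(([0.0], np.cumsum(sorted_w)))
    total = cum[-1]
    # calibration weight strictly greater than each test score
    greater = total - cum[np.searchsorted(sorted_cal, scores, side="right")]
    return (greater + w_test) / (total + w_test)

calib = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
scores = np.array([3.5, 6.0])
p_uniform = weighted_p(scores, calib, np.ones(2), np.ones(5))
# 3.5 -> (2 + 1) / 6 = 0.5
# 6.0 -> (0 + 1) / 6, the lower bound w_test / (sum(calib_weights) + w_test)
```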

Weight Estimation

nonconform.weighting

Weight estimation for covariate shift correction in weighted conformal prediction.

This module provides weight estimators that compute importance weights to correct for covariate shift between calibration and test distributions. They estimate density ratios w(x) = p_test(x) / p_calib(x) which are used to reweight conformal scores for better coverage guarantees under distribution shift.

Classes:

Name Description
BaseWeightEstimator

Abstract base class for weight estimators.

IdentityWeightEstimator

Returns uniform weights (no covariate shift).

SklearnWeightEstimator

Universal wrapper for sklearn probabilistic classifiers.

BootstrapBaggedWeightEstimator

Bootstrap-bagged wrapper for robust estimation.

Factory functions

logistic_weight_estimator: Create estimator using Logistic Regression.
forest_weight_estimator: Create estimator using Random Forest.
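The density-ratio idea behind these estimators can be sketched with any probabilistic classifier: label calibration points 0 and test points 1, then convert P(test | x) into w(x) = p_test(x) / p_calib(x). This is an illustrative sketch, not the library's exact implementation; clipping, class balancing, and unequal-sample-size correction are omitted:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(0)
x_calib = rng.normal(0.0, 1.0, size=(400, 2))
x_test = rng.normal(0.5, 1.0, size=(400, 2))  # shifted test distribution

# Binary classification problem: calibration = 0, test = 1
X = np.vstack([x_calib, x_test])
y = np.concatenate([np.zeros(len(x_calib)), np.ones(len(x_test))])
clf = LogisticRegression().fit(X, y)

# With equal class sizes, Bayes' rule gives w(x) = P(test | x) / P(calib | x)
p_test = clf.predict_proba(x_calib)[:, 1]
w_calib = p_test / (1.0 - p_test)
```

Calibration points that look like test points receive larger weights, which up-weights their scores in the weighted conformal p-value.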

ProbabilisticClassifier

Bases: Protocol

Protocol for classifiers that support probability estimation.

This protocol defines the interface for sklearn-compatible classifiers that can produce probability estimates for weight computation.

fit
fit(X: ndarray, y: ndarray) -> ProbabilisticClassifier

Fit the classifier on training data.

Parameters:

Name Type Description Default
X ndarray

Feature matrix of shape (n_samples, n_features).

required
y ndarray

Target labels of shape (n_samples,).

required

Returns:

Type Description
ProbabilisticClassifier

The fitted classifier instance.

Source code in nonconform/weighting.py
def fit(self, X: np.ndarray, y: np.ndarray) -> ProbabilisticClassifier:
    """Fit the classifier on training data.

    Args:
        X: Feature matrix of shape (n_samples, n_features).
        y: Target labels of shape (n_samples,).

    Returns:
        The fitted classifier instance.
    """
    ...
predict_proba
predict_proba(X: ndarray) -> np.ndarray

Return probability estimates for samples.

Parameters:

Name Type Description Default
X ndarray

Feature matrix of shape (n_samples, n_features).

required

Returns:

Type Description
ndarray

Probability estimates of shape (n_samples, n_classes).

Source code in nonconform/weighting.py
def predict_proba(self, X: np.ndarray) -> np.ndarray:
    """Return probability estimates for samples.

    Args:
        X: Feature matrix of shape (n_samples, n_features).

    Returns:
        Probability estimates of shape (n_samples, n_classes).
    """
    ...
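Because this is a structural Protocol, any object exposing these two methods conforms, with no inheritance required. For example, a toy classifier (hypothetical, for illustration only) that ignores features and returns class priors:

```python
import numpy as np

class ConstantProbClassifier:
    """Toy classifier satisfying the ProbabilisticClassifier protocol."""

    def fit(self, X: np.ndarray, y: np.ndarray) -> "ConstantProbClassifier":
        # Mirror sklearn conventions: sorted classes_ and per-class priors
        self.classes_ = np.unique(y)
        self.priors_ = np.array([(y == c).mean() for c in self.classes_])
        return self

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        # Every sample gets the same probability row: the training priors
        return np.tile(self.priors_, (len(X), 1))

clf = ConstantProbClassifier().fit(np.zeros((4, 2)), np.array([0, 0, 1, 1]))
proba = clf.predict_proba(np.zeros((3, 2)))
```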

BaseWeightEstimator

Bases: ABC

Abstract base class for weight estimators in weighted conformal prediction.

Weight estimators compute importance weights to correct for covariate shift between calibration and test distributions. They estimate density ratios w(x) = p_test(x) / p_calib(x) which are used to reweight conformal scores for better coverage guarantees under distribution shift.

Subclasses must implement fit(), _get_stored_weights(), and _score_new_data() to provide specific weight estimation strategies.

fit abstractmethod
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Estimate density ratio weights.

Source code in nonconform/weighting.py
@abstractmethod
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Estimate density ratio weights."""
    pass
get_weights
get_weights(
    calibration_samples: ndarray | None = None,
    test_samples: ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]

Return density ratio weights for calibration and test data.

Parameters:

Name Type Description Default
calibration_samples ndarray | None

Optional calibration data to score. If provided, computes weights for this data using the fitted model. If None, returns stored weights from fit(). Must provide both or neither.

None
test_samples ndarray | None

Optional test data to score. If provided, computes weights for this data using the fitted model. If None, returns stored weights from fit(). Must provide both or neither.

None

Returns:

Type Description
tuple[ndarray, ndarray]

Tuple of (calibration_weights, test_weights) as numpy arrays.

Raises:

Type Description
NotFittedError

If fit() has not been called.

ValueError

If only one of calibration_samples/test_samples is provided.

Source code in nonconform/weighting.py
def get_weights(
    self,
    calibration_samples: np.ndarray | None = None,
    test_samples: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]:
    """Return density ratio weights for calibration and test data.

    Args:
        calibration_samples: Optional calibration data to score. If provided,
            computes weights for this data using the fitted model. If None,
            returns stored weights from fit(). Must provide both or neither.
        test_samples: Optional test data to score. If provided, computes
            weights for this data using the fitted model. If None, returns
            stored weights from fit(). Must provide both or neither.

    Returns:
        Tuple of (calibration_weights, test_weights) as numpy arrays.

    Raises:
        NotFittedError: If fit() has not been called.
        ValueError: If only one of calibration_samples/test_samples is provided.
    """
    if not hasattr(self, "_is_fitted") or not self._is_fitted:
        raise NotFittedError("This weight estimator instance is not fitted yet.")

    if (calibration_samples is None) != (test_samples is None):
        raise ValueError(
            "Must provide both calibration_samples and test_samples, or neither. "
            "Cannot score only one set."
        )

    if calibration_samples is None:
        return self._get_stored_weights()
    else:
        return self._score_new_data(calibration_samples, test_samples)
set_seed
set_seed(seed: int | None) -> None

Set random seed for reproducibility.

Parameters:

Name Type Description Default
seed int | None

Random seed value or None.

required
Source code in nonconform/weighting.py
def set_seed(self, seed: int | None) -> None:
    """Set random seed for reproducibility.

    Args:
        seed: Random seed value or None.
    """
    self._seed = seed

IdentityWeightEstimator

IdentityWeightEstimator()

Bases: BaseWeightEstimator

Identity weight estimator that returns uniform weights.

This estimator assumes no covariate shift and returns weights of 1.0 for all samples. Useful as a baseline or when covariate shift is known to be minimal.

This effectively makes weighted conformal prediction equivalent to standard conformal prediction.

Source code in nonconform/weighting.py
def __init__(self) -> None:
    self._n_calib = 0
    self._n_test = 0
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the identity weight estimator.

Parameters:

Name Type Description Default
calibration_samples ndarray

Array of calibration data samples.

required
test_samples ndarray

Array of test data samples.

required
Source code in nonconform/weighting.py
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the identity weight estimator.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.
    """
    self._n_calib = calibration_samples.shape[0]
    self._n_test = test_samples.shape[0]
    self._is_fitted = True

SklearnWeightEstimator

SklearnWeightEstimator(
    base_estimator: ProbabilisticClassifier
    | BaseEstimator
    | None = None,
    clip_quantile: float | None = 0.05,
)

Bases: BaseWeightEstimator

Universal wrapper for any sklearn-compatible probabilistic classifier.

Adheres to the standard sklearn 'Meta-Estimator' pattern. Accepts a configured estimator instance and clones it for cross-validation safety.

Parameters:

Name Type Description Default
base_estimator ProbabilisticClassifier | BaseEstimator | None

Configured sklearn classifier instance with predict_proba support. Defaults to LogisticRegression.

None
clip_quantile float | None

Quantile for weight clipping (e.g., 0.05 clips to 5th-95th percentile). Use None to disable clipping. Defaults to 0.05.

0.05

Raises:

Type Description
ValueError

If base_estimator does not implement predict_proba.

Examples:

from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Default (LogisticRegression)
estimator = SklearnWeightEstimator()

# Custom with pipeline
estimator = SklearnWeightEstimator(
    base_estimator=make_pipeline(
        StandardScaler(), LogisticRegression(C=1.0, class_weight="balanced")
    )
)

# Random Forest
estimator = SklearnWeightEstimator(
    base_estimator=RandomForestClassifier(n_estimators=100, max_depth=5)
)
Source code in nonconform/weighting.py
def __init__(
    self,
    base_estimator: ProbabilisticClassifier | BaseEstimator | None = None,
    clip_quantile: float | None = 0.05,
) -> None:
    # Default to a sane baseline if nothing is provided
    # Use explicit None check to avoid truthiness evaluation of sklearn estimators
    # (unfitted ensemble estimators raise AttributeError on __len__)
    self.base_estimator = (
        base_estimator
        if base_estimator is not None
        else LogisticRegression(solver="liblinear")
    )
    if clip_quantile is not None and not (0 < clip_quantile < 0.5):
        raise ValueError(
            f"clip_quantile must be in (0, 0.5) or None, got {clip_quantile}."
        )
    self.clip_quantile = clip_quantile

    if not hasattr(self.base_estimator, "predict_proba"):
        raise ValueError(
            f"The provided base_estimator {type(self.base_estimator).__name__} "
            "does not implement 'predict_proba'. Density estimation requires "
            "probability scores. Use SVC(probability=True) or similar."
        )

    # Seed inheritance attribute (may be set by ConformalDetector)
    self._seed: int | None = None

    self.estimator_: ProbabilisticClassifier | None = None
    self._test_class_idx: int | None = None  # Column index for P(Test)
    self._w_calib: np.ndarray | None = None
    self._w_test: np.ndarray | None = None
    self._clip_bounds: tuple[float, float] | None = None
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the weight estimator on calibration and test samples.

Parameters:

Name Type Description Default
calibration_samples ndarray

Array of calibration data samples.

required
test_samples ndarray

Array of test data samples.

required

Raises:

Type Description
ValueError

If calibration_samples is empty.

Source code in nonconform/weighting.py
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the weight estimator on calibration and test samples.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.

    Raises:
        ValueError: If calibration_samples is empty.
    """
    if calibration_samples.shape[0] == 0:
        raise ValueError("Calibration samples are empty. Cannot compute weights.")

    # Prepare data (Calib=0, Test=1 labels)
    x_joint, y_joint = self._prepare_training_data(
        calibration_samples, test_samples, self._seed
    )

    self.estimator_ = clone(self.base_estimator)
    if self._seed is not None:
        self._apply_seed_to_estimator(self.estimator_, self._seed)
    self.estimator_.fit(x_joint, y_joint)

    # sklearn sorts classes_ - get correct column index for P(Test)
    self._test_class_idx = int(
        np.where(self.estimator_.classes_ == self.TEST_LABEL)[0][0]
    )

    w_calib, w_test = self._compute_weights(calibration_samples, test_samples)
    self._clip_bounds = self._compute_clip_bounds(
        w_calib, w_test, self.clip_quantile
    )
    self._w_calib, self._w_test = self._clip_weights(
        w_calib, w_test, self._clip_bounds
    )
    self._is_fitted = True

BootstrapBaggedWeightEstimator

BootstrapBaggedWeightEstimator(
    base_estimator: BaseWeightEstimator,
    n_bootstraps: int = 100,
    clip_quantile: float | None = 0.05,
    scoring_mode: Literal["frozen"] = "frozen",
)

Bases: BaseWeightEstimator

Bootstrap-bagged wrapper for weight estimators with instance-wise aggregation.

This estimator wraps any base weight estimator and applies bootstrap bagging to create more stable, robust weight estimates. It's most relevant when the calibration set is much larger than the test batch (or vice versa), where standalone weights can become spiky and unstable.

The algorithm:

1. For each bootstrap iteration:
    - Resample BOTH sets to a balanced sample size (the minimum of the calibration and test sizes)
    - Fit the base estimator on the balanced bootstrap sample
    - Score ALL original instances using the fitted model (perfect coverage)
    - Store log(weights) for each instance
2. After all iterations:
    - Aggregate instance-wise weights using the geometric mean (average in log-space)
    - Apply clipping to maintain boundedness for theoretical guarantees
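The geometric-mean aggregation at the end can be sketched in a few lines (the weight values are illustrative):

```python
import numpy as np

# Per-instance weights from three bootstrap iterations (one row per iteration)
boot_weights = np.array([
    [2.0, 0.5, 1.0],
    [8.0, 0.5, 1.0],
    [4.0, 2.0, 1.0],
])

# Accumulate in log-space, then exponentiate the mean. The geometric mean
# damps occasional extreme density ratios more than the arithmetic mean would.
geo_mean = np.exp(np.log(boot_weights).mean(axis=0))
# First instance: (2 * 8 * 4) ** (1/3) = 4.0
```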

Seed inheritance

This class uses the _seed attribute pattern for automatic seed inheritance from ConformalDetector.

Parameters:

Name Type Description Default
base_estimator BaseWeightEstimator

Any BaseWeightEstimator instance.

required
n_bootstraps int

Number of bootstrap iterations. Defaults to 100.

100
clip_quantile float | None

Quantile for adaptive clipping. Use None to disable clipping. Defaults to 0.05.

0.05
scoring_mode Literal['frozen']

Weight scoring behavior after fit. Currently only "frozen" is supported, meaning the estimator can only serve the exact calibration/test batches used during fit(). Defaults to "frozen".

'frozen'
References

Jin, Ying, and Emmanuel J. Candès. "Selection by Prediction with Conformal p-values." Journal of Machine Learning Research 24.244 (2023): 1-41.

Source code in nonconform/weighting.py
def __init__(
    self,
    base_estimator: BaseWeightEstimator,
    n_bootstraps: int = 100,
    clip_quantile: float | None = 0.05,
    scoring_mode: Literal["frozen"] = "frozen",
) -> None:
    if n_bootstraps < 1:
        raise ValueError(
            f"n_bootstraps must be at least 1, got {n_bootstraps}. "
            f"Typical values are 50-200 for stable weight estimation."
        )
    if clip_quantile is not None and not (0 < clip_quantile < 0.5):
        raise ValueError(
            f"clip_quantile must be in (0, 0.5), got {clip_quantile}. "
            f"Common values are 0.05 (5th-95th percentiles) or 0.01."
        )
    if scoring_mode != "frozen":
        raise ValueError(
            f"Unsupported scoring_mode {scoring_mode!r}. "
            "BootstrapBaggedWeightEstimator currently supports only "
            "scoring_mode='frozen'."
        )

    self.base_estimator = base_estimator
    self.n_bootstraps = n_bootstraps
    self.clip_quantile = clip_quantile
    self.scoring_mode: Literal["frozen"] = scoring_mode

    # Seed inheritance attribute (set by ConformalDetector)
    self._seed: int | None = None

    self._w_calib: np.ndarray | None = None
    self._w_test: np.ndarray | None = None
    self._calibration_signature: tuple[tuple[int, ...], str, str] | None = None
    self._test_signature: tuple[tuple[int, ...], str, str] | None = None
    self._is_fitted = False
supports_rescoring property
supports_rescoring: bool

Whether this estimator can score arbitrary new batches after fit().

weight_counts property
weight_counts: str

Return diagnostic info about instance-wise weight coverage.

fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the bagged weight estimator with perfect instance coverage.

Parameters:

Name Type Description Default
calibration_samples ndarray

Array of calibration data samples.

required
test_samples ndarray

Array of test data samples.

required

Raises:

Type Description
ValueError

If calibration_samples is empty.

Source code in nonconform/weighting.py
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the bagged weight estimator with perfect instance coverage.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.

    Raises:
        ValueError: If calibration_samples is empty.
    """
    if calibration_samples.shape[0] == 0:
        raise ValueError("Calibration samples are empty. Cannot compute weights.")

    n_calib, n_test = len(calibration_samples), len(test_samples)
    sample_size = min(n_calib, n_test)
    rng = np.random.default_rng(self._seed)

    if _bagged_logger.isEnabledFor(logging.INFO):
        _bagged_logger.info(
            f"Bootstrap: n_calib={n_calib}, n_test={n_test}, "
            f"sample_size={sample_size}, n_bootstraps={self.n_bootstraps}. "
            f"Perfect coverage: all instances weighted in all iterations."
        )

    # Online accumulation: sum log-weights (memory efficient)
    sum_log_weights_calib = np.zeros(n_calib)
    sum_log_weights_test = np.zeros(n_test)

    bootstrap_iterator = (
        tqdm(range(self.n_bootstraps), desc="Weighting")
        if _bagged_logger.isEnabledFor(logging.INFO)
        else range(self.n_bootstraps)
    )

    for i in bootstrap_iterator:
        # Resample both sets for balanced comparison
        calib_indices = rng.choice(n_calib, size=sample_size, replace=True)
        test_indices = rng.choice(n_test, size=sample_size, replace=True)
        x_calib_boot = calibration_samples[calib_indices]
        x_test_boot = test_samples[test_indices]

        # Create base estimator with iteration-specific seed
        base_est = deepcopy(self.base_estimator)
        if self._seed is not None:
            derived_seed = derive_seed(i, self._seed)
            if hasattr(base_est, "seed"):
                base_est.seed = derived_seed
            if hasattr(base_est, "_seed"):
                base_est._seed = derived_seed

        # Fit on bootstrap sample, then score ALL original instances
        base_est.fit(x_calib_boot, x_test_boot)
        w_c_all, w_t_all = base_est.get_weights(calibration_samples, test_samples)

        # Accumulate log-weights for geometric mean aggregation
        sum_log_weights_calib += np.log(w_c_all)
        sum_log_weights_test += np.log(w_t_all)

    # Geometric mean aggregation: exp(mean(log-weights))
    w_calib_final = np.exp(sum_log_weights_calib / self.n_bootstraps)
    w_test_final = np.exp(sum_log_weights_test / self.n_bootstraps)

    # Apply clipping after aggregation (use base class static method)
    clip_bounds = BaseWeightEstimator._compute_clip_bounds(
        w_calib_final, w_test_final, self.clip_quantile
    )
    if clip_bounds is None:
        self._w_calib = w_calib_final
        self._w_test = w_test_final
    else:
        clip_min, clip_max = clip_bounds
        self._w_calib = np.clip(w_calib_final, clip_min, clip_max)
        self._w_test = np.clip(w_test_final, clip_min, clip_max)

    self._calibration_signature = self._sample_signature(calibration_samples)
    self._test_signature = self._sample_signature(test_samples)
    self._is_fitted = True

logistic_weight_estimator

logistic_weight_estimator(
    regularization: str | float = "auto",
    clip_quantile: float = 0.05,
    class_weight: str | dict = "balanced",
    max_iter: int = 1000,
) -> SklearnWeightEstimator

Create weight estimator using Logistic Regression.

This factory function provides behavioral equivalence with the old LogisticWeightEstimator class.

Note

When used with ConformalDetector, the detector's seed is automatically propagated to the weight estimator for reproducibility.

Parameters:

Name Type Description Default
regularization str | float

Regularization parameter. If 'auto', uses C=1.0. If float, uses as C parameter.

'auto'
clip_quantile float

Quantile for weight clipping. Defaults to 0.05.

0.05
class_weight str | dict

Class weights for LogisticRegression. Defaults to 'balanced'.

'balanced'
max_iter int

Maximum iterations for solver convergence. Defaults to 1000.

1000

Returns:

Type Description
SklearnWeightEstimator

Configured SklearnWeightEstimator instance.

Examples:

estimator = logistic_weight_estimator(regularization=0.5)
estimator.fit(calib_samples, test_samples)
w_calib, w_test = estimator.get_weights()
Source code in nonconform/weighting.py
def logistic_weight_estimator(
    regularization: str | float = "auto",
    clip_quantile: float = 0.05,
    class_weight: str | dict = "balanced",
    max_iter: int = 1000,
) -> SklearnWeightEstimator:
    """Create weight estimator using Logistic Regression.

    This factory function provides behavioral equivalence with the old
    LogisticWeightEstimator class.

    Note:
        When used with ConformalDetector, the detector's seed is automatically
        propagated to the weight estimator for reproducibility.

    Args:
        regularization: Regularization parameter. If 'auto', uses C=1.0.
            If float, uses as C parameter.
        clip_quantile: Quantile for weight clipping. Defaults to 0.05.
        class_weight: Class weights for LogisticRegression. Defaults to 'balanced'.
        max_iter: Maximum iterations for solver convergence. Defaults to 1000.

    Returns:
        Configured SklearnWeightEstimator instance.

    Examples:
        ```python
        estimator = logistic_weight_estimator(regularization=0.5)
        estimator.fit(calib_samples, test_samples)
        w_calib, w_test = estimator.get_weights()
        ```
    """
    from sklearn.pipeline import make_pipeline
    from sklearn.preprocessing import StandardScaler

    c_param = 1.0 if regularization == "auto" else float(regularization)
    base_estimator = make_pipeline(
        StandardScaler(),
        LogisticRegression(
            C=c_param,
            max_iter=max_iter,
            class_weight=class_weight,
        ),
    )
    return SklearnWeightEstimator(
        base_estimator=base_estimator, clip_quantile=clip_quantile
    )

forest_weight_estimator

forest_weight_estimator(
    n_estimators: int = 100,
    max_depth: int | None = 5,
    min_samples_leaf: int = 10,
    clip_quantile: float = 0.05,
) -> SklearnWeightEstimator

Create weight estimator using Random Forest.

This factory function provides behavioral equivalence with the old ForestWeightEstimator class.

Note

When used with ConformalDetector, the detector's seed is automatically propagated to the weight estimator for reproducibility.

Parameters:

Name Type Description Default
n_estimators int

Number of trees in the forest. Defaults to 100.

100
max_depth int | None

Maximum depth of trees. Defaults to 5.

5
min_samples_leaf int

Minimum samples at leaf node. Defaults to 10.

10
clip_quantile float

Quantile for weight clipping. Defaults to 0.05.

0.05

Returns:

Type Description
SklearnWeightEstimator

Configured SklearnWeightEstimator instance.

Examples:

estimator = forest_weight_estimator(n_estimators=200)
estimator.fit(calib_samples, test_samples)
w_calib, w_test = estimator.get_weights()
Source code in nonconform/weighting.py
def forest_weight_estimator(
    n_estimators: int = 100,
    max_depth: int | None = 5,
    min_samples_leaf: int = 10,
    clip_quantile: float = 0.05,
) -> SklearnWeightEstimator:
    """Create weight estimator using Random Forest.

    This factory function provides behavioral equivalence with the old
    ForestWeightEstimator class.

    Note:
        When used with ConformalDetector, the detector's seed is automatically
        propagated to the weight estimator for reproducibility.

    Args:
        n_estimators: Number of trees in the forest. Defaults to 100.
        max_depth: Maximum depth of trees. Defaults to 5.
        min_samples_leaf: Minimum samples at leaf node. Defaults to 10.
        clip_quantile: Quantile for weight clipping. Defaults to 0.05.

    Returns:
        Configured SklearnWeightEstimator instance.

    Examples:
        ```python
        estimator = forest_weight_estimator(n_estimators=200)
        estimator.fit(calib_samples, test_samples)
        w_calib, w_test = estimator.get_weights()
        ```
    """
    from sklearn.ensemble import RandomForestClassifier

    base_estimator = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        min_samples_leaf=min_samples_leaf,
        class_weight="balanced",
        n_jobs=-1,
    )
    return SklearnWeightEstimator(
        base_estimator=base_estimator, clip_quantile=clip_quantile
    )
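Both factories rest on the same classifier-based density-ratio idea: train a classifier to separate calibration samples (label 0) from test samples (label 1), then weight each point by p(test | x) / p(calib | x). A minimal standalone sketch of that idea, using the forest hyperparameters above (SklearnWeightEstimator's internal fitting and clipping details may differ):

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier

rng = np.random.default_rng(0)
calib = rng.normal(0.0, 1.0, size=(200, 2))   # calibration distribution
test = rng.normal(0.5, 1.0, size=(200, 2))    # covariate-shifted test distribution

# Label calibration vs. test samples and fit a probabilistic classifier.
X = np.vstack([calib, test])
y = np.concatenate([np.zeros(len(calib)), np.ones(len(test))])
clf = RandomForestClassifier(
    n_estimators=100, max_depth=5, min_samples_leaf=10,
    class_weight="balanced", random_state=0,
).fit(X, y)

# Density-ratio estimate: w(x) = p(test | x) / p(calib | x).
proba = clf.predict_proba(X)[:, 1]
weights = proba / (1.0 - proba + 1e-12)

# Clip extreme weights at the tails (mirrors the clip_quantile idea).
lo, hi = np.quantile(weights, [0.05, 0.95])
weights = np.clip(weights, lo, hi)
```

In the library, this estimation is handled for you; the sketch only shows why a well-calibrated classifier yields usable importance weights.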

FDR Control

Includes low-level expert APIs for weighted FDR control (weighted_false_discovery_control). For standard workflows, prefer ConformalDetector.select(...).

nonconform.fdr

False Discovery Rate control utilities for conformal prediction.

This module provides explicit entry points for:

  • Weighted Conformalized Selection (WCS) under covariate shift.

Pruning

Bases: Enum

Pruning strategies for weighted FDR control.

Attributes:

Name Type Description
HETEROGENEOUS

Remove elements based on independent random checks per item.

HOMOGENEOUS

Apply one shared random decision to all items.

DETERMINISTIC

Remove items using a fixed rule with no randomness.

weighted_false_discovery_control

weighted_false_discovery_control(
    result: ConformalResult | None,
    *,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
) -> np.ndarray

Perform WCS from a strict ConformalResult bundle.

Source code in nonconform/fdr.py
def weighted_false_discovery_control(
    result: ConformalResult | None,
    *,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
) -> np.ndarray:
    """Perform WCS from a strict ConformalResult bundle."""
    p_values, test_scores, calib_scores, test_weights, calib_weights = (
        _extract_required_wcs_fields(result)
    )
    kde_support, use_self_weight = _extract_kde_support(result)
    return _run_wcs(
        p_values=p_values,
        test_scores=test_scores,
        calib_scores=calib_scores,
        test_weights=test_weights,
        calib_weights=calib_weights,
        alpha=alpha,
        pruning=pruning,
        seed=seed,
        kde_support=kde_support,
        include_self_weight=use_self_weight,
    )

weighted_false_discovery_control_from_arrays

weighted_false_discovery_control_from_arrays(
    *,
    p_values: ndarray,
    test_scores: ndarray,
    calib_scores: ndarray,
    test_weights: ndarray,
    calib_weights: ndarray,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
) -> np.ndarray

Perform WCS from explicit weighted arrays and precomputed p-values.

Source code in nonconform/fdr.py
def weighted_false_discovery_control_from_arrays(
    *,
    p_values: np.ndarray,
    test_scores: np.ndarray,
    calib_scores: np.ndarray,
    test_weights: np.ndarray,
    calib_weights: np.ndarray,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
) -> np.ndarray:
    """Perform WCS from explicit weighted arrays and precomputed p-values."""
    return _run_wcs(
        p_values=p_values,
        test_scores=test_scores,
        calib_scores=calib_scores,
        test_weights=test_weights,
        calib_weights=calib_weights,
        alpha=alpha,
        pruning=pruning,
        seed=seed,
    )
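For intuition: in the unweighted, exchangeable special case, the selection step reduces to the Benjamini-Hochberg step-up procedure on the conformal p-values. A minimal standalone sketch of that special case (not the library's weighted WCS implementation):

```python
import numpy as np

def benjamini_hochberg(p_values, alpha=0.05):
    """Return indices selected by the BH step-up procedure at level alpha."""
    p = np.asarray(p_values, dtype=float)
    m = p.size
    order = np.argsort(p)
    # Compare sorted p-values against the BH thresholds alpha * i / m.
    below = p[order] <= alpha * (np.arange(1, m + 1) / m)
    if not below.any():
        return np.array([], dtype=int)
    k = np.max(np.nonzero(below)[0])   # largest i with p_(i) <= alpha * i / m
    return np.sort(order[: k + 1])     # select everything up to that rank

selected = benjamini_hochberg([0.001, 0.8, 0.01, 0.4, 0.03], alpha=0.05)
# selected -> array([0, 2, 4])
```

The weighted procedure additionally accounts for calibration/test importance weights and applies the chosen Pruning strategy, which plain BH does not.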

Martingales

nonconform.martingales

Exchangeability martingales for sequential conformal evidence.

This module implements p-value-based martingales and alarm statistics for streaming or temporal monitoring workflows. In practice, you feed one conformal p-value at a time and read a running evidence state after each update.

Implemented martingales
  • PowerMartingale
  • SimpleMixtureMartingale
  • SimpleJumperMartingale

All classes consume conformal p-values in [0, 1]. Alarm statistics are computed from martingale ratio increments and exposed together with the current martingale value in MartingaleState.
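The running-evidence idea can be sketched standalone, assuming the standard power betting function f(p) = epsilon * p**(epsilon - 1) used by PowerMartingale (the library keeps the same recursions in log space, as its update() source shows):

```python
import numpy as np

epsilon = 0.5
log_m = 0.0                  # log martingale, starts at log(1) = 0
log_cusum = float("-inf")    # CUSUM starts at 0 on the linear scale
log_sr = float("-inf")       # Shiryaev-Roberts likewise

for p in [0.9, 0.5, 0.01, 0.02]:   # conformal p-values, fed one at a time
    inc = np.log(epsilon) + (epsilon - 1.0) * np.log(p)
    log_m += inc
    log_cusum = inc + max(log_cusum, 0.0)
    log_sr = inc + np.logaddexp(0.0, log_sr)

martingale = np.exp(log_m)
# Large values are evidence against exchangeability; a Ville-style alarm
# (AlarmConfig.ville_threshold) fires when martingale >= 1 / alpha.
```

Small p-values inflate the martingale (here the two p-values near 0.01 dominate), while p-values near 1 shrink it.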

AlarmConfig dataclass

AlarmConfig(
    ville_threshold: float | None = None,
    cusum_threshold: float | None = None,
    shiryaev_roberts_threshold: float | None = None,
)

Optional alarm thresholds for martingale evidence statistics.

Thresholds are disabled when set to None. Each threshold compares against a running statistic in MartingaleState.

MartingaleState dataclass

MartingaleState(
    step: int,
    p_value: float,
    log_martingale: float,
    martingale: float,
    log_cusum: float,
    cusum: float,
    log_shiryaev_roberts: float,
    shiryaev_roberts: float,
    triggered_alarms: tuple[str, ...],
)

Snapshot of martingale and alarm statistics after one update.

BaseMartingale

BaseMartingale(alarm_config: AlarmConfig | None = None)

Bases: ABC

Abstract base class for p-value-driven exchangeability martingales.

Source code in nonconform/martingales.py
def __init__(self, alarm_config: AlarmConfig | None = None) -> None:
    self._alarm_config = alarm_config if alarm_config is not None else AlarmConfig()
    self.reset()
state property
state: MartingaleState

Return current state snapshot.

reset
reset() -> None

Reset martingale and alarm statistics to initial values.

Source code in nonconform/martingales.py
def reset(self) -> None:
    """Reset martingale and alarm statistics to initial values."""
    self._step = 0
    self._last_p_value = float("nan")
    self._log_martingale = 0.0
    # CUSUM/SR start at 0 on linear scale -> -inf in log space.
    self._log_cusum = float("-inf")
    self._log_shiryaev_roberts = float("-inf")
    self._reset_method_state()
update_many
update_many(
    p_values: Sequence[float] | ndarray,
) -> list[MartingaleState]

Update state for each p-value in order and return all snapshots.

Source code in nonconform/martingales.py
def update_many(
    self, p_values: Sequence[float] | np.ndarray
) -> list[MartingaleState]:
    """Update state for each p-value in order and return all snapshots."""
    return [self.update(float(p_value)) for p_value in p_values]
update
update(p_value: float) -> MartingaleState

Ingest one p-value in [0, 1] and return the updated state.

Source code in nonconform/martingales.py
def update(self, p_value: float) -> MartingaleState:
    """Ingest one p-value in ``[0, 1]`` and return the updated state."""
    p_value_validated = _validate_probability(p_value)
    log_increment = self._compute_log_increment(p_value_validated)
    if np.isnan(log_increment):
        raise ValueError("Martingale increment is NaN.")

    self._step += 1
    self._last_p_value = p_value_validated
    self._log_martingale += log_increment
    self._log_cusum = float(log_increment + max(self._log_cusum, 0.0))
    self._log_shiryaev_roberts = float(
        log_increment + np.logaddexp(0.0, self._log_shiryaev_roberts)
    )
    return self._current_state()

PowerMartingale

PowerMartingale(
    epsilon: float = 0.5,
    alarm_config: AlarmConfig | None = None,
)

Bases: BaseMartingale

Power martingale with fixed epsilon in (0, 1].

Source code in nonconform/martingales.py
def __init__(
    self,
    epsilon: float = 0.5,
    alarm_config: AlarmConfig | None = None,
) -> None:
    self.epsilon = float(epsilon)
    if not (0.0 < self.epsilon <= 1.0):
        raise ValueError(f"epsilon must be in (0, 1], got {self.epsilon}.")
    super().__init__(alarm_config=alarm_config)

SimpleMixtureMartingale

SimpleMixtureMartingale(
    epsilons: Sequence[float] | ndarray | None = None,
    *,
    n_grid: int = 100,
    min_epsilon: float = 0.01,
    alarm_config: AlarmConfig | None = None,
)

Bases: BaseMartingale

Simple mixture martingale over a fixed epsilon grid.

Source code in nonconform/martingales.py
def __init__(
    self,
    epsilons: Sequence[float] | np.ndarray | None = None,
    *,
    n_grid: int = 100,
    min_epsilon: float = 0.01,
    alarm_config: AlarmConfig | None = None,
) -> None:
    if epsilons is None:
        if n_grid < 2:
            raise ValueError(f"n_grid must be at least 2, got {n_grid}.")
        if not (0.0 < min_epsilon <= 1.0):
            raise ValueError(f"min_epsilon must be in (0, 1], got {min_epsilon}.")
        self.epsilons = np.linspace(float(min_epsilon), 1.0, int(n_grid))
    else:
        self.epsilons = np.asarray(epsilons, dtype=float)
        if self.epsilons.ndim != 1 or self.epsilons.size == 0:
            raise ValueError("epsilons must be a non-empty 1D sequence.")

    if not np.all(np.isfinite(self.epsilons)):
        raise ValueError("epsilons must be finite.")
    if np.any((self.epsilons <= 0.0) | (self.epsilons > 1.0)):
        raise ValueError("All epsilons must be in (0, 1].")
    self._n_eps = int(self.epsilons.size)
    super().__init__(alarm_config=alarm_config)
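The mixture construction can be sketched standalone: maintain one log-martingale per epsilon in the grid and average them on the linear scale (a sketch of the standard construction; the library's internal bookkeeping may differ):

```python
import numpy as np

# Default-style grid, mirroring n_grid=100 and min_epsilon=0.01 above.
epsilons = np.linspace(0.01, 1.0, 100)
log_m = np.zeros_like(epsilons)   # one log power martingale per epsilon

for p in [0.3, 0.05, 0.02]:       # conformal p-values, fed one at a time
    log_m += np.log(epsilons) + (epsilons - 1.0) * np.log(p)

# Average over the grid on the linear scale, via logsumexp for stability.
log_mixture = np.logaddexp.reduce(log_m) - np.log(epsilons.size)
mixture = np.exp(log_mixture)
```

Mixing over a grid avoids committing to a single epsilon: the mixture remains a valid martingale while tracking whichever epsilon happens to accumulate evidence fastest.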

SimpleJumperMartingale

SimpleJumperMartingale(
    jump: float = 0.01,
    alarm_config: AlarmConfig | None = None,
)

Bases: BaseMartingale

Simple Jumper martingale (Algorithm 1 in Vovk et al.).

This method mixes three betting components and redistributes mass each step through jump.

Source code in nonconform/martingales.py
def __init__(
    self,
    jump: float = 0.01,
    alarm_config: AlarmConfig | None = None,
) -> None:
    self.jump = float(jump)
    if not (0.0 < self.jump <= 1.0):
        raise ValueError(f"jump must be in (0, 1], got {self.jump}.")
    self._epsilons = np.array([-1.0, 0.0, 1.0], dtype=float)
    super().__init__(alarm_config=alarm_config)

Data Structures

nonconform.structures

Core data structures and protocols for nonconform.

This module provides the fundamental types used throughout the package:

Classes:

Name Description
AnomalyDetector

Protocol defining the detector interface.

ConformalResult

Container for conformal prediction outputs.

AnomalyDetector

Bases: Protocol

Protocol defining the interface for anomaly detectors.

Any detector (PyOD, sklearn-compatible, or custom) can be used with nonconform by implementing this protocol.

Required methods

fit: Train the detector on data
decision_function: Compute anomaly scores
get_params: Retrieve detector parameters
set_params: Configure detector parameters

The detector must be copyable (support copy.copy and copy.deepcopy).

Examples:

# Most PyOD detectors work automatically (blocked strict-inductive
# exceptions are documented in the detector compatibility guide)
from pyod.models.iforest import IForest

detector: AnomalyDetector = IForest()


# Custom detector implementing the protocol
class MyDetector:
    def fit(self, X, y=None): ...
    def decision_function(self, X): ...
    def get_params(self, deep=True): ...
    def set_params(self, **params): ...


detector: AnomalyDetector = MyDetector()
fit
fit(X: ndarray, y: ndarray | None = None) -> Self

Train the anomaly detector.

Parameters:

Name Type Description Default
X ndarray

Training data of shape (n_samples, n_features).

required
y ndarray | None

Ignored. Present for API consistency.

None

Returns:

Type Description
Self

The fitted detector instance.

Source code in nonconform/structures.py
def fit(self, X: np.ndarray, y: np.ndarray | None = None) -> Self:
    """Train the anomaly detector.

    Args:
        X: Training data of shape (n_samples, n_features).
        y: Ignored. Present for API consistency.

    Returns:
        The fitted detector instance.
    """
    ...
decision_function
decision_function(X: ndarray) -> np.ndarray

Compute anomaly scores for samples.

Higher scores indicate more anomalous samples.

Parameters:

Name Type Description Default
X ndarray

Data of shape (n_samples, n_features).

required

Returns:

Type Description
ndarray

Anomaly scores of shape (n_samples,).

Source code in nonconform/structures.py
def decision_function(self, X: np.ndarray) -> np.ndarray:
    """Compute anomaly scores for samples.

    Higher scores indicate more anomalous samples.

    Args:
        X: Data of shape (n_samples, n_features).

    Returns:
        Anomaly scores of shape (n_samples,).
    """
    ...
get_params
get_params(deep: bool = True) -> dict[str, Any]

Get parameters for this detector.

Parameters:

Name Type Description Default
deep bool

If True, return parameters for sub-objects.

True

Returns:

Type Description
dict[str, Any]

Parameter names mapped to their values.

Source code in nonconform/structures.py
def get_params(self, deep: bool = True) -> dict[str, Any]:
    """Get parameters for this detector.

    Args:
        deep: If True, return parameters for sub-objects.

    Returns:
        Parameter names mapped to their values.
    """
    ...
set_params
set_params(**params: Any) -> Self

Set parameters for this detector.

Parameters:

Name Type Description Default
**params Any

Detector parameters.

{}

Returns:

Type Description
Self

The detector instance.

Source code in nonconform/structures.py
def set_params(self, **params: Any) -> Self:
    """Set parameters for this detector.

    Args:
        **params: Detector parameters.

    Returns:
        The detector instance.
    """
    ...

ConformalResult dataclass

ConformalResult(
    p_values: ndarray | None = None,
    test_scores: ndarray | None = None,
    calib_scores: ndarray | None = None,
    test_weights: ndarray | None = None,
    calib_weights: ndarray | None = None,
    metadata: dict[str, Any] = dict(),
)

Snapshot of detector outputs for downstream procedures.

This dataclass holds all outputs from a conformal prediction run, including p-values, raw scores, and optional weights for weighted conformal inference.

Attributes:

Name Type Description
p_values ndarray | None

Conformal p-values for test instances (None when unavailable).

test_scores ndarray | None

Non-conformity scores for the test instances (raw predictions).

calib_scores ndarray | None

Non-conformity scores for the calibration set.

test_weights ndarray | None

Importance weights for test instances (weighted mode only).

calib_weights ndarray | None

Importance weights for calibration instances.

metadata dict[str, Any]

Optional dictionary with extra data (debug info, timings, etc.).

Examples:

p_values = detector.compute_p_values(X_test)
result = detector.last_result
print(result.p_values)  # Access p-values
print(result.metadata)  # Access optional metadata
copy
copy() -> ConformalResult

Return a copy with arrays and metadata fully duplicated.

Returns:

Type Description
ConformalResult

A new ConformalResult with copied arrays and deep-copied metadata.

Source code in nonconform/structures.py
def copy(self) -> ConformalResult:
    """Return a copy with arrays and metadata fully duplicated.

    Returns:
        A new ConformalResult with copied arrays and deep-copied metadata.
    """

    def _copy_arr(arr: np.ndarray | None) -> np.ndarray | None:
        return arr.copy() if arr is not None else None

    return ConformalResult(
        p_values=_copy_arr(self.p_values),
        test_scores=_copy_arr(self.test_scores),
        calib_scores=_copy_arr(self.calib_scores),
        test_weights=_copy_arr(self.test_weights),
        calib_weights=_copy_arr(self.calib_weights),
        metadata=deepcopy(self.metadata),
    )

Adapters

nonconform.adapters

External detector adapters for nonconform.

ScorePolarityAdapter

ScorePolarityAdapter(
    detector: AnomalyDetector, score_polarity: ScorePolarity
)

Adapter that normalizes detector score direction conventions.

Source code in nonconform/adapters.py
def __init__(
    self,
    detector: AnomalyDetector,
    score_polarity: ScorePolarity,
) -> None:
    if score_polarity not in {
        ScorePolarity.HIGHER_IS_ANOMALOUS,
        ScorePolarity.HIGHER_IS_NORMAL,
    }:
        raise ValueError(
            "ScorePolarityAdapter requires explicit non-auto score polarity."
        )
    self._detector = detector
    self._score_polarity = score_polarity
    self._multiplier = (
        1.0 if score_polarity is ScorePolarity.HIGHER_IS_ANOMALOUS else -1.0
    )
fit
fit(X: ndarray, y: ndarray | None = None) -> Self

Fit wrapped detector.

Source code in nonconform/adapters.py
def fit(self, X: np.ndarray, y: np.ndarray | None = None) -> Self:
    """Fit wrapped detector."""
    self._detector.fit(X, y)
    return self
decision_function
decision_function(X: ndarray) -> np.ndarray

Return scores transformed to anomalous-higher convention.

Source code in nonconform/adapters.py
def decision_function(self, X: np.ndarray) -> np.ndarray:
    """Return scores transformed to anomalous-higher convention."""
    scores = np.asarray(self._detector.decision_function(X), dtype=float)
    return self._multiplier * scores
get_params
get_params(deep: bool = True) -> dict[str, Any]

Delegate parameter retrieval to wrapped detector.

Source code in nonconform/adapters.py
def get_params(self, deep: bool = True) -> dict[str, Any]:
    """Delegate parameter retrieval to wrapped detector."""
    return self._detector.get_params(deep=deep)
set_params
set_params(**params: Any) -> Self

Delegate parameter updates to wrapped detector.

Source code in nonconform/adapters.py
def set_params(self, **params: Any) -> Self:
    """Delegate parameter updates to wrapped detector."""
    self._detector.set_params(**params)
    return self
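What the adapter accomplishes can be demonstrated without the library. Here HigherIsNormalDetector is a hypothetical stand-in for a detector whose larger scores mean "more normal" (as in several sklearn estimators); negating its scores recovers nonconform's anomalous-higher convention:

```python
import numpy as np

class HigherIsNormalDetector:
    """Toy detector: points far from the origin get LOWER scores."""

    def fit(self, X, y=None):
        return self

    def decision_function(self, X):
        return -np.linalg.norm(X, axis=1)   # higher score = more normal

det = HigherIsNormalDetector().fit(np.zeros((1, 2)))
X = np.array([[0.1, 0.1], [5.0, 5.0]])      # near point, far (anomalous) point

raw = det.decision_function(X)
multiplier = -1.0                           # HIGHER_IS_NORMAL -> flip sign
adapted = multiplier * raw
# raw ranks the far point lowest; adapted ranks it highest, as nonconform expects
```

This is exactly the `_multiplier` logic in the adapter's source above; HIGHER_IS_ANOMALOUS maps to a multiplier of 1.0 and leaves scores untouched.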

PyODAdapter

PyODAdapter(detector: Any)

Adapter wrapping PyOD detectors to ensure protocol compliance.

Source code in nonconform/adapters.py
def __init__(self, detector: Any) -> None:
    """Initialize adapter for a PyOD detector."""
    if not PYOD_AVAILABLE:
        raise ImportError("PyOD is not installed. Install with: pip install pyod")
    self._detector = detector
fit
fit(X: ndarray, y: ndarray | None = None) -> Self

Fit wrapped detector.

Source code in nonconform/adapters.py
def fit(self, X: np.ndarray, y: np.ndarray | None = None) -> Self:
    """Fit wrapped detector."""
    self._detector.fit(X, y)
    return self
decision_function
decision_function(X: ndarray) -> np.ndarray

Return anomaly scores from wrapped detector.

Source code in nonconform/adapters.py
def decision_function(self, X: np.ndarray) -> np.ndarray:
    """Return anomaly scores from wrapped detector."""
    return self._detector.decision_function(X)
get_params
get_params(deep: bool = True) -> dict[str, Any]

Delegate parameter retrieval to wrapped detector.

Source code in nonconform/adapters.py
def get_params(self, deep: bool = True) -> dict[str, Any]:
    """Delegate parameter retrieval to wrapped detector."""
    return self._detector.get_params(deep=deep)
set_params
set_params(**params: Any) -> Self

Delegate parameter updates to wrapped detector.

Source code in nonconform/adapters.py
def set_params(self, **params: Any) -> Self:
    """Delegate parameter updates to wrapped detector."""
    self._detector.set_params(**params)
    return self

adapt

adapt(detector: Any) -> AnomalyDetector

Adapt a detector to the AnomalyDetector protocol.

Source code in nonconform/adapters.py
def adapt(detector: Any) -> AnomalyDetector:
    """Adapt a detector to the AnomalyDetector protocol."""
    _guard_blocked_pyod_detector(detector)

    if isinstance(detector, AnomalyDetector):
        return detector

    if PYOD_AVAILABLE and isinstance(detector, PyODBaseDetector):
        return PyODAdapter(detector)

    if not PYOD_AVAILABLE and _looks_like_pyod(detector):
        raise ImportError(
            "Detector appears to be a PyOD detector, but PyOD is not installed. "
            'Install with: pip install "nonconform[pyod]" or pip install pyod.'
        )

    required_methods = ["fit", "decision_function", "get_params", "set_params"]
    missing_methods = [m for m in required_methods if not hasattr(detector, m)]
    if missing_methods:
        raise TypeError(
            "Detector must implement AnomalyDetector protocol. "
            f"Missing methods: {', '.join(missing_methods)}"
        )

    return detector

parse_score_polarity

parse_score_polarity(
    score_polarity: ScorePolarityInput,
) -> ScorePolarity

Parse score polarity input to canonical enum representation.

Source code in nonconform/adapters.py
def parse_score_polarity(score_polarity: ScorePolarityInput) -> ScorePolarity:
    """Parse score polarity input to canonical enum representation."""
    if isinstance(score_polarity, ScorePolarity):
        return score_polarity

    if isinstance(score_polarity, str):
        normalized = score_polarity.strip().lower()
        mapping = {
            "auto": ScorePolarity.AUTO,
            "higher_is_anomalous": ScorePolarity.HIGHER_IS_ANOMALOUS,
            "higher_is_normal": ScorePolarity.HIGHER_IS_NORMAL,
        }
        if normalized in mapping:
            return mapping[normalized]
        raise ValueError(
            "Invalid score_polarity value. "
            "Use one of: 'auto', 'higher_is_anomalous', 'higher_is_normal'."
        )

    raise TypeError(
        "score_polarity must be a ScorePolarity enum or string literal "
        "('auto', 'higher_is_anomalous', 'higher_is_normal')."
    )

resolve_implicit_score_polarity

resolve_implicit_score_polarity(
    detector: Any,
) -> ScorePolarity

Resolve score polarity when users omit score_polarity.

This pre-release default favors low-friction custom detector onboarding while preserving safe behavior for known detector families:

- Known sklearn normality detectors -> HIGHER_IS_NORMAL
- PyOD detectors -> HIGHER_IS_ANOMALOUS
- Unknown custom detectors -> HIGHER_IS_ANOMALOUS

Source code in nonconform/adapters.py
def resolve_implicit_score_polarity(detector: Any) -> ScorePolarity:
    """Resolve score polarity when users omit score_polarity.

    This pre-release default favors low-friction custom detector onboarding while
    preserving safe behavior for known detector families:
    - Known sklearn normality detectors -> HIGHER_IS_NORMAL
    - PyOD detectors -> HIGHER_IS_ANOMALOUS
    - Unknown custom detectors -> HIGHER_IS_ANOMALOUS
    """
    if _is_known_sklearn_normality_detector(detector):
        return ScorePolarity.HIGHER_IS_NORMAL
    if isinstance(detector, PyODAdapter) or _looks_like_pyod(detector):
        return ScorePolarity.HIGHER_IS_ANOMALOUS
    return ScorePolarity.HIGHER_IS_ANOMALOUS

resolve_score_polarity

resolve_score_polarity(
    detector: Any, score_polarity: ScorePolarityInput
) -> ScorePolarity

Resolve requested score polarity in strict AUTO mode.

Unlike resolve_implicit_score_polarity, this function is intentionally strict for explicit score_polarity="auto" and raises for unknown detectors.

Source code in nonconform/adapters.py
def resolve_score_polarity(
    detector: Any,
    score_polarity: ScorePolarityInput,
) -> ScorePolarity:
    """Resolve requested score polarity in strict AUTO mode.

    Unlike ``resolve_implicit_score_polarity``, this function is intentionally
    strict for explicit ``score_polarity="auto"`` and raises for unknown
    detectors.
    """
    parsed = parse_score_polarity(score_polarity)
    if parsed is not ScorePolarity.AUTO:
        return parsed

    if isinstance(detector, PyODAdapter) or _looks_like_pyod(detector):
        return ScorePolarity.HIGHER_IS_ANOMALOUS
    if _is_known_sklearn_normality_detector(detector):
        return ScorePolarity.HIGHER_IS_NORMAL

    detector_cls = type(detector)
    detector_name = f"{detector_cls.__module__}.{detector_cls.__qualname__}"
    raise ValueError(
        "Unable to infer score polarity automatically in strict auto mode for "
        f"detector '{detector_name}'. Auto inference currently supports PyOD "
        "detectors and known sklearn normality estimators. For custom detectors, "
        "pass score_polarity='higher_is_anomalous' (recommended when larger "
        "scores mean more anomalous) or score_polarity='higher_is_normal'."
    )

apply_score_polarity

apply_score_polarity(
    detector: AnomalyDetector,
    score_polarity: ScorePolarityInput,
) -> AnomalyDetector

Return detector that follows requested score polarity convention.

Source code in nonconform/adapters.py
def apply_score_polarity(
    detector: AnomalyDetector,
    score_polarity: ScorePolarityInput,
) -> AnomalyDetector:
    """Return detector that follows requested score polarity convention."""
    parsed = parse_score_polarity(score_polarity)
    if parsed is ScorePolarity.AUTO:
        raise ValueError(
            "score_polarity='auto' must be resolved first with resolve_score_polarity."
        )
    if parsed is ScorePolarity.HIGHER_IS_ANOMALOUS:
        return detector
    return ScorePolarityAdapter(detector=detector, score_polarity=parsed)