Skip to content
Edit this page View source of this page

API Reference

Complete API documentation for all nonconform modules and classes.

Start Here

If you are looking for task-oriented call sequences, start with Common Workflows.

For the v1 public compatibility contract, see API Stability.

Detector

nonconform.detector

Core conformal anomaly detector implementation.

This module provides the main ConformalDetector class that wraps any anomaly detector with conformal inference for valid p-values and FDR control.

Classes:

Name Description
BaseConformalDetector

Abstract base class for conformal detectors.

ConformalDetector

Main conformal anomaly detector with optional weighting.

BaseConformalDetector

Bases: ABC

Abstract base class for all conformal anomaly detectors.

Defines the core interface that all conformal anomaly detection implementations must provide. Conformal detectors support either an integrated or detached calibration workflow:

  1. Integrated calibration: fit() trains detector(s) and computes calibration scores
  2. Detached calibration: train detector externally, then call calibrate() on a separate calibration dataset
  3. Inference Phase: compute_p_values() converts new data scores to valid p-values, or select() for the combined p-value + FDR-control workflow

Subclasses must implement both abstract methods.

Note

This is an abstract class and cannot be instantiated directly. Use ConformalDetector for the main implementation.

fit abstractmethod
fit(
    x: DataFrame | ndarray,
    y: ndarray | None = None,
    *,
    n_jobs: int | None = None,
) -> Self

Fit the detector model(s) and compute calibration scores.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The dataset used for fitting the model(s) and determining calibration scores.

required
y ndarray | None

Ignored. Present for sklearn API compatibility.

None
n_jobs int | None

Optional strategy-specific parallelism hint. Currently used by strategies that expose an n_jobs parameter (for example, JackknifeBootstrap).

None

Returns:

Type Description
Self

The fitted detector instance.

Source code in nonconform/detector.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@ensure_numpy_array
@abstractmethod
def fit(
    self,
    x: pd.DataFrame | np.ndarray,
    y: np.ndarray | None = None,
    *,
    n_jobs: int | None = None,
) -> Self:
    """Fit the detector model(s) and compute calibration scores.

    Args:
        x: The dataset used for fitting the model(s) and determining
            calibration scores.
        y: Ignored. Present for sklearn API compatibility.
        n_jobs: Optional strategy-specific parallelism hint.
            Currently used by strategies that expose an ``n_jobs`` parameter
            (for example, ``JackknifeBootstrap``).

    Returns:
        The fitted detector instance.
    """
    raise NotImplementedError("Subclasses must implement fit()")
calibrate
calibrate(
    x: DataFrame | ndarray, y: ndarray | None = None
) -> Self

Calibrate a pre-fitted detector on separate calibration data.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Dataset used only to compute calibration scores.

required
y ndarray | None

Ignored. Present for sklearn API compatibility.

None

Returns:

Type Description
Self

The calibrated detector instance.

Source code in nonconform/detector.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@ensure_numpy_array
def calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    y: np.ndarray | None = None,
) -> Self:
    """Calibrate a pre-fitted detector on separate calibration data.

    Args:
        x: Dataset used only to compute calibration scores.
        y: Ignored. Present for sklearn API compatibility.

    Returns:
        The calibrated detector instance.
    """
    raise NotImplementedError("Subclasses must implement calibrate()")
compute_p_values abstractmethod
compute_p_values(
    x: DataFrame | Series | ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Return conformal p-values for new data.

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Ignored in standard mode.

True

Returns:

Type Description
ndarray | Series

P-values as ndarray for numpy input, or pandas Series for pandas input.

Source code in nonconform/detector.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
@abstractmethod
def compute_p_values(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Return conformal p-values for new data.

    Args:
        x: New data instances for anomaly estimation.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Ignored in standard mode.

    Returns:
        P-values as ndarray for numpy input, or pandas Series for pandas input.
    """
    raise NotImplementedError("Subclasses must implement compute_p_values()")
score_samples abstractmethod
score_samples(
    x: DataFrame | Series | ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Return aggregated raw anomaly scores for new data.

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Ignored in standard mode.

True

Returns:

Type Description
ndarray | Series

Raw scores as ndarray for numpy input, or pandas Series for pandas input.

Source code in nonconform/detector.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@abstractmethod
def score_samples(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Return aggregated raw anomaly scores for new data.

    Args:
        x: New data instances for anomaly estimation.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Ignored in standard mode.

    Returns:
        Raw scores as ndarray for numpy input, or pandas Series for pandas input.
    """
    raise NotImplementedError("Subclasses must implement score_samples()")

ConformalDetector

ConformalDetector(
    detector: Any,
    strategy: BaseStrategy,
    estimation: BaseEstimation | None = None,
    weight_estimator: BaseWeightEstimator | None = None,
    aggregation: str = "median",
    score_polarity: ScorePolarity
    | Literal[
        "auto", "higher_is_anomalous", "higher_is_normal"
    ]
    | None = None,
    seed: int | None = None,
    verbose: bool = False,
    verify_prepared_batch_content: bool = True,
)

Bases: BaseConformalDetector

Unified conformal anomaly detector with optional covariate shift handling.

Provides distribution-free anomaly detection with valid p-values and False Discovery Rate (FDR) control by wrapping any anomaly detector with conformal inference. Supports PyOD detectors, sklearn-compatible detectors, and custom detectors implementing the AnomalyDetector protocol.

When no weight estimator is provided (standard conformal prediction): - Uses classical conformal inference for exchangeable data - Provides optimal performance and memory usage - Suitable when training and test data come from the same distribution

When a weight estimator is provided (weighted conformal prediction): - Handles distribution shift between calibration and test data - Estimates importance weights to maintain statistical validity - Slightly higher computational cost but robust to covariate shift

Parameters:

Name Type Description Default
detector Any

Anomaly detector (PyOD, sklearn-compatible, or custom).

required
strategy BaseStrategy

The conformal strategy for fitting and calibration.

required
estimation BaseEstimation | None

P-value estimation strategy. Defaults to Empirical().

None
weight_estimator BaseWeightEstimator | None

Weight estimator for covariate shift. Defaults to None.

None
aggregation str

Method for aggregating scores from multiple models. Defaults to "median".

'median'
score_polarity ScorePolarity | Literal['auto', 'higher_is_anomalous', 'higher_is_normal'] | None

Score direction convention. Use "higher_is_anomalous" when higher raw scores indicate more anomalous samples, and "higher_is_normal" when higher scores indicate more normal samples. If omitted (None), nonconform applies an implicit default policy: known sklearn normality detectors resolve to "higher_is_normal", while PyOD and unknown custom detectors resolve to "higher_is_anomalous". Explicit "auto" enables strict inference: known detector families are inferred, and unknown detectors raise. Defaults to None.

None
seed int | None

Random seed for reproducibility. Defaults to None.

None
verbose bool

If True, displays progress bars during prediction. Defaults to False.

False
verify_prepared_batch_content bool

If True (default), weighted reuse mode (refit_weights=False) verifies exact batch content identity via hashing. This adds O(n) overhead per checked batch. Set to False to skip content hashing and validate only batch size.

True

Attributes:

Name Type Description
detector

The underlying anomaly detection model.

strategy

The calibration strategy for computing p-values.

weight_estimator

Optional weight estimator for handling covariate shift.

aggregation

Method for combining scores from multiple models.

score_polarity ScorePolarity

Resolved score polarity used internally.

seed ScorePolarity

Random seed for reproducible results.

verbose ScorePolarity

Whether to display progress bars.

_detector_set ScorePolarity

List of trained detector models (populated after fit).

_calibration_set ScorePolarity

Calibration scores (populated after fit).

Examples:

Standard conformal prediction — FDR-controlled selection in one call:

from pyod.models.iforest import IForest
from nonconform import ConformalDetector, Split

detector = ConformalDetector(
    detector=IForest(), strategy=Split(n_calib=0.2), seed=42
)
detector.fit(X_train)
mask = detector.select(X_test, alpha=0.05)

Access raw p-values when needed:

detector.fit(X_train)
p_values = detector.compute_p_values(X_test)

Weighted conformal prediction:

from nonconform import logistic_weight_estimator

detector = ConformalDetector(
    detector=IForest(),
    strategy=Split(n_calib=0.2),
    weight_estimator=logistic_weight_estimator(),
    seed=42,
)
detector.fit(X_train)
mask = detector.select(X_test, alpha=0.05)

Detached calibration with a pre-trained model (Split strategy):

base_detector.fit(X_fit)
detector = ConformalDetector(
    detector=base_detector, strategy=Split(n_calib=0.2)
)
detector.calibrate(X_calib)
p_values = detector.compute_p_values(X_test)
Note

Strict inductive conformal/FDR workflows require a fixed training-only score map at inference time. PyOD detectors known to violate this are: CD, COF, COPOD, ECOD, LMDD, LOCI, RGraph, SOD, SOS.

Source code in nonconform/detector.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def __init__(
    self,
    detector: Any,
    strategy: BaseStrategy,
    estimation: BaseEstimation | None = None,
    weight_estimator: BaseWeightEstimator | None = None,
    aggregation: str = "median",
    score_polarity: ScorePolarity
    | Literal["auto", "higher_is_anomalous", "higher_is_normal"]
    | None = None,
    seed: int | None = None,
    verbose: bool = False,
    verify_prepared_batch_content: bool = True,
) -> None:
    self._configure(
        detector=detector,
        strategy=strategy,
        estimation=estimation,
        weight_estimator=weight_estimator,
        aggregation=aggregation,
        score_polarity=score_polarity,
        seed=seed,
        verbose=verbose,
        verify_prepared_batch_content=verify_prepared_batch_content,
    )
detector_set property
detector_set: list[AnomalyDetector]

Returns a copy of the list of trained detector models.

calibration_set property
calibration_set: ndarray

Returns a copy of the calibration scores.

calibration_samples property
calibration_samples: ndarray

Returns a copy of the calibration samples (weighted mode only).

last_result property
last_result: ConformalResult | None

Return the most recent conformal result snapshot.

score_polarity property
score_polarity: ScorePolarity

Returns the resolved score polarity convention.

is_fitted property
is_fitted: bool

Returns whether the detector has been fitted.

get_params
get_params(deep: bool = True) -> dict[str, Any]

Return estimator parameters following sklearn conventions.

Notes
  • deep=False returns constructor-facing parameters used for sklearn clone compatibility.
  • deep=True also includes nested component__param entries read from the current runtime components (effective/internal state), which may differ from originally passed constructor objects after adaptation/normalization.
Source code in nonconform/detector.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def get_params(self, deep: bool = True) -> dict[str, Any]:
    """Return estimator parameters following sklearn conventions.

    Notes:
        - ``deep=False`` returns constructor-facing parameters used for
          sklearn clone compatibility.
        - ``deep=True`` also includes nested ``component__param`` entries
          read from the current runtime components (effective/internal state),
          which may differ from originally passed constructor objects after
          adaptation/normalization.
    """
    params: dict[str, Any] = {
        "detector": self._init_detector,
        "strategy": self._init_strategy,
        "estimation": self._init_estimation,
        "weight_estimator": self._init_weight_estimator,
        "aggregation": self._init_aggregation,
        "score_polarity": self._init_score_polarity,
        "seed": self._init_seed,
        "verbose": self._init_verbose,
        "verify_prepared_batch_content": self._init_verify_prepared_batch_content,
    }
    if not deep:
        return params

    for component_name in self._NESTED_COMPONENTS:
        component = getattr(self, component_name)
        if component is None or not hasattr(component, "get_params"):
            continue
        try:
            component_params = component.get_params(deep=True)
        except TypeError:
            component_params = component.get_params()
        for key, value in component_params.items():
            params[f"{component_name}__{key}"] = value
    return params
set_params
set_params(**params: Any) -> Self

Set estimator parameters following sklearn conventions.

Source code in nonconform/detector.py
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
def set_params(self, **params: Any) -> Self:
    """Set estimator parameters following sklearn conventions."""
    if not params:
        return self

    updated_params = self.get_params(deep=False)
    nested_updates: dict[str, dict[str, Any]] = {}

    for key, value in params.items():
        if "__" in key:
            component_name, nested_key = key.split("__", 1)
            if component_name not in self._NESTED_COMPONENTS:
                raise ValueError(f"Invalid parameter {component_name!r}.")
            nested_updates.setdefault(component_name, {})[nested_key] = value
            continue

        if key not in updated_params:
            raise ValueError(
                f"Invalid parameter {key!r} for estimator {type(self).__name__}."
            )
        updated_params[key] = value

    for component_name, component_params in nested_updates.items():
        component = updated_params[component_name]
        if component is None:
            raise ValueError(
                f"Cannot set nested parameters for {component_name!r}: "
                "component is None."
            )
        if not hasattr(component, "set_params"):
            raise ValueError(
                f"Cannot set nested parameters for {component_name!r}: "
                "component does not implement set_params()."
            )
        component.set_params(**component_params)

    self._configure(**updated_params)
    return self
fit
fit(
    x: DataFrame | ndarray,
    y: ndarray | None = None,
    *,
    n_jobs: int | None = None,
) -> Self

Fit detector model(s) and compute calibration scores.

Uses the specified strategy to train the base detector(s) and calculate non-conformity scores on the calibration set.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The dataset used for fitting and calibration.

required
y ndarray | None

Ignored. Present for sklearn API compatibility.

None
n_jobs int | None

Optional strategy-specific parallelism hint. Supported by strategies whose fit_calibrate signature includes n_jobs (for example, JackknifeBootstrap).

None

Returns:

Type Description
Self

The fitted detector instance (for method chaining).

Source code in nonconform/detector.py
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
@ensure_numpy_array
def fit(
    self,
    x: pd.DataFrame | np.ndarray,
    y: np.ndarray | None = None,
    *,
    n_jobs: int | None = None,
) -> Self:
    """Fit detector model(s) and compute calibration scores.

    Uses the specified strategy to train the base detector(s) and calculate
    non-conformity scores on the calibration set.

    Args:
        x: The dataset used for fitting and calibration.
        y: Ignored. Present for sklearn API compatibility.
        n_jobs: Optional strategy-specific parallelism hint. Supported by
            strategies whose ``fit_calibrate`` signature includes ``n_jobs``
            (for example, ``JackknifeBootstrap``).

    Returns:
        The fitted detector instance (for method chaining).
    """
    _ = y
    fit_kwargs: dict[str, Any] = {
        "x": x,
        "detector": self.detector,
        "weighted": self._is_weighted_mode,
        "seed": self.seed,
    }
    if n_jobs is not None:
        strategy_params = inspect.signature(self.strategy.fit_calibrate).parameters
        if "n_jobs" not in strategy_params:
            raise ValueError(
                f"Strategy {type(self.strategy).__name__} does not support n_jobs. "
                "Pass n_jobs only when using a strategy that exposes it, "
                "such as JackknifeBootstrap."
            )
        fit_kwargs["n_jobs"] = n_jobs

    self._detector_set, self._calibration_set = self.strategy.fit_calibrate(
        **fit_kwargs
    )

    if (
        self._is_weighted_mode
        and self.strategy.calibration_ids is not None
        and len(self.strategy.calibration_ids) > 0
    ):
        self._calibration_samples = x[self.strategy.calibration_ids]
    else:
        self._calibration_samples = np.array([])

    self._prepared_weight_batch_size = None
    self._prepared_weight_batch_signature = None
    self._last_result = None
    return self
calibrate
calibrate(
    x: DataFrame | ndarray, y: ndarray | None = None
) -> Self

Calibrate a pre-fitted detector on separate calibration data.

This detached workflow is currently supported only for Split strategy, where a single pre-fitted model is calibrated on a dedicated dataset.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Calibration dataset used to compute calibration scores.

required
y ndarray | None

Ignored. Present for sklearn API compatibility.

None

Returns:

Type Description
Self

The calibrated detector instance (for method chaining).

Raises:

Type Description
ValueError

If strategy is not Split.

NotFittedError

If the base detector appears unfitted.

Source code in nonconform/detector.py
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
@ensure_numpy_array
def calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    y: np.ndarray | None = None,
) -> Self:
    """Calibrate a pre-fitted detector on separate calibration data.

    This detached workflow is currently supported only for ``Split`` strategy,
    where a single pre-fitted model is calibrated on a dedicated dataset.

    Args:
        x: Calibration dataset used to compute calibration scores.
        y: Ignored. Present for sklearn API compatibility.

    Returns:
        The calibrated detector instance (for method chaining).

    Raises:
        ValueError: If strategy is not ``Split``.
        NotFittedError: If the base detector appears unfitted.
    """
    _ = y
    from nonconform.resampling import Split

    if not isinstance(self.strategy, Split):
        raise ValueError(
            "calibrate() is supported only with Split strategy. "
            f"Got {type(self.strategy).__name__}."
        )

    try:
        calibration_set = np.asarray(
            self.detector.decision_function(x),
            dtype=float,
        ).ravel()
    except Exception as exc:
        message = str(exc).lower()
        if (
            isinstance(exc, NotFittedError)
            or "not fitted" in message
            or (isinstance(exc, AttributeError) and "has no attribute" in message)
        ):
            raise NotFittedError(
                "Base detector is not fitted. Fit the base detector before "
                "calling calibrate()."
            ) from exc
        raise

    if calibration_set.shape[0] != len(x):
        raise ValueError(
            "calibration scores must have one value per calibration sample. "
            f"Got {calibration_set.shape[0]} scores for {len(x)} samples."
        )

    self._detector_set = [self.detector]
    self._calibration_set = calibration_set
    if self._is_weighted_mode:
        self._calibration_samples = x.copy()
    else:
        self._calibration_samples = np.array([])

    self._prepared_weight_batch_size = None
    self._prepared_weight_batch_signature = None
    self._last_result = None
    return self
select
select(
    x: DataFrame | Series | ndarray,
    *,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Compute p-values and apply FDR-controlled selection in one step.

This is the recommended single-call workflow for most use cases. It combines compute_p_values() and the appropriate selection procedure (BH-style FDR selection for standard mode, weighted conformalized selection for weighted mode) into one method, eliminating the need to access last_result manually.

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
alpha float

Target FDR level in (0, 1). Defaults to 0.05.

0.05
pruning Pruning

Pruning strategy for weighted FDR control. Ignored in standard (unweighted) mode. Defaults to Pruning.DETERMINISTIC.

DETERMINISTIC
seed int | None

Optional random seed for weighted randomized pruning modes. When None, falls back to detector seed. Ignored in standard mode and deterministic pruning mode.

None
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Ignored in standard mode. Defaults to True.

True

Returns:

Type Description
ndarray | Series

Boolean selection mask of shape (n_test,). True entries are

ndarray | Series

the FDR-controlled anomaly discoveries. Returns a pandas Series when

ndarray | Series

the input is a DataFrame or Series.

Examples:

Standard workflow (no weight estimator):

detector.fit(X_train)
mask = detector.select(X_test, alpha=0.05)
print(f"Discoveries: {mask.sum()}")

Weighted workflow:

detector = ConformalDetector(
    detector=IForest(),
    strategy=Split(n_calib=0.2),
    weight_estimator=logistic_weight_estimator(),
)
detector.fit(X_train)
mask = detector.select(
    X_test,
    alpha=0.1,
    pruning=Pruning.HETEROGENEOUS,
    seed=42,
)
Source code in nonconform/detector.py
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
def select(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Compute p-values and apply FDR-controlled selection in one step.

    This is the recommended single-call workflow for most use cases. It
    combines ``compute_p_values()`` and the appropriate selection procedure
    (BH-style FDR selection for standard mode, weighted conformalized
    selection for weighted mode) into one method, eliminating the need to
    access ``last_result`` manually.

    Args:
        x: New data instances for anomaly estimation.
        alpha: Target FDR level in ``(0, 1)``. Defaults to ``0.05``.
        pruning: Pruning strategy for weighted FDR control. Ignored in
            standard (unweighted) mode. Defaults to
            ``Pruning.DETERMINISTIC``.
        seed: Optional random seed for weighted randomized pruning modes.
            When ``None``, falls back to detector ``seed``. Ignored in
            standard mode and deterministic pruning mode.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Ignored in standard mode. Defaults to True.

    Returns:
        Boolean selection mask of shape ``(n_test,)``. ``True`` entries are
        the FDR-controlled anomaly discoveries. Returns a pandas Series when
        the input is a DataFrame or Series.

    Examples:
        Standard workflow (no weight estimator):

        ```python
        detector.fit(X_train)
        mask = detector.select(X_test, alpha=0.05)
        print(f"Discoveries: {mask.sum()}")
        ```

        Weighted workflow:

        ```python
        detector = ConformalDetector(
            detector=IForest(),
            strategy=Split(n_calib=0.2),
            weight_estimator=logistic_weight_estimator(),
        )
        detector.fit(X_train)
        mask = detector.select(
            X_test,
            alpha=0.1,
            pruning=Pruning.HETEROGENEOUS,
            seed=42,
        )
        ```
    """
    if not (0.0 < alpha < 1.0):
        raise ValueError(f"alpha must be in (0, 1), got {alpha}")

    from nonconform.fdr import weighted_false_discovery_control

    x_array, index = _as_numpy_with_index(x)
    self.compute_p_values(x_array, refit_weights=refit_weights)
    result = self._last_result
    if result is None or result.p_values is None:
        raise RuntimeError(
            "Internal error: select() expected p-values after compute_p_values()."
        )

    if self._is_weighted_mode:
        selection_seed = self.seed if seed is None else seed
        mask = weighted_false_discovery_control(
            result=result,
            alpha=alpha,
            pruning=pruning,
            seed=selection_seed,
        )
    else:
        p_values = np.asarray(result.p_values, dtype=float)
        mask = false_discovery_control(p_values, method="bh") <= alpha

    if index is not None:
        return pd.Series(mask, index=index, name="selected")
    return mask
prepare_weights_for
prepare_weights_for(x: DataFrame | ndarray) -> Self

Prepare weighted conformal state for a specific test batch.

In weighted mode, this fits the weight estimator for the supplied batch without producing predictions. Use this for explicit state transitions in exploratory workflows.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Test batch for which weights should be prepared.

required

Returns:

Type Description
Self

The fitted detector instance (for method chaining).

Raises:

Type Description
NotFittedError

If fit() has not been called.

RuntimeError

If weighted mode is disabled.

Source code in nonconform/detector.py
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
@ensure_numpy_array
def prepare_weights_for(self, x: pd.DataFrame | np.ndarray) -> Self:
    """Prepare weighted conformal state for a specific test batch.

    In weighted mode, this fits the weight estimator for the supplied batch
    without producing predictions. Use this for explicit state transitions in
    exploratory workflows.

    Args:
        x: Test batch for which weights should be prepared.

    Returns:
        The fitted detector instance (for method chaining).

    Raises:
        NotFittedError: If fit() has not been called.
        RuntimeError: If weighted mode is disabled.
    """
    if not self.is_fitted:
        raise NotFittedError("This ConformalDetector instance is not fitted yet.")
    if not self._is_weighted_mode or self.weight_estimator is None:
        raise RuntimeError(
            "prepare_weights_for() requires weighted mode with a weight_estimator."
        )

    self.weight_estimator.fit(self._calibration_samples, x)
    self._prepared_weight_batch_size = len(x)
    if self.verify_prepared_batch_content:
        self._prepared_weight_batch_signature = _batch_signature(x)
    else:
        self._prepared_weight_batch_signature = None
    return self
score_samples
score_samples(
    x: DataFrame | Series | ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Return aggregated raw anomaly scores for new data.

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Defaults to True.

True

Returns:

Type Description
ndarray | Series

Aggregated raw anomaly scores.

Source code in nonconform/detector.py
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
def score_samples(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Return aggregated raw anomaly scores for new data.

    Args:
        x: New data instances for anomaly estimation.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Defaults to True.

    Returns:
        Aggregated raw anomaly scores.
    """
    x_array, index = _as_numpy_with_index(x)
    estimates = self._aggregate_scores(x_array)
    weights = self._resolve_weights(x_array, refit_weights=refit_weights)
    calib_weights, test_weights = weights if weights else (None, None)

    self._last_result = ConformalResult(
        p_values=None,
        test_scores=estimates.copy(),
        calib_scores=self._calibration_set.copy(),
        test_weights=_safe_copy(test_weights),
        calib_weights=_safe_copy(calib_weights),
        metadata={},
    )
    if index is not None:
        return pd.Series(estimates, index=index, name="score")
    return estimates
compute_p_values
compute_p_values(
    x: DataFrame | Series | ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series

Return conformal p-values for new data.

Parameters:

Name Type Description Default
x DataFrame | Series | ndarray

New data instances for anomaly estimation.

required
refit_weights bool

Whether to refit the weight estimator for this batch in weighted mode. Defaults to True.

True

Returns:

Type Description
ndarray | Series

Conformal p-values.

Source code in nonconform/detector.py
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
def compute_p_values(
    self,
    x: pd.DataFrame | pd.Series | np.ndarray,
    *,
    refit_weights: bool = True,
) -> np.ndarray | pd.Series:
    """Return conformal p-values for new data.

    Args:
        x: New data instances for anomaly estimation.
        refit_weights: Whether to refit the weight estimator for this batch
            in weighted mode. Defaults to True.

    Returns:
        Conformal p-values.
    """
    x_array, index = _as_numpy_with_index(x)
    estimates = self._aggregate_scores(x_array)
    weights = self._resolve_weights(x_array, refit_weights=refit_weights)
    calib_weights, test_weights = weights if weights else (None, None)

    p_values = self.estimation.compute_p_values(
        estimates, self._calibration_set, weights
    )

    metadata: dict[str, Any] = {}
    if hasattr(self.estimation, "get_metadata"):
        meta = self.estimation.get_metadata()
        if meta:
            metadata = dict(meta)

    self._last_result = ConformalResult(
        p_values=p_values.copy(),
        test_scores=estimates.copy(),
        calib_scores=self._calibration_set.copy(),
        test_weights=_safe_copy(test_weights),
        calib_weights=_safe_copy(calib_weights),
        metadata=metadata,
    )
    if index is not None:
        return pd.Series(p_values, index=index, name="p_value")
    return p_values

Resampling Strategies

nonconform.resampling

Calibration strategies for conformal anomaly detection.

This module provides various calibration strategies that define how to split data for training and calibration in conformal prediction.

Classes:

Name Description
BaseStrategy

Abstract base class for calibration strategies.

Split

Simple train-test split strategy.

CrossValidation

K-fold cross-validation strategy (includes Jackknife factory).

JackknifeBootstrap

Jackknife+-after-Bootstrap (JaB+) strategy.

BaseStrategy

BaseStrategy(mode: ConformalModeInput = 'plus')

Bases: ABC

Abstract base class for anomaly detection calibration strategies.

This class provides a common interface for various calibration strategies applied to anomaly detectors. Subclasses must implement the core calibration logic and define how calibration data is identified and used.

Attributes:

Name Type Description
_mode ConformalMode

Model retention mode controlling calibration/inference behavior.

Parameters:

Name Type Description Default
mode ConformalModeInput

Model retention mode ("plus" or "single_model"). Equivalent ConformalMode enum values are also accepted.

'plus'
Source code in nonconform/resampling.py
70
71
72
73
74
75
76
77
78
def __init__(self, mode: ConformalModeInput = "plus") -> None:
    """Initialize the base calibration strategy.

    Args:
        mode: Model retention mode (`"plus"` or `"single_model"`).
            Equivalent ``ConformalMode`` enum values are also accepted.
    """
    self._mode: ConformalMode = _normalize_mode(mode)
    self._calibration_ids: list[int] = []
calibration_ids abstractmethod property
calibration_ids: list[int] | None

Indices of data points used for calibration.

fit_calibrate abstractmethod
fit_calibrate(
    x: DataFrame | ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
) -> tuple[list[AnomalyDetector], np.ndarray]

Fits the detector and performs calibration.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data for fitting and calibration.

required
detector AnomalyDetector

The anomaly detection model to be fitted and calibrated.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Whether to use weighted approach. Defaults to False.

False

Returns:

Type Description
tuple[list[AnomalyDetector], ndarray]

Tuple of (list of trained detectors, calibration scores array).

Source code in nonconform/resampling.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@abc.abstractmethod
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
) -> tuple[list[AnomalyDetector], np.ndarray]:
    """Fits the detector and performs calibration.

    Args:
        x: The input data for fitting and calibration.
        detector: The anomaly detection model to be fitted and calibrated.
        seed: Random seed for reproducibility. Defaults to None.
        weighted: Whether to use weighted approach. Defaults to False.

    Returns:
        Tuple of (list of trained detectors, calibration scores array).
    """
    raise NotImplementedError(
        "The fit_calibrate() method must be implemented by subclasses."
    )

Split

Split(n_calib: float | int = 0.1)

Bases: BaseStrategy

Split conformal strategy for fast anomaly detection.

Implements the classical split conformal approach by dividing training data into separate fitting and calibration sets.

Parameters:

Name Type Description Default
n_calib float | int

Size or proportion of data used for calibration. If float, must be between 0.0 and 1.0 (proportion). If int, the absolute number of samples. Defaults to 0.1.

0.1

Examples:

# Use 20% of data for calibration
strategy = Split(n_calib=0.2)

# Use exactly 1000 samples for calibration
strategy = Split(n_calib=1000)
Source code in nonconform/resampling.py
131
132
133
134
def __init__(self, n_calib: float | int = 0.1) -> None:
    super().__init__()
    self._calib_size: float | int = n_calib
    self._calibration_ids: list[int] | None = None
calibration_ids property
calibration_ids: list[int] | None

Indices of calibration samples (None if weighted=False).

calib_size property
calib_size: float | int

Returns the calibration size or proportion.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: AnomalyDetector,
    weighted: bool = False,
    seed: int | None = None,
) -> tuple[list[AnomalyDetector], np.ndarray]

Fits detector and generates calibration scores using a data split.

Parameters:

Name Type Description Default
x DataFrame | ndarray

The input data.

required
detector AnomalyDetector

The detector instance to train.

required
weighted bool

If True, stores calibration sample indices. Defaults to False.

False
seed int | None

Random seed for reproducibility. Defaults to None.

None

Returns:

Type Description
tuple[list[AnomalyDetector], ndarray]

Tuple of (list with trained detector, calibration scores array).

Source code in nonconform/resampling.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@ensure_numpy_array
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: AnomalyDetector,
    weighted: bool = False,
    seed: int | None = None,
) -> tuple[list[AnomalyDetector], np.ndarray]:
    """Fits detector and generates calibration scores using a data split.

    Args:
        x: The input data.
        detector: The detector instance to train.
        weighted: If True, stores calibration sample indices. Defaults to False.
        seed: Random seed for reproducibility. Defaults to None.

    Returns:
        Tuple of (list with trained detector, calibration scores array).
    """
    self._validate_n_calib(len(x))
    x_id = np.arange(len(x))
    train_id, calib_id = train_test_split(
        x_id, test_size=self._calib_size, shuffle=True, random_state=seed
    )

    if hasattr(detector, "set_params"):
        try:
            detector.set_params(random_state=seed)
        except (TypeError, ValueError):
            pass  # Detector may not support random_state parameter

    detector.fit(x[train_id])
    calibration_set = detector.decision_function(x[calib_id])

    if weighted:
        self._calibration_ids = calib_id.tolist()
    else:
        self._calibration_ids = None
    return [detector], calibration_set

CrossValidation

CrossValidation(
    k: int | None = 5,
    mode: ConformalModeInput = "plus",
    shuffle: bool = True,
)

Bases: BaseStrategy

K-fold cross-validation strategy for conformal anomaly detection.

Splits data into k folds and uses each fold as a calibration set while training on the remaining folds.

Parameters:

Name Type Description Default
k int | None

Number of folds. If None, uses leave-one-out (k=n at fit time).

5
mode ConformalModeInput

Model retention mode ("plus" or "single_model"). Equivalent ConformalMode values are accepted. Defaults to "plus".

'plus'
shuffle bool

Whether to shuffle data before splitting. Defaults to True. Set to False for deterministic leave-one-out (Jackknife).

True

Examples:

# 5-fold cross-validation
strategy = CrossValidation(k=5)

# Leave-one-out (Jackknife) via factory
strategy = CrossValidation.jackknife()
Source code in nonconform/resampling.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def __init__(
    self,
    k: int | None = 5,
    mode: ConformalModeInput = "plus",
    shuffle: bool = True,
) -> None:
    super().__init__(mode)
    if not isinstance(shuffle, bool):
        raise TypeError(
            f"shuffle must be a boolean value, got {type(shuffle).__name__}."
        )
    self._k: int | None = k
    self._shuffle: bool = shuffle
    self._is_jackknife = k is None

    # Warn if using single-model mode
    if self._mode is ConformalMode.SINGLE_MODEL:
        _crossval_logger.warning(
            "Setting mode=ConformalMode.SINGLE_MODEL may compromise conformal "
            "validity. mode=ConformalMode.PLUS is recommended."
        )

    self._detector_list: list[AnomalyDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []
calibration_ids property
calibration_ids: list[int]

Indices of samples used for calibration.

k property
k: int | None

Number of folds (None for jackknife mode).

mode property
mode: Literal['plus', 'single_model']

User-facing model retention mode.

jackknife classmethod
jackknife(
    mode: ConformalModeInput = "plus",
) -> CrossValidation

Create Leave-One-Out cross-validation (deterministic, no shuffle).

This factory method creates a Jackknife strategy, which is a special case of k-fold CV where k equals n (the dataset size). Each sample is left out exactly once for calibration.

Parameters:

Name Type Description Default
mode ConformalModeInput

Model retention mode ("plus" or "single_model").

'plus'

Returns:

Type Description
CrossValidation

CrossValidation configured for leave-one-out.

Examples:

strategy = CrossValidation.jackknife()
detector_list, calib_scores = strategy.fit_calibrate(X, detector)
Source code in nonconform/resampling.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
@classmethod
def jackknife(cls, mode: ConformalModeInput = "plus") -> CrossValidation:
    """Create Leave-One-Out cross-validation (deterministic, no shuffle).

    This factory method creates a Jackknife strategy, which is a special
    case of k-fold CV where k equals n (the dataset size). Each sample is
    left out exactly once for calibration.

    Args:
        mode: Model retention mode (`"plus"` or `"single_model"`).

    Returns:
        CrossValidation configured for leave-one-out.

    Examples:
        ```python
        strategy = CrossValidation.jackknife()
        detector_list, calib_scores = strategy.fit_calibrate(X, detector)
        ```
    """
    return cls(k=None, mode=mode, shuffle=False)
fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
) -> tuple[list[AnomalyDetector], np.ndarray]

Fit and calibrate using k-fold cross-validation.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix.

required
detector AnomalyDetector

The base anomaly detector.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Whether to use weighted calibration. Defaults to False.

False

Returns:

Type Description
tuple[list[AnomalyDetector], ndarray]

Tuple of (list of trained detectors, calibration scores array).

Raises:

Type Description
ValueError

If k < 2 or not enough samples for specified k.

Source code in nonconform/resampling.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
@ensure_numpy_array
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
) -> tuple[list[AnomalyDetector], np.ndarray]:
    """Fit and calibrate using k-fold cross-validation.

    Args:
        x: Input data matrix.
        detector: The base anomaly detector.
        seed: Random seed for reproducibility. Defaults to None.
        weighted: Whether to use weighted calibration. Defaults to False.

    Returns:
        Tuple of (list of trained detectors, calibration scores array).

    Raises:
        ValueError: If k < 2 or not enough samples for specified k.
    """
    self._detector_list.clear()
    self._calibration_ids = []

    detector_ = detector
    n_samples = len(x)

    # Determine k (for jackknife mode, k=n)
    k = n_samples if self._is_jackknife else self._k

    if k < 2:
        exc = ValueError(
            f"k must be at least 2 for k-fold cross-validation, got {k}"
        )
        exc.add_note(f"Received k={k}, which is invalid.")
        exc.add_note(
            "Cross-validation requires at least one split for training "
            "and one for calibration."
        )
        raise exc

    if n_samples < k:
        exc = ValueError(
            f"Not enough samples ({n_samples}) for "
            f"k-fold cross-validation with k={k}"
        )
        exc.add_note(f"Each fold needs at least 1 sample, but {n_samples} < {k}.")
        raise exc

    self._calibration_set = np.empty(n_samples, dtype=np.float64)
    calibration_offset = 0

    folds = KFold(
        n_splits=k,
        shuffle=self._shuffle,
        random_state=seed if self._shuffle else None,
    )

    fold_iterator = (
        tqdm(folds.split(x), total=k, desc="Calibration")
        if _crossval_logger.isEnabledFor(logging.INFO)
        else folds.split(x)
    )

    for i, (train_idx, calib_idx) in enumerate(fold_iterator):
        self._calibration_ids.extend(calib_idx.tolist())

        model = copy(detector_)
        if hasattr(model, "set_params"):
            try:
                model.set_params(random_state=seed)
            except (TypeError, ValueError):
                pass  # Detector may not support random_state parameter
        model.fit(x[train_idx])

        if self._mode is ConformalMode.PLUS:
            self._detector_list.append(deepcopy(model))

        fold_scores = model.decision_function(x[calib_idx])
        n_fold_samples = len(fold_scores)
        end_idx = calibration_offset + n_fold_samples
        self._calibration_set[calibration_offset:end_idx] = fold_scores
        calibration_offset += n_fold_samples

    if self._mode is ConformalMode.SINGLE_MODEL:
        model = copy(detector_)
        if hasattr(model, "set_params"):
            try:
                model.set_params(random_state=seed)
            except (TypeError, ValueError):
                pass  # Detector may not support random_state parameter
        model.fit(x)
        self._detector_list.append(deepcopy(model))

    return self._detector_list, self._calibration_set

JackknifeBootstrap

JackknifeBootstrap(
    n_bootstraps: int = 100,
    aggregation_method: BootstrapAggregationMethod = "mean",
    mode: ConformalModeInput = "plus",
)

Bases: BaseStrategy

Jackknife+-after-Bootstrap (JaB+) conformal anomaly detection.

Implements the JaB+ method which provides predictive inference for ensemble models trained on bootstrap samples. Uses out-of-bag samples for calibration.

Parameters:

Name Type Description Default
n_bootstraps int

Number of bootstrap iterations. Defaults to 100.

100
aggregation_method BootstrapAggregationMethod

How to aggregate OOB predictions ("mean" or "median"). Defaults to "mean".

'mean'
mode ConformalModeInput

Model retention mode ("plus" or "single_model"). Equivalent ConformalMode values are accepted. Defaults to "plus".

'plus'
References

Jin, Ying, and Emmanuel J. Candès. "Selection by Prediction with Conformal p-values." Journal of Machine Learning Research 24.244 (2023): 1-41.

Source code in nonconform/resampling.py
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def __init__(
    self,
    n_bootstraps: int = 100,
    aggregation_method: BootstrapAggregationMethod = "mean",
    mode: ConformalModeInput = "plus",
) -> None:
    super().__init__(mode=mode)

    if n_bootstraps < 2:
        exc = ValueError(
            f"Number of bootstraps must be at least 2, got {n_bootstraps}. "
            f"Typical values are 50-200 for jackknife-after-bootstrap."
        )
        exc.add_note(f"Received n_bootstraps={n_bootstraps}, which is invalid.")
        raise exc

    normalized_aggregation_method = normalize_bootstrap_aggregation_method(
        aggregation_method
    )

    if self._mode is ConformalMode.SINGLE_MODEL:
        _bootstrap_logger.warning(
            "Setting mode=ConformalMode.SINGLE_MODEL may compromise conformal "
            "validity. mode=ConformalMode.PLUS is recommended."
        )

    self._n_bootstraps: int = n_bootstraps
    self._aggregation_method: BootstrapAggregationMethod = (
        normalized_aggregation_method
    )

    self._detector_list: list[AnomalyDetector] = []
    self._calibration_set: np.ndarray = np.array([])
    self._calibration_ids: list[int] = []

    # Internal state
    self._bootstrap_models: list[AnomalyDetector | None] = []
    self._oob_mask: np.ndarray = np.array([])
calibration_ids property
calibration_ids: list[int]

Indices used for calibration (all samples in JaB+).

n_bootstraps property
n_bootstraps: int

Number of bootstrap iterations.

aggregation_method property
aggregation_method: BootstrapAggregationMethod

Aggregation method for OOB predictions.

fit_calibrate
fit_calibrate(
    x: DataFrame | ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
    n_jobs: int | None = None,
) -> tuple[list[AnomalyDetector], np.ndarray]

Fit and calibrate using JaB+ method.

Parameters:

Name Type Description Default
x DataFrame | ndarray

Input data matrix.

required
detector AnomalyDetector

The base anomaly detector.

required
seed int | None

Random seed for reproducibility. Defaults to None.

None
weighted bool

Not used in JaB+. Defaults to False.

False
n_jobs int | None

Number of parallel jobs. Use -1 for all available cores. Defaults to None (sequential).

None

Returns:

Type Description
tuple[list[AnomalyDetector], ndarray]

Tuple of (list of trained detectors, calibration scores array).

Source code in nonconform/resampling.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
@ensure_numpy_array
def fit_calibrate(
    self,
    x: pd.DataFrame | np.ndarray,
    detector: AnomalyDetector,
    seed: int | None = None,
    weighted: bool = False,
    n_jobs: int | None = None,
) -> tuple[list[AnomalyDetector], np.ndarray]:
    """Fit and calibrate using JaB+ method.

    Args:
        x: Input data matrix.
        detector: The base anomaly detector.
        seed: Random seed for reproducibility. Defaults to None.
        weighted: Not used in JaB+. Defaults to False.
        n_jobs: Number of parallel jobs. Use -1 for all available cores.
            Defaults to None (sequential).

    Returns:
        Tuple of (list of trained detectors, calibration scores array).
    """
    n_samples = len(x)
    generator = np.random.default_rng(seed)

    _bootstrap_logger.info(
        f"Bootstrap (JaB+): {n_samples:,} samples, "
        f"{self._n_bootstraps:,} iterations"
    )

    self._bootstrap_models = [None] * self._n_bootstraps
    all_bootstrap_indices, self._oob_mask = self._generate_bootstrap_indices(
        generator, n_samples
    )

    if n_jobs == -1:
        n_jobs = os.cpu_count() or 1
    elif n_jobs is not None and n_jobs < 1:
        raise ValueError(
            f"n_jobs must be None, -1, or a positive integer; got {n_jobs}."
        )

    if n_jobs is None or n_jobs == 1:
        bootstrap_iterator = (
            tqdm(range(self._n_bootstraps), desc="Calibration")
            if _bootstrap_logger.isEnabledFor(logging.INFO)
            else range(self._n_bootstraps)
        )
        for i in bootstrap_iterator:
            bootstrap_indices = all_bootstrap_indices[i]
            model = _train_bootstrap_model(detector, x, bootstrap_indices, seed)
            self._bootstrap_models[i] = model
    else:
        self._train_models_parallel(
            detector, x, all_bootstrap_indices, seed, n_jobs
        )

    oob_scores = self._compute_oob_scores(x)

    self._calibration_set = oob_scores
    self._calibration_ids = list(range(n_samples))

    if self._mode is ConformalMode.PLUS:
        self._detector_list = self._bootstrap_models.copy()
    else:
        final_model = deepcopy(detector)
        if hasattr(final_model, "set_params"):
            try:
                final_model.set_params(random_state=seed)
            except (TypeError, ValueError):
                pass  # Detector may not support random_state parameter
        final_model.fit(x)
        self._detector_list = [final_model]

    return self._detector_list, self._calibration_set

P-Value Estimation

nonconform.scoring

P-value estimation strategies for conformal prediction.

This module provides strategies for computing p-values from calibration scores.

Classes:

Name Description
BaseEstimation

Abstract base class for p-value estimation.

Empirical

Classical empirical p-value estimation using discrete CDF.

ConditionalEmpirical

Conditionally calibrated empirical p-values.

Probabilistic

KDE-based probabilistic p-value estimation.

Kernel

Bases: Enum

Kernel functions for KDE-based p-value computation.

Attributes:

Name Type Description
GAUSSIAN

Gaussian (normal) kernel.

EXPONENTIAL

Exponential kernel.

BOX

Box (uniform) kernel.

TRIANGULAR

Triangular kernel.

EPANECHNIKOV

Epanechnikov kernel.

BIWEIGHT

Biweight (quartic) kernel.

TRIWEIGHT

Triweight kernel.

TRICUBE

Tricube kernel.

COSINE

Cosine kernel.

BaseEstimation

Bases: ABC

Abstract base for p-value estimation strategies.

compute_p_values abstractmethod
compute_p_values(
    scores: ndarray,
    calibration_set: ndarray,
    weights: tuple[ndarray, ndarray] | None = None,
) -> np.ndarray

Compute p-values for test scores.

Parameters:

Name Type Description Default
scores ndarray

Test instance anomaly scores (1D array).

required
calibration_set ndarray

Calibration anomaly scores (1D array).

required
weights tuple[ndarray, ndarray] | None

Optional (w_calib, w_test) tuple for weighted conformal.

None

Returns:

Type Description
ndarray

Array of p-values for each test instance.

Source code in nonconform/scoring.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@abstractmethod
def compute_p_values(
    self,
    scores: np.ndarray,
    calibration_set: np.ndarray,
    weights: tuple[np.ndarray, np.ndarray] | None = None,
) -> np.ndarray:
    """Compute p-values for test scores.

    Args:
        scores: Test instance anomaly scores (1D array).
        calibration_set: Calibration anomaly scores (1D array).
        weights: Optional (w_calib, w_test) tuple for weighted conformal.

    Returns:
        Array of p-values for each test instance.
    """
    pass
get_metadata
get_metadata() -> dict[str, Any]

Optional auxiliary data exposed after compute_p_values.

Source code in nonconform/scoring.py
82
83
84
def get_metadata(self) -> dict[str, Any]:
    """Optional auxiliary data exposed after compute_p_values."""
    return {}
set_seed
set_seed(seed: int | None) -> None

Set random seed for reproducibility.

Parameters:

Name Type Description Default
seed int | None

Random seed value or None.

required
Source code in nonconform/scoring.py
86
87
88
89
90
91
92
93
def set_seed(self, seed: int | None) -> None:
    """Set random seed for reproducibility.

    Args:
        seed: Random seed value or None.
    """
    if hasattr(self, "_seed"):
        self._seed = seed

Empirical

Empirical(tie_break: TieBreakModeInput = 'classical')

Bases: BaseEstimation

Classical empirical p-value estimation using discrete CDF.

Computes p-values using deterministic tie handling by default. Optionally supports randomized smoothing to eliminate the resolution floor caused by discrete ties (Jin & Candes 2023).

Parameters:

Name Type Description Default
tie_break TieBreakModeInput

Tie-breaking strategy ("classical" or "randomized"). Equivalent TieBreakMode enum values are also accepted.

'classical'

Examples:

estimation = Empirical()  # tie_break="classical" by default
p_values = estimation.compute_p_values(test_scores, calib_scores)

# For randomized smoothing:
estimation = Empirical(tie_break="randomized")
Source code in nonconform/scoring.py
117
118
119
def __init__(self, tie_break: TieBreakModeInput = "classical") -> None:
    self._tie_break = _normalize_tie_break_mode(tie_break)
    self._seed: int | None = None
set_seed
set_seed(seed: int | None) -> None

Set random seed for reproducibility.

Source code in nonconform/scoring.py
121
122
123
def set_seed(self, seed: int | None) -> None:
    """Set random seed for reproducibility."""
    self._seed = seed
compute_p_values
compute_p_values(
    scores: ndarray,
    calibration_set: ndarray,
    weights: tuple[ndarray, ndarray] | None = None,
) -> np.ndarray

Compute empirical p-values from calibration set.

Source code in nonconform/scoring.py
125
126
127
128
129
130
131
132
133
134
135
136
def compute_p_values(
    self,
    scores: np.ndarray,
    calibration_set: np.ndarray,
    weights: tuple[np.ndarray, np.ndarray] | None = None,
) -> np.ndarray:
    """Compute empirical p-values from calibration set."""
    randomized = self._tie_break is TieBreakMode.RANDOMIZED
    rng = np.random.default_rng(self._seed) if randomized else None
    if weights is not None:
        return self._compute_weighted(scores, calibration_set, weights, rng)
    return self._compute_standard(scores, calibration_set, rng)

ConditionalEmpirical

ConditionalEmpirical(
    *,
    delta: float = 0.05,
    method: str | ConditionalCalibrationMethod = "mc",
    tie_break: TieBreakModeInput = "classical",
    simes_kden: int = 2,
    mc_num_simulations: int = 10000,
)

Bases: Empirical

Conditionally calibrated empirical conformal p-values (CCCPV).

This estimator first computes classical empirical conformal p-values and then applies a finite-sample calibration map:

.. math:: p_j = \frac{1 + \sum_{i=1}^{n_{\text{cal}}}\mathbf{1}[s_i \ge s_j]} {n_{\text{cal}} + 1}, \qquad \tilde p_j = C_{n_{\text{cal}},\delta}(p_j).

Supported calibration maps are "mc", "simes", "dkwm", and "asymptotic".

References

Bates et al. (2023), Testing for outliers with conformal p-values. Reference implementation: https://github.com/msesia/conditional-conformal-pvalues

Note

Weighted conformal p-values are intentionally not supported in this first release of ConditionalEmpirical.

Parameters:

Name Type Description Default
delta float

Confidence level used by the conditional calibration map. Must be in (0, 1). Defaults to 0.05.

0.05
method str | ConditionalCalibrationMethod

Conditional calibration method. One of {"mc", "simes", "dkwm", "asymptotic"}. Defaults to "mc".

'mc'
tie_break TieBreakModeInput

Tie-breaking strategy used for base empirical p-values ("classical" or "randomized").

'classical'
simes_kden int

Denominator used to derive k = floor(n_cal / simes_kden) for the Simes calibration map. Must be a positive integer. Defaults to 2.

2
mc_num_simulations int

Monte Carlo sample size used to estimate the finite-sample correction for method="mc". Defaults to 10,000.

10000
Source code in nonconform/scoring.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def __init__(
    self,
    *,
    delta: float = 0.05,
    method: str | ConditionalCalibrationMethod = "mc",
    tie_break: TieBreakModeInput = "classical",
    simes_kden: int = 2,
    mc_num_simulations: int = 10_000,
) -> None:
    super().__init__(tie_break=tie_break)
    try:
        delta_float = float(delta)
    except (TypeError, ValueError) as exc:
        raise ValueError("delta must be a float in (0, 1).") from exc
    if not np.isfinite(delta_float) or not (0.0 < delta_float < 1.0):
        raise ValueError(f"delta must be in (0, 1), got {delta!r}.")
    if (
        isinstance(simes_kden, bool)
        or not isinstance(simes_kden, int)
        or simes_kden < 1
    ):
        raise ValueError("simes_kden must be a positive integer.")
    if (
        isinstance(mc_num_simulations, bool)
        or not isinstance(mc_num_simulations, int)
        or mc_num_simulations < 100
    ):
        raise ValueError("mc_num_simulations must be an integer >= 100.")

    self._delta = delta_float
    self._method = normalize_conditional_calibration_method(method)
    self._simes_kden = simes_kden
    self._mc_num_simulations = mc_num_simulations
    self._mc_correction_cache: dict[tuple[int, float], float] = {}
set_seed
set_seed(seed: int | None) -> None

Set random seed for reproducibility.

Source code in nonconform/scoring.py
241
242
243
244
245
def set_seed(self, seed: int | None) -> None:
    """Set random seed for reproducibility."""
    super().set_seed(seed)
    # MC correction estimation depends on RNG; invalidate cached estimates.
    self._mc_correction_cache.clear()
compute_p_values
compute_p_values(
    scores: ndarray,
    calibration_set: ndarray,
    weights: tuple[ndarray, ndarray] | None = None,
) -> np.ndarray

Compute conditionally calibrated conformal p-values.

Source code in nonconform/scoring.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def compute_p_values(
    self,
    scores: np.ndarray,
    calibration_set: np.ndarray,
    weights: tuple[np.ndarray, np.ndarray] | None = None,
) -> np.ndarray:
    """Compute conditionally calibrated conformal p-values."""
    if weights is not None:
        raise ValueError(
            "ConditionalEmpirical does not support weighted p-values. "
            "Use Empirical or Probabilistic for weighted conformal mode."
        )

    base_p = super().compute_p_values(scores, calibration_set, weights=None)
    n_cal = len(np.asarray(calibration_set).ravel())

    cache_key = (n_cal, self._delta)
    cached_fs = (
        self._mc_correction_cache.get(cache_key) if self._method == "mc" else None
    )
    rng = np.random.default_rng(self._seed) if self._seed is not None else None
    calibrated, fs_correction = calibrate_conditional_p_values(
        base_p,
        n_calibration=n_cal,
        delta=self._delta,
        method=self._method,
        simes_kden=self._simes_kden,
        fs_correction=cached_fs,
        rng=rng,
        mc_num_simulations=self._mc_num_simulations,
    )
    if self._method == "mc" and fs_correction is not None:
        self._mc_correction_cache[cache_key] = fs_correction
    return calibrated

Probabilistic

Probabilistic(
    kernel: Kernel | Sequence[Kernel] = Kernel.GAUSSIAN,
    n_trials: int = 100,
    cv_folds: int = -1,
)

Bases: BaseEstimation

KDE-based probabilistic p-value estimation with continuous values.

Provides smooth p-values in [0,1] via kernel density estimation. Supports automatic hyperparameter tuning and weighted conformal prediction. In weighted mode, only calibration weights are applied to the KDE; test weights are intentionally not injected into the survival calculation so p-values can reach 0. This avoids the lower bound w_test / (sum_calib_weight + w_test) that the discrete weighted formula would impose.

Parameters:

Name Type Description Default
kernel Kernel | Sequence[Kernel]

Kernel function or list (list triggers kernel tuning). Bandwidth is always auto-tuned. Defaults to Kernel.GAUSSIAN.

GAUSSIAN
n_trials int

Number of Optuna trials for tuning. Defaults to 100.

100
cv_folds int

CV folds for tuning (-1 for leave-one-out). Defaults to -1.

-1

Examples:

# Basic usage
estimation = Probabilistic()
p_values = estimation.compute_p_values(test_scores, calib_scores)

# With custom kernel
estimation = Probabilistic(kernel=Kernel.EPANECHNIKOV)
Source code in nonconform/scoring.py
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
def __init__(
    self,
    kernel: Kernel | Sequence[Kernel] = Kernel.GAUSSIAN,
    n_trials: int = 100,
    cv_folds: int = -1,
) -> None:
    self._kernel = kernel
    self._n_trials = n_trials
    self._cv_folds = cv_folds
    self._seed = None

    self._tuned_params: dict | None = None
    self._kde_model = None
    self._calibration_hash: int | None = None
    self._kde_eval_grid: np.ndarray | None = None
    self._kde_cdf_values: np.ndarray | None = None
    self._kde_total_weight: float | None = None
compute_p_values
compute_p_values(
    scores: ndarray,
    calibration_set: ndarray,
    weights: tuple[ndarray, ndarray] | None = None,
) -> np.ndarray

Compute continuous p-values using KDE.

Lazy fitting: tunes and fits KDE on first call or when calibration changes. Note: When weights are provided, this estimator uses only calibration weights to shape the KDE. Test weights are accepted for API parity but do not set a positive lower bound on p-values.

Source code in nonconform/scoring.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
def compute_p_values(
    self,
    scores: np.ndarray,
    calibration_set: np.ndarray,
    weights: tuple[np.ndarray, np.ndarray] | None = None,
) -> np.ndarray:
    """Compute continuous p-values using KDE.

    Lazy fitting: tunes and fits KDE on first call or when calibration changes.
    Note: When weights are provided, this estimator uses only calibration
    weights to shape the KDE. Test weights are accepted for API parity but
    do not set a positive lower bound on p-values.
    """
    if weights is not None:
        w_calib, _w_test = weights
    else:
        w_calib, _w_test = None, None

    if weights is None:
        current_hash = hash(calibration_set.tobytes())
    else:
        current_hash = hash((calibration_set.tobytes(), w_calib.tobytes()))

    if self._kde_model is None or self._calibration_hash != current_hash:
        self._fit_kde(calibration_set, w_calib)
        self._calibration_hash = current_hash

    sum_calib_weight = (
        float(np.sum(w_calib))
        if w_calib is not None
        else float(len(calibration_set))
    )

    return self._compute_p_values_from_kde(scores, sum_calib_weight)
get_metadata
get_metadata() -> dict[str, Any]

Return KDE metadata after p-value computation.

Source code in nonconform/scoring.py
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
def get_metadata(self) -> dict[str, Any]:
    """Return KDE metadata after p-value computation."""
    if (
        self._kde_eval_grid is None
        or self._kde_cdf_values is None
        or self._kde_total_weight is None
    ):
        return {}
    return {
        "kde": {
            "eval_grid": self._kde_eval_grid.copy(),
            "cdf_values": self._kde_cdf_values.copy(),
            "total_weight": float(self._kde_total_weight),
        }
    }

calculate_p_val

calculate_p_val(
    scores: ndarray,
    calibration_set: ndarray,
    tie_break: TieBreakModeInput = "classical",
    rng: Generator | None = None,
) -> np.ndarray

Calculate empirical p-values (standalone function).

Uses classical deterministic tie handling by default. Optionally supports randomized smoothing to eliminate the resolution floor caused by discrete ties (Jin & Candes 2023).

Parameters:

Name Type Description Default
scores ndarray

Test instance anomaly scores (1D array).

required
calibration_set ndarray

Calibration anomaly scores (1D array).

required
tie_break TieBreakModeInput

Tie-breaking strategy for equal scores ("classical" or "randomized"). Equivalent TieBreakMode values are accepted.

'classical'
rng Generator | None

Optional random number generator for reproducibility.

None

Returns:

Type Description
ndarray

Array of p-values for each test instance.

Source code in nonconform/scoring.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def calculate_p_val(
    scores: np.ndarray,
    calibration_set: np.ndarray,
    tie_break: TieBreakModeInput = "classical",
    rng: np.random.Generator | None = None,
) -> np.ndarray:
    """Calculate empirical p-values (standalone function).

    Uses classical deterministic tie handling by default. Optionally supports
    randomized smoothing to eliminate the resolution floor caused by discrete
    ties (Jin & Candes 2023).

    Args:
        scores: Test instance anomaly scores (1D array).
        calibration_set: Calibration anomaly scores (1D array).
        tie_break: Tie-breaking strategy for equal scores (`"classical"` or
            `"randomized"`). Equivalent `TieBreakMode` values are accepted.
        rng: Optional random number generator for reproducibility.

    Returns:
        Array of p-values for each test instance.
    """
    mode = _normalize_tie_break_mode(tie_break)

    sorted_cal = np.sort(calibration_set)
    n_cal = len(calibration_set)

    if mode is TieBreakMode.CLASSICAL:
        # Old formula: count >= (at or above)
        ranks = n_cal - np.searchsorted(sorted_cal, scores, side="left")
        return (1.0 + ranks) / (1.0 + n_cal)

    # Randomized tie handling: separate strictly greater and ties
    pos_right = np.searchsorted(sorted_cal, scores, side="right")
    pos_left = np.searchsorted(sorted_cal, scores, side="left")
    n_greater = n_cal - pos_right  # strictly greater
    n_equal = pos_right - pos_left  # ties

    if rng is None:
        rng = np.random.default_rng()
    u = rng.uniform(size=len(scores))

    return (n_greater + (n_equal + 1) * u) / (1.0 + n_cal)

calculate_weighted_p_val

calculate_weighted_p_val(
    scores: ndarray,
    calibration_set: ndarray,
    test_weights: ndarray,
    calib_weights: ndarray,
    tie_break: TieBreakModeInput = "classical",
    rng: Generator | None = None,
) -> np.ndarray

Calculate weighted empirical p-values (standalone function).

Uses classical deterministic tie handling by default. Optionally supports randomized smoothing to eliminate the resolution floor caused by discrete ties (Jin & Candes 2023).

Parameters:

Name Type Description Default
scores ndarray

Test instance anomaly scores (1D array).

required
calibration_set ndarray

Calibration anomaly scores (1D array).

required
test_weights ndarray

Test instance weights (1D array).

required
calib_weights ndarray

Calibration weights (1D array).

required
tie_break TieBreakModeInput

Tie-breaking strategy for equal scores ("classical" or "randomized"). Equivalent TieBreakMode values are accepted.

'classical'
rng Generator | None

Optional random number generator for reproducibility.

None

Returns:

Type Description
ndarray

Array of weighted p-values for each test instance.

Note

Including test_weights in the numerator/denominator implies a positive lower bound of test_weights / (sum(calib_weights) + test_weights) when there is no calibration mass above the test score.

Source code in nonconform/scoring.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
def calculate_weighted_p_val(
    scores: np.ndarray,
    calibration_set: np.ndarray,
    test_weights: np.ndarray,
    calib_weights: np.ndarray,
    tie_break: TieBreakModeInput = "classical",
    rng: np.random.Generator | None = None,
) -> np.ndarray:
    """Calculate weighted empirical p-values (standalone function).

    Uses classical deterministic tie handling by default. Optionally supports
    randomized smoothing to eliminate the resolution floor caused by discrete
    ties (Jin & Candes 2023).

    Args:
        scores: Test instance anomaly scores (1D array).
        calibration_set: Calibration anomaly scores (1D array).
        test_weights: Test instance weights (1D array).
        calib_weights: Calibration weights (1D array).
        tie_break: Tie-breaking strategy for equal scores (`"classical"` or
            `"randomized"`). Equivalent `TieBreakMode` values are accepted.
        rng: Optional random number generator for reproducibility.

    Returns:
        Array of weighted p-values for each test instance.

    Note:
        Including test_weights in the numerator/denominator implies a positive
        lower bound of test_weights / (sum(calib_weights) + test_weights) when
        there is no calibration mass above the test score.
    """
    mode = _normalize_tie_break_mode(tie_break)

    try:
        scores = _as_1d("scores", scores).astype(float, copy=False)
        calibration_set = _as_1d("calibration_set", calibration_set).astype(
            float, copy=False
        )
        w_scores = _as_1d("test_weights", test_weights).astype(float, copy=False)
        w_calib = _as_1d("calib_weights", calib_weights).astype(float, copy=False)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            "scores, calibration_set, test_weights, and calib_weights must be numeric."
        ) from exc

    if len(scores) != len(w_scores):
        raise ValueError(
            "scores and test_weights must have the same length. "
            f"Got {len(scores)} and {len(w_scores)}."
        )
    if len(calibration_set) != len(w_calib):
        raise ValueError(
            "calibration_set and calib_weights must have the same length. "
            f"Got {len(calibration_set)} and {len(w_calib)}."
        )
    _validate_finite("scores", scores)
    _validate_finite("calibration_set", calibration_set)
    _validate_finite("test_weights", w_scores)
    _validate_finite("calib_weights", w_calib)
    if np.any(w_scores < 0):
        raise ValueError("test_weights must be non-negative.")
    if np.any(w_calib < 0):
        raise ValueError("calib_weights must be non-negative.")

    sort_idx = np.argsort(calibration_set)
    sorted_scores = calibration_set[sort_idx]
    sorted_weights = w_calib[sort_idx]

    cumulative_weights = np.concatenate(([0.0], np.cumsum(sorted_weights)))
    total_weight = float(cumulative_weights[-1])
    if total_weight <= 0:
        raise ValueError("calib_weights must sum to a positive value.")

    left_idx = np.searchsorted(sorted_scores, scores, side="left")
    right_idx = np.searchsorted(sorted_scores, scores, side="right")

    if mode is TieBreakMode.CLASSICAL:
        weighted_greater = total_weight - cumulative_weights[right_idx]
        numerator = weighted_greater + w_scores
    else:
        weighted_greater = total_weight - cumulative_weights[right_idx]
        weighted_equal = cumulative_weights[right_idx] - cumulative_weights[left_idx]

        if rng is None:
            rng = np.random.default_rng()
        u = rng.uniform(size=len(scores))

        numerator = weighted_greater + (weighted_equal + w_scores) * u

    denominator = total_weight + w_scores
    return numerator / denominator

Weight Estimation

nonconform.weighting

Weight estimation for covariate shift correction in weighted conformal prediction.

This module provides weight estimators that compute importance weights to correct for covariate shift between calibration and test distributions. They estimate density ratios w(x) = p_test(x) / p_calib(x) which are used to reweight conformal scores for better coverage guarantees under distribution shift.

Classes:

Name Description
BaseWeightEstimator

Abstract base class for weight estimators.

IdentityWeightEstimator

Returns uniform weights (no covariate shift).

SklearnWeightEstimator

Universal wrapper for sklearn probabilistic classifiers.

BootstrapBaggedWeightEstimator

Bootstrap-bagged wrapper for robust estimation.

Factory functions

logistic_weight_estimator: Create estimator using Logistic Regression. forest_weight_estimator: Create estimator using Random Forest.

ProbabilisticClassifier

Bases: Protocol

Protocol for classifiers that support probability estimation.

This protocol defines the interface for sklearn-compatible classifiers that can produce probability estimates for weight computation.

fit
fit(X: ndarray, y: ndarray) -> ProbabilisticClassifier

Fit the classifier on training data.

Parameters:

Name Type Description Default
X ndarray

Feature matrix of shape (n_samples, n_features).

required
y ndarray

Target labels of shape (n_samples,).

required

Returns:

Type Description
ProbabilisticClassifier

The fitted classifier instance.

Source code in nonconform/weighting.py
49
50
51
52
53
54
55
56
57
58
59
def fit(self, X: np.ndarray, y: np.ndarray) -> ProbabilisticClassifier:
    """Fit the classifier on training data.

    Args:
        X: Feature matrix of shape (n_samples, n_features).
        y: Target labels of shape (n_samples,).

    Returns:
        The fitted classifier instance.
    """
    ...
predict_proba
predict_proba(X: ndarray) -> np.ndarray

Return probability estimates for samples.

Parameters:

Name Type Description Default
X ndarray

Feature matrix of shape (n_samples, n_features).

required

Returns:

Type Description
ndarray

Probability estimates of shape (n_samples, n_classes).

Source code in nonconform/weighting.py
61
62
63
64
65
66
67
68
69
70
def predict_proba(self, X: np.ndarray) -> np.ndarray:
    """Return probability estimates for samples.

    Args:
        X: Feature matrix of shape (n_samples, n_features).

    Returns:
        Probability estimates of shape (n_samples, n_classes).
    """
    ...

BaseWeightEstimator

Bases: ABC

Abstract base class for weight estimators in weighted conformal prediction.

Weight estimators compute importance weights to correct for covariate shift between calibration and test distributions. They estimate density ratios w(x) = p_test(x) / p_calib(x) which are used to reweight conformal scores for better coverage guarantees under distribution shift.

Subclasses must implement fit(), _get_stored_weights(), and _score_new_data() to provide specific weight estimation strategies.

fit abstractmethod
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Estimate density ratio weights.

Source code in nonconform/weighting.py
87
88
89
90
@abstractmethod
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Estimate density ratio weights."""
    pass
get_weights
get_weights(
    calibration_samples: ndarray | None = None,
    test_samples: ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]

Return density ratio weights for calibration and test data.

Parameters:

Name Type Description Default
calibration_samples ndarray | None

Optional calibration data to score. If provided, computes weights for this data using the fitted model. If None, returns stored weights from fit(). Must provide both or neither.

None
test_samples ndarray | None

Optional test data to score. If provided, computes weights for this data using the fitted model. If None, returns stored weights from fit(). Must provide both or neither.

None

Returns:

Type Description
tuple[ndarray, ndarray]

Tuple of (calibration_weights, test_weights) as numpy arrays.

Raises:

Type Description
NotFittedError

If fit() has not been called.

ValueError

If only one of calibration_samples/test_samples is provided.

Source code in nonconform/weighting.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def get_weights(
    self,
    calibration_samples: np.ndarray | None = None,
    test_samples: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]:
    """Return density ratio weights for calibration and test data.

    Args:
        calibration_samples: Optional calibration data to score. If provided,
            computes weights for this data using the fitted model. If None,
            returns stored weights from fit(). Must provide both or neither.
        test_samples: Optional test data to score. If provided, computes
            weights for this data using the fitted model. If None, returns
            stored weights from fit(). Must provide both or neither.

    Returns:
        Tuple of (calibration_weights, test_weights) as numpy arrays.

    Raises:
        NotFittedError: If fit() has not been called.
        ValueError: If only one of calibration_samples/test_samples is provided.
    """
    if not hasattr(self, "_is_fitted") or not self._is_fitted:
        raise NotFittedError("This weight estimator instance is not fitted yet.")

    if (calibration_samples is None) != (test_samples is None):
        raise ValueError(
            "Must provide both calibration_samples and test_samples, or neither. "
            "Cannot score only one set."
        )

    if calibration_samples is None:
        return self._get_stored_weights()
    else:
        return self._score_new_data(calibration_samples, test_samples)
set_seed
set_seed(seed: int | None) -> None

Set random seed for reproducibility.

Parameters:

Name Type Description Default
seed int | None

Random seed value or None.

required
Source code in nonconform/weighting.py
140
141
142
143
144
145
146
def set_seed(self, seed: int | None) -> None:
    """Set random seed for reproducibility.

    Args:
        seed: Random seed value or None.
    """
    self._seed = seed

IdentityWeightEstimator

IdentityWeightEstimator()

Bases: BaseWeightEstimator

Identity weight estimator that returns uniform weights.

This estimator assumes no covariate shift and returns weights of 1.0 for all samples. Useful as a baseline or when covariate shift is known to be minimal.

This effectively makes weighted conformal prediction equivalent to standard conformal prediction.

Source code in nonconform/weighting.py
241
242
243
244
def __init__(self) -> None:
    self._n_calib = 0
    self._n_test = 0
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the identity weight estimator.

Parameters:

Name Type Description Default
calibration_samples ndarray

Array of calibration data samples.

required
test_samples ndarray

Array of test data samples.

required
Source code in nonconform/weighting.py
246
247
248
249
250
251
252
253
254
255
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the identity weight estimator.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.
    """
    self._n_calib = calibration_samples.shape[0]
    self._n_test = test_samples.shape[0]
    self._is_fitted = True

SklearnWeightEstimator

SklearnWeightEstimator(
    base_estimator: ProbabilisticClassifier
    | BaseEstimator
    | None = None,
    clip_quantile: float | None = 0.05,
)

Bases: BaseWeightEstimator

Universal wrapper for any sklearn-compatible probabilistic classifier.

Adheres to the standard sklearn 'Meta-Estimator' pattern. Accepts a configured estimator instance and clones it for cross-validation safety.

Parameters:

Name Type Description Default
base_estimator ProbabilisticClassifier | BaseEstimator | None

Configured sklearn classifier instance with predict_proba support. Defaults to LogisticRegression.

None
clip_quantile float | None

Quantile for weight clipping (e.g., 0.05 clips to 5th-95th percentile). Use None to disable clipping. Defaults to 0.05.

0.05

Raises:

Type Description
ValueError

If base_estimator does not implement predict_proba.

Examples:

from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Default (LogisticRegression)
estimator = SklearnWeightEstimator()

# Custom with pipeline
estimator = SklearnWeightEstimator(
    base_estimator=make_pipeline(
        StandardScaler(), LogisticRegression(C=1.0, class_weight="balanced")
    )
)

# Random Forest
estimator = SklearnWeightEstimator(
    base_estimator=RandomForestClassifier(n_estimators=100, max_depth=5)
)
Source code in nonconform/weighting.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def __init__(
    self,
    base_estimator: ProbabilisticClassifier | BaseEstimator | None = None,
    clip_quantile: float | None = 0.05,
) -> None:
    # Default to a sane baseline if nothing is provided
    # Use explicit None check to avoid truthiness evaluation of sklearn estimators
    # (unfitted ensemble estimators raise AttributeError on __len__)
    self.base_estimator = (
        base_estimator
        if base_estimator is not None
        else LogisticRegression(solver="liblinear")
    )
    if clip_quantile is not None and not (0 < clip_quantile < 0.5):
        raise ValueError(
            f"clip_quantile must be in (0, 0.5) or None, got {clip_quantile}."
        )
    self.clip_quantile = clip_quantile

    if not hasattr(self.base_estimator, "predict_proba"):
        raise ValueError(
            f"The provided base_estimator {type(self.base_estimator).__name__} "
            "does not implement 'predict_proba'. Density estimation requires "
            "probability scores. Use SVC(probability=True) or similar."
        )

    # Seed inheritance attribute (may be set by ConformalDetector)
    self._seed: int | None = None

    self.estimator_: ProbabilisticClassifier | None = None
    self._test_class_idx: int | None = None  # Column index for P(Test)
    self._w_calib: np.ndarray | None = None
    self._w_test: np.ndarray | None = None
    self._clip_bounds: tuple[float, float] | None = None
    self._is_fitted = False
fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the weight estimator on calibration and test samples.

Parameters:

Name Type Description Default
calibration_samples ndarray

Array of calibration data samples.

required
test_samples ndarray

Array of test data samples.

required

Raises:

Type Description
ValueError

If calibration_samples is empty.

Source code in nonconform/weighting.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the weight estimator on calibration and test samples.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.

    Raises:
        ValueError: If calibration_samples is empty.
    """
    if calibration_samples.shape[0] == 0:
        raise ValueError("Calibration samples are empty. Cannot compute weights.")

    # Prepare data (Calib=0, Test=1 labels)
    x_joint, y_joint = self._prepare_training_data(
        calibration_samples, test_samples, self._seed
    )

    self.estimator_ = clone(self.base_estimator)
    if self._seed is not None:
        self._apply_seed_to_estimator(self.estimator_, self._seed)
    self.estimator_.fit(x_joint, y_joint)

    # sklearn sorts classes_ - get correct column index for P(Test)
    self._test_class_idx = int(
        np.where(self.estimator_.classes_ == self.TEST_LABEL)[0][0]
    )

    w_calib, w_test = self._compute_weights(calibration_samples, test_samples)
    self._clip_bounds = self._compute_clip_bounds(
        w_calib, w_test, self.clip_quantile
    )
    self._w_calib, self._w_test = self._clip_weights(
        w_calib, w_test, self._clip_bounds
    )
    self._is_fitted = True

BootstrapBaggedWeightEstimator

BootstrapBaggedWeightEstimator(
    base_estimator: BaseWeightEstimator,
    n_bootstraps: int = 100,
    clip_quantile: float | None = 0.05,
    scoring_mode: Literal["frozen"] = "frozen",
)

Bases: BaseWeightEstimator

Bootstrap-bagged wrapper for weight estimators with instance-wise aggregation.

This estimator wraps any base weight estimator and applies bootstrap bagging to create more stable, robust weight estimates. It's most relevant when the calibration set is much larger than the test batch (or vice versa), where standalone weights can become spiky and unstable.

The algorithm: 1. For each bootstrap iteration: - Resample BOTH sets to balanced sample size (min of calibration and test sizes) - Fit the base estimator on the balanced bootstrap sample - Score ALL original instances using the fitted model (perfect coverage) - Store log(weights) for each instance 2. After all iterations: - Aggregate instance-wise weights using geometric mean (average in log-space) - Apply clipping to maintain boundedness for theoretical guarantees

Seed inheritance

This class uses the _seed attribute pattern for automatic seed inheritance from ConformalDetector.

Parameters:

Name Type Description Default
base_estimator BaseWeightEstimator

Any BaseWeightEstimator instance.

required
n_bootstraps int

Number of bootstrap iterations. Defaults to 100.

100
clip_quantile float | None

Quantile for adaptive clipping. Use None to disable clipping. Defaults to 0.05.

0.05
scoring_mode Literal['frozen']

Weight scoring behavior after fit. Currently only "frozen" is supported, meaning the estimator can only serve the exact calibration/test batches used during fit(). Defaults to "frozen".

'frozen'
References

Jin, Ying, and Emmanuel J. Candès. "Selection by Prediction with Conformal p-values." Journal of Machine Learning Research 24.244 (2023): 1-41.

Source code in nonconform/weighting.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
def __init__(
    self,
    base_estimator: BaseWeightEstimator,
    n_bootstraps: int = 100,
    clip_quantile: float | None = 0.05,
    scoring_mode: Literal["frozen"] = "frozen",
) -> None:
    if n_bootstraps < 1:
        raise ValueError(
            f"n_bootstraps must be at least 1, got {n_bootstraps}. "
            f"Typical values are 50-200 for stable weight estimation."
        )
    if clip_quantile is not None and not (0 < clip_quantile < 0.5):
        raise ValueError(
            f"clip_quantile must be in (0, 0.5), got {clip_quantile}. "
            f"Common values are 0.05 (5th-95th percentiles) or 0.01."
        )
    if scoring_mode != "frozen":
        raise ValueError(
            f"Unsupported scoring_mode {scoring_mode!r}. "
            "BootstrapBaggedWeightEstimator currently supports only "
            "scoring_mode='frozen'."
        )

    self.base_estimator = base_estimator
    self.n_bootstraps = n_bootstraps
    self.clip_quantile = clip_quantile
    self.scoring_mode: Literal["frozen"] = scoring_mode

    # Seed inheritance attribute (set by ConformalDetector)
    self._seed: int | None = None

    self._w_calib: np.ndarray | None = None
    self._w_test: np.ndarray | None = None
    self._calibration_signature: tuple[tuple[int, ...], str, str] | None = None
    self._test_signature: tuple[tuple[int, ...], str, str] | None = None
    self._is_fitted = False
supports_rescoring property
supports_rescoring: bool

Whether this estimator can score arbitrary new batches after fit().

weight_counts property
weight_counts: str

Return diagnostic info about instance-wise weight coverage.

fit
fit(
    calibration_samples: ndarray, test_samples: ndarray
) -> None

Fit the bagged weight estimator with perfect instance coverage.

Parameters:

Name Type Description Default
calibration_samples ndarray

Array of calibration data samples.

required
test_samples ndarray

Array of test data samples.

required

Raises:

Type Description
ValueError

If calibration_samples is empty.

Source code in nonconform/weighting.py
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
def fit(self, calibration_samples: np.ndarray, test_samples: np.ndarray) -> None:
    """Fit the bagged weight estimator with perfect instance coverage.

    Args:
        calibration_samples: Array of calibration data samples.
        test_samples: Array of test data samples.

    Raises:
        ValueError: If calibration_samples is empty.
    """
    if calibration_samples.shape[0] == 0:
        raise ValueError("Calibration samples are empty. Cannot compute weights.")

    n_calib, n_test = len(calibration_samples), len(test_samples)
    sample_size = min(n_calib, n_test)
    rng = np.random.default_rng(self._seed)

    if _bagged_logger.isEnabledFor(logging.INFO):
        _bagged_logger.info(
            f"Bootstrap: n_calib={n_calib}, n_test={n_test}, "
            f"sample_size={sample_size}, n_bootstraps={self.n_bootstraps}. "
            f"Perfect coverage: all instances weighted in all iterations."
        )

    # Online accumulation: sum log-weights (memory efficient)
    sum_log_weights_calib = np.zeros(n_calib)
    sum_log_weights_test = np.zeros(n_test)

    bootstrap_iterator = (
        tqdm(range(self.n_bootstraps), desc="Weighting")
        if _bagged_logger.isEnabledFor(logging.INFO)
        else range(self.n_bootstraps)
    )

    for i in bootstrap_iterator:
        # Resample both sets for balanced comparison
        calib_indices = rng.choice(n_calib, size=sample_size, replace=True)
        test_indices = rng.choice(n_test, size=sample_size, replace=True)
        x_calib_boot = calibration_samples[calib_indices]
        x_test_boot = test_samples[test_indices]

        # Create base estimator with iteration-specific seed
        base_est = deepcopy(self.base_estimator)
        if self._seed is not None:
            derived_seed = derive_seed(i, self._seed)
            if hasattr(base_est, "seed"):
                base_est.seed = derived_seed
            if hasattr(base_est, "_seed"):
                base_est._seed = derived_seed

        # Fit on bootstrap sample, then score ALL original instances
        base_est.fit(x_calib_boot, x_test_boot)
        w_c_all, w_t_all = base_est.get_weights(calibration_samples, test_samples)

        # Accumulate log-weights for geometric mean aggregation
        sum_log_weights_calib += np.log(w_c_all)
        sum_log_weights_test += np.log(w_t_all)

    # Geometric mean aggregation: exp(mean(log-weights))
    w_calib_final = np.exp(sum_log_weights_calib / self.n_bootstraps)
    w_test_final = np.exp(sum_log_weights_test / self.n_bootstraps)

    # Apply clipping after aggregation (use base class static method)
    clip_bounds = BaseWeightEstimator._compute_clip_bounds(
        w_calib_final, w_test_final, self.clip_quantile
    )
    if clip_bounds is None:
        self._w_calib = w_calib_final
        self._w_test = w_test_final
    else:
        clip_min, clip_max = clip_bounds
        self._w_calib = np.clip(w_calib_final, clip_min, clip_max)
        self._w_test = np.clip(w_test_final, clip_min, clip_max)

    self._calibration_signature = self._sample_signature(calibration_samples)
    self._test_signature = self._sample_signature(test_samples)
    self._is_fitted = True

logistic_weight_estimator

logistic_weight_estimator(
    regularization: str | float = "auto",
    clip_quantile: float = 0.05,
    class_weight: str | dict = "balanced",
    max_iter: int = 1000,
) -> SklearnWeightEstimator

Create weight estimator using Logistic Regression.

This factory function provides behavioral equivalence with the old LogisticWeightEstimator class.

Note

When used with ConformalDetector, the detector's seed is automatically propagated to the weight estimator for reproducibility.

Parameters:

Name Type Description Default
regularization str | float

Regularization parameter. If 'auto', uses C=1.0. If float, uses as C parameter.

'auto'
clip_quantile float

Quantile for weight clipping. Defaults to 0.05.

0.05
class_weight str | dict

Class weights for LogisticRegression. Defaults to 'balanced'.

'balanced'
max_iter int

Maximum iterations for solver convergence. Defaults to 1000.

1000

Returns:

Type Description
SklearnWeightEstimator

Configured SklearnWeightEstimator instance.

Examples:

estimator = logistic_weight_estimator(regularization=0.5)
estimator.fit(calib_samples, test_samples)
w_calib, w_test = estimator.get_weights()
Source code in nonconform/weighting.py
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
def logistic_weight_estimator(
    regularization: str | float = "auto",
    clip_quantile: float = 0.05,
    class_weight: str | dict = "balanced",
    max_iter: int = 1000,
) -> SklearnWeightEstimator:
    """Create weight estimator using Logistic Regression.

    This factory function provides behavioral equivalence with the old
    LogisticWeightEstimator class.

    Note:
        When used with ConformalDetector, the detector's seed is automatically
        propagated to the weight estimator for reproducibility.

    Args:
        regularization: Regularization parameter. If 'auto', uses C=1.0.
            If float, uses as C parameter.
        clip_quantile: Quantile for weight clipping. Defaults to 0.05.
        class_weight: Class weights for LogisticRegression. Defaults to 'balanced'.
        max_iter: Maximum iterations for solver convergence. Defaults to 1000.

    Returns:
        Configured SklearnWeightEstimator instance.

    Examples:
        ```python
        estimator = logistic_weight_estimator(regularization=0.5)
        estimator.fit(calib_samples, test_samples)
        w_calib, w_test = estimator.get_weights()
        ```
    """
    from sklearn.pipeline import make_pipeline
    from sklearn.preprocessing import StandardScaler

    c_param = 1.0 if regularization == "auto" else float(regularization)
    base_estimator = make_pipeline(
        StandardScaler(),
        LogisticRegression(
            C=c_param,
            max_iter=max_iter,
            class_weight=class_weight,
        ),
    )
    return SklearnWeightEstimator(
        base_estimator=base_estimator, clip_quantile=clip_quantile
    )

forest_weight_estimator

forest_weight_estimator(
    n_estimators: int = 100,
    max_depth: int | None = 5,
    min_samples_leaf: int = 10,
    clip_quantile: float = 0.05,
) -> SklearnWeightEstimator

Create weight estimator using Random Forest.

This factory function provides behavioral equivalence with the old ForestWeightEstimator class.

Note

When used with ConformalDetector, the detector's seed is automatically propagated to the weight estimator for reproducibility.

Parameters:

Name Type Description Default
n_estimators int

Number of trees in the forest. Defaults to 100.

100
max_depth int | None

Maximum depth of trees. Defaults to 5.

5
min_samples_leaf int

Minimum samples at leaf node. Defaults to 10.

10
clip_quantile float

Quantile for weight clipping. Defaults to 0.05.

0.05

Returns:

Type Description
SklearnWeightEstimator

Configured SklearnWeightEstimator instance.

Examples:

estimator = forest_weight_estimator(n_estimators=200)
estimator.fit(calib_samples, test_samples)
w_calib, w_test = estimator.get_weights()
Source code in nonconform/weighting.py
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
def forest_weight_estimator(
    n_estimators: int = 100,
    max_depth: int | None = 5,
    min_samples_leaf: int = 10,
    clip_quantile: float = 0.05,
) -> SklearnWeightEstimator:
    """Create weight estimator using Random Forest.

    This factory function provides behavioral equivalence with the old
    ForestWeightEstimator class.

    Note:
        When used with ConformalDetector, the detector's seed is automatically
        propagated to the weight estimator for reproducibility.

    Args:
        n_estimators: Number of trees in the forest. Defaults to 100.
        max_depth: Maximum depth of trees. Defaults to 5.
        min_samples_leaf: Minimum samples at leaf node. Defaults to 10.
        clip_quantile: Quantile for weight clipping. Defaults to 0.05.

    Returns:
        Configured SklearnWeightEstimator instance.

    Examples:
        ```python
        estimator = forest_weight_estimator(n_estimators=200)
        estimator.fit(calib_samples, test_samples)
        w_calib, w_test = estimator.get_weights()
        ```
    """
    from sklearn.ensemble import RandomForestClassifier

    base_estimator = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        min_samples_leaf=min_samples_leaf,
        class_weight="balanced",
        n_jobs=-1,
    )
    return SklearnWeightEstimator(
        base_estimator=base_estimator, clip_quantile=clip_quantile
    )

FDR Control

Includes weighted low-level expert APIs (weighted_false_discovery_control). For standard workflows, prefer ConformalDetector.select(...).

nonconform.fdr

False Discovery Rate control utilities for conformal prediction.

This module provides explicit entry points for:

  • Weighted Conformalized Selection (WCS) under covariate shift.

Pruning

Bases: Enum

Pruning strategies for weighted FDR control.

Attributes:

Name Type Description
HETEROGENEOUS

Remove elements based on independent random checks per item.

HOMOGENEOUS

Apply one shared random decision to all items.

DETERMINISTIC

Remove items using a fixed rule with no randomness.

weighted_false_discovery_control

weighted_false_discovery_control(
    result: ConformalResult | None,
    *,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
) -> np.ndarray

Perform WCS from a strict ConformalResult bundle.

Source code in nonconform/fdr.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
def weighted_false_discovery_control(
    result: ConformalResult | None,
    *,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
) -> np.ndarray:
    """Perform WCS from a strict ConformalResult bundle."""
    p_values, test_scores, calib_scores, test_weights, calib_weights = (
        _extract_required_wcs_fields(result)
    )
    kde_support, use_self_weight = _extract_kde_support(result)
    return _run_wcs(
        p_values=p_values,
        test_scores=test_scores,
        calib_scores=calib_scores,
        test_weights=test_weights,
        calib_weights=calib_weights,
        alpha=alpha,
        pruning=pruning,
        seed=seed,
        kde_support=kde_support,
        include_self_weight=use_self_weight,
    )

weighted_false_discovery_control_from_arrays

weighted_false_discovery_control_from_arrays(
    *,
    p_values: ndarray,
    test_scores: ndarray,
    calib_scores: ndarray,
    test_weights: ndarray,
    calib_weights: ndarray,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
) -> np.ndarray

Perform WCS from explicit weighted arrays and precomputed p-values.

Source code in nonconform/fdr.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def weighted_false_discovery_control_from_arrays(
    *,
    p_values: np.ndarray,
    test_scores: np.ndarray,
    calib_scores: np.ndarray,
    test_weights: np.ndarray,
    calib_weights: np.ndarray,
    alpha: float = 0.05,
    pruning: Pruning = Pruning.DETERMINISTIC,
    seed: int | None = None,
) -> np.ndarray:
    """Perform WCS from explicit weighted arrays and precomputed p-values."""
    return _run_wcs(
        p_values=p_values,
        test_scores=test_scores,
        calib_scores=calib_scores,
        test_weights=test_weights,
        calib_weights=calib_weights,
        alpha=alpha,
        pruning=pruning,
        seed=seed,
    )

Martingales

nonconform.martingales

Exchangeability martingales for sequential conformal evidence.

This module implements p-value-based martingales and alarm statistics for streaming or temporal monitoring workflows. In practice, you feed one conformal p-value at a time and read a running evidence state after each update.

Implemented martingales
  • PowerMartingale
  • SimpleMixtureMartingale
  • SimpleJumperMartingale

All classes consume conformal p-values in [0, 1]. Alarm statistics are computed from martingale ratio increments and exposed together with the current martingale value in :class:MartingaleState.

AlarmConfig dataclass

AlarmConfig(
    ville_threshold: float | None = None,
    restarted_ville_threshold: float | None = None,
    cusum_threshold: float | None = None,
    shiryaev_roberts_threshold: float | None = None,
)

Optional alarm thresholds for martingale evidence statistics.

Thresholds are disabled when set to None. Each threshold compares against a running statistic in :class:MartingaleState.

ville_threshold and restarted_ville_threshold are Ville thresholds for e-processes. cusum_threshold and shiryaev_roberts_threshold are change-evidence thresholds and should not be interpreted as probability-of-ever-crossing Ville thresholds without a separate theorem for the exact statistic.

MartingaleState dataclass

MartingaleState(
    step: int,
    p_value: float,
    log_martingale: float,
    martingale: float,
    log_restarted_martingale: float,
    restarted_martingale: float,
    log_cusum: float,
    cusum: float,
    log_shiryaev_roberts: float,
    shiryaev_roberts: float,
    triggered_alarms: tuple[str, ...],
)

Snapshot of martingale and alarm statistics after one update.

BaseMartingale

BaseMartingale(alarm_config: AlarmConfig | None = None)

Bases: ABC

Abstract base class for p-value-driven exchangeability martingales.

Source code in nonconform/martingales.py
170
171
172
def __init__(self, alarm_config: AlarmConfig | None = None) -> None:
    self._alarm_config = alarm_config if alarm_config is not None else AlarmConfig()
    self.reset()
state property
state: MartingaleState

Return current state snapshot.

reset
reset() -> None

Reset martingale and alarm statistics to initial values.

Source code in nonconform/martingales.py
179
180
181
182
183
184
185
186
187
188
189
def reset(self) -> None:
    """Reset martingale and alarm statistics to initial values."""
    self._step = 0
    self._last_p_value = float("nan")
    self._log_martingale = 0.0
    self._log_active_restarted_mass = float("-inf")
    self._log_restarted_martingale = 0.0
    # CUSUM/SR start at 0 on linear scale -> -inf in log space.
    self._log_cusum = float("-inf")
    self._log_shiryaev_roberts = float("-inf")
    self._reset_method_state()
update_many
update_many(
    p_values: Sequence[float] | ndarray,
) -> list[MartingaleState]

Update state for each p-value in order and return all snapshots.

Source code in nonconform/martingales.py
191
192
193
194
195
def update_many(
    self, p_values: Sequence[float] | np.ndarray
) -> list[MartingaleState]:
    """Update state for each p-value in order and return all snapshots."""
    return [self.update(float(p_value)) for p_value in p_values]
update
update(p_value: float) -> MartingaleState

Ingest one p-value in [0, 1] and return the updated state.

Source code in nonconform/martingales.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def update(self, p_value: float) -> MartingaleState:
    """Ingest one p-value in ``[0, 1]`` and return the updated state."""
    p_value_validated = _validate_probability(p_value)
    log_increment = self._compute_log_increment(p_value_validated)
    if np.isnan(log_increment):
        raise ValueError("Martingale increment is NaN.")

    self._step += 1
    self._last_p_value = p_value_validated
    self._log_martingale += log_increment
    self._log_active_restarted_mass = float(
        log_increment
        + np.logaddexp(
            self._log_active_restarted_mass,
            _log_harmonic_restart_weight(self._step),
        )
    )
    self._log_restarted_martingale = float(
        np.logaddexp(
            self._log_active_restarted_mass,
            _log_harmonic_restart_tail(self._step),
        )
    )
    self._log_cusum = float(log_increment + max(self._log_cusum, 0.0))
    self._log_shiryaev_roberts = float(
        log_increment + np.logaddexp(0.0, self._log_shiryaev_roberts)
    )
    return self._current_state()

PowerMartingale

PowerMartingale(
    epsilon: float = 0.5,
    alarm_config: AlarmConfig | None = None,
)

Bases: BaseMartingale

Power martingale with fixed epsilon in (0, 1].

Source code in nonconform/martingales.py
280
281
282
283
284
285
286
287
288
def __init__(
    self,
    epsilon: float = 0.5,
    alarm_config: AlarmConfig | None = None,
) -> None:
    self.epsilon = float(epsilon)
    if not (0.0 < self.epsilon <= 1.0):
        raise ValueError(f"epsilon must be in (0, 1], got {self.epsilon}.")
    super().__init__(alarm_config=alarm_config)

SimpleMixtureMartingale

SimpleMixtureMartingale(
    epsilons: Sequence[float] | ndarray | None = None,
    *,
    n_grid: int = 100,
    min_epsilon: float = 0.01,
    alarm_config: AlarmConfig | None = None,
)

Bases: BaseMartingale

Simple mixture martingale over a fixed epsilon grid.

Source code in nonconform/martingales.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def __init__(
    self,
    epsilons: Sequence[float] | np.ndarray | None = None,
    *,
    n_grid: int = 100,
    min_epsilon: float = 0.01,
    alarm_config: AlarmConfig | None = None,
) -> None:
    if epsilons is None:
        if n_grid < 2:
            raise ValueError(f"n_grid must be at least 2, got {n_grid}.")
        if not (0.0 < min_epsilon <= 1.0):
            raise ValueError(f"min_epsilon must be in (0, 1], got {min_epsilon}.")
        self.epsilons = np.linspace(float(min_epsilon), 1.0, int(n_grid))
    else:
        self.epsilons = np.asarray(epsilons, dtype=float)
        if self.epsilons.ndim != 1 or self.epsilons.size == 0:
            raise ValueError("epsilons must be a non-empty 1D sequence.")

    if not np.all(np.isfinite(self.epsilons)):
        raise ValueError("epsilons must be finite.")
    if np.any((self.epsilons <= 0.0) | (self.epsilons > 1.0)):
        raise ValueError("All epsilons must be in (0, 1].")
    self._n_eps = int(self.epsilons.size)
    super().__init__(alarm_config=alarm_config)

SimpleJumperMartingale

SimpleJumperMartingale(
    jump: float = 0.01,
    alarm_config: AlarmConfig | None = None,
)

Bases: BaseMartingale

Simple Jumper martingale (Algorithm 1 in Vovk et al.).

This method mixes three betting components and redistributes mass each step through jump.

Source code in nonconform/martingales.py
358
359
360
361
362
363
364
365
366
367
def __init__(
    self,
    jump: float = 0.01,
    alarm_config: AlarmConfig | None = None,
) -> None:
    self.jump = float(jump)
    if not (0.0 < self.jump <= 1.0):
        raise ValueError(f"jump must be in (0, 1], got {self.jump}.")
    self._epsilons = np.array([-1.0, 0.0, 1.0], dtype=float)
    super().__init__(alarm_config=alarm_config)

Data Structures

nonconform.structures

Core data structures and protocols for nonconform.

This module provides the fundamental types used throughout the package:

Classes:

Name Description
AnomalyDetector

Protocol defining the detector interface.

ConformalResult

Container for conformal prediction outputs.

AnomalyDetector

Bases: Protocol

Protocol defining the interface for anomaly detectors.

Any detector (PyOD, sklearn-compatible, or custom) can be used with nonconform by implementing this protocol.

Required methods

fit: Train the detector on data decision_function: Compute anomaly scores get_params: Retrieve detector parameters set_params: Configure detector parameters

The detector must be copyable (support copy.copy and copy.deepcopy).

Examples:

# Most PyOD detectors work automatically (blocked strict-inductive
# exceptions are documented in the detector compatibility guide)
from pyod.models.iforest import IForest

detector: AnomalyDetector = IForest()


# Custom detector implementing the protocol
class MyDetector:
    def fit(self, X, y=None): ...
    def decision_function(self, X): ...
    def get_params(self, deep=True): ...
    def set_params(self, **params): ...


detector: AnomalyDetector = MyDetector()
fit
fit(X: ndarray, y: ndarray | None = None) -> Self

Train the anomaly detector.

Parameters:

Name Type Description Default
X ndarray

Training data of shape (n_samples, n_features).

required
y ndarray | None

Ignored. Present for API consistency.

None

Returns:

Type Description
Self

The fitted detector instance.

Source code in nonconform/structures.py
62
63
64
65
66
67
68
69
70
71
72
def fit(self, X: np.ndarray, y: np.ndarray | None = None) -> Self:
    """Train the anomaly detector.

    Args:
        X: Training data of shape (n_samples, n_features).
        y: Ignored. Present for API consistency.

    Returns:
        The fitted detector instance.
    """
    ...
decision_function
decision_function(X: ndarray) -> np.ndarray

Compute anomaly scores for samples.

Higher scores indicate more anomalous samples.

Parameters:

Name Type Description Default
X ndarray

Data of shape (n_samples, n_features).

required

Returns:

Type Description
ndarray

Anomaly scores of shape (n_samples,).

Source code in nonconform/structures.py
74
75
76
77
78
79
80
81
82
83
84
85
def decision_function(self, X: np.ndarray) -> np.ndarray:
    """Compute anomaly scores for samples.

    Higher scores indicate more anomalous samples.

    Args:
        X: Data of shape (n_samples, n_features).

    Returns:
        Anomaly scores of shape (n_samples,).
    """
    ...
get_params
get_params(deep: bool = True) -> dict[str, Any]

Get parameters for this detector.

Parameters:

Name Type Description Default
deep bool

If True, return parameters for sub-objects.

True

Returns:

Type Description
dict[str, Any]

Parameter names mapped to their values.

Source code in nonconform/structures.py
87
88
89
90
91
92
93
94
95
96
def get_params(self, deep: bool = True) -> dict[str, Any]:
    """Get parameters for this detector.

    Args:
        deep: If True, return parameters for sub-objects.

    Returns:
        Parameter names mapped to their values.
    """
    ...
set_params
set_params(**params: Any) -> Self

Set parameters for this detector.

Parameters:

Name Type Description Default
**params Any

Detector parameters.

{}

Returns:

Type Description
Self

The detector instance.

Source code in nonconform/structures.py
 98
 99
100
101
102
103
104
105
106
107
def set_params(self, **params: Any) -> Self:
    """Set parameters for this detector.

    Args:
        **params: Detector parameters.

    Returns:
        The detector instance.
    """
    ...

ConformalResult dataclass

ConformalResult(
    p_values: ndarray | None = None,
    test_scores: ndarray | None = None,
    calib_scores: ndarray | None = None,
    test_weights: ndarray | None = None,
    calib_weights: ndarray | None = None,
    metadata: dict[str, Any] = dict(),
)

Snapshot of detector outputs for downstream procedures.

This dataclass holds all outputs from a conformal prediction, including p-values, raw scores, and optional weights for weighted conformal.

Attributes:

Name Type Description
p_values ndarray | None

Conformal p-values for test instances (None when unavailable).

test_scores ndarray | None

Non-conformity scores for the test instances (raw predictions).

calib_scores ndarray | None

Non-conformity scores for the calibration set.

test_weights ndarray | None

Importance weights for test instances (weighted mode only).

calib_weights ndarray | None

Importance weights for calibration instances.

metadata dict[str, Any]

Optional dictionary with extra data (debug info, timings, etc.).

Examples:

p_values = detector.compute_p_values(X_test)
result = detector.last_result
print(result.p_values)  # Access p-values
print(result.metadata)  # Access optional metadata
copy
copy() -> ConformalResult

Return a copy with arrays and metadata fully duplicated.

Returns:

Type Description
ConformalResult

A new ConformalResult with copied arrays and deep-copied metadata.

Source code in nonconform/structures.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def copy(self) -> ConformalResult:
    """Return a copy with arrays and metadata fully duplicated.

    Returns:
        A new ConformalResult with copied arrays and deep-copied metadata.
    """

    def _copy_arr(arr: np.ndarray | None) -> np.ndarray | None:
        return arr.copy() if arr is not None else None

    return ConformalResult(
        p_values=_copy_arr(self.p_values),
        test_scores=_copy_arr(self.test_scores),
        calib_scores=_copy_arr(self.calib_scores),
        test_weights=_copy_arr(self.test_weights),
        calib_weights=_copy_arr(self.calib_weights),
        metadata=deepcopy(self.metadata),
    )

Adapters

nonconform.adapters

External detector adapters for nonconform.

ScorePolarityAdapter

ScorePolarityAdapter(
    detector: AnomalyDetector, score_polarity: ScorePolarity
)

Adapter that normalizes detector score direction conventions.

Source code in nonconform/adapters.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def __init__(
    self,
    detector: AnomalyDetector,
    score_polarity: ScorePolarity,
) -> None:
    if score_polarity not in {
        ScorePolarity.HIGHER_IS_ANOMALOUS,
        ScorePolarity.HIGHER_IS_NORMAL,
    }:
        raise ValueError(
            "ScorePolarityAdapter requires explicit non-auto score polarity."
        )
    self._detector = detector
    self._score_polarity = score_polarity
    self._multiplier = (
        1.0 if score_polarity is ScorePolarity.HIGHER_IS_ANOMALOUS else -1.0
    )
fit
fit(X: ndarray, y: ndarray | None = None) -> Self

Fit wrapped detector.

Source code in nonconform/adapters.py
233
234
235
236
def fit(self, X: np.ndarray, y: np.ndarray | None = None) -> Self:
    """Fit wrapped detector."""
    self._detector.fit(X, y)
    return self
decision_function
decision_function(X: ndarray) -> np.ndarray

Return scores transformed to anomalous-higher convention.

Source code in nonconform/adapters.py
238
239
240
241
def decision_function(self, X: np.ndarray) -> np.ndarray:
    """Return scores transformed to anomalous-higher convention."""
    scores = np.asarray(self._detector.decision_function(X), dtype=float)
    return self._multiplier * scores
get_params
get_params(deep: bool = True) -> dict[str, Any]

Delegate parameter retrieval to wrapped detector.

Source code in nonconform/adapters.py
243
244
245
def get_params(self, deep: bool = True) -> dict[str, Any]:
    """Delegate parameter retrieval to wrapped detector."""
    return self._detector.get_params(deep=deep)
set_params
set_params(**params: Any) -> Self

Delegate parameter updates to wrapped detector.

Source code in nonconform/adapters.py
247
248
249
250
def set_params(self, **params: Any) -> Self:
    """Delegate parameter updates to wrapped detector."""
    self._detector.set_params(**params)
    return self

PyODAdapter

PyODAdapter(detector: Any)

Adapter wrapping PyOD detectors to ensure protocol compliance.

Source code in nonconform/adapters.py
284
285
286
287
288
def __init__(self, detector: Any) -> None:
    """Initialize adapter for a PyOD detector."""
    if not PYOD_AVAILABLE:
        raise ImportError("PyOD is not installed. Install with: pip install pyod")
    self._detector = detector
fit
fit(X: ndarray, y: ndarray | None = None) -> Self

Fit wrapped detector.

Source code in nonconform/adapters.py
290
291
292
293
def fit(self, X: np.ndarray, y: np.ndarray | None = None) -> Self:
    """Fit wrapped detector."""
    self._detector.fit(X, y)
    return self
decision_function
decision_function(X: ndarray) -> np.ndarray

Return anomaly scores from wrapped detector.

Source code in nonconform/adapters.py
295
296
297
def decision_function(self, X: np.ndarray) -> np.ndarray:
    """Return anomaly scores from wrapped detector."""
    return self._detector.decision_function(X)
get_params
get_params(deep: bool = True) -> dict[str, Any]

Delegate parameter retrieval to wrapped detector.

Source code in nonconform/adapters.py
299
300
301
def get_params(self, deep: bool = True) -> dict[str, Any]:
    """Delegate parameter retrieval to wrapped detector."""
    return self._detector.get_params(deep=deep)
set_params
set_params(**params: Any) -> Self

Delegate parameter updates to wrapped detector.

Source code in nonconform/adapters.py
303
304
305
306
def set_params(self, **params: Any) -> Self:
    """Delegate parameter updates to wrapped detector."""
    self._detector.set_params(**params)
    return self

adapt

adapt(detector: Any) -> AnomalyDetector

Adapt a detector to the AnomalyDetector protocol.

Source code in nonconform/adapters.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def adapt(detector: Any) -> AnomalyDetector:
    """Adapt a detector to the AnomalyDetector protocol."""
    _guard_blocked_pyod_detector(detector)

    if isinstance(detector, AnomalyDetector):
        return detector

    if PYOD_AVAILABLE and isinstance(detector, PyODBaseDetector):
        return PyODAdapter(detector)

    if not PYOD_AVAILABLE and _looks_like_pyod(detector):
        raise ImportError(
            "Detector appears to be a PyOD detector, but PyOD is not installed. "
            'Install with: pip install "nonconform[pyod]" or pip install pyod.'
        )

    required_methods = ["fit", "decision_function", "get_params", "set_params"]
    missing_methods = [m for m in required_methods if not hasattr(detector, m)]
    if missing_methods:
        raise TypeError(
            "Detector must implement AnomalyDetector protocol. "
            f"Missing methods: {', '.join(missing_methods)}"
        )

    return detector

parse_score_polarity

parse_score_polarity(
    score_polarity: ScorePolarityInput,
) -> ScorePolarity

Parse score polarity input to canonical enum representation.

Source code in nonconform/adapters.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def parse_score_polarity(score_polarity: ScorePolarityInput) -> ScorePolarity:
    """Parse score polarity input to canonical enum representation."""
    if isinstance(score_polarity, ScorePolarity):
        return score_polarity

    if isinstance(score_polarity, str):
        normalized = score_polarity.strip().lower()
        mapping = {
            "auto": ScorePolarity.AUTO,
            "higher_is_anomalous": ScorePolarity.HIGHER_IS_ANOMALOUS,
            "higher_is_normal": ScorePolarity.HIGHER_IS_NORMAL,
        }
        if normalized in mapping:
            return mapping[normalized]
        raise ValueError(
            "Invalid score_polarity value. "
            "Use one of: 'auto', 'higher_is_anomalous', 'higher_is_normal'."
        )

    raise TypeError(
        "score_polarity must be a ScorePolarity enum or string literal "
        "('auto', 'higher_is_anomalous', 'higher_is_normal')."
    )

resolve_implicit_score_polarity

resolve_implicit_score_polarity(
    detector: Any,
) -> ScorePolarity

Resolve score polarity when users omit score_polarity.

The default favors low-friction custom detector onboarding while preserving safe behavior for known detector families: - Known sklearn normality detectors -> HIGHER_IS_NORMAL - PyOD detectors -> HIGHER_IS_ANOMALOUS - Unknown custom detectors -> HIGHER_IS_ANOMALOUS

Source code in nonconform/adapters.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def resolve_implicit_score_polarity(detector: Any) -> ScorePolarity:
    """Resolve score polarity when users omit score_polarity.

    The default favors low-friction custom detector onboarding while
    preserving safe behavior for known detector families:
    - Known sklearn normality detectors -> HIGHER_IS_NORMAL
    - PyOD detectors -> HIGHER_IS_ANOMALOUS
    - Unknown custom detectors -> HIGHER_IS_ANOMALOUS
    """
    if _is_known_sklearn_normality_detector(detector):
        return ScorePolarity.HIGHER_IS_NORMAL
    if isinstance(detector, PyODAdapter) or _looks_like_pyod(detector):
        return ScorePolarity.HIGHER_IS_ANOMALOUS
    return ScorePolarity.HIGHER_IS_ANOMALOUS

resolve_score_polarity

resolve_score_polarity(
    detector: Any, score_polarity: ScorePolarityInput
) -> ScorePolarity

Resolve requested score polarity in strict AUTO mode.

Unlike resolve_implicit_score_polarity, this function is intentionally strict for explicit score_polarity="auto" and raises for unknown detectors.

Source code in nonconform/adapters.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def resolve_score_polarity(
    detector: Any,
    score_polarity: ScorePolarityInput,
) -> ScorePolarity:
    """Resolve requested score polarity in strict AUTO mode.

    Unlike ``resolve_implicit_score_polarity``, this function is intentionally
    strict for explicit ``score_polarity="auto"`` and raises for unknown
    detectors.
    """
    parsed = parse_score_polarity(score_polarity)
    if parsed is not ScorePolarity.AUTO:
        return parsed

    if isinstance(detector, PyODAdapter) or _looks_like_pyod(detector):
        return ScorePolarity.HIGHER_IS_ANOMALOUS
    if _is_known_sklearn_normality_detector(detector):
        return ScorePolarity.HIGHER_IS_NORMAL

    detector_cls = type(detector)
    detector_name = f"{detector_cls.__module__}.{detector_cls.__qualname__}"
    raise ValueError(
        "Unable to infer score polarity automatically in strict auto mode for "
        f"detector '{detector_name}'. Auto inference currently supports PyOD "
        "detectors and known sklearn normality estimators. For custom detectors, "
        "pass score_polarity='higher_is_anomalous' (recommended when larger "
        "scores mean more anomalous) or score_polarity='higher_is_normal'."
    )

apply_score_polarity

apply_score_polarity(
    detector: AnomalyDetector,
    score_polarity: ScorePolarityInput,
) -> AnomalyDetector

Return detector that follows requested score polarity convention.

Source code in nonconform/adapters.py
191
192
193
194
195
196
197
198
199
200
201
202
203
def apply_score_polarity(
    detector: AnomalyDetector,
    score_polarity: ScorePolarityInput,
) -> AnomalyDetector:
    """Return detector that follows requested score polarity convention."""
    parsed = parse_score_polarity(score_polarity)
    if parsed is ScorePolarity.AUTO:
        raise ValueError(
            "score_polarity='auto' must be resolved first with resolve_score_polarity."
        )
    if parsed is ScorePolarity.HIGHER_IS_ANOMALOUS:
        return detector
    return ScorePolarityAdapter(detector=detector, score_polarity=parsed)