Skip to content

Bootstrap-based Conformal Detection

This example demonstrates how to use bootstrap resampling for conformal anomaly detection.

Setup

import numpy as np
from pyod.models.lof import LOF
from sklearn.datasets import load_breast_cancer
from scipy.stats import false_discovery_control
from nonconform.estimation import ConformalDetector
from nonconform.strategy import Bootstrap
from nonconform.utils.func import Aggregation

# Load example data
data = load_breast_cancer()
X = data.data
y = data.target

Basic Usage

# Initialize base detector
base_detector = LOF()

# Create bootstrap strategy
bootstrap_strategy = Bootstrap(
    n_bootstraps=100,
    resampling_ratio=0.8
)

# Initialize detector with bootstrap strategy
detector = ConformalDetector(
    detector=base_detector,
    strategy=bootstrap_strategy,
    aggregation=Aggregation.MEDIAN,
    seed=42
)

# Fit and predict
detector.fit(X)
p_values = detector.predict(X, raw=False)

# Detect anomalies
anomalies = p_values < 0.05
print(f"Number of anomalies detected: {anomalies.sum()}")

Bootstrap Plus Mode

# Use bootstrap plus mode for better calibration
bootstrap_plus_strategy = Bootstrap(
    n_bootstraps=100,
    resampling_ratio=0.8,
    plus=True
)

detector_plus = ConformalDetector(
    detector=base_detector,
    strategy=bootstrap_plus_strategy,
    aggregation=Aggregation.MEDIAN,
    seed=42
)

# Fit and predict with ensemble
detector_plus.fit(X)
p_values_plus = detector_plus.predict(X, raw=False)

print(f"Bootstrap detections: {(p_values < 0.05).sum()}")
print(f"Bootstrap+ detections: {(p_values_plus < 0.05).sum()}")

Comparing Different Bootstrap Configurations

# Try different bootstrap configurations
configurations = [
    {"n_bootstraps": 50, "resampling_ratio": 0.7},
    {"n_bootstraps": 100, "resampling_ratio": 0.8},
    {"n_bootstraps": 200, "resampling_ratio": 0.9}
]

results = {}
for config in configurations:
    strategy = Bootstrap(**config)
    detector = ConformalDetector(
        detector=base_detector,
        strategy=strategy,
        aggregation=Aggregation.MEDIAN,
        seed=42,
    )
    detector.fit(X)
    p_vals = detector.predict(X, raw=False)

    key = f"B={config['n_bootstraps']}, r={config['resampling_ratio']}"
    results[key] = (p_vals < 0.05).sum()
    print(f"{key}: {results[key]} detections")

FDR Control with Bootstrap

# Apply FDR control to bootstrap p-values
adjusted_p_values = false_discovery_control(p_values, method='bh')
discoveries = adjusted_p_values < 0.05

print(f"\nFDR Control Results:")
print(f"Discoveries: {discoveries.sum()}")
print(f"Original detections: {(p_values < 0.05).sum()}")
print(f"Reduction: {(p_values < 0.05).sum() - discoveries.sum()}")

Accessing Bootstrap Iterations

# Track calibration scores during bootstrap iterations
iteration_scores = []
iteration_stats = []

def track_bootstrap_iterations(iteration: int, scores: np.ndarray):
    """Callback to track calibration scores per iteration."""
    iteration_scores.append(scores.copy())
    iteration_stats.append({
        'iteration': iteration,
        'count': len(scores),
        'mean': scores.mean(),
        'std': scores.std(),
        'min': scores.min(),
        'max': scores.max()
    })
    print(f"Iteration {iteration}: {len(scores)} calibration scores, "
          f"mean={scores.mean():.3f}")

# Initialize bootstrap strategy with callback
bootstrap_strategy = Bootstrap(
    n_bootstraps=10,
    resampling_ratio=0.8
)

detector = ConformalDetector(
    detector=base_detector,
    strategy=bootstrap_strategy,
    aggregation=Aggregation.MEDIAN,
    seed=42
)

# Fit with iteration callback
detector.detector_set, detector.calibration_set = bootstrap_strategy.fit_calibrate(
    X, detector.detector, iteration_callback=track_bootstrap_iterations
)

# Analyze iteration progression
print(f"\nBootstrap completed with {len(iteration_scores)} iterations")
print(f"Total calibration scores: {len(detector.calibration_set)}")

Bootstrap Iteration Analysis

# Analyze how calibration distribution evolves
import matplotlib.pyplot as plt

plt.figure(figsize=(15, 5))

# Plot calibration score evolution
plt.subplot(1, 3, 1)
for i, scores in enumerate(iteration_scores[:5]):  # Show first 5 iterations
    plt.hist(scores, bins=20, alpha=0.5, label=f'Iter {i}')
plt.xlabel('Calibration Score')
plt.ylabel('Frequency')
plt.title('Calibration Score Distribution per Iteration')
plt.legend()

# Plot iteration statistics
plt.subplot(1, 3, 2)
means = [stat['mean'] for stat in iteration_stats]
stds = [stat['std'] for stat in iteration_stats]
iterations = list(range(len(iteration_stats)))

plt.errorbar(iterations, means, yerr=stds, marker='o', capsize=5)
plt.xlabel('Bootstrap Iteration')
plt.ylabel('Mean Calibration Score')
plt.title('Score Statistics Evolution')

# Cumulative distribution analysis
plt.subplot(1, 3, 3)
cumulative_scores = []
for i in range(len(iteration_scores)):
    # Combine all scores up to iteration i
    combined = np.concatenate(iteration_scores[:i+1])
    cumulative_scores.append(combined.mean())

plt.plot(iterations, cumulative_scores, marker='s')
plt.xlabel('Bootstrap Iteration')
plt.ylabel('Cumulative Mean Score')
plt.title('Cumulative Calibration Mean')

plt.tight_layout()
plt.show()

# Print iteration summary
print("\nIteration Summary:")
for stat in iteration_stats:
    print(f"Iteration {stat['iteration']:2d}: "
          f"{stat['count']:3d} scores, "
          f"mean={stat['mean']:6.3f}, "
          f"std={stat['std']:6.3f}")

Uncertainty Quantification

# Get raw scores for uncertainty analysis
raw_scores = detector.predict(X, raw=True)

# Analyze score distribution
plt.figure(figsize=(12, 4))

# Score distribution
plt.subplot(1, 3, 1)
plt.hist(raw_scores, bins=50, alpha=0.7, color='blue', edgecolor='black')
plt.xlabel('Anomaly Score')
plt.ylabel('Frequency')
plt.title('Bootstrap Anomaly Score Distribution')

# P-value calculation using final calibration set
p_values = detector.predict(X, raw=False)

# P-value vs Score relationship
plt.subplot(1, 3, 2)
plt.scatter(raw_scores, p_values, alpha=0.5)
plt.xlabel('Anomaly Score')
plt.ylabel('p-value')
plt.title('Score vs P-value Relationship')

# Bootstrap stability analysis
plt.subplot(1, 3, 3)
# Run multiple bootstrap iterations
stability_results = []
for _ in range(10):
    det = ConformalDetector(
        detector=base_detector,
        strategy=Bootstrap(n_bootstraps=50, resampling_ratio=0.8),
        aggregation=Aggregation.MEDIAN,
        seed=np.random.randint(1000)
    )
    det.fit(X)
    p_vals = det.predict(X, raw=False)
    stability_results.append((p_vals < 0.05).sum())

plt.boxplot(stability_results)
plt.ylabel('Number of Detections')
plt.title('Bootstrap Detection Stability')

plt.tight_layout()
plt.show()

Comparison with Other Strategies

from nonconform.strategy import Split, Jackknife

# Compare bootstrap with other strategies
strategies = {
    'Bootstrap': Bootstrap(n_bootstraps=100, resampling_ratio=0.8),
    'Split': Split(n_calib=0.2),
    'Jackknife': Jackknife()
}

comparison_results = {}
for name, strategy in strategies.items():
    detector = ConformalDetector(
        detector=base_detector,
        strategy=strategy,
        aggregation=Aggregation.MEDIAN,
        seed=42,
    )
    detector.fit(X)
    p_vals = detector.predict(X, raw=False)
    comparison_results[name] = {
        'detections': (p_vals < 0.05).sum(),
        'min_p': p_vals.min(),
        'mean_p': p_vals.mean()
    }

print("\nStrategy Comparison:")
for name, results in comparison_results.items():
    print(f"{name}:")
    print(f"  Detections: {results['detections']}")
    print(f"  Min p-value: {results['min_p']:.4f}")
    print(f"  Mean p-value: {results['mean_p']:.4f}")

Next Steps