Best Practices Guide¶
This guide provides recommendations for using nonconform effectively in different scenarios with the new API.
Data Preparation¶
1. Data Quality¶
- Ensure your data is clean and preprocessed
- Handle missing values appropriately
- Normalize or standardize features when necessary
- Remove or handle outliers in the training data
- Check for data leakage between training and test sets
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
# Data cleaning pipeline
def prepare_data(X):
    # Handle missing values
    imputer = SimpleImputer(strategy='median')
    X_clean = imputer.fit_transform(X)

    # Standardize features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_clean)

    return X_scaled, imputer, scaler
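To avoid leakage between training and test sets, fit the imputer and scaler on the training data only and reuse them at prediction time. A minimal usage sketch, assuming X_train and X_test already exist as NumPy arrays:
# Fit preprocessing on training data only, then apply the already-fitted
# transformers to the test data with transform() to avoid leakage.
X_train_prepared, imputer, scaler = prepare_data(X_train)
X_test_prepared = scaler.transform(imputer.transform(X_test))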
2. Feature Engineering¶
- Use domain knowledge to create relevant features
- Consider feature selection to reduce dimensionality
- Handle categorical variables appropriately
- Create features that capture temporal patterns if applicable
- Consider feature interactions
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.preprocessing import LabelEncoder
def engineer_features(X, y=None, categorical_cols=None, k_best=None):
    """Feature engineering pipeline."""
    X_engineered = X.copy()

    # Handle categorical variables (fit one encoder per column)
    if categorical_cols:
        for col in categorical_cols:
            encoder = LabelEncoder()
            X_engineered[:, col] = encoder.fit_transform(X_engineered[:, col])

    # Univariate feature selection (requires labels, e.g. a normal/anomaly indicator)
    if k_best and y is not None:
        selector = SelectKBest(f_classif, k=k_best)
        X_engineered = selector.fit_transform(X_engineered, y)

    return X_engineered
Model Selection¶
1. Choosing a Detector¶
Consider the following when selecting a detector:
Data Size Considerations¶
- Small datasets (< 1,000 samples): Use simpler models (IsolationForest, LOF)
- Medium datasets (1,000-100,000): Most detectors work well
- Large datasets (> 100,000): Consider scalable models, use parallel processing
- High-dimensional data: Use PCA-based preprocessing or specialized methods
Data Characteristics¶
- Linear patterns: Use PCA, OCSVM
- Non-linear patterns: Use IsolationForest, LOF, KNN
- Complex patterns: Use deep learning models when available
- Temporal data: Consider features that capture time dependencies (see the selection sketch below)
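The helper below folds the size and pattern heuristics above into a single function. It is a minimal sketch for illustration only: suggest_detector is a hypothetical name (not part of nonconform or PyOD), and the thresholds simply mirror the lists above.
from pyod.models.iforest import IForest
from pyod.models.knn import KNN
from pyod.models.lof import LOF
from pyod.models.pca import PCA

def suggest_detector(n_samples, pattern="non_linear"):
    """Hypothetical helper: pick a base detector from dataset size and pattern type."""
    if pattern == "linear":
        # Linear structure: PCA reconstruction error works well (OCSVM is another option)
        return PCA(contamination=0.1)
    if n_samples > 100_000:
        # Large data: favor scalable, parallelizable models
        return IForest(contamination=0.1, n_jobs=-1)
    if n_samples < 1_000:
        # Small data: keep the model simple
        return LOF(contamination=0.1)
    # Medium-sized, non-linear data
    return KNN(contamination=0.1)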
Computational Resources¶
from pyod.models.iforest import IForest
from pyod.models.knn import KNN
from pyod.models.lof import LOF
from pyod.models.ocsvm import OCSVM

# Fast detectors for large datasets
fast_detectors = {
    'IsolationForest': IForest(contamination=0.1, n_jobs=-1),
    'LOF': LOF(contamination=0.1, n_jobs=-1),
}

# Accurate but slower detectors
accurate_detectors = {
    'KNN': KNN(contamination=0.1),
    'OCSVM': OCSVM(contamination=0.1),
}
2. Ensemble Methods¶
Consider using multiple detectors for improved robustness:
from nonconform.estimation.standard import ConformalDetector
from nonconform.strategy.split import Split
from nonconform.utils.func.enums import Aggregation
from scipy.stats import false_discovery_control
# Create multiple detectors
detectors = {
    'LOF': LOF(contamination=0.1),
    'IForest': IForest(contamination=0.1, n_jobs=-1),
    'OCSVM': OCSVM(contamination=0.1),
}

# Get p-values from each detector
all_p_values = {}
strategy = Split(n_calib=0.2)

for name, detector in detectors.items():
    conf_detector = ConformalDetector(
        detector=detector,
        strategy=strategy,
        aggregation=Aggregation.MEDIAN,
        seed=42
    )
    conf_detector.fit(X_train)
    p_values = conf_detector.predict(X_test, raw=False)
    all_p_values[name] = p_values

# Combine results (simple approach: take the minimum p-value per sample;
# note this is optimistic because it ignores multiplicity across detectors)
ensemble_p_values = np.minimum.reduce(list(all_p_values.values()))
ensemble_discoveries = false_discovery_control(ensemble_p_values, method='bh') < 0.05
Conformal Strategy Selection¶
1. Split Strategy¶
Best for:
- Large datasets (> 10,000 samples)
- When computational efficiency is important
- When you have enough data for reliable calibration
from nonconform.strategy.split import Split
# For large datasets
strategy = Split(n_calib=0.2) # Use 20% for calibration
# For very large datasets, use absolute number
strategy = Split(n_calib=1000) # Use 1000 samples
2. Jackknife (Leave-One-Out)¶
Best for:
- Small datasets (< 1,000 samples)
- When you need maximum statistical power
- When computational cost is not a primary concern
from nonconform.strategy.jackknife import Jackknife
# For small datasets where every sample matters
strategy = Jackknife()
3. Bootstrap¶
Best for:
- Medium-sized datasets (1,000-10,000 samples)
- When you need robust estimates
- When you want to balance efficiency and power
from nonconform.strategy.bootstrap import Bootstrap
# Balanced approach for medium datasets
strategy = Bootstrap(
    n_bootstraps=50,
    resampling_ratio=0.8
)
4. Cross-Validation¶
Best for:
- When you want to use all data efficiently
- Medium to large datasets
- When you need stable performance estimates
from nonconform.strategy.cross_val import CrossValidation
# Good balance of efficiency and stability
strategy = CrossValidation(k=5)
Calibration Best Practices¶
1. Calibration Set Size¶
def choose_calibration_strategy(n_samples):
    """Choose an appropriate strategy based on dataset size."""
    if n_samples < 500:
        return Jackknife()
    elif n_samples < 2000:
        return Bootstrap(n_bootstraps=50, resampling_ratio=0.8)
    elif n_samples < 10000:
        return CrossValidation(k=5)
    else:
        # Use an absolute number of calibration samples for very large datasets
        calib_size = min(2000, int(0.2 * n_samples))
        return Split(n_calib=calib_size)
2. Calibration Data Quality¶
- Ensure calibration data is representative of normal class
- Avoid using contaminated data for calibration
- Consider stratified sampling for balanced calibration
def validate_calibration_data(X_train, contamination_rate=0.05):
    """Validate that calibration data is clean."""
    # Use a simple detector to identify potential anomalies in the training data
    temp_detector = IForest(contamination=contamination_rate)
    temp_detector.fit(X_train)
    anomaly_scores = temp_detector.decision_function(X_train)

    # Keep only the most normal samples (in PyOD, higher scores mean more anomalous)
    normal_threshold = np.percentile(anomaly_scores, (1 - contamination_rate) * 100)
    clean_indices = anomaly_scores < normal_threshold

    return X_train[clean_indices]
FDR Control Best Practices¶
1. Alpha Selection¶
def choose_alpha_level(application_type):
    """Choose an appropriate alpha level based on the application."""
    alpha_levels = {
        'critical_safety': 0.001,  # Medical devices, safety systems
        'financial': 0.01,         # Fraud detection, trading
        'security': 0.01,          # Intrusion detection
        'quality_control': 0.05,   # Manufacturing, general QC
        'exploratory': 0.1,        # Research, data exploration
        'monitoring': 0.05,        # System monitoring
    }
    return alpha_levels.get(application_type, 0.05)
2. Multiple Testing Scenarios¶
from scipy.stats import false_discovery_control

def apply_fdr_control(p_values, alpha=0.05, method='bh'):
    """Apply FDR control with proper validation."""
    p_values = np.asarray(p_values)

    # Validate p-values
    if np.any(p_values < 0) or np.any(p_values > 1):
        raise ValueError("P-values must be between 0 and 1")

    # Adjust p-values, then threshold at alpha
    # (scipy's false_discovery_control takes no alpha argument)
    adjusted_p_values = false_discovery_control(p_values, method=method)
    discoveries = adjusted_p_values < alpha

    print(f"Original detections: {(p_values < alpha).sum()}")
    print(f"FDR-controlled discoveries: {discoveries.sum()}")
    print(f"Reduction: {(p_values < alpha).sum() - discoveries.sum()}")

    return discoveries, adjusted_p_values
Performance Monitoring¶
1. Key Metrics to Track¶
def calculate_performance_metrics(y_true, discoveries):
    """Calculate comprehensive performance metrics."""
    if len(y_true) != len(discoveries):
        raise ValueError("y_true and discoveries must have same length")

    true_positives = np.sum(discoveries & (y_true == 1))
    false_positives = np.sum(discoveries & (y_true == 0))
    true_negatives = np.sum(~discoveries & (y_true == 0))
    false_negatives = np.sum(~discoveries & (y_true == 1))

    # Calculate metrics
    precision = true_positives / max(1, true_positives + false_positives)
    recall = true_positives / max(1, true_positives + false_negatives)
    f1_score = 2 * precision * recall / max(1e-10, precision + recall)

    # FDR calculation
    fdr = false_positives / max(1, true_positives + false_positives)

    return {
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'fdr': fdr,
        'true_positives': true_positives,
        'false_positives': false_positives,
        'discoveries': discoveries.sum(),
    }
2. Performance Monitoring Pipeline¶
import os
import time

import psutil

class PerformanceMonitor:
    """Monitor detector performance over time."""

    def __init__(self):
        self.metrics_history = []

    def monitor_prediction(self, detector, X_test, y_true=None):
        """Monitor a single prediction run."""
        # Time the prediction and track memory usage (in MB)
        process = psutil.Process(os.getpid())
        start_time = time.time()
        start_memory = process.memory_info().rss / 1024 / 1024

        p_values = detector.predict(X_test, raw=False)

        end_time = time.time()
        end_memory = process.memory_info().rss / 1024 / 1024

        # Apply FDR control
        discoveries, _ = apply_fdr_control(p_values)

        metrics = {
            'timestamp': time.time(),
            'prediction_time': end_time - start_time,
            'memory_usage': end_memory - start_memory,
            'n_samples': len(X_test),
            'discoveries': discoveries.sum(),
            'p_value_stats': {
                'min': p_values.min(),
                'max': p_values.max(),
                'mean': p_values.mean(),
                'std': p_values.std(),
            },
        }

        # Add performance metrics if ground truth is available
        if y_true is not None:
            perf_metrics = calculate_performance_metrics(y_true, discoveries)
            metrics.update(perf_metrics)

        self.metrics_history.append(metrics)
        return metrics
Production Deployment¶
1. Model Updates and Drift Detection¶
from scipy.stats import ks_2samp

class ModelDriftDetector:
    """Detect when the model needs updating due to drift."""

    def __init__(self, baseline_data, drift_threshold=0.05):
        self.baseline_data = baseline_data
        self.drift_threshold = drift_threshold

    def detect_drift(self, new_data):
        """Detect distribution drift feature-by-feature using the KS test."""
        drift_detected = False
        p_values = []

        for i in range(new_data.shape[1]):
            _, p_value = ks_2samp(
                self.baseline_data[:, i],
                new_data[:, i]
            )
            p_values.append(p_value)
            if p_value < self.drift_threshold:
                drift_detected = True

        return drift_detected, p_values
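A brief usage sketch, assuming X_train is the data used to fit the detector and X_new is a fresh batch with the same columns:
# Compare a new batch against the training baseline, feature by feature
drift_detector = ModelDriftDetector(X_train, drift_threshold=0.05)
drift_detected, feature_p_values = drift_detector.detect_drift(X_new)
if drift_detected:
    print("Drift detected; consider refitting the detector on recent data.")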
2. Scalable Batch Processing¶
class ScalableAnomalyDetector:
    """Scalable anomaly detection for production."""

    def __init__(self, detector_config, batch_size=1000):
        self.detector_config = detector_config
        self.batch_size = batch_size
        self.detector = None

    def fit(self, X_train):
        """Fit detector on training data."""
        # Use an appropriate strategy based on data size
        strategy = choose_calibration_strategy(len(X_train))

        self.detector = ConformalDetector(
            detector=self.detector_config['detector'],
            strategy=strategy,
            aggregation=self.detector_config['aggregation'],
            seed=self.detector_config['seed'],
            silent=self.detector_config.get('silent', True)
        )
        self.detector.fit(X_train)

    def predict_batch(self, X_test):
        """Predict on large datasets by slicing the input into batches."""
        all_p_values = []
        for start in range(0, len(X_test), self.batch_size):
            batch = X_test[start:start + self.batch_size]
            batch_p_values = self.detector.predict(batch, raw=False)
            all_p_values.extend(batch_p_values)
        return np.array(all_p_values)
Code Organization¶
1. Configuration Management¶
from dataclasses import dataclass

from nonconform.utils.func.enums import Aggregation

@dataclass
class AnomalyDetectionConfig:
    """Configuration for the anomaly detection pipeline."""
    alpha: float = 0.05
    calibration_size: float | int = 0.2  # float = ratio, int = absolute number
    detector_type: str = "iforest"
    aggregation: Aggregation = Aggregation.MEDIAN
    seed: int = 42
    silent: bool = True
    batch_size: int = 1000
    fdr_method: str = 'bh'

    def __post_init__(self):
        """Validate configuration."""
        if not 0 < self.alpha < 1:
            raise ValueError("Alpha must be between 0 and 1")
        if isinstance(self.calibration_size, float) and not 0 < self.calibration_size < 1:
            raise ValueError("Calibration size ratio must be between 0 and 1")
2. Complete Pipeline Implementation¶
from pyod.models.iforest import IForest
from pyod.models.lof import LOF
from pyod.models.ocsvm import OCSVM
class AnomalyDetectionPipeline:
    """Complete anomaly detection pipeline."""

    DETECTOR_MAP = {
        'iforest': IForest,
        'lof': LOF,
        'ocsvm': OCSVM,
    }

    def __init__(self, config: AnomalyDetectionConfig):
        self.config = config
        self.detector = None
        self.performance_monitor = PerformanceMonitor()
        self.drift_detector = None

    def _create_detector(self):
        """Create the base detector from configuration."""
        detector_class = self.DETECTOR_MAP[self.config.detector_type]
        return detector_class(contamination=0.1)

    def _create_strategy(self, n_samples):
        """Create a calibration strategy based on dataset size."""
        return choose_calibration_strategy(n_samples)
    def fit(self, X_train):
        """Fit the complete pipeline."""
        # Validate and prepare data
        X_clean = validate_calibration_data(X_train)

        # Create components
        base_detector = self._create_detector()
        strategy = self._create_strategy(len(X_clean))

        # Create conformal detector
        self.detector = ConformalDetector(
            detector=base_detector,
            strategy=strategy,
            aggregation=self.config.aggregation,
            seed=self.config.seed,
            silent=self.config.silent
        )

        # Fit detector
        self.detector.fit(X_clean)

        # Initialize drift detector
        self.drift_detector = ModelDriftDetector(X_clean)

        print(f"Pipeline fitted with {len(X_clean)} samples")
        print(f"Strategy: {type(strategy).__name__}")
        print(f"Calibration set size: {len(self.detector.calibration_set)}")
    def predict(self, X_test, y_true=None, check_drift=True):
        """Make predictions with full monitoring."""
        if self.detector is None:
            raise ValueError("Pipeline must be fitted before prediction")

        # Check for drift
        if check_drift and self.drift_detector:
            drift_detected, _ = self.drift_detector.detect_drift(X_test)
            if drift_detected:
                print("WARNING: Distribution drift detected!")

        # Make predictions (batched for large inputs)
        if len(X_test) > self.config.batch_size:
            p_values = self._predict_batch(X_test)
        else:
            p_values = self.detector.predict(X_test, raw=False)

        # Apply FDR control
        discoveries, adjusted_p_values = apply_fdr_control(
            p_values,
            alpha=self.config.alpha,
            method=self.config.fdr_method
        )

        # Monitor performance
        metrics = self.performance_monitor.monitor_prediction(
            self.detector, X_test, y_true
        )

        return {
            'discoveries': discoveries,
            'p_values': p_values,
            'adjusted_p_values': adjusted_p_values,
            'metrics': metrics,
        }
    def _predict_batch(self, X_test):
        """Batch prediction for large datasets, reusing the already-fitted detector."""
        all_p_values = []
        for start in range(0, len(X_test), self.config.batch_size):
            batch = X_test[start:start + self.config.batch_size]
            all_p_values.extend(self.detector.predict(batch, raw=False))
        return np.array(all_p_values)
3. Usage Example¶
# Configuration
config = AnomalyDetectionConfig(
    alpha=0.05,
    calibration_size=0.2,
    detector_type="iforest",
    aggregation=Aggregation.MEDIAN,
    fdr_method='bh'
)

# Create and use pipeline
pipeline = AnomalyDetectionPipeline(config)
pipeline.fit(X_train)

# Make predictions
results = pipeline.predict(X_test, y_true=y_test)
print(f"Discoveries: {results['discoveries'].sum()}")
print(f"Performance metrics: {results['metrics']}")
This comprehensive approach ensures robust, scalable, and maintainable anomaly detection systems using the new nonconform API.