Pipeline Construction¶
Building effective anomaly detection pipelines requires combining multiple components in a systematic way. This guide covers pipeline design patterns, composition techniques, and best practices for creating robust detection systems.
Pipeline Architecture¶
Basic Pipeline Structure¶
A typical ONAD pipeline consists of several stages:
- Data Ingestion: Loading and streaming data
- Preprocessing: Feature scaling and transformation
- Feature Engineering: Dimensionality reduction and selection
- Anomaly Detection: Model scoring and threshold application
- Post-processing: Result interpretation and actions
# Basic pipeline structure
from onad.transform.preprocessing.scaler import StandardScaler
from onad.transform.projection.incremental_pca import IncrementalPCA
from onad.model.iforest import OnlineIsolationForest

# Create pipeline components
scaler = StandardScaler()
pca = IncrementalPCA(n_components=10)
detector = OnlineIsolationForest()

# Process data through the pipeline
# (`stream`, `threshold`, and `handle_anomaly` are application-specific placeholders)
for data_point in stream:
    # Stage 1: Preprocessing
    scaler.learn_one(data_point)
    scaled_data = scaler.transform_one(data_point)

    # Stage 2: Feature engineering
    pca.learn_one(scaled_data)
    reduced_data = pca.transform_one(scaled_data)

    # Stage 3: Anomaly detection
    detector.learn_one(reduced_data)
    anomaly_score = detector.score_one(reduced_data)

    # Stage 4: Decision making
    if anomaly_score > threshold:
        handle_anomaly(data_point, anomaly_score)
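The loop above leaves `threshold` and `handle_anomaly` undefined, since both are application-specific. One common option is to derive the threshold from a rolling window of recent scores; the helpers below are a minimal, hypothetical sketch of that idea (the window size, warm-up length, and quantile are arbitrary assumptions, not ONAD APIs):

import numpy as np
from collections import deque

# Hypothetical helpers -- not part of ONAD, shown only to make the loop above runnable
recent_scores = deque(maxlen=1000)  # rolling window of recent anomaly scores

def rolling_threshold(quantile=0.99):
    """Threshold as a high quantile of recently observed scores (illustrative heuristic)."""
    if len(recent_scores) < 100:  # warm-up: don't alert on too little history
        return float("inf")
    return float(np.quantile(recent_scores, quantile))

def handle_anomaly(data_point, score):
    print(f"Anomaly (score={score:.3f}): {data_point}")

# Inside the loop, after computing anomaly_score:
#     recent_scores.append(anomaly_score)
#     if anomaly_score > rolling_threshold():
#         handle_anomaly(data_point, anomaly_score)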
Pipeline Design Patterns¶
Sequential Pipeline¶
The most common pattern chains components sequentially:
class SequentialPipeline:
    def __init__(self, components):
        self.components = components

    def learn_one(self, x):
        """Learn from one data point through all components."""
        current_data = x
        for component in self.components:
            if hasattr(component, 'learn_one'):
                component.learn_one(current_data)
            if hasattr(component, 'transform_one'):
                current_data = component.transform_one(current_data)

    def score_one(self, x):
        """Score one data point through the pipeline."""
        current_data = x
        # Transform through preprocessing stages (all components except the last)
        for component in self.components[:-1]:
            if hasattr(component, 'transform_one'):
                current_data = component.transform_one(current_data)
        # Score with the final model
        final_model = self.components[-1]
        return final_model.score_one(current_data)

# Usage
pipeline = SequentialPipeline([
    StandardScaler(),
    IncrementalPCA(n_components=5),
    OnlineIsolationForest()
])

for data_point in stream:
    # Note: learning before scoring means the model has already seen this
    # point; swap the two calls for strict test-then-train evaluation
    pipeline.learn_one(data_point)
    score = pipeline.score_one(data_point)
Parallel Pipeline¶
Process data through multiple parallel paths:
from typing import Dict, List, Optional
import numpy as np

class ParallelPipeline:
    def __init__(self, pipelines: List[SequentialPipeline], weights: Optional[List[float]] = None):
        self.pipelines = pipelines
        self.weights = weights or [1.0] * len(pipelines)

    def learn_one(self, x):
        """Learn from the data point in all parallel pipelines."""
        for pipeline in self.pipelines:
            pipeline.learn_one(x)

    def score_one(self, x) -> float:
        """Get the weighted average score across all pipelines."""
        scores = [pipeline.score_one(x) for pipeline in self.pipelines]
        weighted_scores = [s * w for s, w in zip(scores, self.weights)]
        return sum(weighted_scores) / sum(self.weights)

# Create parallel pipelines with different preprocessing
# (MinMaxScaler and IncrementalKNN are assumed to be imported from onad;
# their exact import paths are not shown here)
pipeline1 = SequentialPipeline([
    StandardScaler(),
    OnlineIsolationForest(num_trees=50)
])

pipeline2 = SequentialPipeline([
    MinMaxScaler(),
    IncrementalKNN(k=10)
])

# Combine pipelines
ensemble = ParallelPipeline([pipeline1, pipeline2], weights=[0.7, 0.3])

for data_point in stream:
    ensemble.learn_one(data_point)
    combined_score = ensemble.score_one(data_point)
Conditional Pipeline¶
Apply different processing based on data characteristics:
class ConditionalPipeline:
    def __init__(self, condition_func, pipeline_true, pipeline_false):
        self.condition_func = condition_func
        self.pipeline_true = pipeline_true
        self.pipeline_false = pipeline_false

    def _select_pipeline(self, x):
        """Select a pipeline based on the condition."""
        return self.pipeline_true if self.condition_func(x) else self.pipeline_false

    def learn_one(self, x):
        pipeline = self._select_pipeline(x)
        pipeline.learn_one(x)

    def score_one(self, x):
        pipeline = self._select_pipeline(x)
        return pipeline.score_one(x)

# Example: different processing for high-/low-dimensional data
def is_high_dimensional(x):
    return len(x) > 50

high_dim_pipeline = SequentialPipeline([
    StandardScaler(),
    IncrementalPCA(n_components=20),
    OnlineIsolationForest()
])

low_dim_pipeline = SequentialPipeline([
    StandardScaler(),
    IncrementalKNN(k=5)
])

conditional = ConditionalPipeline(
    is_high_dimensional,
    high_dim_pipeline,
    low_dim_pipeline
)
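Driving the conditional pipeline looks the same as the other patterns. Note that each point is routed to exactly one branch, so the two branches learn from disjoint subsets of the stream:

for data_point in stream:
    conditional.learn_one(data_point)
    score = conditional.score_one(data_point)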
Advanced Pipeline Components¶
Feature Selection Pipeline¶
from typing import Set

class FeatureSelector:
    def __init__(self, selected_features: Set[str]):
        self.selected_features = selected_features

    def transform_one(self, x: Dict[str, float]) -> Dict[str, float]:
        """Keep only the specified features."""
        return {k: v for k, v in x.items() if k in self.selected_features}

# Usage in a pipeline: FeatureSelector defines no learn_one, which is fine --
# SequentialPipeline's hasattr checks simply skip the learning step for it
important_features = {'temperature', 'pressure', 'vibration'}

pipeline = SequentialPipeline([
    FeatureSelector(important_features),
    StandardScaler(),
    OnlineIsolationForest()
])
Validation Pipeline¶
import numpy as np
import logging

class DataValidator:
    def __init__(self, required_features: Set[str], value_ranges: Dict[str, tuple]):
        self.required_features = required_features
        self.value_ranges = value_ranges
        self.logger = logging.getLogger(__name__)

    def validate_one(self, x: Dict[str, float]) -> bool:
        """Validate a single data point."""
        try:
            # Check required features
            missing_features = self.required_features - set(x.keys())
            if missing_features:
                self.logger.warning(f"Missing features: {missing_features}")
                return False

            for feature, value in x.items():
                # Check value ranges
                if feature in self.value_ranges:
                    min_val, max_val = self.value_ranges[feature]
                    if not (min_val <= value <= max_val):
                        self.logger.warning(
                            f"Feature {feature} value {value} outside range "
                            f"[{min_val}, {max_val}]"
                        )
                        return False

                # Check for invalid values (applies to every feature)
                if np.isnan(value) or np.isinf(value):
                    self.logger.warning(f"Invalid value for feature {feature}: {value}")
                    return False

            return True
        except Exception as e:
            self.logger.error(f"Validation error: {e}")
            return False

# Usage
validator = DataValidator(
    required_features={'temperature', 'pressure'},
    value_ranges={'temperature': (-50, 200), 'pressure': (0, 1000)}
)

for data_point in stream:
    if validator.validate_one(data_point):
        pipeline.learn_one(data_point)
        score = pipeline.score_one(data_point)
Anomaly Explanation Pipeline¶
class AnomalyExplainer:
    def __init__(self, feature_names: List[str]):
        # feature_names is kept for reference; statistics are collected
        # for every feature that actually appears in the stream
        self.feature_names = feature_names
        self.baseline_stats = {}

    def learn_one(self, x: Dict[str, float]):
        """Update running baseline statistics (sum, sum of squares, count)."""
        for feature, value in x.items():
            if feature not in self.baseline_stats:
                self.baseline_stats[feature] = {'sum': 0.0, 'sum_sq': 0.0, 'count': 0}
            stats = self.baseline_stats[feature]
            stats['sum'] += value
            stats['sum_sq'] += value ** 2
            stats['count'] += 1

    def explain_anomaly(self, x: Dict[str, float]) -> Dict[str, float]:
        """Provide feature-level anomaly scores as absolute z-scores."""
        explanation = {}
        for feature, value in x.items():
            if feature in self.baseline_stats:
                stats = self.baseline_stats[feature]
                # z-score from running statistics: var = E[x^2] - E[x]^2
                mean = stats['sum'] / stats['count']
                variance = (stats['sum_sq'] / stats['count']) - mean ** 2
                std = np.sqrt(max(variance, 1e-8))  # guard against zero variance
                z_score = abs(value - mean) / std
                explanation[feature] = z_score
        return explanation

# Usage
explainer = AnomalyExplainer(['temperature', 'pressure', 'vibration'])

for data_point in stream:
    # Normal processing
    pipeline.learn_one(data_point)
    score = pipeline.score_one(data_point)

    # Explanation for anomalies
    explainer.learn_one(data_point)
    if score > threshold:
        explanation = explainer.explain_anomaly(data_point)
        print(f"Anomaly explanation: {explanation}")
Real-world Pipeline Examples¶
IoT Sensor Monitoring¶
from typing import Any

class IoTMonitoringPipeline:
    def __init__(self):
        # Feature selection for sensor data
        self.feature_selector = FeatureSelector({
            'temperature', 'humidity', 'pressure', 'vibration'
        })
        # Data validation
        self.validator = DataValidator(
            required_features={'temperature', 'humidity'},
            value_ranges={
                'temperature': (-40, 85),
                'humidity': (0, 100),
                'pressure': (300, 1100)
            }
        )
        # Preprocessing pipeline
        self.scaler = StandardScaler()
        self.pca = IncrementalPCA(n_components=3)
        # Anomaly detection
        self.detector = OnlineIsolationForest(num_trees=100, window_size=2000)
        # Explanation
        self.explainer = AnomalyExplainer(['temperature', 'humidity', 'pressure', 'vibration'])
        # Alerting
        self.anomaly_threshold = 0.7
        # Logging
        self.logger = logging.getLogger(__name__)

    def process_sensor_reading(self, reading: Dict[str, float]) -> Dict[str, Any]:
        """Process a single sensor reading."""
        try:
            # Stage 1: Feature selection
            selected_features = self.feature_selector.transform_one(reading)

            # Stage 2: Validation
            if not self.validator.validate_one(selected_features):
                return {'status': 'invalid', 'score': 0.0, 'is_anomaly': False}

            # Stage 3: Preprocessing
            self.scaler.learn_one(selected_features)
            scaled = self.scaler.transform_one(selected_features)
            self.pca.learn_one(scaled)
            reduced = self.pca.transform_one(scaled)

            # Stage 4: Anomaly detection
            self.detector.learn_one(reduced)
            score = self.detector.score_one(reduced)

            # Stage 5: Explanation
            self.explainer.learn_one(selected_features)
            explanation = None
            if score > self.anomaly_threshold:
                explanation = self.explainer.explain_anomaly(selected_features)
                self.logger.warning(f"Anomaly detected: score={score:.3f}, explanation={explanation}")

            return {
                'status': 'processed',
                'score': score,
                'is_anomaly': score > self.anomaly_threshold,
                'explanation': explanation,
                'processed_features': reduced
            }
        except Exception as e:
            self.logger.error(f"Pipeline error: {e}")
            # Include is_anomaly so callers can always check it safely
            return {'status': 'error', 'error': str(e), 'is_anomaly': False}

# Usage
pipeline = IoTMonitoringPipeline()

for sensor_reading in sensor_stream:
    result = pipeline.process_sensor_reading(sensor_reading)
    if result['is_anomaly']:
        print("ALERT: Sensor anomaly detected!")
        print(f"Score: {result['score']:.3f}")
        print(f"Explanation: {result['explanation']}")
Network Security Pipeline¶
# MovingMahalanobisDistance and IncrementalKNN are assumed to be imported
# from onad; their exact import paths are not shown here
class NetworkSecurityPipeline:
    def __init__(self):
        # Multiple detection models
        self.statistical_detector = MovingMahalanobisDistance(window_size=1000)
        self.forest_detector = OnlineIsolationForest(num_trees=200)
        self.knn_detector = IncrementalKNN(k=15)
        # Preprocessing
        self.scaler = StandardScaler()
        # Feature weights for different attack types
        self.feature_weights = {
            'packet_size': 1.5,
            'connection_duration': 2.0,
            'bytes_transferred': 1.2,
            'protocol_type': 0.8
        }
        # Ensemble weights, keyed by detector name to avoid relying on dict order
        self.score_weights = {'statistical': 0.4, 'iforest': 0.4, 'knn': 0.2}
        self.attack_threshold = 0.8
        self.logger = logging.getLogger(__name__)

    def weight_features(self, features: Dict[str, float]) -> Dict[str, float]:
        """Apply feature weights (default weight 1.0 for unknown features)."""
        return {k: v * self.feature_weights.get(k, 1.0) for k, v in features.items()}

    def detect_intrusion(self, network_data: Dict[str, float]) -> Dict[str, Any]:
        """Detect network intrusions."""
        try:
            # Preprocessing
            weighted_features = self.weight_features(network_data)
            self.scaler.learn_one(weighted_features)
            scaled_features = self.scaler.transform_one(weighted_features)

            # Multiple detector scores
            scores = {}

            # Statistical detection
            self.statistical_detector.learn_one(scaled_features)
            scores['statistical'] = self.statistical_detector.score_one(scaled_features)

            # Forest-based detection
            self.forest_detector.learn_one(scaled_features)
            scores['iforest'] = self.forest_detector.score_one(scaled_features)

            # Distance-based detection
            self.knn_detector.learn_one(scaled_features)
            scores['knn'] = self.knn_detector.score_one(scaled_features)

            # Combine scores (weighted average; the weights sum to 1.0)
            combined_score = sum(scores[name] * w for name, w in self.score_weights.items())

            # Detection decision
            is_attack = combined_score > self.attack_threshold
            if is_attack:
                self.logger.critical(f"INTRUSION DETECTED: {combined_score:.3f}")

            return {
                'is_attack': is_attack,
                'combined_score': combined_score,
                'individual_scores': scores,
                'confidence': min(combined_score / self.attack_threshold, 1.0)
            }
        except Exception as e:
            self.logger.error(f"Detection error: {e}")
            return {'is_attack': False, 'error': str(e)}

# Usage
security_pipeline = NetworkSecurityPipeline()

for connection_data in network_stream:
    result = security_pipeline.detect_intrusion(connection_data)
    if result['is_attack']:
        # Trigger security response
        trigger_security_alert(connection_data, result)
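`trigger_security_alert` is application-specific and not part of ONAD; a minimal hypothetical stand-in for experimentation:

def trigger_security_alert(connection_data, result):
    # Replace with your incident-response integration (ticketing, paging, firewall rules)
    print(f"SECURITY ALERT (confidence={result['confidence']:.2f}): "
          f"scores={result['individual_scores']}")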
Pipeline Optimization¶
Memory Optimization¶
import os
import psutil

class MemoryOptimizedPipeline:
    def __init__(self, max_memory_mb=1000):
        self.max_memory_mb = max_memory_mb
        self.components = []
        self.processing_count = 0

    def add_component(self, component, reset_interval=10000):
        """Add a component with memory-management metadata."""
        # reset_interval is kept as metadata; resets here are triggered by memory usage
        self.components.append({
            'component': component,
            'reset_interval': reset_interval,
            'last_reset': 0
        })

    def _check_memory_and_reset(self):
        """Check process memory usage and reset components if needed."""
        process = psutil.Process(os.getpid())
        memory_usage = process.memory_info().rss / 1024 / 1024  # MB
        if memory_usage > self.max_memory_mb:
            for comp_info in self.components:
                # Re-instantiate the component (note: this assumes
                # default-constructible components and discards learned state)
                component_class = type(comp_info['component'])
                comp_info['component'] = component_class()
                comp_info['last_reset'] = self.processing_count
            print(f"Memory limit reached ({memory_usage:.1f} MB), components reset")

    def process_one(self, x):
        """Process one data point through the optimized pipeline."""
        # Do the bookkeeping first, so the early return from a scoring
        # component doesn't skip the count and the periodic memory check
        self.processing_count += 1
        if self.processing_count % 1000 == 0:
            self._check_memory_and_reset()

        current_data = x
        for comp_info in self.components:
            component = comp_info['component']
            if hasattr(component, 'learn_one'):
                component.learn_one(current_data)
            if hasattr(component, 'transform_one'):
                current_data = component.transform_one(current_data)
            elif hasattr(component, 'score_one'):
                return component.score_one(current_data)

        return current_data
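A usage sketch, with the same placeholder `stream` as the earlier examples (keep in mind that a reset re-creates each component with its default constructor arguments):

optimized = MemoryOptimizedPipeline(max_memory_mb=500)
optimized.add_component(StandardScaler())
optimized.add_component(IncrementalPCA(n_components=5))
optimized.add_component(OnlineIsolationForest())

for data_point in stream:
    score = optimized.process_one(data_point)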
Performance Monitoring¶
import time
from collections import deque

class MonitoredPipeline:
    def __init__(self, pipeline, monitoring_window=1000):
        self.pipeline = pipeline
        self.processing_times = deque(maxlen=monitoring_window)
        self.error_count = 0
        self.total_processed = 0

    def process_one(self, x):
        """Process with performance monitoring."""
        start_time = time.perf_counter()
        try:
            # Score before learning (test-then-train), so the model never
            # sees a point before scoring it
            result = self.pipeline.score_one(x)
            self.pipeline.learn_one(x)
            self.processing_times.append(time.perf_counter() - start_time)
            self.total_processed += 1
            return result
        except Exception:
            self.error_count += 1
            self.total_processed += 1
            raise

    def get_performance_stats(self):
        """Get pipeline performance statistics."""
        if not self.processing_times:
            return {}
        times = list(self.processing_times)
        total_time = sum(times)
        return {
            'avg_processing_time': total_time / len(times),
            'max_processing_time': max(times),
            'min_processing_time': min(times),
            'throughput_per_sec': len(times) / total_time if total_time > 0 else 0,
            'error_rate': self.error_count / self.total_processed if self.total_processed > 0 else 0,
            'total_processed': self.total_processed
        }
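Wrapping the sequential pipeline from earlier and reporting periodically might look like this (the 10,000-point reporting interval is an arbitrary choice):

monitored = MonitoredPipeline(SequentialPipeline([
    StandardScaler(),
    IncrementalPCA(n_components=5),
    OnlineIsolationForest()
]))

for i, data_point in enumerate(stream, start=1):
    score = monitored.process_one(data_point)
    if i % 10_000 == 0:
        print(monitored.get_performance_stats())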
Pipeline Design Best Practices
- Start Simple: Begin with basic sequential pipelines and add complexity only as needed
- Validate Early: Place validation components early in the pipeline
- Monitor Performance: Track processing times and memory usage
- Handle Errors Gracefully: Implement error handling at each stage
- Document Components: Clear documentation helps with maintenance and debugging
Common Pipeline Issues
- Data Leakage: Don't use future information for current predictions
- Memory Leaks: Monitor memory usage, especially with long-running pipelines
- Component Mismatch: Ensure output of one stage matches input of next
- Error Propagation: One failing component shouldn't crash the entire pipeline (see the sketch below)
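For the last point, a minimal illustration of isolating a failing stage: wrap each component so its errors are logged and the data falls through unchanged. The wrapper below is a hypothetical sketch, not an ONAD API:

import logging

class FaultTolerantComponent:
    """Wrap a pipeline component so its failures are logged, not fatal."""

    def __init__(self, component):
        self.component = component
        self.logger = logging.getLogger(__name__)

    def learn_one(self, x):
        try:
            if hasattr(self.component, 'learn_one'):
                self.component.learn_one(x)
        except Exception as e:
            self.logger.error(f"learn_one failed in {type(self.component).__name__}: {e}")

    def transform_one(self, x):
        try:
            if hasattr(self.component, 'transform_one'):
                return self.component.transform_one(x)
        except Exception as e:
            self.logger.error(f"transform_one failed in {type(self.component).__name__}: {e}")
        return x  # fall back to passing the data through unchanged

# Usage: wrap fragile preprocessing stages before composing them
# pipeline = SequentialPipeline([
#     FaultTolerantComponent(StandardScaler()),
#     FaultTolerantComponent(IncrementalPCA(n_components=5)),
#     OnlineIsolationForest()
# ])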