Best Practices¶
This guide collects proven strategies for deploying ONAD in production environments, optimizing performance, and building reliable anomaly detection systems.
Production Deployment¶
System Architecture¶
Microservices Design
```python
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict

# Model classes are assumed to be importable from onad
# (adjust the import paths to match your installation), e.g.:
# from onad.model import OnlineIsolationForest, IncrementalOneClassSVMAdaptiveKernel, IncrementalKNN


@dataclass
class AnomalyDetectionConfig:
    model_type: str
    model_params: Dict[str, Any]
    preprocessing_steps: list
    thresholds: Dict[str, float]
    memory_limit_mb: int = 1000
    checkpoint_interval: int = 10000


class AnomalyDetectionService:
    def __init__(self, config: AnomalyDetectionConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)

        # Initialize components
        self.model = self._create_model()
        self.preprocessors = self._create_preprocessors()

        # Monitoring counters
        self.processed_count = 0
        self.anomaly_count = 0
        self.last_checkpoint = 0

    def _create_model(self):
        """Factory method for model creation."""
        model_map = {
            'isolation_forest': OnlineIsolationForest,
            'adaptive_svm': IncrementalOneClassSVMAdaptiveKernel,
            'knn': IncrementalKNN,
        }

        model_class = model_map.get(self.config.model_type)
        if not model_class:
            raise ValueError(f"Unknown model type: {self.config.model_type}")

        return model_class(**self.config.model_params)

    def _create_preprocessors(self):
        """Create the preprocessing pipeline."""
        preprocessors = []

        for step in self.config.preprocessing_steps:
            if step['type'] == 'scaler':
                from onad.transform.preprocessing.scaler import StandardScaler
                preprocessors.append(StandardScaler(**step.get('params', {})))
            elif step['type'] == 'pca':
                from onad.transform.projection.incremental_pca import IncrementalPCA
                preprocessors.append(IncrementalPCA(**step.get('params', {})))

        return preprocessors

    def process_data_point(self, data: Dict[str, float]) -> Dict[str, Any]:
        """Process a single data point."""
        try:
            # Preprocessing
            processed_data = data.copy()
            for preprocessor in self.preprocessors:
                preprocessor.learn_one(processed_data)
                processed_data = preprocessor.transform_one(processed_data)

            # Anomaly detection
            self.model.learn_one(processed_data)
            score = self.model.score_one(processed_data)

            # Thresholding
            is_anomaly = score > self.config.thresholds.get('anomaly', 0.7)
            severity = self._calculate_severity(score)

            # Update counters
            self.processed_count += 1
            if is_anomaly:
                self.anomaly_count += 1

            # Periodic operations
            if self.processed_count % self.config.checkpoint_interval == 0:
                self._checkpoint_model()

            result = {
                'anomaly_score': score,
                'is_anomaly': is_anomaly,
                'severity': severity,
                'timestamp': time.time(),
                'model_state': self._get_model_info(),
            }

            if is_anomaly:
                self.logger.warning(f"Anomaly detected: score={score:.3f}")

            return result

        except Exception as e:
            self.logger.error(f"Processing error: {e}")
            return {'error': str(e), 'timestamp': time.time()}

    def _calculate_severity(self, score: float) -> str:
        """Calculate the anomaly severity level."""
        thresholds = self.config.thresholds

        if score > thresholds.get('critical', 0.9):
            return 'critical'
        elif score > thresholds.get('high', 0.8):
            return 'high'
        elif score > thresholds.get('medium', 0.7):
            return 'medium'
        else:
            return 'low'

    def _checkpoint_model(self):
        """Save a model checkpoint."""
        import os
        import pickle

        checkpoint_dir = 'checkpoints'
        os.makedirs(checkpoint_dir, exist_ok=True)

        checkpoint_path = f"{checkpoint_dir}/model_{self.processed_count}.pkl"
        with open(checkpoint_path, 'wb') as f:
            pickle.dump({
                'model': self.model,
                'preprocessors': self.preprocessors,
                'processed_count': self.processed_count,
                'config': self.config,
            }, f)

        self.logger.info(f"Model checkpoint saved: {checkpoint_path}")

    def _get_model_info(self) -> Dict[str, Any]:
        """Get model state information."""
        return {
            'processed_count': self.processed_count,
            'anomaly_count': self.anomaly_count,
            'anomaly_rate': self.anomaly_count / max(self.processed_count, 1),
            'memory_usage': self._get_memory_usage(),
        }

    def _get_memory_usage(self) -> float:
        """Get current memory usage in MB."""
        import os
        import psutil
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024


# Configuration example
config = AnomalyDetectionConfig(
    model_type='isolation_forest',
    model_params={'num_trees': 100, 'window_size': 2000},
    preprocessing_steps=[
        {'type': 'scaler', 'params': {}},
        {'type': 'pca', 'params': {'n_components': 10}},
    ],
    thresholds={'medium': 0.6, 'high': 0.8, 'critical': 0.9},
)

# Service usage
service = AnomalyDetectionService(config)
result = service.process_data_point({'temperature': 25.5, 'pressure': 1013.25})
```
Error Handling and Resilience¶
Graceful Degradation
```python
import logging
import time
from collections import deque
from enum import Enum
from typing import Any, Dict

import numpy as np


class ServiceState(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    FAILED = "failed"


class ResilientAnomalyDetector:
    def __init__(self, primary_model, fallback_model=None):
        self.primary_model = primary_model
        self.fallback_model = fallback_model
        self.state = ServiceState.HEALTHY

        # Error tracking
        self.error_count = 0
        self.error_window = 100
        self.error_threshold = 0.1  # 10% error rate
        self.last_errors = deque(maxlen=self.error_window)

        # Circuit breaker
        self.circuit_breaker_open = False
        self.circuit_breaker_timeout = 300  # 5 minutes
        self.circuit_breaker_opened_at = None

        self.logger = logging.getLogger(__name__)

    def process_with_resilience(self, data_point: Dict[str, float]) -> Dict[str, Any]:
        """Process a data point with error handling and fallback."""
        try:
            # Check the circuit breaker
            if self._should_use_circuit_breaker():
                return self._fallback_processing(data_point, "circuit_breaker")

            # Try the primary model
            result = self._process_with_primary(data_point)
            self._record_success()
            return result

        except Exception as e:
            self._record_error(e)
            # Try fallback processing
            return self._fallback_processing(data_point, str(e))

    def _process_with_primary(self, data_point: Dict[str, float]) -> Dict[str, Any]:
        """Process with the primary model."""
        self.primary_model.learn_one(data_point)
        score = self.primary_model.score_one(data_point)

        return {
            'score': score,
            'model': 'primary',
            'timestamp': time.time(),
            'state': self.state.value,
        }

    def _fallback_processing(self, data_point: Dict[str, float], reason: str) -> Dict[str, Any]:
        """Fallback processing when the primary model fails."""
        self.logger.warning(f"Using fallback processing: {reason}")

        try:
            if self.fallback_model:
                self.fallback_model.learn_one(data_point)
                score = self.fallback_model.score_one(data_point)
                model_used = 'fallback'
            else:
                # Simple statistical fallback
                score = self._simple_statistical_anomaly_score(data_point)
                model_used = 'statistical'

            return {
                'score': score,
                'model': model_used,
                'timestamp': time.time(),
                'fallback_reason': reason,
                'state': ServiceState.DEGRADED.value,
            }

        except Exception as e:
            self.logger.error(f"Fallback processing failed: {e}")
            return {
                'score': 0.0,
                'model': 'none',
                'error': str(e),
                'timestamp': time.time(),
                'state': ServiceState.FAILED.value,
            }

    def _simple_statistical_anomaly_score(self, data_point: Dict[str, float]) -> float:
        """Simple statistical anomaly detection as a last resort."""
        # Use a z-score based on running statistics
        values = list(data_point.values())

        if not hasattr(self, '_running_mean'):
            self._running_mean = np.mean(values)
            self._running_std = 1.0
            self._count = 1
            return 0.0

        # Update running statistics
        current_mean = np.mean(values)
        self._count += 1
        alpha = 1.0 / self._count
        self._running_mean = (1 - alpha) * self._running_mean + alpha * current_mean

        # Calculate the anomaly score based on deviation
        deviation = abs(current_mean - self._running_mean)
        return min(deviation / (self._running_std + 1e-8), 1.0)

    def _record_error(self, error: Exception):
        """Record an error for circuit breaker logic."""
        self.error_count += 1
        self.last_errors.append(time.time())

        # Check whether the circuit breaker should open
        recent_error_rate = len(self.last_errors) / self.error_window
        if recent_error_rate > self.error_threshold:
            self._open_circuit_breaker()

        self.logger.error(f"Model error: {error}")

    def _record_success(self):
        """Record successful processing."""
        if self.circuit_breaker_open:
            # Try to close the circuit breaker
            self._close_circuit_breaker()

    def _should_use_circuit_breaker(self) -> bool:
        """Check if the circuit breaker should prevent primary model use."""
        if not self.circuit_breaker_open:
            return False

        # Check if the timeout has passed
        if (time.time() - self.circuit_breaker_opened_at) > self.circuit_breaker_timeout:
            self._close_circuit_breaker()
            return False

        return True

    def _open_circuit_breaker(self):
        """Open the circuit breaker."""
        self.circuit_breaker_open = True
        self.circuit_breaker_opened_at = time.time()
        self.state = ServiceState.DEGRADED
        self.logger.warning("Circuit breaker opened - using fallback processing")

    def _close_circuit_breaker(self):
        """Close the circuit breaker."""
        self.circuit_breaker_open = False
        self.circuit_breaker_opened_at = None
        self.state = ServiceState.HEALTHY
        self.logger.info("Circuit breaker closed - resuming normal processing")
```
Monitoring and Observability¶
Comprehensive Monitoring
```python
import logging
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class PerformanceMetrics:
    processing_times: deque = field(default_factory=lambda: deque(maxlen=1000))
    error_rates: deque = field(default_factory=lambda: deque(maxlen=1000))
    anomaly_rates: deque = field(default_factory=lambda: deque(maxlen=1000))
    memory_usage: deque = field(default_factory=lambda: deque(maxlen=1000))
    throughput: deque = field(default_factory=lambda: deque(maxlen=1000))


class MonitoringService:
    def __init__(self, alert_thresholds: Dict[str, float]):
        self.metrics = PerformanceMetrics()
        self.alert_thresholds = alert_thresholds
        self.logger = logging.getLogger(__name__)

        # Alert state tracking
        self.active_alerts = set()
        self.alert_cooldown = 300  # 5 minutes
        self.last_alert_times = defaultdict(float)

    def record_processing_time(self, processing_time: float):
        """Record a processing time for monitoring."""
        self.metrics.processing_times.append(processing_time)

        # Check for performance degradation
        if len(self.metrics.processing_times) >= 100:
            avg_time = sum(list(self.metrics.processing_times)[-100:]) / 100
            if avg_time > self.alert_thresholds.get('slow_processing', 0.1):
                self._trigger_alert('slow_processing', f"Average processing time: {avg_time:.3f}s")

    def record_error_rate(self, error_count: int, total_count: int):
        """Record the error rate."""
        error_rate = error_count / max(total_count, 1)
        self.metrics.error_rates.append(error_rate)

        if error_rate > self.alert_thresholds.get('high_error_rate', 0.05):
            self._trigger_alert('high_error_rate', f"Error rate: {error_rate:.1%}")

    def record_anomaly_rate(self, anomaly_count: int, total_count: int):
        """Record the anomaly detection rate."""
        anomaly_rate = anomaly_count / max(total_count, 1)
        self.metrics.anomaly_rates.append(anomaly_rate)

        # Alert on unusual anomaly rates in either direction
        if len(self.metrics.anomaly_rates) >= 100:
            recent_rate = sum(list(self.metrics.anomaly_rates)[-100:]) / 100
            if recent_rate > self.alert_thresholds.get('high_anomaly_rate', 0.1):
                self._trigger_alert('high_anomaly_rate', f"Anomaly rate: {recent_rate:.1%}")
            elif recent_rate < self.alert_thresholds.get('low_anomaly_rate', 0.001):
                self._trigger_alert('low_anomaly_rate', f"Anomaly rate: {recent_rate:.1%}")

    def record_memory_usage(self, memory_mb: float):
        """Record memory usage."""
        self.metrics.memory_usage.append(memory_mb)

        if memory_mb > self.alert_thresholds.get('high_memory', 1000):
            self._trigger_alert('high_memory', f"Memory usage: {memory_mb:.1f} MB")

    def record_throughput(self, points_per_second: float):
        """Record processing throughput."""
        self.metrics.throughput.append(points_per_second)

        if points_per_second < self.alert_thresholds.get('low_throughput', 10):
            self._trigger_alert('low_throughput', f"Throughput: {points_per_second:.1f} points/sec")

    def _trigger_alert(self, alert_type: str, message: str):
        """Trigger an alert with cooldown logic."""
        current_time = time.time()

        # Respect the cooldown
        if (current_time - self.last_alert_times[alert_type]) < self.alert_cooldown:
            return

        self.last_alert_times[alert_type] = current_time
        self.active_alerts.add(alert_type)

        # Log the alert
        self.logger.critical(f"ALERT [{alert_type}]: {message}")

        # Send to external monitoring (implement as needed)
        self._send_external_alert(alert_type, message)

    def _send_external_alert(self, alert_type: str, message: str):
        """Send an alert to an external monitoring system."""
        # Implement integration with your monitoring system
        # Examples: Slack, PagerDuty, email, etc.
        pass

    def get_health_status(self) -> Dict[str, Any]:
        """Get the current system health status."""
        if not self.metrics.processing_times:
            return {'status': 'unknown', 'reason': 'no_data'}

        # Calculate current metrics
        recent_processing_time = (
            sum(list(self.metrics.processing_times)[-10:])
            / min(10, len(self.metrics.processing_times))
        )
        recent_memory = list(self.metrics.memory_usage)[-1] if self.metrics.memory_usage else 0
        recent_throughput = list(self.metrics.throughput)[-1] if self.metrics.throughput else 0

        # Determine health status
        if self.active_alerts:
            status = 'unhealthy'
        elif (recent_processing_time > self.alert_thresholds.get('slow_processing', 0.1) * 0.8 or
              recent_memory > self.alert_thresholds.get('high_memory', 1000) * 0.8):
            status = 'degraded'
        else:
            status = 'healthy'

        return {
            'status': status,
            'active_alerts': list(self.active_alerts),
            'metrics': {
                'avg_processing_time': recent_processing_time,
                'memory_usage_mb': recent_memory,
                'throughput_per_sec': recent_throughput,
            },
            'timestamp': time.time(),
        }


# Usage example
alert_thresholds = {
    'slow_processing': 0.1,     # seconds
    'high_error_rate': 0.05,    # 5%
    'high_anomaly_rate': 0.15,  # 15%
    'low_anomaly_rate': 0.001,  # 0.1%
    'high_memory': 1000,        # MB
    'low_throughput': 10,       # points/sec
}

monitoring = MonitoringService(alert_thresholds)
```
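One way to wire the monitor into the processing loop is sketched below; it assumes the `service` instance from the deployment example above and treats `data_stream` as any iterable of feature dictionaries:

```python
# Sketch: assumes `service` (AnomalyDetectionService) and `monitoring` from above;
# `data_stream` is any iterable of feature dictionaries.
for i, data_point in enumerate(data_stream, start=1):
    start = time.time()
    result = service.process_data_point(data_point)
    monitoring.record_processing_time(time.time() - start)

    if 'model_state' in result:
        state = result['model_state']
        monitoring.record_memory_usage(state['memory_usage'])
        monitoring.record_anomaly_rate(state['anomaly_count'], state['processed_count'])

    if i % 1000 == 0:
        print(monitoring.get_health_status())
```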
Memory Management¶
Memory-Efficient Processing¶
```python
import gc
import logging
import os

import psutil


class MemoryManager:
    def __init__(self, max_memory_mb=2000, cleanup_threshold=0.8):
        self.max_memory_mb = max_memory_mb
        self.cleanup_threshold = cleanup_threshold
        self.process = psutil.Process(os.getpid())
        self.logger = logging.getLogger(__name__)

    def get_memory_usage(self) -> float:
        """Get current memory usage in MB."""
        return self.process.memory_info().rss / 1024 / 1024

    def should_cleanup(self) -> bool:
        """Check if memory cleanup is needed."""
        current_memory = self.get_memory_usage()
        return current_memory > (self.max_memory_mb * self.cleanup_threshold)

    def force_cleanup(self):
        """Force garbage collection and memory cleanup."""
        before_memory = self.get_memory_usage()

        # Run garbage collection
        collected = gc.collect()

        after_memory = self.get_memory_usage()
        freed_memory = before_memory - after_memory

        self.logger.info(
            f"Memory cleanup: freed {freed_memory:.1f} MB, "
            f"collected {collected} objects"
        )

        return freed_memory

    def monitor_component_memory(self, component, component_name: str):
        """Monitor memory usage of a specific component."""
        if hasattr(component, '__sizeof__'):
            component_size = component.__sizeof__() / 1024 / 1024  # MB
            if component_size > 100:  # Log if the component uses >100 MB
                self.logger.warning(
                    f"Component {component_name} using {component_size:.1f} MB"
                )


class MemoryAwareModel:
    def __init__(self, base_model, memory_manager: MemoryManager, reset_threshold_mb=500):
        self.base_model = base_model
        self.memory_manager = memory_manager
        self.reset_threshold_mb = reset_threshold_mb
        self.processed_count = 0
        self.logger = logging.getLogger(__name__)

    def learn_one(self, x):
        """Learn with memory monitoring."""
        self.base_model.learn_one(x)
        self.processed_count += 1

        # Periodic memory check
        if self.processed_count % 1000 == 0:
            current_memory = self.memory_manager.get_memory_usage()
            if current_memory > self.reset_threshold_mb:
                self._reset_model()

    def score_one(self, x):
        """Score with memory monitoring."""
        return self.base_model.score_one(x)

    def _reset_model(self):
        """Reset the model to free memory."""
        model_class = type(self.base_model)
        self.base_model = model_class()
        self.memory_manager.force_cleanup()
        self.logger.info(f"Model reset after {self.processed_count} points")


# Usage (OnlineIsolationForest stands in for any ONAD model)
memory_manager = MemoryManager(max_memory_mb=2000)
memory_aware_model = MemoryAwareModel(OnlineIsolationForest(), memory_manager)
```
Efficient Data Structures¶
```python
import numpy as np
from collections import deque
from typing import Any, Dict


class EfficientFeatureStore:
    """Memory-efficient storage for streaming features."""

    def __init__(self, max_size=10000, feature_types=None):
        self.max_size = max_size
        self.feature_types = feature_types or {}
        self.data = {}
        self.count = 0

    def add_point(self, features: Dict[str, Any]):
        """Add a data point with automatic type optimization."""
        for key, value in features.items():
            if key not in self.data:
                # Initialize efficient storage based on the data type
                dtype = self.feature_types.get(key, self._infer_dtype(value))

                if dtype == 'category':
                    self.data[key] = deque(maxlen=self.max_size)
                else:
                    self.data[key] = np.zeros(self.max_size, dtype=dtype)
                    self.data[key + '_idx'] = 0

            # Store the value efficiently
            if key + '_idx' in self.data:  # Numeric array
                idx = self.data[key + '_idx']
                if idx < self.max_size:
                    self.data[key][idx] = value
                    self.data[key + '_idx'] += 1
                else:
                    # Shift the array
                    self.data[key][:-1] = self.data[key][1:]
                    self.data[key][-1] = value
            else:  # Deque for categorical values
                self.data[key].append(value)

        self.count += 1

    def _infer_dtype(self, value):
        """Infer an efficient data type."""
        if isinstance(value, bool):
            return np.bool_
        elif isinstance(value, int) and -128 <= value <= 127:
            return np.int8
        elif isinstance(value, int) and -32768 <= value <= 32767:
            return np.int16
        elif isinstance(value, int):
            return np.int32
        elif isinstance(value, float):
            return np.float32
        else:
            return 'category'

    def get_memory_usage(self) -> Dict[str, float]:
        """Get memory usage by feature."""
        usage = {}
        for key, data in self.data.items():
            if isinstance(data, np.ndarray):
                usage[key] = data.nbytes / 1024 / 1024  # MB
            elif isinstance(data, deque):
                usage[key] = len(data) * 8 / 1024 / 1024  # Rough estimate
        return usage
```
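A short usage sketch (the feature names and the forced categorical type are illustrative):

```python
# Sketch: compact storage for a stream of mixed-type features.
store = EfficientFeatureStore(
    max_size=5000,
    feature_types={'status': 'category'},  # force categorical storage for this key
)

store.add_point({'temperature': 25.5, 'error_count': 3, 'status': 'ok'})
store.add_point({'temperature': 26.1, 'error_count': 0, 'status': 'degraded'})

print(store.get_memory_usage())  # per-feature footprint in MB
```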
Performance Optimization¶
Batch Processing¶
```python
import logging
import time
from typing import Dict, Iterator, List


class BatchProcessor:
    def __init__(self, batch_size=100, timeout_seconds=1.0):
        self.batch_size = batch_size
        self.timeout_seconds = timeout_seconds
        self.batch = []
        self.last_batch_time = time.time()

    def add_point(self, data_point: Dict[str, float]) -> Iterator[List[Dict[str, float]]]:
        """Add a point to the batch and yield the batch when it is ready."""
        self.batch.append(data_point)
        current_time = time.time()

        # Yield the batch if the size or timeout is reached
        if (len(self.batch) >= self.batch_size or
                (current_time - self.last_batch_time) > self.timeout_seconds):
            batch_to_process = self.batch.copy()
            self.batch = []
            self.last_batch_time = current_time
            yield batch_to_process

    def flush(self) -> Iterator[List[Dict[str, float]]]:
        """Flush the remaining batch."""
        if self.batch:
            yield self.batch
            self.batch = []


class BatchedAnomalyDetector:
    def __init__(self, model, batch_processor: BatchProcessor):
        self.model = model
        self.batch_processor = batch_processor

    def process_stream(self, data_stream):
        """Process a stream in batches for better performance."""
        for data_point in data_stream:
            # Add to the current batch
            for batch in self.batch_processor.add_point(data_point):
                yield from self._process_batch(batch)

        # Process the final batch
        for batch in self.batch_processor.flush():
            yield from self._process_batch(batch)

    def _process_batch(self, batch: List[Dict[str, float]]):
        """Process a batch of data points."""
        start_time = time.time()
        results = []

        for data_point in batch:
            # Learn and score
            self.model.learn_one(data_point)
            score = self.model.score_one(data_point)

            results.append({
                'data': data_point,
                'score': score,
                'timestamp': time.time(),
            })

        processing_time = time.time() - start_time
        throughput = len(batch) / max(processing_time, 1e-9)  # guard against division by zero

        # Log batch performance
        if throughput < 100:  # Alert if <100 points/sec
            logging.warning(f"Low batch throughput: {throughput:.1f} points/sec")

        yield from results


# Usage (`model`, `data_stream`, and `handle_anomaly` are placeholders for your own objects)
batch_processor = BatchProcessor(batch_size=50, timeout_seconds=0.5)
batched_detector = BatchedAnomalyDetector(model, batch_processor)

for result in batched_detector.process_stream(data_stream):
    if result['score'] > 0.8:
        handle_anomaly(result)
```
Caching Strategies¶
```python
import hashlib
import json
from functools import lru_cache
from typing import Dict


class SmartCacheAnomalyDetector:
    def __init__(self, model, cache_size=1000):
        self.model = model
        self.cache_size = cache_size

        # Feature preprocessing cache
        self._preprocess_cache = {}

        # Score cache for identical inputs
        self._score_cache = {}

    @lru_cache(maxsize=1000)
    def _cached_transform(self, data_hash: str, data_json: str) -> str:
        """Cache expensive transformations."""
        data = json.loads(data_json)
        # Expensive preprocessing here
        return json.dumps(self._expensive_preprocessing(data))

    def _expensive_preprocessing(self, data: Dict[str, float]) -> Dict[str, float]:
        """Placeholder for expensive preprocessing."""
        # Example: complex feature engineering
        result = data.copy()
        result['feature_sum'] = sum(data.values())
        result['feature_mean'] = sum(data.values()) / len(data)
        return result

    def _get_data_hash(self, data: Dict[str, float]) -> str:
        """Create a hash for a data point."""
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.md5(data_str.encode()).hexdigest()

    def score_one_cached(self, data: Dict[str, float]) -> float:
        """Score with caching."""
        data_hash = self._get_data_hash(data)

        # Check the score cache
        if data_hash in self._score_cache:
            return self._score_cache[data_hash]

        # Check the preprocessing cache
        data_json = json.dumps(data, sort_keys=True)
        if data_hash in self._preprocess_cache:
            processed_data = self._preprocess_cache[data_hash]
        else:
            processed_json = self._cached_transform(data_hash, data_json)
            processed_data = json.loads(processed_json)
            self._preprocess_cache[data_hash] = processed_data

        # Score with the model
        score = self.model.score_one(processed_data)

        # Cache the score
        if len(self._score_cache) < self.cache_size:
            self._score_cache[data_hash] = score

        return score

    def clear_caches(self):
        """Clear all caches."""
        self._preprocess_cache.clear()
        self._score_cache.clear()
        self._cached_transform.cache_clear()
```
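A brief usage sketch (OnlineIsolationForest again stands in for any ONAD model; the cache pays off when identical inputs recur, for example discretized sensor readings):

```python
# Sketch: repeated identical inputs are served from the score cache.
cached_detector = SmartCacheAnomalyDetector(OnlineIsolationForest(), cache_size=5000)

point = {'temperature': 25.5, 'pressure': 1013.25}
first = cached_detector.score_one_cached(point)   # computed by the model
second = cached_detector.score_one_cached(point)  # returned from the cache

cached_detector.clear_caches()  # e.g. after significant model drift
```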
Security Considerations¶
Input Validation and Sanitization¶
```python
import logging
import re
import time
from collections import defaultdict, deque
from typing import Any, Dict


class SecurityValidator:
    def __init__(self):
        self.max_feature_count = 1000
        self.max_string_length = 10000
        self.allowed_feature_patterns = [
            r'^[a-zA-Z][a-zA-Z0-9_]*$',  # Valid identifier
            r'^\d+\.\d+\.\d+\.\d+$',     # IP address
            r'^[0-9a-fA-F-]{36}$',       # UUID
        ]
        self.blocked_patterns = [
            r'<script.*?>',  # Script tags
            r'javascript:',  # JavaScript URLs
            r'eval\(',       # Eval calls
            r'__.*__',       # Python special methods
        ]

    def validate_input(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and sanitize input data."""
        if not isinstance(data, dict):
            raise ValueError("Input must be a dictionary")

        if len(data) > self.max_feature_count:
            raise ValueError(f"Too many features: {len(data)} > {self.max_feature_count}")

        sanitized = {}
        for key, value in data.items():
            # Validate the key
            self._validate_key(key)

            # Validate and sanitize the value
            sanitized_value = self._validate_value(value)
            sanitized[key] = sanitized_value

        return sanitized

    def _validate_key(self, key: str):
        """Validate a feature key."""
        if not isinstance(key, str):
            raise ValueError("Feature keys must be strings")

        if len(key) > 100:
            raise ValueError("Feature key too long")

        # Check against blocked patterns
        for pattern in self.blocked_patterns:
            if re.search(pattern, key, re.IGNORECASE):
                raise ValueError(f"Blocked pattern in key: {key}")

        # Check against allowed patterns
        key_valid = any(re.match(pattern, key) for pattern in self.allowed_feature_patterns)
        if not key_valid:
            raise ValueError(f"Invalid feature key format: {key}")

    def _validate_value(self, value: Any) -> float:
        """Validate and convert a value to a safe numeric type."""
        if isinstance(value, (int, float)):
            if not (-1e10 < value < 1e10):  # Reasonable range
                raise ValueError("Numeric value out of range")
            return float(value)

        elif isinstance(value, str):
            if len(value) > self.max_string_length:
                raise ValueError("String value too long")

            # Check for blocked patterns
            for pattern in self.blocked_patterns:
                if re.search(pattern, value, re.IGNORECASE):
                    raise ValueError("Blocked content in string value")

            # Try to convert to float
            try:
                return float(value)
            except ValueError:
                # Hash the string to a numeric value
                import hashlib
                hash_val = int(hashlib.md5(value.encode()).hexdigest()[:8], 16)
                return float(hash_val % 10000)  # Normalize to a reasonable range

        else:
            raise ValueError(f"Unsupported value type: {type(value)}")


class SecureAnomalyDetectionService:
    def __init__(self, model, rate_limit_per_minute=1000):
        self.model = model
        self.validator = SecurityValidator()
        self.rate_limiter = self._create_rate_limiter(rate_limit_per_minute)
        self.logger = logging.getLogger(__name__)

        # Security monitoring
        self.security_events = deque(maxlen=1000)

    def _create_rate_limiter(self, requests_per_minute):
        """Create a simple sliding-window rate limiter."""

        class RateLimiter:
            def __init__(self, max_requests):
                self.max_requests = max_requests
                self.requests = defaultdict(list)

            def is_allowed(self, client_id: str) -> bool:
                now = time.time()
                minute_ago = now - 60

                # Clean up old requests
                self.requests[client_id] = [
                    req_time for req_time in self.requests[client_id]
                    if req_time > minute_ago
                ]

                # Check the limit
                if len(self.requests[client_id]) >= self.max_requests:
                    return False

                # Record the request
                self.requests[client_id].append(now)
                return True

        return RateLimiter(requests_per_minute)

    def process_secure(self, data: Dict[str, Any], client_id: str = "default") -> Dict[str, Any]:
        """Process data with security validation."""
        try:
            # Rate limiting
            if not self.rate_limiter.is_allowed(client_id):
                self._log_security_event("rate_limit_exceeded", client_id)
                raise ValueError("Rate limit exceeded")

            # Input validation
            validated_data = self.validator.validate_input(data)

            # Normal processing
            self.model.learn_one(validated_data)
            score = self.model.score_one(validated_data)

            return {
                'score': score,
                'timestamp': time.time(),
                'status': 'success',
            }

        except ValueError as e:
            # Security violation
            self._log_security_event("validation_error", client_id, str(e))
            return {
                'error': 'Invalid input',
                'timestamp': time.time(),
                'status': 'rejected',
            }

        except Exception as e:
            # System error
            self.logger.error(f"Processing error: {e}")
            return {
                'error': 'Processing failed',
                'timestamp': time.time(),
                'status': 'error',
            }

    def _log_security_event(self, event_type: str, client_id: str, details: str = ""):
        """Log a security event."""
        event = {
            'type': event_type,
            'client_id': client_id,
            'timestamp': time.time(),
            'details': details,
        }
        self.security_events.append(event)
        self.logger.warning(f"Security event: {event}")

    def get_security_summary(self) -> Dict[str, Any]:
        """Get a security event summary."""
        if not self.security_events:
            return {'total_events': 0}

        event_types = {}
        for event in self.security_events:
            event_types[event['type']] = event_types.get(event['type'], 0) + 1

        return {
            'total_events': len(self.security_events),
            'event_types': event_types,
            'recent_events': list(self.security_events)[-10:],  # Last 10 events
        }


# Usage (`model` and `data_point` are placeholders for your own objects)
secure_service = SecureAnomalyDetectionService(model, rate_limit_per_minute=500)
result = secure_service.process_secure(data_point, client_id="user_123")
```
Production Checklist
- [ ] Implement comprehensive error handling and circuit breakers
- [ ] Set up monitoring and alerting for key metrics
- [ ] Configure appropriate memory limits and cleanup
- [ ] Validate and sanitize all inputs
- [ ] Implement rate limiting and security measures
- [ ] Set up checkpointing for long-running processes (a restore sketch follows this list)
- [ ] Test failover and recovery procedures
- [ ] Document configuration and operational procedures
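As a sketch of the recovery path, a checkpoint written by `_checkpoint_model` in the deployment example could be restored like this (the file path is illustrative):

```python
# Sketch: restore state saved by AnomalyDetectionService._checkpoint_model.
# The checkpoint path below is illustrative; point it at your latest checkpoint.
import pickle

with open('checkpoints/model_10000.pkl', 'rb') as f:
    checkpoint = pickle.load(f)

service = AnomalyDetectionService(checkpoint['config'])
service.model = checkpoint['model']
service.preprocessors = checkpoint['preprocessors']
service.processed_count = checkpoint['processed_count']
```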
Common Production Issues
- Memory leaks: Monitor memory usage and implement cleanup
- Performance degradation: Use batch processing and caching
- Security vulnerabilities: Always validate inputs
- Single points of failure: Implement redundancy and failover