Stream Processing

ONAD is designed for processing continuous data streams efficiently. This guide covers data loading, streaming utilities, and best practices for handling real-time and batch data.

Core Streaming Concepts

Streaming vs Batch Processing

Traditional Batch Processing:

  • Load entire dataset into memory
  • Process all data at once
  • Suitable for offline analysis

Stream Processing (ONAD approach):

  • Process one data point at a time
  • Constant memory usage
  • Real-time results
  • Handles infinite streams
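
To make the difference concrete, here is a minimal sketch (the CSV reading is illustrative only, not an ONAD API): the batch version materializes every row before any work starts, while the streaming version holds one record in memory at a time, so it also works on unbounded sources.

import csv
from typing import Dict, Iterator

def load_batch(path: str) -> list:
    """Batch: memory grows with file size"""
    with open(path, newline="") as f:
        return list(csv.DictReader(f))

def stream_points(path: str) -> Iterator[Dict[str, str]]:
    """Streaming: one record in memory at a time"""
    with open(path, newline="") as f:
        yield from csv.DictReader(f)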

Stream Interface

All ONAD components follow a consistent streaming interface:

# Universal pattern for streaming processing
for data_point in stream:
    # Learn from the data point
    model.learn_one(data_point)

    # Get anomaly score
    score = model.score_one(data_point)

    # Process result
    if score > threshold:
        handle_anomaly(data_point, score)

Built-in Data Sources

ParquetStreamer

Efficiently stream data from Parquet files with memory optimization.

Basic usage:

from onad.stream import ParquetStreamer

# Stream from file
with ParquetStreamer("data.parquet") as streamer:
    for features, label in streamer:
        model.learn_one(features)
        score = model.score_one(features)

Advanced configuration:

# Stream with specific label column and data sanitization
with ParquetStreamer(
    "dataset.parquet",
    label_column="is_anomaly",    # Specify label column
    sanitize_floats=True          # Clean float values
) as streamer:
    for features, label in streamer:
        # features: dict of feature name -> value
        # label: value from label column (if specified)
        model.learn_one(features)

Memory efficiency features:

  • Uses PyArrow for efficient Parquet reading
  • Processes data in batches internally
  • Converts to pandas iteratively to minimize memory usage
  • Automatic cleanup of file handles
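
The batching behavior follows the standard PyArrow pattern. Here is a minimal sketch of that pattern written against pyarrow directly (illustrative only, not ONAD's actual implementation):

import pyarrow.parquet as pq

def parquet_batches(path: str, batch_size: int = 4096):
    """Yield one row dict at a time while reading the file in bounded batches"""
    parquet_file = pq.ParquetFile(path)
    for batch in parquet_file.iter_batches(batch_size=batch_size):
        # Only one RecordBatch is held in memory at a time
        for row in batch.to_pylist():
            yield row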

Built-in Datasets

ONAD provides sample datasets for testing and experimentation:

from onad.stream import Dataset, ParquetStreamer

# Available datasets
datasets = [
    Dataset.FRAUD,      # Financial fraud detection
    Dataset.NETWORK,    # Network intrusion detection
    Dataset.IOT,        # IoT sensor anomalies
    Dataset.SYSTEM      # System performance anomalies
]

# Use built-in dataset
with ParquetStreamer(Dataset.FRAUD) as streamer:
    for features, label in streamer:
        # Process fraud detection data
        model.learn_one(features)
        score = model.score_one(features)

Custom Data Sources

File-based Streams

CSV files:

import csv
from typing import Iterator, Dict

def csv_stream(filename: str) -> Iterator[Dict[str, float]]:
    """Stream data from CSV file"""
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)
        for row in reader:
            # Convert string values to float
            yield {k: float(v) for k, v in row.items() if v}

# Usage
for data_point in csv_stream("sensor_data.csv"):
    model.learn_one(data_point)

JSON Lines files:

import json
from typing import Dict, Iterator

def jsonl_stream(filename: str) -> Iterator[Dict[str, float]]:
    """Stream data from JSON Lines file"""
    with open(filename, 'r') as file:
        for line in file:
            if line.strip():
                yield json.loads(line)

# Usage
for data_point in jsonl_stream("events.jsonl"):
    model.learn_one(data_point)

Database Streams

SQL databases:

import sqlite3
from typing import Dict, Iterator

def sql_stream(db_path: str, query: str) -> Iterator[Dict[str, float]]:
    """Stream data from SQL database"""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row  # Enable column name access

    cursor = conn.execute(query)
    try:
        while True:
            rows = cursor.fetchmany(1000)  # Process in batches
            if not rows:
                break

            for row in rows:
                yield dict(row)
    finally:
        conn.close()

# Usage
query = "SELECT temperature, pressure, vibration FROM sensors ORDER BY timestamp"
for data_point in sql_stream("sensors.db", query):
    model.learn_one(data_point)

Real-time Streams

Message queues (Redis):

import redis
import json
from typing import Dict, Iterator

def redis_stream(host: str, port: int, channel: str) -> Iterator[Dict[str, float]]:
    """Stream data from Redis pub/sub"""
    r = redis.Redis(host=host, port=port)
    pubsub = r.pubsub()
    pubsub.subscribe(channel)

    try:
        for message in pubsub.listen():
            if message['type'] == 'message':
                data = json.loads(message['data'])
                yield data
    finally:
        pubsub.close()

# Usage
for data_point in redis_stream("localhost", 6379, "sensor_data"):
    model.learn_one(data_point)

Apache Kafka:

from kafka import KafkaConsumer
import json
from typing import Dict, Iterator

def kafka_stream(topic: str, bootstrap_servers: list) -> Iterator[Dict[str, float]]:
    """Stream data from Kafka topic"""
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    try:
        for message in consumer:
            yield message.value
    finally:
        consumer.close()

# Usage
servers = ['localhost:9092']
for data_point in kafka_stream("anomaly_data", servers):
    model.learn_one(data_point)

Stream Processing Patterns

Basic Processing Loop

def process_stream(stream, model, threshold=0.7):
    """Basic anomaly detection on stream"""
    anomaly_count = 0
    total_count = 0

    for data_point in stream:
        # Update model
        model.learn_one(data_point)

        # Get anomaly score
        score = model.score_one(data_point)

        # Check for anomaly
        if score > threshold:
            anomaly_count += 1
            print(f"Anomaly detected: {data_point}, Score: {score:.3f}")

        total_count += 1

        # Report progress
        if total_count % 1000 == 0:
            rate = anomaly_count / total_count
            print(f"Processed {total_count} points, {rate:.1%} anomalies")

# Usage
process_stream(csv_stream("data.csv"), model)

Windowed Processing

import numpy as np
from collections import deque
from typing import Deque

def windowed_processing(stream, model, window_size=100):
    """Process stream with sliding window statistics"""
    window: Deque[float] = deque(maxlen=window_size)

    for data_point in stream:
        # Update model
        model.learn_one(data_point)
        score = model.score_one(data_point)

        # Maintain sliding window of scores
        window.append(score)

        if len(window) >= window_size:
            # Compute window statistics
            window_mean = sum(window) / len(window)
            window_std = np.std(window)

            # Adaptive threshold based on recent scores
            threshold = window_mean + 2 * window_std

            if score > threshold:
                print(f"Anomaly detected with adaptive threshold: {score:.3f} > {threshold:.3f}")

# Usage
windowed_processing(csv_stream("sensor_data.csv"), model)

Batch Processing Interface

Process streams in batches for improved performance:

def batch_stream_processing(stream, model, batch_size=50):
    """Process stream in batches"""

    def process_batch(batch):
        scores = []
        for point in batch:
            model.learn_one(point)
            scores.append(model.score_one(point))

        # Analyze batch results
        max_score = max(scores)
        if max_score > 0.8:
            print(f"High anomaly score in batch: {max_score:.3f}")

    batch = []
    for data_point in stream:
        batch.append(data_point)

        if len(batch) >= batch_size:
            process_batch(batch)
            batch = []  # Reset batch

    # Process the final partial batch instead of silently dropping it
    if batch:
        process_batch(batch)

# Usage
batch_stream_processing(csv_stream("large_dataset.csv"), model, batch_size=100)

Memory Management

Monitoring Memory Usage

import psutil
import os

def monitor_memory_usage(stream, model):
    """Monitor memory usage during stream processing"""
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / 1024 / 1024  # MB

    for i, data_point in enumerate(stream):
        model.learn_one(data_point)
        score = model.score_one(data_point)

        # Check memory every 1000 points
        if i % 1000 == 0:
            current_memory = process.memory_info().rss / 1024 / 1024
            memory_growth = current_memory - initial_memory

            print(f"Point {i}: Memory usage: {current_memory:.1f} MB (+{memory_growth:.1f} MB)")

            # Alert if memory growth is excessive
            if memory_growth > 500:  # 500 MB growth
                print("WARNING: Excessive memory growth detected!")

# Usage
monitor_memory_usage(csv_stream("large_file.csv"), model)

Memory-Efficient Streaming

import psutil
import os

def memory_efficient_processing(stream, model, max_memory_mb=1000):
    """Process stream with memory limits"""
    process = psutil.Process(os.getpid())

    for data_point in stream:
        # Check memory before processing
        current_memory = process.memory_info().rss / 1024 / 1024

        if current_memory > max_memory_mb:
            # Reset model to free memory
            print(f"Memory limit reached ({current_memory:.1f} MB), resetting model")
            model = type(model)()  # Create fresh instance (assumes a no-argument constructor)

        model.learn_one(data_point)
        score = model.score_one(data_point)

Performance Optimization

Parallel Processing

import multiprocessing
import queue

def parallel_stream_processing(stream, model_class, num_workers=4):
    """Process stream with multiple workers"""
    # Note: a nested worker function requires the 'fork' start method
    # (the Linux default); on 'spawn' platforms, define it at module level.

    def worker(data_queue, result_queue):
        """Worker process with its own independent model instance"""
        model = model_class()
        while True:
            try:
                data_point = data_queue.get(timeout=1)
                if data_point is None:  # Sentinel value
                    break

                model.learn_one(data_point)
                score = model.score_one(data_point)
                result_queue.put((data_point, score))

            except queue.Empty:
                continue

        result_queue.put(None)  # Signal that this worker has finished

    # Create queues
    data_queue = multiprocessing.Queue(maxsize=1000)
    result_queue = multiprocessing.Queue()

    # Start workers
    workers = []
    for _ in range(num_workers):
        p = multiprocessing.Process(target=worker, args=(data_queue, result_queue))
        p.start()
        workers.append(p)

    # Feed data to workers
    for data_point in stream:
        data_queue.put(data_point)

    # Send stop signals
    for _ in workers:
        data_queue.put(None)

    # Drain results before joining; joining with an undrained
    # multiprocessing.Queue can deadlock
    results = []
    finished = 0
    while finished < num_workers:
        item = result_queue.get()
        if item is None:
            finished += 1
        else:
            results.append(item)

    for p in workers:
        p.join()

    return results

Caching and Preprocessing

from functools import lru_cache

class CachedPreprocessor:
    @lru_cache(maxsize=1000)
    def preprocess_features(self, data_tuple):
        """Cache preprocessing results, keyed by the hashable feature tuple"""
        data_dict = dict(data_tuple)
        # Expensive preprocessing here
        return data_dict

    def process_stream(self, stream, model):
        for data_point in stream:
            # Convert to tuple for hashing
            data_tuple = tuple(sorted(data_point.items()))

            # Use cached preprocessing
            processed = self.preprocess_features(data_tuple)

            model.learn_one(processed)
            score = model.score_one(processed)

Error Handling and Resilience

Robust Stream Processing

import logging

def robust_stream_processing(stream, model, error_threshold=0.01):
    """Process stream with error handling"""
    logger = logging.getLogger(__name__)

    total_points = 0
    error_count = 0

    for data_point in stream:
        try:
            # Validate data point
            if not isinstance(data_point, dict):
                raise ValueError("Data point must be a dictionary")

            if not data_point:
                raise ValueError("Data point cannot be empty")

            # Process point
            model.learn_one(data_point)
            score = model.score_one(data_point)

            total_points += 1

        except Exception as e:
            error_count += 1
            error_rate = error_count / (total_points + error_count)

            logger.error(f"Error processing data point: {e}")

            # Stop if error rate is too high
            if error_rate > error_threshold:
                logger.critical(f"Error rate {error_rate:.1%} exceeds threshold {error_threshold:.1%}")
                break

            continue

# Usage with logging
logging.basicConfig(level=logging.INFO)
robust_stream_processing(csv_stream("noisy_data.csv"), model)

Checkpointing

import pickle
import os

def checkpointed_processing(stream, model, checkpoint_interval=1000, checkpoint_file="model_checkpoint.pkl"):
    """Process stream with periodic checkpointing"""

    point_count = 0

    # Load checkpoint if exists
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, 'rb') as f:
            model = pickle.load(f)
        print(f"Loaded model from checkpoint: {checkpoint_file}")

    try:
        for data_point in stream:
            model.learn_one(data_point)
            score = model.score_one(data_point)
            point_count += 1

            # Save checkpoint periodically
            if point_count % checkpoint_interval == 0:
                with open(checkpoint_file, 'wb') as f:
                    pickle.dump(model, f)
                print(f"Checkpoint saved at point {point_count}")

    except KeyboardInterrupt:
        print("Processing interrupted, saving final checkpoint...")
        with open(checkpoint_file, 'wb') as f:
            pickle.dump(model, f)
        raise

# Usage
checkpointed_processing(csv_stream("large_dataset.csv"), model)

Stream Performance Tips

  • Use generators instead of loading entire datasets into memory
  • Process data in batches when possible for better throughput
  • Monitor memory usage and reset models if needed
  • Implement checkpointing for long-running processes
  • Use appropriate data types (avoid unnecessary string conversions)

Common Pitfalls

  • Memory leaks: Always close file handles and database connections
  • Blocking operations: Avoid synchronous I/O in streaming loops (see the sketch after this list)
  • Error propagation: Don't let single bad data points crash the entire stream
  • Resource exhaustion: Monitor and limit memory/CPU usage
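
For the blocking-I/O pitfall in particular, a common remedy is to decouple ingestion from scoring with a background thread and a bounded queue, so a slow source never stalls the model. A minimal standard-library sketch (the stream and model arguments are placeholders):

import queue
import threading

def threaded_processing(stream, model, buffer_size=1000):
    """Read in a background thread; learn and score on the main thread"""
    buffer: queue.Queue = queue.Queue(maxsize=buffer_size)
    sentinel = object()

    def reader():
        for data_point in stream:
            buffer.put(data_point)  # blocks if the consumer falls behind
        buffer.put(sentinel)

    threading.Thread(target=reader, daemon=True).start()

    while True:
        data_point = buffer.get()
        if data_point is sentinel:
            break
        model.learn_one(data_point)
        score = model.score_one(data_point)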