Skip to content

Conversation

@argahsuknesib
Copy link
Collaborator

@argahsuknesib argahsuknesib commented Nov 25, 2025

Summary

This PR introduces comprehensive stream processing enhancements to Janus, including a new anomaly detection system and Kafka integration for live data ingestion.

Key Features

Stateful Comparator for Anomaly Detection

  • Multi-Dimensional Analysis: Detects absolute differences, relative drops, catch-up triggers, trend divergence, volatility increases, and statistical outliers
  • Sliding Window Statistics: Maintains rolling windows of mean, standard deviation, and linear regression slopes
  • Configurable Thresholds: Fully customizable detection parameters via ComparatorConfig
  • Real-time Processing: Efficient O(window_size) operations for streaming data

Kafka Stream Source Integration

  • Asynchronous Polling: Dedicated thread polls Kafka consumer and processes messages
  • Callback System: Mutex-protected callbacks for RDF event processing
  • StreamSource Trait: Standardized interface for subscribe/stop operations
  • RDF Event Construction: Builds RDFEvent placeholders from Kafka message payloads

Architecture Improvements

  • Modular Design: Added api and registry modules to core library
  • Type Safety: Introduced CallbackType alias for complex function signatures
  • Thread Safety: Proper synchronization with Arc<Mutex<>> patterns
  • Error Handling: Comprehensive error types for stream operations

Files Added/Modified

New Files:

  • src/stream/comparator.rs - Core anomaly detection engine (280+ lines)
  • examples/comparator_demo.rs - Interactive anomaly detection demo
  • tests/comparator_test.rs - Comprehensive test suite (3 functions)
  • src/sources/kafka_adapter.rs - Kafka consumer implementation (100+ lines)

Modified Files:

  • src/lib.rs - Added pub mod api; and pub mod registry;
  • src/stream/mod.rs - Added pub mod comparator; export
  • src/sources/mod.rs - Added pub mod kafka_adapter;
  • src/parsing/janusql_parser.rs - Added #[derive(Clone)] to ParsedJanusQuery
  • Various source files - Minor formatting and refactoring improvements

Implementation Details

Comparator Engine:

// Configuration example
let config = ComparatorConfig {
    abs_threshold: 5.0,
    rel_threshold: 0.2,
    catchup_trigger: 10.0,
    slope_epsilon: 0.1,
    volatility_buffer: 2.0,
    window_size: 10,
    outlier_z_threshold: 3.0,
};

// Usage
let mut comparator = StatefulComparator::new(config);
let anomalies = comparator.update_and_compare(timestamp, live_val, hist_val);

Introduce a new sources module with StreamSource trait, MQTT and Kafka
adapters, and StreamIngestionPipeline. Export sources in lib.rs. Add
rumqttc, rdkafka and serde_json to Cargo.toml. Change
StreamingSegmentedStorage dictionary from Rc to Arc and add a
write_rdf_event helper.
Spawn a polling thread that polls the BaseConsumer, builds RDFEvent
placeholders from message payloads, and invokes a mutex-protected
callback. Introduce a CallbackType alias and store the callback in a
Mutex<Option<_>>. Implement StreamSource for KafkaSource (subscribe and
stop) and update consumer handling.

Also add api and registry modules to lib, derive Clone for
ParsedJanusQuery, and apply minor formatting/refactors in source
modules.
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@argahsuknesib argahsuknesib merged commit 22da443 into main Nov 25, 2025
14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants