Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ serde = { version = "1.0", features = ["derive"] }
bincode = "1.0"
rsp-rs = "0.2.1"
oxigraph = "0.5"
rumqttc = "0.25.1"
serde_json = "1.0.145"

[target.'cfg(not(windows))'.dependencies]
rdkafka = "0.38.0"

[lib]
name = "janus"
Expand Down
61 changes: 61 additions & 0 deletions examples/comparator_demo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use janus::stream::comparator::{ComparatorConfig, StatefulComparator};

fn main() {
println!("=== Stateful Comparator Demo ===\n");

// 1. Setup Configuration
let config = ComparatorConfig {
abs_threshold: 5.0,
rel_threshold: 0.2, // 20% change
catchup_trigger: 10.0,
slope_epsilon: 0.1,
volatility_buffer: 2.0,
window_size: 10,
outlier_z_threshold: 3.0,
};
println!("Configuration: {:#?}\n", config);

// 2. Create Stateful Comparator
let mut comparator = StatefulComparator::new(config);

// 3. Simulate streaming data over time
println!("Feeding streaming data and checking for anomalies:\n");

// Historical baseline: stable around 100.0
// Live data: starts normal, then becomes volatile and drops
let data_points = vec![
(0.0, 100.0, 100.0), // Both normal
(1.0, 101.0, 100.1), // Both normal
(2.0, 102.0, 100.2), // Both normal
(3.0, 103.0, 100.3), // Both normal
(4.0, 104.0, 100.4), // Both normal
(5.0, 80.0, 100.5), // Live drops significantly (catch-up + outlier)
(6.0, 75.0, 100.6), // Live continues dropping
(7.0, 70.0, 100.7), // Live continues dropping
(8.0, 65.0, 100.8), // Live continues dropping
(9.0, 60.0, 100.9), // Live continues dropping
];

for (timestamp, live_val, hist_val) in data_points {
let anomalies = comparator.update_and_compare(timestamp, live_val, hist_val);

if !anomalies.is_empty() {
println!(
"T={:.0}: Live={:.1}, Hist={:.1} -> {} anomalies:",
timestamp,
live_val,
hist_val,
anomalies.len()
);
for anomaly in &anomalies {
println!(" - {}", anomaly);
}
println!();
} else {
println!(
"T={:.0}: Live={:.1}, Hist={:.1} -> No anomalies",
timestamp, live_val, hist_val
);
}
}
}
1 change: 1 addition & 0 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod query_registration;
1 change: 1 addition & 0 deletions src/api/query_registration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

17 changes: 6 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub mod store {
//! RDF store implementations and interfaces
}

pub mod sources;

/// Module for stream processing
pub mod stream;

Expand All @@ -79,8 +81,12 @@ pub mod config {
/// Module for parsing JanusQL queries
pub mod parsing;

pub mod api;

pub mod storage;

pub mod registry;

pub mod querying;
pub mod error {
//! Error types and result definitions
Expand Down Expand Up @@ -138,14 +144,3 @@ pub mod error {

// Re-export commonly used types
pub use error::{Error, Result};

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_error_display() {
let err = Error::Config("test error".to_string());
assert_eq!(format!("{}", err), "Configuration error: test error");
}
}
2 changes: 1 addition & 1 deletion src/parsing/janusql_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct R2SOperator {
}

/// Parsed JanusQL query structure containing all components extracted from the query.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ParsedJanusQuery {
/// R2S operator if present
pub r2s: Option<R2SOperator>,
Expand Down
266 changes: 0 additions & 266 deletions src/querying/oxigraph_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,269 +90,3 @@ impl SparqlEngine for OxigraphAdapter {
Ok(result_strings)
}
}

#[cfg(test)]
mod tests {
use super::*;
use oxigraph::model::{GraphName, Literal, NamedNode};
use std::collections::HashSet;

/// Helper function to create a test QuadContainer with sample data
fn create_test_container() -> QuadContainer {
let mut quads = HashSet::new();

// Add test quads: <http://example.org/alice> <http://example.org/knows> <http://example.org/bob>
let alice = NamedNode::new("http://example.org/alice").unwrap();
let bob = NamedNode::new("http://example.org/bob").unwrap();
let charlie = NamedNode::new("http://example.org/charlie").unwrap();
let knows = NamedNode::new("http://example.org/knows").unwrap();
let age = NamedNode::new("http://example.org/age").unwrap();

// Alice knows Bob
quads.insert(Quad::new(alice.clone(), knows.clone(), bob.clone(), GraphName::DefaultGraph));

// Bob knows Charlie
quads.insert(Quad::new(
bob.clone(),
knows.clone(),
charlie.clone(),
GraphName::DefaultGraph,
));

// Alice's age
quads.insert(Quad::new(
alice.clone(),
age.clone(),
Literal::new_simple_literal("30"),
GraphName::DefaultGraph,
));

// Bob's age
quads.insert(Quad::new(
bob.clone(),
age.clone(),
Literal::new_simple_literal("25"),
GraphName::DefaultGraph,
));

QuadContainer::new(quads, 1000)
}

#[test]
fn test_oxigraph_adapter_creation() {
let _adapter = OxigraphAdapter::new();
// Adapter created successfully
}

#[test]
fn test_execute_simple_select_query() {
let adapter = OxigraphAdapter::new();
let container = create_test_container();

// Query to select all subjects
let query = "SELECT ?s WHERE { ?s ?p ?o }";

let results = adapter.execute_query(query, &container);
assert!(results.is_ok(), "Query execution should succeed");

let results = results.unwrap();
assert!(!results.is_empty(), "Results should not be empty");
assert_eq!(results.len(), 4, "Should return 4 results (4 distinct subjects in quads)");
}

#[test]
fn test_execute_select_with_filter() {
let adapter = OxigraphAdapter::new();
let container = create_test_container();

// Query to select subjects that know someone
let query = r"
PREFIX ex: <http://example.org/>
SELECT ?s WHERE {
?s ex:knows ?o
}
";

let results = adapter.execute_query(query, &container);
assert!(results.is_ok(), "Query with filter should succeed");

let results = results.unwrap();
assert_eq!(results.len(), 2, "Should return 2 results (Alice and Bob know someone)");
}

#[test]
fn test_execute_ask_query() {
let adapter = OxigraphAdapter::new();
let container = create_test_container();

// ASK query to check if Alice knows Bob
let query = r"
PREFIX ex: <http://example.org/>
ASK {
ex:alice ex:knows ex:bob
}
";

let results = adapter.execute_query(query, &container);
assert!(results.is_ok(), "ASK query should succeed");

let results = results.unwrap();
assert_eq!(results.len(), 1, "ASK query should return one boolean result");
assert_eq!(results[0], "true", "ASK query should return true");
}

#[test]
fn test_execute_ask_query_false() {
let adapter = OxigraphAdapter::new();
let container = create_test_container();

// ASK query that should return false
let query = r"
PREFIX ex: <http://example.org/>
ASK {
ex:alice ex:knows ex:charlie
}
";

let results = adapter.execute_query(query, &container);
assert!(results.is_ok(), "ASK query should succeed");

let results = results.unwrap();
assert_eq!(results.len(), 1, "ASK query should return one boolean result");
assert_eq!(
results[0], "false",
"ASK query should return false (Alice doesn't know Charlie directly)"
);
}

#[test]
fn test_execute_construct_query() {
let adapter = OxigraphAdapter::new();
let container = create_test_container();

// CONSTRUCT query to create new triples
let query = r"
PREFIX ex: <http://example.org/>
CONSTRUCT {
?s ex:knows ?o
}
WHERE {
?s ex:knows ?o
}
";

let results = adapter.execute_query(query, &container);
assert!(results.is_ok(), "CONSTRUCT query should succeed");

let results = results.unwrap();
assert_eq!(results.len(), 2, "CONSTRUCT should return 2 triples");
}

#[test]
fn test_execute_with_empty_container() {
let adapter = OxigraphAdapter::new();
let empty_container = QuadContainer::new(HashSet::new(), 1000);

let query = "SELECT ?s WHERE { ?s ?p ?o }";

let results = adapter.execute_query(query, &empty_container);
assert!(results.is_ok(), "Query on empty container should succeed");

let results = results.unwrap();
assert!(results.is_empty(), "Results should be empty for empty container");
}

#[test]
fn test_execute_invalid_query() {
let adapter = OxigraphAdapter::new();
let container = create_test_container();

// Invalid SPARQL query
let query = "INVALID SPARQL QUERY";

let results = adapter.execute_query(query, &container);
assert!(results.is_err(), "Invalid query should return an error");

let error = results.unwrap_err();
assert!(error.to_string().contains("Oxigraph error"), "Error should be an OxigraphError");
}

#[test]
fn test_execute_query_with_literal_filter() {
let adapter = OxigraphAdapter::new();
let container = create_test_container();

// Query to find people older than 25
let query = r#"
PREFIX ex: <http://example.org/>
SELECT ?s ?age WHERE {
?s ex:age ?age .
FILTER(?age > "25")
}
"#;

let results = adapter.execute_query(query, &container);
assert!(results.is_ok(), "Query with literal filter should succeed");

let results = results.unwrap();
assert_eq!(results.len(), 1, "Should return 1 result (Alice is 30)");
}

#[test]
fn test_execute_count_query() {
let adapter = OxigraphAdapter::new();
let container = create_test_container();

// Query to count the number of 'knows' relationships
let query = r"
PREFIX ex: <http://example.org/>
SELECT (COUNT(?s) AS ?count) WHERE {
?s ex:knows ?o
}
";

let results = adapter.execute_query(query, &container);
assert!(results.is_ok(), "COUNT query should succeed");

let results = results.unwrap();
assert_eq!(results.len(), 1, "COUNT query should return 1 result");
}

#[test]
fn test_multiple_queries_on_same_adapter() {
let adapter = OxigraphAdapter::new();
let container = create_test_container();

// First query
let query1 = "SELECT ?s WHERE { ?s ?p ?o }";
let results1 = adapter.execute_query(query1, &container);
assert!(results1.is_ok(), "First query should succeed");

// Second query
let query2 = r"
PREFIX ex: <http://example.org/>
SELECT ?s WHERE { ?s ex:knows ?o }
";
let results2 = adapter.execute_query(query2, &container);
assert!(results2.is_ok(), "Second query should succeed");

// Verify both queries returned results
assert!(!results1.unwrap().is_empty());
assert!(!results2.unwrap().is_empty());
}

#[test]
fn test_oxigraph_error_display() {
let error = OxigraphError("Test error message".to_string());
let error_string = format!("{}", error);
assert_eq!(error_string, "Oxigraph error: Test error message");
}

#[test]
fn test_oxigraph_error_from_storage_error() {
// This tests the From implementation for StorageError
// We can't easily create a real StorageError, but we verify the trait is implemented
let error = OxigraphError::from(oxigraph::store::StorageError::Other("test".into()));
assert!(error.to_string().contains("Oxigraph error"));
}
}
1 change: 1 addition & 0 deletions src/registry/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod query_registry;
Loading