diff --git a/Cargo.toml b/Cargo.toml
index a298296..3a92b03 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,11 @@ serde = { version = "1.0", features = ["derive"] }
 bincode = "1.0"
 rsp-rs = "0.2.1"
 oxigraph = "0.5"
+rumqttc = "0.25.1"
+serde_json = "1.0.145"
+
+[target.'cfg(not(windows))'.dependencies]
+rdkafka = "0.38.0"
 
 [lib]
 name = "janus"
diff --git a/examples/comparator_demo.rs b/examples/comparator_demo.rs
new file mode 100644
index 0000000..a86e52f
--- /dev/null
+++ b/examples/comparator_demo.rs
@@ -0,0 +1,61 @@
+use janus::stream::comparator::{ComparatorConfig, StatefulComparator};
+
+fn main() {
+    println!("=== Stateful Comparator Demo ===\n");
+
+    // 1. Set up the configuration
+    let config = ComparatorConfig {
+        abs_threshold: 5.0,
+        rel_threshold: 0.2, // 20% change
+        catchup_trigger: 10.0,
+        slope_epsilon: 0.1,
+        volatility_buffer: 2.0,
+        window_size: 10,
+        outlier_z_threshold: 3.0,
+    };
+    println!("Configuration: {:#?}\n", config);
+
+    // 2. Create the stateful comparator
+    let mut comparator = StatefulComparator::new(config);
+
+    // 3. Simulate streaming data over time
+    println!("Feeding streaming data and checking for anomalies:\n");
+
+    // Historical baseline: stable around 100.0
+    // Live data: starts normal, then diverges and drops
+    let data_points = vec![
+        (0.0, 100.0, 100.0), // Both normal
+        (1.0, 101.0, 100.1), // Both normal
+        (2.0, 102.0, 100.2), // Both normal
+        (3.0, 103.0, 100.3), // Both normal
+        (4.0, 104.0, 100.4), // Both normal
+        (5.0, 80.0, 100.5),  // Live drops significantly (catch-up + outlier)
+        (6.0, 75.0, 100.6),  // Live continues dropping
+        (7.0, 70.0, 100.7),  // Live continues dropping
+        (8.0, 65.0, 100.8),  // Live continues dropping
+        (9.0, 60.0, 100.9),  // Live continues dropping
+    ];
+
+    for (timestamp, live_val, hist_val) in data_points {
+        let anomalies = comparator.update_and_compare(timestamp, live_val, hist_val);
+
+        if !anomalies.is_empty() {
+            println!(
+                "T={:.0}: Live={:.1}, Hist={:.1} -> {} anomalies:",
+                timestamp,
+                live_val,
+                hist_val,
+                anomalies.len()
+            );
+            for anomaly in &anomalies {
+                println!("  - {}", anomaly);
+            }
+            println!();
+        } else {
+            println!(
+                "T={:.0}: Live={:.1}, Hist={:.1} -> No anomalies",
+                timestamp, live_val, hist_val
+            );
+        }
+    }
+}
diff --git a/src/api/mod.rs b/src/api/mod.rs
new file mode 100644
index 0000000..21bc775
--- /dev/null
+++ b/src/api/mod.rs
@@ -0,0 +1 @@
+pub mod query_registration;
diff --git a/src/api/query_registration.rs b/src/api/query_registration.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/api/query_registration.rs
@@ -0,0 +1 @@
+
diff --git a/src/lib.rs b/src/lib.rs
index f10cc67..732c2af 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -63,6 +63,8 @@ pub mod store {
     //! RDF store implementations and interfaces
 }
 
+pub mod sources;
+
 /// Module for stream processing
 pub mod stream;
 
@@ -79,8 +81,12 @@
 /// Module for parsing JanusQL queries
 pub mod parsing;
 
+pub mod api;
+
 pub mod storage;
 
+pub mod registry;
+
 pub mod querying;
 
 pub mod error {
    //! Error types and result definitions
@@ -138,14 +144,3 @@ pub mod error {
 
 // Re-export commonly used types
 pub use error::{Error, Result};
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_error_display() {
-        let err = Error::Config("test error".to_string());
-        assert_eq!(format!("{}", err), "Configuration error: test error");
-    }
-}
diff --git a/src/parsing/janusql_parser.rs b/src/parsing/janusql_parser.rs
index 15834ae..d9b97a8 100644
--- a/src/parsing/janusql_parser.rs
+++ b/src/parsing/janusql_parser.rs
@@ -41,7 +41,7 @@ pub struct R2SOperator {
 }
 
 /// Parsed JanusQL query structure containing all components extracted from the query.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ParsedJanusQuery {
     /// R2S operator if present
     pub r2s: Option<R2SOperator>,
diff --git a/src/querying/oxigraph_adapter.rs b/src/querying/oxigraph_adapter.rs
index 5747a42..6ade078 100644
--- a/src/querying/oxigraph_adapter.rs
+++ b/src/querying/oxigraph_adapter.rs
@@ -90,269 +90,3 @@ impl SparqlEngine for OxigraphAdapter {
         Ok(result_strings)
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use oxigraph::model::{GraphName, Literal, NamedNode};
-    use std::collections::HashSet;
-
-    /// Helper function to create a test QuadContainer with sample data
-    fn create_test_container() -> QuadContainer {
-        let mut quads = HashSet::new();
-
-        // Add test quads:
-        let alice = NamedNode::new("http://example.org/alice").unwrap();
-        let bob = NamedNode::new("http://example.org/bob").unwrap();
-        let charlie = NamedNode::new("http://example.org/charlie").unwrap();
-        let knows = NamedNode::new("http://example.org/knows").unwrap();
-        let age = NamedNode::new("http://example.org/age").unwrap();
-
-        // Alice knows Bob
-        quads.insert(Quad::new(alice.clone(), knows.clone(), bob.clone(), GraphName::DefaultGraph));
-
-        // Bob knows Charlie
-        quads.insert(Quad::new(
-            bob.clone(),
-            knows.clone(),
-            charlie.clone(),
-            GraphName::DefaultGraph,
-        ));
-
-        // Alice's age
-        quads.insert(Quad::new(
-            alice.clone(),
-            age.clone(),
-            Literal::new_simple_literal("30"),
-            GraphName::DefaultGraph,
-        ));
-
-        // Bob's age
-        quads.insert(Quad::new(
-            bob.clone(),
-            age.clone(),
-            Literal::new_simple_literal("25"),
-            GraphName::DefaultGraph,
-        ));
-
-        QuadContainer::new(quads, 1000)
-    }
-
-    #[test]
-    fn test_oxigraph_adapter_creation() {
-        let _adapter = OxigraphAdapter::new();
-        // Adapter created successfully
-    }
-
-    #[test]
-    fn test_execute_simple_select_query() {
-        let adapter = OxigraphAdapter::new();
-        let container = create_test_container();
-
-        // Query to select all subjects
-        let query = "SELECT ?s WHERE { ?s ?p ?o }";
-
-        let results = adapter.execute_query(query, &container);
-        assert!(results.is_ok(), "Query execution should succeed");
-
-        let results = results.unwrap();
-        assert!(!results.is_empty(), "Results should not be empty");
-        assert_eq!(results.len(), 4, "Should return 4 results (4 distinct subjects in quads)");
-    }
-
-    #[test]
-    fn test_execute_select_with_filter() {
-        let adapter = OxigraphAdapter::new();
-        let container = create_test_container();
-
-        // Query to select subjects that know someone
-        let query = r"
-            PREFIX ex: <http://example.org/>
-            SELECT ?s WHERE {
-                ?s ex:knows ?o
-            }
-        ";
-
-        let results = adapter.execute_query(query, &container);
-        assert!(results.is_ok(), "Query with filter should succeed");
-
-        let results = results.unwrap();
-        assert_eq!(results.len(), 2, "Should return 2 results (Alice and Bob know someone)");
-    }
-
-    #[test]
-    fn test_execute_ask_query() {
-        let adapter = OxigraphAdapter::new();
-        let container = create_test_container();
-
-        // ASK query to check if Alice knows Bob
-        let query = r"
-            PREFIX ex: <http://example.org/>
-            ASK {
-                ex:alice ex:knows ex:bob
-            }
-        ";
-
-        let results = adapter.execute_query(query, &container);
-        assert!(results.is_ok(), "ASK query should succeed");
-
-        let results = results.unwrap();
-        assert_eq!(results.len(), 1, "ASK query should return one boolean result");
-        assert_eq!(results[0], "true", "ASK query should return true");
-    }
-
-    #[test]
-    fn test_execute_ask_query_false() {
-        let adapter = OxigraphAdapter::new();
-        let container = create_test_container();
-
-        // ASK query that should return false
-        let query = r"
-            PREFIX ex: <http://example.org/>
-            ASK {
-                ex:alice ex:knows ex:charlie
-            }
-        ";
-
-        let results = adapter.execute_query(query, &container);
-        assert!(results.is_ok(), "ASK query should succeed");
-
-        let results = results.unwrap();
-        assert_eq!(results.len(), 1, "ASK query should return one boolean result");
-        assert_eq!(
-            results[0], "false",
-            "ASK query should return false (Alice doesn't know Charlie directly)"
-        );
-    }
-
-    #[test]
-    fn test_execute_construct_query() {
-        let adapter = OxigraphAdapter::new();
-        let container = create_test_container();
-
-        // CONSTRUCT query to create new triples
-        let query = r"
-            PREFIX ex: <http://example.org/>
-            CONSTRUCT {
-                ?s ex:knows ?o
-            }
-            WHERE {
-                ?s ex:knows ?o
-            }
-        ";
-
-        let results = adapter.execute_query(query, &container);
-        assert!(results.is_ok(), "CONSTRUCT query should succeed");
-
-        let results = results.unwrap();
-        assert_eq!(results.len(), 2, "CONSTRUCT should return 2 triples");
-    }
-
-    #[test]
-    fn test_execute_with_empty_container() {
-        let adapter = OxigraphAdapter::new();
-        let empty_container = QuadContainer::new(HashSet::new(), 1000);
-
-        let query = "SELECT ?s WHERE { ?s ?p ?o }";
-
-        let results = adapter.execute_query(query, &empty_container);
-        assert!(results.is_ok(), "Query on empty container should succeed");
-
-        let results = results.unwrap();
-        assert!(results.is_empty(), "Results should be empty for empty container");
-    }
-
-    #[test]
-    fn test_execute_invalid_query() {
-        let adapter = OxigraphAdapter::new();
-        let container = create_test_container();
-
-        // Invalid SPARQL query
-        let query = "INVALID SPARQL QUERY";
-
-        let results = adapter.execute_query(query, &container);
-        assert!(results.is_err(), "Invalid query should return an error");
-
-        let error = results.unwrap_err();
-        assert!(error.to_string().contains("Oxigraph error"), "Error should be an OxigraphError");
-    }
-
-    #[test]
-    fn test_execute_query_with_literal_filter() {
-        let adapter = OxigraphAdapter::new();
-        let container = create_test_container();
-
-        // Query to find people older than 25
-        let query = r#"
-            PREFIX ex: <http://example.org/>
-            SELECT ?s ?age WHERE {
-                ?s ex:age ?age .
-                FILTER(?age > "25")
-            }
-        "#;
-
-        let results = adapter.execute_query(query, &container);
-        assert!(results.is_ok(), "Query with literal filter should succeed");
-
-        let results = results.unwrap();
-        assert_eq!(results.len(), 1, "Should return 1 result (Alice is 30)");
-    }
-
-    #[test]
-    fn test_execute_count_query() {
-        let adapter = OxigraphAdapter::new();
-        let container = create_test_container();
-
-        // Query to count the number of 'knows' relationships
-        let query = r"
-            PREFIX ex: <http://example.org/>
-            SELECT (COUNT(?s) AS ?count) WHERE {
-                ?s ex:knows ?o
-            }
-        ";
-
-        let results = adapter.execute_query(query, &container);
-        assert!(results.is_ok(), "COUNT query should succeed");
-
-        let results = results.unwrap();
-        assert_eq!(results.len(), 1, "COUNT query should return 1 result");
-    }
-
-    #[test]
-    fn test_multiple_queries_on_same_adapter() {
-        let adapter = OxigraphAdapter::new();
-        let container = create_test_container();
-
-        // First query
-        let query1 = "SELECT ?s WHERE { ?s ?p ?o }";
-        let results1 = adapter.execute_query(query1, &container);
-        assert!(results1.is_ok(), "First query should succeed");
-
-        // Second query
-        let query2 = r"
-            PREFIX ex: <http://example.org/>
-            SELECT ?s WHERE { ?s ex:knows ?o }
-        ";
-        let results2 = adapter.execute_query(query2, &container);
-        assert!(results2.is_ok(), "Second query should succeed");
-
-        // Verify both queries returned results
-        assert!(!results1.unwrap().is_empty());
-        assert!(!results2.unwrap().is_empty());
-    }
-
-    #[test]
-    fn test_oxigraph_error_display() {
-        let error = OxigraphError("Test error message".to_string());
-        let error_string = format!("{}", error);
-        assert_eq!(error_string, "Oxigraph error: Test error message");
-    }
-
-    #[test]
-    fn test_oxigraph_error_from_storage_error() {
-        // This tests the From implementation for StorageError
-        // We can't easily create a real StorageError, but we verify the trait is implemented
-        let error = OxigraphError::from(oxigraph::store::StorageError::Other("test".into()));
-        assert!(error.to_string().contains("Oxigraph error"));
-    }
-}
diff --git a/src/registry/mod.rs b/src/registry/mod.rs
new file mode 100644
index 0000000..5775380
--- /dev/null
+++ b/src/registry/mod.rs
@@ -0,0 +1 @@
+pub mod query_registry;
diff --git a/src/registry/query_registry.rs b/src/registry/query_registry.rs
new file mode 100644
index 0000000..7a07eee
--- /dev/null
+++ b/src/registry/query_registry.rs
@@ -0,0 +1,74 @@
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
+
+use crate::parsing::janusql_parser::ParsedJanusQuery;
+
+pub type QueryId = String;
+
+/// Metadata associated with a registered query
+#[derive(Debug, Clone)]
+pub struct QueryMetadata {
+    pub query_id: QueryId,
+    pub query_text: String,
+    pub parsed: ParsedJanusQuery,
+    pub registered_at: u64,
+    pub execution_count: u64,
+    pub subscribers: Vec<String>,
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct QueryRegistryConfig {
+    /// Maximum number of queries that can be registered
+    pub max_queries: Option<usize>,
+}
+
+/// Errors specific to query registry operations
+#[derive(Debug)]
+pub enum QueryRegistryError {
+    QueryNotFound(QueryId),
+    QueryAlreadyExists(QueryId),
+    MaxQueriesReached,
+    InvalidQuery(String),
+}
+
+impl std::fmt::Display for QueryRegistryError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            QueryRegistryError::QueryNotFound(id) => write!(f, "Query not found: {}", id),
+            QueryRegistryError::QueryAlreadyExists(id) => {
+                write!(f, "Query already exists: {}", id)
+            }
+            QueryRegistryError::MaxQueriesReached => {
+                write!(f, "Maximum number of registered queries reached")
+            }
+            QueryRegistryError::InvalidQuery(msg) => write!(f, "Invalid query: {}", msg),
+        }
+    }
+}
+
+impl std::error::Error for QueryRegistryError {}
+
+/// Core Query Registry structure, the foundation for further query optimization and analysis.
+#[allow(dead_code)]
+pub struct QueryRegistry {
+    queries: Arc<RwLock<HashMap<QueryId, QueryMetadata>>>,
+    config: QueryRegistryConfig,
+}
+
+impl QueryRegistry {
+    /// Create a new Query Registry with the default configuration
+    pub fn new() -> Self {
+        QueryRegistry {
+            queries: Arc::new(RwLock::new(HashMap::new())),
+            config: QueryRegistryConfig::default(),
+        }
+    }
+
+    /// Create a Query Registry with a custom configuration
+    pub fn with_config(config: QueryRegistryConfig) -> Self {
+        QueryRegistry { queries: Arc::new(RwLock::new(HashMap::new())), config }
+    }
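+
+    // A registration method would tie the metadata, config, and error types above
+    // together. A minimal sketch (hypothetical, not part of this change; how
+    // `registered_at` is sourced is elided):
+    //
+    //   pub fn register(
+    //       &self,
+    //       query_id: QueryId,
+    //       query_text: String,
+    //       parsed: ParsedJanusQuery,
+    //   ) -> Result<(), QueryRegistryError> {
+    //       let mut queries = self.queries.write().unwrap();
+    //       if queries.contains_key(&query_id) {
+    //           return Err(QueryRegistryError::QueryAlreadyExists(query_id));
+    //       }
+    //       if let Some(max) = self.config.max_queries {
+    //           if queries.len() >= max {
+    //               return Err(QueryRegistryError::MaxQueriesReached);
+    //           }
+    //       }
+    //       queries.insert(query_id.clone(), QueryMetadata {
+    //           query_id,
+    //           query_text,
+    //           parsed,
+    //           registered_at: 0, // e.g. current UNIX millis
+    //           execution_count: 0,
+    //           subscribers: Vec::new(),
+    //       });
+    //       Ok(())
+    //   }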
+}
diff --git a/src/sources/kafka_adapter.rs b/src/sources/kafka_adapter.rs
new file mode 100644
index 0000000..e6c77d5
--- /dev/null
+++ b/src/sources/kafka_adapter.rs
@@ -0,0 +1,106 @@
+#[cfg(not(windows))]
+use crate::core::RDFEvent;
+use crate::sources::stream_source::{StreamError, StreamSource};
+use rdkafka::config::ClientConfig;
+use rdkafka::consumer::{BaseConsumer, Consumer};
+use rdkafka::message::Message;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::Duration;
+
+/// Type alias for the complex callback type to reduce type complexity
+type CallbackType = Arc<dyn Fn(RDFEvent) + Send + Sync>;
+
+pub struct KafkaSource {
+    consumer: Arc<BaseConsumer>,
+    callback: Arc<Mutex<Option<CallbackType>>>,
+}
+
+impl KafkaSource {
+    /// Creates a new Kafka source from a list of brokers and a consumer group ID.
+    /// # Arguments
+    /// * `brokers` - A comma-separated list of Kafka brokers.
+    /// * `group_id` - The consumer group ID.
+    /// * `auto_offset_reset` - Policy for resetting offsets ("earliest" or "latest").
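+    ///
+    /// # Example
+    /// A hypothetical wiring; the broker address, group ID, and offset policy are
+    /// illustrative only:
+    /// ```no_run
+    /// use janus::sources::kafka_adapter::KafkaSource;
+    ///
+    /// let source = KafkaSource::new("localhost:9092", "janus-consumers", "earliest")
+    ///     .expect("failed to create the Kafka consumer");
+    /// ```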
+    pub fn new(
+        brokers: &str,
+        group_id: &str,
+        auto_offset_reset: &str,
+    ) -> Result<Self, StreamError> {
+        let raw_consumer: BaseConsumer = ClientConfig::new()
+            .set("group.id", group_id)
+            .set("bootstrap.servers", brokers)
+            .set("enable.partition.eof", "false")
+            .set("session.timeout.ms", "6000")
+            .set("enable.auto.commit", "true")
+            .set("auto.offset.reset", auto_offset_reset)
+            .create()
+            .map_err(|e| StreamError::ConnectionError(e.to_string()))?;
+        let consumer: Arc<BaseConsumer> = Arc::new(raw_consumer);
+
+        let callback = Arc::new(Mutex::new(None::<CallbackType>));
+
+        let consumer_clone = Arc::clone(&consumer);
+        let callback_clone = Arc::clone(&callback);
+
+        // Spawn a thread to handle Kafka events
+        thread::spawn(move || {
+            loop {
+                match consumer_clone.poll(Duration::from_millis(100)) {
+                    Some(Ok(message)) => {
+                        // TODO: Parse message payload into RDFEvent and call callback
+                        if let Some(payload) = message.payload() {
+                            // For now, create a dummy RDFEvent
+                            let timestamp = message.timestamp().to_millis().unwrap_or(0);
+                            let timestamp_u64 = u64::try_from(timestamp).unwrap_or(0);
+                            let rdf_event = RDFEvent::new(
+                                timestamp_u64,
+                                "http://example.org/subject",      // subject
+                                "http://example.org/predicate",    // predicate
+                                &String::from_utf8_lossy(payload), // object as string
+                                "http://example.org/graph",        // graph
+                            );
+                            if let Ok(callback_opt) = callback_clone.lock() {
+                                if let Some(ref callback) = *callback_opt {
+                                    callback(rdf_event);
+                                }
+                            }
+                        }
+                    }
+                    Some(Err(e)) => {
+                        eprintln!("Kafka error: {}", e);
+                        break;
+                    }
+                    None => {
+                        // No message, continue polling
+                    }
+                }
+            }
+        });
+
+        Ok(KafkaSource { consumer, callback })
+    }
+}
+
+impl StreamSource for KafkaSource {
+    fn subscribe(
+        &self,
+        topics: Vec<String>,
+        callback: Arc<dyn Fn(RDFEvent) + Send + Sync>,
+    ) -> Result<(), StreamError> {
+        let topic_refs: Vec<&str> = topics.iter().map(|s| s.as_str()).collect();
+        self.consumer
+            .subscribe(&topic_refs)
+            .map_err(|e| StreamError::SubscriptionError(e.to_string()))?;
+        if let Ok(mut callback_opt) = self.callback.lock() {
+            *callback_opt = Some(callback);
+        }
+        Ok(())
+    }
+
+    fn stop(&self) -> Result<(), StreamError> {
+        self.consumer.unsubscribe();
+        Ok(())
+    }
+}
diff --git a/src/sources/mod.rs b/src/sources/mod.rs
new file mode 100644
index 0000000..37fffd6
--- /dev/null
+++ b/src/sources/mod.rs
@@ -0,0 +1,5 @@
+#[cfg(not(windows))]
+pub mod kafka_adapter;
+pub mod mqtt_adapter;
+pub mod stream_ingestion_pipeline;
+pub mod stream_source;
diff --git a/src/sources/mqtt_adapter.rs b/src/sources/mqtt_adapter.rs
new file mode 100644
index 0000000..ecd7a62
--- /dev/null
+++ b/src/sources/mqtt_adapter.rs
@@ -0,0 +1,54 @@
+use crate::core::RDFEvent;
+use crate::sources::stream_source::{StreamError, StreamSource};
+use rumqttc::{Client, Event, MqttOptions, Packet, QoS};
+use std::sync::Arc;
+use std::thread;
+
+pub struct MqttSource {
+    client: Client,
+}
+
+impl MqttSource {
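+    /// Creates a new MQTT source connected to `broker:port`.
+    ///
+    /// A hypothetical wiring; the broker host, port, and client ID are illustrative
+    /// only:
+    /// ```no_run
+    /// use janus::sources::mqtt_adapter::MqttSource;
+    ///
+    /// let source = MqttSource::new("localhost", 1883, "janus-client")
+    ///     .expect("failed to set up the MQTT client");
+    /// ```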
+    pub fn new(broker: &str, port: u16, client_id: &str) -> Result<Self, StreamError> {
+        let mut mqtt_options = MqttOptions::new(client_id, broker, port);
+        mqtt_options.set_keep_alive(std::time::Duration::from_secs(30));
+
+        let (client, mut connection) = Client::new(mqtt_options, 10);
+
+        // Spawn a thread to drive the MQTT event loop and surface connection errors
+        thread::spawn(move || {
+            for notification in connection.iter() {
+                if let Err(e) = notification {
+                    eprintln!("MQTT connection error: {:?}", e);
+                    break;
+                }
+            }
+        });
+
+        Ok(MqttSource { client })
+    }
+}
+
+impl StreamSource for MqttSource {
+    fn subscribe(
+        &self,
+        topics: Vec<String>,
+        callback: Arc<dyn Fn(RDFEvent) + Send + Sync>,
+    ) -> Result<(), StreamError> {
+        for topic in topics {
+            self.client
+                .subscribe(&topic, QoS::AtLeastOnce)
+                .map_err(|e| StreamError::SubscriptionError(e.to_string()))?;
+        }
+
+        // TODO: Handle incoming messages here and invoke the callback for each one.
+        Ok(())
+    }
+
+    fn stop(&self) -> Result<(), StreamError> {
+        self.client
+            .disconnect()
+            .map_err(|e| StreamError::ConnectionError(e.to_string()))
+    }
+}
diff --git a/src/sources/stream_ingestion_pipeline.rs b/src/sources/stream_ingestion_pipeline.rs
new file mode 100644
index 0000000..0698bfe
--- /dev/null
+++ b/src/sources/stream_ingestion_pipeline.rs
@@ -0,0 +1,41 @@
+use crate::core::RDFEvent;
+use crate::sources::stream_source::StreamSource;
+use crate::storage::segmented_storage::StreamingSegmentedStorage;
+use std::sync::Arc;
+
+pub struct StreamIngestionPipeline {
+    storage: Arc<StreamingSegmentedStorage>,
+    sources: Vec<Box<dyn StreamSource>>,
+}
+
+impl StreamIngestionPipeline {
+    pub fn new(storage: Arc<StreamingSegmentedStorage>) -> Self {
+        StreamIngestionPipeline { storage, sources: Vec::new() }
+    }
+
+    /// Add a source (MQTT, Kafka, etc.) to the stream ingestion pipeline
+    pub fn add_source(&mut self, source: Box<dyn StreamSource>) {
+        self.sources.push(source);
+    }
+
+    /// Start the stream ingestion pipeline by subscribing to the sources and ingesting data
+    /// into storage as well as into the live RSP stream-processing engine.
+    pub fn start(&self, topics: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
+        let storage = Arc::clone(&self.storage);
+
+        // Shared callback writes to the storage (handles both storage and live processing)
+        let callback: Arc<dyn Fn(RDFEvent) + Send + Sync> = Arc::new(move |event: RDFEvent| {
+            // Storage will handle the background flushing.
+            // TODO: Add live stream processing here as a process.
+            if let Err(e) = storage.write_rdf_event(event) {
+                eprintln!("Error writing to storage: {:?}", e);
+            }
+        });
+
+        for source in &self.sources {
+            source.subscribe(topics.clone(), Arc::clone(&callback))?;
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/sources/stream_source.rs b/src/sources/stream_source.rs
new file mode 100644
index 0000000..3e1313a
--- /dev/null
+++ b/src/sources/stream_source.rs
@@ -0,0 +1,35 @@
+use crate::core::RDFEvent;
+use std::error::Error;
+use std::fmt;
+use std::sync::Arc;
+
+pub trait StreamSource: Send + Sync {
+    // Subscribe to the stream and invoke a callback for each RDF event.
+    fn subscribe(
+        &self,
+        topics: Vec<String>,
+        callback: Arc<dyn Fn(RDFEvent) + Send + Sync>,
+    ) -> Result<(), StreamError>;
+
+    // Unsubscribe from the stream or stop the subscription.
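+    //
+    // A minimal caller-side sketch (hypothetical; the topic name and handler body
+    // are illustrative only):
+    //
+    //   let callback: Arc<dyn Fn(RDFEvent) + Send + Sync> =
+    //       Arc::new(|event| println!("event at {}", event.timestamp));
+    //   source.subscribe(vec!["sensors/temperature".to_string()], callback)?;
+    //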
+    fn stop(&self) -> Result<(), StreamError>;
+}
+
+#[derive(Debug)]
+pub enum StreamError {
+    ConnectionError(String),
+    SubscriptionError(String),
+    Other(String),
+}
+
+impl fmt::Display for StreamError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            StreamError::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
+            StreamError::SubscriptionError(msg) => write!(f, "Subscription error: {}", msg),
+            StreamError::Other(msg) => write!(f, "Other error: {}", msg),
+        }
+    }
+}
+
+impl Error for StreamError {}
diff --git a/src/storage/indexing/dictionary.rs b/src/storage/indexing/dictionary.rs
index 1308f8d..99d8db3 100644
--- a/src/storage/indexing/dictionary.rs
+++ b/src/storage/indexing/dictionary.rs
@@ -65,107 +65,3 @@ impl Dictionary {
         )
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::core::Event;
-
-    #[test]
-    fn test_dictionary_encoding_decoding() {
-        let mut dict = Dictionary::new();
-
-        // Encode some RDF terms
-        let subject_id = dict.encode("http://example.org/person/Alice");
-        let predicate_id = dict.encode("http://example.org/knows");
-        let object_id = dict.encode("http://example.org/person/Bob");
-        let graph_id = dict.encode("http://example.org/graph1");
-
-        println!("Encoded IDs:");
-        println!("Subject: http://example.org/person/Alice -> {}", subject_id);
-        println!("Predicate: http://example.org/knows -> {}", predicate_id);
-        println!("Object: http://example.org/person/Bob -> {}", object_id);
-        println!("Graph: http://example.org/graph1 -> {}", graph_id);
-
-        // Create an event
-        let event = Event {
-            timestamp: 1_234_567_890,
-            subject: subject_id,
-            predicate: predicate_id,
-            object: object_id,
-            graph: graph_id,
-        };
-
-        // Decode the event
-        let decoded = dict.decode_graph(&event);
-        println!("\nDecoded event: {}", decoded);
-
-        // Verify individual decodings
-        assert_eq!(dict.decode(subject_id), Some("http://example.org/person/Alice"));
-        assert_eq!(dict.decode(predicate_id), Some("http://example.org/knows"));
-        assert_eq!(dict.decode(object_id), Some("http://example.org/person/Bob"));
-        assert_eq!(dict.decode(graph_id), Some("http://example.org/graph1"));
-
-        // Test that the decoded string contains the expected format
-        assert!(decoded.contains("http://example.org/person/Alice"));
-        assert!(decoded.contains("http://example.org/knows"));
-        assert!(decoded.contains("http://example.org/person/Bob"));
-        assert!(decoded.contains("http://example.org/graph1"));
-        assert!(decoded.contains("1234567890"));
-    }
-
-    #[test]
-    fn test_clean_rdf_api() {
-        use crate::core::RDFEvent;
-
-        let mut dict = Dictionary::new();
-
-        // Test the clean API - user provides URIs directly
-        let rdf_event = RDFEvent::new(
-            1_234_567_890,
-            "http://example.org/person/Alice",
-            "http://example.org/knows",
-            "http://example.org/person/Bob",
-            "http://example.org/graph1",
-        );
-
-        // Encoding happens internally
-        let encoded_event = rdf_event.encode(&mut dict);
-
-        // Decoding happens internally
-        let decoded_event = encoded_event.decode(&dict);
-
-        // Verify the round-trip works
-        assert_eq!(decoded_event.subject, "http://example.org/person/Alice");
-        assert_eq!(decoded_event.predicate, "http://example.org/knows");
-        assert_eq!(decoded_event.object, "http://example.org/person/Bob");
-        assert_eq!(decoded_event.graph, "http://example.org/graph1");
-        assert_eq!(decoded_event.timestamp, 1_234_567_890);
-
-        println!("Clean API test passed!");
-        println!(
-            "Original: {} {} {} in {} at timestamp {}",
-            rdf_event.subject,
-            rdf_event.predicate,
-            rdf_event.object,
-            rdf_event.graph,
-            rdf_event.timestamp
-        );
-        println!(
-            "Encoded IDs: {} {} {} {} at timestamp {}",
-            encoded_event.subject,
-            encoded_event.predicate,
-            encoded_event.object,
-            encoded_event.graph,
-            encoded_event.timestamp
-        );
-        println!(
-            "Decoded: {} {} {} in {} at timestamp {}",
-            decoded_event.subject,
-            decoded_event.predicate,
-            decoded_event.object,
-            decoded_event.graph,
-            decoded_event.timestamp
-        );
-    }
-}
diff --git a/src/storage/segmented_storage.rs b/src/storage/segmented_storage.rs
index a405aff..5de6e3c 100644
--- a/src/storage/segmented_storage.rs
+++ b/src/storage/segmented_storage.rs
@@ -1,7 +1,6 @@
 use std::{
     collections::VecDeque,
     io::{BufWriter, Read, Seek, SeekFrom, Write},
-    rc::Rc,
     sync::{Arc, Mutex, RwLock},
     thread::JoinHandle,
     time::{Duration, SystemTime, UNIX_EPOCH},
@@ -22,7 +21,7 @@ use crate::{
 pub struct StreamingSegmentedStorage {
     batch_buffer: Arc>,
     segments: Arc>>,
-    dictionary: Rc<RwLock<Dictionary>>,
+    dictionary: Arc<RwLock<Dictionary>>,
     flush_handle: Option<JoinHandle<()>>,
     shutdown_signal: Arc<Mutex<bool>>,
     config: StreamingConfig,
@@ -43,7 +42,7 @@ impl StreamingSegmentedStorage {
             })),
 
             segments: Arc::new(RwLock::new(Vec::new())),
-            dictionary: Rc::new(RwLock::new(Dictionary::new())),
+            dictionary: Arc::new(RwLock::new(Dictionary::new())),
             flush_handle: None,
            shutdown_signal: Arc::new(Mutex::new(false)),
             config,
@@ -110,6 +109,15 @@ impl StreamingSegmentedStorage {
         self.write(encoded_event)
     }
 
+    /// User-friendly API: Write an RDFEvent directly
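+    ///
+    /// A hypothetical call site (the URIs are illustrative only):
+    /// ```no_run
+    /// # use janus::core::RDFEvent;
+    /// # use janus::storage::segmented_storage::StreamingSegmentedStorage;
+    /// # fn demo(storage: &StreamingSegmentedStorage) -> std::io::Result<()> {
+    /// storage.write_rdf_event(RDFEvent::new(
+    ///     1_234_567_890,
+    ///     "http://example.org/person/Alice",
+    ///     "http://example.org/knows",
+    ///     "http://example.org/person/Bob",
+    ///     "http://example.org/graph1",
+    /// ))?;
+    /// # Ok(())
+    /// # }
+    /// ```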
+    pub fn write_rdf_event(&self, event: RDFEvent) -> std::io::Result<()> {
+        let encoded_event = {
+            let mut dict = self.dictionary.write().unwrap();
+            event.encode(&mut dict)
+        };
+        self.write(encoded_event)
+    }
+
     // Get the current timestamp in milliseconds since UNIX_EPOCH
     fn current_timestamp() -> u64 {
         SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
diff --git a/src/stream/comparator.rs b/src/stream/comparator.rs
new file mode 100644
index 0000000..84ea9b4
--- /dev/null
+++ b/src/stream/comparator.rs
@@ -0,0 +1,237 @@
+use std::collections::VecDeque;
+use std::fmt;
+
+/// Represents a single data point in the stream.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct DataPoint {
+    pub timestamp: f64,
+    pub value: f64,
+}
+
+/// Statistical metrics calculated from a window of data points.
+#[derive(Debug, Clone, PartialEq)]
+pub struct WindowStats {
+    pub mean: f64,
+    pub std_dev: f64,
+    pub slope: f64,
+    pub count: usize,
+}
+
+impl WindowStats {
+    /// Calculates statistics from a window of DataPoints.
+    /// Returns None if the window is empty.
+    pub fn from_window(window: &VecDeque<DataPoint>) -> Option<WindowStats> {
+        if window.is_empty() {
+            return None;
+        }
+
+        #[allow(clippy::cast_precision_loss)]
+        // Safe: window.len() is small (default 10) and f64 can represent integers up to 2^53 exactly
+        let n = window.len() as f64;
+        let count = window.len();
+
+        // 1. Mean
+        let sum_val: f64 = window.iter().map(|dp| dp.value).sum();
+        let mean = sum_val / n;
+
+        // 2. Standard deviation (population)
+        // \sigma = \sqrt{ \frac{\sum (x - \mu)^2}{N} }
+        let variance_sum: f64 = window.iter().map(|dp| (dp.value - mean).powi(2)).sum();
+        let std_dev = (variance_sum / n).sqrt();
+
+        // 3. Slope (linear regression, least squares)
+        // m = \frac{N \sum(xy) - \sum x \sum y}{N \sum(x^2) - (\sum x)^2}
+        let sum_x: f64 = window.iter().map(|dp| dp.timestamp).sum();
+        let sum_xy: f64 = window.iter().map(|dp| dp.timestamp * dp.value).sum();
+        let sum_x_squared: f64 = window.iter().map(|dp| dp.timestamp.powi(2)).sum();
+
+        let denominator = n * sum_x_squared - sum_x.powi(2);
+
+        let slope = if denominator.abs() < f64::EPSILON {
+            0.0 // Avoid division by zero if all timestamps are identical or N = 1
+        } else {
+            (n * sum_xy - sum_x * sum_val) / denominator
+        };
+
+        Some(WindowStats { mean, std_dev, slope, count })
+    }
+}
+
+/// Configuration thresholds for the comparator.
+#[derive(Debug, Clone)]
+pub struct ComparatorConfig {
+    /// Threshold for absolute difference: |live - hist| > threshold
+    pub abs_threshold: f64,
+    /// Threshold for relative change: (live - hist) / hist > threshold
+    pub rel_threshold: f64,
+    /// Threshold for catch-up: hist - live > threshold
+    pub catchup_trigger: f64,
+    /// Minimum magnitude of slope to consider for trend divergence
+    pub slope_epsilon: f64,
+    /// Buffer for volatility check: live_sigma > hist_sigma + buffer
+    pub volatility_buffer: f64,
+    /// Size of the sliding window to maintain for statistics
+    pub window_size: usize,
+    /// Z-score threshold for outlier detection: |z| > threshold
+    pub outlier_z_threshold: f64,
+}
+
+impl Default for ComparatorConfig {
+    fn default() -> Self {
+        Self {
+            abs_threshold: 1.0,
+            rel_threshold: 0.1,
+            catchup_trigger: 2.0,
+            slope_epsilon: 0.01,
+            volatility_buffer: 0.5,
+            window_size: 10,
+            outlier_z_threshold: 3.0,
+        }
+    }
+}
+
+/// Result of a comparison between live and historical windows.
+#[derive(Debug, Clone, PartialEq)]
+pub enum ComparisonResult {
+    /// Triggered when |live.mean - hist.mean| > abs_threshold
+    AbsoluteThresholdExceeded { diff: f64 },
+    /// Triggered when (live.mean - hist.mean) / hist.mean > rel_threshold
+    RelativeDropDetected { rel_change: f64 },
+    /// Triggered when hist.mean - live.mean > catchup_trigger
+    CatchUpTriggered { lag: f64 },
+    /// Triggered when slopes have opposite signs and magnitudes > slope_epsilon
+    TrendDivergence { live_slope: f64, hist_slope: f64 },
+    /// Triggered when live.std_dev > hist.std_dev + volatility_buffer
+    VolatilityIncrease { live_sigma: f64, hist_sigma: f64 },
+    /// Triggered when the latest live value is an outlier compared to the historical distribution
+    LiveOutlierDetected { value: f64, z_score: f64 },
+}
+
+impl fmt::Display for ComparisonResult {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            ComparisonResult::AbsoluteThresholdExceeded { diff } => {
+                write!(f, "Absolute Threshold Exceeded (diff: {:.4})", diff)
+            }
+            ComparisonResult::RelativeDropDetected { rel_change } => {
+                write!(f, "Relative Drop Detected (change: {:.2}%)", rel_change * 100.0)
+            }
+            ComparisonResult::CatchUpTriggered { lag } => {
+                write!(f, "Catch-Up Triggered (lag: {:.4})", lag)
+            }
+            ComparisonResult::TrendDivergence { live_slope, hist_slope } => {
+                write!(f, "Trend Divergence (live: {:.4}, hist: {:.4})", live_slope, hist_slope)
+            }
+            ComparisonResult::VolatilityIncrease { live_sigma, hist_sigma } => {
+                write!(f, "Volatility Increase (live: {:.4}, hist: {:.4})", live_sigma, hist_sigma)
+            }
+            ComparisonResult::LiveOutlierDetected { value, z_score } => {
+                write!(f, "Live Outlier Detected (value: {:.4}, z-score: {:.2})", value, z_score)
+            }
+        }
+    }
+}
+
+/// A stateful comparator that maintains a history of aggregated values to compute trends.
+pub struct StatefulComparator {
+    config: ComparatorConfig,
+    live_history: VecDeque<DataPoint>,
+    hist_history: VecDeque<DataPoint>,
+}
+
+impl StatefulComparator {
+    /// Creates a new StatefulComparator with the given configuration.
+    pub fn new(config: ComparatorConfig) -> Self {
+        Self { config, live_history: VecDeque::new(), hist_history: VecDeque::new() }
+    }
+
+    /// Updates the comparator with new aggregated values for the live and historical streams,
+    /// and returns any anomalies triggered by the updated statistics.
+    pub fn update_and_compare(
+        &mut self,
+        timestamp: f64,
+        live_val: f64,
+        hist_val: f64,
+    ) -> Vec<ComparisonResult> {
+        // 1. Update history
+        Self::add_point(&mut self.live_history, timestamp, live_val, self.config.window_size);
+        Self::add_point(&mut self.hist_history, timestamp, hist_val, self.config.window_size);
+
+        // 2. Calculate statistics from history
+        let live_stats = WindowStats::from_window(&self.live_history);
+        let hist_stats = WindowStats::from_window(&self.hist_history);
+
+        if let (Some(live), Some(hist)) = (live_stats, hist_stats) {
+            self.compare_stats(&live, &hist)
+        } else {
+            Vec::new() // Not enough data yet
+        }
+    }
+
+    // Static helper to avoid double-borrow issues
+    fn add_point(history: &mut VecDeque<DataPoint>, timestamp: f64, value: f64, max_size: usize) {
+        if history.len() >= max_size {
+            history.pop_front();
+        }
+        history.push_back(DataPoint { timestamp, value });
+    }
+
+    fn compare_stats(&self, live: &WindowStats, hist: &WindowStats) -> Vec<ComparisonResult> {
+        let mut results = Vec::new();
+
+        // 1. Absolute threshold
+        let abs_diff = (live.mean - hist.mean).abs();
+        if abs_diff > self.config.abs_threshold {
+            results.push(ComparisonResult::AbsoluteThresholdExceeded { diff: abs_diff });
+        }
+
+        // 2. Relative change
+        if hist.mean.abs() > f64::EPSILON {
+            let rel_change = (live.mean - hist.mean) / hist.mean;
+            if rel_change > self.config.rel_threshold {
+                results.push(ComparisonResult::RelativeDropDetected { rel_change });
+            }
+        }
+
+        // 3. Catch-up trigger
+        let lag = hist.mean - live.mean;
+        if lag > self.config.catchup_trigger {
+            results.push(ComparisonResult::CatchUpTriggered { lag });
+        }
+
+        // 4. Trend divergence
+        if (live.slope * hist.slope < 0.0)
+            && (live.slope.abs() > self.config.slope_epsilon)
+            && (hist.slope.abs() > self.config.slope_epsilon)
+        {
+            results.push(ComparisonResult::TrendDivergence {
+                live_slope: live.slope,
+                hist_slope: hist.slope,
+            });
+        }
+
+        // 5. Volatility
+        if live.std_dev > hist.std_dev + self.config.volatility_buffer {
+            results.push(ComparisonResult::VolatilityIncrease {
+                live_sigma: live.std_dev,
+                hist_sigma: hist.std_dev,
+            });
+        }
+
+        // 6. Outlier detection
+        // Check if the latest live value is an outlier relative to the historical distribution
+        if let Some(latest_live) = self.live_history.back() {
+            if hist.std_dev > f64::EPSILON {
+                let z_score = (latest_live.value - hist.mean) / hist.std_dev;
+                if z_score.abs() > self.config.outlier_z_threshold {
+                    results.push(ComparisonResult::LiveOutlierDetected {
+                        value: latest_live.value,
+                        z_score,
+                    });
+                }
+            }
+        }
+
+        results
+    }
+}
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index f906880..7facb16 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -4,4 +4,5 @@ pub mod operators {
     pub mod hs2r;
 }
 
+pub mod comparator;
 pub mod live_stream_processing;
diff --git a/tests/comparator_test.rs b/tests/comparator_test.rs
new file mode 100644
index 0000000..439d922
--- /dev/null
+++ b/tests/comparator_test.rs
@@ -0,0 +1,106 @@
+use janus::stream::comparator::{ComparatorConfig, DataPoint, StatefulComparator, WindowStats};
+use std::collections::VecDeque;
+
+#[test]
+fn test_window_stats_calculation() {
+    // y = 2x + 1
+    let mut data = VecDeque::new();
+    data.push_back(DataPoint { timestamp: 0.0, value: 1.0 });
+    data.push_back(DataPoint { timestamp: 1.0, value: 3.0 });
+    data.push_back(DataPoint { timestamp: 2.0, value: 5.0 });
+
+    let stats = WindowStats::from_window(&data).unwrap();
+
+    assert_eq!(stats.mean, 3.0);
+    assert_eq!(stats.count, 3);
+    assert!((stats.slope - 2.0).abs() < 1e-9);
+    // Population std dev of 1, 3, 5:
+    // Mean = 3. Variance = ((1-3)^2 + (3-3)^2 + (5-3)^2) / 3 = (4 + 0 + 4) / 3 = 8/3 = 2.666...
+    // Std dev = sqrt(2.666...) = 1.63299...
+    assert!((stats.std_dev - (8.0f64 / 3.0).sqrt()).abs() < 1e-9);
+}
+
+#[test]
+fn test_stateful_comparator_triggers() {
+    let config = ComparatorConfig {
+        abs_threshold: 10.0,
+        rel_threshold: 0.5,
+        catchup_trigger: 5.0,
+        slope_epsilon: 0.1,
+        volatility_buffer: 1.0,
+        window_size: 5,
+        outlier_z_threshold: 2.0, // Lower threshold for testing
+    };
+
+    let mut comparator = StatefulComparator::new(config);
+
+    // Feed data to simulate divergence
+    // Hist: stable, increasing (100, 101, 102)
+    // Live: dropping (90, 89, 88)
+
+    // T=0
+    let res0 = comparator.update_and_compare(0.0, 90.0, 100.0);
+    // Only one point so far: WindowStats still works (slope = 0, std_dev = 0).
+    // Abs diff = 10. Threshold = 10. 10 > 10 is false.
+    // Catch-up = 10. Trigger = 5. 10 > 5 is true.
+    assert!(res0.iter().any(|r| matches!(
+        r,
+        janus::stream::comparator::ComparisonResult::CatchUpTriggered { .. }
+    )));
+
+    // T=1
+    let res1 = comparator.update_and_compare(1.0, 89.0, 101.0);
+    // Hist slope: (101-100)/1 = 1.0
+    // Live slope: (89-90)/1 = -1.0
+    // Divergence should trigger
+    assert!(res1
+        .iter()
+        .any(|r| matches!(r, janus::stream::comparator::ComparisonResult::TrendDivergence { .. })));
+}
+
+#[test]
+fn test_outlier_detection() {
+    let config = ComparatorConfig {
+        abs_threshold: 1000.0,     // High threshold to avoid triggering
+        rel_threshold: 10.0,       // High threshold to avoid triggering
+        catchup_trigger: 1000.0,   // High threshold to avoid triggering
+        slope_epsilon: 10.0,       // High threshold to avoid triggering
+        volatility_buffer: 1000.0, // High threshold to avoid triggering
+        window_size: 10,
+        outlier_z_threshold: 2.0,
+    };
+
+    let mut comparator = StatefulComparator::new(config);
+
+    // Build a stable historical baseline: values around 100.0
+    for i in 0..10 {
+        let hist_val = 100.0 + (i as f64 * 0.1); // 100.0, 100.1, 100.2, ...
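+        // With window_size = 10 this fills both windows. The historical window then
+        // holds 100.0..100.9, i.e. a mean near 100.45 and a population std-dev of
+        // roughly 0.29 (0.1 * sqrt((10^2 - 1) / 12) for an arithmetic sequence), so
+        // the z-scores asserted below are easy to sanity-check by hand.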
+        comparator.update_and_compare(i as f64, hist_val, hist_val);
+    }
+
+    // Now feed a normal live value (should not trigger the outlier check)
+    let normal_res = comparator.update_and_compare(10.0, 100.5, 100.5);
+    assert!(!normal_res.iter().any(|r| matches!(
+        r,
+        janus::stream::comparator::ComparisonResult::LiveOutlierDetected { .. }
+    )));
+
+    // Feed an outlier live value (far above the historical mean)
+    let outlier_res = comparator.update_and_compare(11.0, 150.0, 100.6); // 150.0 is many σ above the mean
+    assert!(outlier_res.iter().any(|r| matches!(
+        r,
+        janus::stream::comparator::ComparisonResult::LiveOutlierDetected { .. }
+    )));
+
+    // Check that the z-score is reasonable
+    if let Some(janus::stream::comparator::ComparisonResult::LiveOutlierDetected {
+        value,
+        z_score,
+    }) = outlier_res.iter().find(|r| {
+        matches!(r, janus::stream::comparator::ComparisonResult::LiveOutlierDetected { .. })
+    }) {
+        assert_eq!(*value, 150.0);
+        assert!(z_score > &2.0); // Should be significantly above the threshold
+    }
+}
diff --git a/tests/dictionary_unit_test.rs b/tests/dictionary_unit_test.rs
new file mode 100644
index 0000000..0f2fdd8
--- /dev/null
+++ b/tests/dictionary_unit_test.rs
@@ -0,0 +1,98 @@
+use janus::core::{Event, RDFEvent};
+use janus::storage::indexing::dictionary::Dictionary;
+
+#[test]
+fn test_dictionary_encoding_decoding() {
+    let mut dict = Dictionary::new();
+
+    // Encode some RDF terms
+    let subject_id = dict.encode("http://example.org/person/Alice");
+    let predicate_id = dict.encode("http://example.org/knows");
+    let object_id = dict.encode("http://example.org/person/Bob");
+    let graph_id = dict.encode("http://example.org/graph1");
+
+    println!("Encoded IDs:");
+    println!("Subject: http://example.org/person/Alice -> {}", subject_id);
+    println!("Predicate: http://example.org/knows -> {}", predicate_id);
+    println!("Object: http://example.org/person/Bob -> {}", object_id);
+    println!("Graph: http://example.org/graph1 -> {}", graph_id);
+
+    // Create an event
+    let event = Event {
+        timestamp: 1_234_567_890,
+        subject: subject_id,
+        predicate: predicate_id,
+        object: object_id,
+        graph: graph_id,
+    };
+
+    // Decode the event
+    let decoded = dict.decode_graph(&event);
+    println!("\nDecoded event: {}", decoded);
+
+    // Verify individual decodings
+    assert_eq!(dict.decode(subject_id), Some("http://example.org/person/Alice"));
+    assert_eq!(dict.decode(predicate_id), Some("http://example.org/knows"));
+    assert_eq!(dict.decode(object_id), Some("http://example.org/person/Bob"));
+    assert_eq!(dict.decode(graph_id), Some("http://example.org/graph1"));
+
+    // Test that the decoded string contains the expected format
+    assert!(decoded.contains("http://example.org/person/Alice"));
+    assert!(decoded.contains("http://example.org/knows"));
+    assert!(decoded.contains("http://example.org/person/Bob"));
+    assert!(decoded.contains("http://example.org/graph1"));
+    assert!(decoded.contains("1234567890"));
+}
+
+#[test]
+fn test_clean_rdf_api() {
+    let mut dict = Dictionary::new();
+
+    // Test the clean API - the user provides URIs directly
+    let rdf_event = RDFEvent::new(
+        1_234_567_890,
+        "http://example.org/person/Alice",
+        "http://example.org/knows",
+        "http://example.org/person/Bob",
+        "http://example.org/graph1",
+    );
+
+    // Encoding happens internally
+    let encoded_event = rdf_event.encode(&mut dict);
+
+    // Decoding happens internally
+    let decoded_event = encoded_event.decode(&dict);
+
+    // Verify the round-trip works
+    assert_eq!(decoded_event.subject, "http://example.org/person/Alice");
+    assert_eq!(decoded_event.predicate, "http://example.org/knows");
+    assert_eq!(decoded_event.object, "http://example.org/person/Bob");
+    assert_eq!(decoded_event.graph, "http://example.org/graph1");
+    assert_eq!(decoded_event.timestamp, 1_234_567_890);
+
+    println!("Clean API test passed!");
+    println!(
+        "Original: {} {} {} in {} at timestamp {}",
+        rdf_event.subject,
+        rdf_event.predicate,
+        rdf_event.object,
+        rdf_event.graph,
+        rdf_event.timestamp
+    );
+    println!(
+        "Encoded IDs: {} {} {} {} at timestamp {}",
+        encoded_event.subject,
+        encoded_event.predicate,
+        encoded_event.object,
+        encoded_event.graph,
+        encoded_event.timestamp
+    );
+    println!(
+        "Decoded: {} {} {} in {} at timestamp {}",
+        decoded_event.subject,
+        decoded_event.predicate,
+        decoded_event.object,
+        decoded_event.graph,
+        decoded_event.timestamp
+    );
+}
diff --git a/tests/lib_test.rs b/tests/lib_test.rs
new file mode 100644
index 0000000..7b9156e
--- /dev/null
+++ b/tests/lib_test.rs
@@ -0,0 +1,7 @@
+use janus::Error;
+
+#[test]
+fn test_error_display() {
+    let err = Error::Config("test error".to_string());
+    assert_eq!(format!("{}", err), "Configuration error: test error");
+}
diff --git a/tests/oxigraph_adapter_test.rs b/tests/oxigraph_adapter_test.rs
new file mode 100644
index 0000000..ac5f25b
--- /dev/null
+++ b/tests/oxigraph_adapter_test.rs
@@ -0,0 +1,261 @@
+use janus::querying::oxigraph_adapter::{OxigraphAdapter, OxigraphError};
+use janus::querying::query_processing::SparqlEngine;
+use oxigraph::model::{GraphName, Literal, NamedNode, Quad};
+use rsp_rs::QuadContainer;
+use std::collections::HashSet;
+
+/// Helper function to create a test QuadContainer with sample data
+fn create_test_container() -> QuadContainer {
+    let mut quads = HashSet::new();
+
+    // Add test quads:
+    let alice = NamedNode::new("http://example.org/alice").unwrap();
+    let bob = NamedNode::new("http://example.org/bob").unwrap();
+    let charlie = NamedNode::new("http://example.org/charlie").unwrap();
+    let knows = NamedNode::new("http://example.org/knows").unwrap();
+    let age = NamedNode::new("http://example.org/age").unwrap();
+
+    // Alice knows Bob
+    quads.insert(Quad::new(alice.clone(), knows.clone(), bob.clone(), GraphName::DefaultGraph));
+
+    // Bob knows Charlie
+    quads.insert(Quad::new(bob.clone(), knows.clone(), charlie.clone(), GraphName::DefaultGraph));
+
+    // Alice's age
+    quads.insert(Quad::new(
+        alice.clone(),
+        age.clone(),
+        Literal::new_simple_literal("30"),
+        GraphName::DefaultGraph,
+    ));
+
+    // Bob's age
+    quads.insert(Quad::new(
+        bob.clone(),
+        age.clone(),
+        Literal::new_simple_literal("25"),
+        GraphName::DefaultGraph,
+    ));
+
+    QuadContainer::new(quads, 1000)
+}
+
+#[test]
+fn test_oxigraph_adapter_creation() {
+    let _adapter = OxigraphAdapter::new();
+    // Adapter created successfully
+}
+
+#[test]
+fn test_execute_simple_select_query() {
+    let adapter = OxigraphAdapter::new();
+    let container = create_test_container();
+
+    // Query to select all subjects
+    let query = "SELECT ?s WHERE { ?s ?p ?o }";
+
+    let results = adapter.execute_query(query, &container);
+    assert!(results.is_ok(), "Query execution should succeed");
+
+    let results = results.unwrap();
+    assert!(!results.is_empty(), "Results should not be empty");
+    assert_eq!(results.len(), 4, "Should return 4 results (one binding per quad)");
+}
+
+#[test]
+fn test_execute_select_with_filter() {
+    let adapter = OxigraphAdapter::new();
+    let container = create_test_container();
+
+    // Query to select subjects that know someone
+    let query = r"
+        PREFIX ex: <http://example.org/>
+        SELECT ?s WHERE {
+            ?s ex:knows ?o
+        }
+    ";
+
+    let results = adapter.execute_query(query, &container);
+    assert!(results.is_ok(), "Query with filter should succeed");
+
+    let results = results.unwrap();
+    assert_eq!(results.len(), 2, "Should return 2 results (Alice and Bob know someone)");
+}
+
+#[test]
+fn test_execute_ask_query() {
+    let adapter = OxigraphAdapter::new();
+    let container = create_test_container();
+
+    // ASK query to check if Alice knows Bob
+    let query = r"
+        PREFIX ex: <http://example.org/>
+        ASK {
+            ex:alice ex:knows ex:bob
+        }
+    ";
+
+    let results = adapter.execute_query(query, &container);
+    assert!(results.is_ok(), "ASK query should succeed");
+
+    let results = results.unwrap();
+    assert_eq!(results.len(), 1, "ASK query should return one boolean result");
+    assert_eq!(results[0], "true", "ASK query should return true");
+}
+
+#[test]
+fn test_execute_ask_query_false() {
+    let adapter = OxigraphAdapter::new();
+    let container = create_test_container();
+
+    // ASK query that should return false
+    let query = r"
+        PREFIX ex: <http://example.org/>
+        ASK {
+            ex:alice ex:knows ex:charlie
+        }
+    ";
+
+    let results = adapter.execute_query(query, &container);
+    assert!(results.is_ok(), "ASK query should succeed");
+
+    let results = results.unwrap();
+    assert_eq!(results.len(), 1, "ASK query should return one boolean result");
+    assert_eq!(
+        results[0], "false",
+        "ASK query should return false (Alice doesn't know Charlie directly)"
+    );
+}
+
+#[test]
+fn test_execute_construct_query() {
+    let adapter = OxigraphAdapter::new();
+    let container = create_test_container();
+
+    // CONSTRUCT query to create new triples
+    let query = r"
+        PREFIX ex: <http://example.org/>
+        CONSTRUCT {
+            ?s ex:knows ?o
+        }
+        WHERE {
+            ?s ex:knows ?o
+        }
+    ";
+
+    let results = adapter.execute_query(query, &container);
+    assert!(results.is_ok(), "CONSTRUCT query should succeed");
+
+    let results = results.unwrap();
+    assert_eq!(results.len(), 2, "CONSTRUCT should return 2 triples");
+}
+
+#[test]
+fn test_execute_with_empty_container() {
+    let adapter = OxigraphAdapter::new();
+    let empty_container = QuadContainer::new(HashSet::new(), 1000);
+
+    let query = "SELECT ?s WHERE { ?s ?p ?o }";
+
+    let results = adapter.execute_query(query, &empty_container);
+    assert!(results.is_ok(), "Query on empty container should succeed");
+
+    let results = results.unwrap();
+    assert!(results.is_empty(), "Results should be empty for empty container");
+}
+
+#[test]
+fn test_execute_invalid_query() {
+    let adapter = OxigraphAdapter::new();
+    let container = create_test_container();
+
+    // Invalid SPARQL query
+    let query = "INVALID SPARQL QUERY";
+
+    let results = adapter.execute_query(query, &container);
+    assert!(results.is_err(), "Invalid query should return an error");
+
+    let error = results.unwrap_err();
+    assert!(error.to_string().contains("Oxigraph error"), "Error should be an OxigraphError");
+}
+
+#[test]
+fn test_execute_query_with_literal_filter() {
+    let adapter = OxigraphAdapter::new();
+    let container = create_test_container();
+
+    // Query to find people older than 25
+    let query = r#"
+        PREFIX ex: <http://example.org/>
+        SELECT ?s ?age WHERE {
+            ?s ex:age ?age .
+            FILTER(?age > "25")
+        }
+    "#;
+
+    let results = adapter.execute_query(query, &container);
+    assert!(results.is_ok(), "Query with literal filter should succeed");
+
+    let results = results.unwrap();
+    assert_eq!(results.len(), 1, "Should return 1 result (Alice is 30)");
+}
+
+#[test]
+fn test_execute_count_query() {
+    let adapter = OxigraphAdapter::new();
+    let container = create_test_container();
+
+    // Query to count the number of 'knows' relationships
+    let query = r"
+        PREFIX ex: <http://example.org/>
+        SELECT (COUNT(?s) AS ?count) WHERE {
+            ?s ex:knows ?o
+        }
+    ";
+
+    let results = adapter.execute_query(query, &container);
+    assert!(results.is_ok(), "COUNT query should succeed");
+
+    let results = results.unwrap();
+    assert_eq!(results.len(), 1, "COUNT query should return 1 result");
+}
+
+#[test]
+fn test_multiple_queries_on_same_adapter() {
+    let adapter = OxigraphAdapter::new();
+    let container = create_test_container();
+
+    // First query
+    let query1 = "SELECT ?s WHERE { ?s ?p ?o }";
+    let results1 = adapter.execute_query(query1, &container);
+    assert!(results1.is_ok(), "First query should succeed");
+
+    // Second query
+    let query2 = r"
+        PREFIX ex: <http://example.org/>
+        SELECT ?s WHERE { ?s ex:knows ?o }
+    ";
+    let results2 = adapter.execute_query(query2, &container);
+    assert!(results2.is_ok(), "Second query should succeed");
+
+    // Verify both queries returned results
+    assert!(!results1.unwrap().is_empty());
+    assert!(!results2.unwrap().is_empty());
+}
+
+#[test]
+fn test_oxigraph_error_display() {
+    let error =
+        OxigraphError::from(oxigraph::store::StorageError::Other("Test error message".into()));
+    let error_string = format!("{}", error);
+    assert!(error_string.contains("Oxigraph error"));
+    assert!(error_string.contains("Test error message"));
+}
+
+#[test]
+fn test_oxigraph_error_from_storage_error() {
+    // This tests the From implementation for StorageError by constructing a
+    // StorageError directly and converting it into an OxigraphError.
+    let error = OxigraphError::from(oxigraph::store::StorageError::Other("test".into()));
+    assert!(error.to_string().contains("Oxigraph error"));
+}