diff --git a/Cargo.lock b/Cargo.lock index 8e8d366..5bcaf24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.18" @@ -253,7 +262,11 @@ version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ + "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", + "windows-link", ] [[package]] @@ -323,6 +336,30 @@ dependencies = [ "libc", ] +[[package]] +name = "cron" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-trait", + "base", + "chrono", + "cron 0.15.0", + "tokio", + "tucana", +] + +[[package]] +name = "cron" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740" +dependencies = [ + "chrono", + "once_cell", + "winnow", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -748,6 +785,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "1.5.0" @@ -960,9 +1021,9 @@ checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" [[package]] name = "linux-raw-sys" -version = "0.9.3" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" [[package]] name = "litemap" @@ -1410,15 +1471,15 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.5" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" +checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ "bitflags", "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2236,12 +2297,65 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -2333,6 +2447,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.6.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/Cargo.toml b/Cargo.toml index 622a623..4124f3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["crates/http", "adapter/rest", "crates/base"] +members = ["crates/http", "adapter/rest", "crates/base", "adapter/cron"] resolver = "3" [workspace.package] @@ -22,6 +22,8 @@ anyhow = "1.0.98" prost = "0.14.0" tonic-health = "0.14.0" futures-lite = "2.6.1" +chrono = "0.4.42" +cron = "0.15.0" [workspace.dependencies.http] path = "../draco/crates/http" diff --git a/README.md b/README.md index afb6697..6b829d5 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,7 @@ See: [Installation]() |----------|--------| | HTTP | 🚧 | | MQTT | 📝 | -| AMQP | 📝 | -| Cron-Jobs | 📝 | +| Cron-Jobs | 🚧 | **Legend:** - ✅ Done: Fully implemented and ready to use diff --git a/adapter/cron/Cargo.toml b/adapter/cron/Cargo.toml new file mode 100644 index 0000000..0d348f5 --- /dev/null +++ b/adapter/cron/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "cron" +version.workspace = true +edition.workspace = true + +[dependencies] +tokio = {workspace = true} +chrono = {workspace = true} +cron = {workspace = true} +base = {workspace = true} +tucana = {workspace = true} +async-trait = {workspace = true} +anyhow = {workspace = true} \ No newline at end of file diff --git a/adapter/cron/src/main.rs b/adapter/cron/src/main.rs new file mode 100644 index 0000000..f29dda6 --- /dev/null +++ b/adapter/cron/src/main.rs @@ -0,0 +1,112 @@ +use async_trait::async_trait; +use base::extract_flow_setting_field; +use base::runner::{ServerContext, ServerRunner}; +use base::store::FlowIdentifyResult; +use base::traits::{IdentifiableFlow, LoadConfig, Server}; +use chrono::{DateTime, Datelike, Timelike, Utc}; +use cron::Schedule; +use std::str::FromStr; + +#[derive(Default)] +struct Cron {} + +#[derive(Clone)] +struct CronConfig {} + +impl LoadConfig for CronConfig { + fn load() -> Self { + Self {} + } +} + +#[tokio::main] +async fn main() { + let server = Cron::default(); + let runner = ServerRunner::new(server).await.unwrap(); + runner.serve().await.unwrap(); +} + +struct Time { + now: DateTime, +} + +impl IdentifiableFlow for Time { + fn identify(&self, flow: &tucana::shared::ValidationFlow) -> bool { + let Some(minute) = extract_flow_setting_field(&flow.settings, "CRON_MINUTE", "minute") + else { + return false; + }; + let Some(hour) = extract_flow_setting_field(&flow.settings, "CRON_HOUR", "hour") else { + return false; + }; + let Some(dom) = + extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_MONTH", "day_of_month") + else { + return false; + }; + let Some(month) = extract_flow_setting_field(&flow.settings, "CRON_MONTH", "month") else { + return false; + }; + let Some(dow) = + extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_WEEK", "day_of_week") + else { + return false; + }; + + let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow); + let schedule = Schedule::from_str(expression.as_str()).unwrap(); + let next = schedule.upcoming(Utc).next().unwrap(); + + self.now.year() == next.year() + && self.now.month() == next.month() + && self.now.day() == next.day() + && self.now.hour() == next.hour() + && self.now.minute() == next.minute() + } +} + +#[async_trait] +impl Server for Cron { + async fn init(&mut self, _ctx: &ServerContext) -> anyhow::Result<()> { + Ok(()) + } + + async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { + let expression = "0 * * * * *"; + let schedule = Schedule::from_str(expression)?; + let pattern = "*.*.CRON.*"; + + loop { + let now = Utc::now(); + if let Some(next) = schedule.upcoming(Utc).take(1).next() { + let until_next = next - now; + tokio::time::sleep(until_next.to_std()?).await; + + let time = Time { now }; + match ctx + .adapter_store + .get_possible_flow_match(pattern.to_string(), time) + .await + { + FlowIdentifyResult::None => {} + FlowIdentifyResult::Single(flow) => { + ctx.adapter_store + .validate_and_execute_flow(flow, None, false) + .await; + } + FlowIdentifyResult::Multiple(flows) => { + for flow in flows { + ctx.adapter_store + .validate_and_execute_flow(flow, None, false) + .await; + } + } + } + } + } + } + + async fn shutdown(&mut self, _ctx: &ServerContext) -> anyhow::Result<()> { + Ok(()) + } +} diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 31eec45..8c9310f 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -96,7 +96,10 @@ async fn execute_flow( request: HttpRequest, store: Arc, ) -> Option { - match store.validate_and_execute_flow(flow, request.body).await { + match store + .validate_and_execute_flow(flow, request.body, true) + .await + { Some(result) => { let Value { kind: Some(StructValue(Struct { fields })), diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index 872e440..6c8ee84 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -6,8 +6,8 @@ use prost::Message; use tucana::shared::{ExecutionFlow, ValidationFlow, Value}; pub struct AdapterStore { - client: async_nats::Client, - kv: async_nats::jetstream::kv::Store, + pub client: async_nats::Client, + pub kv: async_nats::jetstream::kv::Store, } pub enum FlowIdentifyResult { @@ -56,7 +56,7 @@ impl AdapterStore { /// - id: The identifier to use for identifying the possible matches. Its just a fine grain identifier that can be used to identify the possible matches. For a REST Flow this will be the regex matcher, for a CRON Flow the trait just return true every time. /// /// Returns: - /// - FlowIdenfiyResult: The result of the flow identification process. This can be one of the following: + /// - FlowIdentifyResult: The result of the flow identification process. This can be one of the following: /// - None: No flows matched the identifier. /// - Single(ValidationFlow): A single flow matched the identifier. /// - Multiple(Vec): Multiple flows matched the identifier. @@ -112,6 +112,7 @@ impl AdapterStore { &self, flow: ValidationFlow, input_value: Option, + wait_for_result: bool, ) -> Option { if let Some(body) = input_value.clone() { let verify_result = verify_flow(flow.clone(), body); @@ -124,23 +125,32 @@ impl AdapterStore { }; } - let uuid = uuid::Uuid::new_v4().to_string(); let execution_flow: ExecutionFlow = Self::convert_validation_flow(flow, input_value); - let bytes = execution_flow.encode_to_vec(); + let uuid = uuid::Uuid::new_v4().to_string(); let topic = format!("execution.{}", uuid); - let result = self.client.request(topic, bytes.into()).await; + let bytes = execution_flow.encode_to_vec(); - match result { - Ok(message) => match Value::decode(message.payload) { - Ok(value) => Some(value), + if wait_for_result { + match self.client.request(topic, bytes.into()).await { + Ok(message) => match Value::decode(message.payload) { + Ok(value) => Some(value), + Err(err) => { + log::error!("Failed to decode response from NATS server: {:?}", err); + None + } + }, Err(err) => { - log::error!("Failed to decode response from NATS server: {:?}", err); + log::error!("Failed to send request to NATS server: {:?}", err); + None + } + } + } else { + match self.client.publish(topic, bytes.into()).await { + Ok(_) => None, + Err(err) => { + log::error!("Failed to send request to NATS server: {:?}", err); None } - }, - Err(err) => { - log::error!("Failed to send request to NATS server: {:?}", err); - None } } }