Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 128 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["crates/http", "adapter/rest", "crates/base"]
members = ["crates/http", "adapter/rest", "crates/base", "adapter/cron"]
resolver = "3"

[workspace.package]
Expand All @@ -22,6 +22,8 @@ anyhow = "1.0.98"
prost = "0.14.0"
tonic-health = "0.14.0"
futures-lite = "2.6.1"
chrono = "0.4.42"
cron = "0.15.0"

[workspace.dependencies.http]
path = "../draco/crates/http"
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ See: [Installation]()
|----------|--------|
| HTTP | 🚧 |
| MQTT | 📝 |
| AMQP | 📝 |
| Cron-Jobs | 📝 |
| Cron-Jobs | 🚧 |

**Legend:**
- ✅ Done: Fully implemented and ready to use
Expand Down
13 changes: 13 additions & 0 deletions adapter/cron/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "cron"
version.workspace = true
edition.workspace = true

[dependencies]
tokio = {workspace = true}
chrono = {workspace = true}
cron = {workspace = true}
base = {workspace = true}
tucana = {workspace = true}
async-trait = {workspace = true}
anyhow = {workspace = true}
112 changes: 112 additions & 0 deletions adapter/cron/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use async_trait::async_trait;
use base::extract_flow_setting_field;
use base::runner::{ServerContext, ServerRunner};
use base::store::FlowIdentifyResult;
use base::traits::{IdentifiableFlow, LoadConfig, Server};
use chrono::{DateTime, Datelike, Timelike, Utc};
use cron::Schedule;
use std::str::FromStr;

#[derive(Default)]
struct Cron {}

#[derive(Clone)]
struct CronConfig {}

impl LoadConfig for CronConfig {
fn load() -> Self {
Self {}
}
}

#[tokio::main]
async fn main() {
let server = Cron::default();
let runner = ServerRunner::new(server).await.unwrap();
runner.serve().await.unwrap();
}

struct Time {
now: DateTime<Utc>,
}

impl IdentifiableFlow for Time {
fn identify(&self, flow: &tucana::shared::ValidationFlow) -> bool {
let Some(minute) = extract_flow_setting_field(&flow.settings, "CRON_MINUTE", "minute")
else {
return false;
};
let Some(hour) = extract_flow_setting_field(&flow.settings, "CRON_HOUR", "hour") else {
return false;
};
let Some(dom) =
extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_MONTH", "day_of_month")
else {
return false;
};
let Some(month) = extract_flow_setting_field(&flow.settings, "CRON_MONTH", "month") else {
return false;
};
let Some(dow) =
extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_WEEK", "day_of_week")
else {
return false;
};

let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow);
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cron expression format is incorrect. The code extracts fields named "minute", "hour", "day_of_month", "month", and "day_of_week", then creates a 6-field expression with a leading asterisk. In the standard 6-field cron format (with seconds), the order is: second minute hour day_of_month month day_of_week.

The expression should be: "0 {} {} {} {} {}" (setting seconds to 0), not "* {} {} {} {} {}" (which would allow any second). Alternatively, if only 5 fields are intended, remove the leading asterisk entirely and use: "{} {} {} {} {}".

The current format "* minute hour dom month dow" creates an ambiguous expression where the asterisk is in the seconds position but the subsequent fields are shifted incorrectly.

Suggested change
let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow);
let expression = format!("0 {} {} {} {} {}", minute, hour, dom, month, dow);

Copilot uses AI. Check for mistakes.
let schedule = Schedule::from_str(expression.as_str()).unwrap();
let next = schedule.upcoming(Utc).next().unwrap();
Comment on lines +57 to +58
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using unwrap() on Schedule::from_str() and upcoming().next() can cause panics if the cron expression is invalid or if there are no upcoming times. Consider handling these errors gracefully with proper error messages, especially since the cron expression comes from user-provided flow settings.

Suggested change
let schedule = Schedule::from_str(expression.as_str()).unwrap();
let next = schedule.upcoming(Utc).next().unwrap();
let schedule = match Schedule::from_str(expression.as_str()) {
Ok(s) => s,
Err(_) => {
// Invalid cron expression, cannot identify
return false;
}
};
let next = match schedule.upcoming(Utc).next() {
Some(n) => n,
None => {
// No upcoming times, cannot identify
return false;
}
};

Copilot uses AI. Check for mistakes.

self.now.year() == next.year()
&& self.now.month() == next.month()
&& self.now.day() == next.day()
&& self.now.hour() == next.hour()
&& self.now.minute() == next.minute()
}
}

#[async_trait]
impl Server<CronConfig> for Cron {
async fn init(&mut self, _ctx: &ServerContext<CronConfig>) -> anyhow::Result<()> {
Ok(())
}

async fn run(&mut self, ctx: &ServerContext<CronConfig>) -> anyhow::Result<()> {
let expression = "0 * * * * *";
let schedule = Schedule::from_str(expression)?;
let pattern = "*.*.CRON.*";

loop {
let now = Utc::now();
if let Some(next) = schedule.upcoming(Utc).take(1).next() {
let until_next = next - now;
tokio::time::sleep(until_next.to_std()?).await;

Comment on lines +80 to +84
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Time struct uses now which is captured before sleeping, but the sleep happens before this struct is created. This means the time used for flow identification will be stale. The now should be captured after the sleep completes, not before.

Currently:

let now = Utc::now();
// ... sleep happens ...
let time = Time { now }; // uses stale time

Should be:

// ... sleep happens ...
let now = Utc::now(); // capture current time after sleep
let time = Time { now };
Suggested change
let now = Utc::now();
if let Some(next) = schedule.upcoming(Utc).take(1).next() {
let until_next = next - now;
tokio::time::sleep(until_next.to_std()?).await;
if let Some(next) = schedule.upcoming(Utc).take(1).next() {
let now = Utc::now();
let until_next = next - now;
tokio::time::sleep(until_next.to_std()?).await;
let now = Utc::now();

Copilot uses AI. Check for mistakes.
let time = Time { now };
match ctx
.adapter_store
.get_possible_flow_match(pattern.to_string(), time)
.await
{
FlowIdentifyResult::None => {}
FlowIdentifyResult::Single(flow) => {
ctx.adapter_store
.validate_and_execute_flow(flow, None, false)
.await;
}
FlowIdentifyResult::Multiple(flows) => {
for flow in flows {
ctx.adapter_store
.validate_and_execute_flow(flow, None, false)
.await;
Comment on lines +99 to +101
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] When multiple cron flows need to execute, they are executed sequentially with await. If any flow execution takes a long time, it will delay subsequent flows. Consider spawning tasks concurrently using tokio::spawn to avoid blocking other scheduled flows from executing on time.

Suggested change
ctx.adapter_store
.validate_and_execute_flow(flow, None, false)
.await;
let adapter_store = ctx.adapter_store.clone();
tokio::spawn(async move {
adapter_store
.validate_and_execute_flow(flow, None, false)
.await;
});

Copilot uses AI. Check for mistakes.
}
}
}
}
}
}

async fn shutdown(&mut self, _ctx: &ServerContext<CronConfig>) -> anyhow::Result<()> {
Ok(())
}
}
5 changes: 4 additions & 1 deletion adapter/rest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ async fn execute_flow(
request: HttpRequest,
store: Arc<base::store::AdapterStore>,
) -> Option<HttpResponse> {
match store.validate_and_execute_flow(flow, request.body).await {
match store
.validate_and_execute_flow(flow, request.body, true)
.await
{
Some(result) => {
let Value {
kind: Some(StructValue(Struct { fields })),
Expand Down
Loading