-
Notifications
You must be signed in to change notification settings - Fork 0
Cron Adapter #117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: alpha
Are you sure you want to change the base?
Cron Adapter #117
Changes from all commits
b482bec
ff4501f
bcbc9a5
3b76c00
0af7076
07f4211
81c8d0d
35f839a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| [package] | ||
| name = "cron" | ||
| version.workspace = true | ||
| edition.workspace = true | ||
|
|
||
| [dependencies] | ||
| tokio = {workspace = true} | ||
| chrono = {workspace = true} | ||
| cron = {workspace = true} | ||
| base = {workspace = true} | ||
| tucana = {workspace = true} | ||
| async-trait = {workspace = true} | ||
| anyhow = {workspace = true} |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,112 @@ | ||||||||||||||||||||||||||||||||||
| use async_trait::async_trait; | ||||||||||||||||||||||||||||||||||
| use base::extract_flow_setting_field; | ||||||||||||||||||||||||||||||||||
| use base::runner::{ServerContext, ServerRunner}; | ||||||||||||||||||||||||||||||||||
| use base::store::FlowIdentifyResult; | ||||||||||||||||||||||||||||||||||
| use base::traits::{IdentifiableFlow, LoadConfig, Server}; | ||||||||||||||||||||||||||||||||||
| use chrono::{DateTime, Datelike, Timelike, Utc}; | ||||||||||||||||||||||||||||||||||
| use cron::Schedule; | ||||||||||||||||||||||||||||||||||
| use std::str::FromStr; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| #[derive(Default)] | ||||||||||||||||||||||||||||||||||
| struct Cron {} | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| #[derive(Clone)] | ||||||||||||||||||||||||||||||||||
| struct CronConfig {} | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| impl LoadConfig for CronConfig { | ||||||||||||||||||||||||||||||||||
| fn load() -> Self { | ||||||||||||||||||||||||||||||||||
| Self {} | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| #[tokio::main] | ||||||||||||||||||||||||||||||||||
| async fn main() { | ||||||||||||||||||||||||||||||||||
| let server = Cron::default(); | ||||||||||||||||||||||||||||||||||
| let runner = ServerRunner::new(server).await.unwrap(); | ||||||||||||||||||||||||||||||||||
| runner.serve().await.unwrap(); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| struct Time { | ||||||||||||||||||||||||||||||||||
| now: DateTime<Utc>, | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| impl IdentifiableFlow for Time { | ||||||||||||||||||||||||||||||||||
| fn identify(&self, flow: &tucana::shared::ValidationFlow) -> bool { | ||||||||||||||||||||||||||||||||||
| let Some(minute) = extract_flow_setting_field(&flow.settings, "CRON_MINUTE", "minute") | ||||||||||||||||||||||||||||||||||
| else { | ||||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
| let Some(hour) = extract_flow_setting_field(&flow.settings, "CRON_HOUR", "hour") else { | ||||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
| let Some(dom) = | ||||||||||||||||||||||||||||||||||
| extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_MONTH", "day_of_month") | ||||||||||||||||||||||||||||||||||
| else { | ||||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
| let Some(month) = extract_flow_setting_field(&flow.settings, "CRON_MONTH", "month") else { | ||||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
| let Some(dow) = | ||||||||||||||||||||||||||||||||||
| extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_WEEK", "day_of_week") | ||||||||||||||||||||||||||||||||||
| else { | ||||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow); | ||||||||||||||||||||||||||||||||||
| let schedule = Schedule::from_str(expression.as_str()).unwrap(); | ||||||||||||||||||||||||||||||||||
| let next = schedule.upcoming(Utc).next().unwrap(); | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+57
to
+58
|
||||||||||||||||||||||||||||||||||
| let schedule = Schedule::from_str(expression.as_str()).unwrap(); | |
| let next = schedule.upcoming(Utc).next().unwrap(); | |
| let schedule = match Schedule::from_str(expression.as_str()) { | |
| Ok(s) => s, | |
| Err(_) => { | |
| // Invalid cron expression, cannot identify | |
| return false; | |
| } | |
| }; | |
| let next = match schedule.upcoming(Utc).next() { | |
| Some(n) => n, | |
| None => { | |
| // No upcoming times, cannot identify | |
| return false; | |
| } | |
| }; |
Copilot
AI
Nov 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Time struct uses now which is captured before sleeping, but the sleep happens before this struct is created. This means the time used for flow identification will be stale. The now should be captured after the sleep completes, not before.
Currently:
let now = Utc::now();
// ... sleep happens ...
let time = Time { now }; // uses stale timeShould be:
// ... sleep happens ...
let now = Utc::now(); // capture current time after sleep
let time = Time { now };| let now = Utc::now(); | |
| if let Some(next) = schedule.upcoming(Utc).take(1).next() { | |
| let until_next = next - now; | |
| tokio::time::sleep(until_next.to_std()?).await; | |
| if let Some(next) = schedule.upcoming(Utc).take(1).next() { | |
| let now = Utc::now(); | |
| let until_next = next - now; | |
| tokio::time::sleep(until_next.to_std()?).await; | |
| let now = Utc::now(); |
Copilot
AI
Nov 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] When multiple cron flows need to execute, they are executed sequentially with await. If any flow execution takes a long time, it will delay subsequent flows. Consider spawning tasks concurrently using tokio::spawn to avoid blocking other scheduled flows from executing on time.
| ctx.adapter_store | |
| .validate_and_execute_flow(flow, None, false) | |
| .await; | |
| let adapter_store = ctx.adapter_store.clone(); | |
| tokio::spawn(async move { | |
| adapter_store | |
| .validate_and_execute_flow(flow, None, false) | |
| .await; | |
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cron expression format is incorrect. The code extracts fields named "minute", "hour", "day_of_month", "month", and "day_of_week", then creates a 6-field expression with a leading asterisk. In the standard 6-field cron format (with seconds), the order is:
second minute hour day_of_month month day_of_week.The expression should be:
"0 {} {} {} {} {}"(setting seconds to 0), not"* {} {} {} {} {}"(which would allow any second). Alternatively, if only 5 fields are intended, remove the leading asterisk entirely and use:"{} {} {} {} {}".The current format
"* minute hour dom month dow"creates an ambiguous expression where the asterisk is in the seconds position but the subsequent fields are shifted incorrectly.