-
Notifications
You must be signed in to change notification settings - Fork 0
Cron Adapter #117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: alpha
Are you sure you want to change the base?
Cron Adapter #117
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements a cron adapter for the Draco protocol adapter system, enabling scheduled task execution based on cron expressions.
Key Changes:
- Added new cron adapter that schedules and executes flows based on cron expressions
- Modified
AdapterStoreto support fire-and-forget flow execution via a newwait_for_resultparameter - Updated project status to reflect cron-jobs as under construction
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| adapter/cron/src/main.rs | New cron adapter implementation with scheduling logic and flow identification |
| adapter/cron/Cargo.toml | Cargo configuration for the cron adapter package |
| crates/base/src/store.rs | Added wait_for_result parameter to support both request-response and publish patterns; fixed typo in documentation |
| adapter/rest/src/main.rs | Updated to pass true for wait_for_result parameter to maintain existing request-response behavior |
| Cargo.toml | Added cron adapter to workspace members and cron/chrono dependencies |
| Cargo.lock | Lock file updates for new dependencies (chrono, cron, and transitive dependencies) |
| README.md | Updated roadmap to show Cron-Jobs as under construction and removed AMQP entry |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ctx.adapter_store | ||
| .validate_and_execute_flow(flow, None, false) | ||
| .await; |
Copilot
AI
Nov 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] When multiple cron flows need to execute, they are executed sequentially with await. If any flow execution takes a long time, it will delay subsequent flows. Consider spawning tasks concurrently using tokio::spawn to avoid blocking other scheduled flows from executing on time.
| ctx.adapter_store | |
| .validate_and_execute_flow(flow, None, false) | |
| .await; | |
| let adapter_store = ctx.adapter_store.clone(); | |
| tokio::spawn(async move { | |
| adapter_store | |
| .validate_and_execute_flow(flow, None, false) | |
| .await; | |
| }); |
| let now = Utc::now(); | ||
| if let Some(next) = schedule.upcoming(Utc).take(1).next() { | ||
| let until_next = next - now; | ||
| tokio::time::sleep(until_next.to_std()?).await; | ||
|
|
Copilot
AI
Nov 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Time struct uses now which is captured before sleeping, but the sleep happens before this struct is created. This means the time used for flow identification will be stale. The now should be captured after the sleep completes, not before.
Currently:
let now = Utc::now();
// ... sleep happens ...
let time = Time { now }; // uses stale timeShould be:
// ... sleep happens ...
let now = Utc::now(); // capture current time after sleep
let time = Time { now };| let now = Utc::now(); | |
| if let Some(next) = schedule.upcoming(Utc).take(1).next() { | |
| let until_next = next - now; | |
| tokio::time::sleep(until_next.to_std()?).await; | |
| if let Some(next) = schedule.upcoming(Utc).take(1).next() { | |
| let now = Utc::now(); | |
| let until_next = next - now; | |
| tokio::time::sleep(until_next.to_std()?).await; | |
| let now = Utc::now(); |
| let schedule = Schedule::from_str(expression.as_str()).unwrap(); | ||
| let next = schedule.upcoming(Utc).next().unwrap(); |
Copilot
AI
Nov 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using unwrap() on Schedule::from_str() and upcoming().next() can cause panics if the cron expression is invalid or if there are no upcoming times. Consider handling these errors gracefully with proper error messages, especially since the cron expression comes from user-provided flow settings.
| let schedule = Schedule::from_str(expression.as_str()).unwrap(); | |
| let next = schedule.upcoming(Utc).next().unwrap(); | |
| let schedule = match Schedule::from_str(expression.as_str()) { | |
| Ok(s) => s, | |
| Err(_) => { | |
| // Invalid cron expression, cannot identify | |
| return false; | |
| } | |
| }; | |
| let next = match schedule.upcoming(Utc).next() { | |
| Some(n) => n, | |
| None => { | |
| // No upcoming times, cannot identify | |
| return false; | |
| } | |
| }; |
| return false; | ||
| }; | ||
|
|
||
| let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow); |
Copilot
AI
Nov 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cron expression format is incorrect. The code extracts fields named "minute", "hour", "day_of_month", "month", and "day_of_week", then creates a 6-field expression with a leading asterisk. In the standard 6-field cron format (with seconds), the order is: second minute hour day_of_month month day_of_week.
The expression should be: "0 {} {} {} {} {}" (setting seconds to 0), not "* {} {} {} {} {}" (which would allow any second). Alternatively, if only 5 fields are intended, remove the leading asterisk entirely and use: "{} {} {} {} {}".
The current format "* minute hour dom month dow" creates an ambiguous expression where the asterisk is in the seconds position but the subsequent fields are shifted incorrectly.
| let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow); | |
| let expression = format!("0 {} {} {} {} {}", minute, hour, dom, month, dow); |
| pub client: async_nats::Client, | ||
| pub kv: async_nats::jetstream::kv::Store, |
Copilot
AI
Nov 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Making the client and kv fields public exposes internal implementation details. Consider if these need to be public or if access should be provided through dedicated methods to maintain better encapsulation.
| pub client: async_nats::Client, | |
| pub kv: async_nats::jetstream::kv::Store, | |
| client: async_nats::Client, | |
| kv: async_nats::jetstream::kv::Store, |
| match self.client.publish(topic, bytes.into()).await { | ||
| Ok(_) => None, | ||
| Err(err) => { | ||
| log::error!("Failed to send request to NATS server: {:?}", err); |
Copilot
AI
Nov 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message "Failed to send request to NATS server" is misleading when using publish instead of request. When wait_for_result is false, the operation is a publish (fire-and-forget), not a request-response. Consider using a more accurate message like "Failed to publish to NATS server".
| log::error!("Failed to send request to NATS server: {:?}", err); | |
| log::error!("Failed to publish to NATS server: {:?}", err); |
| &self, | ||
| flow: ValidationFlow, | ||
| input_value: Option<Value>, | ||
| wait_for_result: bool, |
Copilot
AI
Nov 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The wait_for_result parameter is not documented in the function's documentation comment. Please add documentation explaining when this should be true vs false and what the implications are for the return value.
Resolves: #80