Skip to content

Conversation

@raphael-goetz
Copy link
Member

Resolves: #80

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements a cron adapter for the Draco protocol adapter system, enabling scheduled task execution based on cron expressions.

Key Changes:

  • Added new cron adapter that schedules and executes flows based on cron expressions
  • Modified AdapterStore to support fire-and-forget flow execution via a new wait_for_result parameter
  • Updated project status to reflect cron-jobs as under construction

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
adapter/cron/src/main.rs New cron adapter implementation with scheduling logic and flow identification
adapter/cron/Cargo.toml Cargo configuration for the cron adapter package
crates/base/src/store.rs Added wait_for_result parameter to support both request-response and publish patterns; fixed typo in documentation
adapter/rest/src/main.rs Updated to pass true for wait_for_result parameter to maintain existing request-response behavior
Cargo.toml Added cron adapter to workspace members and cron/chrono dependencies
Cargo.lock Lock file updates for new dependencies (chrono, cron, and transitive dependencies)
README.md Updated roadmap to show Cron-Jobs as under construction and removed AMQP entry

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +99 to +101
ctx.adapter_store
.validate_and_execute_flow(flow, None, false)
.await;
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] When multiple cron flows need to execute, they are executed sequentially with await. If any flow execution takes a long time, it will delay subsequent flows. Consider spawning tasks concurrently using tokio::spawn to avoid blocking other scheduled flows from executing on time.

Suggested change
ctx.adapter_store
.validate_and_execute_flow(flow, None, false)
.await;
let adapter_store = ctx.adapter_store.clone();
tokio::spawn(async move {
adapter_store
.validate_and_execute_flow(flow, None, false)
.await;
});

Copilot uses AI. Check for mistakes.
Comment on lines +80 to +84
let now = Utc::now();
if let Some(next) = schedule.upcoming(Utc).take(1).next() {
let until_next = next - now;
tokio::time::sleep(until_next.to_std()?).await;

Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Time struct uses now which is captured before sleeping, but the sleep happens before this struct is created. This means the time used for flow identification will be stale. The now should be captured after the sleep completes, not before.

Currently:

let now = Utc::now();
// ... sleep happens ...
let time = Time { now }; // uses stale time

Should be:

// ... sleep happens ...
let now = Utc::now(); // capture current time after sleep
let time = Time { now };
Suggested change
let now = Utc::now();
if let Some(next) = schedule.upcoming(Utc).take(1).next() {
let until_next = next - now;
tokio::time::sleep(until_next.to_std()?).await;
if let Some(next) = schedule.upcoming(Utc).take(1).next() {
let now = Utc::now();
let until_next = next - now;
tokio::time::sleep(until_next.to_std()?).await;
let now = Utc::now();

Copilot uses AI. Check for mistakes.
Comment on lines +57 to +58
let schedule = Schedule::from_str(expression.as_str()).unwrap();
let next = schedule.upcoming(Utc).next().unwrap();
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using unwrap() on Schedule::from_str() and upcoming().next() can cause panics if the cron expression is invalid or if there are no upcoming times. Consider handling these errors gracefully with proper error messages, especially since the cron expression comes from user-provided flow settings.

Suggested change
let schedule = Schedule::from_str(expression.as_str()).unwrap();
let next = schedule.upcoming(Utc).next().unwrap();
let schedule = match Schedule::from_str(expression.as_str()) {
Ok(s) => s,
Err(_) => {
// Invalid cron expression, cannot identify
return false;
}
};
let next = match schedule.upcoming(Utc).next() {
Some(n) => n,
None => {
// No upcoming times, cannot identify
return false;
}
};

Copilot uses AI. Check for mistakes.
return false;
};

let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow);
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cron expression format is incorrect. The code extracts fields named "minute", "hour", "day_of_month", "month", and "day_of_week", then creates a 6-field expression with a leading asterisk. In the standard 6-field cron format (with seconds), the order is: second minute hour day_of_month month day_of_week.

The expression should be: "0 {} {} {} {} {}" (setting seconds to 0), not "* {} {} {} {} {}" (which would allow any second). Alternatively, if only 5 fields are intended, remove the leading asterisk entirely and use: "{} {} {} {} {}".

The current format "* minute hour dom month dow" creates an ambiguous expression where the asterisk is in the seconds position but the subsequent fields are shifted incorrectly.

Suggested change
let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow);
let expression = format!("0 {} {} {} {} {}", minute, hour, dom, month, dow);

Copilot uses AI. Check for mistakes.
Comment on lines +9 to +10
pub client: async_nats::Client,
pub kv: async_nats::jetstream::kv::Store,
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Making the client and kv fields public exposes internal implementation details. Consider if these need to be public or if access should be provided through dedicated methods to maintain better encapsulation.

Suggested change
pub client: async_nats::Client,
pub kv: async_nats::jetstream::kv::Store,
client: async_nats::Client,
kv: async_nats::jetstream::kv::Store,

Copilot uses AI. Check for mistakes.
match self.client.publish(topic, bytes.into()).await {
Ok(_) => None,
Err(err) => {
log::error!("Failed to send request to NATS server: {:?}", err);
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message "Failed to send request to NATS server" is misleading when using publish instead of request. When wait_for_result is false, the operation is a publish (fire-and-forget), not a request-response. Consider using a more accurate message like "Failed to publish to NATS server".

Suggested change
log::error!("Failed to send request to NATS server: {:?}", err);
log::error!("Failed to publish to NATS server: {:?}", err);

Copilot uses AI. Check for mistakes.
&self,
flow: ValidationFlow,
input_value: Option<Value>,
wait_for_result: bool,
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wait_for_result parameter is not documented in the function's documentation comment. Please add documentation explaining when this should be true vs false and what the implications are for the return value.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants