Skip to content

Commit cba27dd

Browse files
nipunn1313Convex, Inc.
authored andcommitted
Add scheduler stats to axiom/datadog integration. (#38288)
Add to function event and rename scheduler_lag event to scheduler_stats (keeping the old event type for backwards compatibility). Also updated docs. GitOrigin-RevId: 6310ad16c8dabc2c7925c34909abc452fe9720cf
1 parent 643ac23 commit cba27dd

File tree

4 files changed

+92
-31
lines changed

4 files changed

+92
-31
lines changed

crates/application/src/function_log.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use common::{
3535
FunctionEventSource,
3636
LogEvent,
3737
LogSender,
38+
SchedulerInfo,
3839
StructuredLogEvent,
3940
},
4041
runtime::{
@@ -163,6 +164,7 @@ pub struct FunctionExecution {
163164
// Number of retries prior to a successful execution. Only applicable for mutations.
164165
pub mutation_retry_count: Option<usize>,
165166

167+
// If this execution resulted in an OCC error, this will be Some.
166168
pub occ_info: Option<OccInfo>,
167169
}
168170

@@ -265,6 +267,7 @@ impl FunctionExecution {
265267
}),
266268
None => None,
267269
},
270+
scheduler_info: self.caller.parent_scheduled_job().map(|_| SchedulerInfo {}),
268271
usage_stats: log_streaming::AggregatedFunctionUsageStats {
269272
database_read_bytes: self.usage_stats.database_read_bytes,
270273
database_write_bytes: self.usage_stats.database_write_bytes,
@@ -1111,11 +1114,16 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
11111114

11121115
/// Indicates that as of now (`timestamp`), the next scheduled job is at
11131116
/// `next_job_ts` (None if there are no pending jobs)
1114-
pub fn log_scheduled_job_lag(&self, next_job_ts: Option<SystemTime>, timestamp: SystemTime) {
1115-
if let Err(mut e) = self
1116-
.inner
1117-
.lock()
1118-
.log_scheduled_job_lag(next_job_ts, timestamp)
1117+
pub fn log_scheduled_job_stats(
1118+
&self,
1119+
next_job_ts: Option<SystemTime>,
1120+
timestamp: SystemTime,
1121+
num_running_jobs: u64,
1122+
) {
1123+
if let Err(mut e) =
1124+
self.inner
1125+
.lock()
1126+
.log_scheduled_job_stats(next_job_ts, timestamp, num_running_jobs)
11191127
{
11201128
report_error_sync(&mut e);
11211129
}
@@ -1670,10 +1678,11 @@ impl<RT: Runtime> Inner<RT> {
16701678
Ok(())
16711679
}
16721680

1673-
fn log_scheduled_job_lag(
1681+
fn log_scheduled_job_stats(
16741682
&mut self,
16751683
next_job_ts: Option<SystemTime>,
16761684
now: SystemTime,
1685+
num_running_jobs: u64,
16771686
) -> anyhow::Result<()> {
16781687
let name = scheduled_job_next_ts_metric();
16791688
// -Infinity means there is no scheduled job
@@ -1686,6 +1695,15 @@ impl<RT: Runtime> Inner<RT> {
16861695
},
16871696
}]);
16881697
}
1698+
if value > 0.0 || num_running_jobs > 0 {
1699+
self.log_manager.send_logs(vec![LogEvent {
1700+
timestamp: UnixTimestamp::from_system_time(now).context("now < UNIX_EPOCH?")?,
1701+
event: StructuredLogEvent::SchedulerStats {
1702+
lag_seconds: Duration::from_secs_f32(value.max(0.0)),
1703+
num_running_jobs,
1704+
},
1705+
}]);
1706+
}
16891707
match self.metrics.add_gauge(name, now, value) {
16901708
Ok(()) => (),
16911709
Err(UdfMetricsError::SamplePrecedesCutoff { ts: _, cutoff }) => {

crates/application/src/scheduled_jobs/mod.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ pub struct ScheduledJobExecutor<RT: Runtime> {
157157
next_job_ready_time: Option<Timestamp>,
158158
job_finished_tx: mpsc::Sender<ResolvedDocumentId>,
159159
job_finished_rx: mpsc::Receiver<ResolvedDocumentId>,
160+
/// The last time we logged stats, used to rate limit logging
161+
last_stats_log: SystemTime,
160162
}
161163

162164
#[derive(Clone)]
@@ -196,7 +198,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
196198
mpsc::channel(*SCHEDULED_JOB_EXECUTION_PARALLELISM);
197199
let mut executor = Self {
198200
context: ScheduledJobContext {
199-
rt,
201+
rt: rt.clone(),
200202
database,
201203
runner,
202204
function_log,
@@ -206,6 +208,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
206208
next_job_ready_time: None,
207209
job_finished_tx,
208210
job_finished_rx,
211+
last_stats_log: rt.system_time(),
209212
};
210213
let mut backoff = Backoff::new(*SCHEDULED_JOB_INITIAL_BACKOFF, *SCHEDULED_JOB_MAX_BACKOFF);
211214
tracing::info!("Starting scheduled job executor");
@@ -244,11 +247,13 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
244247
self.query_and_start_jobs(&mut tx).await?
245248
};
246249

247-
metrics::log_num_running_jobs(self.running_job_ids.len());
248250
let now = self.context.rt.system_time();
249251
let next_job_ready_time = self.next_job_ready_time.map(SystemTime::from);
250-
self.context
251-
.log_scheduled_job_execution_lag(next_job_ready_time, now);
252+
// Only log stats if at least 30 seconds have elapsed since the last log
253+
if now.duration_since(self.last_stats_log).unwrap_or_default() >= Duration::from_secs(30) {
254+
self.log_scheduled_job_stats(next_job_ready_time, now);
255+
self.last_stats_log = now;
256+
}
252257
let next_job_future = if let Some(next_job_ts) = next_job_ready_time {
253258
let wait_time = next_job_ts.duration_since(now).unwrap_or_else(|_| {
254259
// If we're behind, re-run this loop every 5 seconds to log the gauge above and
@@ -287,6 +292,22 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
287292
Ok(())
288293
}
289294

295+
fn log_scheduled_job_stats(&self, next_job_ready_time: Option<SystemTime>, now: SystemTime) {
296+
metrics::log_num_running_jobs(self.running_job_ids.len());
297+
if let Some(next_job_ts) = next_job_ready_time {
298+
metrics::log_scheduled_job_execution_lag(
299+
now.duration_since(next_job_ts).unwrap_or(Duration::ZERO),
300+
);
301+
} else {
302+
metrics::log_scheduled_job_execution_lag(Duration::ZERO);
303+
}
304+
self.context.function_log.log_scheduled_job_stats(
305+
next_job_ready_time,
306+
now,
307+
self.running_job_ids.len() as u64,
308+
);
309+
}
310+
290311
/// Reads through scheduled jobs in timestamp ascending order and starts any
291312
/// that are allowed by our concurrency limit and the jobs' scheduled
292313
/// time.
@@ -393,22 +414,6 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
393414
}
394415
}
395416

396-
fn log_scheduled_job_execution_lag(
397-
&self,
398-
next_job_ready_time: Option<SystemTime>,
399-
now: SystemTime,
400-
) {
401-
if let Some(next_job_ts) = next_job_ready_time {
402-
metrics::log_scheduled_job_execution_lag(
403-
now.duration_since(next_job_ts).unwrap_or(Duration::ZERO),
404-
);
405-
} else {
406-
metrics::log_scheduled_job_execution_lag(Duration::ZERO);
407-
}
408-
self.function_log
409-
.log_scheduled_job_lag(next_job_ready_time, now);
410-
}
411-
412417
// This handles re-running the scheduled function on transient errors. It
413418
// guarantees that the job was successfully run or the job state changed.
414419
pub async fn execute_job(&self, job: ScheduledJob, job_id: ResolvedDocumentId) {

crates/common/src/log_streaming.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ pub struct OccInfo {
8484
pub retry_count: u64,
8585
}
8686

87+
// Nothing yet. Can add information like parent scheduled job, scheduler lag,
88+
// etc.
89+
#[derive(Serialize, Debug, Clone)]
90+
pub struct SchedulerInfo {}
91+
8792
#[derive(Debug, Clone)]
8893
pub enum StructuredLogEvent {
8994
/// Topic for verification logs. These are issued on sink startup and are
@@ -103,6 +108,7 @@ pub enum StructuredLogEvent {
103108
execution_time: Duration,
104109
usage_stats: AggregatedFunctionUsageStats,
105110
occ_info: Option<OccInfo>,
111+
scheduler_info: Option<SchedulerInfo>,
106112
},
107113
/// Topic for exceptions. These happen when a UDF raises an exception from
108114
/// JS
@@ -118,6 +124,12 @@ pub enum StructuredLogEvent {
118124
action: String,
119125
metadata: serde_json::Map<String, JsonValue>,
120126
},
127+
/// Topic for global stats from the scheduler. For function-specific stats,
128+
/// look in FunctionExecution
129+
SchedulerStats {
130+
lag_seconds: Duration,
131+
num_running_jobs: u64,
132+
},
121133
ScheduledJobLag {
122134
lag_seconds: Duration,
123135
},
@@ -254,7 +266,8 @@ impl LogEvent {
254266
error,
255267
execution_time,
256268
usage_stats,
257-
..
269+
occ_info: _,
270+
scheduler_info: _,
258271
} => {
259272
let (reason, status) = match error {
260273
Some(err) => (Some(err.to_string()), "failure"),
@@ -307,6 +320,14 @@ impl LogEvent {
307320
"actionMetadata": metadata
308321
})
309322
},
323+
StructuredLogEvent::SchedulerStats {
324+
lag_seconds,
325+
num_running_jobs,
326+
} => serialize_map!({
327+
"_timestamp": ms,
328+
"_topic": "_scheduler_stats",
329+
"lag_seconds": lag_seconds.as_secs(), "num_running_jobs": num_running_jobs
330+
}),
310331
StructuredLogEvent::ScheduledJobLag { lag_seconds } => {
311332
serialize_map!({
312333
"_timestamp": ms,
@@ -351,6 +372,7 @@ impl LogEvent {
351372
execution_time,
352373
usage_stats,
353374
occ_info,
375+
scheduler_info,
354376
} => {
355377
let function_source = source.to_json_map();
356378
let (status, error_message) = match error {
@@ -376,6 +398,7 @@ impl LogEvent {
376398
"status": status,
377399
"error_message": error_message,
378400
"occ_info": occ_info,
401+
"scheduler_info": scheduler_info,
379402
"usage": Usage {
380403
database_read_bytes: usage_stats.database_read_bytes,
381404
database_write_bytes: usage_stats.database_write_bytes,
@@ -422,6 +445,17 @@ impl LogEvent {
422445
"audit_log_metadata": serde_json::to_string(metadata).map_err(serde::ser::Error::custom)?
423446
})
424447
},
448+
StructuredLogEvent::SchedulerStats {
449+
lag_seconds,
450+
num_running_jobs,
451+
} => {
452+
serialize_map!({
453+
"topic": "scheduler_stats",
454+
"timestamp": ms,
455+
"lag_seconds": lag_seconds.as_secs(),
456+
"num_running_jobs": num_running_jobs
457+
})
458+
},
425459
StructuredLogEvent::ScheduledJobLag { lag_seconds } => {
426460
serialize_map!({
427461
"timestamp": ms,

npm-packages/docs/docs/production/integrations/log-streams/log-streams.mdx

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ Schema:
164164
`table_name`
165165
- `retry_count`: the number of previously failed attempts before the current
166166
function execution
167+
- `scheduler_info`: object, if set, indicates that the function execution was
168+
originally invoked by a scheduled job either directly, or via a subfunction
169+
execution.
167170
- `usage`:
168171
- `database_read_bytes`: number
169172
- `database_write_bytes`: number, this and `database_read_bytes` make up the
@@ -220,17 +223,18 @@ The following fields are added under `function` for all `console` and
220223
[request ID](/functions/debugging.mdx#finding-relevant-logs-by-request-id) of
221224
the function.
222225

223-
### `scheduled_job_lag` events
226+
### `scheduler_stats` events
224227

225-
These events are periodically sent by the scheduler reporting the delay in
226-
executing your scheduled functions.
228+
These events are periodically sent by the scheduler reporting statistics from
229+
the scheduled function executor.
227230

228231
Schema:
229232

230-
- `topic`: `"scheduled_job_lag"`
233+
- `topic`: `"scheduler_stats"`
231234
- `timestamp`: Unix epoch timestamp in milliseconds
232235
- `lag_seconds`: The difference between `timestamp` and the scheduled run time
233236
of the oldest overdue scheduled job, in seconds.
237+
- `num_running_jobs`: number, the number of scheduled jobs currently running
234238

235239
### `audit_log` events
236240

0 commit comments

Comments
 (0)