Skip to content

Commit dd43d4d

Browse files
committed
chore: add op name to logs (#3564)
1 parent b48d994 commit dd43d4d

File tree

4 files changed

+43
-35
lines changed

4 files changed

+43
-35
lines changed

engine/packages/gasoline/src/ctx/common.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ where
109109

110110
let res = tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input))
111111
.await
112-
.map_err(|_| WorkflowError::OperationTimeout(0))
113-
.map(|res| res.map_err(WorkflowError::OperationFailure));
112+
.map_err(|_| WorkflowError::OperationTimeout(I::Operation::NAME, 0))
113+
.map(|res| res.map_err(|err| WorkflowError::OperationFailure(I::Operation::NAME, err)));
114114

115115
// Record metrics
116116
{

engine/packages/gasoline/src/ctx/workflow.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -618,12 +618,12 @@ impl WorkflowCtx {
618618
WorkflowError::ActivityTimeout(error_count)
619619
}
620620
}
621-
WorkflowError::OperationTimeout(_) => {
621+
WorkflowError::OperationTimeout(op_name, _) => {
622622
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
623623
WorkflowError::ActivityMaxFailuresReached(err.into())
624624
} else {
625625
// Add error count to the error for backoff calculation
626-
WorkflowError::OperationTimeout(error_count)
626+
WorkflowError::OperationTimeout(op_name, error_count)
627627
}
628628
}
629629
_ => err,

engine/packages/gasoline/src/error.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ pub enum WorkflowError {
1919
#[error("activity failure, max retries reached: {0:?}")]
2020
ActivityMaxFailuresReached(#[source] anyhow::Error),
2121

22-
#[error("operation failure: {0:?}")]
23-
OperationFailure(#[source] anyhow::Error),
22+
#[error("operation failure ({0}): {1:?}")]
23+
OperationFailure(&'static str, #[source] anyhow::Error),
2424

2525
#[error("workflow missing from registry: {0}")]
2626
WorkflowMissingFromRegistry(String),
@@ -150,8 +150,8 @@ pub enum WorkflowError {
150150
ActivityTimeout(usize),
151151

152152
// Includes error count
153-
#[error("operation timed out")]
154-
OperationTimeout(usize),
153+
#[error("operation {0} timed out")]
154+
OperationTimeout(&'static str, usize),
155155

156156
#[error("duplicate registered workflow: {0}")]
157157
DuplicateRegisteredWorkflow(String),
@@ -188,7 +188,7 @@ impl WorkflowError {
188188
match self {
189189
WorkflowError::ActivityFailure(_, error_count)
190190
| WorkflowError::ActivityTimeout(error_count)
191-
| WorkflowError::OperationTimeout(error_count) => {
191+
| WorkflowError::OperationTimeout(_, error_count) => {
192192
// NOTE: Max retry is handled in `WorkflowCtx::activity`
193193
let mut backoff = rivet_util::backoff::Backoff::new_at(
194194
8,
@@ -220,7 +220,7 @@ impl WorkflowError {
220220
match self {
221221
WorkflowError::ActivityFailure(_, _)
222222
| WorkflowError::ActivityTimeout(_)
223-
| WorkflowError::OperationTimeout(_)
223+
| WorkflowError::OperationTimeout(_, _)
224224
| WorkflowError::NoSignalFound(_)
225225
| WorkflowError::NoSignalFoundAndSleep(_, _)
226226
| WorkflowError::SubWorkflowIncomplete(_)
@@ -235,7 +235,7 @@ impl WorkflowError {
235235
match self {
236236
WorkflowError::ActivityFailure(_, _)
237237
| WorkflowError::ActivityTimeout(_)
238-
| WorkflowError::OperationTimeout(_) => true,
238+
| WorkflowError::OperationTimeout(_, _) => true,
239239
_ => false,
240240
}
241241
}

engine/packages/pegboard-serverless/src/lib.rs

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
6161
}
6262
}
6363

64+
#[tracing::instrument(skip_all)]
6465
async fn tick(
6566
ctx: &StandaloneCtx,
6667
outbound_connections: &mut HashMap<(Id, String), Vec<OutboundConnection>>,
@@ -157,6 +158,7 @@ async fn tick(
157158
Ok(())
158159
}
159160

161+
#[tracing::instrument(skip_all)]
160162
async fn tick_runner_config(
161163
ctx: &StandaloneCtx,
162164
ns_id: Id,
@@ -264,32 +266,35 @@ fn spawn_connection(
264266
let draining = Arc::new(AtomicBool::new(false));
265267

266268
let draining2 = draining.clone();
267-
let handle = tokio::spawn(async move {
268-
if let Err(err) = outbound_handler(
269-
&ctx,
270-
url,
271-
headers,
272-
request_lifespan,
273-
slots_per_runner,
274-
runner_name,
275-
namespace_name,
276-
shutdown_rx,
277-
draining2,
278-
)
279-
.await
280-
{
281-
tracing::warn!(?err, "outbound req failed");
269+
let handle = tokio::spawn(
270+
async move {
271+
if let Err(err) = outbound_handler(
272+
&ctx,
273+
url,
274+
headers,
275+
request_lifespan,
276+
slots_per_runner,
277+
runner_name,
278+
namespace_name,
279+
shutdown_rx,
280+
draining2,
281+
)
282+
.await
283+
{
284+
tracing::warn!(?err, "outbound req failed");
282285

283-
// TODO: Add backoff
284-
tokio::time::sleep(Duration::from_secs(1)).await;
286+
// TODO: Add backoff
287+
tokio::time::sleep(Duration::from_secs(1)).await;
285288

286-
// On error, bump the autoscaler loop again
287-
let _ = ctx
288-
.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
289-
.send()
290-
.await;
289+
// On error, bump the autoscaler loop again
290+
let _ = ctx
291+
.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
292+
.send()
293+
.await;
294+
}
291295
}
292-
});
296+
.custom_instrument(tracing::info_span!("outbound_req_task")),
297+
);
293298

294299
OutboundConnection {
295300
handle,
@@ -298,6 +303,7 @@ fn spawn_connection(
298303
}
299304
}
300305

306+
#[tracing::instrument(skip_all)]
301307
async fn outbound_handler(
302308
ctx: &StandaloneCtx,
303309
url: String,
@@ -478,6 +484,7 @@ async fn outbound_handler(
478484
Ok(())
479485
}
480486

487+
#[tracing::instrument(skip_all)]
481488
async fn drain_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
482489
let res = ctx
483490
.signal(pegboard::workflows::runner::Stop {
@@ -498,7 +505,7 @@ async fn drain_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
498505
"runner workflow not found, likely already stopped"
499506
);
500507
} else {
501-
res?;
508+
res.context("failed sending drain signal")?;
502509
}
503510

504511
Ok(())
@@ -507,6 +514,7 @@ async fn drain_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
507514
/// Send a stop message to the client.
508515
///
509516
/// This will close the runner's WebSocket.
517+
#[tracing::instrument(skip_all)]
510518
async fn publish_to_client_stop(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
511519
let receiver_subject =
512520
pegboard::pubsub_subjects::RunnerReceiverSubject::new(runner_id).to_string();

0 commit comments

Comments (0)