Skip to content

Commit fb10d8b

Browse files
authored
refactor: improve worker logging with structured formats and consistent messaging (#532)
# Improved Worker Logging with Structured Context This PR enhances the logging system for edge workers by implementing structured, context-aware logging throughout the worker lifecycle. The changes: - Replace generic log messages with context-specific logging methods - Add structured logging for task execution with detailed context (flow slug, step slug, message ID, run ID, retry information) - Implement a startup banner with worker identity and flow compilation status - Add consistent shutdown/deprecation logging to prevent duplicate messages - Track compilation status during worker startup and include it in logs - Pass worker identity information to task executors for better log context - Improve task execution logging with standardized start/complete/fail messages These changes make logs more consistent and informative, making it easier to track worker and task execution through the system.
1 parent 735f6cb commit fb10d8b

File tree

6 files changed

+121
-32
lines changed

6 files changed

+121
-32
lines changed

pkgs/edge-worker/src/core/BatchProcessor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ export class BatchProcessor<TMessage extends IMessage> {
1818
}
1919

2020
async processBatch() {
21-
this.logger.verbose('Polling for new batch of messages...');
21+
this.logger.polling();
2222
const messageRecords = await this.poller.poll();
2323

2424
if (this.signal.aborted) {
2525
this.logger.info('Discarding messageRecords because worker is stopping');
2626
return;
2727
}
2828

29-
this.logger.verbose(`Starting ${messageRecords.length} tasks`);
29+
this.logger.taskCount(messageRecords.length);
3030

3131
const startPromises = messageRecords.map((message) =>
3232
this.executionController.start(message)

pkgs/edge-worker/src/core/Worker.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export class Worker {
1010
private batchProcessor: IBatchProcessor;
1111
private sql: postgres.Sql;
1212
private mainLoopPromise: Promise<void> | undefined;
13+
private deprecationLogged = false;
1314

1415
constructor(
1516
batchProcessor: IBatchProcessor,
@@ -45,7 +46,7 @@ export class Worker {
4546

4647
// Check if deprecated after heartbeat
4748
if (!this.isMainLoopActive) {
48-
this.logger.info('Worker deprecated, exiting main loop');
49+
this.logDeprecation();
4950
break;
5051
}
5152

@@ -71,7 +72,8 @@ export class Worker {
7172
this.lifecycle.transitionToStopping();
7273

7374
try {
74-
this.logger.info('-> Stopped accepting new messages');
75+
// Signal deprecation (which includes "Stopped accepting new messages")
76+
this.logDeprecation();
7577
this.abortController.abort();
7678

7779
try {
@@ -84,15 +86,17 @@ export class Worker {
8486
throw error;
8587
}
8688

87-
this.logger.debug('-> Waiting for pending tasks to complete...');
89+
// Signal waiting for pending tasks
90+
this.logger.shutdown('waiting');
8891
await this.batchProcessor.awaitCompletion();
89-
this.logger.debug('-> Pending tasks completed!');
9092

9193
this.lifecycle.acknowledgeStop();
9294

9395
this.logger.debug('-> Closing SQL connection...');
9496
await this.sql.end();
95-
this.logger.debug('-> SQL connection closed!');
97+
98+
// Signal graceful stop complete
99+
this.logger.shutdown('stopped');
96100
} catch (error) {
97101
this.logger.debug(`Error during worker stop: ${error}`);
98102
throw error;
@@ -103,6 +107,17 @@ export class Worker {
103107
return this.lifecycle.edgeFunctionName;
104108
}
105109

110+
/**
111+
* Log deprecation message only once (prevents duplicate logs when deprecation
112+
* is detected in heartbeat and then stop() is called)
113+
*/
114+
private logDeprecation(): void {
115+
if (!this.deprecationLogged) {
116+
this.logger.shutdown('deprecating');
117+
this.deprecationLogged = true;
118+
}
119+
}
120+
106121
/**
107122
* Returns true if worker state is Running and worker was not stopped
108123
*/

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Queries } from '../core/Queries.js';
22
import type { ILifecycle, WorkerBootstrap, WorkerRow } from '../core/types.js';
3-
import type { Logger } from '../platform/types.js';
3+
import type { Logger, StartupContext } from '../platform/types.js';
44
import { States, WorkerState } from '../core/WorkerState.js';
55
import type { AnyFlow } from '@pgflow/dsl';
66
import { extractFlowShape } from '@pgflow/dsl';
@@ -11,6 +11,11 @@ export interface FlowLifecycleConfig {
1111
ensureCompiledOnStartup?: boolean;
1212
}
1313

14+
/**
15+
* Compilation status returned by ensureFlowCompiled
16+
*/
17+
type CompilationStatus = 'compiled' | 'verified' | 'recompiled' | 'mismatch';
18+
1419
/**
1520
* A specialized WorkerLifecycle for Flow-based workers that is aware of the Flow's step types
1621
*/
@@ -22,6 +27,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
2227
private flow: TFlow;
2328
// TODO: Temporary field for supplier pattern until we refactor initialization
2429
private _workerId?: string;
30+
private _edgeFunctionName?: string;
2531
private heartbeatInterval: number;
2632
private lastHeartbeat = 0;
2733
private ensureCompiledOnStartup: boolean;
@@ -38,19 +44,24 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
3844
async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
3945
this.workerState.transitionTo(States.Starting);
4046

41-
// Store workerId for supplier pattern
47+
// Store workerId and edgeFunctionName for supplier pattern
4248
this._workerId = workerBootstrap.workerId;
49+
this._edgeFunctionName = workerBootstrap.edgeFunctionName;
4350

4451
// Register this edge function for monitoring by ensure_workers() cron.
4552
await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName);
4653

4754
// Compile/verify flow as part of Starting (before registering worker)
55+
let compilationStatus: CompilationStatus = 'verified';
4856
if (this.ensureCompiledOnStartup) {
49-
await this.ensureFlowCompiled();
57+
compilationStatus = await this.ensureFlowCompiled();
5058
} else {
5159
this.logger.info(`Skipping compilation check for flow '${this.flow.slug}' (ensureCompiledOnStartup=false)`);
5260
}
5361

62+
// Log startup banner with compilation status
63+
this.logStartupBanner(compilationStatus);
64+
5465
// Only register worker after successful compilation
5566
this.workerRow = await this.queries.onWorkerStarted({
5667
queueName: this.queueName,
@@ -60,9 +71,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
6071
this.workerState.transitionTo(States.Running);
6172
}
6273

63-
private async ensureFlowCompiled(): Promise<void> {
64-
this.logger.info(`Compiling flow '${this.flow.slug}'...`);
65-
74+
private async ensureFlowCompiled(): Promise<CompilationStatus> {
6675
const shape = extractFlowShape(this.flow);
6776

6877
const result = await this.queries.ensureFlowCompiled(
@@ -74,7 +83,26 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
7483
throw new FlowShapeMismatchError(this.flow.slug, result.differences);
7584
}
7685

77-
this.logger.info(`Flow '${this.flow.slug}' compilation complete: ${result.status}`);
86+
return result.status;
87+
}
88+
89+
/**
90+
* Log the startup banner with worker and flow information
91+
*/
92+
private logStartupBanner(compilationStatus: CompilationStatus): void {
93+
const startupContext: StartupContext = {
94+
workerName: this._edgeFunctionName ?? 'unknown',
95+
workerId: this._workerId ?? 'unknown',
96+
queueName: this.queueName,
97+
flows: [
98+
{
99+
flowSlug: this.flow.slug,
100+
compilationStatus,
101+
},
102+
],
103+
};
104+
105+
this.logger.startupBanner(startupContext);
78106
}
79107

80108
acknowledgeStop() {
@@ -95,7 +123,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
95123
}
96124

97125
get edgeFunctionName() {
98-
return this.workerRow?.function_name;
126+
return this._edgeFunctionName ?? this.workerRow?.function_name;
99127
}
100128

101129
get queueName() {

pkgs/edge-worker/src/flow/StepTaskExecutor.ts

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { AnyFlow } from '@pgflow/dsl';
22
import type { IPgflowClient } from './types.js';
33
import type { IExecutor } from '../core/types.js';
4-
import type { Logger } from '../platform/types.js';
4+
import type { Logger, TaskLogContext } from '../platform/types.js';
55
import type { StepTaskHandlerContext } from '../core/context.js';
66

77
class AbortError extends Error {
@@ -11,6 +11,15 @@ class AbortError extends Error {
1111
}
1212
}
1313

14+
/**
15+
* Configuration for worker identity in logging
16+
*/
17+
export interface WorkerIdentity {
18+
workerId: string;
19+
workerName: string;
20+
queueName: string;
21+
}
22+
1423
/**
1524
* An executor that processes step tasks using an IPgflowClient
1625
* with strong typing for the flow's step handlers
@@ -23,7 +32,8 @@ export class StepTaskExecutor<TFlow extends AnyFlow, TContext extends StepTaskHa
2332
private readonly adapter: IPgflowClient<TFlow>,
2433
private readonly signal: AbortSignal,
2534
logger: Logger,
26-
private readonly context: TContext
35+
private readonly context: TContext,
36+
private readonly workerIdentity: WorkerIdentity
2737
) {
2838
this.logger = logger;
2939
}
@@ -41,7 +51,34 @@ export class StepTaskExecutor<TFlow extends AnyFlow, TContext extends StepTaskHa
4151
return this.stepTask.msg_id;
4252
}
4353

54+
/**
55+
* Build TaskLogContext for structured logging
56+
*/
57+
private buildLogContext(): TaskLogContext {
58+
// Note: read_ct includes the initial read (1-indexed), so read_ct=1 is the first attempt,
59+
// read_ct=2 is the first retry, etc.
60+
const retryAttempt = this.rawMessage.read_ct;
61+
const stepDef = this.flow.getStepDefinition(this.stepTask.step_slug);
62+
// maxAttempts includes the initial attempt, so maxRetries = maxAttempts - 1
63+
const maxRetries = stepDef?.options?.maxAttempts ? stepDef.options.maxAttempts - 1 : undefined;
64+
65+
return {
66+
flowSlug: this.stepTask.flow_slug,
67+
stepSlug: this.stepTask.step_slug,
68+
// Convert to string for logging (msg_id is number from PostgreSQL bigint)
69+
msgId: String(this.stepTask.msg_id),
70+
runId: this.stepTask.run_id,
71+
workerId: this.workerIdentity.workerId,
72+
workerName: this.workerIdentity.workerName,
73+
queueName: this.workerIdentity.queueName,
74+
retryAttempt,
75+
maxRetries,
76+
};
77+
}
78+
4479
async execute(): Promise<void> {
80+
const logContext = this.buildLogContext();
81+
4582
try {
4683
if (this.signal.aborted) {
4784
throw new AbortError();
@@ -51,9 +88,9 @@ export class StepTaskExecutor<TFlow extends AnyFlow, TContext extends StepTaskHa
5188
this.signal.throwIfAborted();
5289

5390
const stepSlug = this.stepTask.step_slug;
54-
this.logger.debug(
55-
`Executing step task ${this.msgId} for step ${stepSlug}`
56-
);
91+
92+
// Log task started at debug level
93+
this.logger.taskStarted(logContext);
5794

5895
// Get the step handler from the flow with proper typing
5996
const stepDef = this.flow.getStepDefinition(stepSlug);
@@ -71,14 +108,11 @@ export class StepTaskExecutor<TFlow extends AnyFlow, TContext extends StepTaskHa
71108

72109
const durationMs = Date.now() - startTime;
73110

74-
this.logger.verbose(
75-
`step task ${this.msgId} completed in ${durationMs}ms`
76-
);
111+
// Log task completed at verbose level
112+
this.logger.taskCompleted(logContext, durationMs);
77113
await this.adapter.completeTask(this.stepTask, result);
78-
79-
this.logger.debug(`step task ${this.msgId} marked as complete`);
80114
} catch (error) {
81-
await this.handleExecutionError(error);
115+
await this.handleExecutionError(error, logContext);
82116
}
83117
}
84118

@@ -90,15 +124,15 @@ export class StepTaskExecutor<TFlow extends AnyFlow, TContext extends StepTaskHa
90124
*
91125
* Otherwise, it marks the task as failed.
92126
*/
93-
private async handleExecutionError(error: unknown) {
127+
private async handleExecutionError(error: unknown, logContext: TaskLogContext) {
94128
if (error instanceof Error && error.name === 'AbortError') {
95129
this.logger.debug(`Aborted execution for step task ${this.msgId}`);
96130
// Do not mark as failed - the worker was aborted and stopping,
97131
// the task will be picked up by another worker later
98132
} else {
99-
this.logger.verbose(
100-
`step task ${this.msgId} failed with error: ${error}`
101-
);
133+
// Log task failed at verbose level using structured method
134+
const errorObj = error instanceof Error ? error : new Error(String(error));
135+
this.logger.taskFailed(logContext, errorObj);
102136
await this.adapter.failTask(this.stepTask, error);
103137
}
104138
}

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { AnyFlow, FlowContext } from '@pgflow/dsl';
22
import { ExecutionController } from '../core/ExecutionController.js';
33
import { StepTaskPoller, type StepTaskPollerConfig } from './StepTaskPoller.js';
4-
import { StepTaskExecutor } from './StepTaskExecutor.js';
4+
import { StepTaskExecutor, type WorkerIdentity } from './StepTaskExecutor.js';
55
import { PgflowSqlClient } from '@pgflow/core';
66
import { Queries } from '../core/Queries.js';
77
import type { IExecutor } from '../core/types.js';
@@ -116,6 +116,8 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
116116
);
117117

118118
// Create executor factory with proper typing
119+
// Note: This factory is only called during task execution (after acknowledgeStart completes),
120+
// so lifecycle.workerId and lifecycle.edgeFunctionName are guaranteed to be set.
119121
const executorFactory = (
120122
taskWithMessage: StepTaskWithMessage<TFlow>,
121123
signal: AbortSignal
@@ -135,14 +137,23 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
135137
...platformAdapter.platformResources
136138
};
137139

140+
// Build worker identity for structured logging
141+
// Safe to access here because factory is only called after acknowledgeStart()
142+
const workerIdentity: WorkerIdentity = {
143+
workerId: lifecycle.workerId,
144+
workerName: lifecycle.edgeFunctionName ?? 'unknown',
145+
queueName: queueName,
146+
};
147+
138148
// Type assertion: FlowContext & TResources is compatible with StepTaskHandlerContext<TFlow>
139149
// at runtime, but TypeScript needs help due to generic type variance
140150
return new StepTaskExecutor<TFlow>(
141151
flow,
142152
pgflowAdapter,
143153
signal,
144154
createLogger('StepTaskExecutor'),
145-
context as StepTaskHandlerContext<TFlow>
155+
context as StepTaskHandlerContext<TFlow>,
156+
workerIdentity
146157
);
147158
};
148159

pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
223223
const workerId = this.validatedEnv.SB_EXECUTION_ID;
224224

225225
this.loggingFactory.setWorkerId(workerId);
226+
this.loggingFactory.setWorkerName(this.edgeFunctionName);
226227

227228
// Create the worker using the factory function and the logger
228229
this.worker = createWorkerFn(this.loggingFactory.createLogger);

0 commit comments

Comments
 (0)