Skip to content

Commit 88f3a5b

Browse files
authored
feat: add worker tracking and graceful shutdown for edge functions (#513)
# Implement Worker Tracking for Improved Reliability This PR adds functionality to track edge function workers more effectively, improving the reliability of worker management: 1. Added two new methods to the `Queries` class: - `trackWorkerFunction()` - Registers an edge function for monitoring by the `ensure_workers()` cron job - `markWorkerStopped()` - Marks a worker as stopped during graceful shutdown 2. Updated worker lifecycle implementations to call `trackWorkerFunction()` early in the startup process, which: - Sets `last_invoked_at` to prevent cron from pinging during startup (debounce) - Ensures proper monitoring of the worker by the cron job 3. Enhanced the shutdown process in `SupabasePlatformAdapter`: - Replaced the previous approach of spawning a new edge function - Now signals worker death to the cron job by setting `stopped_at` - Allows immediate detection of worker termination 4. Added comprehensive unit tests for the new functionality These changes improve worker reliability by providing better signaling between workers and the monitoring system, ensuring faster recovery from failures and more efficient worker management.
1 parent 460223e commit 88f3a5b

19 files changed

+809
-198
lines changed

pkgs/edge-worker/deno.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkgs/edge-worker/src/core/Queries.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,25 @@ export class Queries {
7373
`;
7474
return result.result;
7575
}
76+
77+
/**
78+
* Registers an edge function for monitoring by ensure_workers() cron.
79+
* Called by workers on startup. Sets last_invoked_at to prevent cron from
80+
* pinging during startup (debounce).
81+
*/
82+
async trackWorkerFunction(functionName: string): Promise<void> {
83+
await this.sql`
84+
SELECT pgflow.track_worker_function(${functionName})
85+
`;
86+
}
87+
88+
/**
89+
* Marks a worker as stopped for graceful shutdown signaling.
90+
* Called by workers on beforeunload to allow ensure_workers() to detect death immediately.
91+
*/
92+
async markWorkerStopped(workerId: string): Promise<void> {
93+
await this.sql`
94+
SELECT pgflow.mark_worker_stopped(${workerId}::uuid)
95+
`;
96+
}
7697
}

pkgs/edge-worker/src/core/WorkerLifecycle.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ export class WorkerLifecycle<IMessage extends Json> implements ILifecycle {
2929
async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
3030
this.workerState.transitionTo(States.Starting);
3131

32+
// Register this edge function for monitoring by ensure_workers() cron.
33+
// Must be called early to set last_invoked_at (debounce) before heartbeat timeout.
34+
await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName);
35+
3236
this.logger.debug(`Ensuring queue '${this.queue.queueName}' exists...`);
3337
await this.queue.safeCreate();
3438

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
4141
// Store workerId for supplier pattern
4242
this._workerId = workerBootstrap.workerId;
4343

44+
// Register this edge function for monitoring by ensure_workers() cron.
45+
// Must be called early to set last_invoked_at (debounce) before heartbeat timeout.
46+
await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName);
47+
4448
// Compile/verify flow as part of Starting (before registering worker)
4549
if (this.ensureCompiledOnStartup) {
4650
await this.ensureFlowCompiled();

pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
resolveConnectionString,
1111
resolveSqlConnection,
1212
} from './resolveConnection.js';
13+
import { Queries } from '../core/Queries.js';
1314

1415
/**
1516
* Supabase Edge Runtime type (without global augmentation to comply with JSR)
@@ -47,6 +48,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
4748
private _platformResources: SupabaseResources;
4849
private validatedEnv: SupabaseEnv;
4950
private _connectionString: string | undefined;
51+
private queries: Queries;
5052

5153
// Logging factory with dynamic workerId support
5254
private loggingFactory = createLoggingFactory();
@@ -79,6 +81,9 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
7981
sql: resolveSqlConnection(env, options),
8082
supabase: createServiceSupabaseClient(this.validatedEnv)
8183
};
84+
85+
// Create Queries instance for shutdown handler
86+
this.queries = new Queries(this._platformResources.sql);
8287
}
8388

8489
/**
@@ -158,35 +163,6 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
158163
return this._platformResources;
159164
}
160165

161-
private async spawnNewEdgeFunction(): Promise<void> {
162-
if (!this.edgeFunctionName) {
163-
throw new Error('functionName cannot be null or empty');
164-
}
165-
166-
const supabaseUrl = this.validatedEnv.SUPABASE_URL;
167-
168-
this.logger.debug('Spawning a new Edge Function...');
169-
170-
const response = await fetch(
171-
`${supabaseUrl}/functions/v1/${this.edgeFunctionName}`,
172-
{
173-
method: 'POST',
174-
headers: {
175-
Authorization: `Bearer ${this.validatedEnv.SUPABASE_ANON_KEY}`,
176-
'Content-Type': 'application/json',
177-
},
178-
}
179-
);
180-
181-
this.logger.debug('Edge Function spawned successfully!');
182-
183-
if (!response.ok) {
184-
throw new Error(
185-
`Edge function returned non-OK status: ${response.status} ${response.statusText}`
186-
);
187-
}
188-
}
189-
190166
private extractFunctionName(req: Request): string {
191167
return new URL(req.url).pathname.replace(/^\/+|\/+$/g, '');
192168
}
@@ -196,7 +172,10 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
196172
this.logger.debug('Shutting down...');
197173

198174
if (this.worker) {
199-
await this.spawnNewEdgeFunction();
175+
// Signal death to ensure_workers() cron by setting stopped_at.
176+
// This allows the cron to immediately ping for a replacement worker.
177+
const workerId = this.validatedEnv.SB_EXECUTION_ID;
178+
await this.queries.markWorkerStopped(workerId);
200179
}
201180

202181
await this.stopWorker();

pkgs/edge-worker/supabase/functions/deno.lock

Lines changed: 128 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { EdgeWorker } from '@pgflow/edge-worker';
2+
import { crypto } from 'jsr:@std/crypto';
3+
import { sql } from '../utils.ts';
4+
5+
async function cpuIntensiveTask(payload: { debug?: boolean }) {
6+
let data = new TextEncoder().encode('burn');
7+
const timeId = `stopped_at_test_${Math.random()}`;
8+
9+
if (payload.debug) {
10+
console.time(timeId);
11+
}
12+
13+
for (let i = 0; i < 10000; i++) {
14+
data = new Uint8Array(await crypto.subtle.digest('SHA-256', data));
15+
}
16+
17+
if (payload.debug) {
18+
console.timeEnd(timeId);
19+
console.log(
20+
'[stopped_at_test] last_val = ',
21+
await sql`SELECT nextval('stopped_at_test_seq')`
22+
);
23+
} else {
24+
await sql`SELECT nextval('stopped_at_test_seq')`;
25+
}
26+
}
27+
28+
EdgeWorker.start(cpuIntensiveTask, { queueName: 'stopped_at_test' });

0 commit comments

Comments
 (0)