Skip to content

Commit 00921ca

Browse files
authored
refactor: update worker credentials to use Supabase project ID and service role key (#522)
# Update worker management to use Supabase project credentials This PR updates the `ensure_workers` function to use standard Supabase credentials instead of custom PgFlow credentials: - Changes credential sources from `pgflow_service_role_key` and `pgflow_function_base_url` to `supabase_service_role_key` and `supabase_project_id` - Constructs function URLs dynamically using the Supabase project ID: `https://{project_id}.supabase.co/functions/v1` - Adds proper handling of empty credential strings by treating them as NULL using `nullif()` - Adds a new test case to verify behavior when credentials exist but are empty strings - Removes the helper function that configured vault secrets in e2e tests - Updates migration file with improved comments about extensions These changes make PgFlow more compatible with standard Supabase deployments by using the same credential sources that other Supabase services use.
1 parent d3d75d6 commit 00921ca

10 files changed

+111
-50
lines changed

pkgs/core/schemas/0059_function_ensure_workers.sql

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@ as $$
1212
select pgflow.is_local() as is_local
1313
),
1414

15-
-- Get credentials: Vault secrets with local fallback for base_url only
15+
-- Get credentials: Local mode uses hardcoded URL, production uses vault secrets
16+
-- Empty strings are treated as NULL using nullif()
1617
credentials as (
1718
select
18-
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_service_role_key') as service_role_key,
19-
coalesce(
20-
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_function_base_url'),
21-
case when (select is_local from env) then 'http://kong:8000/functions/v1' end
22-
) as base_url
19+
case
20+
when (select is_local from env) then null
21+
else nullif((select decrypted_secret from vault.decrypted_secrets where name = 'supabase_service_role_key'), '')
22+
end as service_role_key,
23+
case
24+
when (select is_local from env) then 'http://kong:8000/functions/v1'
25+
else (select 'https://' || nullif(decrypted_secret, '') || '.supabase.co/functions/v1' from vault.decrypted_secrets where name = 'supabase_project_id')
26+
end as base_url
2327
),
2428

2529
-- Find functions that pass the debounce check
@@ -93,5 +97,6 @@ comment on function pgflow.ensure_workers() is
9397
In local mode: always pings all enabled functions (for fast restart after code changes).
9498
In production mode: only pings functions that have no alive workers.
9599
Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window.
96-
Credentials: Uses Vault secrets (pgflow_service_role_key, pgflow_function_base_url) or local fallbacks.
100+
Credentials: Uses Vault secrets (supabase_service_role_key, supabase_project_id) or local fallbacks.
101+
URL is built from project_id: https://{project_id}.supabase.co/functions/v1
97102
Returns request_id from pg_net for each HTTP request made.';

pkgs/core/supabase/migrations/20251208045933_pgflow_worker_management.sql renamed to pkgs/core/supabase/migrations/20251208093149_pgflow_worker_management.sql

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
-- Create extension "pg_net" (if not exists)
1+
-- Create extension "pg_net" if not exists (already present in Supabase)
22
CREATE EXTENSION IF NOT EXISTS "pg_net" WITH SCHEMA "public";
3-
-- Create extension "pg_cron" (if not exists)
3+
-- Create extension "pg_cron" if not exists (already present in Supabase)
44
CREATE EXTENSION IF NOT EXISTS "pg_cron";
55
-- Modify "workers" table
66
ALTER TABLE "pgflow"."workers" ADD COLUMN "stopped_at" timestamptz NULL;
@@ -106,14 +106,18 @@ with
106106
select pgflow.is_local() as is_local
107107
),
108108

109-
-- Get credentials: Vault secrets with local fallback for base_url only
109+
-- Get credentials: Local mode uses hardcoded URL, production uses vault secrets
110+
-- Empty strings are treated as NULL using nullif()
110111
credentials as (
111112
select
112-
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_service_role_key') as service_role_key,
113-
coalesce(
114-
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_function_base_url'),
115-
case when (select is_local from env) then 'http://kong:8000/functions/v1' end
116-
) as base_url
113+
case
114+
when (select is_local from env) then null
115+
else nullif((select decrypted_secret from vault.decrypted_secrets where name = 'supabase_service_role_key'), '')
116+
end as service_role_key,
117+
case
118+
when (select is_local from env) then 'http://kong:8000/functions/v1'
119+
else (select 'https://' || nullif(decrypted_secret, '') || '.supabase.co/functions/v1' from vault.decrypted_secrets where name = 'supabase_project_id')
120+
end as base_url
117121
),
118122

119123
-- Find functions that pass the debounce check
@@ -186,7 +190,8 @@ COMMENT ON FUNCTION "pgflow"."ensure_workers" IS 'Ensures worker functions are r
186190
In local mode: always pings all enabled functions (for fast restart after code changes).
187191
In production mode: only pings functions that have no alive workers.
188192
Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window.
189-
Credentials: Uses Vault secrets (pgflow_service_role_key, pgflow_function_base_url) or local fallbacks.
193+
Credentials: Uses Vault secrets (supabase_service_role_key, supabase_project_id) or local fallbacks.
194+
URL is built from project_id: https://{project_id}.supabase.co/functions/v1
190195
Returns request_id from pg_net for each HTTP request made.';
191196
-- Create "mark_worker_stopped" function
192197
CREATE FUNCTION "pgflow"."mark_worker_stopped" ("worker_id" uuid) RETURNS void LANGUAGE sql AS $$

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:VgOpcmBfLwSXeC+q/2hT/Quc8P7YRzKa3x4cZ1z+ZRM=
1+
h1:rp08kLQLd1h/gfTMMGK0y+skRiqSfksIcbQYTe/7gXI=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -12,4 +12,4 @@ h1:VgOpcmBfLwSXeC+q/2hT/Quc8P7YRzKa3x4cZ1z+ZRM=
1212
20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4=
1313
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
1414
20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
15-
20251208045933_pgflow_worker_management.sql h1:B9LcYfhZQ5hyoTmXLl6pd/kPik9EJOuPdGI/9THmrks=
15+
20251208093149_pgflow_worker_management.sql h1:/YCw+E63GSUiTkCx4jk4XgRApqKt+9by/NYoPsdJoIQ=

pkgs/core/supabase/tests/ensure_workers/credentials_from_vault.test.sql

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ select pgflow_tests.reset_db();
66
-- Setup: Create Vault secrets
77
select vault.create_secret(
88
'test-service-role-key-from-vault',
9-
'pgflow_service_role_key'
9+
'supabase_service_role_key'
1010
);
1111
select vault.create_secret(
12-
'http://vault-configured-url.example.com/functions/v1',
13-
'pgflow_function_base_url'
12+
'vaultproject123',
13+
'supabase_project_id'
1414
);
1515

1616
-- Setup: Register a worker function
@@ -51,10 +51,10 @@ set last_invoked_at = now() - interval '10 seconds';
5151
select * into temporary test3_result from pgflow.ensure_workers();
5252

5353
select ok(
54-
(select url = 'http://vault-configured-url.example.com/functions/v1/my-function'
54+
(select url = 'https://vaultproject123.supabase.co/functions/v1/my-function'
5555
from net.http_request_queue
5656
where id = (select request_id from test3_result limit 1)),
57-
'HTTP request URL is constructed from Vault pgflow_function_base_url'
57+
'HTTP request URL is constructed from Vault supabase_project_id'
5858
);
5959

6060
drop table test3_result;

pkgs/core/supabase/tests/ensure_workers/credentials_local_fallback.test.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ select plan(4);
44
select pgflow_tests.reset_db();
55

66
-- Ensure no Vault secrets exist
7-
delete from vault.secrets where name in ('pgflow_service_role_key', 'pgflow_function_base_url');
7+
delete from vault.secrets where name in ('supabase_service_role_key', 'supabase_project_id');
88

99
-- Setup: Register a worker function
1010
select pgflow.track_worker_function('my-function');
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
-- Test: ensure_workers() skips invocation when credentials exist but are empty strings
2+
begin;
3+
select plan(4);
4+
select pgflow_tests.reset_db();
5+
6+
-- Setup: Create Vault secrets with EMPTY strings
7+
select vault.create_secret('', 'supabase_service_role_key');
8+
select vault.create_secret('', 'supabase_project_id');
9+
10+
-- Setup: Register a worker function
11+
select pgflow.track_worker_function('my-function');
12+
update pgflow.worker_functions
13+
set last_invoked_at = now() - interval '10 seconds';
14+
15+
-- Simulate production mode (non-local jwt_secret)
16+
set local app.settings.jwt_secret = 'production-secret-different-from-local';
17+
18+
-- TEST: In production mode with empty credentials, returns empty (safe failure)
19+
with result as (
20+
select * from pgflow.ensure_workers()
21+
)
22+
select is(
23+
(select count(*) from result),
24+
0::bigint,
25+
'Production mode with empty credentials returns empty (safe failure)'
26+
);
27+
28+
-- TEST: No HTTP requests were queued (last_invoked_at unchanged)
29+
select ok(
30+
(select last_invoked_at < now() - interval '5 seconds'
31+
from pgflow.worker_functions
32+
where function_name = 'my-function'),
33+
'last_invoked_at not updated when credentials are empty'
34+
);
35+
36+
-- TEST: Empty project_id alone should skip (even with valid service role key)
37+
delete from vault.secrets where name = 'supabase_service_role_key';
38+
select vault.create_secret('valid-service-role-key', 'supabase_service_role_key');
39+
40+
-- Reset debounce
41+
update pgflow.worker_functions
42+
set last_invoked_at = now() - interval '10 seconds';
43+
44+
with result as (
45+
select * from pgflow.ensure_workers()
46+
)
47+
select is(
48+
(select count(*) from result),
49+
0::bigint,
50+
'Empty project_id alone causes skip even with valid service role key'
51+
);
52+
53+
-- TEST: Empty service_role_key alone should skip (even with valid project_id)
54+
delete from vault.secrets where name in ('supabase_service_role_key', 'supabase_project_id');
55+
select vault.create_secret('', 'supabase_service_role_key');
56+
select vault.create_secret('validproject123', 'supabase_project_id');
57+
58+
-- Reset debounce
59+
update pgflow.worker_functions
60+
set last_invoked_at = now() - interval '10 seconds';
61+
62+
with result as (
63+
select * from pgflow.ensure_workers()
64+
)
65+
select is(
66+
(select count(*) from result),
67+
0::bigint,
68+
'Empty service_role_key alone causes skip even with valid project_id'
69+
);
70+
71+
select finish();
72+
rollback;

pkgs/core/supabase/tests/ensure_workers/no_credentials_skips_invocation.test.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ select plan(2);
44
select pgflow_tests.reset_db();
55

66
-- Ensure no Vault secrets exist
7-
delete from vault.secrets where name in ('pgflow_service_role_key', 'pgflow_function_base_url');
7+
delete from vault.secrets where name in ('supabase_service_role_key', 'supabase_project_id');
88

99
-- Setup: Register a worker function
1010
select pgflow.track_worker_function('my-function');

pkgs/core/supabase/tests/ensure_workers/returns_functions_to_invoke.test.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ select plan(4);
44
select pgflow_tests.reset_db();
55

66
-- Setup: Create Vault secrets for production mode tests
7-
select vault.create_secret('test-service-role-key', 'pgflow_service_role_key');
8-
select vault.create_secret('http://test.example.com/functions/v1', 'pgflow_function_base_url');
7+
select vault.create_secret('test-service-role-key', 'supabase_service_role_key');
8+
select vault.create_secret('testproject123', 'supabase_project_id');
99

1010
-- Setup: Register two worker functions
1111
select pgflow.track_worker_function('function-a');

pkgs/core/supabase/tests/ensure_workers/worker_death_detection.test.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ select plan(5);
44
select pgflow_tests.reset_db();
55

66
-- Setup: Create Vault secrets for production mode tests
7-
select vault.create_secret('test-service-role-key', 'pgflow_service_role_key');
8-
select vault.create_secret('http://test.example.com/functions/v1', 'pgflow_function_base_url');
7+
select vault.create_secret('test-service-role-key', 'supabase_service_role_key');
8+
select vault.create_secret('testproject123', 'supabase_project_id');
99

1010
-- Setup: Register a worker function
1111
select pgflow.track_worker_function('my-function');

pkgs/edge-worker/tests/e2e/_helpers.ts

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { delay } from '@std/async';
33
import ProgressBar from 'jsr:@deno-library/progress';
44
import { dim } from 'https://deno.land/std@0.224.0/fmt/colors.ts';
55
import { e2eConfig } from '../config.ts';
6-
import type { Sql } from 'postgres';
76

87
const DEBUG = Deno.env.get('DEBUG') === '1' || Deno.env.get('VERBOSE') === '1';
98

@@ -267,26 +266,6 @@ export async function startWorker(workerName: string) {
267266
);
268267
}
269268

270-
/**
271-
* Configures the vault secrets needed for ensure_workers() to make HTTP requests.
272-
* Must be called before setupEnsureWorkersCron() in tests.
273-
*/
274-
export async function configureVaultForEnsureWorkers(
275-
sql: Sql<Record<string, unknown>>
276-
) {
277-
const baseUrl = `${e2eConfig.apiUrl}/functions/v1`;
278-
log(`Configuring vault with base URL: ${baseUrl}`);
279-
280-
// Insert or update the base URL in vault
281-
await sql`
282-
INSERT INTO vault.secrets (name, secret)
283-
VALUES ('pgflow_function_base_url', ${baseUrl})
284-
ON CONFLICT (name) DO UPDATE SET secret = EXCLUDED.secret
285-
`;
286-
287-
log('Vault configured successfully');
288-
}
289-
290269
/**
291270
* Monitor workers table and cron activity in background for debugging.
292271
* Returns an abort function to stop monitoring.

0 commit comments

Comments
 (0)