Skip to content

Commit 7889c2a

Browse files
authored
refactor: improve worker function invocation logic in local and production modes (#527)
# Improve worker function management in local and production modes This PR refines the `ensure_workers()` function to better handle different behaviors between local and production environments: - **Local mode**: Now pings ALL enabled functions, completely bypassing both the debounce check and alive workers check - **Production mode**: Only pings functions that pass debounce AND have no alive workers The key changes include: 1. Restructured the `functions_to_invoke` CTE to more clearly separate local vs production logic 2. Updated function comments to better document the behavior differences 3. Fixed tests to align with the new behavior, particularly that debounce is now bypassed in local mode 4. Renamed migration file with updated timestamp This change improves developer experience in local environments by ensuring all enabled functions are always pinged, while maintaining efficient worker management in production.
1 parent 35e1652 commit 7889c2a

File tree

5 files changed

+53
-25
lines changed

5 files changed

+53
-25
lines changed

pkgs/core/schemas/0059_function_ensure_workers.sql

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,20 @@ as $$
4848
),
4949

5050
-- Determine which functions should be invoked
51+
-- Local mode: all enabled functions (bypass debounce AND alive workers check)
52+
-- Production mode: only functions that pass debounce AND have no alive workers
5153
functions_to_invoke as (
52-
select dp.function_name
53-
from debounce_passed as dp
54-
where
55-
pgflow.is_local() = true
56-
or dp.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
54+
select wf.function_name
55+
from pgflow.worker_functions as wf
56+
where wf.enabled = true
57+
and (
58+
pgflow.is_local() = true -- Local: all enabled functions
59+
or (
60+
-- Production: debounce + no alive workers
61+
wf.function_name in (select dp.function_name from debounce_passed as dp)
62+
and wf.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
63+
)
64+
)
5765
),
5866

5967
-- Make HTTP requests and capture request_ids
@@ -94,9 +102,9 @@ $$;
94102

95103
comment on function pgflow.ensure_workers() is
96104
'Ensures worker functions are running by pinging them via HTTP when needed.
97-
In local mode: always pings all enabled functions (for fast restart after code changes).
98-
In production mode: only pings functions that have no alive workers.
99-
Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window.
105+
In local mode: pings ALL enabled functions (ignores debounce AND alive workers check).
106+
In production mode: only pings functions that pass debounce AND have no alive workers.
107+
Debounce: skips functions pinged within their heartbeat_timeout_seconds window (production only).
100108
Credentials: Uses Vault secrets (supabase_service_role_key, supabase_project_id) or local fallbacks.
101109
URL is built from project_id: https://{project_id}.supabase.co/functions/v1
102110
Returns request_id from pg_net for each HTTP request made.';

pkgs/core/supabase/migrations/20251208093149_pgflow_worker_management.sql renamed to pkgs/core/supabase/migrations/20251209074533_pgflow_worker_management.sql

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
-- Create extension "pg_net" if not exists (already present in Supabase)
1+
-- Create extension "pg_net" if not exists
22
CREATE EXTENSION IF NOT EXISTS "pg_net" WITH SCHEMA "public";
3-
-- Create extension "pg_cron" if not exists (already present in Supabase)
3+
-- Create extension "pg_cron" if not exists
44
CREATE EXTENSION IF NOT EXISTS "pg_cron";
55
-- Modify "workers" table
66
ALTER TABLE "pgflow"."workers" ADD COLUMN "stopped_at" timestamptz NULL;
@@ -142,12 +142,20 @@ with
142142
),
143143

144144
-- Determine which functions should be invoked
145+
-- Local mode: all enabled functions (bypass debounce AND alive workers check)
146+
-- Production mode: only functions that pass debounce AND have no alive workers
145147
functions_to_invoke as (
146-
select dp.function_name
147-
from debounce_passed as dp
148-
where
149-
pgflow.is_local() = true
150-
or dp.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
148+
select wf.function_name
149+
from pgflow.worker_functions as wf
150+
where wf.enabled = true
151+
and (
152+
pgflow.is_local() = true -- Local: all enabled functions
153+
or (
154+
-- Production: debounce + no alive workers
155+
wf.function_name in (select dp.function_name from debounce_passed as dp)
156+
and wf.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
157+
)
158+
)
151159
),
152160

153161
-- Make HTTP requests and capture request_ids
@@ -187,9 +195,9 @@ with
187195
$$;
188196
-- Set comment to function: "ensure_workers"
189197
COMMENT ON FUNCTION "pgflow"."ensure_workers" IS 'Ensures worker functions are running by pinging them via HTTP when needed.
190-
In local mode: always pings all enabled functions (for fast restart after code changes).
191-
In production mode: only pings functions that have no alive workers.
192-
Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window.
198+
In local mode: pings ALL enabled functions (ignores debounce AND alive workers check).
199+
In production mode: only pings functions that pass debounce AND have no alive workers.
200+
Debounce: skips functions pinged within their heartbeat_timeout_seconds window (production only).
193201
Credentials: Uses Vault secrets (supabase_service_role_key, supabase_project_id) or local fallbacks.
194202
URL is built from project_id: https://{project_id}.supabase.co/functions/v1
195203
Returns request_id from pg_net for each HTTP request made.';

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:rp08kLQLd1h/gfTMMGK0y+skRiqSfksIcbQYTe/7gXI=
1+
h1:+LFk0wyD7sBjvwZW93t7THECms67qWV2Vm0E5GHTCxQ=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -12,4 +12,4 @@ h1:rp08kLQLd1h/gfTMMGK0y+skRiqSfksIcbQYTe/7gXI=
1212
20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4=
1313
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
1414
20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
15-
20251208093149_pgflow_worker_management.sql h1:/YCw+E63GSUiTkCx4jk4XgRApqKt+9by/NYoPsdJoIQ=
15+
20251209074533_pgflow_worker_management.sql h1:Yi+WZEgL+AcK0eAfKJD8v/uPBVBM8WcL+ZOL4ilkYz8=

pkgs/core/supabase/tests/ensure_workers/debounce.test.sql

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,19 @@
1-
-- Test: ensure_workers() respects debounce window
1+
-- Test: ensure_workers() respects debounce window in production mode
2+
-- Note: Debounce only applies in production mode; local mode bypasses debounce
23
begin;
34
select plan(4);
45
select pgflow_tests.reset_db();
56

7+
-- Setup: Create Vault secrets for production mode
8+
select vault.create_secret(
9+
'test-service-role-key',
10+
'supabase_service_role_key'
11+
);
12+
select vault.create_secret(
13+
'testproject123',
14+
'supabase_project_id'
15+
);
16+
617
-- Setup: Register a worker function with 6 second heartbeat timeout
718
select pgflow.track_worker_function('my-function');
819

@@ -12,7 +23,8 @@ update pgflow.worker_functions
1223
set last_invoked_at = now()
1324
where function_name = 'my-function';
1425

15-
set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long';
26+
-- Simulate production mode (non-local jwt_secret)
27+
set local app.settings.jwt_secret = 'production-secret-different-from-local';
1628
select is(
1729
(select count(*) from pgflow.ensure_workers()),
1830
0::bigint,

pkgs/core/supabase/tests/ensure_workers/local_mode_always_pings.test.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ select is(
3737
'Local mode returns function even when worker has fresh heartbeat'
3838
);
3939

40-
-- TEST: Debounce still applies in local mode
40+
-- TEST: Debounce is bypassed in local mode
4141
-- Reset last_invoked_at to now (will be within debounce window)
4242
update pgflow.worker_functions
4343
set last_invoked_at = now();
4444

4545
select is(
4646
(select count(*) from pgflow.ensure_workers()),
47-
0::bigint,
48-
'Debounce still applies in local mode'
47+
1::bigint,
48+
'Debounce is bypassed in local mode'
4949
);
5050

5151
select finish();

0 commit comments

Comments
 (0)