Skip to content

Commit e4cd7bb

Browse files
authored
refactor: add handler spy utilities for queue worker tests (#514)
# Add Handler Spy and Delay Utilities for Queue Worker Tests This PR introduces a set of reusable testing utilities for queue worker tests: - Added `createHandlerSpy()` to capture handler invocations with timing data - Added utilities to calculate delays between invocations using both JS timestamps and DB visibility times - Added `assertDelaysMatch()` to verify that actual delays match expected patterns within tolerance - Refactored existing tests to use these new utilities: - `exponential_backoff.test.ts` - `fixed_retry.test.ts` - `legacy_retry_config.test.ts` These changes improve test maintainability by eliminating duplicate code across test files and providing a consistent approach to timing validation.
1 parent 88f3a5b commit e4cd7bb

File tree

4 files changed

+184
-247
lines changed

4 files changed

+184
-247
lines changed

pkgs/edge-worker/tests/integration/_helpers.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { assertEquals, assertAlmostEquals } from '@std/assert';
12
import type { AnyFlow, ExtractFlowInput } from '@pgflow/dsl';
23
import {
34
createFlowWorker,
@@ -79,3 +80,96 @@ export function startWorker<TFlow extends AnyFlow>(
7980

8081
return worker;
8182
}
83+
84+
// ============================================================================
85+
// Handler Spy & Delay Utilities
86+
// ============================================================================
87+
88+
/**
89+
* Represents a single invocation captured by the handler spy.
90+
*/
91+
export interface SpyInvocation<TInput, TContext> {
92+
jsTime: number;
93+
input: TInput;
94+
context: TContext;
95+
}
96+
97+
/**
98+
* Creates a handler spy that captures invocations with timing data.
99+
* @param onInvoke - Optional callback to execute on each invocation (throw to simulate failure)
100+
*/
101+
export function createHandlerSpy<TInput = unknown, TContext = unknown>(
102+
onInvoke?: (input: TInput, context: TContext) => void
103+
) {
104+
const invocations: SpyInvocation<TInput, TContext>[] = [];
105+
106+
return {
107+
handler: (input: TInput, context: TContext) => {
108+
invocations.push({ jsTime: Date.now(), input, context });
109+
return onInvoke?.(input, context);
110+
},
111+
invocations,
112+
count: () => invocations.length,
113+
};
114+
}
115+
116+
/**
117+
* Calculates delays between invocations using JS timestamps.
118+
* @returns Delays in seconds between consecutive invocations.
119+
*/
120+
export function calculateJsDelays<T, C>(
121+
invocations: SpyInvocation<T, C>[]
122+
): number[] {
123+
const delays: number[] = [];
124+
for (let i = 1; i < invocations.length; i++) {
125+
delays.push((invocations[i].jsTime - invocations[i - 1].jsTime) / 1000);
126+
}
127+
return delays;
128+
}
129+
130+
/**
131+
* Context type that includes rawMessage with visibility time.
132+
*/
133+
type WithRawMessage = { rawMessage: { vt: string } };
134+
135+
/**
136+
* Calculates delays between invocations using DB visibility times.
137+
* @returns Delays in seconds between consecutive visibility times.
138+
*/
139+
export function calculateVtDelays<T, C extends WithRawMessage>(
140+
invocations: SpyInvocation<T, C>[]
141+
): number[] {
142+
const delays: number[] = [];
143+
for (let i = 1; i < invocations.length; i++) {
144+
const vt1 = new Date(invocations[i - 1].context.rawMessage.vt).getTime();
145+
const vt2 = new Date(invocations[i].context.rawMessage.vt).getTime();
146+
delays.push((vt2 - vt1) / 1000);
147+
}
148+
return delays;
149+
}
150+
151+
/**
152+
* Asserts that actual delays match expected delays within tolerance.
153+
* @param actual - Actual measured delays in seconds
154+
* @param expected - Expected delays in seconds
155+
* @param toleranceSec - Tolerance in seconds for each comparison
156+
*/
157+
export function assertDelaysMatch(
158+
actual: number[],
159+
expected: number[],
160+
toleranceSec: number
161+
): void {
162+
assertEquals(
163+
actual.length,
164+
expected.length,
165+
`Expected ${expected.length} delays, got ${actual.length}`
166+
);
167+
for (let i = 0; i < expected.length; i++) {
168+
assertAlmostEquals(
169+
actual[i],
170+
expected[i],
171+
toleranceSec,
172+
`Delay #${i + 1} should be ~${expected[i]}s, got ${actual[i]}s`
173+
);
174+
}
175+
}

pkgs/edge-worker/tests/integration/exponential_backoff.test.ts

Lines changed: 21 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1-
import { assertEquals, assertAlmostEquals } from '@std/assert';
1+
import { assertEquals } from '@std/assert';
22
import { createQueueWorker } from '../../src/queue/createQueueWorker.ts';
3-
import { createTestPlatformAdapter } from './_helpers.ts';
3+
import {
4+
createTestPlatformAdapter,
5+
createHandlerSpy,
6+
calculateJsDelays,
7+
calculateVtDelays,
8+
assertDelaysMatch,
9+
} from './_helpers.ts';
410
import { withTransaction } from '../db.ts';
511
import { createFakeLogger } from '../fakes.ts';
612
import { log, waitFor } from '../e2e/_helpers.ts';
@@ -18,44 +24,6 @@ const workerConfig = {
1824
queueName: 'exponential_backoff_test',
1925
} as const;
2026

21-
/**
22-
* Creates a handler that always fails and collects timing data from two sources:
23-
* 1. JS timestamps (Date.now()) - when handler is invoked
24-
* 2. DB visibility times (rawMessage.vt) - when message became visible
25-
*/
26-
function createFailingHandler() {
27-
const invocations: Array<{ jsTime: number; vt: string }> = [];
28-
return {
29-
handler: (_payload: Json, context: MessageHandlerContext) => {
30-
invocations.push({
31-
jsTime: Date.now(),
32-
vt: context.rawMessage.vt,
33-
});
34-
log(`Invocation #${invocations.length} at ${new Date().toISOString()}`);
35-
throw new Error('Intentional failure for exponential backoff test');
36-
},
37-
getInvocationCount: () => invocations.length,
38-
getJsDelays: () => {
39-
// Delays in seconds from JS timestamps
40-
const delays: number[] = [];
41-
for (let i = 1; i < invocations.length; i++) {
42-
delays.push((invocations[i].jsTime - invocations[i - 1].jsTime) / 1000);
43-
}
44-
return delays;
45-
},
46-
getVtDelays: () => {
47-
// Delays in seconds from DB visibility times
48-
const delays: number[] = [];
49-
for (let i = 1; i < invocations.length; i++) {
50-
const vt1 = new Date(invocations[i - 1].vt).getTime();
51-
const vt2 = new Date(invocations[i].vt).getTime();
52-
delays.push((vt2 - vt1) / 1000);
53-
}
54-
return delays;
55-
},
56-
};
57-
}
58-
5927
/**
6028
* Test verifies that exponential backoff is applied correctly:
6129
* - 1st retry: baseDelay * 2^0 = 2 seconds
@@ -65,10 +33,14 @@ function createFailingHandler() {
6533
Deno.test(
6634
'queue worker applies exponential backoff on retries',
6735
withTransaction(async (sql) => {
68-
const { handler, getInvocationCount, getJsDelays, getVtDelays } =
69-
createFailingHandler();
36+
const spy = createHandlerSpy<Json, MessageHandlerContext>(
37+
(_input, _context) => {
38+
log(`Invocation #${spy.count()} at ${new Date().toISOString()}`);
39+
throw new Error('Intentional failure for exponential backoff test');
40+
}
41+
);
7042
const worker = createQueueWorker(
71-
handler,
43+
spy.handler,
7244
{
7345
sql,
7446
...workerConfig,
@@ -94,13 +66,13 @@ Deno.test(
9466
log(`Sent message with ID: ${msgIds[0]}`);
9567

9668
// Wait for all retries to complete
97-
await waitFor(() => getInvocationCount() >= workerConfig.retry.limit, {
69+
await waitFor(() => spy.count() >= workerConfig.retry.limit, {
9870
timeoutMs: 30000,
9971
});
10072

10173
// Get delays from both sources
102-
const jsDelays = getJsDelays();
103-
const vtDelays = getVtDelays();
74+
const jsDelays = calculateJsDelays(spy.invocations);
75+
const vtDelays = calculateVtDelays(spy.invocations);
10476

10577
log(`JS delays: ${JSON.stringify(jsDelays)}`);
10678
log(`VT delays: ${JSON.stringify(vtDelays)}`);
@@ -111,38 +83,14 @@ Deno.test(
11183
const expectedDelays = [2, 4];
11284

11385
// Verify JS-based delays match expected pattern (within 200ms tolerance)
114-
assertEquals(
115-
jsDelays.length,
116-
expectedDelays.length,
117-
`Expected ${expectedDelays.length} delays, got ${jsDelays.length}`
118-
);
119-
for (let i = 0; i < expectedDelays.length; i++) {
120-
assertAlmostEquals(
121-
jsDelays[i],
122-
expectedDelays[i],
123-
0.2,
124-
`JS delay #${i + 1} should be ~${expectedDelays[i]}s`
125-
);
126-
}
86+
assertDelaysMatch(jsDelays, expectedDelays, 0.2);
12787

12888
// Verify VT-based delays match expected pattern (within 200ms tolerance)
129-
assertEquals(
130-
vtDelays.length,
131-
expectedDelays.length,
132-
`Expected ${expectedDelays.length} VT delays, got ${vtDelays.length}`
133-
);
134-
for (let i = 0; i < expectedDelays.length; i++) {
135-
assertAlmostEquals(
136-
vtDelays[i],
137-
expectedDelays[i],
138-
0.2,
139-
`VT delay #${i + 1} should be ~${expectedDelays[i]}s`
140-
);
141-
}
89+
assertDelaysMatch(vtDelays, expectedDelays, 0.2);
14290

14391
// Verify total invocation count
14492
assertEquals(
145-
getInvocationCount(),
93+
spy.count(),
14694
workerConfig.retry.limit,
14795
`Handler should be called ${workerConfig.retry.limit} times`
14896
);

pkgs/edge-worker/tests/integration/fixed_retry.test.ts

Lines changed: 21 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1-
import { assertEquals, assertAlmostEquals } from '@std/assert';
1+
import { assertEquals } from '@std/assert';
22
import { createQueueWorker } from '../../src/queue/createQueueWorker.ts';
3-
import { createTestPlatformAdapter } from './_helpers.ts';
3+
import {
4+
createTestPlatformAdapter,
5+
createHandlerSpy,
6+
calculateJsDelays,
7+
calculateVtDelays,
8+
assertDelaysMatch,
9+
} from './_helpers.ts';
410
import { withTransaction } from '../db.ts';
511
import { createFakeLogger } from '../fakes.ts';
612
import { log, waitFor } from '../e2e/_helpers.ts';
@@ -18,55 +24,21 @@ const workerConfig = {
1824
queueName: 'fixed_retry_test',
1925
} as const;
2026

21-
/**
22-
* Creates a handler that always fails and collects timing data from two sources:
23-
* 1. JS timestamps (Date.now()) - when handler is invoked
24-
* 2. DB visibility times (rawMessage.vt) - when message became visible
25-
*/
26-
function createFailingHandler() {
27-
const invocations: Array<{ jsTime: number; vt: string }> = [];
28-
return {
29-
handler: (_payload: Json, context: MessageHandlerContext) => {
30-
invocations.push({
31-
jsTime: Date.now(),
32-
vt: context.rawMessage.vt,
33-
});
34-
log(`Invocation #${invocations.length} at ${new Date().toISOString()}`);
35-
throw new Error('Intentional failure for fixed retry test');
36-
},
37-
getInvocationCount: () => invocations.length,
38-
getJsDelays: () => {
39-
// Delays in seconds from JS timestamps
40-
const delays: number[] = [];
41-
for (let i = 1; i < invocations.length; i++) {
42-
delays.push((invocations[i].jsTime - invocations[i - 1].jsTime) / 1000);
43-
}
44-
return delays;
45-
},
46-
getVtDelays: () => {
47-
// Delays in seconds from DB visibility times
48-
const delays: number[] = [];
49-
for (let i = 1; i < invocations.length; i++) {
50-
const vt1 = new Date(invocations[i - 1].vt).getTime();
51-
const vt2 = new Date(invocations[i].vt).getTime();
52-
delays.push((vt2 - vt1) / 1000);
53-
}
54-
return delays;
55-
},
56-
};
57-
}
58-
5927
/**
6028
* Test verifies that fixed retry strategy applies constant delay.
6129
* Uses dual-source timing validation (JS timestamps + DB visibility times).
6230
*/
6331
Deno.test(
6432
'queue worker applies fixed delay on retries',
6533
withTransaction(async (sql) => {
66-
const { handler, getInvocationCount, getJsDelays, getVtDelays } =
67-
createFailingHandler();
34+
const spy = createHandlerSpy<Json, MessageHandlerContext>(
35+
(_input, _context) => {
36+
log(`Invocation #${spy.count()} at ${new Date().toISOString()}`);
37+
throw new Error('Intentional failure for fixed retry test');
38+
}
39+
);
6840
const worker = createQueueWorker(
69-
handler,
41+
spy.handler,
7042
{
7143
sql,
7244
...workerConfig,
@@ -92,13 +64,13 @@ Deno.test(
9264
log(`Sent message with ID: ${msgIds[0]}`);
9365

9466
// Wait for all retries to complete
95-
await waitFor(() => getInvocationCount() >= workerConfig.retry.limit, {
67+
await waitFor(() => spy.count() >= workerConfig.retry.limit, {
9668
timeoutMs: 30000,
9769
});
9870

9971
// Get delays from both sources
100-
const jsDelays = getJsDelays();
101-
const vtDelays = getVtDelays();
72+
const jsDelays = calculateJsDelays(spy.invocations);
73+
const vtDelays = calculateVtDelays(spy.invocations);
10274

10375
log(`JS delays: ${JSON.stringify(jsDelays)}`);
10476
log(`VT delays: ${JSON.stringify(vtDelays)}`);
@@ -108,38 +80,14 @@ Deno.test(
10880
const expectedDelays = [3, 3];
10981

11082
// Verify JS-based delays match expected pattern (within 200ms tolerance)
111-
assertEquals(
112-
jsDelays.length,
113-
expectedDelays.length,
114-
`Expected ${expectedDelays.length} delays, got ${jsDelays.length}`
115-
);
116-
for (let i = 0; i < expectedDelays.length; i++) {
117-
assertAlmostEquals(
118-
jsDelays[i],
119-
expectedDelays[i],
120-
0.2,
121-
`JS delay #${i + 1} should be ~${expectedDelays[i]}s`
122-
);
123-
}
83+
assertDelaysMatch(jsDelays, expectedDelays, 0.2);
12484

12585
// Verify VT-based delays match expected pattern (within 200ms tolerance)
126-
assertEquals(
127-
vtDelays.length,
128-
expectedDelays.length,
129-
`Expected ${expectedDelays.length} VT delays, got ${vtDelays.length}`
130-
);
131-
for (let i = 0; i < expectedDelays.length; i++) {
132-
assertAlmostEquals(
133-
vtDelays[i],
134-
expectedDelays[i],
135-
0.2,
136-
`VT delay #${i + 1} should be ~${expectedDelays[i]}s`
137-
);
138-
}
86+
assertDelaysMatch(vtDelays, expectedDelays, 0.2);
13987

14088
// Verify total invocation count
14189
assertEquals(
142-
getInvocationCount(),
90+
spy.count(),
14391
workerConfig.retry.limit,
14492
`Handler should be called ${workerConfig.retry.limit} times`
14593
);

0 commit comments

Comments
 (0)