Skip to content

Commit 0bbf07a

Browse files
authored
feat(sdk): migrate waitForCondition to DurablePromise (#297)
Migrated wait-for-condition-handler to use DurablePromise pattern, enabling two-phase execution where operations start immediately but only gracefully terminate when awaited. Changes: - Updated handler to return DurablePromise<T> instead of Promise<T> - Implemented phase 1 (immediate execution) and phase 2 (await result) - Added isAwaited flag and waitingCallback mechanism for graceful termination - Updated waitForContinuation and executeWaitForCondition to support onAwaitedChange - Fixed tests to handle synchronous validation errors and DurablePromise instances - Added two-phase execution tests - Added assertions after timeout but before await to prove immediate execution - Applied pattern consistently across all three handlers This brings waitForCondition in line with other migrated handlers (step, invoke, wait, callback) and ensures consistent behavior across the SDK.
1 parent 737243c commit 0bbf07a

File tree

6 files changed

+349
-138
lines changed

6 files changed

+349
-138
lines changed

packages/aws-durable-execution-sdk-js/src/context/durable-context/durable-context.ts

Lines changed: 23 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,13 @@ export class DurableContextImpl implements DurableContext {
237237
funcIdOrInput?: string | I,
238238
inputOrConfig?: I | InvokeConfig<I, O>,
239239
maybeConfig?: InvokeConfig<I, O>,
240-
): Promise<O> {
240+
): DurablePromise<O> {
241241
validateContextUsage(
242242
this._stepPrefix,
243243
"invoke",
244244
this.executionContext.terminationManager,
245245
);
246-
return this.withModeManagement(() => {
246+
return this.withDurableModeManagement(() => {
247247
const invokeHandler = createInvokeHandler(
248248
this.executionContext,
249249
this.checkpoint,
@@ -268,13 +268,13 @@ export class DurableContextImpl implements DurableContext {
268268
nameOrFn: string | undefined | ChildFunc<T>,
269269
fnOrOptions?: ChildFunc<T> | ChildConfig<T>,
270270
maybeOptions?: ChildConfig<T>,
271-
): Promise<T> {
271+
): DurablePromise<T> {
272272
validateContextUsage(
273273
this._stepPrefix,
274274
"runInChildContext",
275275
this.executionContext.terminationManager,
276276
);
277-
return this.withModeManagement(() => {
277+
return this.withDurableModeManagement(() => {
278278
const blockHandler = createRunInChildContextHandler(
279279
this.executionContext,
280280
this.checkpoint,
@@ -284,23 +284,20 @@ export class DurableContextImpl implements DurableContext {
284284
createDurableContext,
285285
this._parentId,
286286
);
287-
const promise = blockHandler(nameOrFn, fnOrOptions, maybeOptions);
288-
// Prevent unhandled promise rejections
289-
promise?.catch(() => {});
290-
return promise;
287+
return blockHandler(nameOrFn, fnOrOptions, maybeOptions);
291288
});
292289
}
293290

294291
wait(
295292
nameOrDuration: string | Duration,
296293
maybeDuration?: Duration,
297-
): Promise<void> {
294+
): DurablePromise<void> {
298295
validateContextUsage(
299296
this._stepPrefix,
300297
"wait",
301298
this.executionContext.terminationManager,
302299
);
303-
return this.withModeManagement(() => {
300+
return this.withDurableModeManagement(() => {
304301
const waitHandler = createWaitHandler(
305302
this.executionContext,
306303
this.checkpoint,
@@ -367,27 +364,22 @@ export class DurableContextImpl implements DurableContext {
367364
nameOrSubmitter?: string | undefined | WaitForCallbackSubmitterFunc,
368365
submitterOrConfig?: WaitForCallbackSubmitterFunc | WaitForCallbackConfig<T>,
369366
maybeConfig?: WaitForCallbackConfig<T>,
370-
): Promise<T> {
367+
): DurablePromise<T> {
371368
validateContextUsage(
372369
this._stepPrefix,
373370
"waitForCallback",
374371
this.executionContext.terminationManager,
375372
);
376-
return this.withModeManagement(() => {
373+
return this.withDurableModeManagement(() => {
377374
const waitForCallbackHandler = createWaitForCallbackHandler(
378375
this.executionContext,
379376
this.runInChildContext.bind(this),
380377
);
381-
const promise = waitForCallbackHandler(
378+
return waitForCallbackHandler(
382379
nameOrSubmitter!,
383380
submitterOrConfig,
384381
maybeConfig,
385382
);
386-
// Prevent unhandled promise rejections
387-
promise?.catch(() => {});
388-
return promise?.finally(() => {
389-
this.checkAndUpdateReplayMode();
390-
});
391383
});
392384
}
393385

@@ -397,13 +389,13 @@ export class DurableContextImpl implements DurableContext {
397389
| WaitForConditionCheckFunc<T>
398390
| WaitForConditionConfig<T>,
399391
maybeConfig?: WaitForConditionConfig<T>,
400-
): Promise<T> {
392+
): DurablePromise<T> {
401393
validateContextUsage(
402394
this._stepPrefix,
403395
"waitForCondition",
404396
this.executionContext.terminationManager,
405397
);
406-
return this.withModeManagement(() => {
398+
return this.withDurableModeManagement(() => {
407399
const waitForConditionHandler = createWaitForConditionHandler(
408400
this.executionContext,
409401
this.checkpoint,
@@ -416,20 +408,17 @@ export class DurableContextImpl implements DurableContext {
416408
this._parentId,
417409
);
418410

419-
const promise =
420-
typeof nameOrCheckFunc === "string" || nameOrCheckFunc === undefined
421-
? waitForConditionHandler(
422-
nameOrCheckFunc,
423-
checkFuncOrConfig as WaitForConditionCheckFunc<T>,
424-
maybeConfig!,
425-
)
426-
: waitForConditionHandler(
427-
nameOrCheckFunc,
428-
checkFuncOrConfig as WaitForConditionConfig<T>,
429-
);
430-
// Prevent unhandled promise rejections
431-
promise?.catch(() => {});
432-
return promise;
411+
return typeof nameOrCheckFunc === "string" ||
412+
nameOrCheckFunc === undefined
413+
? waitForConditionHandler(
414+
nameOrCheckFunc,
415+
checkFuncOrConfig as WaitForConditionCheckFunc<T>,
416+
maybeConfig!,
417+
)
418+
: waitForConditionHandler(
419+
nameOrCheckFunc,
420+
checkFuncOrConfig as WaitForConditionConfig<T>,
421+
);
433422
});
434423
}
435424

packages/aws-durable-execution-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler-two-phase.test.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { createRunInChildContextHandler } from "./run-in-child-context-handler";
2-
import { ExecutionContext, DurableContext } from "../../types";
2+
import { ExecutionContext } from "../../types";
33
import { DurablePromise } from "../../types/durable-promise";
44
import { Context } from "aws-lambda";
55

@@ -58,15 +58,18 @@ describe("Run In Child Context Handler Two-Phase Execution", () => {
5858
// Phase 1: Create the promise - this executes the logic immediately
5959
const childPromise = handler(childFn);
6060

61-
// Wait for phase 1 to complete
62-
await new Promise((resolve) => setTimeout(resolve, 50));
63-
6461
// Should return a DurablePromise
6562
expect(childPromise).toBeInstanceOf(DurablePromise);
6663

67-
// Phase 1 should have executed the child function
64+
// Wait briefly for phase 1 to start executing
65+
await new Promise((resolve) => setTimeout(resolve, 10));
66+
67+
// Phase 1 should have executed the child function (before we await the promise)
6868
expect(childFn).toHaveBeenCalled();
6969
expect(mockCheckpoint).toHaveBeenCalled();
70+
71+
// Now await the promise to verify it completes
72+
await childPromise;
7073
});
7174

7275
it("should return cached result in phase 2 when awaited", async () => {
@@ -84,10 +87,12 @@ describe("Run In Child Context Handler Two-Phase Execution", () => {
8487
// Phase 1: Create the promise
8588
const childPromise = handler(childFn);
8689

87-
// Wait for phase 1 to complete
88-
await new Promise((resolve) => setTimeout(resolve, 50));
90+
// Wait briefly for phase 1 to execute
91+
await new Promise((resolve) => setTimeout(resolve, 10));
8992

93+
// Child function should have been called before we await the promise
9094
const phase1Calls = childFn.mock.calls.length;
95+
expect(phase1Calls).toBeGreaterThan(0);
9196

9297
// Phase 2: Await the promise - should return cached result
9398
const result = await childPromise;

packages/aws-durable-execution-sdk-js/src/handlers/step-handler/step-handler-two-phase.test.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,18 @@ describe("Step Handler Two-Phase Execution", () => {
6868
// Phase 1: Create the promise - this executes the logic immediately
6969
const stepPromise = stepHandler(stepFn);
7070

71-
// Wait for phase 1 to complete
72-
await new Promise((resolve) => setTimeout(resolve, 50));
73-
7471
// Should return a DurablePromise
7572
expect(stepPromise).toBeInstanceOf(DurablePromise);
7673

77-
// Phase 1 should have executed the step function
74+
// Wait briefly for phase 1 to start executing
75+
await new Promise((resolve) => setTimeout(resolve, 10));
76+
77+
// Phase 1 should have executed the step function (before we await the promise)
7878
expect(stepFn).toHaveBeenCalled();
7979
expect(mockCheckpoint).toHaveBeenCalled();
80+
81+
// Now await the promise to verify it completes
82+
await stepPromise;
8083
});
8184

8285
it("should return cached result in phase 2 when awaited", async () => {
@@ -97,10 +100,12 @@ describe("Step Handler Two-Phase Execution", () => {
97100
// Phase 1: Create the promise
98101
const stepPromise = stepHandler(stepFn);
99102

100-
// Wait for phase 1 to complete
101-
await new Promise((resolve) => setTimeout(resolve, 50));
103+
// Wait briefly for phase 1 to execute
104+
await new Promise((resolve) => setTimeout(resolve, 10));
102105

106+
// Step function should have been called before we await the promise
103107
const phase1Calls = stepFn.mock.calls.length;
108+
expect(phase1Calls).toBeGreaterThan(0);
104109

105110
// Phase 2: Await the promise - should return cached result
106111
const result = await stepPromise;
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import { createWaitForConditionHandler } from "./wait-for-condition-handler";
2+
import { ExecutionContext, WaitForConditionCheckFunc } from "../../types";
3+
import { EventEmitter } from "events";
4+
import { DurablePromise } from "../../types/durable-promise";
5+
6+
describe("WaitForCondition Handler Two-Phase Execution", () => {
7+
let mockContext: ExecutionContext;
8+
let mockCheckpoint: any;
9+
let createStepId: () => string;
10+
let createContextLogger: (stepId: string, attempt?: number) => any;
11+
let addRunningOperation: jest.Mock;
12+
let removeRunningOperation: jest.Mock;
13+
let hasRunningOperations: () => boolean;
14+
let getOperationsEmitter: () => EventEmitter;
15+
let stepIdCounter = 0;
16+
17+
beforeEach(() => {
18+
stepIdCounter = 0;
19+
mockContext = {
20+
getStepData: jest.fn().mockReturnValue(null),
21+
durableExecutionArn: "test-arn",
22+
terminationManager: {
23+
shouldTerminate: jest.fn().mockReturnValue(false),
24+
terminate: jest.fn(),
25+
},
26+
} as any;
27+
28+
mockCheckpoint = jest.fn().mockResolvedValue(undefined);
29+
mockCheckpoint.force = jest.fn().mockResolvedValue(undefined);
30+
mockCheckpoint.setTerminating = jest.fn();
31+
mockCheckpoint.hasPendingAncestorCompletion = jest
32+
.fn()
33+
.mockReturnValue(false);
34+
35+
createStepId = (): string => `step-${++stepIdCounter}`;
36+
createContextLogger = jest.fn().mockReturnValue({
37+
info: jest.fn(),
38+
warn: jest.fn(),
39+
error: jest.fn(),
40+
});
41+
addRunningOperation = jest.fn();
42+
removeRunningOperation = jest.fn();
43+
hasRunningOperations = jest.fn().mockReturnValue(false) as () => boolean;
44+
getOperationsEmitter = (): EventEmitter => new EventEmitter();
45+
});
46+
47+
it("should execute check function in phase 1 immediately", async () => {
48+
const waitForConditionHandler = createWaitForConditionHandler(
49+
mockContext,
50+
mockCheckpoint,
51+
createStepId,
52+
createContextLogger,
53+
addRunningOperation,
54+
removeRunningOperation,
55+
hasRunningOperations,
56+
getOperationsEmitter,
57+
undefined,
58+
);
59+
60+
const checkFn: WaitForConditionCheckFunc<number> = jest
61+
.fn()
62+
.mockResolvedValue(10);
63+
64+
// Phase 1: Create the promise - this executes the logic immediately
65+
const promise = waitForConditionHandler(checkFn, {
66+
initialState: 0,
67+
waitStrategy: (_state) => ({ shouldContinue: false }),
68+
});
69+
70+
// Should return a DurablePromise
71+
expect(promise).toBeInstanceOf(DurablePromise);
72+
73+
// Wait briefly for phase 1 to start executing
74+
await new Promise((resolve) => setTimeout(resolve, 10));
75+
76+
// Phase 1 should have executed the check function (before we await the promise)
77+
expect(checkFn).toHaveBeenCalled();
78+
expect(mockCheckpoint).toHaveBeenCalled();
79+
80+
// Now await the promise to verify it completes
81+
await promise;
82+
});
83+
84+
it("should return cached result in phase 2 when awaited", async () => {
85+
const waitForConditionHandler = createWaitForConditionHandler(
86+
mockContext,
87+
mockCheckpoint,
88+
createStepId,
89+
createContextLogger,
90+
addRunningOperation,
91+
removeRunningOperation,
92+
hasRunningOperations,
93+
getOperationsEmitter,
94+
undefined,
95+
);
96+
97+
const checkFn: WaitForConditionCheckFunc<string> = jest
98+
.fn()
99+
.mockResolvedValue("completed");
100+
101+
// Phase 1: Create the promise
102+
const promise = waitForConditionHandler(checkFn, {
103+
initialState: "initial",
104+
waitStrategy: (_state) => ({ shouldContinue: false }),
105+
});
106+
107+
// Wait briefly for phase 1 to execute
108+
await new Promise((resolve) => setTimeout(resolve, 10));
109+
110+
// Check function should have been called before we await the promise
111+
expect(checkFn).toHaveBeenCalledTimes(1);
112+
113+
// Phase 2: Await the promise to get the result
114+
const result = await promise;
115+
116+
expect(result).toBe("completed");
117+
expect(checkFn).toHaveBeenCalledTimes(1);
118+
});
119+
120+
it("should execute check function before await", async () => {
121+
const waitForConditionHandler = createWaitForConditionHandler(
122+
mockContext,
123+
mockCheckpoint,
124+
createStepId,
125+
createContextLogger,
126+
addRunningOperation,
127+
removeRunningOperation,
128+
hasRunningOperations,
129+
getOperationsEmitter,
130+
undefined,
131+
);
132+
133+
let executionOrder: string[] = [];
134+
const checkFn: WaitForConditionCheckFunc<number> = jest.fn(async () => {
135+
executionOrder.push("check-executed");
136+
return 42;
137+
});
138+
139+
// Phase 1: Create the promise
140+
executionOrder.push("promise-created");
141+
const promise = waitForConditionHandler(checkFn, {
142+
initialState: 0,
143+
waitStrategy: (_state) => ({ shouldContinue: false }),
144+
});
145+
executionOrder.push("after-handler-call");
146+
147+
// Wait briefly for phase 1 to execute
148+
await new Promise((resolve) => setTimeout(resolve, 10));
149+
150+
// Check should have executed before we await
151+
expect(checkFn).toHaveBeenCalled();
152+
153+
executionOrder.push("before-await");
154+
const result = await promise;
155+
executionOrder.push("after-await");
156+
157+
// Verify execution order: check should execute before await
158+
expect(executionOrder).toEqual([
159+
"promise-created",
160+
"check-executed",
161+
"after-handler-call",
162+
"before-await",
163+
"after-await",
164+
]);
165+
expect(result).toBe(42);
166+
});
167+
});

0 commit comments

Comments
 (0)