diff --git a/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.history.json b/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.history.json index 4c0f87f0..d36ace62 100644 --- a/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.history.json +++ b/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.history.json @@ -2,8 +2,8 @@ { "EventType": "ExecutionStarted", "EventId": 1, - "Id": "a5d643ee-d1a2-4ee7-aed2-5cdc482963b3", - "EventTimestamp": "2025-12-03T22:59:01.757Z", + "Id": "5bf93d96-3d76-4b4a-ab50-6030f2e8519a", + "EventTimestamp": "2025-12-05T00:16:50.853Z", "ExecutionStartedDetails": { "Input": { "Payload": "{}" @@ -16,7 +16,7 @@ "EventId": 2, "Id": "c4ca4238a0b92382", "Name": "parent-block", - "EventTimestamp": "2025-12-03T22:59:01.759Z", + "EventTimestamp": "2025-12-05T00:16:50.869Z", "ContextStartedDetails": {} }, { @@ -25,7 +25,7 @@ "EventId": 3, "Id": "ea66c06c1e1c05fa", "Name": "parallel-branch-0", - "EventTimestamp": "2025-12-03T22:59:01.759Z", + "EventTimestamp": "2025-12-05T00:16:50.869Z", "ParentId": "c4ca4238a0b92382", "ContextStartedDetails": {} }, @@ -35,11 +35,11 @@ "EventId": 4, "Id": "2f221a18eb863803", "Name": "wait-1-second", - "EventTimestamp": "2025-12-03T22:59:01.759Z", + "EventTimestamp": "2025-12-05T00:16:50.872Z", "ParentId": "ea66c06c1e1c05fa", "WaitStartedDetails": { "Duration": 1, - "ScheduledEndTimestamp": "2025-12-03T22:59:02.759Z" + "ScheduledEndTimestamp": "2025-12-05T00:16:51.872Z" } }, { @@ -48,7 +48,7 @@ "EventId": 5, "Id": "98c6f2c2287f4c73", "Name": "parallel-branch-1", - "EventTimestamp": "2025-12-03T22:59:01.759Z", + "EventTimestamp": "2025-12-05T00:16:50.872Z", "ParentId": "c4ca4238a0b92382", "ContextStartedDetails": {} }, @@ -57,12 +57,12 @@ "SubType": "Wait", "EventId": 6, "Id": "6151f5ab282d90e4", - "Name": "wait-2-seconds", - "EventTimestamp": "2025-12-03T22:59:01.759Z", + "Name": "wait-1-second-again", + "EventTimestamp": "2025-12-05T00:16:50.872Z", "ParentId": "98c6f2c2287f4c73", "WaitStartedDetails": { - "Duration": 2, - "ScheduledEndTimestamp": "2025-12-03T22:59:03.759Z" + "Duration": 1, + "ScheduledEndTimestamp": "2025-12-05T00:16:51.872Z" } }, { @@ -71,7 +71,7 @@ "EventId": 7, "Id": "13cee27a2bd93915", "Name": "parallel-branch-2", - "EventTimestamp": "2025-12-03T22:59:01.759Z", + "EventTimestamp": "2025-12-05T00:16:50.872Z", "ParentId": "c4ca4238a0b92382", "ContextStartedDetails": {} }, @@ -80,44 +80,91 @@ "SubType": "Wait", "EventId": 8, "Id": "b425e0c75591aa8f", - "Name": "wait-5-seconds", - "EventTimestamp": "2025-12-03T22:59:01.759Z", + "Name": "wait-2-seconds", + "EventTimestamp": "2025-12-05T00:16:50.872Z", "ParentId": "13cee27a2bd93915", + "WaitStartedDetails": { + "Duration": 2, + "ScheduledEndTimestamp": "2025-12-05T00:16:52.872Z" + } + }, + { + "EventType": "ContextStarted", + "SubType": "ParallelBranch", + "EventId": 9, + "Id": "3a170a9fe4f47efa", + "Name": "parallel-branch-3", + "EventTimestamp": "2025-12-05T00:16:50.872Z", + "ParentId": "c4ca4238a0b92382", + "ContextStartedDetails": {} + }, + { + "EventType": "WaitStarted", + "SubType": "Wait", + "EventId": 10, + "Id": "a4e1cd317d54f087", + "Name": "wait-5-seconds", + "EventTimestamp": "2025-12-05T00:16:50.872Z", + "ParentId": "3a170a9fe4f47efa", "WaitStartedDetails": { "Duration": 5, - "ScheduledEndTimestamp": "2025-12-03T22:59:06.759Z" + "ScheduledEndTimestamp": "2025-12-05T00:16:55.872Z" } }, { "EventType": "InvocationCompleted", - "EventId": 9, - "EventTimestamp": "2025-12-03T22:59:01.810Z", + "EventId": 11, + "EventTimestamp": "2025-12-05T00:16:50.924Z", "InvocationCompletedDetails": { - "StartTimestamp": "2025-12-03T22:59:01.757Z", - "EndTimestamp": "2025-12-03T22:59:01.810Z", + "StartTimestamp": "2025-12-05T00:16:50.853Z", + "EndTimestamp": "2025-12-05T00:16:50.924Z", "Error": {}, - "RequestId": "50ccf531-dbcd-4392-a08b-2c4c60bb4c80" + "RequestId": "24b3c358-2e22-4f84-9423-577754ffe3c7" } }, { "EventType": "WaitSucceeded", "SubType": "Wait", - "EventId": 10, + "EventId": 12, "Id": "2f221a18eb863803", "Name": "wait-1-second", - "EventTimestamp": "2025-12-03T22:59:02.761Z", + "EventTimestamp": "2025-12-05T00:16:51.872Z", "ParentId": "ea66c06c1e1c05fa", "WaitSucceededDetails": { "Duration": 1 } }, + { + "EventType": "WaitSucceeded", + "SubType": "Wait", + "EventId": 13, + "Id": "6151f5ab282d90e4", + "Name": "wait-1-second-again", + "EventTimestamp": "2025-12-05T00:16:51.872Z", + "ParentId": "98c6f2c2287f4c73", + "WaitSucceededDetails": { + "Duration": 1 + } + }, { "EventType": "ContextSucceeded", "SubType": "ParallelBranch", - "EventId": 11, + "EventId": 14, "Id": "ea66c06c1e1c05fa", "Name": "parallel-branch-0", - "EventTimestamp": "2025-12-03T22:59:02.770Z", + "EventTimestamp": "2025-12-05T00:16:51.875Z", + "ParentId": "c4ca4238a0b92382", + "ContextSucceededDetails": { + "Result": {} + } + }, + { + "EventType": "ContextSucceeded", + "SubType": "ParallelBranch", + "EventId": 15, + "Id": "98c6f2c2287f4c73", + "Name": "parallel-branch-1", + "EventTimestamp": "2025-12-05T00:16:51.875Z", "ParentId": "c4ca4238a0b92382", "ContextSucceededDetails": { "Result": {} @@ -125,23 +172,23 @@ }, { "EventType": "InvocationCompleted", - "EventId": 12, - "EventTimestamp": "2025-12-03T22:59:02.822Z", + "EventId": 16, + "EventTimestamp": "2025-12-05T00:16:51.925Z", "InvocationCompletedDetails": { - "StartTimestamp": "2025-12-03T22:59:02.762Z", - "EndTimestamp": "2025-12-03T22:59:02.822Z", + "StartTimestamp": "2025-12-05T00:16:51.872Z", + "EndTimestamp": "2025-12-05T00:16:51.925Z", "Error": {}, - "RequestId": "52e8da81-1d04-4aae-8a67-c051fe2f9043" + "RequestId": "2119315f-aff7-4503-9556-8288b7cfaa5a" } }, { "EventType": "WaitSucceeded", "SubType": "Wait", - "EventId": 13, - "Id": "6151f5ab282d90e4", + "EventId": 17, + "Id": "b425e0c75591aa8f", "Name": "wait-2-seconds", - "EventTimestamp": "2025-12-03T22:59:03.760Z", - "ParentId": "98c6f2c2287f4c73", + "EventTimestamp": "2025-12-05T00:16:52.873Z", + "ParentId": "13cee27a2bd93915", "WaitSucceededDetails": { "Duration": 2 } @@ -149,10 +196,10 @@ { "EventType": "ContextSucceeded", "SubType": "ParallelBranch", - "EventId": 14, - "Id": "98c6f2c2287f4c73", - "Name": "parallel-branch-1", - "EventTimestamp": "2025-12-03T22:59:03.761Z", + "EventId": 18, + "Id": "13cee27a2bd93915", + "Name": "parallel-branch-2", + "EventTimestamp": "2025-12-05T00:16:52.877Z", "ParentId": "c4ca4238a0b92382", "ContextSucceededDetails": { "Result": {} @@ -160,23 +207,23 @@ }, { "EventType": "InvocationCompleted", - "EventId": 15, - "EventTimestamp": "2025-12-03T22:59:03.813Z", + "EventId": 19, + "EventTimestamp": "2025-12-05T00:16:52.929Z", "InvocationCompletedDetails": { - "StartTimestamp": "2025-12-03T22:59:03.760Z", - "EndTimestamp": "2025-12-03T22:59:03.813Z", + "StartTimestamp": "2025-12-05T00:16:52.873Z", + "EndTimestamp": "2025-12-05T00:16:52.929Z", "Error": {}, - "RequestId": "1abb1bfd-2628-4f0e-80c7-0fe60bd3f416" + "RequestId": "5907d455-ef8a-4021-b822-bcffa5dd2d07" } }, { "EventType": "WaitSucceeded", "SubType": "Wait", - "EventId": 16, - "Id": "b425e0c75591aa8f", + "EventId": 20, + "Id": "a4e1cd317d54f087", "Name": "wait-5-seconds", - "EventTimestamp": "2025-12-03T22:59:06.760Z", - "ParentId": "13cee27a2bd93915", + "EventTimestamp": "2025-12-05T00:16:55.873Z", + "ParentId": "3a170a9fe4f47efa", "WaitSucceededDetails": { "Duration": 5 } @@ -184,10 +231,10 @@ { "EventType": "ContextSucceeded", "SubType": "ParallelBranch", - "EventId": 17, - "Id": "13cee27a2bd93915", - "Name": "parallel-branch-2", - "EventTimestamp": "2025-12-03T22:59:06.762Z", + "EventId": 21, + "Id": "3a170a9fe4f47efa", + "Name": "parallel-branch-3", + "EventTimestamp": "2025-12-05T00:16:55.878Z", "ParentId": "c4ca4238a0b92382", "ContextSucceededDetails": { "Result": {} @@ -196,32 +243,32 @@ { "EventType": "ContextSucceeded", "SubType": "Parallel", - "EventId": 18, + "EventId": 22, "Id": "c4ca4238a0b92382", "Name": "parent-block", - "EventTimestamp": "2025-12-03T22:59:06.762Z", + "EventTimestamp": "2025-12-05T00:16:55.880Z", "ContextSucceededDetails": { "Result": { - "Payload": "{\"all\":[{\"index\":0,\"status\":\"SUCCEEDED\"},{\"index\":1,\"status\":\"SUCCEEDED\"},{\"index\":2,\"status\":\"SUCCEEDED\"}],\"completionReason\":\"ALL_COMPLETED\"}" + "Payload": "{\"all\":[{\"index\":0,\"status\":\"SUCCEEDED\"},{\"index\":1,\"status\":\"SUCCEEDED\"},{\"index\":2,\"status\":\"SUCCEEDED\"},{\"index\":3,\"status\":\"SUCCEEDED\"}],\"completionReason\":\"ALL_COMPLETED\"}" } } }, { "EventType": "InvocationCompleted", - "EventId": 19, - "EventTimestamp": "2025-12-03T22:59:06.762Z", + "EventId": 23, + "EventTimestamp": "2025-12-05T00:16:55.881Z", "InvocationCompletedDetails": { - "StartTimestamp": "2025-12-03T22:59:06.760Z", - "EndTimestamp": "2025-12-03T22:59:06.762Z", + "StartTimestamp": "2025-12-05T00:16:55.874Z", + "EndTimestamp": "2025-12-05T00:16:55.881Z", "Error": {}, - "RequestId": "9c2ae0a0-53bf-497f-880f-cd211084c897" + "RequestId": "6fa7d6b2-831c-4e48-a0c6-9cb81bdfe3c6" } }, { "EventType": "ExecutionSucceeded", - "EventId": 20, - "Id": "a5d643ee-d1a2-4ee7-aed2-5cdc482963b3", - "EventTimestamp": "2025-12-03T22:59:06.762Z", + "EventId": 24, + "Id": "5bf93d96-3d76-4b4a-ab50-6030f2e8519a", + "EventTimestamp": "2025-12-05T00:16:55.881Z", "ExecutionSucceededDetails": { "Result": { "Payload": "\"Completed waits\"" diff --git a/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.test.ts b/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.test.ts index 32f6de05..f51b7d44 100644 --- a/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.test.ts +++ b/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.test.ts @@ -15,14 +15,16 @@ createTests({ const parentBlockOp = runner.getOperation("parent-block"); const wait1SecondOp = runner.getOperation("wait-1-second"); + const wait1SecondAgainOp = runner.getOperation("wait-1-second-again"); const wait2SecondsOp = runner.getOperation("wait-2-seconds"); const wait5SecondsOp = runner.getOperation("wait-5-seconds"); expect(execution.getResult()).toBe("Completed waits"); - expect(execution.getOperations()).toHaveLength(7); + expect(execution.getOperations()).toHaveLength(9); - expect(parentBlockOp.getChildOperations()).toHaveLength(3); + expect(parentBlockOp.getChildOperations()).toHaveLength(4); + expect(wait1SecondAgainOp.getWaitDetails()!.waitSeconds).toBe(1); expect(wait1SecondOp.getWaitDetails()!.waitSeconds!).toBe(1); expect(wait2SecondsOp.getWaitDetails()!.waitSeconds!).toBe(2); expect(wait5SecondsOp.getWaitDetails()!.waitSeconds!).toBe(5); diff --git a/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.ts b/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.ts index c253a31f..18f755b3 100644 --- a/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.ts +++ b/packages/aws-durable-execution-sdk-js-examples/src/examples/parallel/wait/parallel-wait.ts @@ -16,6 +16,8 @@ export const handler = withDurableExecution( await context.parallel("parent-block", [ async (childContext: DurableContext) => await childContext.wait("wait-1-second", { seconds: 1 }), + async (childContext: DurableContext) => + await childContext.wait("wait-1-second-again", { seconds: 1 }), async (childContext: DurableContext) => await childContext.wait("wait-2-seconds", { seconds: 2 }), async (childContext: DurableContext) => diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/callbacks.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/callbacks.test.ts index 4d9d9185..755fcba2 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/callbacks.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/callbacks.test.ts @@ -14,6 +14,7 @@ import { CheckpointManager } from "../../storage/checkpoint-manager"; import { createExecutionId, createCallbackId, + createInvocationId, } from "../../utils/tagged-strings"; describe("callbacks handlers", () => { @@ -28,6 +29,7 @@ describe("callbacks handlers", () => { executionManager.startExecution({ executionId: mockExecutionId, payload: '{"test": "data"}', + invocationId: createInvocationId(), }); const storage = executionManager.getCheckpointsByExecution(mockExecutionId); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/checkpoint-handlers.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/checkpoint-handlers.test.ts index 2e5b5973..455d40a5 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/checkpoint-handlers.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/checkpoint-handlers.test.ts @@ -12,7 +12,10 @@ import { processCheckpointDurableExecution, } from "../checkpoint-handlers"; import { ExecutionManager } from "../../storage/execution-manager"; -import { createExecutionId } from "../../utils/tagged-strings"; +import { + createExecutionId, + createInvocationId, +} from "../../utils/tagged-strings"; import { encodeCheckpointToken } from "../../utils/checkpoint-token"; // Mock only external dependencies we can't control @@ -28,6 +31,7 @@ describe("checkpoint handlers", () => { const invocationResult = executionManager.startExecution({ executionId, payload: '{"test": "data"}', + invocationId: createInvocationId(), }); const storage = executionManager.getCheckpointsByExecution(executionId); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/execution-handlers.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/execution-handlers.test.ts index d7c23098..f525e888 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/execution-handlers.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/execution-handlers.test.ts @@ -26,17 +26,25 @@ describe("execution handlers", () => { const startExecutionSpy = jest.spyOn(executionManager, "startExecution"); const payload = '{"test": "execution data"}'; - const result = processStartDurableExecution(payload, executionManager); + const invocationId = "mock-invocation-id"; + const result = processStartDurableExecution( + { + payload, + invocationId: createInvocationId(invocationId), + }, + executionManager, + ); expect(startExecutionSpy).toHaveBeenCalledWith({ payload, + invocationId: invocationId, executionId: expect.any(String), }); expect(result).toEqual({ checkpointToken: expect.any(String), executionId: expect.any(String), - invocationId: expect.any(String), + invocationId: invocationId, operationEvents: expect.any(Array), }); }); @@ -47,6 +55,7 @@ describe("execution handlers", () => { // First create an execution const payload = '{"test": "data"}'; executionManager.startExecution({ + invocationId: createInvocationId(), executionId: createExecutionId("test-execution-id"), payload, }); @@ -56,14 +65,20 @@ describe("execution handlers", () => { "startInvocation", ); + const executionId = createExecutionId("test-execution-id"); + const invocationId = createInvocationId("test-invocation-id"); const result = processStartInvocation( - "test-execution-id", + { + executionId, + invocationId, + }, executionManager, ); - expect(startInvocationSpy).toHaveBeenCalledWith( - createExecutionId("test-execution-id"), - ); + expect(startInvocationSpy).toHaveBeenCalledWith({ + executionId, + invocationId, + }); expect(result).toEqual({ checkpointToken: expect.any(String), executionId: createExecutionId("test-execution-id"), @@ -79,13 +94,23 @@ describe("execution handlers", () => { throw new Error("Execution not found"); }); + const executionId = createExecutionId("non-existent-execution"); + const invocationId = createInvocationId("test-invocation-id"); + expect(() => - processStartInvocation("non-existent-execution", executionManager), + processStartInvocation( + { + executionId, + invocationId, + }, + executionManager, + ), ).toThrow("Execution not found"); - expect(startInvocationSpy).toHaveBeenCalledWith( - createExecutionId("non-existent-execution"), - ); + expect(startInvocationSpy).toHaveBeenCalledWith({ + executionId, + invocationId, + }); }); }); @@ -94,7 +119,11 @@ describe("execution handlers", () => { // First create an execution and invocation const executionId = createExecutionId("test-execution-id"); const invocationId = createInvocationId("test-invocation-id"); - executionManager.startExecution({ executionId, payload: "{}" }); + executionManager.startExecution({ + executionId, + payload: "{}", + invocationId: createInvocationId(), + }); const completeInvocationSpy = jest.spyOn( executionManager, diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/state-handlers.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/state-handlers.test.ts index 335e23f9..0a9a4b35 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/state-handlers.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/state-handlers.test.ts @@ -1,7 +1,10 @@ import { OperationType, OperationAction } from "@aws-sdk/client-lambda"; import { processGetDurableExecutionState } from "../state-handlers"; import { ExecutionManager } from "../../storage/execution-manager"; -import { createExecutionId } from "../../utils/tagged-strings"; +import { + createExecutionId, + createInvocationId, +} from "../../utils/tagged-strings"; describe("state handlers", () => { let executionManager: ExecutionManager; @@ -21,6 +24,7 @@ describe("state handlers", () => { executionManager.startExecution({ executionId, payload: '{"test": "data"}', + invocationId: createInvocationId(), }); const storage = executionManager.getCheckpointsByExecution(executionId); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/execution-handlers.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/execution-handlers.ts index 0a8ad3a5..d7722f5a 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/execution-handlers.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/execution-handlers.ts @@ -8,17 +8,21 @@ import { ExecutionManager, InvocationResult, } from "../storage/execution-manager"; +import { + StartDurableExecutionRequest, + StartInvocationRequest, +} from "../worker-api/worker-api-request"; /** * Starts a durable execution. Returns the data needed for the handler invocation event. */ export function processStartDurableExecution( - payload: string | undefined, + params: StartDurableExecutionRequest, executionManager: ExecutionManager, ): InvocationResult { return executionManager.startExecution({ - payload, executionId: createExecutionId(), + ...params, }); } @@ -27,10 +31,10 @@ export function processStartDurableExecution( * in-progress execution. */ export function processStartInvocation( - executionIdParam: string, + params: StartInvocationRequest, executionManager: ExecutionManager, ): InvocationResult { - return executionManager.startInvocation(createExecutionId(executionIdParam)); + return executionManager.startInvocation(params); } export function processCompleteInvocation( diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/execution-manager.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/execution-manager.test.ts index 031d260b..f4ffb69b 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/execution-manager.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/execution-manager.test.ts @@ -115,9 +115,11 @@ describe("execution-manager", () => { it("should create a new execution with the provided parameters", () => { // Test data const executionId = createExecutionId("test-execution-id"); + const invocationId = createInvocationId("test-invocation-id"); const params: StartExecutionParams = { executionId, payload: '{"key":"value"}', + invocationId, }; // Expected mock operation with properly typed id @@ -144,9 +146,9 @@ describe("execution-manager", () => { // Verify results expect(result).toEqual({ - checkpointToken: `encoded-{"executionId":"test-execution-id","token":"mocked-uuid","invocationId":"mocked-uuid"}`, + checkpointToken: `encoded-{"executionId":"${executionId}","token":"mocked-uuid","invocationId":"${invocationId}"}`, executionId, - invocationId: "mocked-uuid", + invocationId, operationEvents: [ { operation: mockInitialOperation, @@ -167,6 +169,7 @@ describe("execution-manager", () => { const executionId = createExecutionId("test-execution-id"); const params: StartExecutionParams = { executionId, + invocationId: createInvocationId("test-invocation-id"), }; const initializeSpy = jest.spyOn( @@ -185,7 +188,12 @@ describe("execution-manager", () => { it("should throw error if execution ID doesn't exist", () => { const nonExistentId = createExecutionId("non-existent"); - expect(() => executionManager.startInvocation(nonExistentId)).toThrow( + expect(() => + executionManager.startInvocation({ + executionId: nonExistentId, + invocationId: createInvocationId("test-invocation-id"), + }), + ).toThrow( "Could not start invocation for invalid execution non-existent", ); }); @@ -193,30 +201,35 @@ describe("execution-manager", () => { it("should start a new invocation for an existing execution", () => { // First create an execution const executionId = createExecutionId("test-execution-id"); - executionManager.startExecution({ executionId }); + const invocationId = createInvocationId("test-invocation-id"); + executionManager.startExecution({ executionId, invocationId }); // Reset mocks to track new calls jest.clearAllMocks(); - (randomUUID as jest.Mock).mockReturnValue("new-invocation-uuid"); + const newInvocationId = createInvocationId("new-invocation-id"); // Start a new invocation - const result = executionManager.startInvocation(executionId); + const result = executionManager.startInvocation({ + executionId, + invocationId: newInvocationId, + }); expect(result).toBeDefined(); expect(result.executionId).toBe(executionId); - expect(result.invocationId).toBe("new-invocation-uuid"); + expect(result.invocationId).toBe(newInvocationId); expect(result.operationEvents).toBeInstanceOf(Array); expect(encodeCheckpointToken).toHaveBeenCalledWith({ executionId, - token: "new-invocation-uuid", - invocationId: "new-invocation-uuid", + token: expect.any(String), + invocationId: newInvocationId, }); }); it("should include all operations from the checkpoint storage", () => { // Create an execution with a mock operation const executionId = createExecutionId("test-execution-id"); - executionManager.startExecution({ executionId }); + const invocationId = createInvocationId("test-invocation-id"); + executionManager.startExecution({ executionId, invocationId }); // Get the storage and add some operations to it const storage = executionManager.getCheckpointsByExecution(executionId); @@ -247,7 +260,11 @@ describe("execution-manager", () => { }); // Start a new invocation - const result = executionManager.startInvocation(executionId); + const newInvocationId = createInvocationId("new-invocation-id"); + const result = executionManager.startInvocation({ + executionId, + invocationId: newInvocationId, + }); // Check that we got all operations expect(result.operationEvents).toHaveLength(2); @@ -264,13 +281,20 @@ describe("execution-manager", () => { it("should throw error if execution is completed already", () => { // First create an execution const executionId = createExecutionId("test-execution-id"); - executionManager.startExecution({ executionId }); + const invocationId = createInvocationId("test-invocation-id"); + executionManager.startExecution({ executionId, invocationId }); jest .spyOn(CheckpointManager.prototype, "isExecutionCompleted") .mockReturnValue(true); - expect(() => executionManager.startInvocation(executionId)).toThrow( + const newInvocationId = createInvocationId("new-invocation-id"); + expect(() => + executionManager.startInvocation({ + executionId, + invocationId: newInvocationId, + }), + ).toThrow( `Could not start invocation for completed execution ${executionId}`, ); }); @@ -286,7 +310,8 @@ describe("execution-manager", () => { it("should return the checkpoint storage for an existing execution ID", () => { const executionId = createExecutionId("test-execution-id"); - executionManager.startExecution({ executionId }); + const invocationId = createInvocationId("test-invocation-id"); + executionManager.startExecution({ executionId, invocationId }); const storage = executionManager.getCheckpointsByExecution(executionId); @@ -321,7 +346,8 @@ describe("execution-manager", () => { it("should return storage and token data for a valid token", () => { // Create an execution const executionId = createExecutionId("test-execution-id"); - executionManager.startExecution({ executionId }); + const invocationId = createInvocationId("test-invocation-id"); + executionManager.startExecution({ executionId, invocationId }); // Create token data that references the execution const tokenData: CheckpointTokenData = { @@ -378,7 +404,8 @@ describe("execution-manager", () => { it("should return CheckpointManager for valid callback ID with existing execution", () => { // Create an execution first const executionId = createExecutionId("test-execution-id"); - executionManager.startExecution({ executionId }); + const invocationId = createInvocationId("test-invocation-id"); + executionManager.startExecution({ executionId, invocationId }); const callbackId = createCallbackId("valid-callback-id"); @@ -398,7 +425,8 @@ describe("execution-manager", () => { it("should return the same CheckpointManager instance as getCheckpointsByExecution", () => { // Create an execution first const executionId = createExecutionId("test-execution-id"); - executionManager.startExecution({ executionId }); + const invocationId = createInvocationId("test-invocation-id"); + executionManager.startExecution({ executionId, invocationId }); const callbackId = createCallbackId("valid-callback-id"); @@ -451,7 +479,7 @@ describe("execution-manager", () => { // Create an execution first const executionId = createExecutionId("test-execution-id"); const invocationId = createInvocationId("test-invocation-id"); - executionManager.startExecution({ executionId }); + executionManager.startExecution({ executionId, invocationId }); const storage = executionManager.getCheckpointsByExecution(executionId); expect(storage).toBeDefined(); @@ -517,7 +545,7 @@ describe("execution-manager", () => { // Create an execution first const executionId = createExecutionId("test-execution-id"); const invocationId = createInvocationId("test-invocation-id"); - executionManager.startExecution({ executionId }); + executionManager.startExecution({ executionId, invocationId }); const storage = executionManager.getCheckpointsByExecution(executionId); expect(storage).toBeDefined(); @@ -590,7 +618,7 @@ describe("execution-manager", () => { // Create an execution first const executionId = createExecutionId("test-execution-id"); const invocationId = createInvocationId("test-invocation-id"); - executionManager.startExecution({ executionId }); + executionManager.startExecution({ executionId, invocationId }); const storage = executionManager.getCheckpointsByExecution(executionId); expect(storage).toBeDefined(); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/execution-manager.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/execution-manager.ts index b8328b78..62120287 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/execution-manager.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/execution-manager.ts @@ -6,14 +6,14 @@ import { CheckpointTokenData, } from "../utils/checkpoint-token"; import { CallbackId, CheckpointToken } from "../utils/tagged-strings"; -import { - ExecutionId, - InvocationId, - createInvocationId, -} from "../utils/tagged-strings"; +import { ExecutionId, InvocationId } from "../utils/tagged-strings"; import { decodeCallbackId } from "../utils/callback-id"; import { OperationEvents } from "../../test-runner/common/operations/operation-with-data"; import { ErrorObject, EventType, Event } from "@aws-sdk/client-lambda"; +import { + StartDurableExecutionRequest, + StartInvocationRequest, +} from "../worker-api/worker-api-request"; export interface InvocationResult { checkpointToken: CheckpointToken; @@ -22,8 +22,7 @@ export interface InvocationResult { operationEvents: OperationEvents[]; } -export interface StartExecutionParams { - payload?: string; +export interface StartExecutionParams extends StartDurableExecutionRequest { executionId: ExecutionId; } @@ -40,7 +39,7 @@ export class ExecutionManager { * @returns the necessary initial parameters that must be passed to the execution invocation event. */ startExecution(params: StartExecutionParams): InvocationResult { - const invocationId = createInvocationId(); + const invocationId = params.invocationId; const executionId = params.executionId; const storage = new CheckpointManager(executionId); @@ -69,34 +68,33 @@ export class ExecutionManager { * * @returns The list of operations for this execution and other data for the invocation event. */ - startInvocation(executionId: ExecutionId): InvocationResult { - const invocationId = createInvocationId(); - const checkpointStorage = this.executions.get(executionId); + startInvocation(params: StartInvocationRequest): InvocationResult { + const checkpointStorage = this.executions.get(params.executionId); if (!checkpointStorage) { throw new Error( - `Could not start invocation for invalid execution ${executionId}`, + `Could not start invocation for invalid execution ${params.executionId}`, ); } if (checkpointStorage.isExecutionCompleted()) { throw new Error( - `Could not start invocation for completed execution ${executionId}`, + `Could not start invocation for completed execution ${params.executionId}`, ); } - checkpointStorage.startInvocation(invocationId); + checkpointStorage.startInvocation(params.invocationId); const checkpointToken = encodeCheckpointToken({ - executionId, + executionId: params.executionId, token: randomUUID(), - invocationId, + invocationId: params.invocationId, }); return { checkpointToken, - executionId, - invocationId, + executionId: params.executionId, + invocationId: params.invocationId, operationEvents: Array.from(checkpointStorage.operationDataMap.values()), }; } diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/utils/checkpoint-token.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/utils/checkpoint-token.ts index c6545a0e..8e76741d 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/utils/checkpoint-token.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/utils/checkpoint-token.ts @@ -25,7 +25,6 @@ export function decodeCheckpointToken( !decodedJson || typeof decodedJson !== "object" || !("executionId" in decodedJson) || - !("invocationId" in decodedJson) || !("token" in decodedJson) ) { throw new Error("Invalid CheckpointTokenData format"); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/__tests__/worker-server-api-handler.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/__tests__/worker-server-api-handler.test.ts index f59177f8..78e8e971 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/__tests__/worker-server-api-handler.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/__tests__/worker-server-api-handler.test.ts @@ -1,6 +1,9 @@ import { WorkerServerApiHandler } from "../worker-server-api-handler"; import { ApiType } from "../worker-api-types"; -import { ExecutionManager } from "../../storage/execution-manager"; +import { + ExecutionManager, + StartExecutionParams, +} from "../../storage/execution-manager"; import { processCompleteInvocation, processStartDurableExecution, @@ -17,7 +20,13 @@ import { processCallbackHeartbeat, processCallbackSuccess, } from "../../handlers/callbacks"; -import { ExecutionId, InvocationId } from "../../utils/tagged-strings"; +import { + createExecutionId, + createInvocationId, + ExecutionId, + InvocationId, +} from "../../utils/tagged-strings"; +import { StartInvocationRequest } from "../worker-api-request"; // Mock all dependencies jest.mock("../../storage/execution-manager"); @@ -100,31 +109,41 @@ describe("WorkerServerApiHandler", () => { describe("performApiCall", () => { it("should delegate StartDurableExecution to processStartDurableExecution", () => { + const params: StartExecutionParams = { + payload: "test-payload", + invocationId: createInvocationId("test-invocation-id"), + executionId: createExecutionId("test-execution-id"), + }; + const requestData = { type: ApiType.StartDurableExecution as const, requestId: "test-request-id", - params: { payload: "test-payload" }, + params, }; void handler.performApiCall(requestData); expect(mockProcessStartDurableExecution).toHaveBeenCalledWith( - "test-payload", + params, mockExecutionManagerInstance, ); }); it("should delegate StartInvocation to processStartInvocation", () => { + const params: StartInvocationRequest = { + executionId: createExecutionId("execution-123"), + invocationId: createInvocationId("invocation-123"), + }; const requestData = { type: ApiType.StartInvocation as const, requestId: "test-request-id", - params: { executionId: "execution-123" as ExecutionId }, + params, }; void handler.performApiCall(requestData); expect(mockProcessStartInvocation).toHaveBeenCalledWith( - "execution-123", + params, mockExecutionManagerInstance, ); }); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-api-request.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-api-request.ts index a2ad6d68..c5b01c80 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-api-request.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-api-request.ts @@ -12,10 +12,12 @@ import { ApiType } from "./worker-api-types"; export interface StartDurableExecutionRequest { payload?: string; + invocationId: InvocationId; } export interface StartInvocationRequest { executionId: ExecutionId; + invocationId: InvocationId; } export interface CompleteInvocationRequest { diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-server-api-handler.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-server-api-handler.ts index 69a1458c..16b9ea4a 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-server-api-handler.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-server-api-handler.ts @@ -24,15 +24,9 @@ export class WorkerServerApiHandler { performApiCall(data: WorkerApiRequestMessage) { switch (data.type) { case ApiType.StartDurableExecution: - return processStartDurableExecution( - data.params.payload, - this.executionManager, - ); + return processStartDurableExecution(data.params, this.executionManager); case ApiType.StartInvocation: - return processStartInvocation( - data.params.executionId, - this.executionManager, - ); + return processStartInvocation(data.params, this.executionManager); case ApiType.CompleteInvocation: return processCompleteInvocation( data.params.executionId, diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-invocation-order.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-invocation-order.test.ts index 142ae422..4a85a048 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-invocation-order.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-invocation-order.test.ts @@ -13,6 +13,7 @@ import { ErrorObject, Event, EventType, + OperationAction, OperationStatus, OperationType, } from "@aws-sdk/client-lambda"; @@ -23,6 +24,7 @@ import { FunctionStorage } from "../operations/function-storage"; import { ILocalDurableTestRunnerFactory } from "../interfaces/durable-test-runner-factory"; import { DurableApiClient } from "../../common/create-durable-api-client"; import { CheckpointApiClient } from "../api-client/checkpoint-api-client"; +import { InvocationResult } from "../../../checkpoint-server/storage/execution-manager"; // Mock dependencies jest.mock("../operations/local-operation-storage"); @@ -44,7 +46,6 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { const mockHandlerFunction = jest.fn(); const mockExecutionId = createExecutionId("test-execution-id"); const mockCheckpointToken = createCheckpointToken("test-checkpoint-token"); - const mockInvocationId = createInvocationId("test-invocation-id"); const mockOperationEvents: OperationEvents[] = [ { @@ -124,14 +125,12 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { executionId: mockExecutionId, checkpointToken: mockCheckpointToken, operationEvents: mockOperationEvents, - invocationId: mockInvocationId, }), pollCheckpointData: jest.fn().mockReturnValue(nonResolvingPromise), updateCheckpointData: jest.fn().mockResolvedValue(undefined), startInvocation: jest.fn().mockResolvedValue({ checkpointToken: mockCheckpointToken, executionId: mockExecutionId, - invocationId: mockInvocationId, operationEvents: [], }), completeInvocation: jest.fn().mockImplementation(() => { @@ -202,7 +201,7 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { ); expect(completeInvocationSpy).toHaveBeenCalledWith( mockExecutionId, - mockInvocationId, + expect.any(String), undefined, // no error for successful invocation ); }); @@ -294,7 +293,7 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { ); expect(completeInvocationSpy).toHaveBeenCalledWith( mockExecutionId, - mockInvocationId, + expect.any(String), mockError, ); }); @@ -330,7 +329,7 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { ); expect(completeInvocationSpy).toHaveBeenCalledWith( mockExecutionId, - mockInvocationId, + expect.any(String), { ErrorMessage: "Handler threw an exception", ErrorType: "Error", @@ -377,7 +376,7 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { ); expect(completeInvocationSpy).toHaveBeenCalledWith( mockExecutionId, - mockInvocationId, + expect.any(String), { ErrorMessage: "Async handler failure", ErrorType: "Error", @@ -387,6 +386,102 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { }); }); + describe("Race condition prevention", () => { + it("should prevent duplicate startInvocation requests when multiple callback operations trigger concurrent invocations", async () => { + let startInvocationCallCount = 0; + let resolveStartInvocation: () => void; + + // Mock startInvocation to be delayed, allowing us to control when it resolves + const startInvocationSpy = jest + .spyOn(checkpointApi, "startInvocation") + .mockImplementation(() => { + startInvocationCallCount++; + return new Promise((resolve) => { + resolveStartInvocation = () => { + resolve({ + checkpointToken: createCheckpointToken("delayed-token"), + executionId: mockExecutionId, + operationEvents: [], + invocationId: createInvocationId("delayed-invocation"), + }); + }; + }); + }); + + // Mock handler to return PENDING first (to continue execution), then SUCCESS + mockInvoke + .mockResolvedValueOnce({ + Status: InvocationStatus.PENDING, + }) + .mockResolvedValueOnce({ + Status: InvocationStatus.SUCCEEDED, + Result: JSON.stringify({ result: "success" }), + }); + + // Set up polling to return multiple callback operations that would normally trigger multiple invocations + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockImplementationOnce(() => { + return Promise.resolve({ + operations: [ + // Multiple callback operations arriving simultaneously + { + operation: { + Id: "callback-op-1", + Type: OperationType.CALLBACK, + Status: OperationStatus.SUCCEEDED, + StartTimestamp: new Date(), + }, + update: { + Id: "callback-op-1", + Type: OperationType.CALLBACK, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + { + operation: { + Id: "callback-op-2", + Type: OperationType.CALLBACK, + Status: OperationStatus.SUCCEEDED, + StartTimestamp: new Date(), + }, + update: { + Id: "callback-op-2", + Type: OperationType.CALLBACK, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + ], + }); + }) + .mockReturnValue(nonResolvingPromise); + + // Start execution + const executePromise = orchestrator.executeHandler({ + payload: { input: "race-condition-test" }, + }); + + // Allow initial execution (which returns PENDING) to complete and polling to start + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Despite multiple callback operations, only one startInvocation call was made + expect(startInvocationCallCount).toBe(1); + expect(startInvocationSpy).toHaveBeenCalledTimes(1); + + // Now resolve the delayed startInvocation to allow the test to complete + resolveStartInvocation!(); + + // Wait for execution to complete + const result = await executePromise; + + // Verify execution completed successfully + expect(result.status).toBe(OperationStatus.SUCCEEDED); + expect(startInvocationSpy).toHaveBeenCalledTimes(1); + }); + }); + describe("Multiple invocation scenarios", () => { it("should maintain history ordering across multiple handler invocations", async () => { // Setup for multiple invocations (simulating retries or callbacks) diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator.test.ts index 6c9f65fb..30f11fe6 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator.test.ts @@ -253,11 +253,12 @@ describe("TestExecutionOrchestrator", () => { payload: { input: "test" }, }); - expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith( - JSON.stringify({ + expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith({ + invocationId: expect.any(String), + payload: JSON.stringify({ input: "test", }), - ); + }); expect(mockInvoke).toHaveBeenCalledWith(mockHandlerFunction, { durableExecutionArn: mockExecutionId, checkpointToken: mockCheckpointToken, @@ -316,11 +317,12 @@ describe("TestExecutionOrchestrator", () => { const result = await resultPromise; - expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith( - JSON.stringify({ + expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith({ + invocationId: expect.any(String), + payload: JSON.stringify({ input: "test", }), - ); + }); expect(mockInvoke).toHaveBeenCalledWith(mockHandlerFunction, { durableExecutionArn: mockExecutionId, checkpointToken: mockCheckpointToken, @@ -397,11 +399,12 @@ describe("TestExecutionOrchestrator", () => { const result = await resultPromise; - expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith( - JSON.stringify({ + expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith({ + invocationId: expect.any(String), + payload: JSON.stringify({ input: "test", }), - ); + }); expect(mockInvoke).toHaveBeenCalledWith(mockHandlerFunction, { durableExecutionArn: mockExecutionId, @@ -465,11 +468,12 @@ describe("TestExecutionOrchestrator", () => { resolvePolling!(); - expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith( - JSON.stringify({ + expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith({ + invocationId: expect.any(String), + payload: JSON.stringify({ input: "test", }), - ); + }); expect(mockInvoke).toHaveBeenCalledWith(mockHandlerFunction, { durableExecutionArn: mockExecutionId, checkpointToken: mockCheckpointToken, @@ -528,11 +532,12 @@ describe("TestExecutionOrchestrator", () => { expect(mockInvoke).toHaveBeenCalledTimes(1); - expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith( - JSON.stringify({ + expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith({ + invocationId: expect.any(String), + payload: JSON.stringify({ input: "test", }), - ); + }); expect(mockInvoke).toHaveBeenCalledWith(mockHandlerFunction, { durableExecutionArn: mockExecutionId, checkpointToken: mockCheckpointToken, @@ -582,11 +587,12 @@ describe("TestExecutionOrchestrator", () => { }), ).rejects.toThrow("Could not find status in execution operation"); - expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith( - JSON.stringify({ + expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith({ + invocationId: expect.any(String), + payload: JSON.stringify({ input: "test", }), - ); + }); // Polling resolves before invocation starts, since invocation is scheduled expect(mockInvoke).not.toHaveBeenCalled(); }); @@ -595,9 +601,10 @@ describe("TestExecutionOrchestrator", () => { it("should handle execution without parameters", async () => { const result = await orchestrator.executeHandler(); - expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith( - undefined, - ); + expect(checkpointApi.startDurableExecution).toHaveBeenCalledWith({ + invocationId: expect.any(String), + payload: undefined, + }); expect(result.status).toBe(OperationStatus.SUCCEEDED); }); @@ -914,9 +921,10 @@ describe("TestExecutionOrchestrator", () => { await executePromise; // Verify new invocation was started - expect(checkpointApi.startInvocation).toHaveBeenCalledWith( - mockExecutionId, - ); + expect(checkpointApi.startInvocation).toHaveBeenCalledWith({ + executionId: mockExecutionId, + invocationId: expect.any(String), + }); // Verify handler was re-invoked after retry expect(mockInvoke).toHaveBeenCalledTimes(2); // Initial + retry @@ -976,9 +984,10 @@ describe("TestExecutionOrchestrator", () => { expect(result.status).toBe(OperationStatus.SUCCEEDED); // Verify new invocation was started (retry was triggered) - expect(checkpointApi.startInvocation).toHaveBeenCalledWith( - mockExecutionId, - ); + expect(checkpointApi.startInvocation).toHaveBeenCalledWith({ + executionId: mockExecutionId, + invocationId: expect.any(String), + }); // Verify handler was re-invoked after retry expect(mockInvoke).toHaveBeenCalledTimes(2); // Initial + retry @@ -1285,9 +1294,10 @@ describe("TestExecutionOrchestrator", () => { }); // Verify new invocation was started after status update - expect(checkpointApi.startInvocation).toHaveBeenCalledWith( - mockExecutionId, - ); + expect(checkpointApi.startInvocation).toHaveBeenCalledWith({ + executionId: mockExecutionId, + invocationId: expect.any(String), + }); // Verify handler was re-invoked after retry expect(mockInvoke).toHaveBeenCalledTimes(2); // Initial + retry @@ -1335,9 +1345,10 @@ describe("TestExecutionOrchestrator", () => { }); // Verify new invocation was started - expect(checkpointApi.startInvocation).toHaveBeenCalledWith( - mockExecutionId, - ); + expect(checkpointApi.startInvocation).toHaveBeenCalledWith({ + executionId: mockExecutionId, + invocationId: expect.any(String), + }); }); it("should execute callback before invocation function in scheduleAsyncFunction", async () => { @@ -1634,9 +1645,10 @@ describe("TestExecutionOrchestrator", () => { const result = await executePromise; // Verify startInvocation was called - expect(checkpointApi.startInvocation).toHaveBeenCalledWith( - mockExecutionId, - ); + expect(checkpointApi.startInvocation).toHaveBeenCalledWith({ + executionId: mockExecutionId, + invocationId: expect.any(String), + }); // Verify handler was invoked twice (initial + callback) expect(mockInvoke).toHaveBeenCalledTimes(2); @@ -1706,9 +1718,10 @@ describe("TestExecutionOrchestrator", () => { ); // Verify startInvocation was called - expect(checkpointApi.startInvocation).toHaveBeenCalledWith( - mockExecutionId, - ); + expect(checkpointApi.startInvocation).toHaveBeenCalledWith({ + executionId: mockExecutionId, + invocationId: expect.any(String), + }); // Verify handler was invoked twice (initial) expect(mockInvoke).toHaveBeenCalledTimes(1); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-api-client.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-api-client.ts index ebff24fa..d84b770d 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-api-client.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-api-client.ts @@ -6,6 +6,10 @@ import { ExecutionId, InvocationId, } from "../../../checkpoint-server/utils/tagged-strings"; +import { + StartDurableExecutionRequest, + StartInvocationRequest, +} from "../../../checkpoint-server/worker-api/worker-api-request"; export interface SerializedPollCheckpointResponse { operations: SerializedCheckpointOperation[]; @@ -18,7 +22,9 @@ export interface CheckpointApiClient { /** * Start a new durable invocation */ - startDurableExecution(payload?: string): Promise; + startDurableExecution( + params: StartDurableExecutionRequest, + ): Promise; /** * Poll for checkpoint data @@ -50,7 +56,7 @@ export interface CheckpointApiClient { /** * Start a new invocation for an existing execution */ - startInvocation(executionId: ExecutionId): Promise; + startInvocation(params: StartInvocationRequest): Promise; completeInvocation( executionId: ExecutionId, diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-worker-api-client.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-worker-api-client.ts index 784ab488..5eb65914 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-worker-api-client.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-worker-api-client.ts @@ -8,6 +8,10 @@ import { import { CheckpointWorkerManager } from "../worker/checkpoint-worker-manager"; import { CheckpointApiClient } from "./checkpoint-api-client"; import { ApiType } from "../../../checkpoint-server/worker-api/worker-api-types"; +import { + StartDurableExecutionRequest, + StartInvocationRequest, +} from "../../../checkpoint-server/worker-api/worker-api-request"; export class CheckpointWorkerApiClient implements CheckpointApiClient { constructor(private readonly workerManager: CheckpointWorkerManager) {} @@ -15,10 +19,13 @@ export class CheckpointWorkerApiClient implements CheckpointApiClient { /** * Start a new durable invocation */ - async startDurableExecution(payload?: string): Promise { - return this.workerManager.sendApiRequest(ApiType.StartDurableExecution, { - payload, - }); + async startDurableExecution( + params: StartDurableExecutionRequest, + ): Promise { + return this.workerManager.sendApiRequest( + ApiType.StartDurableExecution, + params, + ); } /** @@ -59,10 +66,10 @@ export class CheckpointWorkerApiClient implements CheckpointApiClient { /** * Start a new invocation for an existing execution */ - async startInvocation(executionId: ExecutionId): Promise { - return this.workerManager.sendApiRequest(ApiType.StartInvocation, { - executionId, - }); + async startInvocation( + params: StartInvocationRequest, + ): Promise { + return this.workerManager.sendApiRequest(ApiType.StartInvocation, params); } async completeInvocation( diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/__tests__/invocation-tracker.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/__tests__/invocation-tracker.test.ts index adc369c7..e91cc82b 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/__tests__/invocation-tracker.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/__tests__/invocation-tracker.test.ts @@ -41,11 +41,9 @@ describe("InvocationTracker", () => { describe("createInvocation", () => { it("should create an invocation with the given ID", () => { - const invocationId = createInvocationId("test-invocation"); - expect(invocationTracker.hasActiveInvocation()).toBe(false); - invocationTracker.createInvocation(invocationId); + invocationTracker.createInvocation(); expect(invocationTracker.hasActiveInvocation()).toBe(true); }); @@ -57,21 +55,15 @@ describe("InvocationTracker", () => { }); it("should return true when invocations exist but none are completed", () => { - const invocationId1 = createInvocationId("test-invocation-1"); - const invocationId2 = createInvocationId("test-invocation-2"); - - invocationTracker.createInvocation(invocationId1); - invocationTracker.createInvocation(invocationId2); + invocationTracker.createInvocation(); + invocationTracker.createInvocation(); expect(invocationTracker.hasActiveInvocation()).toBe(true); }); it("should return false when all invocations are completed", async () => { - const invocationId1 = createInvocationId("test-invocation-1"); - const invocationId2 = createInvocationId("test-invocation-2"); - - invocationTracker.createInvocation(invocationId1); - invocationTracker.createInvocation(invocationId2); + const invocationId1 = invocationTracker.createInvocation(); + const invocationId2 = invocationTracker.createInvocation(); // Complete both invocations await invocationTracker.completeInvocation( @@ -89,13 +81,9 @@ describe("InvocationTracker", () => { }); it("should return true when some but not all invocations are completed", async () => { - const invocationId1 = createInvocationId("test-invocation-1"); - const invocationId2 = createInvocationId("test-invocation-2"); - const invocationId3 = createInvocationId("test-invocation-3"); - - invocationTracker.createInvocation(invocationId1); - invocationTracker.createInvocation(invocationId2); - invocationTracker.createInvocation(invocationId3); + const invocationId1 = invocationTracker.createInvocation(); + const invocationId2 = invocationTracker.createInvocation(); + invocationTracker.createInvocation(); // Complete only first two invocations await invocationTracker.completeInvocation( @@ -113,13 +101,11 @@ describe("InvocationTracker", () => { }); it("should handle single invocation lifecycle correctly", async () => { - const invocationId = createInvocationId("test-invocation"); - // No invocations - should be false expect(invocationTracker.hasActiveInvocation()).toBe(false); // Create invocation - should be true (active) - invocationTracker.createInvocation(invocationId); + const invocationId = invocationTracker.createInvocation(); expect(invocationTracker.hasActiveInvocation()).toBe(true); // Complete invocation - should be false (no active) @@ -134,9 +120,7 @@ describe("InvocationTracker", () => { describe("completeInvocation", () => { it("should mark a single invocation as completed", async () => { - const invocationId = createInvocationId("test-invocation"); - - invocationTracker.createInvocation(invocationId); + const invocationId = invocationTracker.createInvocation(); expect(invocationTracker.hasActiveInvocation()).toBe(true); await invocationTracker.completeInvocation( @@ -153,9 +137,7 @@ describe("InvocationTracker", () => { }); it("should mark a single invocation as completed with error", async () => { - const invocationId = createInvocationId("test-invocation"); - - invocationTracker.createInvocation(invocationId); + const invocationId = invocationTracker.createInvocation(); expect(invocationTracker.hasActiveInvocation()).toBe(true); await invocationTracker.completeInvocation( @@ -176,13 +158,9 @@ describe("InvocationTracker", () => { }); it("should handle completing multiple invocations", async () => { - const invocationId1 = createInvocationId("test-invocation-1"); - const invocationId2 = createInvocationId("test-invocation-2"); - const invocationId3 = createInvocationId("test-invocation-3"); - - invocationTracker.createInvocation(invocationId1); - invocationTracker.createInvocation(invocationId2); - invocationTracker.createInvocation(invocationId3); + const invocationId1 = invocationTracker.createInvocation(); + const invocationId2 = invocationTracker.createInvocation(); + const invocationId3 = invocationTracker.createInvocation(); expect(invocationTracker.hasActiveInvocation()).toBe(true); @@ -210,9 +188,7 @@ describe("InvocationTracker", () => { }); it("should handle completing the same invocation multiple times", async () => { - const invocationId = createInvocationId("test-invocation"); - - invocationTracker.createInvocation(invocationId); + const invocationId = invocationTracker.createInvocation(); expect(invocationTracker.hasActiveInvocation()).toBe(true); // Complete the same invocation multiple times @@ -227,9 +203,8 @@ describe("InvocationTracker", () => { it("should handle completing non-existent invocations gracefully", async () => { const nonExistentId = createInvocationId("non-existent"); - const existingId = createInvocationId("existing"); - invocationTracker.createInvocation(existingId); + const existingId = invocationTracker.createInvocation(); expect(invocationTracker.hasActiveInvocation()).toBe(true); // Try to complete a non-existent invocation @@ -256,11 +231,8 @@ describe("InvocationTracker", () => { describe("reset with new completion tracking", () => { it("should clear completion tracking when reset", async () => { - const invocationId1 = createInvocationId("test-invocation-1"); - const invocationId2 = createInvocationId("test-invocation-2"); - - invocationTracker.createInvocation(invocationId1); - invocationTracker.createInvocation(invocationId2); + const invocationId1 = invocationTracker.createInvocation(); + invocationTracker.createInvocation(); await invocationTracker.completeInvocation( mockExecutionId, @@ -276,8 +248,7 @@ describe("InvocationTracker", () => { expect(invocationTracker.hasActiveInvocation()).toBe(false); // No invocations // After reset, creating new invocations should work normally - const newInvocationId = createInvocationId("new-invocation"); - invocationTracker.createInvocation(newInvocationId); + invocationTracker.createInvocation(); expect(invocationTracker.hasActiveInvocation()).toBe(true); }); }); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/invocation-tracker.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/invocation-tracker.ts index 7e738180..b3615ff2 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/invocation-tracker.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/invocation-tracker.ts @@ -1,5 +1,6 @@ import { ErrorObject, Event } from "@aws-sdk/client-lambda"; import { + createInvocationId, ExecutionId, InvocationId, } from "../../../checkpoint-server/utils/tagged-strings"; @@ -23,12 +24,15 @@ export class InvocationTracker { } /** - * Create a new invocation with the given ID. + * Create a new invocation and returns the invocation ID. The invocation ID + * must be tracked before any asynchronous operations to prevent race conditions. * * @param invocationId The ID of the invocation */ - createInvocation(invocationId: InvocationId): void { + createInvocation(): InvocationId { + const invocationId = createInvocationId(); this.invocations.add(invocationId); + return invocationId; } hasActiveInvocation(): boolean { diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/test-execution-orchestrator.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/test-execution-orchestrator.ts index f57d032a..d77c447e 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/test-execution-orchestrator.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/test-execution-orchestrator.ts @@ -119,14 +119,16 @@ export class TestExecutionOrchestrator { const abortController = new AbortController(); try { + const invocationId = this.invocationTracker.createInvocation(); + const { executionId, operationEvents: initialOperationsEvents, checkpointToken, + }: InvocationResult = await this.checkpointApi.startDurableExecution({ + payload: JSON.stringify(params?.payload), invocationId, - }: InvocationResult = await this.checkpointApi.startDurableExecution( - JSON.stringify(params?.payload), - ); + }); const executionOperationId = initialOperationsEvents .at(0) @@ -536,13 +538,14 @@ export class TestExecutionOrchestrator { * and polling stops. For "PENDING" status, execution continues. * * @param executionId Current execution ID - * @param invocationParams Data for the invocation. Defaults to creating a new invocation. + * @param invocationParams Data for the invocation if an invocation was already created. + * If not provided, a new invocation will be created. */ private async invokeHandler( executionId: ExecutionId, invocationParams?: Omit, ): Promise { - if (this.invocationTracker.hasActiveInvocation()) { + if (!invocationParams && this.invocationTracker.hasActiveInvocation()) { if (this.skipTimeProps.enabled) { throw new Error( "Cannot schedule concurrent invocation when skip time is enabled", @@ -554,15 +557,19 @@ export class TestExecutionOrchestrator { return; } - const { checkpointToken, invocationId, operationEvents } = + const invocationId = + invocationParams?.invocationId ?? + this.invocationTracker.createInvocation(); + + const { checkpointToken, operationEvents } = invocationParams ?? - (await this.checkpointApi.startInvocation(executionId)); + (await this.checkpointApi.startInvocation({ + executionId, + invocationId, + })); const operations = operationEvents.map((operation) => operation.operation); - // Create invocation record at the start of each invocation using the tracker - this.invocationTracker.createInvocation(invocationId); - try { defaultLogger.debug(`Invoking handler with invocationId=${invocationId}`); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/worker/__tests__/checkpoint-worker-manager.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/worker/__tests__/checkpoint-worker-manager.test.ts index 320d30ff..ff400611 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/worker/__tests__/checkpoint-worker-manager.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/worker/__tests__/checkpoint-worker-manager.test.ts @@ -4,6 +4,8 @@ import * as fs from "fs/promises"; import * as path from "path"; import { defaultLogger } from "../../../../logger"; import { ApiType } from "../../../../checkpoint-server/worker-api/worker-api-types"; +import { createInvocationId } from "../../../../checkpoint-server/utils/tagged-strings"; +import { StartDurableExecutionRequest } from "../../../../checkpoint-server/worker-api/worker-api-request"; // Mock worker_threads jest.mock("worker_threads"); @@ -137,6 +139,7 @@ describe("CheckpointWorkerManager", () => { await expect( freshManager.sendApiRequest(ApiType.StartDurableExecution, { payload: "test-payload", + invocationId: createInvocationId(), }), ).rejects.toThrow("Worker not initialized"); }); @@ -302,9 +305,15 @@ describe("CheckpointWorkerManager", () => { }, ); - const promise = manager.sendApiRequest(ApiType.StartDurableExecution, { + const params: StartDurableExecutionRequest = { payload: "test-payload", - }); + invocationId: createInvocationId(), + }; + + const promise = manager.sendApiRequest( + ApiType.StartDurableExecution, + params, + ); // Verify worker.postMessage was called with correct structure expect(mockWorker.postMessage).toHaveBeenCalledWith({ @@ -312,7 +321,7 @@ describe("CheckpointWorkerManager", () => { data: { requestId: expect.any(String), type: ApiType.StartDurableExecution, - params: { payload: "test-payload" }, + params, }, }); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/worker/__tests__/worker-client-api-handler.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/worker/__tests__/worker-client-api-handler.test.ts index b84e63c7..832c0e3c 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/worker/__tests__/worker-client-api-handler.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/worker/__tests__/worker-client-api-handler.test.ts @@ -4,7 +4,14 @@ import { randomUUID } from "node:crypto"; import { ApiType } from "../../../../checkpoint-server/worker-api/worker-api-types"; import { WorkerCommandType } from "../../../../checkpoint-server/worker/worker-message-types"; import { defaultLogger } from "../../../../logger"; -import { ExecutionId } from "../../../../checkpoint-server/utils/tagged-strings"; +import { + createInvocationId, + ExecutionId, +} from "../../../../checkpoint-server/utils/tagged-strings"; +import { + StartDurableExecutionRequest, + StartInvocationRequest, +} from "../../../../checkpoint-server/worker-api/worker-api-request"; // Mock dependencies jest.mock("node:worker_threads"); @@ -50,7 +57,10 @@ describe("WorkerClientApiHandler", () => { describe("callWorkerApi", () => { it("should send correct message to worker via postMessage", async () => { const apiType = ApiType.StartDurableExecution; - const params = { payload: "test-payload" }; + const params = { + payload: "test-payload", + invocationId: createInvocationId(), + }; // Start the API call but don't await it yet const promise = handler.callWorkerApi(apiType, params, mockWorker); @@ -81,15 +91,26 @@ describe("WorkerClientApiHandler", () => { .mockReturnValueOnce("11111111-1111-1111-1111-111111111111") .mockReturnValueOnce("22222222-2222-2222-2222-222222222222"); + const params1: StartDurableExecutionRequest = { + payload: "test1", + invocationId: createInvocationId("test-invocation-id"), + }; + // Start two API calls const promise1 = handler.callWorkerApi( ApiType.StartDurableExecution, - { payload: "test1" }, + params1, mockWorker, ); + + const params2: StartInvocationRequest = { + executionId: "test-execution" as ExecutionId, + invocationId: createInvocationId("test-invocation-id"), + }; + const promise2 = handler.callWorkerApi( ApiType.StartInvocation, - { executionId: "test-execution" as ExecutionId }, + params2, mockWorker, ); @@ -99,7 +120,7 @@ describe("WorkerClientApiHandler", () => { data: { requestId: "11111111-1111-1111-1111-111111111111", type: ApiType.StartDurableExecution, - params: { payload: "test1" }, + params: params1, }, }); @@ -108,7 +129,7 @@ describe("WorkerClientApiHandler", () => { data: { requestId: "22222222-2222-2222-2222-222222222222", type: ApiType.StartInvocation, - params: { executionId: "test-execution" }, + params: params2, }, }); @@ -132,7 +153,7 @@ describe("WorkerClientApiHandler", () => { { name: "StartDurableExecution", apiType: ApiType.StartDurableExecution, - params: { payload: "test" }, + params: { payload: "test", invocationId: createInvocationId() }, uuid: TEST_UUIDS.SUCCESS, }, { @@ -151,7 +172,7 @@ describe("WorkerClientApiHandler", () => { params: { executionId: "exec-id" as ExecutionId }, uuid: TEST_UUIDS.CLEANUP, }, - ])( + ] as const)( "should handle $name API type correctly", async ({ apiType, params, uuid }) => { mockRandomUUID.mockReturnValue(uuid); @@ -187,7 +208,10 @@ describe("WorkerClientApiHandler", () => { // Start API call const promise = handler.callWorkerApi( ApiType.StartDurableExecution, - { payload: "test" }, + { + payload: "test", + invocationId: createInvocationId("test-invocation-id"), + }, mockWorker, ); @@ -210,7 +234,10 @@ describe("WorkerClientApiHandler", () => { // Start API call const promise = handler.callWorkerApi( ApiType.StartDurableExecution, - { payload: "test" }, + { + payload: "test", + invocationId: createInvocationId("test-invocation-id"), + }, mockWorker, ); @@ -248,7 +275,10 @@ describe("WorkerClientApiHandler", () => { // Start API call const promise = handler.callWorkerApi( ApiType.StartDurableExecution, - { payload: "test" }, + { + payload: "test", + invocationId: createInvocationId("test-invocation-id"), + }, mockWorker, ); @@ -279,7 +309,10 @@ describe("WorkerClientApiHandler", () => { // Start API call const promise = handler.callWorkerApi( ApiType.StartDurableExecution, - { payload: "test" }, + { + payload: "test", + invocationId: createInvocationId("test-invocation-id"), + }, mockWorker, ); @@ -315,7 +348,10 @@ describe("WorkerClientApiHandler", () => { // Start three concurrent API calls const promise1 = handler.callWorkerApi( ApiType.StartDurableExecution, - { payload: "test1" }, + { + payload: "test1", + invocationId: createInvocationId("test-invocation-id"), + }, mockWorker, ); const promise2 = handler.callWorkerApi( @@ -364,7 +400,10 @@ describe("WorkerClientApiHandler", () => { // Start two API calls const successPromise = handler.callWorkerApi( ApiType.StartDurableExecution, - { payload: "success" }, + { + payload: "success", + invocationId: createInvocationId("test-invocation-id"), + }, mockWorker, ); const errorPromise = handler.callWorkerApi( @@ -406,7 +445,10 @@ describe("WorkerClientApiHandler", () => { // Start calls for different API types with same UUID const promise1 = handler.callWorkerApi( ApiType.StartDurableExecution, - { payload: "test1" }, + { + payload: "test1", + invocationId: createInvocationId("test-invocation-id"), + }, mockWorker, ); const promise2 = handler.callWorkerApi(