Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@ createTests({

// Verify totalIterations matches actual operations created
expect(result.totalIterations).toBe(
// TODO: https://github.com/aws/aws-durable-execution-sdk-js/issues/365
// execution.getOperations({
// status: "SUCCEEDED",
// }).length,
execution.getOperations().length,
execution.getOperations({
status: "SUCCEEDED",
}).length,
);

// TODO: https://github.com/aws/aws-durable-execution-sdk-js/issues/365
// assertEventSignatures(execution.getHistoryEvents(), historyEvents);
assertEventSignatures(execution.getHistoryEvents(), historyEvents);
}, 120000);
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ export class CheckpointManager implements Checkpoint {
reject: (error: Error) => void;
}> = [];
private queueCompletionResolver: (() => void) | null = null;
private queueCompletionTimeout: NodeJS.Timeout | null = null;
private readonly MAX_PAYLOAD_SIZE = 750 * 1024; // 750KB in bytes
private isTerminating = false;
private static textEncoder = new TextEncoder();
Expand Down Expand Up @@ -152,17 +151,8 @@ export class CheckpointManager implements Checkpoint {
return;
}

return new Promise<void>((resolve, reject) => {
return new Promise<void>((resolve) => {
this.queueCompletionResolver = resolve;

// Set a timeout to prevent infinite waiting
this.queueCompletionTimeout = setTimeout(() => {
this.queueCompletionResolver = null;
this.queueCompletionTimeout = null;
// Clear the queue since it's taking too long
this.clearQueue();
reject(new Error("Timeout waiting for checkpoint queue completion"));
}, 3000); // 3 second timeout
});
}

Expand Down Expand Up @@ -393,10 +383,6 @@ export class CheckpointManager implements Checkpoint {

private notifyQueueCompletion(): void {
if (this.queueCompletionResolver) {
if (this.queueCompletionTimeout) {
clearTimeout(this.queueCompletionTimeout);
this.queueCompletionTimeout = null;
}
this.queueCompletionResolver();
this.queueCompletionResolver = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,58 +67,6 @@ describe("CheckpointManager Queue Completion", () => {

await expect(waits).resolves.toEqual([undefined, undefined, undefined]);
});

it("should timeout after 3 seconds if queue doesn't complete", async () => {
jest.useFakeTimers();

const mockCheckpoint = jest.fn().mockImplementation(
() => new Promise(() => {}), // Never resolves
);
(checkpointManager as any).storage = { checkpoint: mockCheckpoint };

// Add item to queue
checkpointManager.checkpoint("test-step", {});

const waitPromise = checkpointManager.waitForQueueCompletion();

// Fast-forward time by 3 seconds
jest.advanceTimersByTime(3000);

await expect(waitPromise).rejects.toThrow(
"Timeout waiting for checkpoint queue completion",
);

jest.useRealTimers();
});

it("should clear queue on timeout", async () => {
jest.useFakeTimers();

const mockCheckpoint = jest.fn().mockImplementation(
() => new Promise(() => {}), // Never resolves
);
(checkpointManager as any).storage = { checkpoint: mockCheckpoint };

// Add items to queue
checkpointManager.checkpoint("test-step-1", {});
checkpointManager.checkpoint("test-step-2", {});

expect((checkpointManager as any).queue.length).toBe(2);

const waitPromise = checkpointManager.waitForQueueCompletion();

// Fast-forward time by 3 seconds
jest.advanceTimersByTime(3000);

await expect(waitPromise).rejects.toThrow(
"Timeout waiting for checkpoint queue completion",
);

// Queue should be cleared
expect((checkpointManager as any).queue.length).toBe(0);

jest.useRealTimers();
});
});

describe("clearQueue", () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,37 +104,6 @@ describe("withDurableExecution Queue Completion", () => {
waitSpy.mockRestore();
});

it("should handle waitForQueueCompletion timeout gracefully", async () => {
// Mock waitForQueueCompletion to reject with timeout error after 3 seconds
const waitSpy = jest
.spyOn(CheckpointManager.prototype, "waitForQueueCompletion")
.mockImplementation(
() =>
new Promise((_, reject) =>
setTimeout(
() =>
reject(
new Error("Timeout waiting for checkpoint queue completion"),
),
3000,
),
),
);

const mockHandler = jest.fn().mockResolvedValue("success");
const wrappedHandler = withDurableExecution(mockHandler);

const startTime = Date.now();
await wrappedHandler(mockEvent, mockContext);
const endTime = Date.now();

// Should complete within timeout period (3 seconds + buffer for test overhead)
expect(endTime - startTime).toBeLessThan(7000);
expect(waitSpy).toHaveBeenCalled();

waitSpy.mockRestore();
}, 10000);

it("should handle waitForQueueCompletion errors gracefully", async () => {
const waitSpy = jest
.spyOn(CheckpointManager.prototype, "waitForQueueCompletion")
Expand Down