Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ private void writeAdditionalFiles(
writerFactory.accept(String.format("%s%s/index.ts", DocumentClientUtils.DOC_CLIENT_PREFIX,
DocumentClientPaginationGenerator.PAGINATION_FOLDER), writer -> {
writer.write("export * from './Interfaces';");
writer.write("export * from './BatchGetPaginator';");
for (OperationShape operation : overridenOperationsList) {
if (operation.hasTrait(PaginatedTrait.ID)) {
String paginationFileName =
Expand Down
48 changes: 48 additions & 0 deletions lib/lib-dynamodb/src/pagination/BatchGetPaginator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { type DynamoDBClient } from "@aws-sdk/client-dynamodb";

import { BatchGetCommand, BatchGetCommandInput, BatchGetCommandOutput } from "../commands/BatchGetCommand";
import { DynamoDBDocumentClient } from "../DynamoDBDocumentClient";

/**
* Async generator that issues {@link BatchGetCommand}s repeatedly until all keys are processed or an error response is received.
*
* @public
*
* @see {@link paginateBatchGetItem} for a variant that uses the {@link DynamoDBClient | low level DynamoDB client}.
*
* @example
*
* ```typescript
* const client = new DynamoDBClient();
* const docClient = DynamoDBDocumentClient.from(client);
* const input: BatchGetCommandInput = {
* RequestItems: {
* table1: Keys: [...],
* table2: Keys: [...],
* }
* };
*
* let pageNumber = 1;
* for await (const page of paginateBatchGet({ client: docClient }, input)) {
* console.log("page:", pageNumber++);
* console.log("items:", page.Responses);
* console.log("unprocessed:", page.UnprocessedKeys); // will be returned in the next page(s)
* }
* ```
*/
export async function* paginateBatchGet(
config: {
client: DynamoDBDocumentClient;
},
input: BatchGetCommandInput
): AsyncGenerator<BatchGetCommandOutput> {
let RequestItems = input.RequestItems;

while (RequestItems && Object.keys(RequestItems).length > 0) {
const cmd = new BatchGetCommand({ ...input, RequestItems });
const response = await config.client.send(cmd);
RequestItems = { ...response.UnprocessedKeys };

yield response;
}
}
1 change: 1 addition & 0 deletions lib/lib-dynamodb/src/pagination/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./BatchGetPaginator";
// smithy-typescript generated code
export * from "./Interfaces";
export * from "./QueryPaginator";
Expand Down
146 changes: 146 additions & 0 deletions packages/core/integ/pagination.integ.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { requireRequestsFrom } from "@aws-sdk/aws-util-test/src";
import { DynamoDB, paginateScan, ScanCommandInput } from "@aws-sdk/client-dynamodb";
import { BatchGetCommandInput, paginateBatchGet } from "@aws-sdk/lib-dynamodb";
import { HttpResponse } from "@smithy/protocol-http";
import { describe, expect, test as it } from "vitest";

Expand Down Expand Up @@ -94,4 +95,149 @@ describe("pagination", () => {
});
expect.assertions(7);
});

/**
* This test makes a DynamoDB paginated batch get request for 5 items, with keys 1-2-3-4-5, in this exact order.
*
* The first returned page contains items 2 and 1 (order switched to simulate the unpredictability of the order of the
* items returned by the DDB API BatchGetItem command), plus unprocessed keys 3 and 4. The second page contains the
* items 3 and 4, and no further unprocessed keys.
*
* Item 5 is asked for, but we consider that the table does not contain it, so it's not returned at all. That's a
* valid use case and does not generate an error.
*
* In the second part of the test, another paginated request is done for 2 items, with keys 1 and 1. So the same
* item is requested twice. As the API will return an error, we want to catch the generated SDK exception.
*/
it("processes batch items until all items are processed or an error is received", async () => {
const ddb = new DynamoDB({
credentials: {
accessKeyId: "INTEG_TEST",
secretAccessKey: "INTEG_TEST",
},
region: "us-west-2",
});

requireRequestsFrom(ddb)
.toMatch(
// first page request
{
hostname: /dynamodb/,
body(b) {
expect(b).toContain(
'"RequestItems":{"test":{"Keys":[{"id":{"S":"1"}},{"id":{"S":"2"}},{"id":{"S":"3"}},{"id":{"S":"4"}},{"id":{"S":"5"}}]}}'
);
},
},
// second page request
{
hostname: /dynamodb/,
body(b) {
expect(b).toContain('"RequestItems":{"test":{"Keys":[{"id":{"S":"4"}},{"id":{"S":"3"}}]}}');
},
},
// invalid request (duplicate key)
{
hostname: /dynamodb/,
body(b) {
expect(b).toContain('"RequestItems":{"test":{"Keys":[{"id":{"S":"1"}},{"id":{"S":"1"}}]}}');
},
}
)
.respondWith(
// first page response
new HttpResponse({
statusCode: 200,
headers: {},
body: Buffer.from(
JSON.stringify({
Responses: {
test: [
{ id: { S: "2" }, name: { S: "Item 2" } },
{ id: { S: "1" }, name: { S: "Item 1" } },
],
},
UnprocessedKeys: {
test: {
Keys: [{ id: { S: "4" } }, { id: { S: "3" } }],
},
},
})
),
}),
// second page response
new HttpResponse({
statusCode: 200,
headers: {},
body: Buffer.from(
JSON.stringify({
Responses: {
test: [
{ id: { S: "3" }, name: { S: "Item 3" } },
{ id: { S: "4" }, name: { S: "Item 4" } },
],
},
UnprocessedKeys: {},
})
),
}),
// error response
new HttpResponse({
statusCode: 400,
headers: {},
body: Buffer.from(
JSON.stringify({
message: "Provided list of item keys contains duplicates",
})
),
})
);

const requestParams: BatchGetCommandInput = {
RequestItems: {
test: { Keys: [{ id: "1" }, { id: "2" }, { id: "3" }, { id: "4" }, { id: "5" }] },
},
};

let pages = 0;
for await (const page of paginateBatchGet({ client: ddb }, requestParams)) {
pages += 1;
if (pages === 1) {
expect(page.Responses?.test).toEqual([
{ id: "2", name: "Item 2" },
{ id: "1", name: "Item 1" },
]);
} else {
expect(page.Responses?.test).toEqual([
{ id: "3", name: "Item 3" },
{ id: "4", name: "Item 4" },
]);
}
}

expect(pages).toEqual(2);

let thrownError;

try {
for await (const page of paginateBatchGet(
{ client: ddb },
{
RequestItems: {
test: { Keys: [{ id: "1" }, { id: "1" }] },
},
}
)) {
void page;
throw new Error("Received unexpected page");
}
} catch (error) {
thrownError = error;
}

expect(thrownError).toBeInstanceOf(Error);
expect((thrownError as Error).message).toBe("Provided list of item keys contains duplicates");

expect.assertions(11);
});
});
1 change: 1 addition & 0 deletions packages/util-dynamodb/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export * from "./convertToAttr";
export * from "./convertToNative";
export * from "./marshall";
export * from "./models";
export * from "./paginateBatchGetItem";
export * from "./unmarshall";
40 changes: 40 additions & 0 deletions packages/util-dynamodb/src/paginateBatchGetItem.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { BatchGetItemCommand, BatchGetItemCommandInput, DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { type DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";

/**
* Async generator that issues {@link BatchGetItemCommand}s repeatedly until all keys are processed or an error response is received.
*
* @public
*
* @see {@link paginateBatchGetItem} for a variant that uses the {@link DynamoDBDocumentClient | DynamoDB document client}.
*
* @example
*
* ```typescript
* const client = new DynamoDBClient();
* const input: BatchGetCommandInput = {
* RequestItems: {
* table1: Keys: [...],
* table2: Keys: [...],
* }
* };
*
* let pageNumber = 1;
* for await (const page of paginateBatchGetItem({ client }, input)) {
* console.log("page:", pageNumber++);
* console.log("items:", page.Responses);
* console.log("unprocessed:", page.UnprocessedKeys); // will be returned in the next page(s)
* }
* ```
*/
export async function* paginateBatchGetItem(client: DynamoDBClient, input: BatchGetItemCommandInput) {
let RequestItems = input.RequestItems;

while (RequestItems && Object.keys(RequestItems).length > 0) {
const cmd = new BatchGetItemCommand({ ...input, RequestItems });
const response = await client.send(cmd);
RequestItems = { ...response.UnprocessedKeys };

yield response;
}
}