From 28be2c757d498cb83c87847d6822454344dfe5c9 Mon Sep 17 00:00:00 2001 From: Joe Wu Date: Fri, 5 Dec 2025 16:59:02 -0800 Subject: [PATCH 1/3] Add support for request compression --- .../jsonprotocols/AwsJson1ProtocolTests.java | 9 +- .../aws/restjson/RestJson1ProtocolTests.java | 4 +- .../client/restxml/RestXmlProtocolTests.java | 8 +- .../smithy/java/client/core/ClientTest.java | 2 + .../smithy/java/client/http/HttpContext.java | 12 + .../compression/CompressionAlgorithm.java | 25 ++ .../java/client/http/compression/Gzip.java | 44 +++ .../plugins/RequestCompressionPlugin.java | 104 ++++++ ...n.smithy.java.client.core.AutoClientPlugin | 1 + .../client/http/compression/GzipTest.java | 84 +++++ .../plugins/RequestCompressionPluginTest.java | 300 ++++++++++++++++++ .../RequestCompressionTraitInitializer.java | 2 +- .../java/io/datastream/GzipInputStream.java | 61 ++++ .../io/datastream/GzipInputStreamTest.java | 100 ++++++ 14 files changed, 739 insertions(+), 17 deletions(-) create mode 100644 client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/CompressionAlgorithm.java create mode 100644 client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/Gzip.java create mode 100644 client/client-http/src/main/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPlugin.java create mode 100644 client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipTest.java create mode 100644 client/client-http/src/test/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPluginTest.java create mode 100644 io/src/main/java/software/amazon/smithy/java/io/datastream/GzipInputStream.java create mode 100644 io/src/test/java/software/amazon/smithy/java/io/datastream/GzipInputStreamTest.java diff --git a/aws/client/aws-client-awsjson/src/it/java/software/amazon/smithy/java/client/aws/jsonprotocols/AwsJson1ProtocolTests.java b/aws/client/aws-client-awsjson/src/it/java/software/amazon/smithy/java/client/aws/jsonprotocols/AwsJson1ProtocolTests.java index 36d4cf867..e4f302a00 100644 --- a/aws/client/aws-client-awsjson/src/it/java/software/amazon/smithy/java/client/aws/jsonprotocols/AwsJson1ProtocolTests.java +++ b/aws/client/aws-client-awsjson/src/it/java/software/amazon/smithy/java/client/aws/jsonprotocols/AwsJson1ProtocolTests.java @@ -20,10 +20,6 @@ public class AwsJson1ProtocolTests { @HttpClientRequestTests @ProtocolTestFilter( skipTests = { - // TODO: implement content-encoding - "SDKAppliedContentEncoding_awsJson1_0", - "SDKAppendsGzipAndIgnoresHttpProvidedEncoding_awsJson1_0", - // Skipping top-level input defaults isn't necessary in Smithy-Java given it uses builders and // the defaults don't impact nullability. This applies to the following tests. "AwsJson10ClientSkipsTopLevelDefaultValuesInInput", @@ -42,8 +38,9 @@ public void requestTest(DataStream expected, DataStream actual) { Node.parse(new String(ByteBufferUtils.getBytes(expected.asByteBuffer()), StandardCharsets.UTF_8))); } - assertEquals(expectedJson, new StringBuildingSubscriber(actual).getResult()); - + if (expected.contentType() != null) { // Skip request compression tests since they do not have expected body + assertEquals(expectedJson, new StringBuildingSubscriber(actual).getResult()); + } } @HttpClientResponseTests diff --git a/aws/client/aws-client-restjson/src/it/java/software/amazon/smithy/java/client/aws/restjson/RestJson1ProtocolTests.java b/aws/client/aws-client-restjson/src/it/java/software/amazon/smithy/java/client/aws/restjson/RestJson1ProtocolTests.java index e5dc54b93..9aa3023b9 100644 --- a/aws/client/aws-client-restjson/src/it/java/software/amazon/smithy/java/client/aws/restjson/RestJson1ProtocolTests.java +++ b/aws/client/aws-client-restjson/src/it/java/software/amazon/smithy/java/client/aws/restjson/RestJson1ProtocolTests.java @@ -26,8 +26,6 @@ skipOperations = { // We dont ignore defaults on input shapes "aws.protocoltests.restjson#OperationWithDefaults", - // TODO: support content-encoding - "aws.protocoltests.restjson#PutWithContentEncoding" }) public class RestJson1ProtocolTests { private static final String EMPTY_BODY = ""; @@ -50,7 +48,7 @@ public void requestTest(DataStream expected, DataStream actual) { } else { assertEquals(expectedStr, actualStr); } - } else { + } else if (expected.contentType() != null) { // Skip request compression tests since they do not have expected body assertEquals(EMPTY_BODY, actualStr); } } diff --git a/aws/client/aws-client-restxml/src/it/java/software/amazon/smithy/java/aws/client/restxml/RestXmlProtocolTests.java b/aws/client/aws-client-restxml/src/it/java/software/amazon/smithy/java/aws/client/restxml/RestXmlProtocolTests.java index 021e0ba12..9e45c3dea 100644 --- a/aws/client/aws-client-restxml/src/it/java/software/amazon/smithy/java/aws/client/restxml/RestXmlProtocolTests.java +++ b/aws/client/aws-client-restxml/src/it/java/software/amazon/smithy/java/aws/client/restxml/RestXmlProtocolTests.java @@ -25,7 +25,6 @@ import software.amazon.smithy.java.protocoltests.harness.HttpClientRequestTests; import software.amazon.smithy.java.protocoltests.harness.HttpClientResponseTests; import software.amazon.smithy.java.protocoltests.harness.ProtocolTest; -import software.amazon.smithy.java.protocoltests.harness.ProtocolTestFilter; import software.amazon.smithy.java.protocoltests.harness.StringBuildingSubscriber; import software.amazon.smithy.java.protocoltests.harness.TestType; @@ -34,11 +33,6 @@ testType = TestType.CLIENT) public class RestXmlProtocolTests { @HttpClientRequestTests - @ProtocolTestFilter( - skipTests = { - "SDKAppliedContentEncoding_restXml", - "SDKAppendedGzipAfterProvidedEncoding_restXml", - }) public void requestTest(DataStream expected, DataStream actual) { if (expected.contentLength() != 0) { var a = new String(ByteBufferUtils.getBytes(actual.asByteBuffer()), StandardCharsets.UTF_8); @@ -51,7 +45,7 @@ public void requestTest(DataStream expected, DataStream actual) { } else { assertEquals(a, b); } - } else { + } else if (expected.contentType() != null) { // Skip request compression tests since they do not have expected body assertEquals("", new StringBuildingSubscriber(actual).getResult()); } } diff --git a/client/client-core/src/test/java/software/amazon/smithy/java/client/core/ClientTest.java b/client/client-core/src/test/java/software/amazon/smithy/java/client/core/ClientTest.java index e971b6d61..266ae3d15 100644 --- a/client/client-core/src/test/java/software/amazon/smithy/java/client/core/ClientTest.java +++ b/client/client-core/src/test/java/software/amazon/smithy/java/client/core/ClientTest.java @@ -35,6 +35,7 @@ import software.amazon.smithy.java.client.http.mock.MockQueue; import software.amazon.smithy.java.client.http.plugins.ApplyHttpRetryInfoPlugin; import software.amazon.smithy.java.client.http.plugins.HttpChecksumPlugin; +import software.amazon.smithy.java.client.http.plugins.RequestCompressionPlugin; import software.amazon.smithy.java.client.http.plugins.UserAgentPlugin; import software.amazon.smithy.java.core.serde.document.Document; import software.amazon.smithy.java.dynamicclient.DynamicClient; @@ -83,6 +84,7 @@ public class ClientTest { SimpleAuthDetectionPlugin.class, UserAgentPlugin.class, ApplyHttpRetryInfoPlugin.class, + RequestCompressionPlugin.class, HttpChecksumPlugin.class, FooPlugin.class); diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/HttpContext.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/HttpContext.java index 845e78a11..63689e16d 100644 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/HttpContext.java +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/HttpContext.java @@ -27,5 +27,17 @@ public final class HttpContext { public static final Context.Key ENDPOINT_RESOLVER_HTTP_HEADERS = Context.key( "HTTP headers to use with the request returned from an endpoint resolver"); + /** + * The minimum length of bytes threshold for a request body to be compressed. Defaults to 10240 bytes if not set. + */ + public static final Context.Key REQUEST_MIN_COMPRESSION_SIZE_BYTES = + Context.key("Minimum bytes size for request compression"); + + /** + * If request compression is disabled. + */ + public static final Context.Key DISABLE_REQUEST_COMPRESSION = + Context.key("If request compression is disabled"); + private HttpContext() {} } diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/CompressionAlgorithm.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/CompressionAlgorithm.java new file mode 100644 index 000000000..49f5c66f1 --- /dev/null +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/CompressionAlgorithm.java @@ -0,0 +1,25 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.client.http.compression; + +import software.amazon.smithy.java.io.datastream.DataStream; + +/** + * Represents a compression algorithm that can be used to compress request + * bodies. + */ +public interface CompressionAlgorithm { + /** + * The ID of the checksum algorithm. This is matched against the algorithm + * names used in the trait e.g. "gzip" + */ + String algorithmId(); + + /** + * Compresses content of fixed length + */ + DataStream compress(DataStream data); +} diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/Gzip.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/Gzip.java new file mode 100644 index 000000000..793575049 --- /dev/null +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/Gzip.java @@ -0,0 +1,44 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.client.http.compression; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.zip.GZIPOutputStream; +import software.amazon.smithy.java.io.ByteBufferOutputStream; +import software.amazon.smithy.java.io.datastream.DataStream; +import software.amazon.smithy.java.io.datastream.GzipInputStream; + +public class Gzip implements CompressionAlgorithm { + @Override + public String algorithmId() { + return "gzip"; + } + + @Override + public DataStream compress(DataStream data) { + if (!data.hasKnownLength()) { // Using streaming + try { + return DataStream.ofInputStream( + new GzipInputStream(data.asInputStream()), + data.contentType(), + -1); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + try (var bos = new ByteBufferOutputStream(); + var gzip = new GZIPOutputStream(bos); + var in = data.asInputStream()) { + in.transferTo(gzip); + gzip.close(); + return DataStream.ofBytes(bos.toByteBuffer().array()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPlugin.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPlugin.java new file mode 100644 index 000000000..ab0a8e154 --- /dev/null +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPlugin.java @@ -0,0 +1,104 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.client.http.plugins; + +import java.util.List; +import software.amazon.smithy.java.client.core.AutoClientPlugin; +import software.amazon.smithy.java.client.core.ClientConfig; +import software.amazon.smithy.java.client.core.interceptors.ClientInterceptor; +import software.amazon.smithy.java.client.core.interceptors.RequestHook; +import software.amazon.smithy.java.client.http.HttpContext; +import software.amazon.smithy.java.client.http.HttpMessageExchange; +import software.amazon.smithy.java.client.http.compression.CompressionAlgorithm; +import software.amazon.smithy.java.client.http.compression.Gzip; +import software.amazon.smithy.java.context.Context; +import software.amazon.smithy.java.core.schema.TraitKey; +import software.amazon.smithy.java.http.api.HttpRequest; +import software.amazon.smithy.java.io.datastream.DataStream; +import software.amazon.smithy.model.traits.RequestCompressionTrait; +import software.amazon.smithy.utils.ListUtils; +import software.amazon.smithy.utils.SmithyInternalApi; + +/** + * Compress the request body using provided compression algorithm if @requestCompression trait is applied. + */ +@SmithyInternalApi +public final class RequestCompressionPlugin implements AutoClientPlugin { + + @Override + public void configureClient(ClientConfig.Builder config) { + if (config.isUsingMessageExchange(HttpMessageExchange.INSTANCE)) { + config.addInterceptor(RequestCompressionInterceptor.INSTANCE); + config.putConfigIfAbsent(HttpContext.DISABLE_REQUEST_COMPRESSION, false); + } + } + + static final class RequestCompressionInterceptor implements ClientInterceptor { + + private static final int DEFAULT_MIN_COMPRESSION_SIZE_BYTES = 10240; + private static final int COMPRESSION_SIZE_CAP = 10485760; + private static final String CONTENT_ENCODING_HEADER = "Content-Encoding"; + private static final ClientInterceptor INSTANCE = new RequestCompressionInterceptor(); + private static final TraitKey REQUEST_COMPRESSION_TRAIT_KEY = + TraitKey.get(RequestCompressionTrait.class); + // Currently only Gzip is supported in Smithy model: https://smithy.io/2.0/spec/behavior-traits.html#requestcompression-trait + private static final List supportedAlgorithms = ListUtils.of(new Gzip()); + + @Override + public RequestT modifyBeforeTransmit(RequestHook hook) { + return hook.mapRequest(HttpRequest.class, RequestCompressionInterceptor::processRequest); + } + + private static HttpRequest processRequest(RequestHook hook) { + if (shouldCompress(hook)) { + RequestCompressionTrait compressionTrait = + hook.operation().schema().getTrait(REQUEST_COMPRESSION_TRAIT_KEY); + var request = hook.request(); + // Will pick the first supported algorithm to compress the body. + for (String algorithmId : compressionTrait.getEncodings()) { + for (CompressionAlgorithm algorithm : supportedAlgorithms) { + if (algorithmId.equals(algorithm.algorithmId())) { + var compressed = algorithm.compress(request.body()); + return request.toBuilder() + .body(compressed) + .withAddedHeader(CONTENT_ENCODING_HEADER, algorithmId) + .build(); + } + } + } + } + return hook.request(); + } + + private static boolean shouldCompress(RequestHook hook) { + var context = hook.context(); + var operation = hook.operation(); + if (!operation.schema().hasTrait(REQUEST_COMPRESSION_TRAIT_KEY) + || context.getOrDefault(HttpContext.DISABLE_REQUEST_COMPRESSION, false)) { + return false; + } + var requestBody = hook.request().body(); + // Streaming should not have known length + if (operation.inputStreamMember() != null && !requestBody.hasKnownLength()) { + return true; + } + return isBodySizeValid(requestBody, context); + } + + private static boolean isBodySizeValid(DataStream requestBody, Context context) { + var minCompressionSize = context.getOrDefault(HttpContext.REQUEST_MIN_COMPRESSION_SIZE_BYTES, + DEFAULT_MIN_COMPRESSION_SIZE_BYTES); + validateCompressionSize(minCompressionSize); + return requestBody.contentLength() >= minCompressionSize; + } + + private static void validateCompressionSize(int minCompressionSize) { + if (minCompressionSize < 0 || minCompressionSize > COMPRESSION_SIZE_CAP) { + throw new IllegalArgumentException("Min compression size must be between 0 and 10485760"); + } + } + } +} diff --git a/client/client-http/src/main/resources/META-INF/services/software.amazon.smithy.java.client.core.AutoClientPlugin b/client/client-http/src/main/resources/META-INF/services/software.amazon.smithy.java.client.core.AutoClientPlugin index 4755e76fe..4c632c1f3 100644 --- a/client/client-http/src/main/resources/META-INF/services/software.amazon.smithy.java.client.core.AutoClientPlugin +++ b/client/client-http/src/main/resources/META-INF/services/software.amazon.smithy.java.client.core.AutoClientPlugin @@ -1,3 +1,4 @@ software.amazon.smithy.java.client.http.plugins.UserAgentPlugin software.amazon.smithy.java.client.http.plugins.ApplyHttpRetryInfoPlugin +software.amazon.smithy.java.client.http.plugins.RequestCompressionPlugin software.amazon.smithy.java.client.http.plugins.HttpChecksumPlugin diff --git a/client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipTest.java b/client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipTest.java new file mode 100644 index 000000000..e5c63fab2 --- /dev/null +++ b/client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipTest.java @@ -0,0 +1,84 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.client.http.compression; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPInputStream; +import org.junit.jupiter.api.Test; +import software.amazon.smithy.java.io.datastream.DataStream; + +public class GzipTest { + + private final Gzip gzip = new Gzip(); + + @Test + public void algorithmIdReturnsGzip() { + assertThat(gzip.algorithmId(), equalTo("gzip")); + } + + @Test + public void compressesKnownLengthData() throws Exception { + // Use larger, repetitive data that compresses well + String original = "Hello World! ".repeat(10); + DataStream input = DataStream.ofString(original); + + DataStream compressed = gzip.compress(input); + + // Verify compressed data is smaller + assertThat(compressed.contentLength(), lessThan((long) original.length())); + + // Verify decompression produces original + String decompressed = decompress(compressed.asByteBuffer().array()); + assertThat(decompressed, equalTo(original)); + } + + @Test + public void compressesLargeStreamInChunks() throws Exception { + // Create 100KB of data + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 10000; i++) { + sb.append("0123456789"); + } + String original = sb.toString(); + byte[] bytes = original.getBytes(StandardCharsets.UTF_8); + DataStream input = DataStream.ofInputStream(new ByteArrayInputStream(bytes)); + + DataStream compressed = gzip.compress(input); + + // Verify decompression produces original + byte[] compressedBytes = compressed.asInputStream().readAllBytes(); + String decompressed = decompress(compressedBytes); + assertThat(decompressed, equalTo(original)); + } + + @Test + public void compressesEmptyData() throws Exception { + DataStream input = DataStream.ofString(""); + + DataStream compressed = gzip.compress(input); + + String decompressed = decompress(compressed.asByteBuffer().array()); + assertThat(decompressed, equalTo("")); + } + + private String decompress(byte[] compressed) throws Exception { + try (GZIPInputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(compressed)); + ByteArrayOutputStream out = new ByteArrayOutputStream()) { + byte[] buffer = new byte[1024]; + int len; + while ((len = gzipIn.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + return out.toString(StandardCharsets.UTF_8); + } + } +} diff --git a/client/client-http/src/test/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPluginTest.java b/client/client-http/src/test/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPluginTest.java new file mode 100644 index 000000000..1d5d6b1d1 --- /dev/null +++ b/client/client-http/src/test/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPluginTest.java @@ -0,0 +1,300 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.client.http.plugins; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.zip.GZIPInputStream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import software.amazon.smithy.java.client.core.interceptors.RequestHook; +import software.amazon.smithy.java.client.http.HttpContext; +import software.amazon.smithy.java.context.Context; +import software.amazon.smithy.java.core.schema.ApiOperation; +import software.amazon.smithy.java.core.schema.ApiService; +import software.amazon.smithy.java.core.schema.Schema; +import software.amazon.smithy.java.core.schema.SerializableStruct; +import software.amazon.smithy.java.core.schema.ShapeBuilder; +import software.amazon.smithy.java.core.serde.ShapeSerializer; +import software.amazon.smithy.java.core.serde.TypeRegistry; +import software.amazon.smithy.java.http.api.HttpRequest; +import software.amazon.smithy.java.io.datastream.DataStream; +import software.amazon.smithy.model.shapes.ShapeId; +import software.amazon.smithy.model.traits.RequestCompressionTrait; +import software.amazon.smithy.model.traits.StreamingTrait; + +public class RequestCompressionPluginTest { + + private static final String REQUEST_BODY = "THIS IS MY COMPRESSION TEST BODY!"; + + @Test + public void doesNotCompressWhenDisabled() throws Exception { + var interceptor = new RequestCompressionPlugin.RequestCompressionInterceptor(); + var context = Context.create(); + context.put(HttpContext.DISABLE_REQUEST_COMPRESSION, true); + context.put(HttpContext.REQUEST_MIN_COMPRESSION_SIZE_BYTES, 1); + var req = HttpRequest.builder() + .uri(new URI("/")) + .method("POST") + .body(DataStream.ofString(REQUEST_BODY)) + .build(); + + var result = interceptor.modifyBeforeTransmit( + new RequestHook<>(createOperationWithCompressionTrait(), context, new TestInput(), req)); + + assertThat(result.headers().allValues("Content-Encoding"), empty()); + } + + @Test + public void compressesWhenBodyMeetsMinSize() throws Exception { + var interceptor = new RequestCompressionPlugin.RequestCompressionInterceptor(); + var context = Context.create(); + String largeBody = REQUEST_BODY.repeat(10); + context.put(HttpContext.REQUEST_MIN_COMPRESSION_SIZE_BYTES, 10); + var req = HttpRequest.builder() + .uri(new URI("/")) + .method("POST") + .body(DataStream.ofString(largeBody)) + .build(); + + var result = interceptor.modifyBeforeTransmit( + new RequestHook<>(createOperationWithCompressionTrait(), context, new TestInput(), req)); + + assertThat(result.headers().allValues("Content-Encoding"), contains("gzip")); + String decompressed = decompress(result.body().asByteBuffer().array()); + assertThat(decompressed, equalTo(largeBody)); + } + + @Test + public void doesNotCompressWhenBodyBelowMinSize() throws Exception { + var interceptor = new RequestCompressionPlugin.RequestCompressionInterceptor(); + var context = Context.create(); + context.put(HttpContext.REQUEST_MIN_COMPRESSION_SIZE_BYTES, 10000); + var req = HttpRequest.builder() + .uri(new URI("/")) + .method("POST") + .body(DataStream.ofString(REQUEST_BODY)) + .build(); + + var result = interceptor.modifyBeforeTransmit( + new RequestHook<>(createOperationWithCompressionTrait(), context, new TestInput(), req)); + + assertThat(result.headers().allValues("Content-Encoding"), empty()); + } + + @Test + public void usesDefaultMinCompressionSize() throws Exception { + var interceptor = new RequestCompressionPlugin.RequestCompressionInterceptor(); + var context = Context.create(); + // Body is smaller than default 10240 + var req = HttpRequest.builder() + .uri(new URI("/")) + .method("POST") + .body(DataStream.ofString(REQUEST_BODY)) + .build(); + + var result = interceptor.modifyBeforeTransmit( + new RequestHook<>(createOperationWithCompressionTrait(), context, new TestInput(), req)); + + assertThat(result.headers().allValues("Content-Encoding"), empty()); + } + + @Test + public void alwaysCompressesStreamingWithoutKnownLength() throws Exception { + var interceptor = new RequestCompressionPlugin.RequestCompressionInterceptor(); + var context = Context.create(); + context.put(HttpContext.REQUEST_MIN_COMPRESSION_SIZE_BYTES, 999999); + String original = "small"; + var streamBody = DataStream.ofInputStream(new ByteArrayInputStream(original.getBytes(StandardCharsets.UTF_8))); + var req = HttpRequest.builder() + .uri(new URI("/")) + .method("POST") + .body(streamBody) + .build(); + + var result = interceptor.modifyBeforeTransmit( + new RequestHook<>(createOperationWithStreamingInput(), context, new TestInput(), req)); + + assertThat(result.headers().allValues("Content-Encoding"), contains("gzip")); + String decompressed = decompress(result.body().asInputStream().readAllBytes()); + assertThat(decompressed, equalTo(original)); + } + + @Test + public void throwsForNegativeMinCompressionSize() throws Exception { + var interceptor = new RequestCompressionPlugin.RequestCompressionInterceptor(); + var context = Context.create(); + context.put(HttpContext.REQUEST_MIN_COMPRESSION_SIZE_BYTES, -1); + String largeBody = REQUEST_BODY.repeat(100); + var req = HttpRequest.builder() + .uri(new URI("/")) + .method("POST") + .body(DataStream.ofString(largeBody)) + .build(); + + var hook = new RequestHook<>(createOperationWithCompressionTrait(), context, new TestInput(), req); + Assertions.assertThrows(IllegalArgumentException.class, () -> interceptor.modifyBeforeTransmit(hook)); + } + + @Test + public void throwsForMinCompressionSizeExceedingCap() throws Exception { + var interceptor = new RequestCompressionPlugin.RequestCompressionInterceptor(); + var context = Context.create(); + context.put(HttpContext.REQUEST_MIN_COMPRESSION_SIZE_BYTES, 10485761); + String largeBody = REQUEST_BODY.repeat(100); + var req = HttpRequest.builder() + .uri(new URI("/")) + .method("POST") + .body(DataStream.ofString(largeBody)) + .build(); + + var hook = new RequestHook<>(createOperationWithCompressionTrait(), context, new TestInput(), req); + Assertions.assertThrows(IllegalArgumentException.class, () -> interceptor.modifyBeforeTransmit(hook)); + } + + @Test + public void doesNotCompressWhenTraitAbsent() throws Exception { + var interceptor = new RequestCompressionPlugin.RequestCompressionInterceptor(); + var context = Context.create(); + context.put(HttpContext.REQUEST_MIN_COMPRESSION_SIZE_BYTES, 1); + var req = HttpRequest.builder() + .uri(new URI("/")) + .method("POST") + .body(DataStream.ofString(REQUEST_BODY)) + .build(); + + var result = interceptor.modifyBeforeTransmit( + new RequestHook<>(createOperationWithoutCompressionTrait(), context, new TestInput(), req)); + + assertThat(result.headers().allValues("Content-Encoding"), empty()); + } + + @Test + public void appendsGzipToExistingContentEncoding() throws Exception { + var interceptor = new RequestCompressionPlugin.RequestCompressionInterceptor(); + var context = Context.create(); + context.put(HttpContext.REQUEST_MIN_COMPRESSION_SIZE_BYTES, 10); + var req = HttpRequest.builder() + .uri(new URI("/")) + .method("POST") + .body(DataStream.ofString(REQUEST_BODY)) + .withAddedHeader("Content-Encoding", "custom") + .build(); + + var result = interceptor.modifyBeforeTransmit( + new RequestHook<>(createOperationWithCompressionTrait(), context, new TestInput(), req)); + + var encodings = result.headers().allValues("Content-Encoding"); + assertThat(encodings, contains("custom", "gzip")); + } + + // Helper: Create operation with @requestCompression trait + private ApiOperation createOperationWithCompressionTrait() { + var trait = RequestCompressionTrait.builder().addEncoding("gzip").build(); + var schema = Schema.createOperation(ShapeId.from("com.test#TestOp"), trait); + return createOperation(schema, null); + } + + // Helper: Create operation without @requestCompression trait + private ApiOperation createOperationWithoutCompressionTrait() { + var schema = Schema.createOperation(ShapeId.from("com.test#TestOp")); + return createOperation(schema, null); + } + + // Helper: Create operation with streaming input + private ApiOperation createOperationWithStreamingInput() { + var trait = RequestCompressionTrait.builder().addEncoding("gzip").build(); + var schema = Schema.createOperation(ShapeId.from("com.test#TestOp"), trait); + var blobSchema = Schema.createBlob(ShapeId.from("com.test#StreamBody"), new StreamingTrait()); + return createOperation(schema, blobSchema); + } + + private ApiOperation createOperation(Schema schema, Schema streamMember) { + return new ApiOperation<>() { + @Override + public ShapeBuilder inputBuilder() { + return null; + } + + @Override + public ShapeBuilder outputBuilder() { + return null; + } + + @Override + public Schema schema() { + return schema; + } + + @Override + public Schema inputSchema() { + return null; + } + + @Override + public Schema outputSchema() { + return null; + } + + @Override + public ApiService service() { + return null; + } + + @Override + public TypeRegistry errorRegistry() { + return null; + } + + @Override + public List effectiveAuthSchemes() { + return List.of(); + } + + @Override + public Schema inputStreamMember() { + return streamMember; + } + }; + } + + private static final class TestInput implements SerializableStruct { + @Override + public Schema schema() { + throw new UnsupportedOperationException(); + } + + @Override + public void serializeMembers(ShapeSerializer serializer) { + throw new UnsupportedOperationException(); + } + + @Override + public T getMemberValue(Schema member) { + return null; + } + } + + private String decompress(byte[] compressed) throws Exception { + try (var gzipIn = new GZIPInputStream(new ByteArrayInputStream(compressed)); + var out = new ByteArrayOutputStream()) { + byte[] buffer = new byte[1024]; + int len; + while ((len = gzipIn.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + return out.toString(StandardCharsets.UTF_8); + } + } +} diff --git a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/integrations/core/RequestCompressionTraitInitializer.java b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/integrations/core/RequestCompressionTraitInitializer.java index 2ff180bd4..db4b2baf9 100644 --- a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/integrations/core/RequestCompressionTraitInitializer.java +++ b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/integrations/core/RequestCompressionTraitInitializer.java @@ -22,6 +22,6 @@ public void accept(JavaWriter writer, RequestCompressionTrait requestCompression writer.putContext("requestComp", RequestCompressionTrait.class); writer.putContext("list", List.class); writer.writeInline( - "${requestComp:T}.builder().encodings(${list:T}.of(${#enc}${enc:S}${^key.last}, ${/key.last}${/enc})).build()"); + "${requestComp:T}.builder().encodings(${list:T}.of(${#enc}${value:S}${^key.last}, ${/key.last}${/enc})).build()"); } } diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/GzipInputStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/GzipInputStream.java new file mode 100644 index 000000000..a06f79a58 --- /dev/null +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/GzipInputStream.java @@ -0,0 +1,61 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.io.datastream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.zip.GZIPOutputStream; + +public class GzipInputStream extends InputStream { + private final PipedInputStream pipedIn; + private final Thread compressionThread; + private volatile IOException compressionException; + + public GzipInputStream(InputStream source) throws IOException { + this.pipedIn = new PipedInputStream(); + PipedOutputStream pipedOut = new PipedOutputStream(pipedIn); + + this.compressionThread = Thread.ofVirtual().start(() -> { + try (GZIPOutputStream gzipOut = new GZIPOutputStream(pipedOut); + InputStream in = source) { + in.transferTo(gzipOut); + } catch (IOException e) { + compressionException = e; + } + }); + } + + @Override + public int read() throws IOException { + checkCompressionException(); + return pipedIn.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkCompressionException(); + return pipedIn.read(b, off, len); + } + + @Override + public void close() throws IOException { + pipedIn.close(); + compressionThread.interrupt(); + try { + compressionThread.join(1000); // Wait max 1 second + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private void checkCompressionException() throws IOException { + if (compressionException != null) { + throw compressionException; + } + } +} diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/GzipInputStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/GzipInputStreamTest.java new file mode 100644 index 000000000..5ebc8ef44 --- /dev/null +++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/GzipInputStreamTest.java @@ -0,0 +1,100 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.io.datastream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPInputStream; +import org.junit.jupiter.api.Test; + +public class GzipInputStreamTest { + + @Test + public void compressesDataCorrectly() throws Exception { + String original = "Hello World!"; + var source = new ByteArrayInputStream(original.getBytes(StandardCharsets.UTF_8)); + + try (var gzipStream = new GzipInputStream(source)) { + byte[] compressed = gzipStream.readAllBytes(); + String decompressed = decompress(compressed); + assertThat(decompressed, equalTo(original)); + } + } + + @Test + public void compressesLargeData() throws Exception { + String original = "Hello World! ".repeat(10000); + var source = new ByteArrayInputStream(original.getBytes(StandardCharsets.UTF_8)); + + try (var gzipStream = new GzipInputStream(source)) { + byte[] compressed = gzipStream.readAllBytes(); + assertThat(compressed.length, lessThan(original.length())); + String decompressed = decompress(compressed); + assertThat(decompressed, equalTo(original)); + } + } + + @Test + public void compressesEmptyData() throws Exception { + var source = new ByteArrayInputStream(new byte[0]); + + try (var gzipStream = new GzipInputStream(source)) { + byte[] compressed = gzipStream.readAllBytes(); + String decompressed = decompress(compressed); + assertThat(decompressed, equalTo("")); + } + } + + @Test + public void readSingleByteWorks() throws Exception { + String original = "AB"; + var source = new ByteArrayInputStream(original.getBytes(StandardCharsets.UTF_8)); + + try (var gzipStream = new GzipInputStream(source); + var out = new ByteArrayOutputStream()) { + int b; + while ((b = gzipStream.read()) != -1) { + out.write(b); + } + String decompressed = decompress(out.toByteArray()); + assertThat(decompressed, equalTo(original)); + } + } + + @Test + public void readWithBufferWorks() throws Exception { + String original = "Test buffer read"; + var source = new ByteArrayInputStream(original.getBytes(StandardCharsets.UTF_8)); + + try (var gzipStream = new GzipInputStream(source); + var out = new ByteArrayOutputStream()) { + byte[] buffer = new byte[4]; + int len; + while ((len = gzipStream.read(buffer, 0, buffer.length)) != -1) { + out.write(buffer, 0, len); + } + String decompressed = decompress(out.toByteArray()); + assertThat(decompressed, equalTo(original)); + } + } + + private String decompress(byte[] compressed) throws Exception { + try (var gzipIn = new GZIPInputStream(new ByteArrayInputStream(compressed)); + var out = new ByteArrayOutputStream()) { + byte[] buffer = new byte[1024]; + int len; + while ((len = gzipIn.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + return out.toString(StandardCharsets.UTF_8); + } + } +} From ee7afed7c83b228dd1c0d0126a5de92486d3d54c Mon Sep 17 00:00:00 2001 From: Joe Wu Date: Thu, 11 Dec 2025 17:34:38 -0800 Subject: [PATCH 2/3] Address comments --- .../compression/CompressionAlgorithm.java | 8 +- .../java/client/http/compression/Gzip.java | 18 ++-- .../GzipCompressingInputStream.java | 56 +++++++++++++ .../plugins/RequestCompressionPlugin.java | 17 ++-- .../GzipCompressingInputStreamTest.java | 42 +++++----- .../client/http/compression/GzipTest.java | 84 ------------------- .../java/io/datastream/GzipInputStream.java | 61 -------------- 7 files changed, 99 insertions(+), 187 deletions(-) create mode 100644 client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStream.java rename io/src/test/java/software/amazon/smithy/java/io/datastream/GzipInputStreamTest.java => client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStreamTest.java (67%) delete mode 100644 client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipTest.java delete mode 100644 io/src/main/java/software/amazon/smithy/java/io/datastream/GzipInputStream.java diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/CompressionAlgorithm.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/CompressionAlgorithm.java index 49f5c66f1..02d3a72e9 100644 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/CompressionAlgorithm.java +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/CompressionAlgorithm.java @@ -5,7 +5,9 @@ package software.amazon.smithy.java.client.http.compression; +import java.util.List; import software.amazon.smithy.java.io.datastream.DataStream; +import software.amazon.smithy.utils.ListUtils; /** * Represents a compression algorithm that can be used to compress request @@ -13,7 +15,7 @@ */ public interface CompressionAlgorithm { /** - * The ID of the checksum algorithm. This is matched against the algorithm + * The ID of the compression algorithm. This is matched against the algorithm * names used in the trait e.g. "gzip" */ String algorithmId(); @@ -22,4 +24,8 @@ public interface CompressionAlgorithm { * Compresses content of fixed length */ DataStream compress(DataStream data); + + static List supportedAlgorithms() { + return ListUtils.of(new Gzip()); + } } diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/Gzip.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/Gzip.java index 793575049..c650e8357 100644 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/Gzip.java +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/Gzip.java @@ -10,9 +10,9 @@ import java.util.zip.GZIPOutputStream; import software.amazon.smithy.java.io.ByteBufferOutputStream; import software.amazon.smithy.java.io.datastream.DataStream; -import software.amazon.smithy.java.io.datastream.GzipInputStream; -public class Gzip implements CompressionAlgorithm { +public final class Gzip implements CompressionAlgorithm { + @Override public String algorithmId() { return "gzip"; @@ -21,19 +21,15 @@ public String algorithmId() { @Override public DataStream compress(DataStream data) { if (!data.hasKnownLength()) { // Using streaming - try { - return DataStream.ofInputStream( - new GzipInputStream(data.asInputStream()), - data.contentType(), - -1); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return DataStream.ofInputStream( + new GzipCompressingInputStream(data.asInputStream()), + data.contentType(), + -1); } try (var bos = new ByteBufferOutputStream(); - var gzip = new GZIPOutputStream(bos); var in = data.asInputStream()) { + var gzip = new GZIPOutputStream(bos); in.transferTo(gzip); gzip.close(); return DataStream.ofBytes(bos.toByteBuffer().array()); diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStream.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStream.java new file mode 100644 index 000000000..ab22d6351 --- /dev/null +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStream.java @@ -0,0 +1,56 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.client.http.compression; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPOutputStream; + +final class GzipCompressingInputStream extends InputStream { + + private final InputStream source; + private byte[] compressedData; + private int position = 0; + + public GzipCompressingInputStream(InputStream source) { + this.source = source; + } + + @Override + public int read() throws IOException { + ensureCompressed(); + return position < compressedData.length ? compressedData[position++] & 0xFF : -1; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + ensureCompressed(); + var available = compressedData.length - position; + if (available <= 0) + return -1; + + var toRead = Math.min(len, available); + System.arraycopy(compressedData, position, b, off, toRead); + position += toRead; + return toRead; + } + + private void ensureCompressed() throws IOException { + if (compressedData == null) { + var buffer = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) { + source.transferTo(gzip); + } + compressedData = buffer.toByteArray(); + } + } + + @Override + public void close() throws IOException { + source.close(); + } +} diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPlugin.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPlugin.java index ab0a8e154..51ad5f869 100644 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPlugin.java +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/plugins/RequestCompressionPlugin.java @@ -13,13 +13,11 @@ import software.amazon.smithy.java.client.http.HttpContext; import software.amazon.smithy.java.client.http.HttpMessageExchange; import software.amazon.smithy.java.client.http.compression.CompressionAlgorithm; -import software.amazon.smithy.java.client.http.compression.Gzip; import software.amazon.smithy.java.context.Context; import software.amazon.smithy.java.core.schema.TraitKey; import software.amazon.smithy.java.http.api.HttpRequest; import software.amazon.smithy.java.io.datastream.DataStream; import software.amazon.smithy.model.traits.RequestCompressionTrait; -import software.amazon.smithy.utils.ListUtils; import software.amazon.smithy.utils.SmithyInternalApi; /** @@ -32,20 +30,21 @@ public final class RequestCompressionPlugin implements AutoClientPlugin { public void configureClient(ClientConfig.Builder config) { if (config.isUsingMessageExchange(HttpMessageExchange.INSTANCE)) { config.addInterceptor(RequestCompressionInterceptor.INSTANCE); - config.putConfigIfAbsent(HttpContext.DISABLE_REQUEST_COMPRESSION, false); } } static final class RequestCompressionInterceptor implements ClientInterceptor { private static final int DEFAULT_MIN_COMPRESSION_SIZE_BYTES = 10240; - private static final int COMPRESSION_SIZE_CAP = 10485760; + // This cap matches ApiGateway's spec: https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-openapi-minimum-compression-size.html + private static final int MIN_COMPRESSION_SIZE_CAP = 10485760; private static final String CONTENT_ENCODING_HEADER = "Content-Encoding"; private static final ClientInterceptor INSTANCE = new RequestCompressionInterceptor(); private static final TraitKey REQUEST_COMPRESSION_TRAIT_KEY = TraitKey.get(RequestCompressionTrait.class); // Currently only Gzip is supported in Smithy model: https://smithy.io/2.0/spec/behavior-traits.html#requestcompression-trait - private static final List supportedAlgorithms = ListUtils.of(new Gzip()); + private static final List supportedAlgorithms = + CompressionAlgorithm.supportedAlgorithms(); @Override public RequestT modifyBeforeTransmit(RequestHook hook) { @@ -54,12 +53,12 @@ public RequestT modifyBeforeTransmit(RequestHook hook private static HttpRequest processRequest(RequestHook hook) { if (shouldCompress(hook)) { - RequestCompressionTrait compressionTrait = + var compressionTrait = hook.operation().schema().getTrait(REQUEST_COMPRESSION_TRAIT_KEY); var request = hook.request(); // Will pick the first supported algorithm to compress the body. - for (String algorithmId : compressionTrait.getEncodings()) { - for (CompressionAlgorithm algorithm : supportedAlgorithms) { + for (var algorithmId : compressionTrait.getEncodings()) { + for (var algorithm : supportedAlgorithms) { if (algorithmId.equals(algorithm.algorithmId())) { var compressed = algorithm.compress(request.body()); return request.toBuilder() @@ -96,7 +95,7 @@ private static boolean isBodySizeValid(DataStream requestBody, Context context) } private static void validateCompressionSize(int minCompressionSize) { - if (minCompressionSize < 0 || minCompressionSize > COMPRESSION_SIZE_CAP) { + if (minCompressionSize < 0 || minCompressionSize > MIN_COMPRESSION_SIZE_CAP) { throw new IllegalArgumentException("Min compression size must be between 0 and 10485760"); } } diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/GzipInputStreamTest.java b/client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStreamTest.java similarity index 67% rename from io/src/test/java/software/amazon/smithy/java/io/datastream/GzipInputStreamTest.java rename to client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStreamTest.java index 5ebc8ef44..b44162319 100644 --- a/io/src/test/java/software/amazon/smithy/java/io/datastream/GzipInputStreamTest.java +++ b/client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStreamTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package software.amazon.smithy.java.io.datastream; +package software.amazon.smithy.java.client.http.compression; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -15,29 +15,29 @@ import java.util.zip.GZIPInputStream; import org.junit.jupiter.api.Test; -public class GzipInputStreamTest { +public class GzipCompressingInputStreamTest { @Test public void compressesDataCorrectly() throws Exception { - String original = "Hello World!"; + var original = "Hello World!"; var source = new ByteArrayInputStream(original.getBytes(StandardCharsets.UTF_8)); - try (var gzipStream = new GzipInputStream(source)) { - byte[] compressed = gzipStream.readAllBytes(); - String decompressed = decompress(compressed); + try (var gzipStream = new GzipCompressingInputStream(source)) { + var compressed = gzipStream.readAllBytes(); + var decompressed = decompress(compressed); assertThat(decompressed, equalTo(original)); } } @Test public void compressesLargeData() throws Exception { - String original = "Hello World! ".repeat(10000); + var original = "Hello World! ".repeat(10000); var source = new ByteArrayInputStream(original.getBytes(StandardCharsets.UTF_8)); - try (var gzipStream = new GzipInputStream(source)) { - byte[] compressed = gzipStream.readAllBytes(); + try (var gzipStream = new GzipCompressingInputStream(source)) { + var compressed = gzipStream.readAllBytes(); assertThat(compressed.length, lessThan(original.length())); - String decompressed = decompress(compressed); + var decompressed = decompress(compressed); assertThat(decompressed, equalTo(original)); } } @@ -46,42 +46,42 @@ public void compressesLargeData() throws Exception { public void compressesEmptyData() throws Exception { var source = new ByteArrayInputStream(new byte[0]); - try (var gzipStream = new GzipInputStream(source)) { - byte[] compressed = gzipStream.readAllBytes(); - String decompressed = decompress(compressed); + try (var gzipStream = new GzipCompressingInputStream(source)) { + var compressed = gzipStream.readAllBytes(); + var decompressed = decompress(compressed); assertThat(decompressed, equalTo("")); } } @Test public void readSingleByteWorks() throws Exception { - String original = "AB"; + var original = "AB"; var source = new ByteArrayInputStream(original.getBytes(StandardCharsets.UTF_8)); - try (var gzipStream = new GzipInputStream(source); + try (var gzipStream = new GzipCompressingInputStream(source); var out = new ByteArrayOutputStream()) { int b; while ((b = gzipStream.read()) != -1) { out.write(b); } - String decompressed = decompress(out.toByteArray()); + var decompressed = decompress(out.toByteArray()); assertThat(decompressed, equalTo(original)); } } @Test public void readWithBufferWorks() throws Exception { - String original = "Test buffer read"; + var original = "Test buffer read"; var source = new ByteArrayInputStream(original.getBytes(StandardCharsets.UTF_8)); - try (var gzipStream = new GzipInputStream(source); + try (var gzipStream = new GzipCompressingInputStream(source); var out = new ByteArrayOutputStream()) { - byte[] buffer = new byte[4]; + var buffer = new byte[4]; int len; while ((len = gzipStream.read(buffer, 0, buffer.length)) != -1) { out.write(buffer, 0, len); } - String decompressed = decompress(out.toByteArray()); + var decompressed = decompress(out.toByteArray()); assertThat(decompressed, equalTo(original)); } } @@ -89,7 +89,7 @@ public void readWithBufferWorks() throws Exception { private String decompress(byte[] compressed) throws Exception { try (var gzipIn = new GZIPInputStream(new ByteArrayInputStream(compressed)); var out = new ByteArrayOutputStream()) { - byte[] buffer = new byte[1024]; + var buffer = new byte[1024]; int len; while ((len = gzipIn.read(buffer)) > 0) { out.write(buffer, 0, len); diff --git a/client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipTest.java b/client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipTest.java deleted file mode 100644 index e5c63fab2..000000000 --- a/client/client-http/src/test/java/software/amazon/smithy/java/client/http/compression/GzipTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.client.http.compression; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.lessThan; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.nio.charset.StandardCharsets; -import java.util.zip.GZIPInputStream; -import org.junit.jupiter.api.Test; -import software.amazon.smithy.java.io.datastream.DataStream; - -public class GzipTest { - - private final Gzip gzip = new Gzip(); - - @Test - public void algorithmIdReturnsGzip() { - assertThat(gzip.algorithmId(), equalTo("gzip")); - } - - @Test - public void compressesKnownLengthData() throws Exception { - // Use larger, repetitive data that compresses well - String original = "Hello World! ".repeat(10); - DataStream input = DataStream.ofString(original); - - DataStream compressed = gzip.compress(input); - - // Verify compressed data is smaller - assertThat(compressed.contentLength(), lessThan((long) original.length())); - - // Verify decompression produces original - String decompressed = decompress(compressed.asByteBuffer().array()); - assertThat(decompressed, equalTo(original)); - } - - @Test - public void compressesLargeStreamInChunks() throws Exception { - // Create 100KB of data - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 10000; i++) { - sb.append("0123456789"); - } - String original = sb.toString(); - byte[] bytes = original.getBytes(StandardCharsets.UTF_8); - DataStream input = DataStream.ofInputStream(new ByteArrayInputStream(bytes)); - - DataStream compressed = gzip.compress(input); - - // Verify decompression produces original - byte[] compressedBytes = compressed.asInputStream().readAllBytes(); - String decompressed = decompress(compressedBytes); - assertThat(decompressed, equalTo(original)); - } - - @Test - public void compressesEmptyData() throws Exception { - DataStream input = DataStream.ofString(""); - - DataStream compressed = gzip.compress(input); - - String decompressed = decompress(compressed.asByteBuffer().array()); - assertThat(decompressed, equalTo("")); - } - - private String decompress(byte[] compressed) throws Exception { - try (GZIPInputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(compressed)); - ByteArrayOutputStream out = new ByteArrayOutputStream()) { - byte[] buffer = new byte[1024]; - int len; - while ((len = gzipIn.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - return out.toString(StandardCharsets.UTF_8); - } - } -} diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/GzipInputStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/GzipInputStream.java deleted file mode 100644 index a06f79a58..000000000 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/GzipInputStream.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.io.datastream; - -import java.io.IOException; -import java.io.InputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.zip.GZIPOutputStream; - -public class GzipInputStream extends InputStream { - private final PipedInputStream pipedIn; - private final Thread compressionThread; - private volatile IOException compressionException; - - public GzipInputStream(InputStream source) throws IOException { - this.pipedIn = new PipedInputStream(); - PipedOutputStream pipedOut = new PipedOutputStream(pipedIn); - - this.compressionThread = Thread.ofVirtual().start(() -> { - try (GZIPOutputStream gzipOut = new GZIPOutputStream(pipedOut); - InputStream in = source) { - in.transferTo(gzipOut); - } catch (IOException e) { - compressionException = e; - } - }); - } - - @Override - public int read() throws IOException { - checkCompressionException(); - return pipedIn.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - checkCompressionException(); - return pipedIn.read(b, off, len); - } - - @Override - public void close() throws IOException { - pipedIn.close(); - compressionThread.interrupt(); - try { - compressionThread.join(1000); // Wait max 1 second - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private void checkCompressionException() throws IOException { - if (compressionException != null) { - throw compressionException; - } - } -} From 117140b0e9cdf4805a21bb39f9ed872a987d2912 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20Sugawara=20=28=E2=88=A9=EF=BD=80-=C2=B4=29?= =?UTF-8?q?=E2=8A=83=E2=94=81=E7=82=8E=E7=82=8E=E7=82=8E=E7=82=8E=E7=82=8E?= Date: Sun, 14 Dec 2025 10:51:58 -0800 Subject: [PATCH 3/3] Streaming compression input stream implementation --- .../GzipCompressingInputStream.java | 134 +++++++++++++++--- 1 file changed, 113 insertions(+), 21 deletions(-) diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStream.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStream.java index ab22d6351..1d90282b5 100644 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStream.java +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/compression/GzipCompressingInputStream.java @@ -8,49 +8,141 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; import java.util.zip.GZIPOutputStream; +/** + * An InputStream that compresses data from a source InputStream using GZIP compression. + * This implementation lazily compress from the source data on-demand as it's read. + */ final class GzipCompressingInputStream extends InputStream { - + private static final int CHUNK_SIZE = 8192; private final InputStream source; - private byte[] compressedData; - private int position = 0; + private final ByteArrayOutputStream bufferStream; + private final GZIPOutputStream gzipStream; + private final byte[] chunk = new byte[CHUNK_SIZE]; + private byte[] buffer; + private int bufferPos; + private int bufferLimit; + private boolean sourceExhausted; + private boolean closed; public GzipCompressingInputStream(InputStream source) { this.source = source; + this.bufferStream = new ByteArrayOutputStream(); + this.gzipStream = createGzipOutputStream(bufferStream); + this.buffer = new byte[0]; + this.bufferPos = 0; + this.bufferLimit = 0; + this.sourceExhausted = false; + this.closed = false; } @Override public int read() throws IOException { - ensureCompressed(); - return position < compressedData.length ? compressedData[position++] & 0xFF : -1; + byte[] b = new byte[1]; + int result = read(b, 0, 1); + return result == -1 ? -1 : (b[0] & 0xFF); } @Override public int read(byte[] b, int off, int len) throws IOException { - ensureCompressed(); - var available = compressedData.length - position; - if (available <= 0) - return -1; - - var toRead = Math.min(len, available); - System.arraycopy(compressedData, position, b, off, toRead); - position += toRead; + if (closed) { + throw new IOException("Stream closed"); + } + + if (b == null) { + throw new NullPointerException("b"); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + // Try to fill the output buffer if it's empty + while (bufferPos >= bufferLimit) { + if (!fillBuffer()) { + return -1; // End of stream + } + } + + // Copy available data from buffer + int available = bufferLimit - bufferPos; + int toRead = Math.min(available, len); + System.arraycopy(buffer, bufferPos, b, off, toRead); + bufferPos += toRead; + return toRead; } - private void ensureCompressed() throws IOException { - if (compressedData == null) { - var buffer = new ByteArrayOutputStream(); - try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) { - source.transferTo(gzip); - } - compressedData = buffer.toByteArray(); + /** + * Reads a chunk from the source, compresses it, and fills the internal buffer. + * + * @return true if data was added to buffer, false if end of stream reached + */ + private boolean fillBuffer() throws IOException { + if (sourceExhausted) { + return false; + } + + // Read a chunk from source + int bytesRead = source.read(chunk); + + if (bytesRead == -1) { + // Source is exhausted, finish compression + gzipStream.finish(); + sourceExhausted = true; + } else { + // Compress the chunk + gzipStream.write(chunk, 0, bytesRead); + gzipStream.flush(); } + + // Get compressed data from buffer stream + byte[] compressed = bufferStream.toByteArray(); + if (compressed.length > 0) { + buffer = compressed; + bufferPos = 0; + bufferLimit = compressed.length; + bufferStream.reset(); + return true; + } + + if (sourceExhausted) { + return bufferPos >= bufferLimit; + } + return true; } @Override public void close() throws IOException { - source.close(); + if (!closed) { + closed = true; + try { + gzipStream.close(); + } finally { + source.close(); + } + } + } + + @Override + public int available() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + return bufferLimit - bufferPos; + } + + /** + * Utility method to avoid having to throw the checked IOException exception. + */ + private GZIPOutputStream createGzipOutputStream(OutputStream bufferStream) { + try { + return new GZIPOutputStream(bufferStream); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } }