From 697fdfd0c1557a51e8434f4c6d20f9b16a81795e Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Tue, 23 Sep 2025 15:46:31 +0300 Subject: [PATCH] storage: adapt Netty Reactor HTTP client as GCS storage client Notes: * Uses direct memory buffers. Recommend to run Diskless with "-Dio.netty.maxDirectMemory=0" to have the Netty cleaner running. * Has static 96 max connections pool. * Has static 32 worker thread pool. * "SO_KEEPALIVE" set for sockets and keep alive header for HTTP. * Compression disabled, producer compression recommended and compressing again likely not beneficial. * GCS client handles redirects, Netty Reactor client following disabled. * Can use static BoringSSL library to offload SSL to OpenSSL. * Zero-copy until the response handling where direct memory buffer bytes are copied to heap manager byte array. --- build.gradle | 7 ++ gradle/dependencies.gradle | 8 ++ .../storage_backend/gcs/GcsStorage.java | 10 +- .../nettyhttpclient/ReactorNettyRequest.java | 115 ++++++++++++++++++ .../nettyhttpclient/ReactorNettyResponse.java | 81 ++++++++++++ .../ReactorNettyTransport.java | 85 +++++++++++++ 6 files changed, 305 insertions(+), 1 deletion(-) create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyRequest.java create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyResponse.java create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyTransport.java diff --git a/build.gradle b/build.gradle index 2eca27d31b..5db2a4dbba 100644 --- a/build.gradle +++ b/build.gradle @@ -2442,6 +2442,13 @@ project(':storage:inkless') { exclude group: "com.fasterxml.jackson.core" exclude group: "org.slf4j" } + + implementation(libs.nettyReactorCore) + implementation(libs.nettyReactorHttp) + implementation(libs.nettyTcNativeBoringSSLStatic) + implementation(libs.nettyTcNativeBoringSSLStaticLinuxAarch64) + implementation(libs.nettyTcNativeBoringSSLStaticLinuxX86_64) + implementation(libs.gcsSdk) { exclude group: 'com.google.errorprone', module: 'error_prone_annotations' exclude group: 'org.checkerframework', module: 'checker-qual' diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index ea9498a45c..878cdadddf 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -119,6 +119,9 @@ versions += [ mavenArtifact: "3.9.6", metrics: "2.2.0", mockito: "5.14.2", + // Required to be compatible with BoringSSL version + nettyReactor: "1.1.22", + nettyTcNativeBoringSSL: "2.0.65.Final", opentelemetryProto: "1.3.2-alpha", postgresql: "42.7.4", protobuf: "3.25.5", // a dependency of opentelemetryProto @@ -238,6 +241,11 @@ libs += [ metrics: "com.yammer.metrics:metrics-core:$versions.metrics", mockitoCore: "org.mockito:mockito-core:$versions.mockito", mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito", + nettyReactorCore: "io.projectreactor.netty:reactor-netty-core:$versions.nettyReactor", + nettyReactorHttp: "io.projectreactor.netty:reactor-netty-http:$versions.nettyReactor", + nettyTcNativeBoringSSLStatic: "io.netty:netty-tcnative-boringssl-static:$versions.nettyTcNativeBoringSSL", + nettyTcNativeBoringSSLStaticLinuxAarch64: "io.netty:netty-tcnative-boringssl-static:$versions.nettyTcNativeBoringSSL:linux-aarch_64", + nettyTcNativeBoringSSLStaticLinuxX86_64: "io.netty:netty-tcnative-boringssl-static:$versions.nettyTcNativeBoringSSL:linux-x86_64", pcollections: "org.pcollections:pcollections:$versions.pcollections", opentelemetryProto: "io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto", postgresql: "org.postgresql:postgresql:$versions.postgresql", diff --git a/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/GcsStorage.java b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/GcsStorage.java index b42177ccae..9d307e0c88 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/GcsStorage.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/GcsStorage.java @@ -18,6 +18,7 @@ package io.aiven.inkless.storage_backend.gcs; +import com.google.api.client.http.HttpTransport; import com.google.cloud.BaseServiceException; import com.google.cloud.ReadChannel; import com.google.cloud.http.HttpTransportOptions; @@ -34,6 +35,7 @@ import java.nio.channels.ReadableByteChannel; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -43,6 +45,8 @@ import io.aiven.inkless.storage_backend.common.KeyNotFoundException; import io.aiven.inkless.storage_backend.common.StorageBackend; import io.aiven.inkless.storage_backend.common.StorageBackendException; +import io.aiven.inkless.storage_backend.gcs.nettyhttpclient.ReactorNettyTransport; + @CoverageIgnore // tested on integration level public class GcsStorage implements StorageBackend { @@ -60,8 +64,12 @@ public class GcsStorage implements StorageBackend { public void configure(final Map configs) { final GcsStorageConfig config = new GcsStorageConfig(configs); this.bucketName = config.bucketName(); + final String gcsUrl = Optional.ofNullable(config.endpointUrl()).orElse("https://www.googleapis.com"); - final HttpTransportOptions.Builder httpTransportOptionsBuilder = HttpTransportOptions.newBuilder(); + final HttpTransportOptions.Builder httpTransportOptionsBuilder = HttpTransportOptions.newBuilder() + .setHttpTransportFactory(() -> { + return (HttpTransport) ReactorNettyTransport.get(gcsUrl); + }); final StorageOptions.Builder builder = StorageOptions.newBuilder() .setCredentials(config.credentials()) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyRequest.java b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyRequest.java new file mode 100644 index 0000000000..c1f986d52d --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyRequest.java @@ -0,0 +1,115 @@ +package io.aiven.inkless.storage_backend.gcs.nettyhttpclient; + +import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; + +import java.io.IOException; +import java.net.URI; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; +import reactor.core.publisher.Flux; +import reactor.netty.ByteBufFlux; +import reactor.netty.http.client.HttpClient; + +import static io.netty.buffer.Unpooled.wrappedBuffer; + +public class ReactorNettyRequest extends LowLevelHttpRequest { + + private HttpClient client; + private final String method; + private final URI uri; + private final HttpHeaders headers = new DefaultHttpHeaders(); + + public ReactorNettyRequest(HttpClient client, String method, String url) { + this.client = client; + this.method = method; + this.uri = URI.create(url); + } + + @Override + public void addHeader(String name, String value) { + headers.add(name, value); + } + + @Override + public LowLevelHttpResponse execute() throws IOException { + final HttpClient.ResponseReceiver receiver; + final ByteBuf buffer; + switch (method) { + case "POST": + case "PUT": + case "DELETE": + if (getStreamingContent() != null) { + long contentLength = getContentLength(); + buffer = wrappedBuffer(new byte[Math.toIntExact(contentLength)]); + buffer.resetWriterIndex(); + try (ByteBufOutputStream out = new ByteBufOutputStream(buffer)) { + getStreamingContent().writeTo(out); + } + + String contentType = getContentType(); + String contentEncoding = getContentEncoding(); + + if (contentType != null) { + headers.set(HttpHeaderNames.CONTENT_TYPE.toString(), contentType); + } + if (contentEncoding != null) { + headers.set(HttpHeaderNames.CONTENT_ENCODING.toString(), contentEncoding); + } + if (contentLength >= 0) { + headers.set(HttpHeaderNames.CONTENT_LENGTH.toString(), Math.toIntExact(contentLength)); + } + } else { + buffer = wrappedBuffer(new byte[0]); + } + + client = client.headers(cons -> { + cons.add(headers); + }); + + HttpClient.RequestSender sender; + switch (method) { + case "POST": + sender = client.post(); + break; + case "PUT": + sender = client.put(); + break; + case "DELETE": + sender = client.delete(); + break; + default: + throw new RuntimeException("unknown method"); + } + receiver = sender.uri(uri).send(ByteBufFlux.fromInbound(Flux.just(buffer))); + break; + case "GET": + client = client.headers(cons -> { + cons.add(headers); + }); + receiver = client.get().uri(uri); + break; + default: + throw new RuntimeException("Unsupported method " + method); + } + + final ReactorNettyResponse block = receiver.responseSingle((response, content) -> { + return content.map(bb -> { + // Buffer is directly allocated, copy bytes to heap and + // allow the Reactor framework to release the direct buffer + // and free the memory for reuse. + final byte[] clone = new byte[bb.readableBytes()]; + bb.readBytes(clone); + return new ReactorNettyResponse(response, clone); + }).defaultIfEmpty(new ReactorNettyResponse(response, null)); + }).block(); + if (block == null) { + throw new RuntimeException("No response received."); + } + return block; + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyResponse.java b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyResponse.java new file mode 100644 index 0000000000..a848cb9618 --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyResponse.java @@ -0,0 +1,81 @@ +package io.aiven.inkless.storage_backend.gcs.nettyhttpclient; + +import com.google.api.client.http.LowLevelHttpResponse; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import io.netty.handler.codec.http.HttpHeaderNames; +import reactor.netty.http.client.HttpClientResponse; + +public class ReactorNettyResponse extends LowLevelHttpResponse { + + private final HttpClientResponse response; + private final byte[] content; + + public ReactorNettyResponse(HttpClientResponse response, byte[] content) { + this.response = response; + if (content == null) { + this.content = new byte[0]; + } else { + this.content = content; + } + } + + @Override + public InputStream getContent() throws IOException { + return new ByteArrayInputStream(content); + } + + @Override + public String getContentEncoding() throws IOException { + return response.responseHeaders().get(HttpHeaderNames.CONTENT_ENCODING); + } + + @Override + public long getContentLength() throws IOException { + return content.length; + } + + @Override + public String getContentType() throws IOException { + return response.responseHeaders().get(HttpHeaderNames.CONTENT_TYPE); + } + + @Override + public String getStatusLine() throws IOException { + final StringBuilder buf = new StringBuilder(); + + buf.append(response.version()).append(" ").append(getStatusCode()).append(" "); + if (this.getReasonPhrase() != null) { + buf.append(getReasonPhrase()); + } + return buf.toString(); + } + + @Override + public int getStatusCode() throws IOException { + return response.status().code(); + } + + @Override + public String getReasonPhrase() throws IOException { + return response.status().reasonPhrase(); + } + + @Override + public int getHeaderCount() throws IOException { + return response.responseHeaders().size(); + } + + @Override + public String getHeaderName(int index) throws IOException { + return response.responseHeaders().entries().get(index).getKey(); + } + + @Override + public String getHeaderValue(int index) throws IOException { + return response.responseHeaders().entries().get(index).getValue(); + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyTransport.java b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyTransport.java new file mode 100644 index 0000000000..60cba64b78 --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/nettyhttpclient/ReactorNettyTransport.java @@ -0,0 +1,85 @@ +package io.aiven.inkless.storage_backend.gcs.nettyhttpclient; + +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.LowLevelHttpRequest; + +import java.io.IOException; +import java.net.URI; + +import io.netty.channel.ChannelOption; +import io.netty.handler.codec.http.HttpHeaderNames; +import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.resources.LoopResources; + +public final class ReactorNettyTransport extends HttpTransport { + + private static ReactorNettyTransport instance; + + private final URI uri; + private final HttpClient client; + private final LoopResources clientEventLoopGroup; + private final ConnectionProvider connectionProvider; + private boolean shutdown = false; + + private ReactorNettyTransport(final String endpoint) { + uri = URI.create(endpoint); + final int port; + if (uri.getPort() == -1) { + // No port defined, select by the scheme + port = "https://".equals(uri.getScheme()) ? 80 : 443; + } else { + port = uri.getPort(); + } + + connectionProvider = ConnectionProvider.builder("custom") + .maxConnections(96) + .build(); + + clientEventLoopGroup = LoopResources.create("gcs-netty-http", 32, true); + + client = HttpClient.create(connectionProvider) + .runOn(clientEventLoopGroup) + .host(uri.getHost()) + .port(uri.getPort()) + .keepAlive(true) + .followRedirect(false) + .compress(false) + .option(ChannelOption.SO_KEEPALIVE, true) + .headers(cons -> cons.set(HttpHeaderNames.HOST, String.format("%s:%s", uri.getHost(), port))); + client.warmup().block(); + } + + @Override + protected LowLevelHttpRequest buildRequest(String method, String url) throws IOException { + return new ReactorNettyRequest(client, method, url); + } + + @Override + public void shutdown() throws IOException { + if (!shutdown) { + connectionProvider.disposeLater().block(); + clientEventLoopGroup.disposeLater().block(); + shutdown = true; + } + } + + /** + * Returns whether the transport is shutdown or not. + * + * @return true if the transport is shutdown. + * @since 1.44.0 + */ + @Override + public boolean isShutdown() { + return shutdown; + } + + public synchronized static ReactorNettyTransport get(String gcsUri) { + if (instance == null) { + instance = new ReactorNettyTransport(gcsUri); + } + return instance; + } + +}