From ea8d09f97edeef3683e2eb970308d2b6918a4142 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sat, 27 Sep 2025 18:58:25 +0200 Subject: [PATCH] HTTPCLIENT-2398 - cap async execution queue to break recursive re-entry. Add configurable maxQueuedRequests (default unlimited). Release slot on fail/cancel/close to avoid leaks --- .../AsyncClientExchangeHandlerProxy.java | 86 +++ .../http/impl/async/H2AsyncClientBuilder.java | 22 +- .../impl/async/HttpAsyncClientBuilder.java | 22 +- .../impl/async/InternalH2AsyncClient.java | 15 +- .../async/InternalH2AsyncExecRuntime.java | 58 +- .../impl/async/InternalHttpAsyncClient.java | 10 +- .../async/InternalHttpAsyncExecRuntime.java | 65 +- ...yncClientBuilderMaxQueuedRequestsTest.java | 224 +++++++ ...nternalH2AsyncExecRuntimeQueueCapTest.java | 292 +++++++++ ...ernalHttpAsyncExecRuntimeQueueCapTest.java | 564 ++++++++++++++++++ 10 files changed, 1337 insertions(+), 21 deletions(-) create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncClientExchangeHandlerProxy.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncClientExchangeHandlerProxy.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncClientExchangeHandlerProxy.java new file mode 100644 index 0000000000..f14160ef13 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncClientExchangeHandlerProxy.java @@ -0,0 +1,86 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; + +@Internal +final class AsyncClientExchangeHandlerProxy implements InvocationHandler { + + private final AsyncClientExchangeHandler handler; + private final Runnable onRelease; + private final AtomicBoolean released; + + private AsyncClientExchangeHandlerProxy( + final AsyncClientExchangeHandler handler, + final Runnable onRelease) { + this.handler = handler; + this.onRelease = onRelease; + this.released = new AtomicBoolean(false); + } + + static AsyncClientExchangeHandler newProxy( + final AsyncClientExchangeHandler handler, + final Runnable onRelease) { + return (AsyncClientExchangeHandler) Proxy.newProxyInstance( + AsyncClientExchangeHandler.class.getClassLoader(), + new Class[]{AsyncClientExchangeHandler.class}, + new AsyncClientExchangeHandlerProxy(handler, onRelease)); + } + + @Override + public Object invoke( + final Object proxy, + final Method method, + final Object[] args) throws Throwable { + if ("releaseResources".equals(method.getName()) + && method.getParameterCount() == 0) { + try { + return method.invoke(handler, args); + } catch (final InvocationTargetException ex) { + throw ex.getCause(); + } finally { + if (released.compareAndSet(false, true)) { + onRelease.run(); + } + } + } + try { + return method.invoke(handler, args); + } catch (final InvocationTargetException ex) { + throw ex.getCause(); + } + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java index 998442faa5..0f374e1912 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java @@ -218,6 +218,8 @@ private ExecInterceptorEntry( private boolean priorityHeaderDisabled; + private int maxQueuedRequests = -1; + public static H2AsyncClientBuilder create() { return new H2AsyncClientBuilder(); } @@ -324,6 +326,22 @@ public final H2AsyncClientBuilder disableRequestPriority() { return this; } + /** + * Sets a hard cap on the number of requests allowed to be queued/in-flight + * within the internal async execution pipeline. When the limit is reached, + * new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}. + * A value {@code <= 0} means unlimited (default). + * + * @param max maximum number of queued requests; {@code <= 0} to disable the cap + * @return this builder + * @since 5.6 + */ + @Experimental + public final H2AsyncClientBuilder setMaxQueuedRequests(final int max) { + this.maxQueuedRequests = max; + return this; + } + /** * Adds this protocol interceptor to the head of the protocol processing list. * @@ -976,7 +994,9 @@ public CloseableHttpAsyncClient build() { cookieStoreCopy, credentialsProviderCopy, defaultRequestConfig, - closeablesCopy); + closeablesCopy, + maxQueuedRequests); + } static class IdleConnectionEvictor implements Closeable { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java index b29ca5f768..4e1d7616e9 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java @@ -267,6 +267,8 @@ private ExecInterceptorEntry( private ProxySelector proxySelector; + private int maxQueuedRequests = -1; + private EarlyHintsListener earlyHintsListener; private boolean priorityHeaderDisabled; @@ -899,6 +901,23 @@ public HttpAsyncClientBuilder disableContentCompression() { return this; } + /** + * Sets a hard cap on the number of requests allowed to be queued/in-flight + * within the internal async execution pipeline. When the limit is reached, + * new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}. + * A value <= 0 means unlimited (default). + * + * @param max maximum number of queued requests; <= 0 to disable the cap + * @return this builder + * @since 5.6 + */ + @Experimental + public HttpAsyncClientBuilder setMaxQueuedRequests(final int max) { + this.maxQueuedRequests = max; + return this; + } + + /** * Disable installing the HTTP/2 Priority header interceptor by default. * @since 5.6 @@ -1260,7 +1279,8 @@ public CloseableHttpAsyncClient build() { credentialsProviderCopy, contextAdaptor(), defaultRequestConfig, - closeablesCopy); + closeablesCopy, + maxQueuedRequests); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java index 709a4b6420..3c27051000 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java @@ -29,6 +29,7 @@ import java.io.Closeable; import java.util.List; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.client5.http.async.AsyncExecRuntime; @@ -69,6 +70,8 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient private static final Logger LOG = LoggerFactory.getLogger(InternalH2AsyncClient.class); private final HttpRoutePlanner routePlanner; private final InternalH2ConnPool connPool; + private final int maxQueuedRequests; + private final AtomicInteger queuedRequests; InternalH2AsyncClient( final DefaultConnectingIOReactor ioReactor, @@ -82,21 +85,27 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient final CookieStore cookieStore, final CredentialsProvider credentialsProvider, final RequestConfig defaultConfig, - final List closeables) { + final List closeables, + final int maxQueuedRequests) { super(ioReactor, pushConsumerRegistry, threadFactory, execChain, cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, HttpClientContext::castOrCreate, defaultConfig, closeables); this.connPool = connPool; this.routePlanner = routePlanner; + this.maxQueuedRequests = maxQueuedRequests; + this.queuedRequests = maxQueuedRequests > 0 ? new AtomicInteger(0) : null; } @Override AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory pushHandlerFactory) { - return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory); + return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, maxQueuedRequests, queuedRequests); } @Override - HttpRoute determineRoute(final HttpHost httpHost, final HttpRequest request, final HttpClientContext clientContext) throws HttpException { + HttpRoute determineRoute( + final HttpHost httpHost, + final HttpRequest request, + final HttpClientContext clientContext) throws HttpException { final HttpRoute route = routePlanner.determineRoute(httpHost, request, clientContext); if (route.isTunnelled()) { throw new HttpException("HTTP/2 tunneling not supported"); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java index 96d185201d..48fbaf1cdd 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java @@ -28,6 +28,8 @@ package org.apache.hc.client5.http.impl.async; import java.io.InterruptedIOException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.client5.http.EndpointInfo; @@ -61,17 +63,30 @@ class InternalH2AsyncExecRuntime implements AsyncExecRuntime { private final InternalH2ConnPool connPool; private final HandlerFactory pushHandlerFactory; private final AtomicReference sessionRef; + private final int maxQueued; + private final AtomicInteger sharedQueued; private volatile boolean reusable; InternalH2AsyncExecRuntime( final Logger log, final InternalH2ConnPool connPool, final HandlerFactory pushHandlerFactory) { + this(log, connPool, pushHandlerFactory, -1, null); + } + + InternalH2AsyncExecRuntime( + final Logger log, + final InternalH2ConnPool connPool, + final HandlerFactory pushHandlerFactory, + final int maxQueued, + final AtomicInteger sharedQueued) { super(); this.log = log; this.connPool = connPool; this.pushHandlerFactory = pushHandlerFactory; this.sessionRef = new AtomicReference<>(); + this.maxQueued = maxQueued; + this.sharedQueued = sharedQueued; } @Override @@ -246,12 +261,41 @@ public EndpointInfo getEndpointInfo() { return null; } + private boolean tryAcquireSlot() { + if (sharedQueued == null || maxQueued <= 0) { + return true; + } + for (;;) { + final int q = sharedQueued.get(); + if (q >= maxQueued) { + return false; + } + if (sharedQueued.compareAndSet(q, q + 1)) { + return true; + } + } + } + + private void releaseSlot() { + if (sharedQueued != null && maxQueued > 0) { + sharedQueued.decrementAndGet(); + } + } + @Override public Cancellable execute( final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) { - final ComplexCancellable complexCancellable = new ComplexCancellable(); final Endpoint endpoint = ensureValid(); + if (!tryAcquireSlot()) { + exchangeHandler.failed(new RejectedExecutionException( + "Execution pipeline queue limit reached (max=" + maxQueued + ")")); + return Operations.nonCancellable(); + } + final AsyncClientExchangeHandler actual = sharedQueued != null + ? AsyncClientExchangeHandlerProxy.newProxy(exchangeHandler, this::releaseSlot) + : exchangeHandler; + final ComplexCancellable complexCancellable = new ComplexCancellable(); final IOSession session = endpoint.session; if (session.isOpen()) { if (log.isDebugEnabled()) { @@ -259,7 +303,7 @@ public Cancellable execute( } context.setProtocolVersion(HttpVersion.HTTP_2); session.enqueue( - new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context), + new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context), Command.Priority.NORMAL); } else { final HttpRoute route = endpoint.route; @@ -276,19 +320,19 @@ public void completed(final IOSession ioSession) { log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); } context.setProtocolVersion(HttpVersion.HTTP_2); - session.enqueue( - new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context), + ioSession.enqueue( + new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context), Command.Priority.NORMAL); } @Override public void failed(final Exception ex) { - exchangeHandler.failed(ex); + actual.failed(ex); } @Override public void cancelled() { - exchangeHandler.failed(new InterruptedIOException()); + actual.failed(new InterruptedIOException()); } }); @@ -325,7 +369,7 @@ public String getId() { @Override public AsyncExecRuntime fork() { - return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory); + return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, maxQueued, sharedQueued); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java index e02be0605c..1604039f30 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java @@ -29,6 +29,7 @@ import java.io.Closeable; import java.util.List; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.hc.client5.http.HttpRoute; @@ -75,6 +76,8 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie private final AsyncClientConnectionManager manager; private final HttpRoutePlanner routePlanner; private final TlsConfig tlsConfig; + private final int maxQueuedRequests; + private final AtomicInteger queuedCounter; InternalHttpAsyncClient( final DefaultConnectingIOReactor ioReactor, @@ -90,18 +93,21 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie final CredentialsProvider credentialsProvider, final Function contextAdaptor, final RequestConfig defaultConfig, - final List closeables) { + final List closeables, + final int maxQueuedRequests) { super(ioReactor, pushConsumerRegistry, threadFactory, execChain, cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, contextAdaptor, defaultConfig, closeables); this.manager = manager; this.routePlanner = routePlanner; this.tlsConfig = tlsConfig; + this.maxQueuedRequests = maxQueuedRequests; + this.queuedCounter = maxQueuedRequests > 0 ? new AtomicInteger(0) : null; } @Override AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory pushHandlerFactory) { - return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig); + return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, maxQueuedRequests, queuedCounter); } @Override diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java index 28d59abaf4..5d49bb0dbd 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java @@ -28,6 +28,8 @@ package org.apache.hc.client5.http.impl.async; import java.io.InterruptedIOException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.client5.http.EndpointInfo; @@ -77,6 +79,8 @@ static class ReUseData { private final TlsConfig tlsConfig; private final AtomicReference endpointRef; private final AtomicReference reuseDataRef; + private final int maxQueued; + private final AtomicInteger sharedQueued; InternalHttpAsyncExecRuntime( final Logger log, @@ -84,6 +88,17 @@ static class ReUseData { final ConnectionInitiator connectionInitiator, final HandlerFactory pushHandlerFactory, final TlsConfig tlsConfig) { + this(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig, -1, null); + } + + InternalHttpAsyncExecRuntime( + final Logger log, + final AsyncClientConnectionManager manager, + final ConnectionInitiator connectionInitiator, + final HandlerFactory pushHandlerFactory, + final TlsConfig tlsConfig, + final int maxQueued, + final AtomicInteger sharedQueued) { super(); this.log = log; this.manager = manager; @@ -92,6 +107,8 @@ static class ReUseData { this.tlsConfig = tlsConfig; this.endpointRef = new AtomicReference<>(); this.reuseDataRef = new AtomicReference<>(); + this.maxQueued = maxQueued; + this.sharedQueued = sharedQueued; } @Override @@ -183,7 +200,7 @@ public void discardEndpoint() { @Override public boolean validateConnection() { - if (reuseDataRef != null) { + if (reuseDataRef.get() != null) { final AsyncConnectionEndpoint endpoint = endpointRef.get(); return endpoint != null && endpoint.isConnected(); } @@ -282,10 +299,44 @@ public EndpointInfo getEndpointInfo() { return endpoint != null ? endpoint.getInfo() : null; } + private boolean tryAcquireSlot() { + if (sharedQueued == null || maxQueued <= 0) { + return true; + } + for (;;) { + final int q = sharedQueued.get(); + if (q >= maxQueued) { + return false; + } + if (sharedQueued.compareAndSet(q, q + 1)) { + return true; + } + } + } + + private void releaseSlot() { + if (sharedQueued != null && maxQueued > 0) { + sharedQueued.decrementAndGet(); + } + } + + private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler handler) { + if (sharedQueued == null) { + return handler; + } + return AsyncClientExchangeHandlerProxy.newProxy(handler, this::releaseSlot); + } + @Override public Cancellable execute( final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) { final AsyncConnectionEndpoint endpoint = ensureValid(); + if (sharedQueued != null && !tryAcquireSlot()) { + exchangeHandler.failed(new RejectedExecutionException( + "Execution pipeline queue limit reached (max=" + maxQueued + ")")); + return Operations.nonCancellable(); + } + final AsyncClientExchangeHandler actual = guard(exchangeHandler); if (endpoint.isConnected()) { if (log.isDebugEnabled()) { log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); @@ -295,10 +346,10 @@ public Cancellable execute( if (responseTimeout != null) { endpoint.setSocketTimeout(responseTimeout); } - endpoint.execute(id, exchangeHandler, pushHandlerFactory, context); + endpoint.execute(id, actual, pushHandlerFactory, context); if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) { return () -> { - exchangeHandler.cancel(); + actual.cancel(); return true; }; } @@ -311,7 +362,7 @@ public void completed(final AsyncExecRuntime runtime) { log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); } try { - endpoint.execute(id, exchangeHandler, pushHandlerFactory, context); + endpoint.execute(id, actual, pushHandlerFactory, context); } catch (final RuntimeException ex) { failed(ex); } @@ -319,12 +370,12 @@ public void completed(final AsyncExecRuntime runtime) { @Override public void failed(final Exception ex) { - exchangeHandler.failed(ex); + actual.failed(ex); } @Override public void cancelled() { - exchangeHandler.failed(new InterruptedIOException()); + actual.failed(new InterruptedIOException()); } }); @@ -344,7 +395,7 @@ public void markConnectionNonReusable() { @Override public AsyncExecRuntime fork() { - return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig); + return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig, maxQueued, sharedQueued); } } diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java new file mode 100644 index 0000000000..f37bd43c71 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java @@ -0,0 +1,224 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.SocketAddress; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.client5.http.EndpointInfo; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.nio.AsyncClientConnectionManager; +import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.reactor.ConnectionInitiator; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Test; + +public class HttpAsyncClientBuilderMaxQueuedRequestsTest { + + @Test + void secondSubmissionIsRejectedWhenCapIsReached() throws Exception { + final BlockingEndpoint endpoint = new BlockingEndpoint(); + final FakeManager manager = new FakeManager(endpoint); + + final RequestConfig rc = RequestConfig.custom().build(); + + try (CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create() + .setDefaultRequestConfig(rc) + .setConnectionManager(manager) + .setMaxQueuedRequests(1) + .build()) { + + client.start(); + + final HttpClientContext ctx = HttpClientContext.create(); + ctx.setRequestConfig(rc); + + final SimpleHttpRequest r1 = SimpleRequestBuilder.get("http://localhost/").build(); + client.execute(SimpleRequestProducer.create(r1), SimpleResponseConsumer.create(), ctx, + new FutureCallback() { + @Override + public void completed(final SimpleHttpResponse result) { + } + + @Override + public void failed(final Exception ex) { + } + + @Override + public void cancelled() { + } + }); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference<>(); + + final SimpleHttpRequest r2 = SimpleRequestBuilder.get("http://localhost/second").build(); + client.execute(SimpleRequestProducer.create(r2), SimpleResponseConsumer.create(), ctx, + new FutureCallback() { + @Override + public void completed(final SimpleHttpResponse result) { + } + + @Override + public void failed(final Exception ex) { + failure.set(ex); + latch.countDown(); + } + + @Override + public void cancelled() { + failure.set(new CancellationException("cancelled")); + latch.countDown(); + } + }); + + assertTrue(latch.await(2, TimeUnit.SECONDS), "rejection should arrive quickly"); + assertInstanceOf(RejectedExecutionException.class, failure.get(), "Expected RejectedExecutionException, got: " + failure.get()); + } + } + + + private static final class FakeManager implements AsyncClientConnectionManager { + private final AsyncConnectionEndpoint endpoint; + + FakeManager(final AsyncConnectionEndpoint endpoint) { + this.endpoint = endpoint; + } + + @Override + public Future lease(final String id, + final org.apache.hc.client5.http.HttpRoute route, + final Object state, + final Timeout requestTimeout, + final FutureCallback callback) { + final CompletableFuture cf = CompletableFuture.completedFuture(endpoint); + if (callback != null) callback.completed(endpoint); + return cf; + } + + @Override + public Future connect(final AsyncConnectionEndpoint endpoint, + final ConnectionInitiator connectionInitiator, + final Timeout connectTimeout, + final Object attachment, + final HttpContext context, + final FutureCallback callback) { + ((BlockingEndpoint) this.endpoint).connected = true; + final CompletableFuture cf = CompletableFuture.completedFuture(endpoint); + if (callback != null) callback.completed(endpoint); + return cf; + } + + @Override + public void upgrade(final AsyncConnectionEndpoint endpoint, + final Object attachment, + final HttpContext context) { + } + + @Override + public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) { + } + + @Override + public void close(final CloseMode closeMode) { + } + + @Override + public void close() { + } + } + + private static final class BlockingEndpoint extends AsyncConnectionEndpoint { + volatile boolean connected = true; + + @Override + public void execute(final String id, + final AsyncClientExchangeHandler handler, + final HandlerFactory pushHandlerFactory, + final HttpContext context) { + } + + @Override + public boolean isConnected() { + return connected; + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + } + + @Override + public void close(final CloseMode closeMode) { + connected = false; + } + + @Override + public EndpointInfo getInfo() { + return null; + } + } + + @SuppressWarnings("unused") + private static final class NoopInitiator implements ConnectionInitiator { + @Override + public Future connect(final NamedEndpoint endpoint, + final SocketAddress remote, + final SocketAddress local, + final Timeout timeout, + final Object attachment, + final FutureCallback callback) { + final CompletableFuture cf = new CompletableFuture<>(); + cf.completeExceptionally(new UnsupportedOperationException()); + if (callback != null) callback.failed(new UnsupportedOperationException()); + return cf; + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java new file mode 100644 index 0000000000..0309d8710d --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java @@ -0,0 +1,292 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.async; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.lang.reflect.Proxy; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.async.AsyncExecRuntime; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.HandlerFactory; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; + +public class InternalH2AsyncExecRuntimeQueueCapTest { + + private static InternalH2AsyncExecRuntime newRuntime(final int maxQueued) { + final IOSession ioSession = newImmediateFailSession(); + final FakeH2ConnPool connPool = new FakeH2ConnPool(ioSession); + final AtomicInteger queued = maxQueued > 0 ? new AtomicInteger(0) : null; + return new InternalH2AsyncExecRuntime( + LoggerFactory.getLogger("test"), + connPool, + new NoopPushFactory(), + maxQueued, + queued); + } + + private static void acquireEndpoint( + final InternalH2AsyncExecRuntime runtime, + final HttpClientContext ctx) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + runtime.acquireEndpoint( + "id", + new HttpRoute(new HttpHost("localhost", 80)), + null, + ctx, + new FutureCallback() { + @Override + public void completed(final AsyncExecRuntime result) { + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail("cancelled"); + } + }); + assertTrue(latch.await(2, TimeUnit.SECONDS), "endpoint should be acquired"); + } + + /** + * With no cap (maxQueued <= 0) the recursive re-entry path should blow the stack. + * This documents the pathological behaviour without queue protection. + */ + @Test + void testRecursiveReentryCausesSOEWithoutCap() throws Exception { + final InternalH2AsyncExecRuntime runtime = newRuntime(-1); + + final HttpClientContext ctx = HttpClientContext.create(); + ctx.setRequestConfig(RequestConfig.custom().build()); + + acquireEndpoint(runtime, ctx); + + final ReentrantHandler loop = new ReentrantHandler(runtime, ctx); + + assertThrows(StackOverflowError.class, () -> { + runtime.execute("loop", loop, ctx); + }); + } + + /** + * With a cap of 1, the second re-entrant execute call must be rejected and + * the recursion broken. + */ + @Test + void testCapBreaksRecursiveReentry() throws Exception { + final InternalH2AsyncExecRuntime runtime = newRuntime(1); + + final HttpClientContext ctx = HttpClientContext.create(); + ctx.setRequestConfig(RequestConfig.custom().build()); + + acquireEndpoint(runtime, ctx); + + final ReentrantHandler loop = new ReentrantHandler(runtime, ctx); + + runtime.execute("loop", loop, ctx); + // immediate fail path runs synchronously; small wait is just defensive + Thread.sleep(50); + + assertTrue(loop.lastException.get() instanceof RejectedExecutionException, + "Expected queue rejection to break recursion"); + } + + /** + * Very small fake pool that always returns the same IOSession. + */ + private static final class FakeH2ConnPool extends InternalH2ConnPool { + + private final IOSession session; + + FakeH2ConnPool(final IOSession session) { + super(null, null, null); + this.session = session; + } + + @Override + public Future getSession( + final HttpRoute route, + final Timeout timeout, + final FutureCallback callback) { + final CompletableFuture cf = new CompletableFuture<>(); + cf.complete(session); + if (callback != null) { + callback.completed(session); + } + return cf; + } + + } + + /** + * IOSession that immediately fails any RequestExecutionCommand passed + * to enqueue(...), simulating an I/O failure that calls handler.failed(...) + * synchronously. + */ + private static IOSession newImmediateFailSession() { + return (IOSession) Proxy.newProxyInstance( + IOSession.class.getClassLoader(), + new Class[]{IOSession.class}, + (proxy, method, args) -> { + final String name = method.getName(); + if ("isOpen".equals(name)) { + return Boolean.TRUE; + } + if ("enqueue".equals(name)) { + final Command cmd = (Command) args[0]; + if (cmd instanceof RequestExecutionCommand) { + ((RequestExecutionCommand) cmd).failed(new IOException("immediate failure")); + } + return null; + } + if ("close".equals(name)) { + return null; + } + if (method.getReturnType().isPrimitive()) { + if (method.getReturnType() == boolean.class) { + return false; + } + if (method.getReturnType() == int.class) { + return 0; + } + if (method.getReturnType() == long.class) { + return 0L; + } + } + return null; + }); + } + + private static final class NoopPushFactory implements HandlerFactory { + @Override + public AsyncPushConsumer create(final HttpRequest request, final HttpContext context) { + return null; + } + } + + private static final class ReentrantHandler implements AsyncClientExchangeHandler { + + private final InternalH2AsyncExecRuntime runtime; + private final HttpClientContext context; + final AtomicReference lastException = new AtomicReference<>(); + + ReentrantHandler(final InternalH2AsyncExecRuntime runtime, final HttpClientContext context) { + this.runtime = runtime; + this.context = context; + } + + @Override + public void failed(final Exception cause) { + lastException.set(cause); + if (!(cause instanceof RejectedExecutionException)) { + runtime.execute("loop/reenter", this, context); + } + } + + @Override + public void produceRequest(final RequestChannel channel, final HttpContext context) { + } + + @Override + public void consumeResponse( + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext context) { + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext context) { + } + + @Override + public void cancel() { + } + + @Override + public int available() { + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) { + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) { + } + + @Override + public void consume(final ByteBuffer src) { + } + + @Override + public void streamEnd(final java.util.List trailers) { + } + + @Override + public void releaseResources() { + } + + } + +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java new file mode 100644 index 0000000000..6aae8413af --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java @@ -0,0 +1,564 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.async; + +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.client5.http.EndpointInfo; +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.async.AsyncExecRuntime; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.nio.AsyncClientConnectionManager; +import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.HandlerFactory; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.reactor.ConnectionInitiator; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; + +public class InternalHttpAsyncExecRuntimeQueueCapTest { + + @Test + void testFailFastWhenQueueFull() throws Exception { + final FakeEndpoint endpoint = new FakeEndpoint(); + final FakeManager manager = new FakeManager(endpoint); + final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( + LoggerFactory.getLogger("test"), + manager, + new NoopInitiator(), + new NoopPushFactory(), + TlsConfig.DEFAULT, + 2, + new AtomicInteger() + ); + + final HttpClientContext ctx = HttpClientContext.create(); + ctx.setRequestConfig(RequestConfig.custom().build()); + + runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new FutureCallback() { + @Override + public void completed(final AsyncExecRuntime result) { + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail("cancelled"); + } + }); + + final CountDownLatch rejected = new CountDownLatch(1); + + final LatchingHandler h1 = new LatchingHandler(); + final LatchingHandler h2 = new LatchingHandler(); + runtime.execute("r1", h1, ctx); + runtime.execute("r2", h2, ctx); + + final LatchingHandler h3 = new LatchingHandler() { + @Override + public void failed(final Exception cause) { + super.failed(cause); + rejected.countDown(); + } + }; + runtime.execute("r3", h3, ctx); + + assertTrue(rejected.await(2, TimeUnit.SECONDS), "r3 should be failed fast"); + assertTrue(h3.failedException.get() instanceof RejectedExecutionException); + assertNull(h1.failedException.get()); + assertNull(h2.failedException.get()); + } + + @Test + void testSlotReleasedOnTerminalSignalAllowsNext() throws Exception { + final FakeEndpoint endpoint = new FakeEndpoint(); + final FakeManager manager = new FakeManager(endpoint); + final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( + LoggerFactory.getLogger("test"), + manager, + new NoopInitiator(), + new NoopPushFactory(), + TlsConfig.DEFAULT, + 1, + new AtomicInteger() + ); + + final HttpClientContext ctx = HttpClientContext.create(); + ctx.setRequestConfig(RequestConfig.custom().build()); + + runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, + new FutureCallback() { + @Override + public void completed(final AsyncExecRuntime result) { + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail("cancelled"); + } + }); + + final LatchingHandler h1 = new LatchingHandler(); + runtime.execute("r1", h1, ctx); + + final LatchingHandler h2 = new LatchingHandler(); + runtime.execute("r2", h2, ctx); + assertTrue(h2.awaitFailed(2, TimeUnit.SECONDS)); + assertTrue(h2.failedException.get() instanceof RejectedExecutionException); + + // free the slot via releaseResources(), not failed() + endpoint.completeOne(); + + final LatchingHandler h3 = new LatchingHandler(); + runtime.execute("r3", h3, ctx); + Thread.sleep(150); + assertNull(h3.failedException.get(), "r3 should not be rejected after slot released"); + h3.cancel(); + } + + + private static final class NoopInitiator implements ConnectionInitiator { + @Override + public Future connect(final NamedEndpoint endpoint, + final SocketAddress remoteAddress, + final SocketAddress localAddress, + final Timeout timeout, + final Object attachment, + final FutureCallback callback) { + final CompletableFuture cf = new CompletableFuture<>(); + final UnsupportedOperationException ex = new UnsupportedOperationException("not used"); + cf.completeExceptionally(ex); + if (callback != null) { + callback.failed(ex); + } + return cf; + } + } + + private static final class NoopPushFactory implements HandlerFactory { + @Override + public AsyncPushConsumer create(final HttpRequest request, final HttpContext context) { + return null; + } + } + + private static final class FakeManager implements AsyncClientConnectionManager { + private final AsyncConnectionEndpoint endpoint; + + FakeManager(final AsyncConnectionEndpoint endpoint) { + this.endpoint = endpoint; + } + + @Override + public Future lease(final String id, + final HttpRoute route, + final Object state, + final Timeout requestTimeout, + final FutureCallback callback) { + final CompletableFuture cf = CompletableFuture.completedFuture(endpoint); + if (callback != null) { + callback.completed(endpoint); + } + return cf; + } + + @Override + public Future connect(final AsyncConnectionEndpoint endpoint, + final ConnectionInitiator connectionInitiator, + final Timeout connectTimeout, + final Object attachment, + final HttpContext context, + final FutureCallback callback) { + ((FakeEndpoint) this.endpoint).connected = true; + final CompletableFuture cf = CompletableFuture.completedFuture(endpoint); + if (callback != null) { + callback.completed(endpoint); + } + return cf; + } + + @Override + public void upgrade(final AsyncConnectionEndpoint endpoint, + final Object attachment, + final HttpContext context) { + } + + @Override + public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) { + } + + @Override + public void close(final CloseMode closeMode) { + } + + @Override + public void close() { + } + } + + private static final class FakeEndpoint extends AsyncConnectionEndpoint { + volatile boolean connected = true; + private final ConcurrentLinkedQueue inFlight = new ConcurrentLinkedQueue<>(); + + @Override + public void execute(final String id, + final AsyncClientExchangeHandler handler, + final HandlerFactory pushHandlerFactory, + final HttpContext context) { + // keep the guarded handler so tests can signal terminal events + inFlight.add(handler); + } + + // helpers for tests + void failOne(final Exception ex) { + final AsyncClientExchangeHandler h = inFlight.poll(); + if (h != null) { + h.failed(ex); + } + } + + void cancelOne() { + final AsyncClientExchangeHandler h = inFlight.poll(); + if (h != null) { + h.cancel(); + } + } + + void completeOne() { + final AsyncClientExchangeHandler h = inFlight.poll(); + if (h != null) { + h.releaseResources(); + } + } + + @Override + public boolean isConnected() { + return connected; + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + } + + @Override + public void close(final CloseMode closeMode) { + connected = false; + } + + @Override + public EndpointInfo getInfo() { + return null; + } + } + + + + private static class LatchingHandler implements AsyncClientExchangeHandler { + final AtomicReference failedException = new AtomicReference<>(); + final CountDownLatch failLatch = new CountDownLatch(1); + + boolean awaitFailed(final long t, final TimeUnit u) throws InterruptedException { + return failLatch.await(t, u); + } + + @Override + public void produceRequest(final RequestChannel channel, final HttpContext context) { + } + + @Override + public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) { + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext context) { + } + + @Override + public void cancel() { + } + + @Override + public int available() { + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) { + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) { + } + + @Override + public void consume(final ByteBuffer src) { + } + + @Override + public void streamEnd(final List trailers) { + } + + @Override + public void releaseResources() { + } + + @Override + public void failed(final Exception cause) { + failedException.compareAndSet(null, Objects.requireNonNull(cause)); + failLatch.countDown(); + } + } + + @Test + void testRecursiveReentryCausesSOEWithoutCap() { + final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint(); + final FakeManager manager = new FakeManager(endpoint); + + final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( + LoggerFactory.getLogger("test"), + manager, + new NoopInitiator(), + new NoopPushFactory(), + TlsConfig.DEFAULT, + -1, + null // no cap, no counter + ); + + final HttpClientContext ctx = HttpClientContext.create(); + ctx.setRequestConfig(RequestConfig.custom().build()); + + runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, + new FutureCallback() { + @Override + public void completed(final AsyncExecRuntime result) { + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail("cancelled"); + } + }); + + final ReentrantHandler loop = new ReentrantHandler(runtime, ctx); + + assertThrows(StackOverflowError.class, () -> { + runtime.execute("loop", loop, ctx); // execute -> endpoint.execute -> failed() -> execute -> ... + }); + } + + @Test + void testCapBreaksRecursiveReentry() throws Exception { + final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint(); + final FakeManager manager = new FakeManager(endpoint); + + final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( + LoggerFactory.getLogger("test"), + manager, + new NoopInitiator(), + new NoopPushFactory(), + TlsConfig.DEFAULT, + 1, + new AtomicInteger() + ); + + final HttpClientContext ctx = HttpClientContext.create(); + ctx.setRequestConfig(RequestConfig.custom().build()); + + runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, + new FutureCallback() { + @Override + public void completed(final AsyncExecRuntime result) { + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail("cancelled"); + } + }); + + final ReentrantHandler loop = new ReentrantHandler(runtime, ctx); + + // Should NOT blow the stack; the re-entrant call should be rejected. + runtime.execute("loop", loop, ctx); + // allow the immediate fail+re-submit path to run + Thread.sleep(50); + + assertTrue(loop.lastException.get() instanceof RejectedExecutionException, + "Expected rejection to break the recursion"); + } + + /** + * Endpoint that synchronously fails any handler passed to execute(). + */ + private static final class ImmediateFailEndpoint extends AsyncConnectionEndpoint { + volatile boolean connected = true; + + @Override + public void execute(final String id, + final AsyncClientExchangeHandler handler, + final HandlerFactory pushHandlerFactory, + final HttpContext context) { + handler.failed(new IOException("immediate failure")); + } + + @Override + public boolean isConnected() { + return connected; + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + } + + @Override + public void close(final CloseMode closeMode) { + connected = false; + } + + @Override + public EndpointInfo getInfo() { + return null; + } + } + + private static final class ReentrantHandler implements AsyncClientExchangeHandler { + private final InternalHttpAsyncExecRuntime runtime; + private final HttpClientContext ctx; + final AtomicReference lastException = new AtomicReference<>(); + + ReentrantHandler(final InternalHttpAsyncExecRuntime runtime, final HttpClientContext ctx) { + this.runtime = runtime; + this.ctx = ctx; + } + + @Override + public void failed(final Exception cause) { + lastException.set(cause); + // Re-enter only if this was NOT the cap rejecting us + if (!(cause instanceof RejectedExecutionException)) { + runtime.execute("loop/reenter", this, ctx); + } + } + + @Override + public void produceRequest(final RequestChannel channel, final HttpContext context) { + } + + @Override + public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) { + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext context) { + } + + @Override + public void cancel() { + } + + @Override + public int available() { + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) { + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) { + } + + @Override + public void consume(final ByteBuffer src) { + } + + @Override + public void streamEnd(final List trailers) { + } + + @Override + public void releaseResources() { + } + } + +}