diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncClientExchangeHandlerProxy.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncClientExchangeHandlerProxy.java
new file mode 100644
index 0000000000..f14160ef13
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncClientExchangeHandlerProxy.java
@@ -0,0 +1,86 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.impl.async;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+
+@Internal
+final class AsyncClientExchangeHandlerProxy implements InvocationHandler {
+
+ private final AsyncClientExchangeHandler handler;
+ private final Runnable onRelease;
+ private final AtomicBoolean released;
+
+ private AsyncClientExchangeHandlerProxy(
+ final AsyncClientExchangeHandler handler,
+ final Runnable onRelease) {
+ this.handler = handler;
+ this.onRelease = onRelease;
+ this.released = new AtomicBoolean(false);
+ }
+
+ static AsyncClientExchangeHandler newProxy(
+ final AsyncClientExchangeHandler handler,
+ final Runnable onRelease) {
+ return (AsyncClientExchangeHandler) Proxy.newProxyInstance(
+ AsyncClientExchangeHandler.class.getClassLoader(),
+ new Class>[]{AsyncClientExchangeHandler.class},
+ new AsyncClientExchangeHandlerProxy(handler, onRelease));
+ }
+
+ @Override
+ public Object invoke(
+ final Object proxy,
+ final Method method,
+ final Object[] args) throws Throwable {
+ if ("releaseResources".equals(method.getName())
+ && method.getParameterCount() == 0) {
+ try {
+ return method.invoke(handler, args);
+ } catch (final InvocationTargetException ex) {
+ throw ex.getCause();
+ } finally {
+ if (released.compareAndSet(false, true)) {
+ onRelease.run();
+ }
+ }
+ }
+ try {
+ return method.invoke(handler, args);
+ } catch (final InvocationTargetException ex) {
+ throw ex.getCause();
+ }
+ }
+
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java
index 998442faa5..0f374e1912 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java
@@ -218,6 +218,8 @@ private ExecInterceptorEntry(
private boolean priorityHeaderDisabled;
+ private int maxQueuedRequests = -1;
+
public static H2AsyncClientBuilder create() {
return new H2AsyncClientBuilder();
}
@@ -324,6 +326,22 @@ public final H2AsyncClientBuilder disableRequestPriority() {
return this;
}
+ /**
+ * Sets a hard cap on the number of requests allowed to be queued/in-flight
+ * within the internal async execution pipeline. When the limit is reached,
+ * new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
+ * A value {@code <= 0} means unlimited (default).
+ *
+ * @param max maximum number of queued requests; {@code <= 0} to disable the cap
+ * @return this builder
+ * @since 5.6
+ */
+ @Experimental
+ public final H2AsyncClientBuilder setMaxQueuedRequests(final int max) {
+ this.maxQueuedRequests = max;
+ return this;
+ }
+
/**
* Adds this protocol interceptor to the head of the protocol processing list.
*
@@ -976,7 +994,9 @@ public CloseableHttpAsyncClient build() {
cookieStoreCopy,
credentialsProviderCopy,
defaultRequestConfig,
- closeablesCopy);
+ closeablesCopy,
+ maxQueuedRequests);
+
}
static class IdleConnectionEvictor implements Closeable {
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
index b29ca5f768..4e1d7616e9 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
@@ -267,6 +267,8 @@ private ExecInterceptorEntry(
private ProxySelector proxySelector;
+ private int maxQueuedRequests = -1;
+
private EarlyHintsListener earlyHintsListener;
private boolean priorityHeaderDisabled;
@@ -899,6 +901,23 @@ public HttpAsyncClientBuilder disableContentCompression() {
return this;
}
+ /**
+ * Sets a hard cap on the number of requests allowed to be queued/in-flight
+ * within the internal async execution pipeline. When the limit is reached,
+ * new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
+ * A value <= 0 means unlimited (default).
+ *
+ * @param max maximum number of queued requests; <= 0 to disable the cap
+ * @return this builder
+ * @since 5.6
+ */
+ @Experimental
+ public HttpAsyncClientBuilder setMaxQueuedRequests(final int max) {
+ this.maxQueuedRequests = max;
+ return this;
+ }
+
+
/**
* Disable installing the HTTP/2 Priority header interceptor by default.
* @since 5.6
@@ -1260,7 +1279,8 @@ public CloseableHttpAsyncClient build() {
credentialsProviderCopy,
contextAdaptor(),
defaultRequestConfig,
- closeablesCopy);
+ closeablesCopy,
+ maxQueuedRequests);
}
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java
index 709a4b6420..3c27051000 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java
@@ -29,6 +29,7 @@
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecRuntime;
@@ -69,6 +70,8 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
private static final Logger LOG = LoggerFactory.getLogger(InternalH2AsyncClient.class);
private final HttpRoutePlanner routePlanner;
private final InternalH2ConnPool connPool;
+ private final int maxQueuedRequests;
+ private final AtomicInteger queuedRequests;
InternalH2AsyncClient(
final DefaultConnectingIOReactor ioReactor,
@@ -82,21 +85,27 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
final CookieStore cookieStore,
final CredentialsProvider credentialsProvider,
final RequestConfig defaultConfig,
- final List closeables) {
+ final List closeables,
+ final int maxQueuedRequests) {
super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, HttpClientContext::castOrCreate,
defaultConfig, closeables);
this.connPool = connPool;
this.routePlanner = routePlanner;
+ this.maxQueuedRequests = maxQueuedRequests;
+ this.queuedRequests = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
}
@Override
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory pushHandlerFactory) {
- return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory);
+ return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, maxQueuedRequests, queuedRequests);
}
@Override
- HttpRoute determineRoute(final HttpHost httpHost, final HttpRequest request, final HttpClientContext clientContext) throws HttpException {
+ HttpRoute determineRoute(
+ final HttpHost httpHost,
+ final HttpRequest request,
+ final HttpClientContext clientContext) throws HttpException {
final HttpRoute route = routePlanner.determineRoute(httpHost, request, clientContext);
if (route.isTunnelled()) {
throw new HttpException("HTTP/2 tunneling not supported");
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java
index 96d185201d..48fbaf1cdd 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java
@@ -28,6 +28,8 @@
package org.apache.hc.client5.http.impl.async;
import java.io.InterruptedIOException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.EndpointInfo;
@@ -61,17 +63,30 @@ class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
private final InternalH2ConnPool connPool;
private final HandlerFactory pushHandlerFactory;
private final AtomicReference sessionRef;
+ private final int maxQueued;
+ private final AtomicInteger sharedQueued;
private volatile boolean reusable;
InternalH2AsyncExecRuntime(
final Logger log,
final InternalH2ConnPool connPool,
final HandlerFactory pushHandlerFactory) {
+ this(log, connPool, pushHandlerFactory, -1, null);
+ }
+
+ InternalH2AsyncExecRuntime(
+ final Logger log,
+ final InternalH2ConnPool connPool,
+ final HandlerFactory pushHandlerFactory,
+ final int maxQueued,
+ final AtomicInteger sharedQueued) {
super();
this.log = log;
this.connPool = connPool;
this.pushHandlerFactory = pushHandlerFactory;
this.sessionRef = new AtomicReference<>();
+ this.maxQueued = maxQueued;
+ this.sharedQueued = sharedQueued;
}
@Override
@@ -246,12 +261,41 @@ public EndpointInfo getEndpointInfo() {
return null;
}
+ private boolean tryAcquireSlot() {
+ if (sharedQueued == null || maxQueued <= 0) {
+ return true;
+ }
+ for (;;) {
+ final int q = sharedQueued.get();
+ if (q >= maxQueued) {
+ return false;
+ }
+ if (sharedQueued.compareAndSet(q, q + 1)) {
+ return true;
+ }
+ }
+ }
+
+ private void releaseSlot() {
+ if (sharedQueued != null && maxQueued > 0) {
+ sharedQueued.decrementAndGet();
+ }
+ }
+
@Override
public Cancellable execute(
final String id,
final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
- final ComplexCancellable complexCancellable = new ComplexCancellable();
final Endpoint endpoint = ensureValid();
+ if (!tryAcquireSlot()) {
+ exchangeHandler.failed(new RejectedExecutionException(
+ "Execution pipeline queue limit reached (max=" + maxQueued + ")"));
+ return Operations.nonCancellable();
+ }
+ final AsyncClientExchangeHandler actual = sharedQueued != null
+ ? AsyncClientExchangeHandlerProxy.newProxy(exchangeHandler, this::releaseSlot)
+ : exchangeHandler;
+ final ComplexCancellable complexCancellable = new ComplexCancellable();
final IOSession session = endpoint.session;
if (session.isOpen()) {
if (log.isDebugEnabled()) {
@@ -259,7 +303,7 @@ public Cancellable execute(
}
context.setProtocolVersion(HttpVersion.HTTP_2);
session.enqueue(
- new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
+ new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context),
Command.Priority.NORMAL);
} else {
final HttpRoute route = endpoint.route;
@@ -276,19 +320,19 @@ public void completed(final IOSession ioSession) {
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
}
context.setProtocolVersion(HttpVersion.HTTP_2);
- session.enqueue(
- new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
+ ioSession.enqueue(
+ new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context),
Command.Priority.NORMAL);
}
@Override
public void failed(final Exception ex) {
- exchangeHandler.failed(ex);
+ actual.failed(ex);
}
@Override
public void cancelled() {
- exchangeHandler.failed(new InterruptedIOException());
+ actual.failed(new InterruptedIOException());
}
});
@@ -325,7 +369,7 @@ public String getId() {
@Override
public AsyncExecRuntime fork() {
- return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
+ return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, maxQueued, sharedQueued);
}
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
index e02be0605c..1604039f30 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
@@ -29,6 +29,7 @@
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hc.client5.http.HttpRoute;
@@ -75,6 +76,8 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
private final AsyncClientConnectionManager manager;
private final HttpRoutePlanner routePlanner;
private final TlsConfig tlsConfig;
+ private final int maxQueuedRequests;
+ private final AtomicInteger queuedCounter;
InternalHttpAsyncClient(
final DefaultConnectingIOReactor ioReactor,
@@ -90,18 +93,21 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
final CredentialsProvider credentialsProvider,
final Function contextAdaptor,
final RequestConfig defaultConfig,
- final List closeables) {
+ final List closeables,
+ final int maxQueuedRequests) {
super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, contextAdaptor,
defaultConfig, closeables);
this.manager = manager;
this.routePlanner = routePlanner;
this.tlsConfig = tlsConfig;
+ this.maxQueuedRequests = maxQueuedRequests;
+ this.queuedCounter = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
}
@Override
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory pushHandlerFactory) {
- return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig);
+ return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, maxQueuedRequests, queuedCounter);
}
@Override
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
index 28d59abaf4..5d49bb0dbd 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
@@ -28,6 +28,8 @@
package org.apache.hc.client5.http.impl.async;
import java.io.InterruptedIOException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.EndpointInfo;
@@ -77,6 +79,8 @@ static class ReUseData {
private final TlsConfig tlsConfig;
private final AtomicReference endpointRef;
private final AtomicReference reuseDataRef;
+ private final int maxQueued;
+ private final AtomicInteger sharedQueued;
InternalHttpAsyncExecRuntime(
final Logger log,
@@ -84,6 +88,17 @@ static class ReUseData {
final ConnectionInitiator connectionInitiator,
final HandlerFactory pushHandlerFactory,
final TlsConfig tlsConfig) {
+ this(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig, -1, null);
+ }
+
+ InternalHttpAsyncExecRuntime(
+ final Logger log,
+ final AsyncClientConnectionManager manager,
+ final ConnectionInitiator connectionInitiator,
+ final HandlerFactory pushHandlerFactory,
+ final TlsConfig tlsConfig,
+ final int maxQueued,
+ final AtomicInteger sharedQueued) {
super();
this.log = log;
this.manager = manager;
@@ -92,6 +107,8 @@ static class ReUseData {
this.tlsConfig = tlsConfig;
this.endpointRef = new AtomicReference<>();
this.reuseDataRef = new AtomicReference<>();
+ this.maxQueued = maxQueued;
+ this.sharedQueued = sharedQueued;
}
@Override
@@ -183,7 +200,7 @@ public void discardEndpoint() {
@Override
public boolean validateConnection() {
- if (reuseDataRef != null) {
+ if (reuseDataRef.get() != null) {
final AsyncConnectionEndpoint endpoint = endpointRef.get();
return endpoint != null && endpoint.isConnected();
}
@@ -282,10 +299,44 @@ public EndpointInfo getEndpointInfo() {
return endpoint != null ? endpoint.getInfo() : null;
}
+ private boolean tryAcquireSlot() {
+ if (sharedQueued == null || maxQueued <= 0) {
+ return true;
+ }
+ for (;;) {
+ final int q = sharedQueued.get();
+ if (q >= maxQueued) {
+ return false;
+ }
+ if (sharedQueued.compareAndSet(q, q + 1)) {
+ return true;
+ }
+ }
+ }
+
+ private void releaseSlot() {
+ if (sharedQueued != null && maxQueued > 0) {
+ sharedQueued.decrementAndGet();
+ }
+ }
+
+ private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler handler) {
+ if (sharedQueued == null) {
+ return handler;
+ }
+ return AsyncClientExchangeHandlerProxy.newProxy(handler, this::releaseSlot);
+ }
+
@Override
public Cancellable execute(
final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
final AsyncConnectionEndpoint endpoint = ensureValid();
+ if (sharedQueued != null && !tryAcquireSlot()) {
+ exchangeHandler.failed(new RejectedExecutionException(
+ "Execution pipeline queue limit reached (max=" + maxQueued + ")"));
+ return Operations.nonCancellable();
+ }
+ final AsyncClientExchangeHandler actual = guard(exchangeHandler);
if (endpoint.isConnected()) {
if (log.isDebugEnabled()) {
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
@@ -295,10 +346,10 @@ public Cancellable execute(
if (responseTimeout != null) {
endpoint.setSocketTimeout(responseTimeout);
}
- endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
+ endpoint.execute(id, actual, pushHandlerFactory, context);
if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) {
return () -> {
- exchangeHandler.cancel();
+ actual.cancel();
return true;
};
}
@@ -311,7 +362,7 @@ public void completed(final AsyncExecRuntime runtime) {
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
}
try {
- endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
+ endpoint.execute(id, actual, pushHandlerFactory, context);
} catch (final RuntimeException ex) {
failed(ex);
}
@@ -319,12 +370,12 @@ public void completed(final AsyncExecRuntime runtime) {
@Override
public void failed(final Exception ex) {
- exchangeHandler.failed(ex);
+ actual.failed(ex);
}
@Override
public void cancelled() {
- exchangeHandler.failed(new InterruptedIOException());
+ actual.failed(new InterruptedIOException());
}
});
@@ -344,7 +395,7 @@ public void markConnectionNonReusable() {
@Override
public AsyncExecRuntime fork() {
- return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig);
+ return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig, maxQueued, sharedQueued);
}
}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java
new file mode 100644
index 0000000000..f37bd43c71
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java
@@ -0,0 +1,224 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.impl.async;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.SocketAddress;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.client5.http.EndpointInfo;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
+import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Test;
+
+public class HttpAsyncClientBuilderMaxQueuedRequestsTest {
+
+ @Test
+ void secondSubmissionIsRejectedWhenCapIsReached() throws Exception {
+ final BlockingEndpoint endpoint = new BlockingEndpoint();
+ final FakeManager manager = new FakeManager(endpoint);
+
+ final RequestConfig rc = RequestConfig.custom().build();
+
+ try (CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create()
+ .setDefaultRequestConfig(rc)
+ .setConnectionManager(manager)
+ .setMaxQueuedRequests(1)
+ .build()) {
+
+ client.start();
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(rc);
+
+ final SimpleHttpRequest r1 = SimpleRequestBuilder.get("http://localhost/").build();
+ client.execute(SimpleRequestProducer.create(r1), SimpleResponseConsumer.create(), ctx,
+ new FutureCallback() {
+ @Override
+ public void completed(final SimpleHttpResponse result) {
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ }
+
+ @Override
+ public void cancelled() {
+ }
+ });
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference failure = new AtomicReference<>();
+
+ final SimpleHttpRequest r2 = SimpleRequestBuilder.get("http://localhost/second").build();
+ client.execute(SimpleRequestProducer.create(r2), SimpleResponseConsumer.create(), ctx,
+ new FutureCallback() {
+ @Override
+ public void completed(final SimpleHttpResponse result) {
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ failure.set(ex);
+ latch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ failure.set(new CancellationException("cancelled"));
+ latch.countDown();
+ }
+ });
+
+ assertTrue(latch.await(2, TimeUnit.SECONDS), "rejection should arrive quickly");
+ assertInstanceOf(RejectedExecutionException.class, failure.get(), "Expected RejectedExecutionException, got: " + failure.get());
+ }
+ }
+
+
+ private static final class FakeManager implements AsyncClientConnectionManager {
+ private final AsyncConnectionEndpoint endpoint;
+
+ FakeManager(final AsyncConnectionEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public Future lease(final String id,
+ final org.apache.hc.client5.http.HttpRoute route,
+ final Object state,
+ final Timeout requestTimeout,
+ final FutureCallback callback) {
+ final CompletableFuture cf = CompletableFuture.completedFuture(endpoint);
+ if (callback != null) callback.completed(endpoint);
+ return cf;
+ }
+
+ @Override
+ public Future connect(final AsyncConnectionEndpoint endpoint,
+ final ConnectionInitiator connectionInitiator,
+ final Timeout connectTimeout,
+ final Object attachment,
+ final HttpContext context,
+ final FutureCallback callback) {
+ ((BlockingEndpoint) this.endpoint).connected = true;
+ final CompletableFuture cf = CompletableFuture.completedFuture(endpoint);
+ if (callback != null) callback.completed(endpoint);
+ return cf;
+ }
+
+ @Override
+ public void upgrade(final AsyncConnectionEndpoint endpoint,
+ final Object attachment,
+ final HttpContext context) {
+ }
+
+ @Override
+ public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
+ }
+
+ @Override
+ public void close(final CloseMode closeMode) {
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+ private static final class BlockingEndpoint extends AsyncConnectionEndpoint {
+ volatile boolean connected = true;
+
+ @Override
+ public void execute(final String id,
+ final AsyncClientExchangeHandler handler,
+ final HandlerFactory pushHandlerFactory,
+ final HttpContext context) {
+ }
+
+ @Override
+ public boolean isConnected() {
+ return connected;
+ }
+
+ @Override
+ public void setSocketTimeout(final Timeout timeout) {
+ }
+
+ @Override
+ public void close(final CloseMode closeMode) {
+ connected = false;
+ }
+
+ @Override
+ public EndpointInfo getInfo() {
+ return null;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private static final class NoopInitiator implements ConnectionInitiator {
+ @Override
+ public Future connect(final NamedEndpoint endpoint,
+ final SocketAddress remote,
+ final SocketAddress local,
+ final Timeout timeout,
+ final Object attachment,
+ final FutureCallback callback) {
+ final CompletableFuture cf = new CompletableFuture<>();
+ cf.completeExceptionally(new UnsupportedOperationException());
+ if (callback != null) callback.failed(new UnsupportedOperationException());
+ return cf;
+ }
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java
new file mode 100644
index 0000000000..0309d8710d
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java
@@ -0,0 +1,292 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+package org.apache.hc.client5.http.impl.async;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
+
+public class InternalH2AsyncExecRuntimeQueueCapTest {
+
+ private static InternalH2AsyncExecRuntime newRuntime(final int maxQueued) {
+ final IOSession ioSession = newImmediateFailSession();
+ final FakeH2ConnPool connPool = new FakeH2ConnPool(ioSession);
+ final AtomicInteger queued = maxQueued > 0 ? new AtomicInteger(0) : null;
+ return new InternalH2AsyncExecRuntime(
+ LoggerFactory.getLogger("test"),
+ connPool,
+ new NoopPushFactory(),
+ maxQueued,
+ queued);
+ }
+
+ private static void acquireEndpoint(
+ final InternalH2AsyncExecRuntime runtime,
+ final HttpClientContext ctx) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ runtime.acquireEndpoint(
+ "id",
+ new HttpRoute(new HttpHost("localhost", 80)),
+ null,
+ ctx,
+ new FutureCallback() {
+ @Override
+ public void completed(final AsyncExecRuntime result) {
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ fail(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ fail("cancelled");
+ }
+ });
+ assertTrue(latch.await(2, TimeUnit.SECONDS), "endpoint should be acquired");
+ }
+
+ /**
+ * With no cap (maxQueued <= 0) the recursive re-entry path should blow the stack.
+ * This documents the pathological behaviour without queue protection.
+ */
+ @Test
+ void testRecursiveReentryCausesSOEWithoutCap() throws Exception {
+ final InternalH2AsyncExecRuntime runtime = newRuntime(-1);
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ acquireEndpoint(runtime, ctx);
+
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+
+ assertThrows(StackOverflowError.class, () -> {
+ runtime.execute("loop", loop, ctx);
+ });
+ }
+
+ /**
+ * With a cap of 1, the second re-entrant execute call must be rejected and
+ * the recursion broken.
+ */
+ @Test
+ void testCapBreaksRecursiveReentry() throws Exception {
+ final InternalH2AsyncExecRuntime runtime = newRuntime(1);
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ acquireEndpoint(runtime, ctx);
+
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+
+ runtime.execute("loop", loop, ctx);
+ // immediate fail path runs synchronously; small wait is just defensive
+ Thread.sleep(50);
+
+ assertTrue(loop.lastException.get() instanceof RejectedExecutionException,
+ "Expected queue rejection to break recursion");
+ }
+
+ /**
+ * Very small fake pool that always returns the same IOSession.
+ */
+ private static final class FakeH2ConnPool extends InternalH2ConnPool {
+
+ private final IOSession session;
+
+ FakeH2ConnPool(final IOSession session) {
+ super(null, null, null);
+ this.session = session;
+ }
+
+ @Override
+ public Future getSession(
+ final HttpRoute route,
+ final Timeout timeout,
+ final FutureCallback callback) {
+ final CompletableFuture cf = new CompletableFuture<>();
+ cf.complete(session);
+ if (callback != null) {
+ callback.completed(session);
+ }
+ return cf;
+ }
+
+ }
+
+ /**
+ * IOSession that immediately fails any RequestExecutionCommand passed
+ * to enqueue(...), simulating an I/O failure that calls handler.failed(...)
+ * synchronously.
+ */
+ private static IOSession newImmediateFailSession() {
+ return (IOSession) Proxy.newProxyInstance(
+ IOSession.class.getClassLoader(),
+ new Class>[]{IOSession.class},
+ (proxy, method, args) -> {
+ final String name = method.getName();
+ if ("isOpen".equals(name)) {
+ return Boolean.TRUE;
+ }
+ if ("enqueue".equals(name)) {
+ final Command cmd = (Command) args[0];
+ if (cmd instanceof RequestExecutionCommand) {
+ ((RequestExecutionCommand) cmd).failed(new IOException("immediate failure"));
+ }
+ return null;
+ }
+ if ("close".equals(name)) {
+ return null;
+ }
+ if (method.getReturnType().isPrimitive()) {
+ if (method.getReturnType() == boolean.class) {
+ return false;
+ }
+ if (method.getReturnType() == int.class) {
+ return 0;
+ }
+ if (method.getReturnType() == long.class) {
+ return 0L;
+ }
+ }
+ return null;
+ });
+ }
+
+ private static final class NoopPushFactory implements HandlerFactory {
+ @Override
+ public AsyncPushConsumer create(final HttpRequest request, final HttpContext context) {
+ return null;
+ }
+ }
+
+ private static final class ReentrantHandler implements AsyncClientExchangeHandler {
+
+ private final InternalH2AsyncExecRuntime runtime;
+ private final HttpClientContext context;
+ final AtomicReference lastException = new AtomicReference<>();
+
+ ReentrantHandler(final InternalH2AsyncExecRuntime runtime, final HttpClientContext context) {
+ this.runtime = runtime;
+ this.context = context;
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ lastException.set(cause);
+ if (!(cause instanceof RejectedExecutionException)) {
+ runtime.execute("loop/reenter", this, context);
+ }
+ }
+
+ @Override
+ public void produceRequest(final RequestChannel channel, final HttpContext context) {
+ }
+
+ @Override
+ public void consumeResponse(
+ final HttpResponse response,
+ final EntityDetails entityDetails,
+ final HttpContext context) {
+ }
+
+ @Override
+ public void consumeInformation(final HttpResponse response, final HttpContext context) {
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public int available() {
+ return 0;
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) {
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) {
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) {
+ }
+
+ @Override
+ public void streamEnd(final java.util.List extends Header> trailers) {
+ }
+
+ @Override
+ public void releaseResources() {
+ }
+
+ }
+
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java
new file mode 100644
index 0000000000..6aae8413af
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java
@@ -0,0 +1,564 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+package org.apache.hc.client5.http.impl.async;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.client5.http.EndpointInfo;
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.config.TlsConfig;
+import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
+import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
+
+public class InternalHttpAsyncExecRuntimeQueueCapTest {
+
+ @Test
+ void testFailFastWhenQueueFull() throws Exception {
+ final FakeEndpoint endpoint = new FakeEndpoint();
+ final FakeManager manager = new FakeManager(endpoint);
+ final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
+ LoggerFactory.getLogger("test"),
+ manager,
+ new NoopInitiator(),
+ new NoopPushFactory(),
+ TlsConfig.DEFAULT,
+ 2,
+ new AtomicInteger()
+ );
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new FutureCallback() {
+ @Override
+ public void completed(final AsyncExecRuntime result) {
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ fail(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ fail("cancelled");
+ }
+ });
+
+ final CountDownLatch rejected = new CountDownLatch(1);
+
+ final LatchingHandler h1 = new LatchingHandler();
+ final LatchingHandler h2 = new LatchingHandler();
+ runtime.execute("r1", h1, ctx);
+ runtime.execute("r2", h2, ctx);
+
+ final LatchingHandler h3 = new LatchingHandler() {
+ @Override
+ public void failed(final Exception cause) {
+ super.failed(cause);
+ rejected.countDown();
+ }
+ };
+ runtime.execute("r3", h3, ctx);
+
+ assertTrue(rejected.await(2, TimeUnit.SECONDS), "r3 should be failed fast");
+ assertTrue(h3.failedException.get() instanceof RejectedExecutionException);
+ assertNull(h1.failedException.get());
+ assertNull(h2.failedException.get());
+ }
+
+ @Test
+ void testSlotReleasedOnTerminalSignalAllowsNext() throws Exception {
+ final FakeEndpoint endpoint = new FakeEndpoint();
+ final FakeManager manager = new FakeManager(endpoint);
+ final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
+ LoggerFactory.getLogger("test"),
+ manager,
+ new NoopInitiator(),
+ new NoopPushFactory(),
+ TlsConfig.DEFAULT,
+ 1,
+ new AtomicInteger()
+ );
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
+ new FutureCallback() {
+ @Override
+ public void completed(final AsyncExecRuntime result) {
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ fail(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ fail("cancelled");
+ }
+ });
+
+ final LatchingHandler h1 = new LatchingHandler();
+ runtime.execute("r1", h1, ctx);
+
+ final LatchingHandler h2 = new LatchingHandler();
+ runtime.execute("r2", h2, ctx);
+ assertTrue(h2.awaitFailed(2, TimeUnit.SECONDS));
+ assertTrue(h2.failedException.get() instanceof RejectedExecutionException);
+
+ // free the slot via releaseResources(), not failed()
+ endpoint.completeOne();
+
+ final LatchingHandler h3 = new LatchingHandler();
+ runtime.execute("r3", h3, ctx);
+ Thread.sleep(150);
+ assertNull(h3.failedException.get(), "r3 should not be rejected after slot released");
+ h3.cancel();
+ }
+
+
+ private static final class NoopInitiator implements ConnectionInitiator {
+ @Override
+ public Future connect(final NamedEndpoint endpoint,
+ final SocketAddress remoteAddress,
+ final SocketAddress localAddress,
+ final Timeout timeout,
+ final Object attachment,
+ final FutureCallback callback) {
+ final CompletableFuture cf = new CompletableFuture<>();
+ final UnsupportedOperationException ex = new UnsupportedOperationException("not used");
+ cf.completeExceptionally(ex);
+ if (callback != null) {
+ callback.failed(ex);
+ }
+ return cf;
+ }
+ }
+
+ private static final class NoopPushFactory implements HandlerFactory {
+ @Override
+ public AsyncPushConsumer create(final HttpRequest request, final HttpContext context) {
+ return null;
+ }
+ }
+
+ private static final class FakeManager implements AsyncClientConnectionManager {
+ private final AsyncConnectionEndpoint endpoint;
+
+ FakeManager(final AsyncConnectionEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public Future lease(final String id,
+ final HttpRoute route,
+ final Object state,
+ final Timeout requestTimeout,
+ final FutureCallback callback) {
+ final CompletableFuture cf = CompletableFuture.completedFuture(endpoint);
+ if (callback != null) {
+ callback.completed(endpoint);
+ }
+ return cf;
+ }
+
+ @Override
+ public Future connect(final AsyncConnectionEndpoint endpoint,
+ final ConnectionInitiator connectionInitiator,
+ final Timeout connectTimeout,
+ final Object attachment,
+ final HttpContext context,
+ final FutureCallback callback) {
+ ((FakeEndpoint) this.endpoint).connected = true;
+ final CompletableFuture cf = CompletableFuture.completedFuture(endpoint);
+ if (callback != null) {
+ callback.completed(endpoint);
+ }
+ return cf;
+ }
+
+ @Override
+ public void upgrade(final AsyncConnectionEndpoint endpoint,
+ final Object attachment,
+ final HttpContext context) {
+ }
+
+ @Override
+ public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
+ }
+
+ @Override
+ public void close(final CloseMode closeMode) {
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+ private static final class FakeEndpoint extends AsyncConnectionEndpoint {
+ volatile boolean connected = true;
+ private final ConcurrentLinkedQueue inFlight = new ConcurrentLinkedQueue<>();
+
+ @Override
+ public void execute(final String id,
+ final AsyncClientExchangeHandler handler,
+ final HandlerFactory pushHandlerFactory,
+ final HttpContext context) {
+ // keep the guarded handler so tests can signal terminal events
+ inFlight.add(handler);
+ }
+
+ // helpers for tests
+ void failOne(final Exception ex) {
+ final AsyncClientExchangeHandler h = inFlight.poll();
+ if (h != null) {
+ h.failed(ex);
+ }
+ }
+
+ void cancelOne() {
+ final AsyncClientExchangeHandler h = inFlight.poll();
+ if (h != null) {
+ h.cancel();
+ }
+ }
+
+ void completeOne() {
+ final AsyncClientExchangeHandler h = inFlight.poll();
+ if (h != null) {
+ h.releaseResources();
+ }
+ }
+
+ @Override
+ public boolean isConnected() {
+ return connected;
+ }
+
+ @Override
+ public void setSocketTimeout(final Timeout timeout) {
+ }
+
+ @Override
+ public void close(final CloseMode closeMode) {
+ connected = false;
+ }
+
+ @Override
+ public EndpointInfo getInfo() {
+ return null;
+ }
+ }
+
+
+
+ private static class LatchingHandler implements AsyncClientExchangeHandler {
+ final AtomicReference failedException = new AtomicReference<>();
+ final CountDownLatch failLatch = new CountDownLatch(1);
+
+ boolean awaitFailed(final long t, final TimeUnit u) throws InterruptedException {
+ return failLatch.await(t, u);
+ }
+
+ @Override
+ public void produceRequest(final RequestChannel channel, final HttpContext context) {
+ }
+
+ @Override
+ public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) {
+ }
+
+ @Override
+ public void consumeInformation(final HttpResponse response, final HttpContext context) {
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public int available() {
+ return 0;
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) {
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) {
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) {
+ }
+
+ @Override
+ public void streamEnd(final List extends Header> trailers) {
+ }
+
+ @Override
+ public void releaseResources() {
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ failedException.compareAndSet(null, Objects.requireNonNull(cause));
+ failLatch.countDown();
+ }
+ }
+
+ @Test
+ void testRecursiveReentryCausesSOEWithoutCap() {
+ final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
+ final FakeManager manager = new FakeManager(endpoint);
+
+ final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
+ LoggerFactory.getLogger("test"),
+ manager,
+ new NoopInitiator(),
+ new NoopPushFactory(),
+ TlsConfig.DEFAULT,
+ -1,
+ null // no cap, no counter
+ );
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
+ new FutureCallback() {
+ @Override
+ public void completed(final AsyncExecRuntime result) {
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ fail(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ fail("cancelled");
+ }
+ });
+
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+
+ assertThrows(StackOverflowError.class, () -> {
+ runtime.execute("loop", loop, ctx); // execute -> endpoint.execute -> failed() -> execute -> ...
+ });
+ }
+
+ @Test
+ void testCapBreaksRecursiveReentry() throws Exception {
+ final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
+ final FakeManager manager = new FakeManager(endpoint);
+
+ final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
+ LoggerFactory.getLogger("test"),
+ manager,
+ new NoopInitiator(),
+ new NoopPushFactory(),
+ TlsConfig.DEFAULT,
+ 1,
+ new AtomicInteger()
+ );
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
+ new FutureCallback() {
+ @Override
+ public void completed(final AsyncExecRuntime result) {
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ fail(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ fail("cancelled");
+ }
+ });
+
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+
+ // Should NOT blow the stack; the re-entrant call should be rejected.
+ runtime.execute("loop", loop, ctx);
+ // allow the immediate fail+re-submit path to run
+ Thread.sleep(50);
+
+ assertTrue(loop.lastException.get() instanceof RejectedExecutionException,
+ "Expected rejection to break the recursion");
+ }
+
+ /**
+ * Endpoint that synchronously fails any handler passed to execute().
+ */
+ private static final class ImmediateFailEndpoint extends AsyncConnectionEndpoint {
+ volatile boolean connected = true;
+
+ @Override
+ public void execute(final String id,
+ final AsyncClientExchangeHandler handler,
+ final HandlerFactory pushHandlerFactory,
+ final HttpContext context) {
+ handler.failed(new IOException("immediate failure"));
+ }
+
+ @Override
+ public boolean isConnected() {
+ return connected;
+ }
+
+ @Override
+ public void setSocketTimeout(final Timeout timeout) {
+ }
+
+ @Override
+ public void close(final CloseMode closeMode) {
+ connected = false;
+ }
+
+ @Override
+ public EndpointInfo getInfo() {
+ return null;
+ }
+ }
+
+ private static final class ReentrantHandler implements AsyncClientExchangeHandler {
+ private final InternalHttpAsyncExecRuntime runtime;
+ private final HttpClientContext ctx;
+ final AtomicReference lastException = new AtomicReference<>();
+
+ ReentrantHandler(final InternalHttpAsyncExecRuntime runtime, final HttpClientContext ctx) {
+ this.runtime = runtime;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ lastException.set(cause);
+ // Re-enter only if this was NOT the cap rejecting us
+ if (!(cause instanceof RejectedExecutionException)) {
+ runtime.execute("loop/reenter", this, ctx);
+ }
+ }
+
+ @Override
+ public void produceRequest(final RequestChannel channel, final HttpContext context) {
+ }
+
+ @Override
+ public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) {
+ }
+
+ @Override
+ public void consumeInformation(final HttpResponse response, final HttpContext context) {
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public int available() {
+ return 0;
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) {
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) {
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) {
+ }
+
+ @Override
+ public void streamEnd(final List extends Header> trailers) {
+ }
+
+ @Override
+ public void releaseResources() {
+ }
+ }
+
+}