From 5070428146601336504b81d9389c9b40df4c5589 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=BE=E5=93=8D?= Date: Thu, 9 Oct 2025 14:02:36 +0800 Subject: [PATCH 1/3] Fix HttpClient resource leak in HTTP transports - Add support for external HttpClient instances in HttpClientStreamableHttpTransport and HttpClientSseClientTransport builders - Implement proper HttpClient resource cleanup using reflection to close SelectorManager threads - Add shouldCloseHttpClient flag to control resource management lifecycle - Prevent thread leaks caused by unclosed HttpClient instances created via HttpClient.Builder.build() - Add comprehensive tests for external HttpClient usage and resource cleanup Fixes thread accumulation issue where HttpClient-xxxx-SelectorManager threads would continuously grow, leading to memory exhaustion. This addresses the underlying JDK issue documented in JDK-8308364. Related: https://bugs.openjdk.org/browse/JDK-8308364 --- .../HttpClientSseClientTransport.java | 80 ++++++++++++++++- .../HttpClientStreamableHttpTransport.java | 85 +++++++++++++++++-- ...tpClientStreamableHttpSyncClientTests.java | 38 +++++++++ .../client/HttpSseMcpSyncClientTests.java | 36 ++++++++ .../HttpClientSseClientTransportTests.java | 2 +- 5 files changed, 230 insertions(+), 11 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index ae093316f..ccd0d7bf0 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -93,6 +93,13 @@ public class HttpClientSseClientTransport implements McpClientTransport { */ private final HttpClient httpClient; + /** + * Flag indicating whether this transport should close the HttpClient when closing + * gracefully. Set to true when the HttpClient is created internally by the builder, + * false when provided externally. + */ + private final boolean shouldCloseHttpClient; + /** HTTP request builder for building requests to send messages to the server */ private final HttpRequest.Builder requestBuilder; @@ -129,7 +136,8 @@ public class HttpClientSseClientTransport implements McpClientTransport { * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null */ HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, - String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) { + String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer, + boolean shouldCloseHttpClient) { Assert.notNull(jsonMapper, "jsonMapper must not be null"); Assert.hasText(baseUri, "baseUri must not be empty"); Assert.hasText(sseEndpoint, "sseEndpoint must not be empty"); @@ -142,6 +150,7 @@ public class HttpClientSseClientTransport implements McpClientTransport { this.httpClient = httpClient; this.requestBuilder = requestBuilder; this.httpRequestCustomizer = httpRequestCustomizer; + this.shouldCloseHttpClient = shouldCloseHttpClient; } @Override @@ -169,6 +178,8 @@ public static class Builder { private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1); + private HttpClient externalHttpClient; + private McpJsonMapper jsonMapper; private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(); @@ -227,6 +238,20 @@ public Builder sseEndpoint(String sseEndpoint) { public Builder clientBuilder(HttpClient.Builder clientBuilder) { Assert.notNull(clientBuilder, "clientBuilder must not be null"); this.clientBuilder = clientBuilder; + this.externalHttpClient = null; // Clear external client if builder is set + return this; + } + + /** + * Sets an external HttpClient instance to use instead of creating a new one. When + * an external HttpClient is provided, the transport will not attempt to close it + * during graceful shutdown, leaving resource management to the caller. + * @param httpClient the HttpClient instance to use + * @return this builder + */ + public Builder httpClient(HttpClient httpClient) { + Assert.notNull(httpClient, "httpClient must not be null"); + this.externalHttpClient = httpClient; return this; } @@ -325,9 +350,23 @@ public Builder connectTimeout(Duration connectTimeout) { * @return a new transport instance */ public HttpClientSseClientTransport build() { - HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); + HttpClient httpClient; + boolean shouldCloseHttpClient; + + if (externalHttpClient != null) { + // Use external HttpClient, don't close it + httpClient = externalHttpClient; + shouldCloseHttpClient = false; + } + else { + // Create new HttpClient, should close it + httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); + shouldCloseHttpClient = true; + } + return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint, - jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer); + jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer, + shouldCloseHttpClient); } } @@ -495,7 +534,40 @@ public Mono closeGracefully() { if (subscription != null && !subscription.isDisposed()) { subscription.dispose(); } - }); + }).then(shouldCloseHttpClient ? Mono.fromRunnable(this::closeHttpClientResources) : Mono.empty()); + } + + /** + * Closes HttpClient resources including connection pools and selector threads. This + * method uses reflection to access internal HttpClient implementation details. + */ + private void closeHttpClientResources() { + try { + // Access HttpClientImpl internal fields via reflection + Class httpClientClass = httpClient.getClass(); + + // Close SelectorManager if present + try { + java.lang.reflect.Field selectorManagerField = httpClientClass.getDeclaredField("selectorManager"); + selectorManagerField.setAccessible(true); + Object selectorManager = selectorManagerField.get(httpClient); + + if (selectorManager != null) { + java.lang.reflect.Method shutdownMethod = selectorManager.getClass().getMethod("shutdown"); + shutdownMethod.invoke(selectorManager); + logger.debug("HttpClient SelectorManager shutdown completed"); + } + } + catch (NoSuchFieldException | NoSuchMethodException e) { + // Field/method might not exist in different JDK versions, continue with + // other cleanup + logger.debug("SelectorManager field/method not found, skipping: {}", e.getMessage()); + } + + } + catch (Exception e) { + logger.warn("Failed to close HttpClient resources cleanly: {}", e.getMessage()); + } } /** diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index f4505c898..72168318c 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -87,6 +87,13 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { */ private final HttpClient httpClient; + /** + * Flag indicating whether this transport should close the HttpClient when closing + * gracefully. Set to true when the HttpClient is created internally by the builder, + * false when provided externally. + */ + private final boolean shouldCloseHttpClient; + /** HTTP request builder for building requests to send messages to the server */ private final HttpRequest.Builder requestBuilder; @@ -126,7 +133,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, - boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) { + boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer, + boolean shouldCloseHttpClient) { this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.requestBuilder = requestBuilder; @@ -136,6 +144,7 @@ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient h this.openConnectionOnStartup = openConnectionOnStartup; this.activeSession.set(createTransportSession()); this.httpRequestCustomizer = httpRequestCustomizer; + this.shouldCloseHttpClient = shouldCloseHttpClient; } @Override @@ -211,13 +220,48 @@ public Mono closeGracefully() { return Mono.defer(() -> { logger.debug("Graceful close triggered"); DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession()); - if (currentSession != null) { - return currentSession.closeGracefully(); + Mono sessionClose = currentSession != null ? currentSession.closeGracefully() : Mono.empty(); + + if (shouldCloseHttpClient) { + return sessionClose.then(Mono.fromRunnable(this::closeHttpClientResources)); } - return Mono.empty(); + return sessionClose; }); } + /** + * Closes HttpClient resources including connection pools and selector threads. This + * method uses reflection to access internal HttpClient implementation details. + */ + private void closeHttpClientResources() { + try { + // Access HttpClientImpl internal fields via reflection + Class httpClientClass = httpClient.getClass(); + + // Close SelectorManager if present + try { + java.lang.reflect.Field selectorManagerField = httpClientClass.getDeclaredField("selectorManager"); + selectorManagerField.setAccessible(true); + Object selectorManager = selectorManagerField.get(httpClient); + + if (selectorManager != null) { + java.lang.reflect.Method shutdownMethod = selectorManager.getClass().getMethod("shutdown"); + shutdownMethod.invoke(selectorManager); + logger.debug("HttpClient SelectorManager shutdown completed"); + } + } + catch (NoSuchFieldException | NoSuchMethodException e) { + // Field/method might not exist in different JDK versions, continue with + // other cleanup + logger.debug("SelectorManager field/method not found, skipping: {}", e.getMessage()); + } + + } + catch (Exception e) { + logger.warn("Failed to close HttpClient resources cleanly: {}", e.getMessage()); + } + } + private Mono reconnect(McpTransportStream stream) { return Mono.deferContextual(ctx -> { @@ -603,6 +647,8 @@ public static class Builder { private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1); + private HttpClient externalHttpClient; + private String endpoint = DEFAULT_ENDPOINT; private boolean resumableStreams = true; @@ -632,6 +678,20 @@ private Builder(String baseUri) { public Builder clientBuilder(HttpClient.Builder clientBuilder) { Assert.notNull(clientBuilder, "clientBuilder must not be null"); this.clientBuilder = clientBuilder; + this.externalHttpClient = null; // Clear external client if builder is set + return this; + } + + /** + * Sets an external HttpClient instance to use instead of creating a new one. When + * an external HttpClient is provided, the transport will not attempt to close it + * during graceful shutdown, leaving resource management to the caller. + * @param httpClient the HttpClient instance to use + * @return this builder + */ + public Builder httpClient(HttpClient httpClient) { + Assert.notNull(httpClient, "httpClient must not be null"); + this.externalHttpClient = httpClient; return this; } @@ -769,10 +829,23 @@ public Builder connectTimeout(Duration connectTimeout) { * @return a new instance of {@link HttpClientStreamableHttpTransport} */ public HttpClientStreamableHttpTransport build() { - HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); + HttpClient httpClient; + boolean shouldCloseHttpClient; + + if (externalHttpClient != null) { + // Use external HttpClient, don't close it + httpClient = externalHttpClient; + shouldCloseHttpClient = false; + } + else { + // Create new HttpClient, should close it + httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); + shouldCloseHttpClient = true; + } + return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup, - httpRequestCustomizer); + httpRequestCustomizer, shouldCloseHttpClient); } } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java index d59ae35b4..f90c606ba 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java @@ -5,6 +5,8 @@ package io.modelcontextprotocol.client; import java.net.URI; +import java.net.http.HttpClient; +import java.time.Duration; import java.util.Map; import org.junit.jupiter.api.AfterAll; @@ -19,6 +21,7 @@ import io.modelcontextprotocol.common.McpTransportContext; import io.modelcontextprotocol.spec.McpClientTransport; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; @@ -70,4 +73,39 @@ void customizesRequests() { }); } + @Test + void supportsExternalHttpClient() { + // Create an external HttpClient + HttpClient externalHttpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + + // Create transport with external HttpClient + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host) + .httpClient(externalHttpClient) + .build(); + + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + // Test should complete without errors + }); + + // External HttpClient should still be usable after transport closes + // (This is a basic test - in practice you'd verify the client is still + // functional) + assertThat(externalHttpClient).isNotNull(); + } + + @Test + void closesInternalHttpClientGracefully() { + // Create transport with internal HttpClient (default behavior) + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).build(); + + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + // Test should complete and close gracefully + }); + + // This test verifies that internal HttpClient resources are cleaned up + // The actual verification happens during the graceful close process + } + } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java index 483d38669..e6ac52fa4 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java @@ -5,6 +5,8 @@ package io.modelcontextprotocol.client; import java.net.URI; +import java.net.http.HttpClient; +import java.time.Duration; import java.util.Map; import org.junit.jupiter.api.AfterAll; @@ -19,6 +21,7 @@ import io.modelcontextprotocol.common.McpTransportContext; import io.modelcontextprotocol.spec.McpClientTransport; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -75,4 +78,37 @@ void customizesRequests() { }); } + @Test + void supportsExternalHttpClient() { + // Create an external HttpClient + HttpClient externalHttpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + + // Create transport with external HttpClient + McpClientTransport transport = HttpClientSseClientTransport.builder(host) + .httpClient(externalHttpClient) + .build(); + + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + // Test should complete without errors + }); + + // External HttpClient should still be usable after transport closes + assertThat(externalHttpClient).isNotNull(); + } + + @Test + void closesInternalHttpClientGracefully() { + // Create transport with internal HttpClient (default behavior) + McpClientTransport transport = HttpClientSseClientTransport.builder(host).build(); + + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + // Test should complete and close gracefully + }); + + // This test verifies that internal HttpClient resources are cleaned up + // The actual verification happens during the graceful close process + } + } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java index c5c365798..8dbf344f6 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java @@ -78,7 +78,7 @@ static class TestHttpClientSseClientTransport extends HttpClientSseClientTranspo public TestHttpClientSseClientTransport(final String baseUri) { super(HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(), HttpRequest.newBuilder().header("Content-Type", "application/json"), baseUri, "/sse", JSON_MAPPER, - McpAsyncHttpClientRequestCustomizer.NOOP); + McpAsyncHttpClientRequestCustomizer.NOOP, true); } public int getInboundMessageCount() { From c6bfd09411648176798ea6e31ca811c750367822 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=BE=E5=93=8D?= Date: Tue, 28 Oct 2025 21:22:57 +0800 Subject: [PATCH 2/3] fix: Fix HttpClient resource leak in SelectorManager threads (#610) Replace shouldCloseHttpClient boolean with Consumer pattern. - Remove .clientBuilder, .customizeClient, .connectTimeout methods - Add .onCloseClient(Consumer) with reflection cleanup default - Replace boolean flag with Consumer pattern in constructors - Use sun.misc.Unsafe to bypass JDK module restrictions - Support both JDK 21+ close() and JDK 17 SelectorManager reflection - Update tests with proper HTTP request validation --- .../HttpClientSseClientTransport.java | 123 +++++++++-------- .../HttpClientStreamableHttpTransport.java | 124 +++++++++--------- ...tpClientStreamableHttpSyncClientTests.java | 91 ++++++++++--- .../HttpClientSseClientTransportTests.java | 48 +------ 4 files changed, 200 insertions(+), 186 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index ccd0d7bf0..5e1b15462 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -5,6 +5,8 @@ package io.modelcontextprotocol.client.transport; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -36,6 +38,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import sun.misc.Unsafe; /** * Server-Sent Events (SSE) implementation of the @@ -93,13 +96,6 @@ public class HttpClientSseClientTransport implements McpClientTransport { */ private final HttpClient httpClient; - /** - * Flag indicating whether this transport should close the HttpClient when closing - * gracefully. Set to true when the HttpClient is created internally by the builder, - * false when provided externally. - */ - private final boolean shouldCloseHttpClient; - /** HTTP request builder for building requests to send messages to the server */ private final HttpRequest.Builder requestBuilder; @@ -123,6 +119,12 @@ public class HttpClientSseClientTransport implements McpClientTransport { */ private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer; + /** + * Consumer to handle HttpClient closure. If null, no cleanup is performed (external + * HttpClient). + */ + private final Consumer onCloseClient; + /** * Creates a new transport instance with custom HTTP client builder, object mapper, * and headers. @@ -137,7 +139,7 @@ public class HttpClientSseClientTransport implements McpClientTransport { */ HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer, - boolean shouldCloseHttpClient) { + Consumer onCloseClient) { Assert.notNull(jsonMapper, "jsonMapper must not be null"); Assert.hasText(baseUri, "baseUri must not be empty"); Assert.hasText(sseEndpoint, "sseEndpoint must not be empty"); @@ -150,7 +152,7 @@ public class HttpClientSseClientTransport implements McpClientTransport { this.httpClient = httpClient; this.requestBuilder = requestBuilder; this.httpRequestCustomizer = httpRequestCustomizer; - this.shouldCloseHttpClient = shouldCloseHttpClient; + this.onCloseClient = onCloseClient; } @Override @@ -188,6 +190,8 @@ public static class Builder { private Duration connectTimeout = Duration.ofSeconds(10); + private Consumer onCloseClient; + /** * Creates a new builder instance. */ @@ -230,18 +234,6 @@ public Builder sseEndpoint(String sseEndpoint) { return this; } - /** - * Sets the HTTP client builder. - * @param clientBuilder the HTTP client builder - * @return this builder - */ - public Builder clientBuilder(HttpClient.Builder clientBuilder) { - Assert.notNull(clientBuilder, "clientBuilder must not be null"); - this.clientBuilder = clientBuilder; - this.externalHttpClient = null; // Clear external client if builder is set - return this; - } - /** * Sets an external HttpClient instance to use instead of creating a new one. When * an external HttpClient is provided, the transport will not attempt to close it @@ -255,17 +247,6 @@ public Builder httpClient(HttpClient httpClient) { return this; } - /** - * Customizes the HTTP client builder. - * @param clientCustomizer the consumer to customize the HTTP client builder - * @return this builder - */ - public Builder customizeClient(final Consumer clientCustomizer) { - Assert.notNull(clientCustomizer, "clientCustomizer must not be null"); - clientCustomizer.accept(clientBuilder); - return this; - } - /** * Sets the HTTP request builder. * @param requestBuilder the HTTP request builder @@ -335,13 +316,13 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as } /** - * Sets the connection timeout for the HTTP client. - * @param connectTimeout the connection timeout duration + * Sets a custom consumer to handle HttpClient closure when the transport is + * closed. + * @param onCloseClient the consumer to handle HttpClient closure * @return this builder */ - public Builder connectTimeout(Duration connectTimeout) { - Assert.notNull(connectTimeout, "connectTimeout must not be null"); - this.connectTimeout = connectTimeout; + public Builder onCloseClient(Consumer onCloseClient) { + this.onCloseClient = onCloseClient; return this; } @@ -351,22 +332,22 @@ public Builder connectTimeout(Duration connectTimeout) { */ public HttpClientSseClientTransport build() { HttpClient httpClient; - boolean shouldCloseHttpClient; + Consumer closeHandler; if (externalHttpClient != null) { - // Use external HttpClient, don't close it + // Use external HttpClient, use custom close handler or no-op httpClient = externalHttpClient; - shouldCloseHttpClient = false; + closeHandler = onCloseClient; // null means no cleanup } else { - // Create new HttpClient, should close it + // Create new HttpClient, use custom close handler or default cleanup httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); - shouldCloseHttpClient = true; + closeHandler = onCloseClient != null ? onCloseClient + : HttpClientSseClientTransport::closeHttpClientResourcesStatic; } return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint, - jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer, - shouldCloseHttpClient); + jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer, closeHandler); } } @@ -534,34 +515,52 @@ public Mono closeGracefully() { if (subscription != null && !subscription.isDisposed()) { subscription.dispose(); } - }).then(shouldCloseHttpClient ? Mono.fromRunnable(this::closeHttpClientResources) : Mono.empty()); + }).then(onCloseClient != null ? Mono.fromRunnable(() -> onCloseClient.accept(httpClient)) : Mono.empty()); } /** - * Closes HttpClient resources including connection pools and selector threads. This - * method uses reflection to access internal HttpClient implementation details. + * Static method to close HttpClient resources using reflection. */ - private void closeHttpClientResources() { + private static void closeHttpClientResourcesStatic(HttpClient httpClient) { try { - // Access HttpClientImpl internal fields via reflection - Class httpClientClass = httpClient.getClass(); + // unsafe + Class UnsafeClass = Class.forName("sun.misc.Unsafe"); + Field unsafeField = UnsafeClass.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + Unsafe unsafe = (Unsafe) unsafeField.get(null); + Module ObjectModule = Object.class.getModule(); + Class currentClass = HttpClientSseClientTransport.class; + long addr = unsafe.objectFieldOffset(Class.class.getDeclaredField("module")); + unsafe.getAndSetObject(currentClass, addr, ObjectModule); - // Close SelectorManager if present try { - java.lang.reflect.Field selectorManagerField = httpClientClass.getDeclaredField("selectorManager"); - selectorManagerField.setAccessible(true); - Object selectorManager = selectorManagerField.get(httpClient); - - if (selectorManager != null) { - java.lang.reflect.Method shutdownMethod = selectorManager.getClass().getMethod("shutdown"); - shutdownMethod.invoke(selectorManager); - logger.debug("HttpClient SelectorManager shutdown completed"); + Method closeMethod = httpClient.getClass().getMethod("close"); + closeMethod.invoke(httpClient); + logger.debug("Successfully used JDK 21+ close() method to close HttpClient"); + return; + } + catch (NoSuchMethodException e) { + logger.debug("JDK 21+ close() method not available, falling back to internal reflection"); + } + // This prevents the accumulation of HttpClient-xxx-SelectorManager threads + try { + java.lang.reflect.Field implField = httpClient.getClass().getDeclaredField("impl"); + implField.setAccessible(true); + Object implObj = implField.get(httpClient); + java.lang.reflect.Field selmgrField = implObj.getClass().getDeclaredField("selmgr"); + selmgrField.setAccessible(true); + Object selmgrObj = selmgrField.get(implObj); + + if (selmgrObj != null) { + Method shutDownMethod = selmgrObj.getClass().getDeclaredMethod("shutdown"); + shutDownMethod.setAccessible(true); + shutDownMethod.invoke(selmgrObj); + logger.debug("HttpClient SelectorManager shutdown completed via reflection"); } } catch (NoSuchFieldException | NoSuchMethodException e) { - // Field/method might not exist in different JDK versions, continue with - // other cleanup - logger.debug("SelectorManager field/method not found, skipping: {}", e.getMessage()); + // Field/method structure might differ across JDK versions + logger.debug("SelectorManager field/method not found, skipping internal cleanup: {}", e.getMessage()); } } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 72168318c..148bd1f43 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -5,6 +5,8 @@ package io.modelcontextprotocol.client.transport; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -47,6 +49,7 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; +import sun.misc.Unsafe; /** * An implementation of the Streamable HTTP protocol as defined by the @@ -87,13 +90,6 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { */ private final HttpClient httpClient; - /** - * Flag indicating whether this transport should close the HttpClient when closing - * gracefully. Set to true when the HttpClient is created internally by the builder, - * false when provided externally. - */ - private final boolean shouldCloseHttpClient; - /** HTTP request builder for building requests to send messages to the server */ private final HttpRequest.Builder requestBuilder; @@ -131,10 +127,16 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { private final AtomicReference> exceptionHandler = new AtomicReference<>(); + /** + * Consumer to handle HttpClient closure. If null, no cleanup is performed (external + * HttpClient). + */ + private final Consumer onCloseClient; + private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer, - boolean shouldCloseHttpClient) { + Consumer onCloseClient) { this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.requestBuilder = requestBuilder; @@ -144,7 +146,7 @@ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient h this.openConnectionOnStartup = openConnectionOnStartup; this.activeSession.set(createTransportSession()); this.httpRequestCustomizer = httpRequestCustomizer; - this.shouldCloseHttpClient = shouldCloseHttpClient; + this.onCloseClient = onCloseClient; } @Override @@ -222,38 +224,56 @@ public Mono closeGracefully() { DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession()); Mono sessionClose = currentSession != null ? currentSession.closeGracefully() : Mono.empty(); - if (shouldCloseHttpClient) { - return sessionClose.then(Mono.fromRunnable(this::closeHttpClientResources)); + if (onCloseClient != null) { + return sessionClose.then(Mono.fromRunnable(() -> onCloseClient.accept(httpClient))); } return sessionClose; }); } /** - * Closes HttpClient resources including connection pools and selector threads. This - * method uses reflection to access internal HttpClient implementation details. + * Static method to close HttpClient resources using reflection. */ - private void closeHttpClientResources() { + private static void closeHttpClientResourcesStatic(HttpClient httpClient) { try { - // Access HttpClientImpl internal fields via reflection - Class httpClientClass = httpClient.getClass(); + // unsafe + Class UnsafeClass = Class.forName("sun.misc.Unsafe"); + Field unsafeField = UnsafeClass.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + Unsafe unsafe = (Unsafe) unsafeField.get(null); + Module ObjectModule = Object.class.getModule(); + Class currentClass = HttpClientStreamableHttpTransport.class; + long addr = unsafe.objectFieldOffset(Class.class.getDeclaredField("module")); + unsafe.getAndSetObject(currentClass, addr, ObjectModule); - // Close SelectorManager if present try { - java.lang.reflect.Field selectorManagerField = httpClientClass.getDeclaredField("selectorManager"); - selectorManagerField.setAccessible(true); - Object selectorManager = selectorManagerField.get(httpClient); - - if (selectorManager != null) { - java.lang.reflect.Method shutdownMethod = selectorManager.getClass().getMethod("shutdown"); - shutdownMethod.invoke(selectorManager); - logger.debug("HttpClient SelectorManager shutdown completed"); + Method closeMethod = httpClient.getClass().getMethod("close"); + closeMethod.invoke(httpClient); + logger.debug("Successfully used JDK 21+ close() method to close HttpClient"); + return; + } + catch (NoSuchMethodException e) { + logger.debug("JDK 21+ close() method not available, falling back to internal reflection"); + } + // This prevents the accumulation of HttpClient-xxx-SelectorManager threads + try { + java.lang.reflect.Field implField = httpClient.getClass().getDeclaredField("impl"); + implField.setAccessible(true); + Object implObj = implField.get(httpClient); + java.lang.reflect.Field selmgrField = implObj.getClass().getDeclaredField("selmgr"); + selmgrField.setAccessible(true); + Object selmgrObj = selmgrField.get(implObj); + + if (selmgrObj != null) { + Method shutDownMethod = selmgrObj.getClass().getDeclaredMethod("shutdown"); + shutDownMethod.setAccessible(true); + shutDownMethod.invoke(selmgrObj); + logger.debug("HttpClient SelectorManager shutdown completed via reflection"); } } catch (NoSuchFieldException | NoSuchMethodException e) { - // Field/method might not exist in different JDK versions, continue with - // other cleanup - logger.debug("SelectorManager field/method not found, skipping: {}", e.getMessage()); + // Field/method structure might differ across JDK versions + logger.debug("SelectorManager field/method not found, skipping internal cleanup: {}", e.getMessage()); } } @@ -661,6 +681,8 @@ public static class Builder { private Duration connectTimeout = Duration.ofSeconds(10); + private Consumer onCloseClient; + /** * Creates a new builder with the specified base URI. * @param baseUri the base URI of the MCP server @@ -670,18 +692,6 @@ private Builder(String baseUri) { this.baseUri = baseUri; } - /** - * Sets the HTTP client builder. - * @param clientBuilder the HTTP client builder - * @return this builder - */ - public Builder clientBuilder(HttpClient.Builder clientBuilder) { - Assert.notNull(clientBuilder, "clientBuilder must not be null"); - this.clientBuilder = clientBuilder; - this.externalHttpClient = null; // Clear external client if builder is set - return this; - } - /** * Sets an external HttpClient instance to use instead of creating a new one. When * an external HttpClient is provided, the transport will not attempt to close it @@ -695,17 +705,6 @@ public Builder httpClient(HttpClient httpClient) { return this; } - /** - * Customizes the HTTP client builder. - * @param clientCustomizer the consumer to customize the HTTP client builder - * @return this builder - */ - public Builder customizeClient(final Consumer clientCustomizer) { - Assert.notNull(clientCustomizer, "clientCustomizer must not be null"); - clientCustomizer.accept(clientBuilder); - return this; - } - /** * Sets the HTTP request builder. * @param requestBuilder the HTTP request builder @@ -813,13 +812,13 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as } /** - * Sets the connection timeout for the HTTP client. - * @param connectTimeout the connection timeout duration + * Sets a custom consumer to handle HttpClient closure when the transport is + * closed. + * @param onCloseClient the consumer to handle HttpClient closure * @return this builder */ - public Builder connectTimeout(Duration connectTimeout) { - Assert.notNull(connectTimeout, "connectTimeout must not be null"); - this.connectTimeout = connectTimeout; + public Builder onCloseClient(Consumer onCloseClient) { + this.onCloseClient = onCloseClient; return this; } @@ -830,22 +829,23 @@ public Builder connectTimeout(Duration connectTimeout) { */ public HttpClientStreamableHttpTransport build() { HttpClient httpClient; - boolean shouldCloseHttpClient; + Consumer closeHandler; if (externalHttpClient != null) { - // Use external HttpClient, don't close it + // Use external HttpClient, use custom close handler or no-op httpClient = externalHttpClient; - shouldCloseHttpClient = false; + closeHandler = onCloseClient; // null means no cleanup } else { - // Create new HttpClient, should close it + // Create new HttpClient, use custom close handler or default cleanup httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); - shouldCloseHttpClient = true; + closeHandler = onCloseClient != null ? onCloseClient + : HttpClientStreamableHttpTransport::closeHttpClientResourcesStatic; } return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup, - httpRequestCustomizer, shouldCloseHttpClient); + httpRequestCustomizer, closeHandler); } } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java index f90c606ba..e1c6f15c2 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java @@ -6,8 +6,12 @@ import java.net.URI; import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.time.Duration; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -74,38 +78,95 @@ void customizesRequests() { } @Test - void supportsExternalHttpClient() { - // Create an external HttpClient + void supportsExternalHttpClient() throws Exception { + // Create an external HttpClient that we manage ourselves HttpClient externalHttpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); - // Create transport with external HttpClient + // Create transport with external HttpClient - should NOT close it when transport + // closes McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host) .httpClient(externalHttpClient) .build(); + // Test MCP operations complete successfully with external HttpClient withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { mcpSyncClient.initialize(); - // Test should complete without errors + + // Perform actual MCP operations to verify functionality + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + // Test should complete without errors - external HttpClient works normally }); - // External HttpClient should still be usable after transport closes - // (This is a basic test - in practice you'd verify the client is still - // functional) - assertThat(externalHttpClient).isNotNull(); + // Critical test: Verify external HttpClient is still functional after transport + // closes + // This proves the transport didn't close our external HttpClient + HttpRequest testRequest = HttpRequest.newBuilder() + .uri(URI.create(host + "/")) + .timeout(Duration.ofSeconds(5)) + .build(); + + HttpResponse response = externalHttpClient.send(testRequest, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode()).isEqualTo(404); // MCP server returns 404 for + // root path + // The key point is that we can still make requests - the HttpClient is functional + + // Clean up: We are responsible for closing external HttpClient + // (In real applications, this would be done in application shutdown) } @Test - void closesInternalHttpClientGracefully() { - // Create transport with internal HttpClient (default behavior) - McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).build(); - + void closesInternalHttpClientGracefully() throws Exception { + // Create a custom onCloseClient handler to verify graceful shutdown + AtomicBoolean closeHandlerCalled = new AtomicBoolean(false); + AtomicReference capturedHttpClient = new AtomicReference<>(); + AtomicBoolean httpClientWasFunctional = new AtomicBoolean(false); + + // Create transport with custom close handler that verifies HttpClient state + // before cleanup + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).onCloseClient(httpClient -> { + closeHandlerCalled.set(true); + capturedHttpClient.set(httpClient); + + // Verify HttpClient is still functional before we clean it up + try { + HttpRequest testRequest = HttpRequest.newBuilder() + .uri(URI.create(host + "/")) + .timeout(Duration.ofSeconds(5)) + .build(); + HttpResponse response = httpClient.send(testRequest, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() == 404) { // MCP server returns 404 for root + // path + httpClientWasFunctional.set(true); + } + } + catch (Exception e) { + throw new RuntimeException("HttpClient should be functional before cleanup", e); + } + + // Here we could perform custom cleanup logic + // For example: close connection pools, shutdown executors, etc. + }).build(); + + // Test MCP operations and graceful shutdown withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { mcpSyncClient.initialize(); - // Test should complete and close gracefully + + // Perform MCP operations to ensure transport works normally + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + + // Test should complete and close gracefully - custom close handler will be + // invoked }); - // This test verifies that internal HttpClient resources are cleaned up - // The actual verification happens during the graceful close process + // Verify graceful shutdown behavior + assertThat(closeHandlerCalled.get()).isTrue(); + assertThat(capturedHttpClient.get()).isNotNull(); + assertThat(httpClientWasFunctional.get()).isTrue(); + + // At this point, the custom close handler has been called and + // the HttpClient has been properly cleaned up according to our custom logic } } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java index 8dbf344f6..10a6b349a 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java @@ -78,7 +78,7 @@ static class TestHttpClientSseClientTransport extends HttpClientSseClientTranspo public TestHttpClientSseClientTransport(final String baseUri) { super(HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(), HttpRequest.newBuilder().header("Content-Type", "application/json"), baseUri, "/sse", JSON_MAPPER, - McpAsyncHttpClientRequestCustomizer.NOOP, true); + McpAsyncHttpClientRequestCustomizer.NOOP, null); } public int getInboundMessageCount() { @@ -313,26 +313,6 @@ void testMessageOrderPreservation() { assertThat(transport.getInboundMessageCount()).isEqualTo(3); } - @Test - void testCustomizeClient() { - // Create an atomic boolean to verify the customizer was called - AtomicBoolean customizerCalled = new AtomicBoolean(false); - - // Create a transport with the customizer - HttpClientSseClientTransport customizedTransport = HttpClientSseClientTransport.builder(host) - .customizeClient(builder -> { - builder.version(HttpClient.Version.HTTP_2); - customizerCalled.set(true); - }) - .build(); - - // Verify the customizer was called - assertThat(customizerCalled.get()).isTrue(); - - // Clean up - customizedTransport.closeGracefully().block(); - } - @Test void testCustomizeRequest() { // Create an atomic boolean to verify the customizer was called @@ -367,32 +347,6 @@ void testCustomizeRequest() { customizedTransport.closeGracefully().block(); } - @Test - void testChainedCustomizations() { - // Create atomic booleans to verify both customizers were called - AtomicBoolean clientCustomizerCalled = new AtomicBoolean(false); - AtomicBoolean requestCustomizerCalled = new AtomicBoolean(false); - - // Create a transport with both customizers chained - HttpClientSseClientTransport customizedTransport = HttpClientSseClientTransport.builder(host) - .customizeClient(builder -> { - builder.connectTimeout(Duration.ofSeconds(30)); - clientCustomizerCalled.set(true); - }) - .customizeRequest(builder -> { - builder.header("X-Api-Key", "test-api-key"); - requestCustomizerCalled.set(true); - }) - .build(); - - // Verify both customizers were called - assertThat(clientCustomizerCalled.get()).isTrue(); - assertThat(requestCustomizerCalled.get()).isTrue(); - - // Clean up - customizedTransport.closeGracefully().block(); - } - @Test void testRequestCustomizer() { var mockCustomizer = mock(McpSyncHttpClientRequestCustomizer.class); From 150b758b4ba1ef7c64e6e34b0b630d67aef37305 Mon Sep 17 00:00:00 2001 From: TsengX Date: Mon, 1 Dec 2025 00:31:17 +0800 Subject: [PATCH 3/3] Fix HttpClient resource leak with ExecutorService-based cleanup Implement automatic ExecutorService shutdown for internal HttpClient and add support for external HttpClient injection to prevent thread accumulation. Related to #610 --- .../HttpClientSseClientTransport.java | 126 ++++++++------ .../HttpClientStreamableHttpTransport.java | 136 ++++++++------- ...tpClientStreamableHttpSyncClientTests.java | 159 ++++++++++++++---- .../client/HttpSseMcpSyncClientTests.java | 144 +++++++++++++++- .../HttpClientSseClientTransportTests.java | 3 +- 5 files changed, 409 insertions(+), 159 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index 5e1b15462..f63b7e007 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -5,8 +5,6 @@ package io.modelcontextprotocol.client.transport; import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -14,6 +12,9 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -38,7 +39,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import sun.misc.Unsafe; /** * Server-Sent Events (SSE) implementation of the @@ -146,6 +146,7 @@ public class HttpClientSseClientTransport implements McpClientTransport { Assert.notNull(httpClient, "httpClient must not be null"); Assert.notNull(requestBuilder, "requestBuilder must not be null"); Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null"); + Assert.notNull(onCloseClient, "onCloseClient must not be null"); this.baseUri = URI.create(baseUri); this.sseEndpoint = sseEndpoint; this.jsonMapper = jsonMapper; @@ -178,8 +179,6 @@ public static class Builder { private String sseEndpoint = DEFAULT_SSE_ENDPOINT; - private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1); - private HttpClient externalHttpClient; private McpJsonMapper jsonMapper; @@ -190,7 +189,8 @@ public static class Builder { private Duration connectTimeout = Duration.ofSeconds(10); - private Consumer onCloseClient; + private Consumer onCloseClient = (HttpClient client) -> { + }; /** * Creates a new builder instance. @@ -235,13 +235,17 @@ public Builder sseEndpoint(String sseEndpoint) { } /** - * Sets an external HttpClient instance to use instead of creating a new one. When - * an external HttpClient is provided, the transport will not attempt to close it - * during graceful shutdown, leaving resource management to the caller. + * Provides an external HttpClient instance to use instead of creating a new one. + * When an external HttpClient is provided, the transport will not attempt to + * close it during graceful shutdown, leaving resource management to the caller. + *

+ * Use this method when you want to share a single HttpClient instance across + * multiple transports or when you need fine-grained control over HttpClient + * lifecycle. * @param httpClient the HttpClient instance to use * @return this builder */ - public Builder httpClient(HttpClient httpClient) { + public Builder withExternalHttpClient(HttpClient httpClient) { Assert.notNull(httpClient, "httpClient must not be null"); this.externalHttpClient = httpClient; return this; @@ -317,11 +321,15 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as /** * Sets a custom consumer to handle HttpClient closure when the transport is - * closed. + * closed. This allows for custom cleanup logic beyond the default behavior. + *

+ * Note: This is typically used for advanced use cases. The default behavior + * (shutting down the internal ExecutorService) is sufficient for most scenarios. * @param onCloseClient the consumer to handle HttpClient closure * @return this builder */ - public Builder onCloseClient(Consumer onCloseClient) { + public Builder onHttpClientClose(Consumer onCloseClient) { + Assert.notNull(onCloseClient, "onCloseClient must not be null"); this.onCloseClient = onCloseClient; return this; } @@ -337,13 +345,29 @@ public HttpClientSseClientTransport build() { if (externalHttpClient != null) { // Use external HttpClient, use custom close handler or no-op httpClient = externalHttpClient; - closeHandler = onCloseClient; // null means no cleanup + closeHandler = onCloseClient; } else { - // Create new HttpClient, use custom close handler or default cleanup - httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); - closeHandler = onCloseClient != null ? onCloseClient - : HttpClientSseClientTransport::closeHttpClientResourcesStatic; + // Create internal HttpClient with custom ExecutorService + // Create a custom ExecutorService with meaningful thread names + ExecutorService internalExecutor = Executors.newCachedThreadPool(runnable -> { + Thread thread = new Thread(runnable); + thread.setName("MCP-HttpClient-" + thread.getId()); + thread.setDaemon(true); + return thread; + }); + + httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout(this.connectTimeout) + .executor(internalExecutor) + .build(); + + // Combine default cleanup (shutdown executor) with custom handler if + // provided + closeHandler = (client) -> shutdownHttpClientExecutor(internalExecutor); + closeHandler = closeHandler.andThen(onCloseClient); + } return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint, @@ -519,53 +543,43 @@ public Mono closeGracefully() { } /** - * Static method to close HttpClient resources using reflection. + * Closes HttpClient resources by shutting down its associated ExecutorService. This + * allows the GC to reclaim HttpClient-related threads (including SelectorManager) on + * the next garbage collection cycle. + *

+ * This approach avoids using reflection, Unsafe, or Java 21+ specific APIs, making it + * compatible with Java 17+. + * @param executor the ExecutorService to shutdown */ - private static void closeHttpClientResourcesStatic(HttpClient httpClient) { + private static void shutdownHttpClientExecutor(ExecutorService executor) { + if (executor == null) { + return; + } + try { - // unsafe - Class UnsafeClass = Class.forName("sun.misc.Unsafe"); - Field unsafeField = UnsafeClass.getDeclaredField("theUnsafe"); - unsafeField.setAccessible(true); - Unsafe unsafe = (Unsafe) unsafeField.get(null); - Module ObjectModule = Object.class.getModule(); - Class currentClass = HttpClientSseClientTransport.class; - long addr = unsafe.objectFieldOffset(Class.class.getDeclaredField("module")); - unsafe.getAndSetObject(currentClass, addr, ObjectModule); + logger.debug("Shutting down HttpClient ExecutorService"); + executor.shutdown(); - try { - Method closeMethod = httpClient.getClass().getMethod("close"); - closeMethod.invoke(httpClient); - logger.debug("Successfully used JDK 21+ close() method to close HttpClient"); - return; - } - catch (NoSuchMethodException e) { - logger.debug("JDK 21+ close() method not available, falling back to internal reflection"); - } - // This prevents the accumulation of HttpClient-xxx-SelectorManager threads - try { - java.lang.reflect.Field implField = httpClient.getClass().getDeclaredField("impl"); - implField.setAccessible(true); - Object implObj = implField.get(httpClient); - java.lang.reflect.Field selmgrField = implObj.getClass().getDeclaredField("selmgr"); - selmgrField.setAccessible(true); - Object selmgrObj = selmgrField.get(implObj); - - if (selmgrObj != null) { - Method shutDownMethod = selmgrObj.getClass().getDeclaredMethod("shutdown"); - shutDownMethod.setAccessible(true); - shutDownMethod.invoke(selmgrObj); - logger.debug("HttpClient SelectorManager shutdown completed via reflection"); + // Wait for graceful shutdown + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + logger.debug("ExecutorService did not terminate in time, forcing shutdown"); + executor.shutdownNow(); + + // Wait a bit more after forced shutdown + if (!executor.awaitTermination(2, TimeUnit.SECONDS)) { + logger.warn("ExecutorService did not terminate even after shutdownNow()"); } } - catch (NoSuchFieldException | NoSuchMethodException e) { - // Field/method structure might differ across JDK versions - logger.debug("SelectorManager field/method not found, skipping internal cleanup: {}", e.getMessage()); - } + logger.debug("HttpClient ExecutorService shutdown completed"); + } + catch (InterruptedException e) { + logger.warn("Interrupted while shutting down HttpClient ExecutorService"); + executor.shutdownNow(); + Thread.currentThread().interrupt(); } catch (Exception e) { - logger.warn("Failed to close HttpClient resources cleanly: {}", e.getMessage()); + logger.warn("Failed to shutdown HttpClient ExecutorService cleanly: {}", e.getMessage()); } } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 29a88befa..f90716a6d 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -5,8 +5,6 @@ package io.modelcontextprotocol.client.transport; import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -16,6 +14,9 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -50,7 +51,6 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; -import sun.misc.Unsafe; /** * An implementation of the Streamable HTTP protocol as defined by the @@ -138,6 +138,7 @@ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient h HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer, Consumer onCloseClient) { + Assert.notNull(onCloseClient, "onCloseClient must not be null"); this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.requestBuilder = requestBuilder; @@ -230,67 +231,56 @@ private void handleException(Throwable t) { public Mono closeGracefully() { return Mono.defer(() -> { logger.debug("Graceful close triggered"); - DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession()); - Mono sessionClose = currentSession != null ? currentSession.closeGracefully() : Mono.empty(); + McpTransportSession currentSession = this.activeSession.getAndUpdate(this::createClosedSession); + Mono sessionClose = currentSession != null ? Mono.from(currentSession.closeGracefully()) + : Mono.empty(); + // First close the session, then handle HttpClient cleanup if (onCloseClient != null) { return sessionClose.then(Mono.fromRunnable(() -> onCloseClient.accept(httpClient))); - McpTransportSession currentSession = this.activeSession.getAndUpdate(this::createClosedSession); - if (currentSession != null) { - return Mono.from(currentSession.closeGracefully()); } return sessionClose; }); } /** - * Static method to close HttpClient resources using reflection. + * Closes HttpClient resources by shutting down its associated ExecutorService. This + * allows the GC to reclaim HttpClient-related threads (including SelectorManager) on + * the next garbage collection cycle. + *

+ * This approach avoids using reflection, Unsafe, or Java 21+ specific APIs, making it + * compatible with Java 17+. + * @param executor the ExecutorService to shutdown */ - private static void closeHttpClientResourcesStatic(HttpClient httpClient) { + private static void shutdownHttpClientExecutor(ExecutorService executor) { + if (executor == null) { + return; + } + try { - // unsafe - Class UnsafeClass = Class.forName("sun.misc.Unsafe"); - Field unsafeField = UnsafeClass.getDeclaredField("theUnsafe"); - unsafeField.setAccessible(true); - Unsafe unsafe = (Unsafe) unsafeField.get(null); - Module ObjectModule = Object.class.getModule(); - Class currentClass = HttpClientStreamableHttpTransport.class; - long addr = unsafe.objectFieldOffset(Class.class.getDeclaredField("module")); - unsafe.getAndSetObject(currentClass, addr, ObjectModule); - - try { - Method closeMethod = httpClient.getClass().getMethod("close"); - closeMethod.invoke(httpClient); - logger.debug("Successfully used JDK 21+ close() method to close HttpClient"); - return; - } - catch (NoSuchMethodException e) { - logger.debug("JDK 21+ close() method not available, falling back to internal reflection"); - } - // This prevents the accumulation of HttpClient-xxx-SelectorManager threads - try { - java.lang.reflect.Field implField = httpClient.getClass().getDeclaredField("impl"); - implField.setAccessible(true); - Object implObj = implField.get(httpClient); - java.lang.reflect.Field selmgrField = implObj.getClass().getDeclaredField("selmgr"); - selmgrField.setAccessible(true); - Object selmgrObj = selmgrField.get(implObj); - - if (selmgrObj != null) { - Method shutDownMethod = selmgrObj.getClass().getDeclaredMethod("shutdown"); - shutDownMethod.setAccessible(true); - shutDownMethod.invoke(selmgrObj); - logger.debug("HttpClient SelectorManager shutdown completed via reflection"); + logger.debug("Shutting down HttpClient ExecutorService"); + executor.shutdown(); + + // Wait for graceful shutdown + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + logger.debug("ExecutorService did not terminate in time, forcing shutdown"); + executor.shutdownNow(); + + // Wait a bit more after forced shutdown + if (!executor.awaitTermination(2, TimeUnit.SECONDS)) { + logger.warn("ExecutorService did not terminate even after shutdownNow()"); } } - catch (NoSuchFieldException | NoSuchMethodException e) { - // Field/method structure might differ across JDK versions - logger.debug("SelectorManager field/method not found, skipping internal cleanup: {}", e.getMessage()); - } + logger.debug("HttpClient ExecutorService shutdown completed"); + } + catch (InterruptedException e) { + logger.warn("Interrupted while shutting down HttpClient ExecutorService"); + executor.shutdownNow(); + Thread.currentThread().interrupt(); } catch (Exception e) { - logger.warn("Failed to close HttpClient resources cleanly: {}", e.getMessage()); + logger.warn("Failed to shutdown HttpClient ExecutorService cleanly: {}", e.getMessage()); } } @@ -677,8 +667,6 @@ public static class Builder { private McpJsonMapper jsonMapper; - private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1); - private HttpClient externalHttpClient; private String endpoint = DEFAULT_ENDPOINT; @@ -693,7 +681,8 @@ public static class Builder { private Duration connectTimeout = Duration.ofSeconds(10); - private Consumer onCloseClient; + private Consumer onCloseClient = (HttpClient client) -> { + }; /** * Creates a new builder with the specified base URI. @@ -705,13 +694,17 @@ private Builder(String baseUri) { } /** - * Sets an external HttpClient instance to use instead of creating a new one. When - * an external HttpClient is provided, the transport will not attempt to close it - * during graceful shutdown, leaving resource management to the caller. + * Provides an external HttpClient instance to use instead of creating a new one. + * When an external HttpClient is provided, the transport will not attempt to + * close it during graceful shutdown, leaving resource management to the caller. + *

+ * Use this method when you want to share a single HttpClient instance across + * multiple transports or when you need fine-grained control over HttpClient + * lifecycle. * @param httpClient the HttpClient instance to use * @return this builder */ - public Builder httpClient(HttpClient httpClient) { + public Builder withExternalHttpClient(HttpClient httpClient) { Assert.notNull(httpClient, "httpClient must not be null"); this.externalHttpClient = httpClient; return this; @@ -825,11 +818,15 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as /** * Sets a custom consumer to handle HttpClient closure when the transport is - * closed. + * closed. This allows for custom cleanup logic beyond the default behavior. + *

+ * Note: This is typically used for advanced use cases. The default behavior + * (shutting down the internal ExecutorService) is sufficient for most scenarios. * @param onCloseClient the consumer to handle HttpClient closure * @return this builder */ - public Builder onCloseClient(Consumer onCloseClient) { + public Builder onHttpClientClose(Consumer onCloseClient) { + Assert.notNull(onCloseClient, "onCloseClient must not be null"); this.onCloseClient = onCloseClient; return this; } @@ -846,13 +843,28 @@ public HttpClientStreamableHttpTransport build() { if (externalHttpClient != null) { // Use external HttpClient, use custom close handler or no-op httpClient = externalHttpClient; - closeHandler = onCloseClient; // null means no cleanup + closeHandler = onCloseClient; } else { - // Create new HttpClient, use custom close handler or default cleanup - httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); - closeHandler = onCloseClient != null ? onCloseClient - : HttpClientStreamableHttpTransport::closeHttpClientResourcesStatic; + // Create internal HttpClient with custom ExecutorService + // Create a custom ExecutorService with meaningful thread names + ExecutorService internalExecutor = Executors.newCachedThreadPool(runnable -> { + Thread thread = new Thread(runnable); + thread.setName("MCP-HttpClient-" + thread.getId()); + thread.setDaemon(true); + return thread; + }); + + httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout(this.connectTimeout) + .executor(internalExecutor) + .build(); + + // Combine default cleanup (shutdown executor) with custom handler if + // provided + closeHandler = (client) -> shutdownHttpClientExecutor(internalExecutor); + closeHandler = closeHandler.andThen(onCloseClient); } return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java index e1c6f15c2..f9703ede8 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java @@ -85,7 +85,7 @@ void supportsExternalHttpClient() throws Exception { // Create transport with external HttpClient - should NOT close it when transport // closes McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host) - .httpClient(externalHttpClient) + .withExternalHttpClient(externalHttpClient) .build(); // Test MCP operations complete successfully with external HttpClient @@ -117,56 +117,147 @@ void supportsExternalHttpClient() throws Exception { @Test void closesInternalHttpClientGracefully() throws Exception { - // Create a custom onCloseClient handler to verify graceful shutdown + // Verify internal HttpClient's ExecutorService threads are properly cleaned up + // after transport closes + + // Count MCP-HttpClient threads before creating transport + int threadCountBefore = countMcpHttpClientThreads(); + + // Create transport with default internal HttpClient (no custom close handler) + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + + // Verify MCP-HttpClient threads exist during operation + int threadCountDuringOperation = countMcpHttpClientThreads(); + assertThat(threadCountDuringOperation).isGreaterThan(threadCountBefore); + + // Perform MCP operations + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // After transport closes, wait a bit for ExecutorService shutdown to complete + Thread.sleep(2000); + + // Verify MCP-HttpClient threads are cleaned up + int threadCountAfter = countMcpHttpClientThreads(); + assertThat(threadCountAfter).isEqualTo(threadCountBefore); + } + + @Test + void invokesCustomCloseHandler() throws Exception { + // Verify custom onHttpClientClose callback is invoked correctly AtomicBoolean closeHandlerCalled = new AtomicBoolean(false); AtomicReference capturedHttpClient = new AtomicReference<>(); - AtomicBoolean httpClientWasFunctional = new AtomicBoolean(false); - // Create transport with custom close handler that verifies HttpClient state - // before cleanup - McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).onCloseClient(httpClient -> { + // Create transport with custom close handler + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).onHttpClientClose(httpClient -> { closeHandlerCalled.set(true); capturedHttpClient.set(httpClient); - // Verify HttpClient is still functional before we clean it up - try { - HttpRequest testRequest = HttpRequest.newBuilder() - .uri(URI.create(host + "/")) - .timeout(Duration.ofSeconds(5)) - .build(); - HttpResponse response = httpClient.send(testRequest, HttpResponse.BodyHandlers.ofString()); - if (response.statusCode() == 404) { // MCP server returns 404 for root - // path - httpClientWasFunctional.set(true); - } - } - catch (Exception e) { - throw new RuntimeException("HttpClient should be functional before cleanup", e); - } - - // Here we could perform custom cleanup logic - // For example: close connection pools, shutdown executors, etc. + // Custom cleanup logic would go here + // For example: logging, metrics, custom resource cleanup, etc. }).build(); - // Test MCP operations and graceful shutdown + // Use the transport withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { mcpSyncClient.initialize(); - - // Perform MCP operations to ensure transport works normally var capabilities = mcpSyncClient.listTools(); assertThat(capabilities).isNotNull(); - - // Test should complete and close gracefully - custom close handler will be - // invoked }); - // Verify graceful shutdown behavior + // Verify custom close handler was called assertThat(closeHandlerCalled.get()).isTrue(); assertThat(capturedHttpClient.get()).isNotNull(); - assertThat(httpClientWasFunctional.get()).isTrue(); + } + + @Test + void releasesHttpClientResourcesAfterExecutorShutdownAndGC() throws Exception { + // Verify that after ExecutorService shutdown, GC can reclaim HttpClient resources + // This test validates our core fix: shutdown ExecutorService -> GC reclaims + // SelectorManager threads + + // Count threads before creating transport + int threadCountBefore = countMcpHttpClientThreads(); + int httpClientSelectorThreadsBefore = countHttpClientSelectorThreads(); + + // Create transport with default internal HttpClient + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + + // Verify threads exist during operation + int threadCountDuringOperation = countMcpHttpClientThreads(); + + assertThat(threadCountDuringOperation).isGreaterThan(threadCountBefore); + // Note: SelectorManager threads may or may not be created yet, depending on + // timing + + // Perform MCP operations + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // After transport closes, ExecutorService is shut down + // Wait a bit for shutdown to complete + Thread.sleep(5000); + + // Verify MCP-HttpClient threads are cleaned up immediately after ExecutorService + // shutdown + int threadCountAfterShutdown = countMcpHttpClientThreads(); + assertThat(threadCountAfterShutdown).isEqualTo(threadCountBefore); - // At this point, the custom close handler has been called and - // the HttpClient has been properly cleaned up according to our custom logic + // Now explicitly trigger GC to reclaim HttpClient and its SelectorManager threads + System.gc(); + System.runFinalization(); + + // Wait for GC to complete + Thread.sleep(2000); + + // Verify SelectorManager threads are also cleaned up after GC + int selectorThreadsAfterGC = countHttpClientSelectorThreads(); + // SelectorManager threads should be cleaned up by GC + // Note: This may not always be 100% reliable as GC timing is non-deterministic, + // but it validates the mechanism works + assertThat(selectorThreadsAfterGC).isLessThanOrEqualTo(httpClientSelectorThreadsBefore); + } + + /** + * Counts the number of HttpClient SelectorManager threads. These threads are created + * by HttpClient internally and should be cleaned up by GC after ExecutorService + * shutdown. + */ + private int countHttpClientSelectorThreads() { + Thread[] threads = new Thread[Thread.activeCount() * 2]; + int count = Thread.enumerate(threads); + int selectorThreadCount = 0; + for (int i = 0; i < count; i++) { + if (threads[i] != null && threads[i].getName().contains("HttpClient") + && threads[i].getName().contains("SelectorManager")) { + selectorThreadCount++; + } + } + return selectorThreadCount; + } + + /** + * Counts the number of threads with names starting with "MCP-HttpClient-" + */ + private int countMcpHttpClientThreads() { + Thread[] threads = new Thread[Thread.activeCount() * 2]; + int count = Thread.enumerate(threads); + int mcpThreadCount = 0; + for (int i = 0; i < count; i++) { + if (threads[i] != null && threads[i].getName().startsWith("MCP-HttpClient-")) { + mcpThreadCount++; + } + } + return mcpThreadCount; } } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java index e6ac52fa4..499254771 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java @@ -85,7 +85,7 @@ void supportsExternalHttpClient() { // Create transport with external HttpClient McpClientTransport transport = HttpClientSseClientTransport.builder(host) - .httpClient(externalHttpClient) + .withExternalHttpClient(externalHttpClient) .build(); withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { @@ -98,17 +98,149 @@ void supportsExternalHttpClient() { } @Test - void closesInternalHttpClientGracefully() { - // Create transport with internal HttpClient (default behavior) + void closesInternalHttpClientGracefully() throws Exception { + // Verify internal HttpClient's ExecutorService threads are properly cleaned up + // after transport closes + + // Count MCP-HttpClient threads before creating transport + int threadCountBefore = countMcpHttpClientThreads(); + + // Create transport with default internal HttpClient (no custom close handler) + McpClientTransport transport = HttpClientSseClientTransport.builder(host).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + + // Verify MCP-HttpClient threads exist during operation + int threadCountDuringOperation = countMcpHttpClientThreads(); + assertThat(threadCountDuringOperation).isGreaterThan(threadCountBefore); + + // Perform MCP operations + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // After transport closes, wait a bit for ExecutorService shutdown to complete + Thread.sleep(1000); + + // Verify MCP-HttpClient threads are cleaned up + int threadCountAfter = countMcpHttpClientThreads(); + assertThat(threadCountAfter).isEqualTo(threadCountBefore); + } + + @Test + void invokesCustomCloseHandler() throws Exception { + // Verify custom onHttpClientClose callback is invoked correctly + java.util.concurrent.atomic.AtomicBoolean closeHandlerCalled = new java.util.concurrent.atomic.AtomicBoolean( + false); + java.util.concurrent.atomic.AtomicReference capturedHttpClient = new java.util.concurrent.atomic.AtomicReference<>(); + + // Create transport with custom close handler + McpClientTransport transport = HttpClientSseClientTransport.builder(host).onHttpClientClose(httpClient -> { + closeHandlerCalled.set(true); + capturedHttpClient.set(httpClient); + + // Custom cleanup logic would go here + // For example: logging, metrics, custom resource cleanup, etc. + }).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // Verify custom close handler was called + assertThat(closeHandlerCalled.get()).isTrue(); + assertThat(capturedHttpClient.get()).isNotNull(); + } + + @Test + void releasesHttpClientResourcesAfterExecutorShutdownAndGC() throws Exception { + // Verify that after ExecutorService shutdown, GC can reclaim HttpClient resources + // This test validates our core fix: shutdown ExecutorService -> GC reclaims + // SelectorManager threads + + // Count threads before creating transport + int threadCountBefore = countMcpHttpClientThreads(); + int httpClientSelectorThreadsBefore = countHttpClientSelectorThreads(); + + // Create transport with default internal HttpClient McpClientTransport transport = HttpClientSseClientTransport.builder(host).build(); + // Use the transport withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { mcpSyncClient.initialize(); - // Test should complete and close gracefully + + // Verify threads exist during operation + int threadCountDuringOperation = countMcpHttpClientThreads(); + + assertThat(threadCountDuringOperation).isGreaterThan(threadCountBefore); + // Note: SelectorManager threads may or may not be created yet, depending on + // timing + + // Perform MCP operations + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); }); - // This test verifies that internal HttpClient resources are cleaned up - // The actual verification happens during the graceful close process + // After transport closes, ExecutorService is shut down + // Wait a bit for shutdown to complete + Thread.sleep(5000); + + // Verify MCP-HttpClient threads are cleaned up immediately after ExecutorService + // shutdown + int threadCountAfterShutdown = countMcpHttpClientThreads(); + assertThat(threadCountAfterShutdown).isEqualTo(threadCountBefore); + + // Now explicitly trigger GC to reclaim HttpClient and its SelectorManager threads + System.gc(); + System.runFinalization(); + + // Wait for GC to complete + Thread.sleep(2000); + + // Verify SelectorManager threads are also cleaned up after GC + int selectorThreadsAfterGC = countHttpClientSelectorThreads(); + // SelectorManager threads should be cleaned up by GC + // Note: This may not always be 100% reliable as GC timing is non-deterministic, + // but it validates the mechanism works + assertThat(selectorThreadsAfterGC).isLessThanOrEqualTo(httpClientSelectorThreadsBefore); + } + + /** + * Counts the number of HttpClient SelectorManager threads. These threads are created + * by HttpClient internally and should be cleaned up by GC after ExecutorService + * shutdown. + */ + private int countHttpClientSelectorThreads() { + Thread[] threads = new Thread[Thread.activeCount() * 2]; + int count = Thread.enumerate(threads); + int selectorThreadCount = 0; + for (int i = 0; i < count; i++) { + if (threads[i] != null && threads[i].getName().contains("HttpClient") + && threads[i].getName().contains("SelectorManager")) { + selectorThreadCount++; + } + } + return selectorThreadCount; + } + + /** + * Counts the number of threads with names starting with "MCP-HttpClient-" + */ + private int countMcpHttpClientThreads() { + Thread[] threads = new Thread[Thread.activeCount() * 2]; + int count = Thread.enumerate(threads); + int mcpThreadCount = 0; + for (int i = 0; i < count; i++) { + if (threads[i] != null && threads[i].getName().startsWith("MCP-HttpClient-")) { + mcpThreadCount++; + } + } + return mcpThreadCount; } } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java index 10a6b349a..1b5d5a1dc 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java @@ -78,7 +78,8 @@ static class TestHttpClientSseClientTransport extends HttpClientSseClientTranspo public TestHttpClientSseClientTransport(final String baseUri) { super(HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(), HttpRequest.newBuilder().header("Content-Type", "application/json"), baseUri, "/sse", JSON_MAPPER, - McpAsyncHttpClientRequestCustomizer.NOOP, null); + McpAsyncHttpClientRequestCustomizer.NOOP, client -> { + }); } public int getInboundMessageCount() {