diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index ae093316f..f63b7e007 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -12,6 +12,9 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -116,6 +119,12 @@ public class HttpClientSseClientTransport implements McpClientTransport { */ private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer; + /** + * Consumer to handle HttpClient closure. If null, no cleanup is performed (external + * HttpClient). + */ + private final Consumer onCloseClient; + /** * Creates a new transport instance with custom HTTP client builder, object mapper, * and headers. @@ -129,19 +138,22 @@ public class HttpClientSseClientTransport implements McpClientTransport { * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null */ HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, - String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) { + String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer, + Consumer onCloseClient) { Assert.notNull(jsonMapper, "jsonMapper must not be null"); Assert.hasText(baseUri, "baseUri must not be empty"); Assert.hasText(sseEndpoint, "sseEndpoint must not be empty"); Assert.notNull(httpClient, "httpClient must not be null"); Assert.notNull(requestBuilder, "requestBuilder must not be null"); Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null"); + Assert.notNull(onCloseClient, "onCloseClient must not be null"); this.baseUri = URI.create(baseUri); this.sseEndpoint = sseEndpoint; this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.requestBuilder = requestBuilder; this.httpRequestCustomizer = httpRequestCustomizer; + this.onCloseClient = onCloseClient; } @Override @@ -167,7 +179,7 @@ public static class Builder { private String sseEndpoint = DEFAULT_SSE_ENDPOINT; - private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1); + private HttpClient externalHttpClient; private McpJsonMapper jsonMapper; @@ -177,6 +189,9 @@ public static class Builder { private Duration connectTimeout = Duration.ofSeconds(10); + private Consumer onCloseClient = (HttpClient client) -> { + }; + /** * Creates a new builder instance. */ @@ -220,24 +235,19 @@ public Builder sseEndpoint(String sseEndpoint) { } /** - * Sets the HTTP client builder. - * @param clientBuilder the HTTP client builder - * @return this builder - */ - public Builder clientBuilder(HttpClient.Builder clientBuilder) { - Assert.notNull(clientBuilder, "clientBuilder must not be null"); - this.clientBuilder = clientBuilder; - return this; - } - - /** - * Customizes the HTTP client builder. - * @param clientCustomizer the consumer to customize the HTTP client builder + * Provides an external HttpClient instance to use instead of creating a new one. + * When an external HttpClient is provided, the transport will not attempt to + * close it during graceful shutdown, leaving resource management to the caller. + *

+ * Use this method when you want to share a single HttpClient instance across + * multiple transports or when you need fine-grained control over HttpClient + * lifecycle. + * @param httpClient the HttpClient instance to use * @return this builder */ - public Builder customizeClient(final Consumer clientCustomizer) { - Assert.notNull(clientCustomizer, "clientCustomizer must not be null"); - clientCustomizer.accept(clientBuilder); + public Builder withExternalHttpClient(HttpClient httpClient) { + Assert.notNull(httpClient, "httpClient must not be null"); + this.externalHttpClient = httpClient; return this; } @@ -310,13 +320,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as } /** - * Sets the connection timeout for the HTTP client. - * @param connectTimeout the connection timeout duration + * Sets a custom consumer to handle HttpClient closure when the transport is + * closed. This allows for custom cleanup logic beyond the default behavior. + *

+ * Note: This is typically used for advanced use cases. The default behavior + * (shutting down the internal ExecutorService) is sufficient for most scenarios. + * @param onCloseClient the consumer to handle HttpClient closure * @return this builder */ - public Builder connectTimeout(Duration connectTimeout) { - Assert.notNull(connectTimeout, "connectTimeout must not be null"); - this.connectTimeout = connectTimeout; + public Builder onHttpClientClose(Consumer onCloseClient) { + Assert.notNull(onCloseClient, "onCloseClient must not be null"); + this.onCloseClient = onCloseClient; return this; } @@ -325,9 +339,39 @@ public Builder connectTimeout(Duration connectTimeout) { * @return a new transport instance */ public HttpClientSseClientTransport build() { - HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); + HttpClient httpClient; + Consumer closeHandler; + + if (externalHttpClient != null) { + // Use external HttpClient, use custom close handler or no-op + httpClient = externalHttpClient; + closeHandler = onCloseClient; + } + else { + // Create internal HttpClient with custom ExecutorService + // Create a custom ExecutorService with meaningful thread names + ExecutorService internalExecutor = Executors.newCachedThreadPool(runnable -> { + Thread thread = new Thread(runnable); + thread.setName("MCP-HttpClient-" + thread.getId()); + thread.setDaemon(true); + return thread; + }); + + httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout(this.connectTimeout) + .executor(internalExecutor) + .build(); + + // Combine default cleanup (shutdown executor) with custom handler if + // provided + closeHandler = (client) -> shutdownHttpClientExecutor(internalExecutor); + closeHandler = closeHandler.andThen(onCloseClient); + + } + return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint, - jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer); + jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer, closeHandler); } } @@ -495,7 +539,48 @@ public Mono closeGracefully() { if (subscription != null && !subscription.isDisposed()) { subscription.dispose(); } - }); + }).then(onCloseClient != null ? Mono.fromRunnable(() -> onCloseClient.accept(httpClient)) : Mono.empty()); + } + + /** + * Closes HttpClient resources by shutting down its associated ExecutorService. This + * allows the GC to reclaim HttpClient-related threads (including SelectorManager) on + * the next garbage collection cycle. + *

+ * This approach avoids using reflection, Unsafe, or Java 21+ specific APIs, making it + * compatible with Java 17+. + * @param executor the ExecutorService to shutdown + */ + private static void shutdownHttpClientExecutor(ExecutorService executor) { + if (executor == null) { + return; + } + + try { + logger.debug("Shutting down HttpClient ExecutorService"); + executor.shutdown(); + + // Wait for graceful shutdown + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + logger.debug("ExecutorService did not terminate in time, forcing shutdown"); + executor.shutdownNow(); + + // Wait a bit more after forced shutdown + if (!executor.awaitTermination(2, TimeUnit.SECONDS)) { + logger.warn("ExecutorService did not terminate even after shutdownNow()"); + } + } + + logger.debug("HttpClient ExecutorService shutdown completed"); + } + catch (InterruptedException e) { + logger.warn("Interrupted while shutting down HttpClient ExecutorService"); + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + catch (Exception e) { + logger.warn("Failed to shutdown HttpClient ExecutorService cleanly: {}", e.getMessage()); + } } /** diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index e41f45ebb..a78ab69db 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -16,6 +16,9 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -128,10 +131,17 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { private final String latestSupportedProtocolVersion; + /** + * Consumer to handle HttpClient closure. If null, no cleanup is performed (external + * HttpClient). + */ + private final Consumer onCloseClient; + private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer, - List supportedProtocolVersions) { + List supportedProtocolVersions, Consumer onCloseClient) { + Assert.notNull(onCloseClient, "onCloseClient must not be null"); this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.requestBuilder = requestBuilder; @@ -141,6 +151,7 @@ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient h this.openConnectionOnStartup = openConnectionOnStartup; this.activeSession.set(createTransportSession()); this.httpRequestCustomizer = httpRequestCustomizer; + this.onCloseClient = onCloseClient; this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions); this.latestSupportedProtocolVersion = this.supportedProtocolVersions.stream() .sorted(Comparator.reverseOrder()) @@ -230,13 +241,58 @@ public Mono closeGracefully() { return Mono.defer(() -> { logger.debug("Graceful close triggered"); McpTransportSession currentSession = this.activeSession.getAndUpdate(this::createClosedSession); - if (currentSession != null) { - return Mono.from(currentSession.closeGracefully()); + Mono sessionClose = currentSession != null ? Mono.from(currentSession.closeGracefully()) + : Mono.empty(); + + // First close the session, then handle HttpClient cleanup + if (onCloseClient != null) { + return sessionClose.then(Mono.fromRunnable(() -> onCloseClient.accept(httpClient))); } - return Mono.empty(); + return sessionClose; }); } + /** + * Closes HttpClient resources by shutting down its associated ExecutorService. This + * allows the GC to reclaim HttpClient-related threads (including SelectorManager) on + * the next garbage collection cycle. + *

+ * This approach avoids using reflection, Unsafe, or Java 21+ specific APIs, making it + * compatible with Java 17+. + * @param executor the ExecutorService to shutdown + */ + private static void shutdownHttpClientExecutor(ExecutorService executor) { + if (executor == null) { + return; + } + + try { + logger.debug("Shutting down HttpClient ExecutorService"); + executor.shutdown(); + + // Wait for graceful shutdown + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + logger.debug("ExecutorService did not terminate in time, forcing shutdown"); + executor.shutdownNow(); + + // Wait a bit more after forced shutdown + if (!executor.awaitTermination(2, TimeUnit.SECONDS)) { + logger.warn("ExecutorService did not terminate even after shutdownNow()"); + } + } + + logger.debug("HttpClient ExecutorService shutdown completed"); + } + catch (InterruptedException e) { + logger.warn("Interrupted while shutting down HttpClient ExecutorService"); + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + catch (Exception e) { + logger.warn("Failed to shutdown HttpClient ExecutorService cleanly: {}", e.getMessage()); + } + } + private Mono reconnect(McpTransportStream stream) { return Mono.deferContextual(ctx -> { @@ -624,7 +680,7 @@ public static class Builder { private McpJsonMapper jsonMapper; - private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1); + private HttpClient externalHttpClient; private String endpoint = DEFAULT_ENDPOINT; @@ -638,6 +694,9 @@ public static class Builder { private Duration connectTimeout = Duration.ofSeconds(10); + private Consumer onCloseClient = (HttpClient client) -> { + }; + private List supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05, ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18); @@ -651,24 +710,19 @@ private Builder(String baseUri) { } /** - * Sets the HTTP client builder. - * @param clientBuilder the HTTP client builder - * @return this builder - */ - public Builder clientBuilder(HttpClient.Builder clientBuilder) { - Assert.notNull(clientBuilder, "clientBuilder must not be null"); - this.clientBuilder = clientBuilder; - return this; - } - - /** - * Customizes the HTTP client builder. - * @param clientCustomizer the consumer to customize the HTTP client builder + * Provides an external HttpClient instance to use instead of creating a new one. + * When an external HttpClient is provided, the transport will not attempt to + * close it during graceful shutdown, leaving resource management to the caller. + *

+ * Use this method when you want to share a single HttpClient instance across + * multiple transports or when you need fine-grained control over HttpClient + * lifecycle. + * @param httpClient the HttpClient instance to use * @return this builder */ - public Builder customizeClient(final Consumer clientCustomizer) { - Assert.notNull(clientCustomizer, "clientCustomizer must not be null"); - clientCustomizer.accept(clientBuilder); + public Builder withExternalHttpClient(HttpClient httpClient) { + Assert.notNull(httpClient, "httpClient must not be null"); + this.externalHttpClient = httpClient; return this; } @@ -779,13 +833,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as } /** - * Sets the connection timeout for the HTTP client. - * @param connectTimeout the connection timeout duration + * Sets a custom consumer to handle HttpClient closure when the transport is + * closed. This allows for custom cleanup logic beyond the default behavior. + *

+ * Note: This is typically used for advanced use cases. The default behavior + * (shutting down the internal ExecutorService) is sufficient for most scenarios. + * @param onCloseClient the consumer to handle HttpClient closure * @return this builder */ - public Builder connectTimeout(Duration connectTimeout) { - Assert.notNull(connectTimeout, "connectTimeout must not be null"); - this.connectTimeout = connectTimeout; + public Builder onHttpClientClose(Consumer onCloseClient) { + Assert.notNull(onCloseClient, "onCloseClient must not be null"); + this.onCloseClient = onCloseClient; return this; } @@ -819,10 +877,39 @@ public Builder supportedProtocolVersions(List supportedProtocolVersions) * @return a new instance of {@link HttpClientStreamableHttpTransport} */ public HttpClientStreamableHttpTransport build() { - HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build(); + HttpClient httpClient; + Consumer closeHandler; + + if (externalHttpClient != null) { + // Use external HttpClient, use custom close handler or no-op + httpClient = externalHttpClient; + closeHandler = onCloseClient; + } + else { + // Create internal HttpClient with custom ExecutorService + // Create a custom ExecutorService with meaningful thread names + ExecutorService internalExecutor = Executors.newCachedThreadPool(runnable -> { + Thread thread = new Thread(runnable); + thread.setName("MCP-HttpClient-" + thread.getId()); + thread.setDaemon(true); + return thread; + }); + + httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout(this.connectTimeout) + .executor(internalExecutor) + .build(); + + // Combine default cleanup (shutdown executor) with custom handler if + // provided + closeHandler = (client) -> shutdownHttpClientExecutor(internalExecutor); + closeHandler = closeHandler.andThen(onCloseClient); + } + return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup, - httpRequestCustomizer, supportedProtocolVersions); + httpRequestCustomizer, supportedProtocolVersions, closeHandler); } } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java index d59ae35b4..f9703ede8 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpSyncClientTests.java @@ -5,7 +5,13 @@ package io.modelcontextprotocol.client; import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -19,6 +25,7 @@ import io.modelcontextprotocol.common.McpTransportContext; import io.modelcontextprotocol.spec.McpClientTransport; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; @@ -70,4 +77,187 @@ void customizesRequests() { }); } + @Test + void supportsExternalHttpClient() throws Exception { + // Create an external HttpClient that we manage ourselves + HttpClient externalHttpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + + // Create transport with external HttpClient - should NOT close it when transport + // closes + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host) + .withExternalHttpClient(externalHttpClient) + .build(); + + // Test MCP operations complete successfully with external HttpClient + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + + // Perform actual MCP operations to verify functionality + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + // Test should complete without errors - external HttpClient works normally + }); + + // Critical test: Verify external HttpClient is still functional after transport + // closes + // This proves the transport didn't close our external HttpClient + HttpRequest testRequest = HttpRequest.newBuilder() + .uri(URI.create(host + "/")) + .timeout(Duration.ofSeconds(5)) + .build(); + + HttpResponse response = externalHttpClient.send(testRequest, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode()).isEqualTo(404); // MCP server returns 404 for + // root path + // The key point is that we can still make requests - the HttpClient is functional + + // Clean up: We are responsible for closing external HttpClient + // (In real applications, this would be done in application shutdown) + } + + @Test + void closesInternalHttpClientGracefully() throws Exception { + // Verify internal HttpClient's ExecutorService threads are properly cleaned up + // after transport closes + + // Count MCP-HttpClient threads before creating transport + int threadCountBefore = countMcpHttpClientThreads(); + + // Create transport with default internal HttpClient (no custom close handler) + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + + // Verify MCP-HttpClient threads exist during operation + int threadCountDuringOperation = countMcpHttpClientThreads(); + assertThat(threadCountDuringOperation).isGreaterThan(threadCountBefore); + + // Perform MCP operations + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // After transport closes, wait a bit for ExecutorService shutdown to complete + Thread.sleep(2000); + + // Verify MCP-HttpClient threads are cleaned up + int threadCountAfter = countMcpHttpClientThreads(); + assertThat(threadCountAfter).isEqualTo(threadCountBefore); + } + + @Test + void invokesCustomCloseHandler() throws Exception { + // Verify custom onHttpClientClose callback is invoked correctly + AtomicBoolean closeHandlerCalled = new AtomicBoolean(false); + AtomicReference capturedHttpClient = new AtomicReference<>(); + + // Create transport with custom close handler + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).onHttpClientClose(httpClient -> { + closeHandlerCalled.set(true); + capturedHttpClient.set(httpClient); + + // Custom cleanup logic would go here + // For example: logging, metrics, custom resource cleanup, etc. + }).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // Verify custom close handler was called + assertThat(closeHandlerCalled.get()).isTrue(); + assertThat(capturedHttpClient.get()).isNotNull(); + } + + @Test + void releasesHttpClientResourcesAfterExecutorShutdownAndGC() throws Exception { + // Verify that after ExecutorService shutdown, GC can reclaim HttpClient resources + // This test validates our core fix: shutdown ExecutorService -> GC reclaims + // SelectorManager threads + + // Count threads before creating transport + int threadCountBefore = countMcpHttpClientThreads(); + int httpClientSelectorThreadsBefore = countHttpClientSelectorThreads(); + + // Create transport with default internal HttpClient + McpClientTransport transport = HttpClientStreamableHttpTransport.builder(host).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + + // Verify threads exist during operation + int threadCountDuringOperation = countMcpHttpClientThreads(); + + assertThat(threadCountDuringOperation).isGreaterThan(threadCountBefore); + // Note: SelectorManager threads may or may not be created yet, depending on + // timing + + // Perform MCP operations + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // After transport closes, ExecutorService is shut down + // Wait a bit for shutdown to complete + Thread.sleep(5000); + + // Verify MCP-HttpClient threads are cleaned up immediately after ExecutorService + // shutdown + int threadCountAfterShutdown = countMcpHttpClientThreads(); + assertThat(threadCountAfterShutdown).isEqualTo(threadCountBefore); + + // Now explicitly trigger GC to reclaim HttpClient and its SelectorManager threads + System.gc(); + System.runFinalization(); + + // Wait for GC to complete + Thread.sleep(2000); + + // Verify SelectorManager threads are also cleaned up after GC + int selectorThreadsAfterGC = countHttpClientSelectorThreads(); + // SelectorManager threads should be cleaned up by GC + // Note: This may not always be 100% reliable as GC timing is non-deterministic, + // but it validates the mechanism works + assertThat(selectorThreadsAfterGC).isLessThanOrEqualTo(httpClientSelectorThreadsBefore); + } + + /** + * Counts the number of HttpClient SelectorManager threads. These threads are created + * by HttpClient internally and should be cleaned up by GC after ExecutorService + * shutdown. + */ + private int countHttpClientSelectorThreads() { + Thread[] threads = new Thread[Thread.activeCount() * 2]; + int count = Thread.enumerate(threads); + int selectorThreadCount = 0; + for (int i = 0; i < count; i++) { + if (threads[i] != null && threads[i].getName().contains("HttpClient") + && threads[i].getName().contains("SelectorManager")) { + selectorThreadCount++; + } + } + return selectorThreadCount; + } + + /** + * Counts the number of threads with names starting with "MCP-HttpClient-" + */ + private int countMcpHttpClientThreads() { + Thread[] threads = new Thread[Thread.activeCount() * 2]; + int count = Thread.enumerate(threads); + int mcpThreadCount = 0; + for (int i = 0; i < count; i++) { + if (threads[i] != null && threads[i].getName().startsWith("MCP-HttpClient-")) { + mcpThreadCount++; + } + } + return mcpThreadCount; + } + } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java index 483d38669..499254771 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java @@ -5,6 +5,8 @@ package io.modelcontextprotocol.client; import java.net.URI; +import java.net.http.HttpClient; +import java.time.Duration; import java.util.Map; import org.junit.jupiter.api.AfterAll; @@ -19,6 +21,7 @@ import io.modelcontextprotocol.common.McpTransportContext; import io.modelcontextprotocol.spec.McpClientTransport; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -75,4 +78,169 @@ void customizesRequests() { }); } + @Test + void supportsExternalHttpClient() { + // Create an external HttpClient + HttpClient externalHttpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + + // Create transport with external HttpClient + McpClientTransport transport = HttpClientSseClientTransport.builder(host) + .withExternalHttpClient(externalHttpClient) + .build(); + + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + // Test should complete without errors + }); + + // External HttpClient should still be usable after transport closes + assertThat(externalHttpClient).isNotNull(); + } + + @Test + void closesInternalHttpClientGracefully() throws Exception { + // Verify internal HttpClient's ExecutorService threads are properly cleaned up + // after transport closes + + // Count MCP-HttpClient threads before creating transport + int threadCountBefore = countMcpHttpClientThreads(); + + // Create transport with default internal HttpClient (no custom close handler) + McpClientTransport transport = HttpClientSseClientTransport.builder(host).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + + // Verify MCP-HttpClient threads exist during operation + int threadCountDuringOperation = countMcpHttpClientThreads(); + assertThat(threadCountDuringOperation).isGreaterThan(threadCountBefore); + + // Perform MCP operations + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // After transport closes, wait a bit for ExecutorService shutdown to complete + Thread.sleep(1000); + + // Verify MCP-HttpClient threads are cleaned up + int threadCountAfter = countMcpHttpClientThreads(); + assertThat(threadCountAfter).isEqualTo(threadCountBefore); + } + + @Test + void invokesCustomCloseHandler() throws Exception { + // Verify custom onHttpClientClose callback is invoked correctly + java.util.concurrent.atomic.AtomicBoolean closeHandlerCalled = new java.util.concurrent.atomic.AtomicBoolean( + false); + java.util.concurrent.atomic.AtomicReference capturedHttpClient = new java.util.concurrent.atomic.AtomicReference<>(); + + // Create transport with custom close handler + McpClientTransport transport = HttpClientSseClientTransport.builder(host).onHttpClientClose(httpClient -> { + closeHandlerCalled.set(true); + capturedHttpClient.set(httpClient); + + // Custom cleanup logic would go here + // For example: logging, metrics, custom resource cleanup, etc. + }).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // Verify custom close handler was called + assertThat(closeHandlerCalled.get()).isTrue(); + assertThat(capturedHttpClient.get()).isNotNull(); + } + + @Test + void releasesHttpClientResourcesAfterExecutorShutdownAndGC() throws Exception { + // Verify that after ExecutorService shutdown, GC can reclaim HttpClient resources + // This test validates our core fix: shutdown ExecutorService -> GC reclaims + // SelectorManager threads + + // Count threads before creating transport + int threadCountBefore = countMcpHttpClientThreads(); + int httpClientSelectorThreadsBefore = countHttpClientSelectorThreads(); + + // Create transport with default internal HttpClient + McpClientTransport transport = HttpClientSseClientTransport.builder(host).build(); + + // Use the transport + withClient(transport, syncSpec -> syncSpec, mcpSyncClient -> { + mcpSyncClient.initialize(); + + // Verify threads exist during operation + int threadCountDuringOperation = countMcpHttpClientThreads(); + + assertThat(threadCountDuringOperation).isGreaterThan(threadCountBefore); + // Note: SelectorManager threads may or may not be created yet, depending on + // timing + + // Perform MCP operations + var capabilities = mcpSyncClient.listTools(); + assertThat(capabilities).isNotNull(); + }); + + // After transport closes, ExecutorService is shut down + // Wait a bit for shutdown to complete + Thread.sleep(5000); + + // Verify MCP-HttpClient threads are cleaned up immediately after ExecutorService + // shutdown + int threadCountAfterShutdown = countMcpHttpClientThreads(); + assertThat(threadCountAfterShutdown).isEqualTo(threadCountBefore); + + // Now explicitly trigger GC to reclaim HttpClient and its SelectorManager threads + System.gc(); + System.runFinalization(); + + // Wait for GC to complete + Thread.sleep(2000); + + // Verify SelectorManager threads are also cleaned up after GC + int selectorThreadsAfterGC = countHttpClientSelectorThreads(); + // SelectorManager threads should be cleaned up by GC + // Note: This may not always be 100% reliable as GC timing is non-deterministic, + // but it validates the mechanism works + assertThat(selectorThreadsAfterGC).isLessThanOrEqualTo(httpClientSelectorThreadsBefore); + } + + /** + * Counts the number of HttpClient SelectorManager threads. These threads are created + * by HttpClient internally and should be cleaned up by GC after ExecutorService + * shutdown. + */ + private int countHttpClientSelectorThreads() { + Thread[] threads = new Thread[Thread.activeCount() * 2]; + int count = Thread.enumerate(threads); + int selectorThreadCount = 0; + for (int i = 0; i < count; i++) { + if (threads[i] != null && threads[i].getName().contains("HttpClient") + && threads[i].getName().contains("SelectorManager")) { + selectorThreadCount++; + } + } + return selectorThreadCount; + } + + /** + * Counts the number of threads with names starting with "MCP-HttpClient-" + */ + private int countMcpHttpClientThreads() { + Thread[] threads = new Thread[Thread.activeCount() * 2]; + int count = Thread.enumerate(threads); + int mcpThreadCount = 0; + for (int i = 0; i < count; i++) { + if (threads[i] != null && threads[i].getName().startsWith("MCP-HttpClient-")) { + mcpThreadCount++; + } + } + return mcpThreadCount; + } + } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java index c5c365798..1b5d5a1dc 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java @@ -78,7 +78,8 @@ static class TestHttpClientSseClientTransport extends HttpClientSseClientTranspo public TestHttpClientSseClientTransport(final String baseUri) { super(HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(), HttpRequest.newBuilder().header("Content-Type", "application/json"), baseUri, "/sse", JSON_MAPPER, - McpAsyncHttpClientRequestCustomizer.NOOP); + McpAsyncHttpClientRequestCustomizer.NOOP, client -> { + }); } public int getInboundMessageCount() { @@ -313,26 +314,6 @@ void testMessageOrderPreservation() { assertThat(transport.getInboundMessageCount()).isEqualTo(3); } - @Test - void testCustomizeClient() { - // Create an atomic boolean to verify the customizer was called - AtomicBoolean customizerCalled = new AtomicBoolean(false); - - // Create a transport with the customizer - HttpClientSseClientTransport customizedTransport = HttpClientSseClientTransport.builder(host) - .customizeClient(builder -> { - builder.version(HttpClient.Version.HTTP_2); - customizerCalled.set(true); - }) - .build(); - - // Verify the customizer was called - assertThat(customizerCalled.get()).isTrue(); - - // Clean up - customizedTransport.closeGracefully().block(); - } - @Test void testCustomizeRequest() { // Create an atomic boolean to verify the customizer was called @@ -367,32 +348,6 @@ void testCustomizeRequest() { customizedTransport.closeGracefully().block(); } - @Test - void testChainedCustomizations() { - // Create atomic booleans to verify both customizers were called - AtomicBoolean clientCustomizerCalled = new AtomicBoolean(false); - AtomicBoolean requestCustomizerCalled = new AtomicBoolean(false); - - // Create a transport with both customizers chained - HttpClientSseClientTransport customizedTransport = HttpClientSseClientTransport.builder(host) - .customizeClient(builder -> { - builder.connectTimeout(Duration.ofSeconds(30)); - clientCustomizerCalled.set(true); - }) - .customizeRequest(builder -> { - builder.header("X-Api-Key", "test-api-key"); - requestCustomizerCalled.set(true); - }) - .build(); - - // Verify both customizers were called - assertThat(clientCustomizerCalled.get()).isTrue(); - assertThat(requestCustomizerCalled.get()).isTrue(); - - // Clean up - customizedTransport.closeGracefully().block(); - } - @Test void testRequestCustomizer() { var mockCustomizer = mock(McpSyncHttpClientRequestCustomizer.class);