diff --git a/trpc-container/trpc-container-default/src/test/java/com/tencent/trpc/container/container/ConsumerProxyTest.java b/trpc-container/trpc-container-default/src/test/java/com/tencent/trpc/container/container/ConsumerProxyTest.java index 384f12479..344d105a6 100644 --- a/trpc-container/trpc-container-default/src/test/java/com/tencent/trpc/container/container/ConsumerProxyTest.java +++ b/trpc-container/trpc-container-default/src/test/java/com/tencent/trpc/container/container/ConsumerProxyTest.java @@ -22,6 +22,7 @@ import com.tencent.trpc.core.exception.TRpcException; import com.tencent.trpc.core.rpc.RpcClientContext; import com.tencent.trpc.core.rpc.TRpcProxy; +import com.tencent.trpc.proto.support.DefResponseFutureManager; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -44,6 +45,7 @@ public void after() { container.stop(); } ConfigManager.stopTest(); + DefResponseFutureManager.reset(); } @Test diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/common/ConfigManager.java b/trpc-core/src/main/java/com/tencent/trpc/core/common/ConfigManager.java index bf241850b..7e53da3d4 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/common/ConfigManager.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/common/ConfigManager.java @@ -44,6 +44,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.WeakHashMap; import java.util.concurrent.TimeUnit; public class ConfigManager { @@ -55,6 +56,12 @@ public class ConfigManager { * Container startup listener */ private final Set tRPCRunListeners = Sets.newConcurrentHashSet(); + + /** + * Shutdown listeners for decoupled cleanup, using WeakHashMap to prevent classloader leaks + */ + private final Map shutdownListeners = new WeakHashMap<>(); + /** * Business initialization */ @@ -251,6 +258,41 @@ public void addTRPCRunListener(TRPCRunListener trpcRunListener) { tRPCRunListeners.add(trpcRunListener); } + /** + * Get all shutdown listeners loaded via SPI. + * + * @return set of shutdown listeners + */ + public void registerShutdownListener(ShutdownListener listener) { + if (listener != null) { + shutdownListeners.put(listener, Boolean.TRUE); + } + } + + /** + * Unregister a shutdown listener. + * + * @param listener the shutdown listener to unregister + */ + public void unregisterShutdownListener(ShutdownListener listener) { + if (listener != null) { + shutdownListeners.remove(listener); + } + } + + /** + * Notify all registered shutdown listeners. + */ + private void notifyShutdownListeners() { + shutdownListeners.keySet().forEach(listener -> { + try { + listener.onShutdown(); + } catch (Exception e) { + logger.error("Error executing shutdown listener: {}", listener.getClass().getName(), e); + } + }); + } + @Override public String toString() { return "ApplicationConfig {globalConfig=" + globalConfig + ", serverConfig=" + serverConfig @@ -385,6 +427,10 @@ protected void stopInternal() throws Exception { ExtensionLoader.destroyAllPlugin(); // 8) close client cluster RpcClusterClientManager.close(); + + // Notify all shutdown listeners before actual shutdown + notifyShutdownListeners(); + logger.info(">>>tRPC Server stopped"); } @@ -394,4 +440,4 @@ public String toString() { } } -} +} \ No newline at end of file diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/common/ShutdownListener.java b/trpc-core/src/main/java/com/tencent/trpc/core/common/ShutdownListener.java new file mode 100644 index 000000000..38422f5e0 --- /dev/null +++ b/trpc-core/src/main/java/com/tencent/trpc/core/common/ShutdownListener.java @@ -0,0 +1,26 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 Tencent. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.common; + +/** + * Shutdown listener for components that need to perform cleanup when the container stops. + * This provides a decoupled way for components to register shutdown hooks without + * creating circular dependencies. + */ +public interface ShutdownListener { + + /** + * Called when the container is shutting down. + */ + void onShutdown(); + +} \ No newline at end of file diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/management/support/MBeanRegistryHelper.java b/trpc-core/src/main/java/com/tencent/trpc/core/management/support/MBeanRegistryHelper.java index 146021d6a..d42cf9d5e 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/management/support/MBeanRegistryHelper.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/management/support/MBeanRegistryHelper.java @@ -36,4 +36,20 @@ public static void registerMBean(Object object, ObjectName objectName) { } } + /** + * Unregister mbean + * + * @param objectName mbean object name {@link ObjectName} + */ + public static void unregisterMBean(ObjectName objectName) { + try { + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + if (mBeanServer.isRegistered(objectName)) { + mBeanServer.unregisterMBean(objectName); + } + } catch (Exception e) { + logger.warn("unregister mbean exception: ", e); + } + } + } \ No newline at end of file diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ForkJoinWorkerPool.java b/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ForkJoinWorkerPool.java index 8be64f429..6686b6f02 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ForkJoinWorkerPool.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ForkJoinWorkerPool.java @@ -126,6 +126,11 @@ public void close(long timeoutMills) { shutdownGraceful(timeoutMills); } } + + // unregister MBean + if (this.forkJoinPoolMXBean != null) { + MBeanRegistryHelper.unregisterMBean(this.forkJoinPoolMXBean.getObjectName()); + } } private void shutdownGraceful(long timeoutMills) { diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java b/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java index c7b5b919e..a5450e20c 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java @@ -149,6 +149,10 @@ public void close(long timeoutMills) { shutdownGraceful(timeoutMills); } } + // unregister Mbean + if (this.threadPoolMXBean != null) { + MBeanRegistryHelper.unregisterMBean(this.threadPoolMXBean.getObjectName()); + } } @Override diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/common/ConfigManagerTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/common/ConfigManagerTest.java index 7d69d1f1f..ff0c3d8e7 100644 --- a/trpc-core/src/test/java/com/tencent/trpc/core/common/ConfigManagerTest.java +++ b/trpc-core/src/test/java/com/tencent/trpc/core/common/ConfigManagerTest.java @@ -21,12 +21,16 @@ import com.tencent.trpc.core.worker.support.thread.ThreadPoolConfig; import com.tencent.trpc.core.worker.support.thread.ThreadWorkerPool; import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.assertj.core.util.Lists; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class ConfigManagerTest { private static final int TCP_PORT = 8090; @@ -73,8 +77,19 @@ public void teardown() { @Test public void testStart() { - ConfigManager.getInstance().start(); - ConfigManager.getInstance().stop(); + // Reset ConfigManager state to avoid influence from listener1 configuration in setUp() + ConfigManager.stopTest(); + ConfigManager.startTest(); + + ConfigManager configManager = ConfigManager.getInstance(); + + // Set minimal configuration, do not use listener1 + ServerConfig serverConfig = new ServerConfig(); + serverConfig.setRunListeners(Lists.newArrayList()); // Empty runListeners list + configManager.setServerConfig(serverConfig); + + configManager.start(); + configManager.stop(); } @Test @@ -167,14 +182,248 @@ public void stop() { @Test public void testGracefulRestart() throws InterruptedException { + // Reset ConfigManager state to avoid influence from listener1 configuration in setUp() + ConfigManager.stopTest(); ConfigManager.startTest(); + + ServerConfig serverConfig = new ServerConfig(); serverConfig.setWaitTimeout(WAIT_TIME); serverConfig.setCloseTimeout(WAIT_TIME); + serverConfig.setRunListeners(Lists.newArrayList()); // Empty runListeners list serverConfig.setDefault(); - ConfigManager.getInstance().setServerConfig(serverConfig); - ConfigManager.getInstance().start(); + + final ConfigManager configManager = ConfigManager.getInstance(); + configManager.setServerConfig(serverConfig); + + configManager.start(); Thread.sleep(WAIT_TIME); - ConfigManager.getInstance().stop(); + configManager.stop(); + } + + @Test + public void testRegisterShutdownListener() { + // Reset ConfigManager state to avoid influence from listener1 configuration in setUp() + ConfigManager.stopTest(); + ConfigManager.startTest(); + + ConfigManager configManager = ConfigManager.getInstance(); + + // Set minimal configuration, do not use listener1 + ServerConfig serverConfig = new ServerConfig(); + serverConfig.setRunListeners(Lists.newArrayList()); // Empty runListeners list + configManager.setServerConfig(serverConfig); + + TestShutdownListener listener = new TestShutdownListener(); + + configManager.registerShutdownListener(listener); + + // Verify listener is registered by starting and stopping the container + configManager.start(false); // Do not register shutdown hook + configManager.stop(); + + assertTrue("Shutdown listener should be called", listener.isShutdownCalled()); + } + + @Test + public void testUnregisterShutdownListener() { + // Reset ConfigManager state to avoid influence from listener1 configuration in setUp() + ConfigManager.stopTest(); + ConfigManager.startTest(); + + ConfigManager configManager = ConfigManager.getInstance(); + + // Set minimal configuration, do not use listener1 + ServerConfig serverConfig = new ServerConfig(); + serverConfig.setRunListeners(Lists.newArrayList()); // Empty runListeners list + configManager.setServerConfig(serverConfig); + + TestShutdownListener listener = new TestShutdownListener(); + + configManager.registerShutdownListener(listener); + configManager.unregisterShutdownListener(listener); + + // Verify listener is not called after being unregistered + configManager.start(false); + configManager.stop(); + + assertFalse("Shutdown listener should not be called after unregister", listener.isShutdownCalled()); + } + + @Test + public void testMultipleShutdownListeners() { + // Reset ConfigManager state to avoid influence from listener1 configuration in setUp() + ConfigManager.stopTest(); + ConfigManager.startTest(); + + ConfigManager configManager = ConfigManager.getInstance(); + + // Set minimal configuration, do not use listener1 + ServerConfig serverConfig = new ServerConfig(); + serverConfig.setRunListeners(Lists.newArrayList()); // Empty runListeners list + configManager.setServerConfig(serverConfig); + + TestShutdownListener listener1 = new TestShutdownListener("listener1"); + TestShutdownListener listener2 = new TestShutdownListener("listener2"); + TestShutdownListener listener3 = new TestShutdownListener("listener3"); + + configManager.registerShutdownListener(listener1); + configManager.registerShutdownListener(listener2); + configManager.registerShutdownListener(listener3); + + configManager.start(false); + configManager.stop(); + + assertTrue("Listener1 should be called", listener1.isShutdownCalled()); + assertTrue("Listener2 should be called", listener2.isShutdownCalled()); + assertTrue("Listener3 should be called", listener3.isShutdownCalled()); + } + + @Test + public void testNullShutdownListenerHandling() { + // Reset ConfigManager state to avoid influence from listener1 configuration in setUp() + ConfigManager.stopTest(); + ConfigManager.startTest(); + + ConfigManager configManager = ConfigManager.getInstance(); + + // Set minimal configuration, do not use listener1 + ServerConfig serverConfig = new ServerConfig(); + serverConfig.setRunListeners(Lists.newArrayList()); // Empty runListeners list + configManager.setServerConfig(serverConfig); + + // Verify null listener does not cause exceptions + configManager.registerShutdownListener(null); + configManager.unregisterShutdownListener(null); + + configManager.start(false); + configManager.stop(); + } + + @Test + public void testShutdownListenerExceptionHandling() { + // Reset ConfigManager state to avoid influence from listener1 configuration in setUp() + ConfigManager.stopTest(); + ConfigManager.startTest(); + + ConfigManager configManager = ConfigManager.getInstance(); + + // Set minimal configuration, do not use listener1 + ServerConfig serverConfig = new ServerConfig(); + serverConfig.setRunListeners(Lists.newArrayList()); // Empty runListeners list + configManager.setServerConfig(serverConfig); + + TestShutdownListener goodListener = new TestShutdownListener("good"); + TestShutdownListener badListener = new TestShutdownListener("bad", true); + + configManager.registerShutdownListener(goodListener); + configManager.registerShutdownListener(badListener); + + configManager.start(false); + configManager.stop(); + + // Verify other listeners are called even if one listener throws an exception + assertTrue("Good listener should be called despite bad listener exception", goodListener.isShutdownCalled()); + assertTrue("Bad listener should be called even with exception", badListener.isShutdownCalled()); + } + + @Test + public void testConfigManagerShutdownListenerOnly() { + // Create an isolated test specifically for ShutdownListener functionality, not dependent on listener1 + ConfigManager.stopTest(); + + // Reinitialize to ensure extensions are properly loaded + ConfigManager.startTest(); + ConfigManager configManager = ConfigManager.getInstance(); + + // Set minimal configuration, do not use listener1 + ServerConfig serverConfig = new ServerConfig(); + serverConfig.setRunListeners(Lists.newArrayList()); // Empty runListeners list + configManager.setServerConfig(serverConfig); + + TestShutdownListener testListener = new TestShutdownListener("isolated-test"); + configManager.registerShutdownListener(testListener); + + try { + configManager.start(false); + configManager.stop(); + + assertTrue("Shutdown listener should be called", testListener.isShutdownCalled()); + } catch (Exception e) { + // Shutdown listener should be called even if startup fails + assertTrue("Shutdown listener should be called even on startup failure", + testListener.isShutdownCalled()); + } + } + + @Test + public void testShutdownListenerWithStartupFailure() { + // Specifically test if ShutdownListener is still called when startup fails + ConfigManager.stopTest(); + ConfigManager.startTest(); + ConfigManager configManager = ConfigManager.getInstance(); + + TestShutdownListener testListener = new TestShutdownListener("startup-failure-test"); + configManager.registerShutdownListener(testListener); + + // Deliberately set a nonexistent listener to trigger startup failure + ServerConfig serverConfig = new ServerConfig(); + serverConfig.setRunListeners(Lists.newArrayList("nonexistent-listener")); + configManager.setServerConfig(serverConfig); + + try { + configManager.start(false); + // If no exception, stop normally + configManager.stop(); + } catch (Exception e) { + // Exception is expected, but ShutdownListener should be called during exception handling + } + + assertTrue("Shutdown listener should be called even when startup fails", + testListener.isShutdownCalled()); + } + + /** + * Test ShutdownListener implementation + */ + private static class TestShutdownListener implements ShutdownListener { + private final String name; + private final boolean throwException; + private final AtomicBoolean shutdownCalled = new AtomicBoolean(false); + + public TestShutdownListener() { + this("test-listener", false); + } + + public TestShutdownListener(String name) { + this(name, false); + } + + public TestShutdownListener(String name, boolean throwException) { + this.name = name; + this.throwException = throwException; + } + + @Override + public void onShutdown() { + shutdownCalled.set(true); + if (throwException) { + throw new RuntimeException("Simulated exception in " + name); + } + } + + public boolean isShutdownCalled() { + return shutdownCalled.get(); + } + } + + @Test + public void testShutdownHook() { + // Reset ConfigManager state to avoid influence from listener1 configuration in setUp() + ConfigManager.stopTest(); + ConfigManager.startTest(); + + ConfigManager configManager = ConfigManager.getInstance(); + configManager.start(); } } \ No newline at end of file diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/management/support/MBeanRegistryHelperTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/management/support/MBeanRegistryHelperTest.java new file mode 100644 index 000000000..99aa2a20e --- /dev/null +++ b/trpc-core/src/test/java/com/tencent/trpc/core/management/support/MBeanRegistryHelperTest.java @@ -0,0 +1,150 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 Tencent. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.management.support; + +import java.lang.management.ManagementFactory; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test class for MBeanRegistryHelper to cover exception handling branches + */ +public class MBeanRegistryHelperTest { + + /** + * Test registerMBean method with normal operation - should not throw exception + */ + @Test + public void testRegisterMBeanNormal() throws Exception { + // Create test objects + Object testObject = new TestMBeanImpl(); + ObjectName objectName = new ObjectName("test:type=TestMBean"); + + // This should not throw exception + MBeanRegistryHelper.registerMBean(testObject, objectName); + + // Clean up - unregister the MBean + MBeanRegistryHelper.unregisterMBean(objectName); + } + + /** + * Test registerMBean method with invalid ObjectName - should trigger exception handling + */ + @Test + public void testRegisterMBeanWithInvalidObjectName() throws Exception { + // Create test objects + Object testObject = new Object(); // Not a valid MBean + ObjectName objectName = new ObjectName("test:type=TestMBean"); + + // This should trigger exception handling (NotCompliantMBeanException) but not throw + MBeanRegistryHelper.registerMBean(testObject, objectName); + } + + /** + * Test registerMBean method with duplicate registration - should trigger exception handling + */ + @Test + public void testRegisterMBeanDuplicate() throws Exception { + // Create test objects + Object testObject = new TestMBeanImpl(); + ObjectName objectName = new ObjectName("test:type=DuplicateTestMBean"); + + try { + // Register first time - should succeed + MBeanRegistryHelper.registerMBean(testObject, objectName); + + // Register second time - should trigger exception handling (InstanceAlreadyExistsException) but not throw + MBeanRegistryHelper.registerMBean(testObject, objectName); + } finally { + // Clean up + MBeanRegistryHelper.unregisterMBean(objectName); + } + } + + /** + * Test unregisterMBean method with valid ObjectName - should work normally + */ + @Test + public void testUnregisterMBeanNormal() throws Exception { + // Create ObjectName for a simple test + ObjectName objectName = new ObjectName("test:type=UnregisterTestMBean"); + + // This should not throw exception even if MBean doesn't exist + MBeanRegistryHelper.unregisterMBean(objectName); + } + + /** + * Test unregisterMBean method with non-existent ObjectName - should not throw exception + */ + @Test + public void testUnregisterMBeanNonExistent() throws Exception { + // Create ObjectName for non-existent MBean + ObjectName objectName = new ObjectName("test:type=NonExistentMBean"); + + // This should not throw exception even though MBean doesn't exist + MBeanRegistryHelper.unregisterMBean(objectName); + } + + /** + * Test unregisterMBean method with invalid ObjectName pattern - should trigger exception handling + */ + @Test + public void testUnregisterMBeanWithInvalidPattern() throws Exception { + // Create ObjectName with pattern (which cannot be used for unregistering) + ObjectName objectName = new ObjectName("test:type=*"); + + // This should trigger exception handling but not throw + MBeanRegistryHelper.unregisterMBean(objectName); + } + + /** + * Test unregisterMBean method exception handling by creating a scenario that triggers exception + * This test covers the exception handling branch in unregisterMBean method + */ + @Test + public void testUnregisterMBeanException() throws Exception { + // Test with null ObjectName which should cause RuntimeOperationsException + // in the isRegistered(null) call, which will trigger the exception handling branch + + // This should not throw any exception - the exception should be caught and logged + try { + MBeanRegistryHelper.unregisterMBean(null); + // If we reach here, it means the exception was properly caught and handled + } catch (Exception e) { + // If any exception escapes, the test should fail + Assert.fail("Exception should have been caught and logged, but was thrown: " + e.getMessage()); + } + + // The test passes if no exception is thrown (exception is caught and logged) + // The logger.warn("unregister mbean exception: ", e) line should be executed + // This covers the exception handling branch that was previously untested + } + + /** + * Test MBean interface for testing purposes + */ + public interface TestMBean { + String getName(); + } + + /** + * Test MBean implementation for testing purposes + */ + public static class TestMBeanImpl implements TestMBean { + @Override + public String getName() { + return "TestMBean"; + } + } +} \ No newline at end of file diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ForkJoinWorkerPoolTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ForkJoinWorkerPoolTest.java index c74f4f95c..3d632723a 100644 --- a/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ForkJoinWorkerPoolTest.java +++ b/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ForkJoinWorkerPoolTest.java @@ -15,10 +15,14 @@ import com.tencent.trpc.core.management.ForkJoinPoolMXBean; import com.tencent.trpc.core.management.PoolMXBean; import com.tencent.trpc.core.management.PoolMXBean.WorkerPoolType; +import com.tencent.trpc.core.management.support.MBeanRegistryHelper; +import java.lang.management.ManagementFactory; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import javax.management.MBeanServer; +import javax.management.ObjectName; import org.junit.Assert; import org.junit.Test; @@ -73,4 +77,57 @@ public void testForkJoinWorkerPool() { Assert.assertTrue(forkJoinPool.isShutdown()); forkJoinWorkerPool.close(0); } -} + + /** + * Test MBean unregistration when closing ForkJoinWorkerPool + */ + @Test + public void testMBeanUnregistrationOnClose() throws Exception { + Map properties = new HashMap<>(); + properties.put(ForkJoinPoolConfig.PARALLELISM, PARALLELISM); + properties.put(ForkJoinPoolConfig.TIMEOUT_MS, TIMEOUT_MILLS); + PluginConfig poolPluginConfig = new PluginConfig(ForkJoinWorkerPool.NAME, ThreadWorkerPool.class, + properties); + ForkJoinWorkerPool forkJoinWorkerPool = new ForkJoinWorkerPool(); + forkJoinWorkerPool.setPluginConfig(poolPluginConfig); + forkJoinWorkerPool.init(); + + // Get the MXBean and its object name + PoolMXBean report = forkJoinWorkerPool.report(); + ForkJoinPoolMXBean forkJoinPoolMXBean = (ForkJoinPoolMXBean) report; + ObjectName objectName = forkJoinPoolMXBean.getObjectName(); + + // Verify MBean is registered + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + Assert.assertTrue("MBean should be registered after init", + mBeanServer.isRegistered(objectName)); + + // Close the worker pool + forkJoinWorkerPool.close(1000); + + // Verify MBean is unregistered after close + Assert.assertFalse("MBean should be unregistered after close", + mBeanServer.isRegistered(objectName)); + } + + /** + * Test close method when forkJoinPoolMXBean is null (covers the null check branch) + */ + @Test + public void testCloseWithNullMXBean() throws Exception { + Map properties = new HashMap<>(); + properties.put(ForkJoinPoolConfig.PARALLELISM, PARALLELISM); + properties.put(ForkJoinPoolConfig.TIMEOUT_MS, TIMEOUT_MILLS); + PluginConfig poolPluginConfig = new PluginConfig(ForkJoinWorkerPool.NAME, ThreadWorkerPool.class, + properties); + ForkJoinWorkerPool forkJoinWorkerPool = new ForkJoinWorkerPool(); + forkJoinWorkerPool.setPluginConfig(poolPluginConfig); + + // Don't call init() so forkJoinPoolMXBean remains null + // This should not throw any exception when closing + forkJoinWorkerPool.close(1000); + + // Verify no exception is thrown and method completes successfully + Assert.assertTrue("Close method should complete successfully even with null MXBean", true); + } +} \ No newline at end of file diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPoolTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPoolTest.java index db4048cab..094fec0fb 100644 --- a/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPoolTest.java +++ b/trpc-core/src/test/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPoolTest.java @@ -18,8 +18,12 @@ import com.tencent.trpc.core.management.PoolMXBean.WorkerPoolType; import com.tencent.trpc.core.management.ThreadPerTaskExecutorMXBeanImpl; import com.tencent.trpc.core.management.ThreadPoolMXBean; +import com.tencent.trpc.core.management.support.MBeanRegistryHelper; +import java.lang.management.ManagementFactory; import java.util.HashMap; import java.util.Map; +import javax.management.MBeanServer; +import javax.management.ObjectName; import org.junit.Assert; import org.junit.Test; @@ -113,4 +117,53 @@ public void testVirtualThreads() { Assert.assertNotNull(report.toString()); } + /** + * Test MBean unregistration when closing ThreadWorkerPool + */ + @Test + public void testMBeanUnregistrationOnClose() throws Exception { + Map properties = getProperties(); + PluginConfig poolPluginConfig = new PluginConfig("work_pool", ThreadWorkerPool.class, + properties); + ThreadWorkerPool threadWorkerPool = new ThreadWorkerPool(); + threadWorkerPool.setPluginConfig(poolPluginConfig); + threadWorkerPool.init(); + + // Get the MXBean and its object name + PoolMXBean report = threadWorkerPool.report(); + ThreadPoolMXBean threadPoolMXBean = (ThreadPoolMXBean) report; + ObjectName objectName = threadPoolMXBean.getObjectName(); + + // Verify MBean is registered + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + Assert.assertTrue("MBean should be registered after init", + mBeanServer.isRegistered(objectName)); + + // Close the worker pool + threadWorkerPool.close(1000); + + // Verify MBean is unregistered after close + Assert.assertFalse("MBean should be unregistered after close", + mBeanServer.isRegistered(objectName)); + } + + /** + * Test close method when threadPoolMXBean is null (covers the null check branch) + */ + @Test + public void testCloseWithNullMXBean() throws Exception { + Map properties = getProperties(); + PluginConfig poolPluginConfig = new PluginConfig("work_pool", ThreadWorkerPool.class, + properties); + ThreadWorkerPool threadWorkerPool = new ThreadWorkerPool(); + threadWorkerPool.setPluginConfig(poolPluginConfig); + + // Don't call init() so threadPoolMXBean remains null + // This should not throw any exception when closing + threadWorkerPool.close(1000); + + // Verify no exception is thrown and method completes successfully + Assert.assertTrue("Close method should complete successfully even with null MXBean", true); + } + } \ No newline at end of file diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/AbstractConsumerInvoker.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/AbstractConsumerInvoker.java index c3db4ba53..37173870f 100644 --- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/AbstractConsumerInvoker.java +++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/AbstractConsumerInvoker.java @@ -35,6 +35,8 @@ import com.tencent.trpc.core.rpc.TimeoutManager; import com.tencent.trpc.core.rpc.def.DefTimeoutManager; import com.tencent.trpc.core.utils.JsonUtils; +import com.tencent.trpc.core.common.ConfigManager; +import com.tencent.trpc.core.common.ShutdownListener; import com.tencent.trpc.core.utils.ProtoJsonConverter; import com.tencent.trpc.core.utils.RpcUtils; import com.tencent.trpc.core.utils.StringUtils; @@ -64,7 +66,12 @@ public abstract class AbstractConsumerInvoker implements ConsumerInvoker { /** * The request timeout manager, used to handle the result of client requests in timeout scenarios. */ - private static final TimeoutManager TIMEOUT_MANAGER = new DefTimeoutManager(10); + private static TimeoutManager TIMEOUT_MANAGER = new DefTimeoutManager(10); + + /** + * Internal shutdown listener that handles the shutdown of the timeout manager + */ + private final ShutdownListener shutdownListener = new InternalShutdownListener(); /** * Http client for the request @@ -96,6 +103,9 @@ public AbstractConsumerInvoker(AbstractRpcClient client, if (extMap.containsKey(KEYSTORE_PATH) && extMap.containsKey(KEYSTORE_PASS)) { scheme = HTTPS_SCHEME; } + + // register shutdown listener + ConfigManager.getInstance().registerShutdownListener(shutdownListener); } /** @@ -285,4 +295,38 @@ public ProtocolConfig getProtocolConfig() { public Class getInterface() { return config.getServiceInterface(); } + + /** + * Stop the timeout manager + */ + public static void stop() { + TIMEOUT_MANAGER.close(); + } + + /** + * Called when the container is reset. + */ + public static void reset() { + TIMEOUT_MANAGER = new DefTimeoutManager(10); + } + + /** + * Get the internal shutdown listener for testing purposes + * + * @return the internal shutdown listener + */ + public ShutdownListener getShutdownListener() { + return shutdownListener; + } + + /** + * Internal shutdown listener implementation + */ + private class InternalShutdownListener implements ShutdownListener { + @Override + public void onShutdown() { + logger.info("AbstractConsumerInvoker received shutdown notification"); + AbstractConsumerInvoker.stop(); + } + } } diff --git a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/Http2RpcClientTest.java b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/Http2RpcClientTest.java index 6f53fa6f4..0d262d7a9 100644 --- a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/Http2RpcClientTest.java +++ b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/Http2RpcClientTest.java @@ -26,12 +26,14 @@ import com.tencent.trpc.core.logger.LoggerFactory; import com.tencent.trpc.core.rpc.RpcClientContext; import com.tencent.trpc.core.utils.NetUtils; +import com.tencent.trpc.proto.http.client.AbstractConsumerInvoker; import java.io.File; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import tests.service.GreeterJsonService; @@ -53,6 +55,11 @@ public class Http2RpcClientTest { private static Map extMap = new HashMap<>(); + @Before + public void beforeTest() { + AbstractConsumerInvoker.reset(); + } + @BeforeClass public static void startHttpServer() { diff --git a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/Http2cRpcClientTest.java b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/Http2cRpcClientTest.java index 2eea59e18..e84340ff2 100644 --- a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/Http2cRpcClientTest.java +++ b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/Http2cRpcClientTest.java @@ -24,10 +24,12 @@ import com.tencent.trpc.core.logger.LoggerFactory; import com.tencent.trpc.core.rpc.RpcClientContext; import com.tencent.trpc.core.utils.NetUtils; +import com.tencent.trpc.proto.http.client.AbstractConsumerInvoker; import java.util.HashMap; import java.util.Map; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import tests.service.GreeterJsonService; @@ -47,6 +49,11 @@ public class Http2cRpcClientTest { private static ServerConfig serverConfig; + @Before + public void beforeTest() { + AbstractConsumerInvoker.reset(); + } + @BeforeClass public static void startHttpServer() { ConfigManager.stopTest(); diff --git a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientTest.java b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientTest.java index d7e699d39..2a7ddb0f0 100644 --- a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientTest.java +++ b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientTest.java @@ -15,6 +15,7 @@ import static com.tencent.trpc.transport.http.common.Constants.HTTP_SCHEME; import com.tencent.trpc.core.common.ConfigManager; +import com.tencent.trpc.core.common.ShutdownListener; import com.tencent.trpc.core.common.config.BackendConfig; import com.tencent.trpc.core.common.config.ConsumerConfig; import com.tencent.trpc.core.common.config.GlobalConfig; @@ -49,6 +50,7 @@ import tests.service.impl1.GreeterJavaBeanServiceImpl; import tests.service.impl1.GreeterJsonServiceImpl1; import tests.service.impl1.GreeterServiceImpl1; +import com.tencent.trpc.proto.http.client.AbstractConsumerInvoker; public class HttpRpcClientTest { @@ -488,4 +490,37 @@ public void testHttpRpcAttachmentWithJavaBean() { backendConfig.stop(); } } + + @Test + public void testAbstractConsumerInvokerShutdownListener() { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setName("serviceId"); + backendConfig.setRequestTimeout(REQUEST_TIMEOUT); + backendConfig.setMaxConns(MAX_CONNECTIONS); + backendConfig.setNamingUrl("ip://127.0.0.1:18088"); + backendConfig.setKeepAlive(false); + backendConfig.setConnsPerAddr(4); + backendConfig.setProtocol("http"); + + ConsumerConfig consumerConfig = new ConsumerConfig<>(); + consumerConfig.setServiceInterface(GreeterService.class); + consumerConfig.setBackendConfig(backendConfig); + + try { + GreeterService proxy = consumerConfig.getProxy(); + + // Test that the shutdown listener is properly registered + // We can't directly access the AbstractConsumerInvoker from the proxy, + // but we can test the static methods + AbstractConsumerInvoker.reset(); + + // Test shutdown functionality + AbstractConsumerInvoker.stop(); + + logger.info("AbstractConsumerInvoker shutdown listener test completed successfully"); + + } finally { + backendConfig.stop(); + } + } } diff --git a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/client/AbstractConsumerInvokerTest.java b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/client/AbstractConsumerInvokerTest.java new file mode 100644 index 000000000..8b7fee455 --- /dev/null +++ b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/client/AbstractConsumerInvokerTest.java @@ -0,0 +1,299 @@ +/* + + + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 Tencent. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.proto.http.client; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static com.tencent.trpc.transport.http.common.Constants.KEYSTORE_PATH; +import static com.tencent.trpc.transport.http.common.Constants.KEYSTORE_PASS; +import static com.tencent.trpc.transport.http.common.Constants.HTTPS_SCHEME; +import static com.tencent.trpc.transport.http.common.Constants.HTTP_SCHEME; + +import com.tencent.trpc.core.common.ShutdownListener; +import com.tencent.trpc.core.common.config.BackendConfig; +import com.tencent.trpc.core.common.config.ConsumerConfig; +import com.tencent.trpc.core.common.config.ProtocolConfig; +import com.tencent.trpc.core.logger.Logger; +import com.tencent.trpc.core.logger.LoggerFactory; +import com.tencent.trpc.core.rpc.AbstractRpcClient; +import com.tencent.trpc.core.rpc.Request; +import com.tencent.trpc.core.rpc.Response; +import com.tencent.trpc.core.worker.spi.WorkerPool; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the ShutdownListener functionality of AbstractConsumerInvoker + */ +public class AbstractConsumerInvokerTest { + + private static final Logger logger = LoggerFactory.getLogger(AbstractConsumerInvokerTest.class); + + private AbstractRpcClient mockClient; + private ConsumerConfig mockConfig; + private ProtocolConfig mockProtocolConfig; + private BackendConfig mockBackendConfig; + private WorkerPool mockWorkerPool; + + private TestConsumerInvoker testInvoker; + + @Before + public void setUp() { + // 创建 mock 对象 + mockClient = mock(AbstractRpcClient.class); + mockConfig = mock(ConsumerConfig.class); + mockProtocolConfig = mock(ProtocolConfig.class); + mockBackendConfig = mock(BackendConfig.class); + mockWorkerPool = mock(WorkerPool.class); + + // 设置 mock 对象的行为 + when(mockConfig.getBackendConfig()).thenReturn(mockBackendConfig); + when(mockBackendConfig.getWorkerPoolObj()).thenReturn(mockWorkerPool); + when(mockProtocolConfig.getIp()).thenReturn("127.0.0.1"); + when(mockProtocolConfig.getPort()).thenReturn(8080); + when(mockProtocolConfig.getExtMap()).thenReturn(new HashMap()); + + // 创建测试实例 + testInvoker = new TestConsumerInvoker(mockClient, mockConfig, mockProtocolConfig); + } + + @After + public void tearDown() { + // 重置静态状态 + AbstractConsumerInvoker.reset(); + } + + /** + * 测试 ShutdownListener 不为空 + */ + @Test + public void testShutdownListenerNotNull() { + ShutdownListener shutdownListener = testInvoker.getShutdownListener(); + assertNotNull("ShutdownListener should not be null", shutdownListener); + } + + /** + * 测试 onShutdown 方法的日志输出和执行 + */ + @Test + public void testOnShutdownExecution() { + // 获取 ShutdownListener + ShutdownListener shutdownListener = testInvoker.getShutdownListener(); + assertNotNull("ShutdownListener should not be null", shutdownListener); + + // 测试 onShutdown 方法不会抛出异常 + try { + shutdownListener.onShutdown(); + // 如果没有异常,测试通过 + assertTrue("onShutdown method should execute without exceptions", true); + } catch (Exception e) { + throw new AssertionError("onShutdown method should not throw exceptions", e); + } + } + + /** + * 测试多次调用 onShutdown 方法的安全性 + */ + @Test + public void testMultipleOnShutdownCalls() { + ShutdownListener shutdownListener = testInvoker.getShutdownListener(); + assertNotNull("ShutdownListener should not be null", shutdownListener); + + // 多次调用 onShutdown 方法,确保不会出现异常 + try { + shutdownListener.onShutdown(); + shutdownListener.onShutdown(); + shutdownListener.onShutdown(); + assertTrue("Multiple onShutdown calls should be safe", true); + } catch (Exception e) { + throw new AssertionError("Multiple onShutdown calls should not throw exceptions", e); + } + } + + /** + * 测试 ShutdownListener 的类型 + */ + @Test + public void testShutdownListenerType() { + ShutdownListener shutdownListener = testInvoker.getShutdownListener(); + assertNotNull("ShutdownListener should not be null", shutdownListener); + + // 验证 ShutdownListener 是内部类的实例 + String className = shutdownListener.getClass().getSimpleName(); + assertTrue("ShutdownListener should be InternalShutdownListener", + className.contains("InternalShutdownListener")); + } + + /** + * 测试静态方法 stop() 和 reset() 的调用 + */ + @Test + public void testStaticMethods() { + try { + // 测试静态方法调用不会抛出异常 + AbstractConsumerInvoker.stop(); + AbstractConsumerInvoker.reset(); + assertTrue("Static methods should execute without exceptions", true); + } catch (Exception e) { + throw new AssertionError("Static methods should not throw exceptions", e); + } + } + + /** + * 测试 HTTP 协议的默认配置(不包含 keystore 配置) + */ + @Test + public void testHttpSchemeWithoutKeystore() throws Exception { + // 创建不包含 keystore 配置的 extMap + Map extMap = new HashMap<>(); + when(mockProtocolConfig.getExtMap()).thenReturn(extMap); + + // 创建新的测试实例 + TestConsumerInvoker httpInvoker = new TestConsumerInvoker(mockClient, mockConfig, mockProtocolConfig); + + // 通过反射获取 scheme 字段来验证 + Field schemeField = AbstractConsumerInvoker.class.getDeclaredField("scheme"); + schemeField.setAccessible(true); + String scheme = (String) schemeField.get(httpInvoker); + + assertEquals("Should use HTTP scheme when keystore is not configured", HTTP_SCHEME, scheme); + } + + /** + * 测试 HTTPS 协议的配置(包含 keystore 配置) + */ + @Test + public void testHttpsSchemeWithKeystore() throws Exception { + // 创建包含 keystore 配置的 extMap + Map extMap = new HashMap<>(); + extMap.put(KEYSTORE_PATH, "/path/to/keystore.jks"); + extMap.put(KEYSTORE_PASS, "password123"); + when(mockProtocolConfig.getExtMap()).thenReturn(extMap); + + // 创建新的测试实例 + TestConsumerInvoker httpsInvoker = new TestConsumerInvoker(mockClient, mockConfig, mockProtocolConfig); + + // 通过反射获取 scheme 字段来验证 + Field schemeField = AbstractConsumerInvoker.class.getDeclaredField("scheme"); + schemeField.setAccessible(true); + String scheme = (String) schemeField.get(httpsInvoker); + + assertEquals("Should use HTTPS scheme when keystore is configured", HTTPS_SCHEME, scheme); + } + + /** + * 测试只有 KEYSTORE_PATH 但没有 KEYSTORE_PASS 的情况(应该使用 HTTP) + */ + @Test + public void testHttpSchemeWithOnlyKeystorePath() throws Exception { + // 创建只包含 KEYSTORE_PATH 的 extMap + Map extMap = new HashMap<>(); + extMap.put(KEYSTORE_PATH, "/path/to/keystore.jks"); + when(mockProtocolConfig.getExtMap()).thenReturn(extMap); + + // 创建新的测试实例 + TestConsumerInvoker httpInvoker = new TestConsumerInvoker(mockClient, mockConfig, mockProtocolConfig); + + // 通过反射获取 scheme 字段来验证 + Field schemeField = AbstractConsumerInvoker.class.getDeclaredField("scheme"); + schemeField.setAccessible(true); + String scheme = (String) schemeField.get(httpInvoker); + + assertEquals("Should use HTTP scheme when only KEYSTORE_PATH is configured", HTTP_SCHEME, scheme); + } + + /** + * 测试只有 KEYSTORE_PASS 但没有 KEYSTORE_PATH 的情况(应该使用 HTTP) + */ + @Test + public void testHttpSchemeWithOnlyKeystorePass() throws Exception { + // 创建只包含 KEYSTORE_PASS 的 extMap + Map extMap = new HashMap<>(); + extMap.put(KEYSTORE_PASS, "password123"); + when(mockProtocolConfig.getExtMap()).thenReturn(extMap); + + // 创建新的测试实例 + TestConsumerInvoker httpInvoker = new TestConsumerInvoker(mockClient, mockConfig, mockProtocolConfig); + + // 通过反射获取 scheme 字段来验证 + Field schemeField = AbstractConsumerInvoker.class.getDeclaredField("scheme"); + schemeField.setAccessible(true); + String scheme = (String) schemeField.get(httpInvoker); + + assertEquals("Should use HTTP scheme when only KEYSTORE_PASS is configured", HTTP_SCHEME, scheme); + } + + /** + * 测试 URI 构建在不同协议下的正确性 + */ + @Test + public void testUriConstructionWithDifferentSchemes() throws Exception { + // 设置 mock 对象的基本配置 + when(mockConfig.getBackendConfig().getBasePath()).thenReturn("/api"); + + // 创建 mock request 和 invocation + Request mockRequest = mock(Request.class); + com.tencent.trpc.core.rpc.RpcInvocation mockInvocation = mock(com.tencent.trpc.core.rpc.RpcInvocation.class); + when(mockRequest.getInvocation()).thenReturn(mockInvocation); + when(mockInvocation.getFunc()).thenReturn("/test"); + + // 测试 HTTP 协议的 URI + Map httpExtMap = new HashMap<>(); + when(mockProtocolConfig.getExtMap()).thenReturn(httpExtMap); + TestConsumerInvoker httpInvoker = new TestConsumerInvoker(mockClient, mockConfig, mockProtocolConfig); + URI httpUri = httpInvoker.getUri(mockRequest); + assertEquals("HTTP URI scheme should be http", HTTP_SCHEME, httpUri.getScheme()); + + // 测试 HTTPS 协议的 URI + Map httpsExtMap = new HashMap<>(); + httpsExtMap.put(KEYSTORE_PATH, "/path/to/keystore.jks"); + httpsExtMap.put(KEYSTORE_PASS, "password123"); + when(mockProtocolConfig.getExtMap()).thenReturn(httpsExtMap); + TestConsumerInvoker httpsInvoker = new TestConsumerInvoker(mockClient, mockConfig, mockProtocolConfig); + URI httpsUri = httpsInvoker.getUri(mockRequest); + assertEquals("HTTPS URI scheme should be https", HTTPS_SCHEME, httpsUri.getScheme()); + } + + /** + * 测试用的 ConsumerInvoker 实现类 + */ + private static class TestConsumerInvoker extends AbstractConsumerInvoker { + + public TestConsumerInvoker(AbstractRpcClient client, ConsumerConfig config, + ProtocolConfig protocolConfig) { + super(client, config, protocolConfig); + } + + @Override + public Response send(Request request) throws Exception { + // 简单的测试实现 + return null; + } + } + + /** + * 测试用的服务接口 + */ + private interface TestService { + String testMethod(String input); + } +} \ No newline at end of file diff --git a/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/common/TRpcServerTest.java b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/common/TRpcServerTest.java index a9aa1ed59..64983c84a 100644 --- a/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/common/TRpcServerTest.java +++ b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/common/TRpcServerTest.java @@ -40,6 +40,7 @@ import com.tencent.trpc.proto.standard.common.HelloRequestProtocol.HelloRequest; import com.tencent.trpc.proto.standard.common.HelloRequestProtocol.HelloResponse; import com.tencent.trpc.proto.standard.server.StandardRpcServerFactory; +import com.tencent.trpc.proto.support.DefResponseFutureManager; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; @@ -114,6 +115,7 @@ public void after() { } catch (InterruptedException e) { e.printStackTrace(); } + DefResponseFutureManager.reset(); } @Test @@ -463,6 +465,7 @@ public void testCommonMethod() { public void testDefaultMethod() { HelloRequest.Builder builder = HelloRequest.newBuilder(); builder.setMessage(ByteString.copyFromUtf8("hello")); + DefResponseFutureManager.reset(); HelloServiceApi serviceApi = helloClientConfig.getProxy(); HelloResponse response = serviceApi.doDefaultMethod(new RpcClientContext(), builder.build()); Assert.assertEquals(response.getMessage().toStringUtf8(), "this is default method"); diff --git a/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/TcpConcurrentTest.java b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/TcpConcurrentTest.java index 15c5f0f7e..6888593ee 100644 --- a/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/TcpConcurrentTest.java +++ b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/TcpConcurrentTest.java @@ -22,6 +22,7 @@ import com.tencent.trpc.core.rpc.RpcClientContext; import com.tencent.trpc.core.utils.RpcContextUtils; import com.tencent.trpc.proto.standard.common.HelloRequestProtocol.HelloRequest; +import com.tencent.trpc.proto.support.DefResponseFutureManager; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -68,6 +69,7 @@ public void testUdpConcurrentTest() throws InterruptedException { List results = new ArrayList<>(); for (int i = 0; i < concurrent; i++) { BackendConfig config = new BackendConfig(); + DefResponseFutureManager.reset(); config.setNamingUrl("ip://127.0.0.1:" + TCP_PORT); config.setConnsPerAddr(1); config.setNetwork("tcp"); diff --git a/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/UdpConcurrentTest.java b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/UdpConcurrentTest.java index 80e4dcaaa..71e13298d 100644 --- a/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/UdpConcurrentTest.java +++ b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/UdpConcurrentTest.java @@ -22,6 +22,7 @@ import com.tencent.trpc.core.rpc.RpcClientContext; import com.tencent.trpc.core.utils.RpcContextUtils; import com.tencent.trpc.proto.standard.common.HelloRequestProtocol; +import com.tencent.trpc.proto.support.DefResponseFutureManager; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -67,6 +68,7 @@ public void testUdpConcurrentTest() throws InterruptedException { List results = new ArrayList(); for (int i = 0; i < concurrent; i++) { BackendConfig config = new BackendConfig(); + DefResponseFutureManager.reset(); config.setNamingUrl("ip://127.0.0.1:" + UDP_PORT); config.setConnsPerAddr(1); config.setNetwork("udp"); @@ -114,7 +116,7 @@ public void run() { } private void startServer() { - // 1)服务接口配置 + // 1) Service interface configuration ProviderConfig providerConfig = new ProviderConfig<>(); providerConfig.setRef(new ConcurrentTestServiceImpl()); serviceConfig = getServiceConfig(); diff --git a/trpc-proto/trpc-rpc-support/src/main/java/com/tencent/trpc/proto/support/DefResponseFutureManager.java b/trpc-proto/trpc-rpc-support/src/main/java/com/tencent/trpc/proto/support/DefResponseFutureManager.java index 8d2884e1e..df1af9260 100644 --- a/trpc-proto/trpc-rpc-support/src/main/java/com/tencent/trpc/proto/support/DefResponseFutureManager.java +++ b/trpc-proto/trpc-rpc-support/src/main/java/com/tencent/trpc/proto/support/DefResponseFutureManager.java @@ -23,6 +23,8 @@ import com.tencent.trpc.core.rpc.TimeoutManager; import com.tencent.trpc.core.rpc.def.DefTimeoutManager; import com.tencent.trpc.core.transport.ClientTransport; +import com.tencent.trpc.core.common.ConfigManager; +import com.tencent.trpc.core.common.ShutdownListener; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -36,14 +38,24 @@ public class DefResponseFutureManager { private static final Logger LOG = LoggerFactory.getLogger(DefResponseFutureManager.class); + /** * Watcher for timeouts */ - private static final TimeoutManager TIMEOUT_MANAGER = new DefTimeoutManager(10); + private static TimeoutManager TIMEOUT_MANAGER = new DefTimeoutManager(10); /** * Store */ private final ConcurrentMap futureMap = new ConcurrentHashMap<>(); + + /** + * Internal shutdown listener that handles the shutdown of this manager + */ + private final ShutdownListener shutdownListener = new InternalShutdownListener(); + + public DefResponseFutureManager() { + ConfigManager.getInstance().registerShutdownListener(shutdownListener); + } /** * Create a {@link DefResponseFuture} for a tRPC request @@ -152,6 +164,33 @@ public void stop() { TIMEOUT_MANAGER.close(); } + /** + * Called when the container is reset. + */ + public static void reset() { + TIMEOUT_MANAGER = new DefTimeoutManager(10); + } + + /** + * Get the internal shutdown listener for testing purposes + * + * @return the internal shutdown listener + */ + public ShutdownListener getShutdownListener() { + return shutdownListener; + } + + /** + * Internal shutdown listener implementation + */ + private class InternalShutdownListener implements ShutdownListener { + @Override + public void onShutdown() { + LOG.info("DefResponseFutureManager received shutdown notification"); + DefResponseFutureManager.this.stop(); + } + } + /** * Add a watcher for timeout check of the {@link DefResponseFuture} */ @@ -181,4 +220,4 @@ private void watchTimeout(final DefResponseFuture future) { }, timeoutMills); future.setTimeoutFuture(watch); } -} +} \ No newline at end of file diff --git a/trpc-proto/trpc-rpc-support/src/test/java/com/tencent/trpc/proto/support/DefResponseFutureTest.java b/trpc-proto/trpc-rpc-support/src/test/java/com/tencent/trpc/proto/support/DefResponseFutureTest.java index b3fe2b180..14ff7affb 100644 --- a/trpc-proto/trpc-rpc-support/src/test/java/com/tencent/trpc/proto/support/DefResponseFutureTest.java +++ b/trpc-proto/trpc-rpc-support/src/test/java/com/tencent/trpc/proto/support/DefResponseFutureTest.java @@ -16,6 +16,7 @@ import com.tencent.trpc.core.common.config.ConsumerConfig; import com.tencent.trpc.core.common.config.ProtocolConfig; +import com.tencent.trpc.core.common.ShutdownListener; import com.tencent.trpc.core.exception.ErrorCode; import com.tencent.trpc.core.exception.TRpcException; import com.tencent.trpc.core.rpc.ConsumerInvoker; @@ -100,6 +101,13 @@ public void test() throws Exception { } manager.stop(); + + // Test internal shutdown listener + ShutdownListener shutdownListener = manager.getShutdownListener(); + assertTrue(shutdownListener != null); + + // Test shutdown listener functionality + shutdownListener.onShutdown(); } private class TestClientCodec extends ClientCodec {