diff --git a/dubbo-distribution/dubbo-bom/pom.xml b/dubbo-distribution/dubbo-bom/pom.xml index b2d1a2492072..b822946faf24 100644 --- a/dubbo-distribution/dubbo-bom/pom.xml +++ b/dubbo-distribution/dubbo-bom/pom.xml @@ -128,6 +128,11 @@ dubbo-rpc-triple ${project.version} + + org.apache.dubbo + dubbo-rpc-rocketmq + ${project.version} + org.apache.dubbo dubbo-registry-api diff --git a/dubbo-rpc/dubbo-rpc-rocketmq/pom.xml b/dubbo-rpc/dubbo-rpc-rocketmq/pom.xml index cf018ed9e920..89b9ca6246a0 100644 --- a/dubbo-rpc/dubbo-rpc-rocketmq/pom.xml +++ b/dubbo-rpc/dubbo-rpc-rocketmq/pom.xml @@ -32,5 +32,41 @@ rocketmq-client 4.9.2 - + + org.apache.dubbo + dubbo-serialization-hessian2 + ${revision} + true + + + org.apache.dubbo + dubbo-serialization-jdk + ${project.parent.version} + test + + + org.powermock + powermock-module-junit4 + 2.0.7 + test + + + org.powermock + powermock-api-mockito2 + 2.0.7 + test + + + org.powermock + powermock-reflect + 2.0.7 + test + + + junit + junit + 4.13.1 + test + + diff --git a/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolServerTest.java b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolServerTest.java new file mode 100644 index 000000000000..e3ffe34c83d2 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolServerTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.rpc.rocketmq; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({RocketMQProtocolServer.class}) +public class RocketMQProtocolServerTest { + + private static RocketMQProtocolServer rocketMQProtocolServer; + private static String ROCKETMQ_URL_TEMPLATE = "rocketmq://127.0.0.1:9876"; + + private static URL serverUrl; + + @Before + public void setUp() { + // init + rocketMQProtocolServer = PowerMockito.spy(new RocketMQProtocolServer()); + + // set serverUrl + Map paramMap = new HashMap<>(); + paramMap.put("brokerName", "broker-a"); + paramMap.put("topic", RocketMQProtocolConstant.DUBBO_DEFAULT_PROTOCOL_TOPIC); + paramMap.put("groupModel", "select"); + paramMap.put("group", "DEFAULT_GROUP"); + paramMap.put("version", "4.9.2"); + paramMap.put("corethreads", "1"); + paramMap.put("threads", "1"); + serverUrl = URL.valueOf(ROCKETMQ_URL_TEMPLATE + "?" + TestRocketMQUtils.mapToString(paramMap)); + } + + @After + public void tearDown() { + // release + rocketMQProtocolServer = null; + } + + @Test + public void testCreateConsumer() throws Exception { + Whitebox.setInternalState(rocketMQProtocolServer, "messageListenerConcurrently", Mockito.mock(MessageListenerConcurrently.class)); + rocketMQProtocolServer.reset(serverUrl); + DefaultMQPushConsumer defaultMQPushConsumer = PowerMockito.mock(DefaultMQPushConsumer.class); + PowerMockito.whenNew(DefaultMQPushConsumer.class).withAnyArguments().thenReturn(defaultMQPushConsumer); + PowerMockito.doNothing().when(defaultMQPushConsumer).start(); + + rocketMQProtocolServer.createConsumer(); + Assertions.assertNotNull(rocketMQProtocolServer.getDefaultMQPushConsumer()); + } + + @Test + public void testReset() throws Exception { + DefaultMQPushConsumer defaultMQPushConsumer = PowerMockito.mock(DefaultMQPushConsumer.class); + PowerMockito.whenNew(DefaultMQPushConsumer.class).withArguments(String.class, String.class).thenReturn(defaultMQPushConsumer); + PowerMockito.doNothing().when(rocketMQProtocolServer).createConsumer(); + Whitebox.setInternalState(rocketMQProtocolServer, "model", CommonConstants.PROVIDER); + + rocketMQProtocolServer.reset(serverUrl); + Assertions.assertNotNull(rocketMQProtocolServer.getDefaultMQProducer()); + } + +} diff --git a/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolTest.java b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolTest.java new file mode 100644 index 000000000000..70ec5b6d99fa --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.rpc.rocketmq; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.Invoker; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({RocketMQProtocol.class}) +public class RocketMQProtocolTest { + + private static RocketMQProtocolServer rocketMQProtocolServer; + private static RocketMQProtocol rocketMQProtocol; + private static String ROCKETMQ_URL_TEMPLATE = "rocketmq://127.0.0.1:9876"; + private static URL registryUrl; + + + @Before + public void setUp() { + // init + rocketMQProtocol = PowerMockito.spy(new RocketMQProtocol()); + rocketMQProtocolServer = PowerMockito.spy(new RocketMQProtocolServer()); + } + + @After + public void tearDown() { + // release + rocketMQProtocol = null; + rocketMQProtocolServer = null; + } + + @Test + public void testExport() throws Exception { + // set serverUrl + Map paramMap = new HashMap<>(); + paramMap.put("brokerName", "broker-a"); + paramMap.put("topic", RocketMQProtocolConstant.DUBBO_DEFAULT_PROTOCOL_TOPIC); + paramMap.put("groupModel", "select"); + paramMap.put("group", "DEFAULT_GROUP"); + paramMap.put("version", "4.9.2"); + paramMap.put("corethreads", "1"); + paramMap.put("threads", "1"); + registryUrl = URL.valueOf(ROCKETMQ_URL_TEMPLATE + "?" + TestRocketMQUtils.mapToString(paramMap)); + + Invoker invoker = PowerMockito.mock(Invoker.class); + PowerMockito.when(invoker.getUrl()).thenReturn(registryUrl); + PowerMockito.when(invoker.getInterface()).thenReturn(RocketMQInvoker.class); + + PowerMockito.doReturn(rocketMQProtocolServer).when(rocketMQProtocol, "openServer", registryUrl, "provider"); + PowerMockito.doReturn(Mockito.mock(DefaultMQPushConsumer.class)).when(rocketMQProtocolServer, "getDefaultMQPushConsumer"); + + PowerMockito.when(rocketMQProtocol.export(invoker)).thenCallRealMethod(); + } + +} diff --git a/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/TestRocketMQUtils.java b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/TestRocketMQUtils.java new file mode 100644 index 000000000000..64e188e496b9 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/TestRocketMQUtils.java @@ -0,0 +1,17 @@ +package org.apache.dubbo.rpc.rocketmq; + +import java.util.Map; + +public class TestRocketMQUtils { + public static String mapToString(Map paramMap) { + StringBuilder parameters = new StringBuilder(); + for (Map.Entry entry : paramMap.entrySet()) { + parameters.append(entry.getKey()); + parameters.append("="); + parameters.append(entry.getValue()); + parameters.append("&;"); + } + return parameters.toString(); + } + +}