diff --git a/dubbo-distribution/dubbo-bom/pom.xml b/dubbo-distribution/dubbo-bom/pom.xml
index b2d1a2492072..b822946faf24 100644
--- a/dubbo-distribution/dubbo-bom/pom.xml
+++ b/dubbo-distribution/dubbo-bom/pom.xml
@@ -128,6 +128,11 @@
dubbo-rpc-triple
${project.version}
+
+ org.apache.dubbo
+ dubbo-rpc-rocketmq
+ ${project.version}
+
org.apache.dubbo
dubbo-registry-api
diff --git a/dubbo-rpc/dubbo-rpc-rocketmq/pom.xml b/dubbo-rpc/dubbo-rpc-rocketmq/pom.xml
index cf018ed9e920..89b9ca6246a0 100644
--- a/dubbo-rpc/dubbo-rpc-rocketmq/pom.xml
+++ b/dubbo-rpc/dubbo-rpc-rocketmq/pom.xml
@@ -32,5 +32,41 @@
rocketmq-client
4.9.2
-
+
+ org.apache.dubbo
+ dubbo-serialization-hessian2
+ ${revision}
+ true
+
+
+ org.apache.dubbo
+ dubbo-serialization-jdk
+ ${project.parent.version}
+ test
+
+
+ org.powermock
+ powermock-module-junit4
+ 2.0.7
+ test
+
+
+ org.powermock
+ powermock-api-mockito2
+ 2.0.7
+ test
+
+
+ org.powermock
+ powermock-reflect
+ 2.0.7
+ test
+
+
+ junit
+ junit
+ 4.13.1
+ test
+
+
diff --git a/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolServerTest.java b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolServerTest.java
new file mode 100644
index 000000000000..e3ffe34c83d2
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolServerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.rocketmq;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({RocketMQProtocolServer.class})
+public class RocketMQProtocolServerTest {
+
+ private static RocketMQProtocolServer rocketMQProtocolServer;
+ private static String ROCKETMQ_URL_TEMPLATE = "rocketmq://127.0.0.1:9876";
+
+ private static URL serverUrl;
+
+ @Before
+ public void setUp() {
+ // init
+ rocketMQProtocolServer = PowerMockito.spy(new RocketMQProtocolServer());
+
+ // set serverUrl
+ Map paramMap = new HashMap<>();
+ paramMap.put("brokerName", "broker-a");
+ paramMap.put("topic", RocketMQProtocolConstant.DUBBO_DEFAULT_PROTOCOL_TOPIC);
+ paramMap.put("groupModel", "select");
+ paramMap.put("group", "DEFAULT_GROUP");
+ paramMap.put("version", "4.9.2");
+ paramMap.put("corethreads", "1");
+ paramMap.put("threads", "1");
+ serverUrl = URL.valueOf(ROCKETMQ_URL_TEMPLATE + "?" + TestRocketMQUtils.mapToString(paramMap));
+ }
+
+ @After
+ public void tearDown() {
+ // release
+ rocketMQProtocolServer = null;
+ }
+
+ @Test
+ public void testCreateConsumer() throws Exception {
+ Whitebox.setInternalState(rocketMQProtocolServer, "messageListenerConcurrently", Mockito.mock(MessageListenerConcurrently.class));
+ rocketMQProtocolServer.reset(serverUrl);
+ DefaultMQPushConsumer defaultMQPushConsumer = PowerMockito.mock(DefaultMQPushConsumer.class);
+ PowerMockito.whenNew(DefaultMQPushConsumer.class).withAnyArguments().thenReturn(defaultMQPushConsumer);
+ PowerMockito.doNothing().when(defaultMQPushConsumer).start();
+
+ rocketMQProtocolServer.createConsumer();
+ Assertions.assertNotNull(rocketMQProtocolServer.getDefaultMQPushConsumer());
+ }
+
+ @Test
+ public void testReset() throws Exception {
+ DefaultMQPushConsumer defaultMQPushConsumer = PowerMockito.mock(DefaultMQPushConsumer.class);
+ PowerMockito.whenNew(DefaultMQPushConsumer.class).withArguments(String.class, String.class).thenReturn(defaultMQPushConsumer);
+ PowerMockito.doNothing().when(rocketMQProtocolServer).createConsumer();
+ Whitebox.setInternalState(rocketMQProtocolServer, "model", CommonConstants.PROVIDER);
+
+ rocketMQProtocolServer.reset(serverUrl);
+ Assertions.assertNotNull(rocketMQProtocolServer.getDefaultMQProducer());
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolTest.java b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolTest.java
new file mode 100644
index 000000000000..70ec5b6d99fa
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.rocketmq;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({RocketMQProtocol.class})
+public class RocketMQProtocolTest {
+
+ private static RocketMQProtocolServer rocketMQProtocolServer;
+ private static RocketMQProtocol rocketMQProtocol;
+ private static String ROCKETMQ_URL_TEMPLATE = "rocketmq://127.0.0.1:9876";
+ private static URL registryUrl;
+
+
+ @Before
+ public void setUp() {
+ // init
+ rocketMQProtocol = PowerMockito.spy(new RocketMQProtocol());
+ rocketMQProtocolServer = PowerMockito.spy(new RocketMQProtocolServer());
+ }
+
+ @After
+ public void tearDown() {
+ // release
+ rocketMQProtocol = null;
+ rocketMQProtocolServer = null;
+ }
+
+ @Test
+ public void testExport() throws Exception {
+ // set serverUrl
+ Map paramMap = new HashMap<>();
+ paramMap.put("brokerName", "broker-a");
+ paramMap.put("topic", RocketMQProtocolConstant.DUBBO_DEFAULT_PROTOCOL_TOPIC);
+ paramMap.put("groupModel", "select");
+ paramMap.put("group", "DEFAULT_GROUP");
+ paramMap.put("version", "4.9.2");
+ paramMap.put("corethreads", "1");
+ paramMap.put("threads", "1");
+ registryUrl = URL.valueOf(ROCKETMQ_URL_TEMPLATE + "?" + TestRocketMQUtils.mapToString(paramMap));
+
+ Invoker invoker = PowerMockito.mock(Invoker.class);
+ PowerMockito.when(invoker.getUrl()).thenReturn(registryUrl);
+ PowerMockito.when(invoker.getInterface()).thenReturn(RocketMQInvoker.class);
+
+ PowerMockito.doReturn(rocketMQProtocolServer).when(rocketMQProtocol, "openServer", registryUrl, "provider");
+ PowerMockito.doReturn(Mockito.mock(DefaultMQPushConsumer.class)).when(rocketMQProtocolServer, "getDefaultMQPushConsumer");
+
+ PowerMockito.when(rocketMQProtocol.export(invoker)).thenCallRealMethod();
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/TestRocketMQUtils.java b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/TestRocketMQUtils.java
new file mode 100644
index 000000000000..64e188e496b9
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-rocketmq/src/test/java/org/apache/dubbo/rpc/rocketmq/TestRocketMQUtils.java
@@ -0,0 +1,17 @@
+package org.apache.dubbo.rpc.rocketmq;
+
+import java.util.Map;
+
+public class TestRocketMQUtils {
+ public static String mapToString(Map paramMap) {
+ StringBuilder parameters = new StringBuilder();
+ for (Map.Entry entry : paramMap.entrySet()) {
+ parameters.append(entry.getKey());
+ parameters.append("=");
+ parameters.append(entry.getValue());
+ parameters.append("&;");
+ }
+ return parameters.toString();
+ }
+
+}