Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dubbo-distribution/dubbo-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@
<artifactId>dubbo-rpc-triple</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-rpc-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-api</artifactId>
Expand Down
38 changes: 37 additions & 1 deletion dubbo-rpc/dubbo-rpc-rocketmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,41 @@
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
</dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-serialization-hessian2</artifactId>
<version>${revision}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-serialization-jdk</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>2.0.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>2.0.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-reflect</artifactId>
<version>2.0.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.rocketmq;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

@RunWith(PowerMockRunner.class)
@PrepareForTest({RocketMQProtocolServer.class})
public class RocketMQProtocolServerTest {

private static RocketMQProtocolServer rocketMQProtocolServer;
private static String ROCKETMQ_URL_TEMPLATE = "rocketmq://127.0.0.1:9876";

private static URL serverUrl;

@Before
public void setUp() {
// init
rocketMQProtocolServer = PowerMockito.spy(new RocketMQProtocolServer());

// set serverUrl
Map<String, String> paramMap = new HashMap<>();
paramMap.put("brokerName", "broker-a");
paramMap.put("topic", RocketMQProtocolConstant.DUBBO_DEFAULT_PROTOCOL_TOPIC);
paramMap.put("groupModel", "select");
paramMap.put("group", "DEFAULT_GROUP");
paramMap.put("version", "4.9.2");
paramMap.put("corethreads", "1");
paramMap.put("threads", "1");
serverUrl = URL.valueOf(ROCKETMQ_URL_TEMPLATE + "?" + TestRocketMQUtils.mapToString(paramMap));
}

@After
public void tearDown() {
// release
rocketMQProtocolServer = null;
}

@Test
public void testCreateConsumer() throws Exception {
Whitebox.setInternalState(rocketMQProtocolServer, "messageListenerConcurrently", Mockito.mock(MessageListenerConcurrently.class));
rocketMQProtocolServer.reset(serverUrl);
DefaultMQPushConsumer defaultMQPushConsumer = PowerMockito.mock(DefaultMQPushConsumer.class);
PowerMockito.whenNew(DefaultMQPushConsumer.class).withAnyArguments().thenReturn(defaultMQPushConsumer);
PowerMockito.doNothing().when(defaultMQPushConsumer).start();

rocketMQProtocolServer.createConsumer();
Assertions.assertNotNull(rocketMQProtocolServer.getDefaultMQPushConsumer());
}

@Test
public void testReset() throws Exception {
DefaultMQPushConsumer defaultMQPushConsumer = PowerMockito.mock(DefaultMQPushConsumer.class);
PowerMockito.whenNew(DefaultMQPushConsumer.class).withArguments(String.class, String.class).thenReturn(defaultMQPushConsumer);
PowerMockito.doNothing().when(rocketMQProtocolServer).createConsumer();
Whitebox.setInternalState(rocketMQProtocolServer, "model", CommonConstants.PROVIDER);

rocketMQProtocolServer.reset(serverUrl);
Assertions.assertNotNull(rocketMQProtocolServer.getDefaultMQProducer());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.rocketmq;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invoker;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import java.util.HashMap;
import java.util.Map;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest({RocketMQProtocol.class})
public class RocketMQProtocolTest {

private static RocketMQProtocolServer rocketMQProtocolServer;
private static RocketMQProtocol rocketMQProtocol;
private static String ROCKETMQ_URL_TEMPLATE = "rocketmq://127.0.0.1:9876";
private static URL registryUrl;


@Before
public void setUp() {
// init
rocketMQProtocol = PowerMockito.spy(new RocketMQProtocol());
rocketMQProtocolServer = PowerMockito.spy(new RocketMQProtocolServer());
}

@After
public void tearDown() {
// release
rocketMQProtocol = null;
rocketMQProtocolServer = null;
}

@Test
public void testExport() throws Exception {
// set serverUrl
Map<String, String> paramMap = new HashMap<>();
paramMap.put("brokerName", "broker-a");
paramMap.put("topic", RocketMQProtocolConstant.DUBBO_DEFAULT_PROTOCOL_TOPIC);
paramMap.put("groupModel", "select");
paramMap.put("group", "DEFAULT_GROUP");
paramMap.put("version", "4.9.2");
paramMap.put("corethreads", "1");
paramMap.put("threads", "1");
registryUrl = URL.valueOf(ROCKETMQ_URL_TEMPLATE + "?" + TestRocketMQUtils.mapToString(paramMap));

Invoker<RocketMQInvoker> invoker = PowerMockito.mock(Invoker.class);
PowerMockito.when(invoker.getUrl()).thenReturn(registryUrl);
PowerMockito.when(invoker.getInterface()).thenReturn(RocketMQInvoker.class);

PowerMockito.doReturn(rocketMQProtocolServer).when(rocketMQProtocol, "openServer", registryUrl, "provider");
PowerMockito.doReturn(Mockito.mock(DefaultMQPushConsumer.class)).when(rocketMQProtocolServer, "getDefaultMQPushConsumer");

PowerMockito.when(rocketMQProtocol.export(invoker)).thenCallRealMethod();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.dubbo.rpc.rocketmq;

import java.util.Map;

public class TestRocketMQUtils {
public static String mapToString(Map<String, String> paramMap) {
StringBuilder parameters = new StringBuilder();
for (Map.Entry<String, String> entry : paramMap.entrySet()) {
parameters.append(entry.getKey());
parameters.append("=");
parameters.append(entry.getValue());
parameters.append("&;");
}
return parameters.toString();
}

}