Skip to content

Commit 4df5c3f

Browse files
committed
update commands and queries to listen from default app doman
1 parent e9e0fb8 commit 4df5c3f

File tree

8 files changed

+54
-52
lines changed

8 files changed

+54
-52
lines changed

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
package org.reactivecommons.async.rabbit.config;
22

33
import lombok.RequiredArgsConstructor;
4-
import org.reactivecommons.async.commons.DiscardNotifier;
5-
import org.reactivecommons.async.rabbit.HandlerResolver;
6-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
7-
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
84
import org.reactivecommons.async.commons.converters.MessageConverter;
95
import org.reactivecommons.async.commons.ext.CustomReporter;
6+
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
107
import org.reactivecommons.async.rabbit.listeners.ApplicationCommandListener;
118
import org.springframework.beans.factory.annotation.Value;
129
import org.springframework.context.annotation.Bean;
1310
import org.springframework.context.annotation.Configuration;
1411
import org.springframework.context.annotation.Import;
1512

13+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
14+
1615
@Configuration
1716
@RequiredArgsConstructor
1817
@Import(RabbitMqConfig.class)
@@ -24,13 +23,12 @@ public class CommandListenersConfig {
2423
private final AsyncProps asyncProps;
2524

2625
@Bean
27-
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
28-
HandlerResolver resolver, MessageConverter converter,
29-
DiscardNotifier discardNotifier,
26+
public ApplicationCommandListener applicationCommandListener(ConnectionManager manager,
27+
MessageConverter converter,
3028
CustomReporter errorReporter) {
31-
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
29+
ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN), appName, manager.getHandlerResolver(DEFAULT_DOMAIN),
3230
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
33-
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier, errorReporter);
31+
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);
3432

3533
commandListener.startListener();
3634

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, Re
2929
return new RabbitEDADirectAsyncGateway(config, router, manager, props.getDirectMessagesExchangeName(), converter, meterRegistry);
3030
}
3131

32-
@Bean
32+
@Bean // TODO: Listen replies from n domains if enabled in config
3333
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ConnectionManager manager) {
3434
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener(DEFAULT_DOMAIN), props.getReplyQueue());
3535
replyListener.startListening(config.getRoutingKey());

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfig.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
package org.reactivecommons.async.rabbit.config;
22

33
import lombok.RequiredArgsConstructor;
4-
import org.reactivecommons.async.commons.DiscardNotifier;
5-
import org.reactivecommons.async.rabbit.HandlerResolver;
6-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
7-
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
84
import org.reactivecommons.async.commons.converters.MessageConverter;
95
import org.reactivecommons.async.commons.ext.CustomReporter;
6+
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
107
import org.reactivecommons.async.rabbit.listeners.ApplicationNotificationListener;
118
import org.springframework.beans.factory.annotation.Value;
129
import org.springframework.context.annotation.Bean;
1310
import org.springframework.context.annotation.Configuration;
1411
import org.springframework.context.annotation.Import;
1512

13+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
14+
1615
@Configuration
1716
@RequiredArgsConstructor
1817
@Import(RabbitMqConfig.class)
@@ -24,15 +23,16 @@ public class NotificacionListenersConfig {
2423
private final AsyncProps asyncProps;
2524

2625
@Bean
27-
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
28-
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
26+
public ApplicationNotificationListener eventNotificationListener(ConnectionManager manager,
27+
MessageConverter messageConverter,
28+
CustomReporter errorReporter) {
2929
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
30-
receiver,
30+
manager.getListener(DEFAULT_DOMAIN),
3131
asyncProps.getDomain().getEvents().getExchange(),
3232
asyncProps.getNotificationProps().getQueueName(appName),
33-
resolver,
33+
manager.getHandlerResolver(DEFAULT_DOMAIN),
3434
messageConverter,
35-
discardNotifier,
35+
manager.getDiscardNotifier(DEFAULT_DOMAIN),
3636
errorReporter);
3737
listener.startListener();
3838
return listener;

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/QueryListenerConfig.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
package org.reactivecommons.async.rabbit.config;
22

33
import lombok.RequiredArgsConstructor;
4-
import org.reactivecommons.async.commons.DiscardNotifier;
5-
import org.reactivecommons.async.rabbit.HandlerResolver;
6-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
7-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
8-
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
94
import org.reactivecommons.async.commons.converters.MessageConverter;
105
import org.reactivecommons.async.commons.ext.CustomReporter;
6+
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
117
import org.reactivecommons.async.rabbit.listeners.ApplicationQueryListener;
128
import org.springframework.beans.factory.annotation.Value;
139
import org.springframework.context.annotation.Bean;
1410
import org.springframework.context.annotation.Configuration;
1511
import org.springframework.context.annotation.Import;
1612

13+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
14+
1715
@Configuration
1816
@RequiredArgsConstructor
1917
@Import(RabbitMqConfig.class)
@@ -25,15 +23,14 @@ public class QueryListenerConfig {
2523
private final AsyncProps asyncProps;
2624

2725
@Bean
28-
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver,
29-
ReactiveMessageSender sender, ReactiveMessageListener rlistener,
30-
DiscardNotifier discardNotifier,
26+
public ApplicationQueryListener queryListener(MessageConverter converter,
27+
ConnectionManager manager,
3128
CustomReporter errorReporter) {
32-
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
33-
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
29+
final ApplicationQueryListener listener = new ApplicationQueryListener(manager.getListener(DEFAULT_DOMAIN),
30+
appName + ".query", manager.getHandlerResolver(DEFAULT_DOMAIN), manager.getSender(DEFAULT_DOMAIN), asyncProps.getDirect().getExchange(), converter,
3431
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
35-
asyncProps.getRetryDelay(),asyncProps.getGlobal().getMaxLengthBytes(),
36-
asyncProps.getDirect().isDiscardTimeoutQueries(), discardNotifier, errorReporter);
32+
asyncProps.getRetryDelay(), asyncProps.getGlobal().getMaxLengthBytes(),
33+
asyncProps.getDirect().isDiscardTimeoutQueries(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter);
3734

3835
listener.startListener();
3936

async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import org.junit.jupiter.api.Test;
77
import org.junit.jupiter.api.extension.ExtendWith;
88
import org.mockito.junit.jupiter.MockitoExtension;
9-
import org.reactivecommons.async.commons.DiscardNotifier;
109
import org.reactivecommons.async.commons.converters.MessageConverter;
1110
import org.reactivecommons.async.commons.ext.CustomReporter;
1211
import org.reactivecommons.async.rabbit.HandlerResolver;
@@ -26,6 +25,7 @@
2625
import static org.mockito.ArgumentMatchers.any;
2726
import static org.mockito.Mockito.mock;
2827
import static org.mockito.Mockito.when;
28+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
2929

3030
@ExtendWith(MockitoExtension.class)
3131
class CommandListenersConfigTest {
@@ -37,9 +37,9 @@ class CommandListenersConfigTest {
3737
private final TopologyCreator creator = mock(TopologyCreator.class);
3838
private final HandlerResolver handlerResolver = mock(HandlerResolver.class);
3939
private final MessageConverter messageConverter = mock(MessageConverter.class);
40-
private final DiscardNotifier discardNotifier = mock(DiscardNotifier.class);
4140
private final CustomReporter customReporter = mock(CustomReporter.class);
4241
private final Receiver receiver = mock(Receiver.class);
42+
private final ConnectionManager manager = new ConnectionManager();
4343

4444
@BeforeEach
4545
public void init() throws NoSuchFieldException, IllegalAccessException {
@@ -53,17 +53,12 @@ public void init() throws NoSuchFieldException, IllegalAccessException {
5353
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
5454
when(listener.getReceiver()).thenReturn(receiver);
5555
when(listener.getMaxConcurrency()).thenReturn(20);
56+
manager.addDomain(DEFAULT_DOMAIN, listener, null, handlerResolver);
5657
}
5758

5859
@Test
5960
void applicationCommandListener() {
60-
final ApplicationCommandListener commandListener = config.applicationCommandListener(
61-
listener,
62-
handlerResolver,
63-
messageConverter,
64-
discardNotifier,
65-
customReporter
66-
);
61+
final ApplicationCommandListener commandListener = config.applicationCommandListener(manager, messageConverter, customReporter);
6762
Assertions.assertThat(commandListener).isNotNull();
6863
}
6964
}

async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,27 @@
55
import org.junit.jupiter.api.BeforeEach;
66
import org.junit.jupiter.api.Test;
77
import org.reactivecommons.async.commons.DiscardNotifier;
8+
import org.reactivecommons.async.commons.converters.MessageConverter;
9+
import org.reactivecommons.async.commons.ext.CustomReporter;
810
import org.reactivecommons.async.rabbit.HandlerResolver;
911
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1012
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1113
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
12-
import org.reactivecommons.async.commons.converters.MessageConverter;
13-
import org.reactivecommons.async.commons.ext.CustomReporter;
1414
import org.reactivecommons.async.rabbit.listeners.ApplicationNotificationListener;
1515
import reactor.core.publisher.Flux;
1616
import reactor.core.publisher.Mono;
17-
import reactor.rabbitmq.*;
17+
import reactor.rabbitmq.BindingSpecification;
18+
import reactor.rabbitmq.ConsumeOptions;
19+
import reactor.rabbitmq.ExchangeSpecification;
20+
import reactor.rabbitmq.QueueSpecification;
21+
import reactor.rabbitmq.Receiver;
1822

1923
import java.util.Collections;
2024

2125
import static org.mockito.ArgumentMatchers.any;
2226
import static org.mockito.Mockito.mock;
2327
import static org.mockito.Mockito.when;
28+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
2429

2530
class NotificacionListenersConfigTest {
2631

@@ -33,6 +38,7 @@ class NotificacionListenersConfigTest {
3338
private final DiscardNotifier discardNotifier = mock(DiscardNotifier.class);
3439
private final CustomReporter customReporter = mock(CustomReporter.class);
3540
private final Receiver receiver = mock(Receiver.class);
41+
private final ConnectionManager manager = new ConnectionManager();
3642

3743
@BeforeEach
3844
public void init() {
@@ -47,12 +53,13 @@ public void init() {
4753
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
4854
when(listener.getReceiver()).thenReturn(receiver);
4955
when(listener.getMaxConcurrency()).thenReturn(20);
56+
manager.addDomain(DEFAULT_DOMAIN, listener, null, handlerResolver);
5057
}
5158

5259
@Test
5360
void eventNotificationListener() {
54-
final ApplicationNotificationListener applicationEventListener = config.
55-
eventNotificationListener(handlerResolver, messageConverter, listener, discardNotifier, customReporter);
61+
final ApplicationNotificationListener applicationEventListener =
62+
config.eventNotificationListener(manager, messageConverter, customReporter);
5663
Assertions.assertThat(applicationEventListener).isNotNull();
5764
}
5865
}

async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,28 @@
44
import org.assertj.core.api.Assertions;
55
import org.junit.jupiter.api.BeforeEach;
66
import org.junit.jupiter.api.Test;
7-
import org.reactivecommons.async.commons.DiscardNotifier;
7+
import org.reactivecommons.async.commons.converters.MessageConverter;
8+
import org.reactivecommons.async.commons.ext.CustomReporter;
89
import org.reactivecommons.async.rabbit.HandlerResolver;
910
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1011
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
1112
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1213
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
13-
import org.reactivecommons.async.commons.converters.MessageConverter;
14-
import org.reactivecommons.async.commons.ext.CustomReporter;
1514
import org.reactivecommons.async.rabbit.listeners.ApplicationQueryListener;
1615
import reactor.core.publisher.Flux;
1716
import reactor.core.publisher.Mono;
18-
import reactor.rabbitmq.*;
17+
import reactor.rabbitmq.BindingSpecification;
18+
import reactor.rabbitmq.ConsumeOptions;
19+
import reactor.rabbitmq.ExchangeSpecification;
20+
import reactor.rabbitmq.QueueSpecification;
21+
import reactor.rabbitmq.Receiver;
1922

2023
import java.util.Collections;
2124

2225
import static org.mockito.ArgumentMatchers.any;
2326
import static org.mockito.Mockito.mock;
2427
import static org.mockito.Mockito.when;
28+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
2529

2630
class QueryListenerConfigTest {
2731

@@ -31,10 +35,10 @@ class QueryListenerConfigTest {
3135
private final TopologyCreator creator = mock(TopologyCreator.class);
3236
private final HandlerResolver handlerResolver = mock(HandlerResolver.class);
3337
private final MessageConverter messageConverter = mock(MessageConverter.class);
34-
private final DiscardNotifier discardNotifier = mock(DiscardNotifier.class);
3538
private final CustomReporter customReporter = mock(CustomReporter.class);
3639
private final Receiver receiver = mock(Receiver.class);
3740
private final ReactiveMessageSender sender = mock(ReactiveMessageSender.class);
41+
private final ConnectionManager manager = new ConnectionManager();
3842

3943
@BeforeEach
4044
public void init() {
@@ -49,11 +53,12 @@ public void init() {
4953
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
5054
when(listener.getReceiver()).thenReturn(receiver);
5155
when(listener.getMaxConcurrency()).thenReturn(20);
56+
manager.addDomain(DEFAULT_DOMAIN, listener, sender, handlerResolver);
5257
}
5358

5459
@Test
5560
void queryListener() {
56-
final ApplicationQueryListener queryListener = config.queryListener(messageConverter, handlerResolver, sender, listener, discardNotifier, customReporter);
61+
final ApplicationQueryListener queryListener = config.queryListener(messageConverter, manager, customReporter);
5762
Assertions.assertThat(queryListener).isNotNull();
5863
}
5964
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ version=2.0.0
22
springBootVersion=3.0.2
33
reactorRabbitVersion=1.5.5
44
gradleVersionsVersion=0.36.0
5-
toPublish=async-commons,async-commons-api,async-commons-rabbit-standalone,async-commons-rabbit-starter,domain-events-api,async-rabbit
5+
toPublish=async-commons,async-commons-api,async-commons-rabbit-standalone,async-commons-rabbit-starter,async-commons-rabbit-starter-eda,domain-events-api,async-rabbit

0 commit comments

Comments
 (0)