Skip to content

Commit ed325ec

Browse files
committed
fix unit tests, update domainEvenListeners references, create shared constant for app events
1 parent 7a10e32 commit ed325ec

File tree

14 files changed

+62
-47
lines changed

14 files changed

+62
-47
lines changed

async/async-commons-api/async-commons-api.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,4 @@ dependencies {
88
compileOnly 'io.projectreactor:reactor-core'
99
testImplementation 'io.projectreactor:reactor-test'
1010
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
11-
1211
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
@Getter
2424
@NoArgsConstructor(access = AccessLevel.PACKAGE)
2525
public class HandlerRegistry {
26+
public static final String DEFAULT_LISTENER = "app";
2627
private final Map<String, List<RegisteredEventListener<?>>> domainEventListeners = new ConcurrentHashMap<>();
2728
private final List<RegisteredEventListener<?>> dynamicEventHandlers = new CopyOnWriteArrayList<>();
2829
private final List<RegisteredEventListener<?>> eventNotificationListener = new CopyOnWriteArrayList<>();
@@ -31,7 +32,9 @@ public class HandlerRegistry {
3132

3233

3334
public static HandlerRegistry register() {
34-
return new HandlerRegistry();
35+
HandlerRegistry instance = new HandlerRegistry();
36+
instance.domainEventListeners.put(DEFAULT_LISTENER, new CopyOnWriteArrayList<>());
37+
return instance;
3538
}
3639

3740
public <T> HandlerRegistry listenDomainEvent(String domain, String eventName, EventHandler<T> handler, Class<T> eventClass) {
@@ -41,7 +44,7 @@ public <T> HandlerRegistry listenDomainEvent(String domain, String eventName, Ev
4144
}
4245

4346
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler, Class<T> eventClass) {
44-
domainEventListeners.computeIfAbsent("app", ignored -> new CopyOnWriteArrayList<>())
47+
domainEventListeners.computeIfAbsent(DEFAULT_LISTENER, ignored -> new CopyOnWriteArrayList<>())
4548
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
4649
return this;
4750
}

async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import static org.assertj.core.api.Assertions.assertThat;
1818
import static org.mockito.Mockito.mock;
19+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
1920

2021
class HandlerRegistryTest {
2122
private final HandlerRegistry registry = HandlerRegistry.register();
@@ -27,7 +28,7 @@ void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed() {
2728

2829
registry.listenEvent(name, eventHandler);
2930

30-
assertThat(registry.getEventListeners())
31+
assertThat(registry.getDomainEventListeners().get(DEFAULT_LISTENER))
3132
.anySatisfy(registered -> assertThat(registered)
3233
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
3334
.containsExactly(name, SomeDataClass.class, eventHandler)).hasSize(1);
@@ -43,7 +44,7 @@ void shouldRegisterPatternEventHandlerWithTypeInference() {
4344
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
4445
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
4546

46-
assertThat(registry.getEventListeners())
47+
assertThat(registry.getDomainEventListeners().get(DEFAULT_LISTENER))
4748
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
4849
.usingRecursiveComparison()
4950
.isEqualTo(expectedRegisteredEventListener));
@@ -62,7 +63,7 @@ void shouldRegisterPatternEventHandler() {
6263
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
6364
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
6465

65-
assertThat(registry.getEventListeners())
66+
assertThat(registry.getDomainEventListeners().get(DEFAULT_LISTENER))
6667
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
6768
.usingRecursiveComparison()
6869
.isEqualTo(expectedRegisteredEventListener));
@@ -84,7 +85,7 @@ public void listenEvent() {
8485
EventHandler<SomeDataClass> handler = mock(EventHandler.class);
8586
registry.listenEvent(name, handler, SomeDataClass.class);
8687

87-
assertThat(registry.getEventListeners())
88+
assertThat(registry.getDomainEventListeners().get(DEFAULT_LISTENER))
8889
.anySatisfy(registered -> assertThat(registered)
8990
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
9091
.containsExactly(name, SomeDataClass.class, handler)).hasSize(1);

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.springframework.context.annotation.Configuration;
1717
import org.springframework.context.annotation.Import;
1818

19+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
20+
1921
@Configuration
2022
@Import(RabbitMqConfig.class)
2123
@RequiredArgsConstructor
@@ -25,12 +27,12 @@ public class DirectAsyncGatewayConfig {
2527

2628
@Bean
2729
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, MessageConverter converter, MeterRegistry meterRegistry) throws Exception {
28-
return new RabbitDirectAsyncGateway(config, router, manager.getSender("app"), props.getDirectMessagesExchangeName(), converter, meterRegistry);
30+
return new RabbitDirectAsyncGateway(config, router, manager.getSender(DEFAULT_LISTENER), props.getDirectMessagesExchangeName(), converter, meterRegistry);
2931
}
3032

3133
@Bean
3234
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ConnectionManager manager) {
33-
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener("app"), props.getReplyQueue());
35+
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener(DEFAULT_LISTENER), props.getReplyQueue());
3436
replyListener.startListening(config.getRoutingKey());
3537
return replyListener;
3638
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventBusConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.springframework.context.annotation.Configuration;
1313
import org.springframework.context.annotation.Import;
1414

15+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
1516
import static reactor.rabbitmq.ExchangeSpecification.exchange;
1617

1718
@Configuration
@@ -21,12 +22,11 @@ public class EventBusConfig {
2122
@Bean // app connection
2223
public DomainEventBus domainEventBus(ConnectionManager manager, BrokerConfigProps props, BrokerConfig config,
2324
ObjectMapperSupplier objectMapperSupplier) {
24-
String domain = "app";
25-
ReactiveMessageSender sender = manager.getSender(domain);
25+
ReactiveMessageSender sender = manager.getSender(DEFAULT_LISTENER);
2626
final String exchangeName = props.getDomainEventsExchangeName();
2727
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
2828
DomainEventBus domainEventBus = new RabbitDomainEventBus(sender, exchangeName, config);
29-
manager.setDiscardNotifier(domain, createDiscardNotifier(domainEventBus, objectMapperSupplier));
29+
manager.setDiscardNotifier(DEFAULT_LISTENER, createDiscardNotifier(domainEventBus, objectMapperSupplier));
3030
return domainEventBus;
3131
}
3232

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventListenersConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
import java.util.concurrent.atomic.AtomicReference;
1414

15+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
16+
1517
@Configuration
1618
@RequiredArgsConstructor
1719
@Import(RabbitMqConfig.class)
@@ -38,7 +40,7 @@ public ApplicationEventListener eventListener(MessageConverter messageConverter,
3840
manager.getDiscardNotifier(domain),
3941
errorReporter,
4042
appName);
41-
if ("app".equals(domain)) {
43+
if (DEFAULT_LISTENER.equals(domain)) {
4244
external.set(listener);
4345
}
4446
listener.startListener();

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/HandlerResolverBuilder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@
1414
import java.util.concurrent.ConcurrentMap;
1515
import java.util.stream.Stream;
1616

17+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
18+
1719
@NoArgsConstructor(access = AccessLevel.PRIVATE)
1820
public class HandlerResolverBuilder {
1921

2022
public static HandlerResolver buildResolver(String domain,
2123
Map<String, HandlerRegistry> registries,
2224
final DefaultCommandHandler defaultCommandHandler) {
2325

24-
if ("app".equals(domain)) {
26+
if (DEFAULT_LISTENER.equals(domain)) {
2527
final ConcurrentMap<String, RegisteredQueryHandler<?, ?>> queryHandlers = registries
2628
.values().stream()
2729
.flatMap(r -> r.getHandlers().stream())
@@ -79,7 +81,7 @@ private static ConcurrentMap<String, RegisteredEventListener<?>> getEventHandler
7981
}
8082

8183
private static Stream<RegisteredEventListener<?>> getDynamics(String domain, HandlerRegistry r) {
82-
if ("app".equals(domain)) {
84+
if (DEFAULT_LISTENER.equals(domain)) {
8385
return r.getDynamicEventHandlers().stream();
8486
}
8587
return Stream.of();

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import java.util.Map;
5353
import java.util.logging.Level;
5454

55+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
56+
5557
@Log
5658
@Configuration
5759
@RequiredArgsConstructor
@@ -193,8 +195,8 @@ Mono<Connection> createConnectionMono(ConnectionFactory factory, String connecti
193195

194196
@Bean
195197
public DynamicRegistry dynamicRegistry(ConnectionManager connectionManager, IBrokerConfigProps props) {
196-
return new DynamicRegistryImp(connectionManager.getHandlerResolver("app"),
197-
connectionManager.getListener("app").getTopologyCreator(), props);
198+
return new DynamicRegistryImp(connectionManager.getHandlerResolver(DEFAULT_LISTENER),
199+
connectionManager.getListener(DEFAULT_LISTENER).getTopologyCreator(), props);
198200
}
199201

200202
@Bean

async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@
44
import org.assertj.core.api.Assertions;
55
import org.junit.jupiter.api.BeforeEach;
66
import org.junit.jupiter.api.Test;
7-
import org.reactivecommons.async.commons.DiscardNotifier;
7+
import org.reactivecommons.async.api.HandlerRegistry;
88
import org.reactivecommons.async.commons.converters.MessageConverter;
99
import org.reactivecommons.async.commons.ext.CustomReporter;
1010
import org.reactivecommons.async.rabbit.HandlerResolver;
1111
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
12+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
1213
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1314
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
1415
import org.reactivecommons.async.rabbit.listeners.ApplicationEventListener;
1516
import reactor.core.publisher.Flux;
1617
import reactor.core.publisher.Mono;
17-
import reactor.rabbitmq.*;
18+
import reactor.rabbitmq.BindingSpecification;
19+
import reactor.rabbitmq.ConsumeOptions;
20+
import reactor.rabbitmq.ExchangeSpecification;
21+
import reactor.rabbitmq.QueueSpecification;
22+
import reactor.rabbitmq.Receiver;
1823

1924
import java.util.Collections;
2025

@@ -27,12 +32,13 @@ class EventListenersConfigTest {
2732
private final AsyncProps props = new AsyncProps();
2833
private final EventListenersConfig config = new EventListenersConfig(props);
2934
private final ReactiveMessageListener listener = mock(ReactiveMessageListener.class);
35+
private final ReactiveMessageSender sender = mock(ReactiveMessageSender.class);
3036
private final TopologyCreator creator = mock(TopologyCreator.class);
3137
private final HandlerResolver handlerResolver = mock(HandlerResolver.class);
3238
private final MessageConverter messageConverter = mock(MessageConverter.class);
33-
private final DiscardNotifier discardNotifier = mock(DiscardNotifier.class);
3439
private final CustomReporter customReporter = mock(CustomReporter.class);
3540
private final Receiver receiver = mock(Receiver.class);
41+
private ConnectionManager connectionManager;
3642

3743
@BeforeEach
3844
public void init() {
@@ -47,15 +53,15 @@ public void init() {
4753
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
4854
when(listener.getReceiver()).thenReturn(receiver);
4955
when(listener.getMaxConcurrency()).thenReturn(20);
56+
connectionManager = new ConnectionManager();
57+
connectionManager.addDomain(HandlerRegistry.DEFAULT_LISTENER, listener, sender, handlerResolver);
5058
}
5159

5260
@Test
5361
void eventListener() {
5462
final ApplicationEventListener eventListener = config.eventListener(
55-
handlerResolver,
5663
messageConverter,
57-
listener,
58-
discardNotifier,
64+
connectionManager,
5965
customReporter
6066
);
6167

async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/RabbitMqConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
class RabbitMqConfigTest {
2424

25-
RabbitMqConfig config = new RabbitMqConfig(null);
25+
RabbitMqConfig config = new RabbitMqConfig();
2626

2727
@Test
2828
void retryInitialConnection() throws IOException, TimeoutException {

0 commit comments

Comments
 (0)