Skip to content

Commit 1518551

Browse files
committed
update health for eda, append default app props
1 parent ed325ec commit 1518551

File tree

17 files changed

+120
-47
lines changed

17 files changed

+120
-47
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,20 @@
66

77
public interface DirectAsyncGateway {
88
<T> Mono<Void> sendCommand(Command<T> command, String targetName);
9+
10+
<T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain);
11+
912
Mono<Void> sendCommand(CloudEvent command, String targetName);
13+
14+
Mono<Void> sendCommand(CloudEvent command, String targetName, String domain);
15+
1016
<T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type);
17+
18+
<T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type, String domain);
19+
1120
<R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type);
21+
22+
<R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type, String domain);
23+
1224
<T> Mono<Void> reply(T response, From from);
1325
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
@Getter
2424
@NoArgsConstructor(access = AccessLevel.PACKAGE)
2525
public class HandlerRegistry {
26-
public static final String DEFAULT_LISTENER = "app";
26+
public static final String DEFAULT_DOMAIN = "app";
2727
private final Map<String, List<RegisteredEventListener<?>>> domainEventListeners = new ConcurrentHashMap<>();
2828
private final List<RegisteredEventListener<?>> dynamicEventHandlers = new CopyOnWriteArrayList<>();
2929
private final List<RegisteredEventListener<?>> eventNotificationListener = new CopyOnWriteArrayList<>();
@@ -33,7 +33,7 @@ public class HandlerRegistry {
3333

3434
public static HandlerRegistry register() {
3535
HandlerRegistry instance = new HandlerRegistry();
36-
instance.domainEventListeners.put(DEFAULT_LISTENER, new CopyOnWriteArrayList<>());
36+
instance.domainEventListeners.put(DEFAULT_DOMAIN, new CopyOnWriteArrayList<>());
3737
return instance;
3838
}
3939

@@ -44,7 +44,7 @@ public <T> HandlerRegistry listenDomainEvent(String domain, String eventName, Ev
4444
}
4545

4646
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler, Class<T> eventClass) {
47-
domainEventListeners.computeIfAbsent(DEFAULT_LISTENER, ignored -> new CopyOnWriteArrayList<>())
47+
domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>())
4848
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
4949
return this;
5050
}

async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import static org.assertj.core.api.Assertions.assertThat;
1818
import static org.mockito.Mockito.mock;
19-
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
19+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
2020

2121
class HandlerRegistryTest {
2222
private final HandlerRegistry registry = HandlerRegistry.register();
@@ -28,7 +28,7 @@ void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed() {
2828

2929
registry.listenEvent(name, eventHandler);
3030

31-
assertThat(registry.getDomainEventListeners().get(DEFAULT_LISTENER))
31+
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
3232
.anySatisfy(registered -> assertThat(registered)
3333
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
3434
.containsExactly(name, SomeDataClass.class, eventHandler)).hasSize(1);
@@ -44,7 +44,7 @@ void shouldRegisterPatternEventHandlerWithTypeInference() {
4444
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
4545
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
4646

47-
assertThat(registry.getDomainEventListeners().get(DEFAULT_LISTENER))
47+
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
4848
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
4949
.usingRecursiveComparison()
5050
.isEqualTo(expectedRegisteredEventListener));
@@ -63,7 +63,7 @@ void shouldRegisterPatternEventHandler() {
6363
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
6464
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
6565

66-
assertThat(registry.getDomainEventListeners().get(DEFAULT_LISTENER))
66+
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
6767
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
6868
.usingRecursiveComparison()
6969
.isEqualTo(expectedRegisteredEventListener));
@@ -85,7 +85,7 @@ public void listenEvent() {
8585
EventHandler<SomeDataClass> handler = mock(EventHandler.class);
8686
registry.listenEvent(name, handler, SomeDataClass.class);
8787

88-
assertThat(registry.getDomainEventListeners().get(DEFAULT_LISTENER))
88+
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
8989
.anySatisfy(registered -> assertThat(registered)
9090
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
9191
.containsExactly(name, SomeDataClass.class, handler)).hasSize(1);

async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/matcher/KeyMatcherPerformanceWildcardTest.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@
88
import java.io.IOException;
99
import java.nio.file.Files;
1010
import java.nio.file.Paths;
11-
import java.util.*;
11+
import java.util.ArrayList;
12+
import java.util.HashMap;
13+
import java.util.HashSet;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.Set;
1217
import java.util.stream.Collectors;
1318

1419
class KeyMatcherPerformanceWildcardTest {
@@ -27,13 +32,13 @@ public void setUp() {
2732
File file = new File(classLoader.getResource("wildcard_names_for_matching.txt").getFile());
2833
File file2 = new File(classLoader.getResource("concrete_names_for_matching.txt").getFile());
2934
try {
30-
Set<String> names = new HashSet<>(Files
35+
Set<String> names = new HashSet<>(Files
3136
.readAllLines(Paths.get(file.getAbsolutePath())));
3237
candidates = names.stream()
3338
.collect(Collectors.toMap(name -> name, name -> name));
3439
testList = new ArrayList<>(new HashSet<>(Files
35-
.readAllLines(Paths.get(file2.getAbsolutePath()))));
36-
testResultList = new ArrayList<>(testList.size()*10);
40+
.readAllLines(Paths.get(file2.getAbsolutePath()))));
41+
testResultList = new ArrayList<>(testList.size() * 10);
3742
} catch (IOException e) {
3843
e.printStackTrace();
3944
}
@@ -43,14 +48,14 @@ public void setUp() {
4348
void keyMatcherLookupShouldPerformInLessThan30Micros() {
4449
final int size = testList.size();
4550
final long init = System.currentTimeMillis();
46-
for (int i = 0; i< size*10; ++i){
47-
testResultList.add(keyMatcher.match(candidates.keySet(), testList.get(i%size)));
51+
for (int i = 0; i < size * 10; ++i) {
52+
testResultList.add(keyMatcher.match(candidates.keySet(), testList.get(i % size)));
4853
}
4954
final long end = System.currentTimeMillis();
5055

5156

5257
final long total = end - init;
53-
final double microsPerLookup = ((total+0.0)/testResultList.size())*1000;
58+
final double microsPerLookup = ((total + 0.0) / testResultList.size()) * 1000;
5459
System.out.println("Performed Lookups: " + testResultList.size());
5560
System.out.println("Total Execution Time: " + total + "ms");
5661
System.out.println("Microseconds per lookup: " + microsPerLookup + "us");
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package org.reactivecommons.async;
2+
3+
import io.micrometer.core.instrument.MeterRegistry;
4+
import org.reactivecommons.async.commons.config.BrokerConfig;
5+
import org.reactivecommons.async.commons.converters.MessageConverter;
6+
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
7+
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
8+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
9+
import org.reactivecommons.async.rabbit.config.ConnectionManager;
10+
11+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
12+
13+
public class RabbitEDADirectAsyncGateway extends RabbitDirectAsyncGateway {
14+
private final ConnectionManager manager;
15+
16+
public RabbitEDADirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, String exchange, MessageConverter converter, MeterRegistry meterRegistry) {
17+
super(config, router, manager.getSender(DEFAULT_DOMAIN), exchange, converter, meterRegistry);
18+
this.manager = manager;
19+
}
20+
21+
@Override
22+
protected ReactiveMessageSender resolveSender(String domain) {
23+
return manager.getSender(domain);
24+
}
25+
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public static class DomainConnections {
2121
private final ReactiveMessageListener listener;
2222
private final ReactiveMessageSender sender;
2323
private final HandlerResolver handlerResolver;
24+
private final ConnectionFactoryProvider provider;
2425
@Setter
2526
private DiscardNotifier discardNotifier;
2627
}
@@ -62,4 +63,7 @@ public DiscardNotifier getDiscardNotifier(String domain) {
6263
public HandlerResolver getHandlerResolver(String domain) {
6364
return connections.get(domain).getHandlerResolver();
6465
}
66+
public ConnectionFactoryProvider getProvider(String domain) {
67+
return connections.get(domain).getProvider();
68+
}
6569
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,14 @@
77
import org.reactivecommons.async.commons.converters.MessageConverter;
88
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
99
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
10-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
11-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
1210
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
1311
import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener;
1412
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
1513
import org.springframework.context.annotation.Bean;
1614
import org.springframework.context.annotation.Configuration;
1715
import org.springframework.context.annotation.Import;
1816

19-
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
17+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
2018

2119
@Configuration
2220
@Import(RabbitMqConfig.class)
@@ -27,12 +25,12 @@ public class DirectAsyncGatewayConfig {
2725

2826
@Bean
2927
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, MessageConverter converter, MeterRegistry meterRegistry) throws Exception {
30-
return new RabbitDirectAsyncGateway(config, router, manager.getSender(DEFAULT_LISTENER), props.getDirectMessagesExchangeName(), converter, meterRegistry);
28+
return new RabbitDirectAsyncGateway(config, router, manager.getSender(DEFAULT_DOMAIN), props.getDirectMessagesExchangeName(), converter, meterRegistry);
3129
}
3230

3331
@Bean
3432
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ConnectionManager manager) {
35-
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener(DEFAULT_LISTENER), props.getReplyQueue());
33+
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener(DEFAULT_DOMAIN), props.getReplyQueue());
3634
replyListener.startListening(config.getRoutingKey());
3735
return replyListener;
3836
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventBusConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import org.springframework.context.annotation.Configuration;
1313
import org.springframework.context.annotation.Import;
1414

15-
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
15+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
1616
import static reactor.rabbitmq.ExchangeSpecification.exchange;
1717

1818
@Configuration
@@ -22,11 +22,11 @@ public class EventBusConfig {
2222
@Bean // app connection
2323
public DomainEventBus domainEventBus(ConnectionManager manager, BrokerConfigProps props, BrokerConfig config,
2424
ObjectMapperSupplier objectMapperSupplier) {
25-
ReactiveMessageSender sender = manager.getSender(DEFAULT_LISTENER);
25+
ReactiveMessageSender sender = manager.getSender(DEFAULT_DOMAIN);
2626
final String exchangeName = props.getDomainEventsExchangeName();
2727
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
2828
DomainEventBus domainEventBus = new RabbitDomainEventBus(sender, exchangeName, config);
29-
manager.setDiscardNotifier(DEFAULT_LISTENER, createDiscardNotifier(domainEventBus, objectMapperSupplier));
29+
manager.setDiscardNotifier(DEFAULT_DOMAIN, createDiscardNotifier(domainEventBus, objectMapperSupplier));
3030
return domainEventBus;
3131
}
3232

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventListenersConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
import java.util.concurrent.atomic.AtomicReference;
1414

15-
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
15+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
1616

1717
@Configuration
1818
@RequiredArgsConstructor
@@ -40,7 +40,7 @@ public ApplicationEventListener eventListener(MessageConverter messageConverter,
4040
manager.getDiscardNotifier(domain),
4141
errorReporter,
4242
appName);
43-
if (DEFAULT_LISTENER.equals(domain)) {
43+
if (DEFAULT_DOMAIN.equals(domain)) {
4444
external.set(listener);
4545
}
4646
listener.startListener();

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/HandlerResolverBuilder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.util.concurrent.ConcurrentMap;
1515
import java.util.stream.Stream;
1616

17-
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_LISTENER;
17+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
1818

1919
@NoArgsConstructor(access = AccessLevel.PRIVATE)
2020
public class HandlerResolverBuilder {
@@ -23,7 +23,7 @@ public static HandlerResolver buildResolver(String domain,
2323
Map<String, HandlerRegistry> registries,
2424
final DefaultCommandHandler defaultCommandHandler) {
2525

26-
if (DEFAULT_LISTENER.equals(domain)) {
26+
if (DEFAULT_DOMAIN.equals(domain)) {
2727
final ConcurrentMap<String, RegisteredQueryHandler<?, ?>> queryHandlers = registries
2828
.values().stream()
2929
.flatMap(r -> r.getHandlers().stream())
@@ -81,7 +81,7 @@ private static ConcurrentMap<String, RegisteredEventListener<?>> getEventHandler
8181
}
8282

8383
private static Stream<RegisteredEventListener<?>> getDynamics(String domain, HandlerRegistry r) {
84-
if (DEFAULT_LISTENER.equals(domain)) {
84+
if (DEFAULT_DOMAIN.equals(domain)) {
8585
return r.getDynamicEventHandlers().stream();
8686
}
8787
return Stream.of();

0 commit comments

Comments
 (0)