Skip to content

Commit 1578200

Browse files
committed
start eda
1 parent 34c259f commit 1578200

File tree

15 files changed

+309
-140
lines changed

15 files changed

+309
-140
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313

1414
import java.lang.reflect.ParameterizedType;
1515
import java.util.List;
16+
import java.util.Map;
17+
import java.util.concurrent.ConcurrentHashMap;
1618
import java.util.concurrent.CopyOnWriteArrayList;
1719

1820
@Getter
1921
@NoArgsConstructor(access = AccessLevel.PACKAGE)
2022
public class HandlerRegistry {
21-
22-
private final List<RegisteredEventListener<?>> eventListeners = new CopyOnWriteArrayList<>();
23+
private final Map<String, List<RegisteredEventListener<?>>> domainEventListeners = new ConcurrentHashMap<>();
2324
private final List<RegisteredEventListener<?>> dynamicEventHandlers = new CopyOnWriteArrayList<>();
2425
private final List<RegisteredEventListener<?>> eventNotificationListener = new CopyOnWriteArrayList<>();
2526
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
@@ -29,8 +30,15 @@ public static HandlerRegistry register() {
2930
return new HandlerRegistry();
3031
}
3132

33+
public <T> HandlerRegistry listenDomainEvent(String domain, String eventName, EventHandler<T> handler, Class<T> eventClass) {
34+
domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>())
35+
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
36+
return this;
37+
}
38+
3239
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler, Class<T> eventClass) {
33-
eventListeners.add(new RegisteredEventListener<>(eventName, handler, eventClass));
40+
domainEventListeners.computeIfAbsent("app", ignored -> new CopyOnWriteArrayList<>())
41+
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
3442
return this;
3543
}
3644

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package org.reactivecommons.async.rabbit.config;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
import lombok.Setter;
6+
import org.reactivecommons.async.commons.DiscardNotifier;
7+
import org.reactivecommons.async.rabbit.HandlerResolver;
8+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
9+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
10+
11+
import java.util.Map;
12+
import java.util.TreeMap;
13+
import java.util.function.BiConsumer;
14+
15+
public class ConnectionManager {
16+
private final Map<String, DomainConnections> connections = new TreeMap<>();
17+
18+
@Builder
19+
@Getter
20+
public static class DomainConnections {
21+
private final ReactiveMessageListener listener;
22+
private final ReactiveMessageSender sender;
23+
private final HandlerResolver handlerResolver;
24+
@Setter
25+
private DiscardNotifier discardNotifier;
26+
}
27+
28+
public void forSender(BiConsumer<String, ReactiveMessageSender> consumer) {
29+
connections.forEach((key, conn) -> consumer.accept(key, conn.getSender()));
30+
}
31+
32+
public void forListener(BiConsumer<String, ReactiveMessageListener> consumer) {
33+
connections.forEach((key, conn) -> consumer.accept(key, conn.getListener()));
34+
}
35+
36+
public void setDiscardNotifier(String domain, DiscardNotifier discardNotifier) {
37+
connections.get(domain).setDiscardNotifier(discardNotifier);
38+
}
39+
40+
public ConnectionManager addDomain(String domain, ReactiveMessageListener listener, ReactiveMessageSender sender,
41+
HandlerResolver resolver) {
42+
connections.put(domain, DomainConnections.builder()
43+
.listener(listener)
44+
.sender(sender)
45+
.handlerResolver(resolver)
46+
.build());
47+
return this;
48+
}
49+
50+
public ReactiveMessageSender getSender(String domain) {
51+
return connections.get(domain).getSender();
52+
}
53+
54+
public ReactiveMessageListener getListener(String domain) {
55+
return connections.get(domain).getListener();
56+
}
57+
58+
public DiscardNotifier getDiscardNotifier(String domain) {
59+
return connections.get(domain).getDiscardNotifier();
60+
}
61+
62+
public HandlerResolver getHandlerResolver(String domain) {
63+
return connections.get(domain).getHandlerResolver();
64+
}
65+
}

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ public class DirectAsyncGatewayConfig {
2424
private final BrokerConfigProps props;
2525

2626
@Bean
27-
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter, MeterRegistry meterRegistry) throws Exception {
28-
return new RabbitDirectAsyncGateway(config, router, rSender, props.getDirectMessagesExchangeName(), converter, meterRegistry);
27+
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, MessageConverter converter, MeterRegistry meterRegistry) throws Exception {
28+
return new RabbitDirectAsyncGateway(config, router, manager.getSender("app"), props.getDirectMessagesExchangeName(), converter, meterRegistry);
2929
}
3030

3131
@Bean
32-
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener) {
33-
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, listener, props.getReplyQueue());
32+
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ConnectionManager manager) {
33+
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener("app"), props.getReplyQueue());
3434
replyListener.startListening(config.getRoutingKey());
3535
return replyListener;
3636
}
Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package org.reactivecommons.async.rabbit.config;
22

33
import org.reactivecommons.api.domain.DomainEventBus;
4+
import org.reactivecommons.async.commons.DiscardNotifier;
5+
import org.reactivecommons.async.commons.config.BrokerConfig;
6+
import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier;
7+
import org.reactivecommons.async.rabbit.RabbitDiscardNotifier;
48
import org.reactivecommons.async.rabbit.RabbitDomainEventBus;
59
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
610
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
7-
import org.reactivecommons.async.commons.config.BrokerConfig;
811
import org.springframework.context.annotation.Bean;
912
import org.springframework.context.annotation.Configuration;
1013
import org.springframework.context.annotation.Import;
@@ -15,10 +18,19 @@
1518
@Import(RabbitMqConfig.class)
1619
public class EventBusConfig {
1720

18-
@Bean
19-
public DomainEventBus domainEventBus(ReactiveMessageSender sender, BrokerConfigProps props, BrokerConfig config) {
21+
@Bean // app connection
22+
public DomainEventBus domainEventBus(ConnectionManager manager, BrokerConfigProps props, BrokerConfig config,
23+
ObjectMapperSupplier objectMapperSupplier) {
24+
String domain = "app";
25+
ReactiveMessageSender sender = manager.getSender(domain);
2026
final String exchangeName = props.getDomainEventsExchangeName();
2127
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
22-
return new RabbitDomainEventBus(sender, exchangeName, config);
28+
DomainEventBus domainEventBus = new RabbitDomainEventBus(sender, exchangeName, config);
29+
manager.setDiscardNotifier(domain, createDiscardNotifier(domainEventBus, objectMapperSupplier));
30+
return domainEventBus;
31+
}
32+
33+
private DiscardNotifier createDiscardNotifier(DomainEventBus domainEventBus, ObjectMapperSupplier objectMapperSupplier) {
34+
return new RabbitDiscardNotifier(domainEventBus, objectMapperSupplier.get());
2335
}
2436
}
Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
package org.reactivecommons.async.rabbit.config;
22

33
import lombok.RequiredArgsConstructor;
4-
import org.reactivecommons.async.commons.DiscardNotifier;
5-
import org.reactivecommons.async.rabbit.HandlerResolver;
6-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
7-
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
84
import org.reactivecommons.async.commons.converters.MessageConverter;
95
import org.reactivecommons.async.commons.ext.CustomReporter;
6+
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
107
import org.reactivecommons.async.rabbit.listeners.ApplicationEventListener;
118
import org.springframework.beans.factory.annotation.Value;
129
import org.springframework.context.annotation.Bean;
1310
import org.springframework.context.annotation.Configuration;
1411
import org.springframework.context.annotation.Import;
1512

13+
import java.util.concurrent.atomic.AtomicReference;
14+
1615
@Configuration
1716
@RequiredArgsConstructor
1817
@Import(RabbitMqConfig.class)
@@ -24,16 +23,27 @@ public class EventListenersConfig {
2423
private final AsyncProps asyncProps;
2524

2625
@Bean
27-
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter,
28-
ReactiveMessageListener receiver, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
29-
30-
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
31-
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
32-
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),asyncProps.getDomain().getEvents().getMaxLengthBytes(),
33-
discardNotifier, errorReporter, appName);
34-
35-
listener.startListener();
36-
37-
return listener;
26+
public ApplicationEventListener eventListener(MessageConverter messageConverter,
27+
ConnectionManager manager, CustomReporter errorReporter) {
28+
AtomicReference<ApplicationEventListener> external = new AtomicReference<>();
29+
manager.forListener((domain, receiver) -> {
30+
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
31+
appName + ".subsEvents",
32+
manager.getHandlerResolver(domain),
33+
asyncProps.getDomain().getEvents().getExchange(),
34+
messageConverter, asyncProps.getWithDLQRetry(),
35+
asyncProps.getMaxRetries(),
36+
asyncProps.getRetryDelay(),
37+
asyncProps.getDomain().getEvents().getMaxLengthBytes(),
38+
manager.getDiscardNotifier(domain),
39+
errorReporter,
40+
appName);
41+
if ("app".equals(domain)) {
42+
external.set(listener);
43+
}
44+
listener.startListener();
45+
});
46+
47+
return external.get();
3848
}
3949
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package org.reactivecommons.async.rabbit.config;
2+
3+
import lombok.AccessLevel;
4+
import lombok.NoArgsConstructor;
5+
import org.reactivecommons.async.api.DefaultCommandHandler;
6+
import org.reactivecommons.async.api.HandlerRegistry;
7+
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
8+
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
9+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
10+
import org.reactivecommons.async.rabbit.HandlerResolver;
11+
12+
import java.util.Map;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
import java.util.concurrent.ConcurrentMap;
15+
import java.util.stream.Stream;
16+
17+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
18+
public class HandlerResolverBuilder {
19+
20+
public static HandlerResolver buildResolver(String domain,
21+
Map<String, HandlerRegistry> registries,
22+
final DefaultCommandHandler defaultCommandHandler) {
23+
24+
if ("app".equals(domain)) {
25+
final ConcurrentMap<String, RegisteredQueryHandler<?, ?>> queryHandlers = registries
26+
.values().stream()
27+
.flatMap(r -> r.getHandlers().stream())
28+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
29+
ConcurrentHashMap::putAll);
30+
31+
final ConcurrentMap<String, RegisteredCommandHandler<?>> commandHandlers = registries
32+
.values().stream()
33+
.flatMap(r -> r.getCommandHandlers().stream())
34+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
35+
ConcurrentHashMap::putAll);
36+
37+
final ConcurrentMap<String, RegisteredEventListener<?>> eventNotificationListener = registries
38+
.values()
39+
.stream()
40+
.flatMap(r -> r.getEventNotificationListener().stream())
41+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
42+
ConcurrentHashMap::putAll);
43+
44+
final ConcurrentMap<String, RegisteredEventListener<?>> eventsToBind = getEventsToBind(domain, registries);
45+
46+
final ConcurrentMap<String, RegisteredEventListener<?>> eventHandlers = getEventHandlersWithDynamics(domain, registries);
47+
48+
return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, commandHandlers) {
49+
@Override
50+
@SuppressWarnings("unchecked")
51+
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
52+
final RegisteredCommandHandler<T> handler = super.getCommandHandler(path);
53+
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
54+
}
55+
};
56+
}
57+
58+
59+
final ConcurrentMap<String, RegisteredEventListener<?>> eventsToBind = getEventsToBind(domain, registries);
60+
final ConcurrentMap<String, RegisteredEventListener<?>> eventHandlers = getEventHandlersWithDynamics(domain, registries);
61+
62+
return new HandlerResolver(new ConcurrentHashMap<>(), eventHandlers, eventsToBind, new ConcurrentHashMap<>(), new ConcurrentHashMap<>()) {
63+
@Override
64+
@SuppressWarnings("unchecked")
65+
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
66+
final RegisteredCommandHandler<T> handler = super.getCommandHandler(path);
67+
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
68+
}
69+
};
70+
}
71+
72+
private static ConcurrentMap<String, RegisteredEventListener<?>> getEventHandlersWithDynamics(String domain, Map<String, HandlerRegistry> registries) {
73+
// event handlers and dynamic handlers
74+
return registries
75+
.values().stream()
76+
.flatMap(r -> Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r)))
77+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
78+
ConcurrentHashMap::putAll);
79+
}
80+
81+
private static Stream<RegisteredEventListener<?>> getDynamics(String domain, HandlerRegistry r) {
82+
if ("app".equals(domain)) {
83+
return r.getDynamicEventHandlers().stream();
84+
}
85+
return Stream.of();
86+
}
87+
88+
private static ConcurrentMap<String, RegisteredEventListener<?>> getEventsToBind(String domain, Map<String, HandlerRegistry> registries) {
89+
return registries
90+
.values().stream()
91+
.flatMap(r -> r.getDomainEventListeners().get(domain).stream())
92+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
93+
ConcurrentHashMap::putAll);
94+
}
95+
}

0 commit comments

Comments
 (0)