Skip to content

Commit 53f9716

Browse files
committed
Update props
1 parent 4df5c3f commit 53f9716

File tree

7 files changed

+47
-26
lines changed

7 files changed

+47
-26
lines changed

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@
88
import org.reactivecommons.async.commons.converters.MessageConverter;
99
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
1010
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
11+
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
1112
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
1213
import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener;
1314
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
1415
import org.springframework.context.annotation.Bean;
1516
import org.springframework.context.annotation.Configuration;
1617
import org.springframework.context.annotation.Import;
1718

19+
import java.util.concurrent.atomic.AtomicReference;
20+
1821
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
1922

2023
@Configuration
@@ -29,11 +32,22 @@ public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, Re
2932
return new RabbitEDADirectAsyncGateway(config, router, manager, props.getDirectMessagesExchangeName(), converter, meterRegistry);
3033
}
3134

32-
@Bean // TODO: Listen replies from n domains if enabled in config
33-
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ConnectionManager manager) {
34-
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener(DEFAULT_DOMAIN), props.getReplyQueue());
35-
replyListener.startListening(config.getRoutingKey());
36-
return replyListener;
35+
@Bean
36+
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, AsyncProps asyncProps, BrokerConfig config, ConnectionManager manager) {
37+
asyncProps.getListenRepliesFrom().add(DEFAULT_DOMAIN);
38+
AtomicReference<ApplicationReplyListener> localListener = new AtomicReference<>();
39+
40+
asyncProps.getListenRepliesFrom().forEach(domain -> {
41+
42+
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener(domain), props.getReplyQueue());
43+
replyListener.startListening(config.getRoutingKey());
44+
45+
if (DEFAULT_DOMAIN.equals(domain)) {
46+
localListener.set(replyListener);
47+
}
48+
});
49+
50+
return localListener.get();
3751
}
3852

3953
@Bean

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import org.springframework.boot.context.properties.ConfigurationProperties;
77
import org.springframework.boot.context.properties.NestedConfigurationProperty;
88

9+
import java.util.ArrayList;
10+
import java.util.List;
911
import java.util.Map;
1012
import java.util.TreeMap;
1113

@@ -33,6 +35,8 @@ public class AsyncProps {
3335
@NestedConfigurationProperty
3436
private Map<String, RabbitProperties> connections = new TreeMap<>();
3537

38+
private List<String> listenRepliesFrom = new ArrayList<>();
39+
3640
private Integer maxRetries = 10;
3741

3842
private Integer prefetchCount = 250;

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/BrokerConfigProps.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public String getCommandsQueue() {
3939
public String getReplyQueue() {
4040
final String name = replyQueueName.get();
4141
if (name == null) {
42-
final String replyName = NameGenerator.generateNameFrom(appName);
42+
final String replyName = NameGenerator.generateNameFrom(appName, "replies");
4343
if (replyQueueName.compareAndSet(null, replyName)) {
4444
return replyName;
4545
} else {

samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.reactivecommons.async.api.DirectAsyncGateway;
1212
import org.reactivecommons.async.api.HandlerRegistry;
1313
import org.reactivecommons.async.api.handlers.QueryHandler;
14+
import org.reactivecommons.async.impl.config.annotations.EnableCommandListeners;
1415
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
1516
import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus;
1617
import org.reactivecommons.async.impl.config.annotations.EnableEventListeners;
@@ -31,6 +32,7 @@
3132

3233
@SpringBootApplication
3334
@EnableEventListeners
35+
@EnableCommandListeners
3436
@EnableDomainEventBus
3537
@EnableDirectAsyncGateway
3638
@Log
@@ -60,9 +62,9 @@ public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/) {
6062
return HandlerRegistry.register()
6163

6264
// .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class)
63-
.listenEvent("fixed.event", message -> Mono.empty(), Object.class)
64-
.listenDomainEvent("accounts", "account.created", message -> Mono.empty(), Object.class)
65-
.listenDomainEvent("deposits", "transfer.xxx", message -> Mono.empty(), Object.class)
65+
// .listenEvent("fixed.event", message -> Mono.empty(), Object.class)
66+
// .listenDomainEvent("accounts", "account.created", message -> Mono.empty(), Object.class)
67+
// .listenDomainEvent("deposits", "transfer.xxx", message -> Mono.empty(), Object.class)
6668
// .serveQuery("query1", message -> {
6769
// log.info("resolving from direct query");
6870
// return just(new RespQuery1("Ok", message));
@@ -76,11 +78,11 @@ public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/) {
7678
// return gateway.reply(new RespQuery1("Ok", message), from).then();
7779
// }, Call.class);
7880

79-
.handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class)
80-
.listenEvent("event", message -> {
81-
log.info(message.getData().toString());
82-
return Mono.empty();
83-
}, CloudEvent.class)
81+
// .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class)
82+
// .listenEvent("event", message -> {
83+
// log.info(message.getData().toString());
84+
// return Mono.empty();
85+
// }, CloudEvent.class)
8486
.handleCommand("command", message -> {
8587
log.info(message.getData().toString());
8688
return Mono.empty();
@@ -96,11 +98,11 @@ public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/) {
9698
.withData("application/json", CloudEventBuilderExt.asBytes(mapData))
9799
.build();
98100
return just(response);
99-
}, CloudEvent.class)
100-
.serveQuery("sample.query.*", message -> {
101-
log.info("resolving from direct query");
102-
return just(new RespQuery1("Ok", message));
103-
}, Call.class);
101+
}, CloudEvent.class);
102+
// .serveQuery("sample.query.*", message -> {
103+
// log.info("resolving from direct query");
104+
// return just(new RespQuery1("Ok", message));
105+
// }, Call.class);
104106
/*.serveQuery("query2", (from, message) -> {
105107
log.info("resolving from delegate query");
106108
return gateway.reply(new RespQuery1("Ok", message), from).then();

samples/async/receiver-responder/src/main/resources/application.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ app:
1010
retry-delay: 1000 # son milisegundos
1111
connections:
1212
app:
13-
virtualHost: app1
14-
accounts:
15-
virtualHost: domain-a
16-
deposits:
1713
virtualHost: domain-a
14+
# accounts:
15+
# virtualHost: domain-a
16+
# deposits:
17+
# virtualHost: domain-a

samples/async/sender-client/src/main/java/sample/SampleRestController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public Mono<CloudEvent> sampleService(@RequestBody Call call) throws JsonProcess
4747
.withData("application/json", CloudEventBuilderExt.asBytes(call))
4848
.build();
4949

50-
return directAsyncGateway.requestReply(query, target, CloudEvent.class);
50+
return directAsyncGateway.requestReply(query, target, CloudEvent.class, "accounts");
5151
}
5252
@PostMapping(path = "/sample/event", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
5353
public Mono<String> sampleServiceEvent(@RequestBody Call call) throws JsonProcessingException {
@@ -74,7 +74,7 @@ public Mono<String> sampleServiceCommand(@RequestBody Call call) throws JsonProc
7474
.withData("application/json", CloudEventBuilderExt.asBytes(call))
7575
.build();
7676

77-
return directAsyncGateway.sendCommand(command, target).thenReturn("command");
77+
return directAsyncGateway.sendCommand(command, target, "accounts").thenReturn("command");
7878
}
7979
@PostMapping(path = "/sample/match", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
8080
public Mono<RespQuery1> sampleServices(@RequestBody Call call) {

samples/async/sender-client/src/main/resources/application.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ management:
1515
include: health,prometheus
1616
app:
1717
async:
18+
listenRepliesFrom: [accounts]
1819
connections:
1920
app:
20-
virtualHost: app1
21+
virtualHost: /
2122
accounts:
2223
virtualHost: domain-a
2324

0 commit comments

Comments
 (0)