Skip to content

Commit 34c259f

Browse files
committed
start multibroker
1 parent 2dc8443 commit 34c259f

36 files changed

+2285
-6
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
ext {
2+
artifactId = 'async-commons-rabbit-starter'
3+
artifactDescription = 'Async Commons Starter'
4+
}
5+
6+
dependencies {
7+
api project(':async-rabbit')
8+
compileOnly 'org.springframework.boot:spring-boot-starter'
9+
compileOnly 'org.springframework.boot:spring-boot-starter-actuator'
10+
11+
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
12+
13+
testImplementation 'io.projectreactor:reactor-test'
14+
testImplementation 'org.springframework.boot:spring-boot-starter-actuator'
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.rabbit.config.CommandListenersConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
10+
@Retention(RetentionPolicy.RUNTIME)
11+
@Target({ElementType.TYPE})
12+
@Documented
13+
@Import(CommandListenersConfig.class)
14+
@Configuration
15+
public @interface EnableCommandListeners {
16+
}
17+
18+
19+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.rabbit.config.DirectAsyncGatewayConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
10+
@Retention(RetentionPolicy.RUNTIME)
11+
@Target({ElementType.TYPE})
12+
@Documented
13+
@Import(DirectAsyncGatewayConfig.class)
14+
@Configuration
15+
public @interface EnableDirectAsyncGateway {
16+
}
17+
18+
19+
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.rabbit.config.EventBusConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
10+
@Retention(RetentionPolicy.RUNTIME)
11+
@Target({ElementType.TYPE})
12+
@Documented
13+
@Import(EventBusConfig.class)
14+
@Configuration
15+
public @interface EnableDomainEventBus {
16+
}
17+
18+
19+
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.rabbit.config.EventListenersConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
@Retention(RetentionPolicy.RUNTIME)
10+
@Target({ElementType.TYPE})
11+
@Documented
12+
@Import(EventListenersConfig.class)
13+
@Configuration
14+
public @interface EnableEventListeners {
15+
}
16+
17+
18+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.rabbit.config.CommandListenersConfig;
4+
import org.reactivecommons.async.rabbit.config.EventListenersConfig;
5+
import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig;
6+
import org.reactivecommons.async.rabbit.config.QueryListenerConfig;
7+
import org.springframework.context.annotation.Configuration;
8+
import org.springframework.context.annotation.Import;
9+
10+
import java.lang.annotation.*;
11+
12+
/**
13+
* This annotation enables all messages listeners (Query, Commands, Events). If you want to enable separately, please use
14+
* EnableCommandListeners, EnableQueryListeners or EnableEventListeners.
15+
*
16+
*/
17+
@Retention(RetentionPolicy.RUNTIME)
18+
@Target({ElementType.TYPE})
19+
@Documented
20+
@Import({CommandListenersConfig.class, QueryListenerConfig.class, EventListenersConfig.class, NotificacionListenersConfig.class})
21+
@Configuration
22+
public @interface EnableMessageListeners {
23+
}
24+
25+
26+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
10+
@Retention(RetentionPolicy.RUNTIME)
11+
@Target({ElementType.TYPE})
12+
@Documented
13+
@Import(NotificacionListenersConfig.class)
14+
@Configuration
15+
public @interface EnableNotificationListener {
16+
}
17+
18+
19+
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.rabbit.config.QueryListenerConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
10+
@Retention(RetentionPolicy.RUNTIME)
11+
@Target({ElementType.TYPE})
12+
@Documented
13+
@Import(QueryListenerConfig.class)
14+
@Configuration
15+
public @interface EnableQueryListeners {
16+
}
17+
18+
19+
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.reactivecommons.async.rabbit.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.commons.DiscardNotifier;
5+
import org.reactivecommons.async.rabbit.HandlerResolver;
6+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
7+
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
8+
import org.reactivecommons.async.commons.converters.MessageConverter;
9+
import org.reactivecommons.async.commons.ext.CustomReporter;
10+
import org.reactivecommons.async.rabbit.listeners.ApplicationCommandListener;
11+
import org.springframework.beans.factory.annotation.Value;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.context.annotation.Configuration;
14+
import org.springframework.context.annotation.Import;
15+
16+
@Configuration
17+
@RequiredArgsConstructor
18+
@Import(RabbitMqConfig.class)
19+
public class CommandListenersConfig {
20+
21+
@Value("${spring.application.name}")
22+
private String appName;
23+
24+
private final AsyncProps asyncProps;
25+
26+
@Bean
27+
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
28+
HandlerResolver resolver, MessageConverter converter,
29+
DiscardNotifier discardNotifier,
30+
CustomReporter errorReporter) {
31+
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
32+
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
33+
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier, errorReporter);
34+
35+
commandListener.startListener();
36+
37+
return commandListener;
38+
}
39+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.reactivecommons.async.rabbit.config;
2+
3+
import io.micrometer.core.instrument.MeterRegistry;
4+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
5+
import lombok.RequiredArgsConstructor;
6+
import org.reactivecommons.async.commons.config.BrokerConfig;
7+
import org.reactivecommons.async.commons.converters.MessageConverter;
8+
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
9+
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
10+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
11+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
12+
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
13+
import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener;
14+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
15+
import org.springframework.context.annotation.Bean;
16+
import org.springframework.context.annotation.Configuration;
17+
import org.springframework.context.annotation.Import;
18+
19+
@Configuration
20+
@Import(RabbitMqConfig.class)
21+
@RequiredArgsConstructor
22+
public class DirectAsyncGatewayConfig {
23+
24+
private final BrokerConfigProps props;
25+
26+
@Bean
27+
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter, MeterRegistry meterRegistry) throws Exception {
28+
return new RabbitDirectAsyncGateway(config, router, rSender, props.getDirectMessagesExchangeName(), converter, meterRegistry);
29+
}
30+
31+
@Bean
32+
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener) {
33+
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, listener, props.getReplyQueue());
34+
replyListener.startListening(config.getRoutingKey());
35+
return replyListener;
36+
}
37+
38+
@Bean
39+
public ReactiveReplyRouter router() {
40+
return new ReactiveReplyRouter();
41+
}
42+
43+
@Bean
44+
@ConditionalOnMissingBean(MeterRegistry.class)
45+
public MeterRegistry defaultRabbitMeterRegistry() {
46+
return new SimpleMeterRegistry();
47+
}
48+
}

0 commit comments

Comments
 (0)