11package sample ;
22
3- import com .rabbitmq .client .ConnectionFactory ;
4- import lombok .Builder ;
3+ import lombok .AllArgsConstructor ;
54import lombok .Data ;
5+ import lombok .RequiredArgsConstructor ;
66import lombok .extern .java .Log ;
7- import org .reactivecommons .async .api .AsyncQuery ;
7+ import org .reactivecommons .api .domain .DomainEvent ;
8+ import org .reactivecommons .api .domain .DomainEventBus ;
89import org .reactivecommons .async .api .DirectAsyncGateway ;
910import org .reactivecommons .async .api .HandlerRegistry ;
11+ import org .reactivecommons .async .api .handlers .QueryHandler ;
1012import org .reactivecommons .async .impl .config .annotations .EnableDirectAsyncGateway ;
1113import org .reactivecommons .async .impl .config .annotations .EnableDomainEventBus ;
1214import org .reactivecommons .async .impl .config .annotations .EnableMessageListeners ;
13- import org .reactivecommons .async .rabbit .config .ConnectionFactoryProvider ;
14- import org .springframework .boot .CommandLineRunner ;
1515import org .springframework .boot .SpringApplication ;
1616import org .springframework .boot .autoconfigure .SpringBootApplication ;
17- import org .springframework .boot .context .properties .PropertyMapper ;
1817import org .springframework .context .annotation .Bean ;
19- import org .springframework .context .annotation .Primary ;
20- import reactor .core .publisher .Flux ;
2118import reactor .core .publisher .Mono ;
2219
23- import java .security .KeyManagementException ;
24- import java .security .NoSuchAlgorithmException ;
25- import java .time .Duration ;
26- import java .util .Map ;
27- import java .util .logging .Level ;
28-
29- import static reactor .core .publisher .Mono .delay ;
20+ import static org .reactivecommons .async .api .HandlerRegistry .register ;
3021import static reactor .core .publisher .Mono .just ;
3122
3223@ SpringBootApplication
@@ -39,70 +30,81 @@ public static void main(String[] args) {
3930 SpringApplication .run (SampleReceiverApp .class , args );
4031 }
4132
42- private ConnectionRabbitProperties rabbitProperties (){
43- return ConnectionRabbitProperties .builder ()
44- .hostname ("b-8b1e2880-30bd-4124-b765-220854289b87.mq.us-east-1.amazonaws.com" )
45- .username ("user" )
46- .password ("userrszthwco" )
47- .port (5671 )
48- .virtualhost ("/" )
49- .ssl (true )
50- .build ();
33+ //@Bean
34+ public HandlerRegistry handlerRegistry (MemberReceiver receiver ) {
35+ return register ()
36+ .serveQuery ("serveQuery.register.member" , receiver )
37+ .serveQuery ("serveQuery.register.member.new" , new QueryHandler <MemberRegisteredEvent ,
38+ AddMemberCommand >() {
39+ @ Override
40+ public Mono <MemberRegisteredEvent > handle (AddMemberCommand command ) {
41+ return just (new MemberRegisteredEvent ("42" , 69 ));
42+ }
43+ })
44+ .serveQuery ("test.query" , message -> {
45+ return Mono .error (new RuntimeException ("Falla Generada Query" ));
46+ }, AddMemberCommand .class );
5147 }
5248
53- private void configureSsl (ConnectionFactory connectionFactory ) {
54- try {
55- connectionFactory .useSslProtocol ();
56- } catch (NoSuchAlgorithmException | KeyManagementException exception ) {
57- log .log (Level .SEVERE , exception .getMessage (), exception );
58- }
49+ @ Bean
50+ public HandlerRegistry handlerRegistrySubs (DirectAsyncGateway gateway ) {
51+ return HandlerRegistry .register ()
52+ .serveQuery ("query1" , message -> {
53+ log .info ("resolving from direct query" );
54+ return just (new RespQuery1 ("Ok" , message ));
55+ }, Call .class )
56+ .serveQuery ("query2" , (from , message ) -> {
57+ log .info ("resolving from delegate query" );
58+ return gateway .reply (new RespQuery1 ("Ok" , message ), from ).then ();
59+ }, Call .class );
5960 }
6061
61- @ Bean
62- public CommandLineRunner runner (DirectAsyncGateway gateway ) {
63- return args -> {
64- Flux .interval (Duration .ofSeconds (3 )).flatMap (l -> {
65- AsyncQuery <Map > query = new AsyncQuery <>("query" , Map .of ("type" , "query" ));
66- return gateway .requestReply (query , "" , Map .class );
67- });
62+ //@Bean
63+ public HandlerRegistry handlerRegistryForEmpty (EmptyReceiver emptyReceiver ) {
64+ return register ()
65+ .serveQuery ("serveQuery.empty" , emptyReceiver );
66+ }
6867
69- };
68+ //@Bean
69+ public HandlerRegistry eventListeners (SampleUseCase useCase ) {
70+ return register ()
71+ .listenEvent ("persona.registrada" , useCase ::reactToPersonaEvent , MemberRegisteredEvent .class );
7072 }
7173
7274 @ Bean
73- public HandlerRegistry registry () {
74- return HandlerRegistry .register ()
75- .serveQuery ("query" , message -> delay (Duration .ofMillis (500 )).thenReturn (message ), Map .class )
76- .handleCommand ("command" , message -> just (message ).log ().then (), Map .class )
77- .listenEvent ("event" , message -> just (message ).log ().then (), Map .class )
78- .listenNotificationEvent ("event1" , message -> just (message ).log ().then (), Map .class )
79- ;
75+ public SampleUseCase sampleUseCase (DomainEventBus eventBus ) {
76+ return new SampleUseCase (eventBus );
8077 }
8178
82- @ Bean
83- @ Primary
84- public ConnectionFactoryProvider connection (){
85- ConnectionRabbitProperties properties = rabbitProperties ();
86- final ConnectionFactory factory = new ConnectionFactory ();
87- PropertyMapper map = PropertyMapper .get ();
88- map .from (properties ::getHostname ).whenNonNull ().to (factory ::setHost );
89- map .from (properties ::getPort ).to (factory ::setPort );
90- map .from (properties ::getUsername ).whenNonNull ().to (factory ::setUsername );
91- map .from (properties ::getPassword ).whenNonNull ().to (factory ::setPassword );
92- map .from (properties ::getVirtualhost ).whenNonNull ().to (factory ::setVirtualHost );
93- map .from (properties ::isSsl ).whenTrue ().as (ssl -> factory ).to (this ::configureSsl );
94- System .out .println ("RabbitMQ configured!!" );
95- return () -> factory ;
79+ @ Data
80+ @ AllArgsConstructor
81+ static class RespQuery1 {
82+ private String response ;
83+ private Call request ;
9684 }
9785
9886 @ Data
99- @ Builder
100- static class ConnectionRabbitProperties {
101- private String virtualhost ;
102- private String hostname ;
103- private String username ;
104- private String password ;
105- private Integer port ;
106- private boolean ssl ;
87+ @ AllArgsConstructor
88+ static class Call {
89+ private String name ;
90+ private String phone ;
10791 }
108- }
92+
93+ @ Data
94+ @ AllArgsConstructor
95+ static class CallResponse {
96+ private String message ;
97+ private Integer code ;
98+ }
99+
100+ @ RequiredArgsConstructor
101+ public static class SampleUseCase {
102+ private final DomainEventBus eventBus ;
103+
104+ Mono <Void > reactToPersonaEvent (DomainEvent <MemberRegisteredEvent > event ) {
105+ return Mono .from (eventBus .emit (new DomainEvent <>("persona.procesada" , "213" , event .getData ())))
106+ .doOnSuccess (_v -> System .out .println ("Persona procesada" ));
107+ }
108+ }
109+
110+ }
0 commit comments