Skip to content

Commit ed50a37

Browse files
committed
Modificacion libreria con CloudEvents
1 parent edbf27b commit ed50a37

File tree

9 files changed

+153
-14
lines changed

9 files changed

+153
-14
lines changed

async/async-commons-api/async-commons-api.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ dependencies {
77
api project(':domain-events-api')
88
compileOnly 'io.projectreactor:reactor-core'
99
testImplementation 'io.projectreactor:reactor-test'
10+
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
11+
1012
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.reactivecommons.async.api;
22

3+
import io.cloudevents.CloudEvent;
4+
import io.cloudevents.core.provider.EventFormatProvider;
5+
import io.cloudevents.jackson.JsonFormat;
36
import lombok.AccessLevel;
47
import lombok.Getter;
58
import lombok.NoArgsConstructor;
@@ -25,6 +28,7 @@ public class HandlerRegistry {
2528
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
2629
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();
2730

31+
2832
public static HandlerRegistry register() {
2933
return new HandlerRegistry();
3034
}
@@ -67,7 +71,21 @@ public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> han
6771
}
6872

6973
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
70-
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
74+
if(queryClass == CloudEvent.class){
75+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) ->
76+
{
77+
CloudEvent query = EventFormatProvider
78+
.getInstance()
79+
.resolveFormat(JsonFormat.CONTENT_TYPE)
80+
.deserialize(message);
81+
82+
return handler.handle((R) query);
83+
84+
} , byte[].class));
85+
}
86+
else{
87+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
88+
}
7189
return this;
7290
}
7391

async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivecommons.async.commons.converters;
22

3+
import io.cloudevents.CloudEvent;
34
import org.reactivecommons.api.domain.Command;
45
import org.reactivecommons.api.domain.DomainEvent;
56
import org.reactivecommons.async.api.AsyncQuery;

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,7 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
9494

9595
@Override
9696
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
97-
byte[] serialized = EventFormatProvider
98-
.getInstance()
99-
.resolveFormat(JsonFormat.CONTENT_TYPE)
100-
.serialize(query);
101-
return requestReply(new AsyncQuery<>(query.getType(), serialized), targetName, type);
97+
return requestReply(new AsyncQuery<>(query.getType(), query), targetName, type);
10298
}
10399

104100
@Override

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package org.reactivecommons.async.rabbit.converters.json;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
34
import com.fasterxml.jackson.databind.JsonNode;
45
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import io.cloudevents.CloudEvent;
7+
import io.cloudevents.core.provider.EventFormatProvider;
8+
import io.cloudevents.jackson.JsonFormat;
59
import lombok.Data;
610
import org.reactivecommons.api.domain.Command;
711
import org.reactivecommons.api.domain.DomainEvent;
@@ -13,6 +17,7 @@
1317

1418
import java.io.IOException;
1519
import java.nio.charset.StandardCharsets;
20+
import java.util.Base64;
1621

1722
public class JacksonMessageConverter implements MessageConverter {
1823
private static final String CONTENT_TYPE = "application/json";
@@ -28,7 +33,7 @@ public JacksonMessageConverter(ObjectMapper objectMapper) {
2833
public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
2934
try {
3035
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
31-
final T value = objectMapper.treeToValue(asyncQueryJson.getQueryData(), bodyClass);
36+
T value = extractData(bodyClass, asyncQueryJson.getQueryData());
3237
return new AsyncQuery<>(asyncQueryJson.getResource(), value);
3338
} catch (IOException e) {
3439
throw new MessageConversionException("Failed to convert Message content", e);
@@ -39,7 +44,9 @@ public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
3944
public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
4045
try {
4146
final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class);
42-
final T value = objectMapper.treeToValue(domainEventJson.getData(), bodyClass);
47+
48+
T value = extractData(bodyClass, domainEventJson.getData());
49+
4350
return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value);
4451
} catch (IOException e) {
4552
throw new MessageConversionException("Failed to convert Message content", e);
@@ -50,7 +57,7 @@ public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
5057
public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
5158
try {
5259
final CommandJson commandJson = readValue(message, CommandJson.class);
53-
final T value = objectMapper.treeToValue(commandJson.getData(), bodyClass);
60+
T value = extractData(bodyClass, commandJson.getData());
5461
return new Command<>(commandJson.getName(), commandJson.getCommandId(), value);
5562
} catch (IOException e) {
5663
throw new MessageConversionException("Failed to convert Message content", e);
@@ -60,6 +67,14 @@ public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
6067
@Override
6168
public <T> T readValue(Message message, Class<T> valueClass) {
6269
try {
70+
if(valueClass == CloudEvent.class){
71+
72+
return (T) EventFormatProvider
73+
.getInstance()
74+
.resolveFormat(JsonFormat.CONTENT_TYPE)
75+
.deserialize(objectMapper.readValue(message.getBody(), byte[].class));
76+
77+
}
6378
return objectMapper.readValue(message.getBody(), valueClass);
6479
} catch (IOException e) {
6580
throw new MessageConversionException("Failed to convert Message content", e);
@@ -87,10 +102,62 @@ public <T> AsyncQuery<T> readAsyncQueryStructure(Message message) {
87102
return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData());
88103
}
89104

105+
106+
public Message commandToMessage(Command<CloudEvent> object) {
107+
byte[] data = EventFormatProvider
108+
.getInstance()
109+
.resolveFormat(JsonFormat.CONTENT_TYPE)
110+
.serialize(object.getData());
111+
112+
return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data));
113+
}
114+
115+
116+
public Message eventToMessage(DomainEvent<CloudEvent> object) {
117+
byte[] data = EventFormatProvider
118+
.getInstance()
119+
.resolveFormat(JsonFormat.CONTENT_TYPE)
120+
.serialize(object.getData());
121+
122+
return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data));
123+
}
124+
125+
126+
public Message queryToMessage(AsyncQuery<CloudEvent> object) {
127+
byte[] data = EventFormatProvider
128+
.getInstance()
129+
.resolveFormat(JsonFormat.CONTENT_TYPE)
130+
.serialize(object.getQueryData());
131+
132+
return getRabbitMessage(new AsyncQuery<>(object.getResource(), data));
133+
}
90134
@Override
91135
public Message toMessage(Object object) {
136+
byte[] bytes;
137+
if(object instanceof DomainEvent
138+
&& ((DomainEvent) object).getData() instanceof CloudEvent){
139+
140+
return eventToMessage((DomainEvent) object);
141+
142+
}
143+
if(object instanceof Command
144+
&& ((Command) object).getData() instanceof CloudEvent){
145+
146+
return commandToMessage((Command) object);
147+
}
148+
if(object instanceof AsyncQuery
149+
&& ((AsyncQuery) object).getQueryData() instanceof CloudEvent){
150+
151+
return queryToMessage((AsyncQuery) object);
152+
}
153+
154+
return getRabbitMessage(object);
155+
}
156+
157+
private RabbitMessage getRabbitMessage(Object object) {
92158
byte[] bytes;
93159
try {
160+
94161
String jsonString = this.objectMapper.writeValueAsString(object);
95162
bytes = jsonString.getBytes(StandardCharsets.UTF_8);
96163
} catch (IOException e) {
@@ -103,6 +170,23 @@ public Message toMessage(Object object) {
103170
return new RabbitMessage(bytes, props);
104171
}
105172

173+
private <T> T extractData(Class<T> bodyClass, JsonNode node) throws JsonProcessingException {
174+
T value;
175+
if(bodyClass == CloudEvent.class){
176+
177+
value = (T) EventFormatProvider
178+
.getInstance()
179+
.resolveFormat(JsonFormat.CONTENT_TYPE)
180+
.deserialize(Base64.getDecoder()
181+
.decode(node.asText()));
182+
183+
}
184+
else{
185+
value = objectMapper.treeToValue(node, bodyClass);
186+
}
187+
return value;
188+
}
189+
106190
@Data
107191
private static class AsyncQueryJson {
108192
private String resource;

samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.net.URI;
2424
import java.time.OffsetDateTime;
25+
import java.util.Map;
2526
import java.util.UUID;
2627

2728
import static org.reactivecommons.async.api.HandlerRegistry.register;
@@ -57,18 +58,26 @@ public Mono<MemberRegisteredEvent> handle(AddMemberCommand command) {
5758
public HandlerRegistry handlerRegistrySubs(DirectAsyncGateway gateway) {
5859
return HandlerRegistry.register()
5960
.handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class)
60-
.listenEvent("fixed.event", message -> Mono.empty(), Object.class)
61+
.listenEvent("event", message -> {
62+
log.info(message.getData().toString());
63+
return Mono.empty();
64+
}, CloudEvent.class)
65+
.handleCommand("command", message -> {
66+
log.info(message.getData().toString());
67+
return Mono.empty();
68+
}, CloudEvent.class)
6169
.serveQuery("query1", message -> {
6270
log.info("resolving from direct query" + message);
71+
Map<String, String> mapData = Map.of("1", "data");
6372
CloudEvent response = CloudEventBuilder.v1() //
6473
.withId(UUID.randomUUID().toString()) //
6574
.withSource(URI.create("https://spring.io/foos"))//
6675
.withType("query1.response") //
6776
.withTime(OffsetDateTime.now())
68-
.withData("application/json", "result".getBytes())
77+
.withData("application/json", CloudEventBuilderExt.asBytes(mapData))
6978
.build();
7079
return just(response);
71-
}, byte[].class)
80+
}, CloudEvent.class)
7281
.serveQuery("sample.query.*", message -> {
7382
log.info("resolving from direct query");
7483
return just(new RespQuery1("Ok", message));
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
spring.application.name=receiver
2-
spring.rabbitmq.virtual-host=test
2+

samples/async/sender-client/src/main/java/sample/SampleRestController.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import lombok.AllArgsConstructor;
77
import lombok.Data;
88
import lombok.NoArgsConstructor;
9+
import org.reactivecommons.api.domain.DomainEventBus;
910
import org.reactivecommons.async.api.AsyncQuery;
1011
import org.reactivecommons.async.api.DirectAsyncGateway;
1112
import org.reactivecommons.async.rabbit.converters.json.CloudEventBuilderExt;
@@ -26,6 +27,9 @@ public class SampleRestController {
2627

2728
@Autowired
2829
private DirectAsyncGateway directAsyncGateway;
30+
31+
@Autowired
32+
private DomainEventBus domainEventBus;
2933
private final String queryName = "query1";
3034
private final String queryName2 = "query2";
3135
private final String target = "receiver";
@@ -45,7 +49,33 @@ public Mono<CloudEvent> sampleService(@RequestBody Call call) throws JsonProcess
4549

4650
return directAsyncGateway.requestReply(query, target, CloudEvent.class);
4751
}
52+
@PostMapping(path = "/sample/event", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
53+
public Mono<String> sampleServiceEvent(@RequestBody Call call) throws JsonProcessingException {
54+
// AsyncQuery<?> query = new AsyncQuery<>(queryName, call);
55+
CloudEvent event = CloudEventBuilder.v1() //
56+
.withId(UUID.randomUUID().toString()) //
57+
.withSource(URI.create("https://spring.io/foos"))//
58+
.withType("event") //
59+
.withTime(OffsetDateTime.now())
60+
.withData("application/json", CloudEventBuilderExt.asBytes(call))
61+
.build();
62+
63+
return Mono.from(domainEventBus.emit(event)).thenReturn("event");
64+
}
4865

66+
@PostMapping(path = "/sample/command", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
67+
public Mono<String> sampleServiceCommand(@RequestBody Call call) throws JsonProcessingException {
68+
// AsyncQuery<?> query = new AsyncQuery<>(queryName, call);
69+
CloudEvent command = CloudEventBuilder.v1() //
70+
.withId(UUID.randomUUID().toString()) //
71+
.withSource(URI.create("https://spring.io/foos"))//
72+
.withType("command") //
73+
.withTime(OffsetDateTime.now())
74+
.withData("application/json", CloudEventBuilderExt.asBytes(call))
75+
.build();
76+
77+
return directAsyncGateway.sendCommand(command, target).thenReturn("command");
78+
}
4979
@PostMapping(path = "/sample/match", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
5080
public Mono<RespQuery1> sampleServices(@RequestBody Call call) {
5181
AsyncQuery<?> query = new AsyncQuery<>("sample.query.any.that.matches", call);
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
spring.application.name=sender
22
server.port=4001
3-
spring.rabbitmq.virtual-host=test
43
management.endpoint.health.show-details=always
54
management.endpoints.web.exposure.include=health,prometheus

0 commit comments

Comments
 (0)