Skip to content

Commit 7a10e32

Browse files
authored
Merge pull request #85 from jespinosas/feature/eda
Feature/eda
2 parents 1578200 + a7f59ac commit 7a10e32

File tree

17 files changed

+379
-11
lines changed

17 files changed

+379
-11
lines changed

async/async-commons-api/async-commons-api.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ dependencies {
77
api project(':domain-events-api')
88
compileOnly 'io.projectreactor:reactor-core'
99
testImplementation 'io.projectreactor:reactor-test'
10+
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
11+
1012
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package org.reactivecommons.async.api;
22

3+
import io.cloudevents.CloudEvent;
34
import org.reactivecommons.api.domain.Command;
45
import reactor.core.publisher.Mono;
56

67
public interface DirectAsyncGateway {
78
<T> Mono<Void> sendCommand(Command<T> command, String targetName);
9+
Mono<Void> sendCommand(CloudEvent command, String targetName);
810
<T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type);
11+
<R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type);
912
<T> Mono<Void> reply(T response, From from);
1013
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.reactivecommons.async.api;
22

3+
import io.cloudevents.CloudEvent;
4+
import io.cloudevents.core.provider.EventFormatProvider;
5+
import io.cloudevents.jackson.JsonFormat;
36
import lombok.AccessLevel;
47
import lombok.Getter;
58
import lombok.NoArgsConstructor;
@@ -26,6 +29,7 @@ public class HandlerRegistry {
2629
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
2730
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();
2831

32+
2933
public static HandlerRegistry register() {
3034
return new HandlerRegistry();
3135
}
@@ -75,7 +79,21 @@ public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> han
7579
}
7680

7781
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
78-
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
82+
if(queryClass == CloudEvent.class){
83+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) ->
84+
{
85+
CloudEvent query = EventFormatProvider
86+
.getInstance()
87+
.resolveFormat(JsonFormat.CONTENT_TYPE)
88+
.deserialize(message);
89+
90+
return handler.handle((R) query);
91+
92+
} , byte[].class));
93+
}
94+
else{
95+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
96+
}
7997
return this;
8098
}
8199

async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivecommons.async.commons.converters;
22

3+
import io.cloudevents.CloudEvent;
34
import org.reactivecommons.api.domain.Command;
45
import org.reactivecommons.api.domain.DomainEvent;
56
import org.reactivecommons.async.api.AsyncQuery;

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
2727
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
2828
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
29+
import org.reactivecommons.async.rabbit.converters.json.JacksonCloudEventMessageConverter;
2930
import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter;
3031
import org.springframework.beans.factory.annotation.Value;
3132
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -156,7 +157,7 @@ public ObjectMapperSupplier objectMapperSupplier() {
156157
@Bean
157158
@ConditionalOnMissingBean
158159
public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSupplier) {
159-
return new JacksonMessageConverter(objectMapperSupplier.get());
160+
return new JacksonCloudEventMessageConverter(objectMapperSupplier.get());
160161
}
161162

162163
@Bean

async/async-rabbit/async-rabbit.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ dependencies {
1414
api 'com.rabbitmq:amqp-client'
1515
api 'com.fasterxml.jackson.core:jackson-databind'
1616
testImplementation 'io.projectreactor:reactor-test'
17+
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
1718
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.reactivecommons.async.rabbit;
22

3+
import io.cloudevents.CloudEvent;
4+
import io.cloudevents.core.provider.EventFormatProvider;
5+
import io.cloudevents.jackson.JsonFormat;
36
import io.micrometer.core.instrument.MeterRegistry;
47
import org.reactivecommons.api.domain.Command;
58
import org.reactivecommons.async.api.AsyncQuery;
@@ -56,6 +59,11 @@ public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
5659
return sender.sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands);
5760
}
5861

62+
@Override
63+
public Mono<Void> sendCommand(CloudEvent command, String targetName) {
64+
return sendCommand(new Command<>(command.getType(), command.getId(), command), targetName);
65+
}
66+
5967
public <T> Flux<OutboundMessageResult> sendCommands(Flux<Command<T>> commands, String targetName) {
6068
return sender.sendWithConfirmBatch(commands, exchange, targetName, Collections.emptyMap(), persistentCommands);
6169
}
@@ -84,6 +92,11 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
8492
.tap(Micrometer.metrics(meterRegistry));
8593
}
8694

95+
@Override
96+
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
97+
return requestReply(new AsyncQuery<>(query.getType(), query), targetName, type);
98+
}
99+
87100
@Override
88101
public <T> Mono<Void> reply(T response, From from) {
89102
final HashMap<String, Object> headers = new HashMap<>();

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package org.reactivecommons.async.rabbit;
22

3+
import io.cloudevents.CloudEvent;
34
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
45
import org.reactivecommons.async.commons.config.BrokerConfig;
6+
import org.reactivestreams.Publisher;
57
import reactor.core.publisher.Mono;
68
import org.reactivecommons.api.domain.DomainEvent;
79
import org.reactivecommons.api.domain.DomainEventBus;
@@ -31,8 +33,8 @@ public <T> Mono<Void> emit(DomainEvent<T> event) {
3133
}
3234

3335
@Override
34-
public <T> Mono<Void> emit(CloudEvent<T> event) {
35-
return emit(new DomainEvent(event.getName(), event.getId(), event))
36+
public Publisher<Void> emit(CloudEvent event) {
37+
return emit(new DomainEvent<>(event.getType(), event.getId(), event));
3638
}
3739

3840
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.reactivecommons.async.rabbit.converters.json;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import lombok.SneakyThrows;
5+
import lombok.experimental.UtilityClass;
6+
7+
@UtilityClass
8+
public class CloudEventBuilderExt {
9+
private static final ObjectMapper mapper = new ObjectMapper();
10+
11+
@SneakyThrows
12+
public static byte[] asBytes(Object object) {
13+
return mapper.writeValueAsBytes(object);
14+
}
15+
}
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
package org.reactivecommons.async.rabbit.converters.json;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.JsonNode;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import io.cloudevents.CloudEvent;
7+
import io.cloudevents.core.provider.EventFormatProvider;
8+
import io.cloudevents.jackson.JsonFormat;
9+
import lombok.Data;
10+
import org.reactivecommons.api.domain.Command;
11+
import org.reactivecommons.api.domain.DomainEvent;
12+
import org.reactivecommons.async.api.AsyncQuery;
13+
import org.reactivecommons.async.commons.communications.Message;
14+
import org.reactivecommons.async.commons.converters.MessageConverter;
15+
import org.reactivecommons.async.commons.exceptions.MessageConversionException;
16+
import org.reactivecommons.async.rabbit.RabbitMessage;
17+
18+
import java.io.IOException;
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.Base64;
21+
22+
public class JacksonCloudEventMessageConverter implements MessageConverter {
23+
private static final String CONTENT_TYPE = "application/json";
24+
25+
private final ObjectMapper objectMapper;
26+
27+
28+
public JacksonCloudEventMessageConverter(ObjectMapper objectMapper) {
29+
this.objectMapper = objectMapper;
30+
}
31+
32+
@Override
33+
public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
34+
try {
35+
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
36+
T value = extractData(bodyClass, asyncQueryJson.getQueryData());
37+
return new AsyncQuery<>(asyncQueryJson.getResource(), value);
38+
} catch (IOException e) {
39+
throw new MessageConversionException("Failed to convert Message content", e);
40+
}
41+
}
42+
43+
@Override
44+
public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
45+
try {
46+
final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class);
47+
48+
T value = extractData(bodyClass, domainEventJson.getData());
49+
50+
return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value);
51+
} catch (IOException e) {
52+
throw new MessageConversionException("Failed to convert Message content", e);
53+
}
54+
}
55+
56+
@Override
57+
public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
58+
try {
59+
final CommandJson commandJson = readValue(message, CommandJson.class);
60+
T value = extractData(bodyClass, commandJson.getData());
61+
return new Command<>(commandJson.getName(), commandJson.getCommandId(), value);
62+
} catch (IOException e) {
63+
throw new MessageConversionException("Failed to convert Message content", e);
64+
}
65+
}
66+
67+
@Override
68+
public <T> T readValue(Message message, Class<T> valueClass) {
69+
try {
70+
if(valueClass == CloudEvent.class){
71+
72+
return (T) EventFormatProvider
73+
.getInstance()
74+
.resolveFormat(JsonFormat.CONTENT_TYPE)
75+
.deserialize(objectMapper.readValue(message.getBody(), byte[].class));
76+
77+
}
78+
return objectMapper.readValue(message.getBody(), valueClass);
79+
} catch (IOException e) {
80+
throw new MessageConversionException("Failed to convert Message content", e);
81+
}
82+
}
83+
84+
@Override
85+
@SuppressWarnings("unchecked")
86+
public <T> Command<T> readCommandStructure(Message message) {
87+
final CommandJson commandJson = readValue(message, CommandJson.class);
88+
return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T) commandJson.getData());
89+
}
90+
91+
@Override
92+
@SuppressWarnings("unchecked")
93+
public <T> DomainEvent<T> readDomainEventStructure(Message message) {
94+
final DomainEventJson eventJson = readValue(message, DomainEventJson.class);
95+
return new DomainEvent<>(eventJson.getName(), eventJson.getEventId(), (T) eventJson.getData());
96+
}
97+
98+
@Override
99+
@SuppressWarnings("unchecked")
100+
public <T> AsyncQuery<T> readAsyncQueryStructure(Message message) {
101+
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
102+
return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData());
103+
}
104+
105+
106+
public Message commandToMessage(Command<CloudEvent> object) {
107+
byte[] data = EventFormatProvider
108+
.getInstance()
109+
.resolveFormat(JsonFormat.CONTENT_TYPE)
110+
.serialize(object.getData());
111+
112+
return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data));
113+
}
114+
115+
116+
public Message eventToMessage(DomainEvent<CloudEvent> object) {
117+
byte[] data = EventFormatProvider
118+
.getInstance()
119+
.resolveFormat(JsonFormat.CONTENT_TYPE)
120+
.serialize(object.getData());
121+
122+
return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data));
123+
}
124+
125+
126+
public Message queryToMessage(AsyncQuery<CloudEvent> object) {
127+
byte[] data = EventFormatProvider
128+
.getInstance()
129+
.resolveFormat(JsonFormat.CONTENT_TYPE)
130+
.serialize(object.getQueryData());
131+
132+
return getRabbitMessage(new AsyncQuery<>(object.getResource(), data));
133+
}
134+
@Override
135+
public Message toMessage(Object object) {
136+
byte[] bytes;
137+
if(object instanceof DomainEvent
138+
&& ((DomainEvent) object).getData() instanceof CloudEvent){
139+
140+
return eventToMessage((DomainEvent) object);
141+
142+
}
143+
if(object instanceof Command
144+
&& ((Command) object).getData() instanceof CloudEvent){
145+
146+
return commandToMessage((Command) object);
147+
}
148+
if(object instanceof AsyncQuery
149+
&& ((AsyncQuery) object).getQueryData() instanceof CloudEvent){
150+
151+
return queryToMessage((AsyncQuery) object);
152+
}
153+
154+
return getRabbitMessage(object);
155+
}
156+
157+
private RabbitMessage getRabbitMessage(Object object) {
158+
byte[] bytes;
159+
try {
160+
161+
String jsonString = this.objectMapper.writeValueAsString(object);
162+
bytes = jsonString.getBytes(StandardCharsets.UTF_8);
163+
} catch (IOException e) {
164+
throw new MessageConversionException("Failed to convert Message content", e);
165+
}
166+
RabbitMessage.RabbitMessageProperties props = new RabbitMessage.RabbitMessageProperties();
167+
props.setContentType(CONTENT_TYPE);
168+
props.setContentEncoding(StandardCharsets.UTF_8.name());
169+
props.setContentLength(bytes.length);
170+
return new RabbitMessage(bytes, props);
171+
}
172+
173+
private <T> T extractData(Class<T> bodyClass, JsonNode node) throws JsonProcessingException {
174+
T value;
175+
if(bodyClass == CloudEvent.class){
176+
177+
value = (T) EventFormatProvider
178+
.getInstance()
179+
.resolveFormat(JsonFormat.CONTENT_TYPE)
180+
.deserialize(Base64.getDecoder()
181+
.decode(node.asText()));
182+
183+
}
184+
else{
185+
value = objectMapper.treeToValue(node, bodyClass);
186+
}
187+
return value;
188+
}
189+
190+
@Data
191+
private static class AsyncQueryJson {
192+
private String resource;
193+
private JsonNode queryData;
194+
}
195+
196+
@Data
197+
private static class DomainEventJson {
198+
private String name;
199+
private String eventId;
200+
private JsonNode data;
201+
}
202+
203+
@Data
204+
private static class CommandJson {
205+
private String name;
206+
private String commandId;
207+
private JsonNode data;
208+
}
209+
}

0 commit comments

Comments
 (0)