Skip to content

Commit b2b7017

Browse files
committed
merge
2 parents 1578200 + ed50a37 commit b2b7017

File tree

16 files changed

+255
-13
lines changed

16 files changed

+255
-13
lines changed

async/async-commons-api/async-commons-api.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ dependencies {
77
api project(':domain-events-api')
88
compileOnly 'io.projectreactor:reactor-core'
99
testImplementation 'io.projectreactor:reactor-test'
10+
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
11+
1012
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package org.reactivecommons.async.api;
22

3+
import io.cloudevents.CloudEvent;
34
import org.reactivecommons.api.domain.Command;
45
import reactor.core.publisher.Mono;
56

67
public interface DirectAsyncGateway {
78
<T> Mono<Void> sendCommand(Command<T> command, String targetName);
9+
Mono<Void> sendCommand(CloudEvent command, String targetName);
810
<T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type);
11+
<R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type);
912
<T> Mono<Void> reply(T response, From from);
1013
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.reactivecommons.async.api;
22

3+
import io.cloudevents.CloudEvent;
4+
import io.cloudevents.core.provider.EventFormatProvider;
5+
import io.cloudevents.jackson.JsonFormat;
36
import lombok.AccessLevel;
47
import lombok.Getter;
58
import lombok.NoArgsConstructor;
@@ -26,6 +29,7 @@ public class HandlerRegistry {
2629
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
2730
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();
2831

32+
2933
public static HandlerRegistry register() {
3034
return new HandlerRegistry();
3135
}
@@ -75,7 +79,21 @@ public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> han
7579
}
7680

7781
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
78-
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
82+
if(queryClass == CloudEvent.class){
83+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) ->
84+
{
85+
CloudEvent query = EventFormatProvider
86+
.getInstance()
87+
.resolveFormat(JsonFormat.CONTENT_TYPE)
88+
.deserialize(message);
89+
90+
return handler.handle((R) query);
91+
92+
} , byte[].class));
93+
}
94+
else{
95+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
96+
}
7997
return this;
8098
}
8199

async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivecommons.async.commons.converters;
22

3+
import io.cloudevents.CloudEvent;
34
import org.reactivecommons.api.domain.Command;
45
import org.reactivecommons.api.domain.DomainEvent;
56
import org.reactivecommons.async.api.AsyncQuery;

async/async-rabbit/async-rabbit.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ dependencies {
1414
api 'com.rabbitmq:amqp-client'
1515
api 'com.fasterxml.jackson.core:jackson-databind'
1616
testImplementation 'io.projectreactor:reactor-test'
17+
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
1718
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.reactivecommons.async.rabbit;
22

3+
import io.cloudevents.CloudEvent;
4+
import io.cloudevents.core.provider.EventFormatProvider;
5+
import io.cloudevents.jackson.JsonFormat;
36
import io.micrometer.core.instrument.MeterRegistry;
47
import org.reactivecommons.api.domain.Command;
58
import org.reactivecommons.async.api.AsyncQuery;
@@ -56,6 +59,11 @@ public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
5659
return sender.sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands);
5760
}
5861

62+
@Override
63+
public Mono<Void> sendCommand(CloudEvent command, String targetName) {
64+
return sendCommand(new Command<>(command.getType(), command.getId(), command), targetName);
65+
}
66+
5967
public <T> Flux<OutboundMessageResult> sendCommands(Flux<Command<T>> commands, String targetName) {
6068
return sender.sendWithConfirmBatch(commands, exchange, targetName, Collections.emptyMap(), persistentCommands);
6169
}
@@ -84,6 +92,11 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
8492
.tap(Micrometer.metrics(meterRegistry));
8593
}
8694

95+
@Override
96+
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
97+
return requestReply(new AsyncQuery<>(query.getType(), query), targetName, type);
98+
}
99+
87100
@Override
88101
public <T> Mono<Void> reply(T response, From from) {
89102
final HashMap<String, Object> headers = new HashMap<>();

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package org.reactivecommons.async.rabbit;
22

3+
import io.cloudevents.CloudEvent;
34
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
45
import org.reactivecommons.async.commons.config.BrokerConfig;
6+
import org.reactivestreams.Publisher;
57
import reactor.core.publisher.Mono;
68
import org.reactivecommons.api.domain.DomainEvent;
79
import org.reactivecommons.api.domain.DomainEventBus;
@@ -31,8 +33,8 @@ public <T> Mono<Void> emit(DomainEvent<T> event) {
3133
}
3234

3335
@Override
34-
public <T> Mono<Void> emit(CloudEvent<T> event) {
35-
return emit(new DomainEvent(event.getName(), event.getId(), event))
36+
public Publisher<Void> emit(CloudEvent event) {
37+
return emit(new DomainEvent<>(event.getType(), event.getId(), event));
3638
}
3739

3840
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.reactivecommons.async.rabbit.converters.json;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import lombok.SneakyThrows;
5+
import lombok.experimental.UtilityClass;
6+
7+
@UtilityClass
8+
public class CloudEventBuilderExt {
9+
private static final ObjectMapper mapper = new ObjectMapper();
10+
11+
@SneakyThrows
12+
public static byte[] asBytes(Object object) {
13+
return mapper.writeValueAsBytes(object);
14+
}
15+
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package org.reactivecommons.async.rabbit.converters.json;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
34
import com.fasterxml.jackson.databind.JsonNode;
45
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import io.cloudevents.CloudEvent;
7+
import io.cloudevents.core.provider.EventFormatProvider;
8+
import io.cloudevents.jackson.JsonFormat;
59
import lombok.Data;
610
import org.reactivecommons.api.domain.Command;
711
import org.reactivecommons.api.domain.DomainEvent;
@@ -13,6 +17,7 @@
1317

1418
import java.io.IOException;
1519
import java.nio.charset.StandardCharsets;
20+
import java.util.Base64;
1621

1722
public class JacksonMessageConverter implements MessageConverter {
1823
private static final String CONTENT_TYPE = "application/json";
@@ -28,7 +33,7 @@ public JacksonMessageConverter(ObjectMapper objectMapper) {
2833
public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
2934
try {
3035
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
31-
final T value = objectMapper.treeToValue(asyncQueryJson.getQueryData(), bodyClass);
36+
T value = extractData(bodyClass, asyncQueryJson.getQueryData());
3237
return new AsyncQuery<>(asyncQueryJson.getResource(), value);
3338
} catch (IOException e) {
3439
throw new MessageConversionException("Failed to convert Message content", e);
@@ -39,7 +44,9 @@ public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
3944
public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
4045
try {
4146
final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class);
42-
final T value = objectMapper.treeToValue(domainEventJson.getData(), bodyClass);
47+
48+
T value = extractData(bodyClass, domainEventJson.getData());
49+
4350
return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value);
4451
} catch (IOException e) {
4552
throw new MessageConversionException("Failed to convert Message content", e);
@@ -50,7 +57,7 @@ public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
5057
public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
5158
try {
5259
final CommandJson commandJson = readValue(message, CommandJson.class);
53-
final T value = objectMapper.treeToValue(commandJson.getData(), bodyClass);
60+
T value = extractData(bodyClass, commandJson.getData());
5461
return new Command<>(commandJson.getName(), commandJson.getCommandId(), value);
5562
} catch (IOException e) {
5663
throw new MessageConversionException("Failed to convert Message content", e);
@@ -60,6 +67,14 @@ public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
6067
@Override
6168
public <T> T readValue(Message message, Class<T> valueClass) {
6269
try {
70+
if(valueClass == CloudEvent.class){
71+
72+
return (T) EventFormatProvider
73+
.getInstance()
74+
.resolveFormat(JsonFormat.CONTENT_TYPE)
75+
.deserialize(objectMapper.readValue(message.getBody(), byte[].class));
76+
77+
}
6378
return objectMapper.readValue(message.getBody(), valueClass);
6479
} catch (IOException e) {
6580
throw new MessageConversionException("Failed to convert Message content", e);
@@ -87,10 +102,62 @@ public <T> AsyncQuery<T> readAsyncQueryStructure(Message message) {
87102
return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData());
88103
}
89104

105+
106+
public Message commandToMessage(Command<CloudEvent> object) {
107+
byte[] data = EventFormatProvider
108+
.getInstance()
109+
.resolveFormat(JsonFormat.CONTENT_TYPE)
110+
.serialize(object.getData());
111+
112+
return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data));
113+
}
114+
115+
116+
public Message eventToMessage(DomainEvent<CloudEvent> object) {
117+
byte[] data = EventFormatProvider
118+
.getInstance()
119+
.resolveFormat(JsonFormat.CONTENT_TYPE)
120+
.serialize(object.getData());
121+
122+
return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data));
123+
}
124+
125+
126+
public Message queryToMessage(AsyncQuery<CloudEvent> object) {
127+
byte[] data = EventFormatProvider
128+
.getInstance()
129+
.resolveFormat(JsonFormat.CONTENT_TYPE)
130+
.serialize(object.getQueryData());
131+
132+
return getRabbitMessage(new AsyncQuery<>(object.getResource(), data));
133+
}
90134
@Override
91135
public Message toMessage(Object object) {
136+
byte[] bytes;
137+
if(object instanceof DomainEvent
138+
&& ((DomainEvent) object).getData() instanceof CloudEvent){
139+
140+
return eventToMessage((DomainEvent) object);
141+
142+
}
143+
if(object instanceof Command
144+
&& ((Command) object).getData() instanceof CloudEvent){
145+
146+
return commandToMessage((Command) object);
147+
}
148+
if(object instanceof AsyncQuery
149+
&& ((AsyncQuery) object).getQueryData() instanceof CloudEvent){
150+
151+
return queryToMessage((AsyncQuery) object);
152+
}
153+
154+
return getRabbitMessage(object);
155+
}
156+
157+
private RabbitMessage getRabbitMessage(Object object) {
92158
byte[] bytes;
93159
try {
160+
94161
String jsonString = this.objectMapper.writeValueAsString(object);
95162
bytes = jsonString.getBytes(StandardCharsets.UTF_8);
96163
} catch (IOException e) {
@@ -103,6 +170,23 @@ public Message toMessage(Object object) {
103170
return new RabbitMessage(bytes, props);
104171
}
105172

173+
private <T> T extractData(Class<T> bodyClass, JsonNode node) throws JsonProcessingException {
174+
T value;
175+
if(bodyClass == CloudEvent.class){
176+
177+
value = (T) EventFormatProvider
178+
.getInstance()
179+
.resolveFormat(JsonFormat.CONTENT_TYPE)
180+
.deserialize(Base64.getDecoder()
181+
.decode(node.asText()));
182+
183+
}
184+
else{
185+
value = objectMapper.treeToValue(node, bodyClass);
186+
}
187+
return value;
188+
}
189+
106190
@Data
107191
private static class AsyncQueryJson {
108192
private String resource;

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package org.reactivecommons.async.rabbit.listeners;
22

33
import com.rabbitmq.client.AMQP;
4+
import io.cloudevents.CloudEvent;
5+
import io.cloudevents.core.provider.EventFormatProvider;
6+
import io.cloudevents.jackson.JsonFormat;
47
import lombok.extern.java.Log;
58
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
69
import org.reactivecommons.async.commons.DiscardNotifier;
@@ -149,8 +152,16 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
149152
final String correlationID = msg.getProperties().getHeaders().get(CORRELATION_ID).toString();
150153
final HashMap<String, Object> headers = new HashMap<>();
151154
headers.put(CORRELATION_ID, correlationID);
155+
Object response = signal.get();
156+
if(response instanceof CloudEvent) {
157+
byte[] serialized = EventFormatProvider
158+
.getInstance()
159+
.resolveFormat(JsonFormat.CONTENT_TYPE)
160+
.serialize((CloudEvent) response);
161+
return sender.sendNoConfirm(serialized, replyExchange, replyID, headers, false);
162+
}
152163

153-
return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false);
164+
return sender.sendNoConfirm(response, replyExchange, replyID, headers, false);
154165
});
155166
}
156167

0 commit comments

Comments
 (0)