Skip to content

Commit a7f59ac

Browse files
committed
Refinamiento codigo
1 parent b2b7017 commit a7f59ac

File tree

3 files changed

+214
-88
lines changed

3 files changed

+214
-88
lines changed

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
2727
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
2828
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
29+
import org.reactivecommons.async.rabbit.converters.json.JacksonCloudEventMessageConverter;
2930
import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter;
3031
import org.springframework.beans.factory.annotation.Value;
3132
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -156,7 +157,7 @@ public ObjectMapperSupplier objectMapperSupplier() {
156157
@Bean
157158
@ConditionalOnMissingBean
158159
public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSupplier) {
159-
return new JacksonMessageConverter(objectMapperSupplier.get());
160+
return new JacksonCloudEventMessageConverter(objectMapperSupplier.get());
160161
}
161162

162163
@Bean
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
package org.reactivecommons.async.rabbit.converters.json;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.JsonNode;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import io.cloudevents.CloudEvent;
7+
import io.cloudevents.core.provider.EventFormatProvider;
8+
import io.cloudevents.jackson.JsonFormat;
9+
import lombok.Data;
10+
import org.reactivecommons.api.domain.Command;
11+
import org.reactivecommons.api.domain.DomainEvent;
12+
import org.reactivecommons.async.api.AsyncQuery;
13+
import org.reactivecommons.async.commons.communications.Message;
14+
import org.reactivecommons.async.commons.converters.MessageConverter;
15+
import org.reactivecommons.async.commons.exceptions.MessageConversionException;
16+
import org.reactivecommons.async.rabbit.RabbitMessage;
17+
18+
import java.io.IOException;
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.Base64;
21+
22+
public class JacksonCloudEventMessageConverter implements MessageConverter {
23+
private static final String CONTENT_TYPE = "application/json";
24+
25+
private final ObjectMapper objectMapper;
26+
27+
28+
public JacksonCloudEventMessageConverter(ObjectMapper objectMapper) {
29+
this.objectMapper = objectMapper;
30+
}
31+
32+
@Override
33+
public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
34+
try {
35+
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
36+
T value = extractData(bodyClass, asyncQueryJson.getQueryData());
37+
return new AsyncQuery<>(asyncQueryJson.getResource(), value);
38+
} catch (IOException e) {
39+
throw new MessageConversionException("Failed to convert Message content", e);
40+
}
41+
}
42+
43+
@Override
44+
public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
45+
try {
46+
final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class);
47+
48+
T value = extractData(bodyClass, domainEventJson.getData());
49+
50+
return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value);
51+
} catch (IOException e) {
52+
throw new MessageConversionException("Failed to convert Message content", e);
53+
}
54+
}
55+
56+
@Override
57+
public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
58+
try {
59+
final CommandJson commandJson = readValue(message, CommandJson.class);
60+
T value = extractData(bodyClass, commandJson.getData());
61+
return new Command<>(commandJson.getName(), commandJson.getCommandId(), value);
62+
} catch (IOException e) {
63+
throw new MessageConversionException("Failed to convert Message content", e);
64+
}
65+
}
66+
67+
@Override
68+
public <T> T readValue(Message message, Class<T> valueClass) {
69+
try {
70+
if(valueClass == CloudEvent.class){
71+
72+
return (T) EventFormatProvider
73+
.getInstance()
74+
.resolveFormat(JsonFormat.CONTENT_TYPE)
75+
.deserialize(objectMapper.readValue(message.getBody(), byte[].class));
76+
77+
}
78+
return objectMapper.readValue(message.getBody(), valueClass);
79+
} catch (IOException e) {
80+
throw new MessageConversionException("Failed to convert Message content", e);
81+
}
82+
}
83+
84+
@Override
85+
@SuppressWarnings("unchecked")
86+
public <T> Command<T> readCommandStructure(Message message) {
87+
final CommandJson commandJson = readValue(message, CommandJson.class);
88+
return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T) commandJson.getData());
89+
}
90+
91+
@Override
92+
@SuppressWarnings("unchecked")
93+
public <T> DomainEvent<T> readDomainEventStructure(Message message) {
94+
final DomainEventJson eventJson = readValue(message, DomainEventJson.class);
95+
return new DomainEvent<>(eventJson.getName(), eventJson.getEventId(), (T) eventJson.getData());
96+
}
97+
98+
@Override
99+
@SuppressWarnings("unchecked")
100+
public <T> AsyncQuery<T> readAsyncQueryStructure(Message message) {
101+
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
102+
return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData());
103+
}
104+
105+
106+
public Message commandToMessage(Command<CloudEvent> object) {
107+
byte[] data = EventFormatProvider
108+
.getInstance()
109+
.resolveFormat(JsonFormat.CONTENT_TYPE)
110+
.serialize(object.getData());
111+
112+
return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data));
113+
}
114+
115+
116+
public Message eventToMessage(DomainEvent<CloudEvent> object) {
117+
byte[] data = EventFormatProvider
118+
.getInstance()
119+
.resolveFormat(JsonFormat.CONTENT_TYPE)
120+
.serialize(object.getData());
121+
122+
return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data));
123+
}
124+
125+
126+
public Message queryToMessage(AsyncQuery<CloudEvent> object) {
127+
byte[] data = EventFormatProvider
128+
.getInstance()
129+
.resolveFormat(JsonFormat.CONTENT_TYPE)
130+
.serialize(object.getQueryData());
131+
132+
return getRabbitMessage(new AsyncQuery<>(object.getResource(), data));
133+
}
134+
@Override
135+
public Message toMessage(Object object) {
136+
byte[] bytes;
137+
if(object instanceof DomainEvent
138+
&& ((DomainEvent) object).getData() instanceof CloudEvent){
139+
140+
return eventToMessage((DomainEvent) object);
141+
142+
}
143+
if(object instanceof Command
144+
&& ((Command) object).getData() instanceof CloudEvent){
145+
146+
return commandToMessage((Command) object);
147+
}
148+
if(object instanceof AsyncQuery
149+
&& ((AsyncQuery) object).getQueryData() instanceof CloudEvent){
150+
151+
return queryToMessage((AsyncQuery) object);
152+
}
153+
154+
return getRabbitMessage(object);
155+
}
156+
157+
private RabbitMessage getRabbitMessage(Object object) {
158+
byte[] bytes;
159+
try {
160+
161+
String jsonString = this.objectMapper.writeValueAsString(object);
162+
bytes = jsonString.getBytes(StandardCharsets.UTF_8);
163+
} catch (IOException e) {
164+
throw new MessageConversionException("Failed to convert Message content", e);
165+
}
166+
RabbitMessage.RabbitMessageProperties props = new RabbitMessage.RabbitMessageProperties();
167+
props.setContentType(CONTENT_TYPE);
168+
props.setContentEncoding(StandardCharsets.UTF_8.name());
169+
props.setContentLength(bytes.length);
170+
return new RabbitMessage(bytes, props);
171+
}
172+
173+
private <T> T extractData(Class<T> bodyClass, JsonNode node) throws JsonProcessingException {
174+
T value;
175+
if(bodyClass == CloudEvent.class){
176+
177+
value = (T) EventFormatProvider
178+
.getInstance()
179+
.resolveFormat(JsonFormat.CONTENT_TYPE)
180+
.deserialize(Base64.getDecoder()
181+
.decode(node.asText()));
182+
183+
}
184+
else{
185+
value = objectMapper.treeToValue(node, bodyClass);
186+
}
187+
return value;
188+
}
189+
190+
@Data
191+
private static class AsyncQueryJson {
192+
private String resource;
193+
private JsonNode queryData;
194+
}
195+
196+
@Data
197+
private static class DomainEventJson {
198+
private String name;
199+
private String eventId;
200+
private JsonNode data;
201+
}
202+
203+
@Data
204+
private static class CommandJson {
205+
private String name;
206+
private String commandId;
207+
private JsonNode data;
208+
}
209+
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java

Lines changed: 3 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
package org.reactivecommons.async.rabbit.converters.json;
22

3-
import com.fasterxml.jackson.core.JsonProcessingException;
43
import com.fasterxml.jackson.databind.JsonNode;
54
import com.fasterxml.jackson.databind.ObjectMapper;
6-
import io.cloudevents.CloudEvent;
7-
import io.cloudevents.core.provider.EventFormatProvider;
8-
import io.cloudevents.jackson.JsonFormat;
95
import lombok.Data;
106
import org.reactivecommons.api.domain.Command;
117
import org.reactivecommons.api.domain.DomainEvent;
@@ -17,7 +13,6 @@
1713

1814
import java.io.IOException;
1915
import java.nio.charset.StandardCharsets;
20-
import java.util.Base64;
2116

2217
public class JacksonMessageConverter implements MessageConverter {
2318
private static final String CONTENT_TYPE = "application/json";
@@ -33,7 +28,7 @@ public JacksonMessageConverter(ObjectMapper objectMapper) {
3328
public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
3429
try {
3530
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
36-
T value = extractData(bodyClass, asyncQueryJson.getQueryData());
31+
final T value = objectMapper.treeToValue(asyncQueryJson.getQueryData(), bodyClass);
3732
return new AsyncQuery<>(asyncQueryJson.getResource(), value);
3833
} catch (IOException e) {
3934
throw new MessageConversionException("Failed to convert Message content", e);
@@ -44,9 +39,7 @@ public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
4439
public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
4540
try {
4641
final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class);
47-
48-
T value = extractData(bodyClass, domainEventJson.getData());
49-
42+
final T value = objectMapper.treeToValue(domainEventJson.getData(), bodyClass);
5043
return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value);
5144
} catch (IOException e) {
5245
throw new MessageConversionException("Failed to convert Message content", e);
@@ -57,7 +50,7 @@ public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
5750
public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
5851
try {
5952
final CommandJson commandJson = readValue(message, CommandJson.class);
60-
T value = extractData(bodyClass, commandJson.getData());
53+
final T value = objectMapper.treeToValue(commandJson.getData(), bodyClass);
6154
return new Command<>(commandJson.getName(), commandJson.getCommandId(), value);
6255
} catch (IOException e) {
6356
throw new MessageConversionException("Failed to convert Message content", e);
@@ -67,14 +60,6 @@ public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
6760
@Override
6861
public <T> T readValue(Message message, Class<T> valueClass) {
6962
try {
70-
if(valueClass == CloudEvent.class){
71-
72-
return (T) EventFormatProvider
73-
.getInstance()
74-
.resolveFormat(JsonFormat.CONTENT_TYPE)
75-
.deserialize(objectMapper.readValue(message.getBody(), byte[].class));
76-
77-
}
7863
return objectMapper.readValue(message.getBody(), valueClass);
7964
} catch (IOException e) {
8065
throw new MessageConversionException("Failed to convert Message content", e);
@@ -102,62 +87,10 @@ public <T> AsyncQuery<T> readAsyncQueryStructure(Message message) {
10287
return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData());
10388
}
10489

105-
106-
public Message commandToMessage(Command<CloudEvent> object) {
107-
byte[] data = EventFormatProvider
108-
.getInstance()
109-
.resolveFormat(JsonFormat.CONTENT_TYPE)
110-
.serialize(object.getData());
111-
112-
return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data));
113-
}
114-
115-
116-
public Message eventToMessage(DomainEvent<CloudEvent> object) {
117-
byte[] data = EventFormatProvider
118-
.getInstance()
119-
.resolveFormat(JsonFormat.CONTENT_TYPE)
120-
.serialize(object.getData());
121-
122-
return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data));
123-
}
124-
125-
126-
public Message queryToMessage(AsyncQuery<CloudEvent> object) {
127-
byte[] data = EventFormatProvider
128-
.getInstance()
129-
.resolveFormat(JsonFormat.CONTENT_TYPE)
130-
.serialize(object.getQueryData());
131-
132-
return getRabbitMessage(new AsyncQuery<>(object.getResource(), data));
133-
}
13490
@Override
13591
public Message toMessage(Object object) {
136-
byte[] bytes;
137-
if(object instanceof DomainEvent
138-
&& ((DomainEvent) object).getData() instanceof CloudEvent){
139-
140-
return eventToMessage((DomainEvent) object);
141-
142-
}
143-
if(object instanceof Command
144-
&& ((Command) object).getData() instanceof CloudEvent){
145-
146-
return commandToMessage((Command) object);
147-
}
148-
if(object instanceof AsyncQuery
149-
&& ((AsyncQuery) object).getQueryData() instanceof CloudEvent){
150-
151-
return queryToMessage((AsyncQuery) object);
152-
}
153-
154-
return getRabbitMessage(object);
155-
}
156-
157-
private RabbitMessage getRabbitMessage(Object object) {
15892
byte[] bytes;
15993
try {
160-
16194
String jsonString = this.objectMapper.writeValueAsString(object);
16295
bytes = jsonString.getBytes(StandardCharsets.UTF_8);
16396
} catch (IOException e) {
@@ -170,23 +103,6 @@ private RabbitMessage getRabbitMessage(Object object) {
170103
return new RabbitMessage(bytes, props);
171104
}
172105

173-
private <T> T extractData(Class<T> bodyClass, JsonNode node) throws JsonProcessingException {
174-
T value;
175-
if(bodyClass == CloudEvent.class){
176-
177-
value = (T) EventFormatProvider
178-
.getInstance()
179-
.resolveFormat(JsonFormat.CONTENT_TYPE)
180-
.deserialize(Base64.getDecoder()
181-
.decode(node.asText()));
182-
183-
}
184-
else{
185-
value = objectMapper.treeToValue(node, bodyClass);
186-
}
187-
return value;
188-
}
189-
190106
@Data
191107
private static class AsyncQueryJson {
192108
private String resource;

0 commit comments

Comments
 (0)