Skip to content

Commit 9d66dba

Browse files
Merge pull request #101 from JaidenAshmore/issue/92_message_attribute_argument_resolver
refs #92: Message Attribute support
2 parents 117036d + 903e875 commit 9d66dba

File tree

24 files changed

+1224
-65
lines changed

24 files changed

+1224
-65
lines changed

doc/core-implementations-overview.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ with an implementation that allows for a message to be manually acknowledged whe
7272
signature, the [MessageProcessor](../java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java) is not required to
7373
acknowledge the message after a successful execution. These implementations should be provided by the
7474
[MessageProcessor](../java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java) being used.
75+
- [MessageAttribute](../java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/attribute/MessageAttribute.java): arguments annotated with this
76+
will attempt to parse the contents of the message attribute into this field. For example, if the argument is a String then the attribute will be cast to a
77+
string where as if the argument is an integer it will try and parse the string into the number. This also works with POJOs in that the resolver will
78+
attempt to deserialised the message attribute into this POJO shape, e.g. via the Jackson Object Mapper. This is provided by the
79+
[MessageAttributeArgumentResolver](../java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/attribute/MessageAttributeArgumentResolver.java).
7580
- [VisibilityExtender](../java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/VisibilityExtender.java): arguments of this type
7681
will be injected with an implementation that extends the message visibility of the current message. This is provided by the
7782
[VisibilityExtenderArgumentResolver](../java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/visibility/VisibilityExtenderArgumentResolver.java).

doc/documentation.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ more in depth understanding take a look at the JavaDoc for the API.
1919
1. [How to add a custom ArgumentResolver to a Spring application](how-to-guides/spring/spring-how-to-add-custom-argument-resolver.md): useful for
2020
integrating custom argument resolution code to be included in a Spring Application. See [How to implement a custom ArgumentResolver](how-to-guides/core/core-how-to-implement-a-custom-argument-resolver.md)
2121
for how build a new ArgumentResolver from scratch
22+
1. [How to provide a custom Object Mapper](how-to-guides/spring/spring-how-to-add-custom-argument-resolver.md): guide for overriding the default
23+
`ObjectMapper` that is used to serialise the message body and attributes
2224
1. [How to add your own queue listener](how-to-guides/spring/spring-how-to-add-own-queue-listener.md): useful for defining your own annotation for
2325
queue listening without the verbosity of the custom queue listener
2426
1. [How to write Spring Integration Tests](how-to-guides/spring/spring-how-to-write-integration-tests.md): you actually want to test what you are
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Spring - How to provide own ObjectMapper for serialisation/deserialisation
2+
Currently a Jackson `ObjectMapper` is being used for the serialisation and deserialisation of messages and their attributes. If none is provided
3+
by the application, the Spring Starter will provide their own to use. This shows steps on providing your own so the default is not used.
4+
5+
### Steps
6+
7+
1. In a `@Configuration` class define a bean of type `ObjectMapper`
8+
```java
9+
@Configuration
10+
public class MyConfiguration {
11+
@Bean
12+
public ObjectMapper objectMapper() {
13+
return new ObjectMapper();
14+
}
15+
}
16+
```
17+
18+
19+
## Providing an ObjectMapper without including it in the Spring Context
20+
One problem with doing this method is that this ObjectMapper will be shared with this library as well as any others that need this, e.g. the HTTP layer
21+
may be using this ObjectMapper for their own serialisation. Therefore, if you want to provide an `ObjectMapper` that is only used by this library you would
22+
need to override the individual core [ArgumentResolvers](../../../java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/argument/ArgumentResolver.java)
23+
that are using this `ObjectMapper`, which is the
24+
[PayloadArgumentResolver](../../../java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/payload/PayloadArgumentResolver.java) and
25+
[MessageAttributeArgumentResolver](../../../java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/attribute/MessageAttributeArgumentResolver.java).
26+
27+
### Steps
28+
29+
1. In a `@Configuration` class define your own [ArgumentResolverService](../../../java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/argument/ArgumentResolverService.java)
30+
```java
31+
@Configuration
32+
public class MyConfiguration {
33+
private static final ObjectMapper MY_OBJECT_MAPPER_FOR_SQS = new ObjectMapper();
34+
35+
@Bean
36+
public PayloadArgumentResolver payloadArgumentResolver() {
37+
return new PayloadArgumentResolver(new JacksonPayloadMapper(MY_OBJECT_MAPPER_FOR_SQS));
38+
}
39+
40+
@Bean
41+
public MessageAttributeArgumentResolver messageAttributeArgumentResolver() {
42+
return new MessageAttributeArgumentResolver(objectMapper);
43+
}
44+
}
45+
```

examples/java-dynamic-sqs-listener-core-examples/src/main/java/com/jashmore/sqs/examples/ConcurrentBrokerExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ private static SqsAsyncClient startElasticMqServer() throws URISyntaxException {
189189
*/
190190
private static ArgumentResolverService argumentResolverService(final SqsAsyncClient sqsAsyncClient) {
191191
final PayloadMapper payloadMapper = new JacksonPayloadMapper(OBJECT_MAPPER);
192-
return new CoreArgumentResolverService(payloadMapper, sqsAsyncClient);
192+
return new CoreArgumentResolverService(payloadMapper, sqsAsyncClient, OBJECT_MAPPER);
193193
}
194194

195195
/**

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/CoreArgumentResolverService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.google.common.collect.ImmutableSet;
44

5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.jashmore.sqs.argument.attribute.MessageAttributeArgumentResolver;
57
import com.jashmore.sqs.argument.messageid.MessageIdArgumentResolver;
68
import com.jashmore.sqs.argument.payload.PayloadArgumentResolver;
79
import com.jashmore.sqs.argument.payload.mapper.PayloadMapper;
@@ -22,10 +24,12 @@ public class CoreArgumentResolverService implements ArgumentResolverService {
2224
private final DelegatingArgumentResolverService delegatingArgumentResolverService;
2325

2426
public CoreArgumentResolverService(final PayloadMapper payloadMapper,
25-
final SqsAsyncClient sqsAsyncClient) {
27+
final SqsAsyncClient sqsAsyncClient,
28+
final ObjectMapper objectMapper) {
2629
final Set<ArgumentResolver<?>> argumentResolvers = ImmutableSet.of(
2730
new PayloadArgumentResolver(payloadMapper),
2831
new MessageIdArgumentResolver(),
32+
new MessageAttributeArgumentResolver(objectMapper),
2933
new VisibilityExtenderArgumentResolver(sqsAsyncClient)
3034
);
3135
this.delegatingArgumentResolverService = new DelegatingArgumentResolverService(argumentResolvers);

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/DelegatingArgumentResolverService.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,21 @@ public class DelegatingArgumentResolverService implements ArgumentResolverServic
1818

1919
@Override
2020
public Object resolveArgument(final QueueProperties queueProperties, final MethodParameter methodParameter, final Message message) {
21-
return argumentResolvers.stream()
22-
.filter(resolver -> resolver.canResolveParameter(methodParameter))
23-
.map(resolver -> {
24-
try {
25-
return resolver.resolveArgumentForParameter(queueProperties, methodParameter, message);
26-
} catch (final RuntimeException runtimeException) {
27-
// Make sure to wrap any unintended exceptions with the expected exception for errors
28-
if (!ArgumentResolutionException.class.isAssignableFrom(runtimeException.getClass())) {
29-
throw new ArgumentResolutionException("Error obtaining an argument value for parameter", runtimeException);
30-
}
31-
32-
throw runtimeException;
21+
for (final ArgumentResolver<?> resolver: argumentResolvers) {
22+
if (resolver.canResolveParameter(methodParameter)) {
23+
try {
24+
return resolver.resolveArgumentForParameter(queueProperties, methodParameter, message);
25+
} catch (final RuntimeException runtimeException) {
26+
// Make sure to wrap any unintended exceptions with the expected exception for errors
27+
if (!ArgumentResolutionException.class.isAssignableFrom(runtimeException.getClass())) {
28+
throw new ArgumentResolutionException("Error obtaining an argument value for parameter", runtimeException);
3329
}
34-
})
35-
.findFirst()
36-
.orElseThrow(() -> new ArgumentResolutionException("No ArgumentResolver found that can process this parameter"));
30+
31+
throw runtimeException;
32+
}
33+
}
34+
}
35+
36+
throw new ArgumentResolutionException("No ArgumentResolver found that can process this parameter");
3737
}
3838
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.jashmore.sqs.argument.attribute;
2+
3+
import static java.lang.annotation.ElementType.PARAMETER;
4+
import static java.lang.annotation.RetentionPolicy.RUNTIME;
5+
6+
import java.lang.annotation.Retention;
7+
import java.lang.annotation.Target;
8+
9+
/**
10+
* Populate on of the arguments in a message processing method by taking a value from the message attributes.
11+
*/
12+
@Retention(RUNTIME)
13+
@Target(PARAMETER)
14+
public @interface MessageAttribute {
15+
/**
16+
* The name of the attribute to use.
17+
*
18+
* @return the attribute name
19+
*/
20+
String value();
21+
22+
/**
23+
* Fail the processing of the message if the message attribute with the provided name does not exist.
24+
*
25+
* @return whether the message should fail to be processed if the message attribute is missing
26+
*/
27+
boolean required() default false;
28+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package com.jashmore.sqs.argument.attribute;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.jashmore.sqs.QueueProperties;
5+
import com.jashmore.sqs.argument.ArgumentResolutionException;
6+
import com.jashmore.sqs.argument.ArgumentResolver;
7+
import com.jashmore.sqs.argument.MethodParameter;
8+
import com.jashmore.sqs.util.annotation.AnnotationUtils;
9+
import software.amazon.awssdk.services.sqs.model.Message;
10+
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
11+
12+
import java.io.IOException;
13+
import java.nio.charset.Charset;
14+
import java.util.Optional;
15+
16+
/**
17+
* An {@link ArgumentResolver} that is able to handle the extraction of information from the attributes of the SQS message.
18+
*
19+
* <p>This will attempt to do its best in casting the contents of the message attribute to the type of tha parameter. For example, if the contents of
20+
* the message attribute is binary (byte[]) and the parameter is a POJO, it will attempt to serialise using the {@link ObjectMapper#readValue(byte[], Class)}.
21+
*
22+
* <p>It is the responsibility of the consumer to make sure that the type of the parameter is correct in regards to the content of the message attribute. For
23+
* example, this resolver ignores all helper data types after the main, e.g. Number.float data types will have the float ignored.
24+
*
25+
* <p>This current implementation uses the Jackson {@link ObjectMapper} to perform all of the parsing and there is the potential for a future version of this
26+
* library to split this out so that Jackson isn't a required dependency.
27+
*
28+
* @see <a href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-attributes.html">SQS Message Attributes</a>
29+
* @see MessageAttributeValue
30+
*/
31+
public class MessageAttributeArgumentResolver implements ArgumentResolver<Object> {
32+
private final ObjectMapper objectMapper;
33+
34+
public MessageAttributeArgumentResolver(final ObjectMapper objectMapper) {
35+
this.objectMapper = objectMapper;
36+
}
37+
38+
@Override
39+
public boolean canResolveParameter(final MethodParameter methodParameter) {
40+
return AnnotationUtils.findParameterAnnotation(methodParameter, MessageAttribute.class).isPresent();
41+
}
42+
43+
@Override
44+
public Object resolveArgumentForParameter(final QueueProperties queueProperties,
45+
final MethodParameter methodParameter,
46+
final Message message) throws ArgumentResolutionException {
47+
final MessageAttribute annotation = AnnotationUtils.findParameterAnnotation(methodParameter, MessageAttribute.class)
48+
.orElseThrow(() -> new ArgumentResolutionException("Parameter passed in does not contain the MessageAttribute annotation when it should"));
49+
50+
final String attributeName = annotation.value();
51+
52+
final Optional<MessageAttributeValue> optionalMessageAttributeValue = Optional.ofNullable(message.messageAttributes().get(attributeName));
53+
54+
if (!optionalMessageAttributeValue.isPresent()) {
55+
if (annotation.required()) {
56+
throw new ArgumentResolutionException("Required Message Attribute '" + attributeName + "' is missing from message");
57+
}
58+
59+
return null;
60+
}
61+
62+
final MessageAttributeValue messageAttributeValue = optionalMessageAttributeValue.get();
63+
64+
if (messageAttributeValue.dataType().startsWith(MessageAttributeDataTypes.STRING.getValue())
65+
|| messageAttributeValue.dataType().startsWith(MessageAttributeDataTypes.NUMBER.getValue())) {
66+
return handleStringParameterValue(methodParameter, messageAttributeValue, attributeName);
67+
} else if (messageAttributeValue.dataType().startsWith(MessageAttributeDataTypes.BINARY.getValue())) {
68+
return handleByteParameterValue(methodParameter, messageAttributeValue);
69+
}
70+
71+
throw new ArgumentResolutionException("Cannot parse message attribute due to unknown data type '" + messageAttributeValue.dataType() + "'");
72+
}
73+
74+
/**
75+
* Handle resolving the argument from the string contents of the attribute.
76+
*
77+
* @param methodParameter the parameter of the method to resolve
78+
* @param messageAttributeValue the value of the message attribute
79+
* @param attributeName the name of the attribute that is being consumed
80+
* @return the resolved argument from the attribute
81+
*/
82+
private Object handleStringParameterValue(final MethodParameter methodParameter,
83+
final MessageAttributeValue messageAttributeValue,
84+
final String attributeName) {
85+
if (methodParameter.getParameter().getType().isAssignableFrom(String.class)) {
86+
return messageAttributeValue.stringValue();
87+
}
88+
89+
try {
90+
return objectMapper.readValue(messageAttributeValue.stringValue(), methodParameter.getParameter().getType());
91+
} catch (final IOException ioException) {
92+
throw new ArgumentResolutionException("Error parsing Message Attribute '" + attributeName + "'", ioException);
93+
}
94+
}
95+
96+
/**
97+
* Handle an attribute that contains the data as bytes.
98+
*
99+
* @param methodParameter details about the parameter to parse the message attribute into
100+
* @param messageAttributeValue value of the message attribute
101+
* @return the argument resolved for this attribute
102+
*/
103+
private Object handleByteParameterValue(final MethodParameter methodParameter,
104+
final MessageAttributeValue messageAttributeValue) {
105+
final byte[] byteArray = messageAttributeValue.binaryValue().asByteArray();
106+
final Class<?> parameterClass = methodParameter.getParameter().getType();
107+
if (parameterClass == byte[].class) {
108+
return byteArray;
109+
}
110+
111+
if (parameterClass.isAssignableFrom(String.class)) {
112+
return new String(byteArray, Charset.forName("UTF-8"));
113+
}
114+
115+
try {
116+
return objectMapper.readValue(byteArray, parameterClass);
117+
} catch (final IOException ioException) {
118+
throw new ArgumentResolutionException("Failure to parse binary bytes to '" + parameterClass.getName() + "'", ioException);
119+
}
120+
}
121+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.jashmore.sqs.argument.attribute;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Getter;
5+
6+
@Getter
7+
@AllArgsConstructor
8+
public enum MessageAttributeDataTypes {
9+
STRING("String"),
10+
NUMBER("Number"),
11+
BINARY("Binary");
12+
13+
private final String value;
14+
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBroker.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,17 @@ public void run() {
6868
try {
6969
return messageRetriever.retrieveMessage();
7070
} catch (final InterruptedException exception) {
71-
log.debug("Thread interrupted waiting for a message");
72-
throw new RuntimeException("Failure to get message");
71+
log.trace("Thread interrupted waiting for a message");
72+
throw new BrokerStoppedWhileRetrievingMessageException();
7373
}
7474
}, concurrentThreadsExecutor)
7575
.thenAcceptAsync(messageProcessor::processMessage, messageProcessingThreadsExecutor)
76-
.whenComplete((ignoredResult, throwable) -> concurrentMessagesBeingProcessedSemaphore.release());
76+
.whenComplete((ignoredResult, throwable) -> {
77+
if (throwable != null && !(throwable.getCause() instanceof BrokerStoppedWhileRetrievingMessageException)) {
78+
log.error("Error processing message", throwable.getCause());
79+
}
80+
concurrentMessagesBeingProcessedSemaphore.release();
81+
});
7782
} catch (final Throwable throwable) {
7883
try {
7984
final long errorBackoffTimeInMilliseconds = getErrorBackoffTimeInMilliseconds();
@@ -176,4 +181,12 @@ private long getNumberOfMillisecondsToObtainPermit() {
176181
DEFAULT_BACKOFF_TIME_IN_MS
177182
);
178183
}
184+
185+
/**
186+
* Internal exception used to be thrown when the thread is interrupted while retrieving messages. This is because we don't want a
187+
* error to be logged for this scenario but only when their was an actual exception processing the message.
188+
*/
189+
private static class BrokerStoppedWhileRetrievingMessageException extends RuntimeException {
190+
191+
}
179192
}

0 commit comments

Comments
 (0)