Skip to content

Commit d0a78d2

Browse files
Merge pull request #102 from JaidenAshmore/issue/92_message_system_attribute_argument_resolver
refs #92: Added support for individual Message System Attributes
2 parents 9d66dba + bd38f94 commit d0a78d2

File tree

16 files changed

+812
-7
lines changed

16 files changed

+812
-7
lines changed

java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/retriever/MessageRetriever.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
* remote queue which will be taken by the {@link MessageBroker} and transferred to the corresponding {@link MessageProcessor} that knows how to process
1414
* this message.
1515
*
16+
* <p>Messages that are downloaded from the remote server must contain all of the {@link Message#attributes()} and {@link Message#messageAttributes()} as
17+
* they can be consumed by corresponding {@link com.jashmore.sqs.argument.ArgumentResolver}s.
18+
*
1619
* <p>As there could be multiple threads wanting to process messages the implementations of this class must be thread safe.
1720
*/
1821
@ThreadSafe

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/argument/CoreArgumentResolverService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import com.fasterxml.jackson.databind.ObjectMapper;
66
import com.jashmore.sqs.argument.attribute.MessageAttributeArgumentResolver;
7+
import com.jashmore.sqs.argument.attribute.MessageSystemAttributeArgumentResolver;
78
import com.jashmore.sqs.argument.messageid.MessageIdArgumentResolver;
89
import com.jashmore.sqs.argument.payload.PayloadArgumentResolver;
910
import com.jashmore.sqs.argument.payload.mapper.PayloadMapper;
@@ -30,6 +31,7 @@ public CoreArgumentResolverService(final PayloadMapper payloadMapper,
3031
new PayloadArgumentResolver(payloadMapper),
3132
new MessageIdArgumentResolver(),
3233
new MessageAttributeArgumentResolver(objectMapper),
34+
new MessageSystemAttributeArgumentResolver(),
3335
new VisibilityExtenderArgumentResolver(sqsAsyncClient)
3436
);
3537
this.delegatingArgumentResolverService = new DelegatingArgumentResolverService(argumentResolvers);
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.jashmore.sqs.argument.attribute;
2+
3+
import static java.lang.annotation.ElementType.PARAMETER;
4+
import static java.lang.annotation.RetentionPolicy.RUNTIME;
5+
6+
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
7+
8+
import java.lang.annotation.Retention;
9+
import java.lang.annotation.Target;
10+
11+
/**
12+
* Populate on of the arguments in a message processing method by taking a value from the message system attributes.
13+
*
14+
* <p>There are currently a subset of types that can be used for this parameter. These include:
15+
*
16+
* <ul>
17+
* <li>{@link String}: all attributes can be cast to a String</li>
18+
* <li>{@link Integer}, int, {@link Long}, long: any attributes that are in number format can be cast to these. These include:
19+
* <ul>
20+
* <li>{@link MessageSystemAttributeName#SENT_TIMESTAMP}</li>
21+
* <li>{@link MessageSystemAttributeName#APPROXIMATE_RECEIVE_COUNT}</li>
22+
* <li>{@link MessageSystemAttributeName#APPROXIMATE_FIRST_RECEIVE_TIMESTAMP}</li>
23+
* </ul>
24+
* </li>
25+
* <li>{@link java.time.OffsetDateTime}, {@link java.time.Instant}: attributes that are for timestamps can be cast to this. These include:
26+
* <ul>
27+
* <li>{@link MessageSystemAttributeName#SENT_TIMESTAMP}</li>
28+
* <li>{@link MessageSystemAttributeName#APPROXIMATE_FIRST_RECEIVE_TIMESTAMP}</li>
29+
* </ul>
30+
* </li>
31+
* </ul>
32+
*/
33+
@Retention(RUNTIME)
34+
@Target(PARAMETER)
35+
public @interface MessageSystemAttribute {
36+
/**
37+
* The system attribute that should be used to populate this parameter.
38+
*
39+
* @return the attribute to consume
40+
*/
41+
MessageSystemAttributeName value();
42+
43+
/**
44+
* Fail the processing of the message if the message attribute with the provided name does not exist.
45+
*
46+
* @return whether the message should fail to be processed if the message attribute is missing
47+
*/
48+
boolean required() default false;
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package com.jashmore.sqs.argument.attribute;
2+
3+
import static java.time.ZoneOffset.UTC;
4+
import static software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_FIRST_RECEIVE_TIMESTAMP;
5+
import static software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.SENT_TIMESTAMP;
6+
7+
import com.jashmore.sqs.QueueProperties;
8+
import com.jashmore.sqs.argument.ArgumentResolutionException;
9+
import com.jashmore.sqs.argument.ArgumentResolver;
10+
import com.jashmore.sqs.argument.MethodParameter;
11+
import com.jashmore.sqs.util.annotation.AnnotationUtils;
12+
import software.amazon.awssdk.services.sqs.model.Message;
13+
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
14+
15+
import java.time.Instant;
16+
import java.time.OffsetDateTime;
17+
import java.util.Optional;
18+
19+
public class MessageSystemAttributeArgumentResolver implements ArgumentResolver<Object> {
20+
@Override
21+
public boolean canResolveParameter(final MethodParameter methodParameter) {
22+
return AnnotationUtils.findParameterAnnotation(methodParameter, MessageSystemAttribute.class).isPresent();
23+
}
24+
25+
@Override
26+
public Object resolveArgumentForParameter(final QueueProperties queueProperties,
27+
final MethodParameter methodParameter,
28+
final Message message) throws ArgumentResolutionException {
29+
final MessageSystemAttribute annotation = AnnotationUtils.findParameterAnnotation(methodParameter, MessageSystemAttribute.class)
30+
.orElseThrow(() -> new ArgumentResolutionException(
31+
"Parameter passed in does not contain the MessageSystemAttribute annotation when it should"
32+
));
33+
34+
final MessageSystemAttributeName messageSystemAttributeName = annotation.value();
35+
final Optional<String> optionalAttributeValue = Optional.ofNullable(message.attributes().get(messageSystemAttributeName));
36+
37+
if (!optionalAttributeValue.isPresent() ) {
38+
if (annotation.required()) {
39+
throw new ArgumentResolutionException("Missing system attribute with name: " + messageSystemAttributeName.toString());
40+
}
41+
42+
return null;
43+
}
44+
final String attributeValue = optionalAttributeValue.get();
45+
46+
final Class<?> parameterType = methodParameter.getParameter().getType();
47+
try {
48+
if (parameterType == String.class) {
49+
return attributeValue;
50+
}
51+
52+
if (parameterType == Integer.class || parameterType == int.class) {
53+
return Integer.parseInt(attributeValue);
54+
}
55+
56+
if (parameterType == Long.class || parameterType == long.class) {
57+
return Long.parseLong(attributeValue);
58+
}
59+
} catch (final RuntimeException exception) {
60+
throw new ArgumentResolutionException("Error parsing message attribute: " + messageSystemAttributeName.toString(), exception);
61+
}
62+
63+
if (messageSystemAttributeName == SENT_TIMESTAMP || messageSystemAttributeName == APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) {
64+
return handleTimeStampAttributes(methodParameter.getParameter().getType(), messageSystemAttributeName, attributeValue);
65+
}
66+
67+
throw new ArgumentResolutionException("Unsupported parameter type " + parameterType.getName()
68+
+ " for system attribute " + messageSystemAttributeName.toString());
69+
}
70+
71+
private Object handleTimeStampAttributes(final Class<?> parameterType,
72+
final MessageSystemAttributeName messageSystemAttributeName,
73+
final String attributeValue) {
74+
if (parameterType == Instant.class) {
75+
return Instant.ofEpochMilli(Long.parseLong(attributeValue));
76+
}
77+
78+
if (parameterType == OffsetDateTime.class) {
79+
return OffsetDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(attributeValue)), UTC);
80+
}
81+
82+
throw new ArgumentResolutionException("Unsupported parameter type " + parameterType.getName()
83+
+ " for system attribute " + messageSystemAttributeName.toString());
84+
}
85+
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/batching/BatchingMessageRetriever.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ void backoff(final long backoffTimeInMs) throws InterruptedException {
203203
private ReceiveMessageRequest buildReceiveMessageRequest(final int numberOfMessagesToObtain) {
204204
final ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
205205
.queueUrl(queueProperties.getQueueUrl())
206+
.attributeNames(QueueAttributeName.ALL)
206207
.messageAttributeNames(QueueAttributeName.ALL.toString())
207208
.maxNumberOfMessages(numberOfMessagesToObtain)
208209
.waitTimeSeconds(RetrieverUtils.safelyGetWaitTimeInSeconds(properties::getMessageWaitTimeInSeconds));

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/individual/IndividualMessageRetriever.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ private ReceiveMessageRequest generateReceiveMessageRequest() {
5555
.builder()
5656
.queueUrl(queueProperties.getQueueUrl())
5757
.maxNumberOfMessages(1)
58+
.attributeNames(QueueAttributeName.ALL)
5859
.messageAttributeNames(QueueAttributeName.ALL.toString())
5960
.waitTimeSeconds(MAX_SQS_RECEIVE_WAIT_TIME_IN_SECONDS)
6061
.visibilityTimeout(properties.getVisibilityTimeoutForMessagesInSeconds())

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/prefetch/PrefetchingMessageRetriever.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ private ReceiveMessageRequest buildReceiveMessageRequest() {
158158
log.debug("Retrieving {} messages asynchronously", numberOfMessagesToObtain);
159159
final ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
160160
.queueUrl(queueProperties.getQueueUrl())
161+
.attributeNames(QueueAttributeName.ALL)
161162
.messageAttributeNames(QueueAttributeName.ALL.toString())
162163
.waitTimeSeconds(RetrieverUtils.safelyGetWaitTimeInSeconds(properties::getMessageWaitTimeInSeconds))
163164
.maxNumberOfMessages(numberOfMessagesToObtain);

java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/CoreArgumentResolverServiceTest.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.jashmore.sqs.QueueProperties;
99
import com.jashmore.sqs.argument.attribute.MessageAttribute;
1010
import com.jashmore.sqs.argument.attribute.MessageAttributeDataTypes;
11+
import com.jashmore.sqs.argument.attribute.MessageSystemAttribute;
1112
import com.jashmore.sqs.argument.messageid.MessageId;
1213
import com.jashmore.sqs.argument.payload.Payload;
1314
import com.jashmore.sqs.argument.payload.mapper.JacksonPayloadMapper;
@@ -21,6 +22,7 @@
2122
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2223
import software.amazon.awssdk.services.sqs.model.Message;
2324
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
25+
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
2426

2527
import java.lang.reflect.Method;
2628
import java.util.Map;
@@ -46,7 +48,7 @@ public void setUp() {
4648
@Test
4749
public void shouldBeAbleToResolvePayloadParameters() throws Exception {
4850
// arrange
49-
final Method method = CoreArgumentResolverServiceTest.class.getMethod("method", Map.class, String.class, String.class);
51+
final Method method = CoreArgumentResolverServiceTest.class.getMethod("method", Map.class, String.class, String.class, String.class);
5052
final Map<String, String> payload = ImmutableMap.of("key", "value");
5153
final Message message = Message.builder()
5254
.body(objectMapper.writeValueAsString(payload))
@@ -70,7 +72,7 @@ public void shouldBeAbleToResolvePayloadParameters() throws Exception {
7072
@Test
7173
public void shouldBeAbleToResolveMessageIdParameters() throws Exception {
7274
// arrange
73-
final Method method = CoreArgumentResolverServiceTest.class.getMethod("method", Map.class, String.class, String.class);
75+
final Method method = CoreArgumentResolverServiceTest.class.getMethod("method", Map.class, String.class, String.class, String.class);
7476
final Map<String, String> payload = ImmutableMap.of("key", "value");
7577
final Message message = Message.builder()
7678
.body(objectMapper.writeValueAsString(payload))
@@ -93,7 +95,7 @@ public void shouldBeAbleToResolveMessageIdParameters() throws Exception {
9395
@Test
9496
public void shouldBeAbleToResolveMessageAttributeParameters() throws Exception {
9597
// arrange
96-
final Method method = CoreArgumentResolverServiceTest.class.getMethod("method", Map.class, String.class, String.class);
98+
final Method method = CoreArgumentResolverServiceTest.class.getMethod("method", Map.class, String.class, String.class, String.class);
9799
final Map<String, String> payload = ImmutableMap.of("key", "value");
98100
final Message message = Message.builder()
99101
.body(objectMapper.writeValueAsString(payload))
@@ -119,8 +121,35 @@ public void shouldBeAbleToResolveMessageAttributeParameters() throws Exception {
119121
assertThat(payloadArgument).isEqualTo("test");
120122
}
121123

124+
@Test
125+
public void shouldBeAbleToResolveMessageSystemAttributeParameters() throws Exception {
126+
// arrange
127+
final Method method = CoreArgumentResolverServiceTest.class.getMethod("method", Map.class, String.class, String.class, String.class);
128+
final Map<String, String> payload = ImmutableMap.of("key", "value");
129+
final Message message = Message.builder()
130+
.body(objectMapper.writeValueAsString(payload))
131+
.messageId("messageId")
132+
.attributes(ImmutableMap.of(MessageSystemAttributeName.SEQUENCE_NUMBER, "test"))
133+
.build();
134+
final QueueProperties queueProperties = QueueProperties.builder().queueUrl("queueUrl").build();
135+
final MethodParameter messagePayloadParameter = DefaultMethodParameter.builder()
136+
.method(method)
137+
.parameter(method.getParameters()[3])
138+
.parameterIndex(3)
139+
.build();
140+
141+
// act
142+
final Object payloadArgument = service.resolveArgument(queueProperties, messagePayloadParameter, message);
143+
144+
// assert
145+
assertThat(payloadArgument).isEqualTo("test");
146+
}
147+
122148
@SuppressWarnings( {"unused" })
123-
public void method(@Payload final Map<String, String> payload, @MessageId final String messageId, @MessageAttribute("key") final String attribute) {
149+
public void method(@Payload final Map<String, String> payload,
150+
@MessageId final String messageId,
151+
@MessageAttribute("key") final String attribute,
152+
@MessageSystemAttribute(MessageSystemAttributeName.SEQUENCE_NUMBER) final String sequenceNumber) {
124153

125154
}
126155
}

0 commit comments

Comments
 (0)