Skip to content

Commit f6bd8ff

Browse files
Merge pull request #95 from JaidenAshmore/issue/90_spring_properties_in_annotations
refs 90: allow for Spring properties to be included in Queue Annotations
2 parents 14c4bc9 + 339d17a commit f6bd8ff

File tree

19 files changed

+1114
-184
lines changed

19 files changed

+1114
-184
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Spring - How to set Queue Listener props from the Environment
2+
The Queue Listener annotations have been written to allow for them to be set via the Spring Environment, e.g. properties from an `application.yml` file.
3+
4+
# Steps
5+
6+
1. Create some properties in the `application.yml` file, in this case there are two profiles with different concurrency rates.
7+
```yaml
8+
9+
---
10+
spring.profiles: staging
11+
12+
queues:
13+
my-queue:
14+
concurrency: 5
15+
16+
---
17+
spring.profiles: production
18+
queues:
19+
my-queue:
20+
concurrency: 5
21+
```
22+
1. Use the `{propertyName}String` fields in the annotations to pull data from the environment:
23+
```java
24+
@QueueListener(value = "http://localhost:9432/q/myqueue", concurrencyLevelString="${queues.my-queue.concurrency}")
25+
public void myMethod(@Payload String payload) {
26+
27+
}
28+
```
29+
30+
***NOTE**: the `{propertyName}String` fields will override any of the other properties if they are set. E.g. if both `concurrencyLevel` and
31+
`concurrencyLevelString` are set the `concurrencyLevelString` one will be prioritised*

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/batching/StaticBatchingMessageRetrieverProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.jashmore.sqs.retriever.batching;
22

33
import lombok.Builder;
4+
import lombok.Value;
45

56
import javax.annotation.Nullable;
67
import javax.validation.constraints.Positive;
@@ -9,6 +10,7 @@
910
/**
1011
* Static implementation of the properties that will never change during the processing of the messages.
1112
*/
13+
@Value
1214
@Builder(toBuilder = true)
1315
public class StaticBatchingMessageRetrieverProperties implements BatchingMessageRetrieverProperties {
1416
private final int numberOfThreadsWaitingTrigger;

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/config/QueueListenerConfiguration.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,22 +173,25 @@ public static class QueueWrapperConfiguration {
173173
@Bean
174174
public QueueWrapper coreProvidedQueueListenerWrapper(final ArgumentResolverService argumentResolverService,
175175
final SqsAsyncClient sqsAsyncClient,
176-
final QueueResolverService queueResolverService) {
177-
return new QueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolverService);
176+
final QueueResolverService queueResolverService,
177+
final Environment environment) {
178+
return new QueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolverService, environment);
178179
}
179180

180181
@Bean
181182
public QueueWrapper coreProvidedPrefetchingQueueListenerWrapper(final ArgumentResolverService argumentResolverService,
182183
final SqsAsyncClient sqsAsyncClient,
183-
final QueueResolverService queueResolverService) {
184-
return new PrefetchingQueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolverService);
184+
final QueueResolverService queueResolverService,
185+
final Environment environment) {
186+
return new PrefetchingQueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolverService, environment);
185187
}
186188

187189
@Bean
188190
public QueueWrapper coreProvidedBatchingQueueListenerWrapper(final ArgumentResolverService argumentResolverService,
189191
final SqsAsyncClient sqsAsyncClient,
190-
final QueueResolverService queueResolverService) {
191-
return new BatchingQueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolverService);
192+
final QueueResolverService queueResolverService,
193+
final Environment environment) {
194+
return new BatchingQueueListenerWrapper(argumentResolverService, sqsAsyncClient, queueResolverService, environment);
192195
}
193196
}
194197
}

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,26 @@
6363
/**
6464
* The number of threads that will be processing messages.
6565
*
66+
* <p>This value is ignored when {@link #concurrencyLevelString()} has been set and is not an empty string.
67+
*
6668
* @return the total number of threads processing messages
6769
* @see ConcurrentMessageBrokerProperties#getConcurrencyLevel() for more details and constraints
6870
*/
6971
int concurrencyLevel() default 5;
7072

73+
/**
74+
* The number of threads that will be processing messages converted from a string representation.
75+
*
76+
* <p>This can be used when you need to load the value from Spring properties for example <pre>concurrencyLevelString = "${my.profile.property}"</pre>
77+
* instead of having it hardcoded in {@link #concurrencyLevel()}.
78+
*
79+
* <p>If this value is not empty, the value set by {@link #concurrencyLevel()} will be ignored.
80+
*
81+
* @return the total number of threads processing messages as a string
82+
* @see ConcurrentMessageBrokerProperties#getConcurrencyLevel() for more details and constraints
83+
*/
84+
String concurrencyLevelString() default "";
85+
7186
/**
7287
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages.
7388
*
@@ -80,11 +95,35 @@
8095
*/
8196
long maxPeriodBetweenBatchesInMs() default 2000L;
8297

98+
/**
99+
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages converted
100+
* from a string representation.
101+
*
102+
* <p>This can be used when you need to load the value from Spring properties for example
103+
* <pre>maxPeriodBetweenBatchesInMsString = "${my.profile.property}"</pre> instead of having it hardcoded in {@link #maxPeriodBetweenBatchesInMs()}.
104+
*
105+
* @return the period in ms that threads will wait for messages to be requested from SQS
106+
* @see BatchingMessageRetrieverProperties#getMessageRetrievalPollingPeriodInMs() for more details
107+
* @see #maxPeriodBetweenBatchesInMs() for more information about this field
108+
*/
109+
String maxPeriodBetweenBatchesInMsString() default "";
110+
83111
/**
84112
* The message visibility that will be used for messages obtained from the queue.
85113
*
86114
* @return the message visibility for messages fetched from the queue
87115
* @see BatchingMessageRetrieverProperties#getVisibilityTimeoutInSeconds() for more details and constraints
88116
*/
89117
int messageVisibilityTimeoutInSeconds() default 30;
118+
119+
/**
120+
* The message visibility that will be used for messages obtained from the queue converted from a string representation.
121+
*
122+
* <p>This can be used when you need to load the value from Spring properties for example
123+
* <pre>messageVisibilityTimeoutInSeconds = "${my.profile.property}"</pre> instead of having it hardcoded in {@link #messageVisibilityTimeoutInSeconds()}.
124+
*
125+
* @return the message visibility for messages fetched from the queue
126+
* @see BatchingMessageRetrieverProperties#getVisibilityTimeoutInSeconds() for more details and constraints
127+
*/
128+
String messageVisibilityTimeoutInSecondsString() default "";
90129
}

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapper.java

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.jashmore.sqs.spring.container.basic;
22

3+
import com.google.common.annotations.VisibleForTesting;
4+
35
import com.jashmore.sqs.QueueProperties;
46
import com.jashmore.sqs.argument.ArgumentResolverService;
57
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
@@ -9,6 +11,7 @@
911
import com.jashmore.sqs.processor.MessageProcessor;
1012
import com.jashmore.sqs.resolver.MessageResolver;
1113
import com.jashmore.sqs.resolver.individual.IndividualMessageResolver;
14+
import com.jashmore.sqs.retriever.MessageRetriever;
1215
import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever;
1316
import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties;
1417
import com.jashmore.sqs.retriever.batching.StaticBatchingMessageRetrieverProperties;
@@ -17,8 +20,9 @@
1720
import com.jashmore.sqs.spring.QueueWrapper;
1821
import com.jashmore.sqs.spring.queue.QueueResolverService;
1922
import com.jashmore.sqs.spring.util.IdentifierUtils;
20-
import lombok.RequiredArgsConstructor;
23+
import lombok.AllArgsConstructor;
2124
import lombok.extern.slf4j.Slf4j;
25+
import org.springframework.core.env.Environment;
2226
import org.springframework.util.StringUtils;
2327
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2428

@@ -29,11 +33,12 @@
2933
* implementations of the framework.
3034
*/
3135
@Slf4j
32-
@RequiredArgsConstructor
36+
@AllArgsConstructor
3337
public class QueueListenerWrapper extends AbstractQueueAnnotationWrapper<QueueListener> {
3438
private final ArgumentResolverService argumentResolverService;
3539
private final SqsAsyncClient sqsAsyncClient;
3640
private final QueueResolverService queueResolverService;
41+
private final Environment environment;
3742

3843
@Override
3944
protected Class<QueueListener> getAnnotationClass() {
@@ -47,14 +52,9 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
4752
.queueUrl(queueResolverService.resolveQueueUrl(annotation.value()))
4853
.build();
4954

50-
final BatchingMessageRetrieverProperties batchingMessageRetrieverProperties = StaticBatchingMessageRetrieverProperties
51-
.builder()
52-
.visibilityTimeoutInSeconds(annotation.messageVisibilityTimeoutInSeconds())
53-
.messageRetrievalPollingPeriodInMs(annotation.maxPeriodBetweenBatchesInMs())
54-
.numberOfThreadsWaitingTrigger(annotation.concurrencyLevel())
55-
.build();
56-
final BatchingMessageRetriever messageRetriever = new BatchingMessageRetriever(
57-
queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties);
55+
final int concurrencyLevel = getConcurrencyLevel(annotation);
56+
57+
final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties, concurrencyLevel);
5858

5959
final MessageResolver messageResolver = new IndividualMessageResolver(queueProperties, sqsAsyncClient);
6060

@@ -73,7 +73,7 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
7373
messageProcessor,
7474
StaticConcurrentMessageBrokerProperties
7575
.builder()
76-
.concurrencyLevel(annotation.concurrencyLevel())
76+
.concurrencyLevel(concurrencyLevel)
7777
.threadNameFormat(identifier + "-%d")
7878
.build()
7979
);
@@ -83,4 +83,42 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
8383
.container(new SimpleMessageListenerContainer(messageRetriever, messageBroker, messageResolver))
8484
.build();
8585
}
86+
87+
private MessageRetriever buildMessageRetriever(final QueueListener annotation, final QueueProperties queueProperties, final int concurrencyLevel) {
88+
return new BatchingMessageRetriever(
89+
queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation, concurrencyLevel));
90+
}
91+
92+
@VisibleForTesting
93+
BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final QueueListener annotation, final int concurrencyLevel) {
94+
return StaticBatchingMessageRetrieverProperties.builder()
95+
.visibilityTimeoutInSeconds(getMessageVisibilityTimeoutInSeconds(annotation))
96+
.messageRetrievalPollingPeriodInMs(getMaxPeriodBetweenBatchesInMs(annotation))
97+
.numberOfThreadsWaitingTrigger(concurrencyLevel)
98+
.build();
99+
}
100+
101+
private int getConcurrencyLevel(final QueueListener annotation) {
102+
if (StringUtils.isEmpty(annotation.concurrencyLevelString())) {
103+
return annotation.concurrencyLevel();
104+
}
105+
106+
return Integer.parseInt(environment.resolvePlaceholders(annotation.concurrencyLevelString()));
107+
}
108+
109+
private long getMaxPeriodBetweenBatchesInMs(final QueueListener annotation) {
110+
if (StringUtils.isEmpty(annotation.maxPeriodBetweenBatchesInMsString())) {
111+
return annotation.maxPeriodBetweenBatchesInMs();
112+
}
113+
114+
return Long.parseLong(environment.resolvePlaceholders(annotation.maxPeriodBetweenBatchesInMsString()));
115+
}
116+
117+
private int getMessageVisibilityTimeoutInSeconds(final QueueListener annotation) {
118+
if (StringUtils.isEmpty(annotation.messageVisibilityTimeoutInSecondsString())) {
119+
return annotation.messageVisibilityTimeoutInSeconds();
120+
}
121+
122+
return Integer.parseInt(environment.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString()));
123+
}
86124
}

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListener.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,19 @@
7272
*/
7373
int concurrencyLevel() default 5;
7474

75+
/**
76+
* The number of threads that will be processing messages converted from a string representation.
77+
*
78+
* <p>This can be used when you need to load the value from Spring properties for example <pre>concurrencyLevelString = "${my.profile.property}"</pre>
79+
* instead of having it hardcoded in {@link #concurrencyLevel()}.
80+
*
81+
* <p>If this value is not empty, the value set by {@link #concurrencyLevel()} will be ignored.
82+
*
83+
* @return the total number of threads processing messages as a string
84+
* @see ConcurrentMessageBrokerProperties#getConcurrencyLevel() for more details and constraints
85+
*/
86+
String concurrencyLevelString() default "";
87+
7588
/**
7689
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages.
7790
*
@@ -84,11 +97,35 @@
8497
*/
8598
long maxPeriodBetweenBatchesInMs() default 2000L;
8699

100+
/**
101+
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages converted
102+
* from a string representation.
103+
*
104+
* <p>This can be used when you need to load the value from Spring properties for example
105+
* <pre>maxPeriodBetweenBatchesInMsString = "${my.profile.property}"</pre> instead of having it hardcoded in {@link #maxPeriodBetweenBatchesInMs()}.
106+
*
107+
* @return the period in ms that threads will wait for messages to be requested from SQS
108+
* @see BatchingMessageRetrieverProperties#getMessageRetrievalPollingPeriodInMs() for more details
109+
* @see #maxPeriodBetweenBatchesInMs() for more information about this field
110+
*/
111+
String maxPeriodBetweenBatchesInMsString() default "";
112+
87113
/**
88114
* The message visibility that will be used for messages obtained from the queue.
89115
*
90116
* @return the message visibility for messages fetched from the queue
91117
* @see BatchingMessageRetrieverProperties#getVisibilityTimeoutInSeconds() for more details and constraints
92118
*/
93119
int messageVisibilityTimeoutInSeconds() default 30;
120+
121+
/**
122+
* The message visibility that will be used for messages obtained from the queue converted from a string representation.
123+
*
124+
* <p>This can be used when you need to load the value from Spring properties for example
125+
* <pre>messageVisibilityTimeoutInSeconds = "${my.profile.property}"</pre> instead of having it hardcoded in {@link #messageVisibilityTimeoutInSeconds()}.
126+
*
127+
* @return the message visibility for messages fetched from the queue
128+
* @see BatchingMessageRetrieverProperties#getVisibilityTimeoutInSeconds() for more details and constraints
129+
*/
130+
String messageVisibilityTimeoutInSecondsString() default "";
94131
}

0 commit comments

Comments
 (0)