Skip to content

Commit 346613a

Browse files
Merge pull request #117 from JaidenAshmore/issue/112_improve_spring_batching_annotation_implementations
refs #112: Made the batch size for the batching queue listeners instead of using concurrency
2 parents 153d8c9 + b892fc2 commit 346613a

File tree

8 files changed

+173
-50
lines changed

8 files changed

+173
-50
lines changed

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/resolver/batching/StaticBatchingMessageResolverProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.jashmore.sqs.resolver.batching;
22

33
import lombok.Builder;
4+
import lombok.Value;
45

56
import javax.validation.constraints.Max;
67
import javax.validation.constraints.Positive;
78

89
/**
910
* Static implementation that will contain constant size and time limit for the buffer.
1011
*/
12+
@Value
1113
@Builder(toBuilder = true)
1214
public class StaticBatchingMessageResolverProperties implements BatchingMessageResolverProperties {
1315
private final long bufferingTimeInMs;

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/batching/BatchingMessageRetrieverProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,13 @@ public interface BatchingMessageRetrieverProperties {
2929
* <p>Note that this trigger size may not be reached due to the the waiting time going higher than {@link #getMessageRetrievalPollingPeriodInMs()}, in
3030
* this case it will just request as many messages as threads requesting messages.
3131
*
32+
* <p>This number should be smaller than the maximum number of messages that can be downloaded from AWS as it doesn't make much sense to have a batch size
33+
* greater than this value.
34+
*
3235
* @return the number of threads to be requesting messages for the retrieval of messages background thread to be triggered
3336
*/
3437
@Positive
38+
@Max(AwsConstants.MAX_NUMBER_OF_MESSAGES_FROM_SQS)
3539
int getNumberOfThreadsWaitingTrigger();
3640

3741
/**

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static java.lang.annotation.RetentionPolicy.RUNTIME;
66

77
import com.jashmore.sqs.QueueProperties;
8+
import com.jashmore.sqs.aws.AwsConstants;
89
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
910
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
1011
import com.jashmore.sqs.container.MessageListenerContainer;
@@ -83,6 +84,31 @@
8384
*/
8485
String concurrencyLevelString() default "";
8586

87+
/**
88+
* The total number of threads requesting messages that will result in the the background thread to actually request the messages.
89+
*
90+
* <p>This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size
91+
* greater than what AWS can provide.
92+
*
93+
* @return the total number of threads requesting messages for trigger a batch of messages to be retrieved
94+
* @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter
95+
*/
96+
int batchSize() default 5;
97+
98+
/**
99+
* The total number of threads requesting messages that will result in the the background thread to actually request the messages.
100+
*
101+
* <p>This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size
102+
* greater than what AWS can provide.
103+
*
104+
* <p>This can be used when you need to load the value from Spring properties for example
105+
* <pre>batchSizeString = "${my.profile.property}"</pre> instead of having it hardcoded in {@link #batchSize()}.
106+
*
107+
* @return the total number of threads requesting messages for trigger a batch of messages to be retrieved
108+
* @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter
109+
*/
110+
String batchSizeString() default "";
111+
86112
/**
87113
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages.
88114
*

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapper.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
5454

5555
final int concurrencyLevel = getConcurrencyLevel(annotation);
5656

57-
final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties, concurrencyLevel);
57+
final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties);
5858

5959
final MessageResolver messageResolver = new IndividualMessageResolver(queueProperties, sqsAsyncClient);
6060

@@ -71,8 +71,7 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
7171
final ConcurrentMessageBroker messageBroker = new ConcurrentMessageBroker(
7272
messageRetriever,
7373
messageProcessor,
74-
StaticConcurrentMessageBrokerProperties
75-
.builder()
74+
StaticConcurrentMessageBrokerProperties.builder()
7675
.concurrencyLevel(concurrencyLevel)
7776
.threadNameFormat(identifier + "-%d")
7877
.build()
@@ -84,17 +83,17 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
8483
.build();
8584
}
8685

87-
private MessageRetriever buildMessageRetriever(final QueueListener annotation, final QueueProperties queueProperties, final int concurrencyLevel) {
86+
private MessageRetriever buildMessageRetriever(final QueueListener annotation, final QueueProperties queueProperties) {
8887
return new BatchingMessageRetriever(
89-
queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation, concurrencyLevel));
88+
queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation));
9089
}
9190

9291
@VisibleForTesting
93-
BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final QueueListener annotation, final int concurrencyLevel) {
92+
BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final QueueListener annotation) {
9493
return StaticBatchingMessageRetrieverProperties.builder()
9594
.visibilityTimeoutInSeconds(getMessageVisibilityTimeoutInSeconds(annotation))
9695
.messageRetrievalPollingPeriodInMs(getMaxPeriodBetweenBatchesInMs(annotation))
97-
.numberOfThreadsWaitingTrigger(concurrencyLevel)
96+
.numberOfThreadsWaitingTrigger(getBatchSize(annotation))
9897
.build();
9998
}
10099

@@ -106,6 +105,15 @@ private int getConcurrencyLevel(final QueueListener annotation) {
106105
return Integer.parseInt(environment.resolvePlaceholders(annotation.concurrencyLevelString()));
107106
}
108107

108+
private int getBatchSize(final QueueListener annotation) {
109+
if (StringUtils.isEmpty(annotation.batchSizeString())) {
110+
return annotation.batchSize();
111+
}
112+
113+
return Integer.parseInt(environment.resolvePlaceholders(annotation.batchSizeString()));
114+
}
115+
116+
109117
private long getMaxPeriodBetweenBatchesInMs(final QueueListener annotation) {
110118
if (StringUtils.isEmpty(annotation.maxPeriodBetweenBatchesInMsString())) {
111119
return annotation.maxPeriodBetweenBatchesInMs();

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListener.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static java.lang.annotation.RetentionPolicy.RUNTIME;
55

66
import com.jashmore.sqs.QueueProperties;
7+
import com.jashmore.sqs.aws.AwsConstants;
78
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
89
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
910
import com.jashmore.sqs.container.MessageListenerContainer;
@@ -86,14 +87,35 @@
8687
String concurrencyLevelString() default "";
8788

8889
/**
89-
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages.
90+
* The total number of threads requesting messages that will result in the the background thread to actually request the messages.
91+
*
92+
* <p>This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size
93+
* greater than what AWS can provide.
94+
*
95+
* @return the total number of threads requesting messages for trigger a batch of messages to be retrieved
96+
* @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter
97+
*/
98+
int batchSize() default 5;
99+
100+
/**
101+
* The total number of threads requesting messages that will result in the the background thread to actually request the messages.
102+
*
103+
* <p>This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size
104+
* greater than what AWS can provide.
90105
*
91-
* <p>This tries to reduce the number of times that requests for messages are made to SQS by waiting for all of the threads to be requiring messages
92-
* before requesting for messages from SQS. If one or more of the threads processing messages does not start requesting messages by this period's
93-
* timeout the current threads waiting for messages will have messages requested for them
106+
* <p>This can be used when you need to load the value from Spring properties for example
107+
* <pre>batchSizeString = "${my.profile.property}"</pre> instead of having it hardcoded in {@link #batchSize()}.
108+
*
109+
* @return the total number of threads requesting messages for trigger a batch of messages to be retrieved
110+
* @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter
111+
*/
112+
String batchSizeString() default "";
113+
114+
/**
115+
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages.
94116
*
95117
* @return the period in ms that threads will wait for messages to be requested from SQS
96-
* @see BatchingMessageRetrieverProperties#getMessageRetrievalPollingPeriodInMs() for more details
118+
* @see BatchingMessageRetrieverProperties#getMessageRetrievalPollingPeriodInMs() for more details about this parameter
97119
*/
98120
long maxPeriodBetweenBatchesInMs() default 2000L;
99121

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListenerWrapper.java

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,13 @@ protected Class<BatchingQueueListener> getAnnotationClass() {
5252
@Override
5353
protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(final Object bean, final Method method,
5454
final BatchingQueueListener annotation) {
55-
final QueueProperties queueProperties = QueueProperties
56-
.builder()
55+
final QueueProperties queueProperties = QueueProperties.builder()
5756
.queueUrl(queueResolverService.resolveQueueUrl(annotation.value()))
5857
.build();
5958

60-
final int concurrencyLevel = getConcurrencyLevel(annotation);
61-
final long maxPeriodBetweenBatchesInMs = getMaxPeriodBetweenBatchesInMs(annotation);
59+
final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties);
6260

63-
final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties, concurrencyLevel);
64-
65-
final MessageResolver messageResolver = buildMessageResolver(queueProperties, concurrencyLevel, maxPeriodBetweenBatchesInMs);
61+
final MessageResolver messageResolver = buildMessageResolver(annotation, queueProperties);
6662

6763
final MessageProcessor messageProcessor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, bean);
6864

@@ -76,9 +72,8 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
7672
final ConcurrentMessageBroker messageBroker = new ConcurrentMessageBroker(
7773
messageRetriever,
7874
messageProcessor,
79-
StaticConcurrentMessageBrokerProperties
80-
.builder()
81-
.concurrencyLevel(concurrencyLevel)
75+
StaticConcurrentMessageBrokerProperties.builder()
76+
.concurrencyLevel(getConcurrencyLevel(annotation))
8277
.threadNameFormat(identifier + "-%d")
8378
.build()
8479
);
@@ -89,31 +84,35 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
8984
.build();
9085
}
9186

92-
private MessageRetriever buildMessageRetriever(final BatchingQueueListener annotation, final QueueProperties queueProperties, final int concurrencyLevel) {
93-
return new BatchingMessageRetriever(queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation, concurrencyLevel));
87+
private MessageRetriever buildMessageRetriever(final BatchingQueueListener annotation,
88+
final QueueProperties queueProperties) {
89+
return new BatchingMessageRetriever(queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation));
9490
}
9591

9692
@VisibleForTesting
97-
BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final BatchingQueueListener annotation, final int concurrencyLevel) {
98-
return StaticBatchingMessageRetrieverProperties
99-
.builder()
93+
BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final BatchingQueueListener annotation) {
94+
return StaticBatchingMessageRetrieverProperties.builder()
10095
.visibilityTimeoutInSeconds(getMessageVisibilityTimeoutInSeconds(annotation))
10196
.messageRetrievalPollingPeriodInMs(getMaxPeriodBetweenBatchesInMs(annotation))
102-
.numberOfThreadsWaitingTrigger(concurrencyLevel)
97+
.numberOfThreadsWaitingTrigger(getBatchSize(annotation))
10398
.build();
10499
}
105100

106-
private MessageResolver buildMessageResolver(final QueueProperties queueProperties,
107-
final int concurrencyLevel,
108-
final long maxPeriodBetweenBatchesInMs) {
109-
final BatchingMessageResolverProperties batchingMessageResolverProperties = StaticBatchingMessageResolverProperties.builder()
110-
.bufferingSizeLimit(concurrencyLevel)
111-
.bufferingTimeInMs(maxPeriodBetweenBatchesInMs)
112-
.build();
101+
private MessageResolver buildMessageResolver(final BatchingQueueListener annotation,
102+
final QueueProperties queueProperties) {
103+
final BatchingMessageResolverProperties batchingMessageResolverProperties = batchingMessageResolverProperties(annotation);
113104

114105
return new BatchingMessageResolver(queueProperties, sqsAsyncClient, batchingMessageResolverProperties);
115106
}
116107

108+
@VisibleForTesting
109+
BatchingMessageResolverProperties batchingMessageResolverProperties(final BatchingQueueListener annotation) {
110+
return StaticBatchingMessageResolverProperties.builder()
111+
.bufferingSizeLimit(getBatchSize(annotation))
112+
.bufferingTimeInMs(getMaxPeriodBetweenBatchesInMs(annotation))
113+
.build();
114+
}
115+
117116
private int getConcurrencyLevel(final BatchingQueueListener annotation) {
118117
if (StringUtils.isEmpty(annotation.concurrencyLevelString())) {
119118
return annotation.concurrencyLevel();
@@ -122,6 +121,14 @@ private int getConcurrencyLevel(final BatchingQueueListener annotation) {
122121
return Integer.parseInt(environment.resolvePlaceholders(annotation.concurrencyLevelString()));
123122
}
124123

124+
private int getBatchSize(final BatchingQueueListener annotation) {
125+
if (StringUtils.isEmpty(annotation.batchSizeString())) {
126+
return annotation.batchSize();
127+
}
128+
129+
return Integer.parseInt(environment.resolvePlaceholders(annotation.batchSizeString()));
130+
}
131+
125132
private long getMaxPeriodBetweenBatchesInMs(final BatchingQueueListener annotation) {
126133
if (StringUtils.isEmpty(annotation.maxPeriodBetweenBatchesInMsString())) {
127134
return annotation.maxPeriodBetweenBatchesInMs();

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/test/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapperTest.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,18 +165,18 @@ public void validStringFieldsWillCorrectlyBuildMessageListener() throws Exceptio
165165
@Test
166166
public void batchingMessageRetrieverPropertiesBuiltFromAnnotationValues() throws Exception {
167167
// arrange
168-
final Method method = QueueListenerWrapperTest.class.getMethod("myMethodWithParameters");
168+
final Method method = QueueListenerWrapperTest.class.getMethod("methodWithFields");
169169
final QueueListener annotation = method.getAnnotation(QueueListener.class);
170170

171171
// act
172172
final BatchingMessageRetrieverProperties properties
173-
= queueListenerWrapper.batchingMessageRetrieverProperties(annotation, 2);
173+
= queueListenerWrapper.batchingMessageRetrieverProperties(annotation);
174174

175175
// assert
176176
assertThat(properties).isEqualTo(StaticBatchingMessageRetrieverProperties.builder()
177177
.visibilityTimeoutInSeconds(300)
178-
.messageRetrievalPollingPeriodInMs(60L)
179-
.numberOfThreadsWaitingTrigger(2)
178+
.messageRetrievalPollingPeriodInMs(40L)
179+
.numberOfThreadsWaitingTrigger(10)
180180
.build()
181181
);
182182
}
@@ -186,18 +186,19 @@ public void batchingMessageRetrieverPropertiesBuiltFromSpringValues() throws Exc
186186
// arrange
187187
final Method method = QueueListenerWrapperTest.class.getMethod("methodWithFieldsUsingEnvironmentProperties");
188188
final QueueListener annotation = method.getAnnotation(QueueListener.class);
189+
when(environment.resolvePlaceholders("${prop.batchSize}")).thenReturn("8");
189190
when(environment.resolvePlaceholders("${prop.period}")).thenReturn("30");
190191
when(environment.resolvePlaceholders("${prop.visibility}")).thenReturn("40");
191192

192193
// act
193194
final BatchingMessageRetrieverProperties properties
194-
= queueListenerWrapper.batchingMessageRetrieverProperties(annotation, 2);
195+
= queueListenerWrapper.batchingMessageRetrieverProperties(annotation);
195196

196197
// assert
197198
assertThat(properties).isEqualTo(StaticBatchingMessageRetrieverProperties.builder()
198199
.visibilityTimeoutInSeconds(40)
199200
.messageRetrievalPollingPeriodInMs(30L)
200-
.numberOfThreadsWaitingTrigger(2)
201+
.numberOfThreadsWaitingTrigger(8)
201202
.build()
202203
);
203204
}
@@ -212,14 +213,14 @@ public void myMethodWithIdentifier() {
212213

213214
}
214215

215-
@QueueListener(value = "test2", concurrencyLevelString = "${prop.concurrency}",
216+
@QueueListener(value = "test2", concurrencyLevelString = "${prop.concurrency}", batchSizeString = "${prop.batchSize}",
216217
messageVisibilityTimeoutInSecondsString = "${prop.visibility}", maxPeriodBetweenBatchesInMsString = "${prop.period}")
217218
public void methodWithFieldsUsingEnvironmentProperties() {
218219

219220
}
220221

221-
@QueueListener(value = "test", concurrencyLevel = 6, messageVisibilityTimeoutInSeconds = 300, maxPeriodBetweenBatchesInMs = 60)
222-
public void myMethodWithParameters() {
222+
@QueueListener(value = "test2", concurrencyLevel = 20, batchSize = 10, messageVisibilityTimeoutInSeconds = 300, maxPeriodBetweenBatchesInMs = 40)
223+
public void methodWithFields() {
223224

224225
}
225226
}

0 commit comments

Comments
 (0)