Skip to content

Commit a499dcf

Browse files
Merge pull request #118 from JaidenAshmore/2.x
Merge 2.x into 3.x
2 parents f4cb0ed + 346613a commit a499dcf

File tree

44 files changed

+627
-87
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+627
-87
lines changed

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/resolver/batching/StaticBatchingMessageResolverProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.jashmore.sqs.resolver.batching;
22

33
import lombok.Builder;
4+
import lombok.Value;
45

56
import javax.validation.constraints.Max;
67
import javax.validation.constraints.Positive;
78

89
/**
910
* Static implementation that will contain constant size and time limit for the buffer.
1011
*/
12+
@Value
1113
@Builder(toBuilder = true)
1214
public class StaticBatchingMessageResolverProperties implements BatchingMessageResolverProperties {
1315
private final long bufferingTimeInMs;

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/batching/BatchingMessageRetrieverProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,13 @@ public interface BatchingMessageRetrieverProperties {
2929
* <p>Note that this trigger size may not be reached due to the the waiting time going higher than {@link #getMessageRetrievalPollingPeriodInMs()}, in
3030
* this case it will just request as many messages as threads requesting messages.
3131
*
32+
* <p>This number should be smaller than the maximum number of messages that can be downloaded from AWS as it doesn't make much sense to have a batch size
33+
* greater than this value.
34+
*
3235
* @return the number of threads to be requesting messages for the retrieval of messages background thread to be triggered
3336
*/
3437
@Positive
38+
@Max(AwsConstants.MAX_NUMBER_OF_MESSAGES_FROM_SQS)
3539
int getNumberOfThreadsWaitingTrigger();
3640

3741
/**
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<parent>
4+
<artifactId>java-dynamic-sqs-listener-spring</artifactId>
5+
<groupId>com.jashmore</groupId>
6+
<version>2.3.0-SNAPSHOT</version>
7+
</parent>
8+
<modelVersion>4.0.0</modelVersion>
9+
10+
<artifactId>java-dynamic-sqs-listener-spring-core</artifactId>
11+
12+
<name>Java Dynamic SQS Listener - Spring Core</name>
13+
<description>Core Spring implementation for the Java Dynamic SQS Listener library.</description>
14+
15+
<properties>
16+
<findbugs.config.location>../../configuration/findbugs/bugsExcludeFilter.xml</findbugs.config.location>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>software.amazon.awssdk</groupId>
22+
<artifactId>sqs</artifactId>
23+
</dependency>
24+
25+
<dependency>
26+
<groupId>org.projectlombok</groupId>
27+
<artifactId>lombok</artifactId>
28+
</dependency>
29+
30+
<dependency>
31+
<groupId>com.google.guava</groupId>
32+
<artifactId>guava</artifactId>
33+
</dependency>
34+
35+
<dependency>
36+
<groupId>org.springframework.boot</groupId>
37+
<artifactId>spring-boot-autoconfigure</artifactId>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>com.jashmore</groupId>
42+
<artifactId>java-dynamic-sqs-listener-core</artifactId>
43+
<version>${project.version}</version>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>com.jashmore</groupId>
48+
<artifactId>java-dynamic-sqs-listener-spring-api</artifactId>
49+
<version>${project.version}</version>
50+
</dependency>
51+
52+
<dependency>
53+
<groupId>org.springframework.boot</groupId>
54+
<artifactId>spring-boot-starter-web</artifactId>
55+
<version>${spring.boot.version}</version>
56+
<scope>test</scope>
57+
</dependency>
58+
59+
<dependency>
60+
<groupId>org.springframework</groupId>
61+
<artifactId>spring-tx</artifactId>
62+
<version>${spring.version}</version>
63+
<scope>test</scope>
64+
</dependency>
65+
66+
<dependency>
67+
<groupId>org.springframework</groupId>
68+
<artifactId>spring-test</artifactId>
69+
<version>${spring.version}</version>
70+
<scope>test</scope>
71+
</dependency>
72+
73+
<dependency>
74+
<groupId>org.springframework.boot</groupId>
75+
<artifactId>spring-boot-test</artifactId>
76+
<version>${spring.boot.version}</version>
77+
<scope>test</scope>
78+
</dependency>
79+
80+
<dependency>
81+
<groupId>org.springframework.boot</groupId>
82+
<artifactId>spring-boot-starter-aop</artifactId>
83+
<version>${spring.boot.version}</version>
84+
<scope>test</scope>
85+
</dependency>
86+
87+
<dependency>
88+
<groupId>com.jashmore</groupId>
89+
<artifactId>local-sqs-test-utils</artifactId>
90+
<version>${project.version}</version>
91+
<scope>test</scope>
92+
</dependency>
93+
</dependencies>
94+
95+
<dependencyManagement>
96+
<dependencies>
97+
<dependency>
98+
<groupId>org.springframework.boot</groupId>
99+
<artifactId>spring-boot-autoconfigure</artifactId>
100+
<version>${spring.boot.version}</version>
101+
</dependency>
102+
</dependencies>
103+
</dependencyManagement>
104+
</project>

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/AbstractQueueAnnotationWrapper.java renamed to java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/AbstractQueueAnnotationWrapper.java

File renamed without changes.

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/DefaultQueueContainerService.java renamed to java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/DefaultQueueContainerService.java

File renamed without changes.

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/config/QueueListenerConfiguration.java renamed to java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/config/QueueListenerConfiguration.java

File renamed without changes.

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java renamed to java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static java.lang.annotation.RetentionPolicy.RUNTIME;
66

77
import com.jashmore.sqs.QueueProperties;
8+
import com.jashmore.sqs.aws.AwsConstants;
89
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
910
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
1011
import com.jashmore.sqs.container.MessageListenerContainer;
@@ -83,6 +84,31 @@
8384
*/
8485
String concurrencyLevelString() default "";
8586

87+
/**
88+
* The total number of threads requesting messages that will result in the the background thread to actually request the messages.
89+
*
90+
* <p>This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size
91+
* greater than what AWS can provide.
92+
*
93+
* @return the total number of threads requesting messages for trigger a batch of messages to be retrieved
94+
* @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter
95+
*/
96+
int batchSize() default 5;
97+
98+
/**
99+
* The total number of threads requesting messages that will result in the the background thread to actually request the messages.
100+
*
101+
* <p>This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size
102+
* greater than what AWS can provide.
103+
*
104+
* <p>This can be used when you need to load the value from Spring properties for example
105+
* <pre>batchSizeString = "${my.profile.property}"</pre> instead of having it hardcoded in {@link #batchSize()}.
106+
*
107+
* @return the total number of threads requesting messages for trigger a batch of messages to be retrieved
108+
* @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter
109+
*/
110+
String batchSizeString() default "";
111+
86112
/**
87113
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages.
88114
*

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapper.java renamed to java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListenerWrapper.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
5454

5555
final int concurrencyLevel = getConcurrencyLevel(annotation);
5656

57-
final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties, concurrencyLevel);
57+
final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties);
5858

5959
final MessageResolver messageResolver = new IndividualMessageResolver(queueProperties, sqsAsyncClient);
6060

@@ -71,8 +71,7 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
7171
final ConcurrentMessageBroker messageBroker = new ConcurrentMessageBroker(
7272
messageRetriever,
7373
messageProcessor,
74-
StaticConcurrentMessageBrokerProperties
75-
.builder()
74+
StaticConcurrentMessageBrokerProperties.builder()
7675
.concurrencyLevel(concurrencyLevel)
7776
.threadNameFormat(identifier + "-%d")
7877
.build()
@@ -84,17 +83,17 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
8483
.build();
8584
}
8685

87-
private MessageRetriever buildMessageRetriever(final QueueListener annotation, final QueueProperties queueProperties, final int concurrencyLevel) {
86+
private MessageRetriever buildMessageRetriever(final QueueListener annotation, final QueueProperties queueProperties) {
8887
return new BatchingMessageRetriever(
89-
queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation, concurrencyLevel));
88+
queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation));
9089
}
9190

9291
@VisibleForTesting
93-
BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final QueueListener annotation, final int concurrencyLevel) {
92+
BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final QueueListener annotation) {
9493
return StaticBatchingMessageRetrieverProperties.builder()
9594
.visibilityTimeoutInSeconds(getMessageVisibilityTimeoutInSeconds(annotation))
9695
.messageRetrievalPollingPeriodInMs(getMaxPeriodBetweenBatchesInMs(annotation))
97-
.numberOfThreadsWaitingTrigger(concurrencyLevel)
96+
.numberOfThreadsWaitingTrigger(getBatchSize(annotation))
9897
.build();
9998
}
10099

@@ -106,6 +105,15 @@ private int getConcurrencyLevel(final QueueListener annotation) {
106105
return Integer.parseInt(environment.resolvePlaceholders(annotation.concurrencyLevelString()));
107106
}
108107

108+
private int getBatchSize(final QueueListener annotation) {
109+
if (StringUtils.isEmpty(annotation.batchSizeString())) {
110+
return annotation.batchSize();
111+
}
112+
113+
return Integer.parseInt(environment.resolvePlaceholders(annotation.batchSizeString()));
114+
}
115+
116+
109117
private long getMaxPeriodBetweenBatchesInMs(final QueueListener annotation) {
110118
if (StringUtils.isEmpty(annotation.maxPeriodBetweenBatchesInMsString())) {
111119
return annotation.maxPeriodBetweenBatchesInMs();

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListener.java renamed to java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListener.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static java.lang.annotation.RetentionPolicy.RUNTIME;
55

66
import com.jashmore.sqs.QueueProperties;
7+
import com.jashmore.sqs.aws.AwsConstants;
78
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
89
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
910
import com.jashmore.sqs.container.MessageListenerContainer;
@@ -86,14 +87,35 @@
8687
String concurrencyLevelString() default "";
8788

8889
/**
89-
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages.
90+
* The total number of threads requesting messages that will result in the the background thread to actually request the messages.
91+
*
92+
* <p>This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size
93+
* greater than what AWS can provide.
94+
*
95+
* @return the total number of threads requesting messages for trigger a batch of messages to be retrieved
96+
* @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter
97+
*/
98+
int batchSize() default 5;
99+
100+
/**
101+
* The total number of threads requesting messages that will result in the the background thread to actually request the messages.
102+
*
103+
* <p>This number should be positive but smaller than {@link AwsConstants#MAX_NUMBER_OF_MESSAGES_FROM_SQS} as it does not make sense to have a batch size
104+
* greater than what AWS can provide.
90105
*
91-
* <p>This tries to reduce the number of times that requests for messages are made to SQS by waiting for all of the threads to be requiring messages
92-
* before requesting for messages from SQS. If one or more of the threads processing messages does not start requesting messages by this period's
93-
* timeout the current threads waiting for messages will have messages requested for them
106+
* <p>This can be used when you need to load the value from Spring properties for example
107+
* <pre>batchSizeString = "${my.profile.property}"</pre> instead of having it hardcoded in {@link #batchSize()}.
108+
*
109+
* @return the total number of threads requesting messages for trigger a batch of messages to be retrieved
110+
* @see BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger() for more details about this parameter
111+
*/
112+
String batchSizeString() default "";
113+
114+
/**
115+
* The maximum period of time that the {@link BatchingMessageRetriever} will wait for all threads to be ready before retrieving messages.
94116
*
95117
* @return the period in ms that threads will wait for messages to be requested from SQS
96-
* @see BatchingMessageRetrieverProperties#getMessageRetrievalPollingPeriodInMs() for more details
118+
* @see BatchingMessageRetrieverProperties#getMessageRetrievalPollingPeriodInMs() for more details about this parameter
97119
*/
98120
long maxPeriodBetweenBatchesInMs() default 2000L;
99121

java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-starter/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListenerWrapper.java renamed to java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/batching/BatchingQueueListenerWrapper.java

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,13 @@ protected Class<BatchingQueueListener> getAnnotationClass() {
5252
@Override
5353
protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(final Object bean, final Method method,
5454
final BatchingQueueListener annotation) {
55-
final QueueProperties queueProperties = QueueProperties
56-
.builder()
55+
final QueueProperties queueProperties = QueueProperties.builder()
5756
.queueUrl(queueResolverService.resolveQueueUrl(annotation.value()))
5857
.build();
5958

60-
final int concurrencyLevel = getConcurrencyLevel(annotation);
61-
final long maxPeriodBetweenBatchesInMs = getMaxPeriodBetweenBatchesInMs(annotation);
59+
final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties);
6260

63-
final MessageRetriever messageRetriever = buildMessageRetriever(annotation, queueProperties, concurrencyLevel);
64-
65-
final MessageResolver messageResolver = buildMessageResolver(queueProperties, concurrencyLevel, maxPeriodBetweenBatchesInMs);
61+
final MessageResolver messageResolver = buildMessageResolver(annotation, queueProperties);
6662

6763
final MessageProcessor messageProcessor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, bean);
6864

@@ -76,9 +72,8 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
7672
final ConcurrentMessageBroker messageBroker = new ConcurrentMessageBroker(
7773
messageRetriever,
7874
messageProcessor,
79-
StaticConcurrentMessageBrokerProperties
80-
.builder()
81-
.concurrencyLevel(concurrencyLevel)
75+
StaticConcurrentMessageBrokerProperties.builder()
76+
.concurrencyLevel(getConcurrencyLevel(annotation))
8277
.threadNameFormat(identifier + "-%d")
8378
.build()
8479
);
@@ -89,31 +84,35 @@ protected IdentifiableMessageListenerContainer wrapMethodContainingAnnotation(fi
8984
.build();
9085
}
9186

92-
private MessageRetriever buildMessageRetriever(final BatchingQueueListener annotation, final QueueProperties queueProperties, final int concurrencyLevel) {
93-
return new BatchingMessageRetriever(queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation, concurrencyLevel));
87+
private MessageRetriever buildMessageRetriever(final BatchingQueueListener annotation,
88+
final QueueProperties queueProperties) {
89+
return new BatchingMessageRetriever(queueProperties, sqsAsyncClient, batchingMessageRetrieverProperties(annotation));
9490
}
9591

9692
@VisibleForTesting
97-
BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final BatchingQueueListener annotation, final int concurrencyLevel) {
98-
return StaticBatchingMessageRetrieverProperties
99-
.builder()
93+
BatchingMessageRetrieverProperties batchingMessageRetrieverProperties(final BatchingQueueListener annotation) {
94+
return StaticBatchingMessageRetrieverProperties.builder()
10095
.visibilityTimeoutInSeconds(getMessageVisibilityTimeoutInSeconds(annotation))
10196
.messageRetrievalPollingPeriodInMs(getMaxPeriodBetweenBatchesInMs(annotation))
102-
.numberOfThreadsWaitingTrigger(concurrencyLevel)
97+
.numberOfThreadsWaitingTrigger(getBatchSize(annotation))
10398
.build();
10499
}
105100

106-
private MessageResolver buildMessageResolver(final QueueProperties queueProperties,
107-
final int concurrencyLevel,
108-
final long maxPeriodBetweenBatchesInMs) {
109-
final BatchingMessageResolverProperties batchingMessageResolverProperties = StaticBatchingMessageResolverProperties.builder()
110-
.bufferingSizeLimit(concurrencyLevel)
111-
.bufferingTimeInMs(maxPeriodBetweenBatchesInMs)
112-
.build();
101+
private MessageResolver buildMessageResolver(final BatchingQueueListener annotation,
102+
final QueueProperties queueProperties) {
103+
final BatchingMessageResolverProperties batchingMessageResolverProperties = batchingMessageResolverProperties(annotation);
113104

114105
return new BatchingMessageResolver(queueProperties, sqsAsyncClient, batchingMessageResolverProperties);
115106
}
116107

108+
@VisibleForTesting
109+
BatchingMessageResolverProperties batchingMessageResolverProperties(final BatchingQueueListener annotation) {
110+
return StaticBatchingMessageResolverProperties.builder()
111+
.bufferingSizeLimit(getBatchSize(annotation))
112+
.bufferingTimeInMs(getMaxPeriodBetweenBatchesInMs(annotation))
113+
.build();
114+
}
115+
117116
private int getConcurrencyLevel(final BatchingQueueListener annotation) {
118117
if (StringUtils.isEmpty(annotation.concurrencyLevelString())) {
119118
return annotation.concurrencyLevel();
@@ -122,6 +121,14 @@ private int getConcurrencyLevel(final BatchingQueueListener annotation) {
122121
return Integer.parseInt(environment.resolvePlaceholders(annotation.concurrencyLevelString()));
123122
}
124123

124+
private int getBatchSize(final BatchingQueueListener annotation) {
125+
if (StringUtils.isEmpty(annotation.batchSizeString())) {
126+
return annotation.batchSize();
127+
}
128+
129+
return Integer.parseInt(environment.resolvePlaceholders(annotation.batchSizeString()));
130+
}
131+
125132
private long getMaxPeriodBetweenBatchesInMs(final BatchingQueueListener annotation) {
126133
if (StringUtils.isEmpty(annotation.maxPeriodBetweenBatchesInMsString())) {
127134
return annotation.maxPeriodBetweenBatchesInMs();

0 commit comments

Comments
 (0)