Skip to content

Commit c2f79e9

Browse files
refs #259: Add batching and prefetching queue listeners into Kotlin DSL (#260)
This adds the ability to create the equivalent containers as the @QueueListener and @PrefetchingQueueListener in the Kotlin Core DSL. For example: ```kotlin val container = batchingMessageListener("identifier", sqsAsyncClient, "url") { concurrencyLevel = { 10 } batchSize = { 5 } batchingPeriod = { Duration.ofSeconds(5) } processor = lambdaProcessor { method { message -> log.info("Message: {}", message.body()) } } } ``` or ```kotlin val container = prefetchingMessageListener("identifier", sqsAsyncClient, "url") { concurrencyLevel = { 2 } desiredPrefetchedMessages = 5 maxPrefetchedMessages = 10 processor = lambdaProcessor { method { message -> log.info("Message: {}", message.body()) } } } ```
1 parent cf4a812 commit c2f79e9

File tree

5 files changed

+419
-0
lines changed

5 files changed

+419
-0
lines changed

doc/how-to-guides/core/core-how-to-use-kotlin-dsl.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,46 @@ val container = coreMessageListener("identifier", sqsAsyncClient, queueUrl) {
6767
}
6868
```
6969

70+
## Using the batchingMessageListener
71+
72+
This is equivalent to
73+
the [@QueueListener](../../../spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java) annotation
74+
used in a Spring Boot application which will set up a container that will request for messages in batches.
75+
76+
```kotlin
77+
batchingMessageListener("identifier", sqsAsyncClient, "url") {
78+
concurrencyLevel = { 10 }
79+
batchSize = { 5 }
80+
batchingPeriod = { Duration.ofSeconds(5) }
81+
82+
processor = lambdaProcessor {
83+
method { message ->
84+
log.info("Message: {}", message.body())
85+
}
86+
}
87+
}
88+
```
89+
90+
## Using the prefetchingMessageListener
91+
92+
This is equivalent to
93+
the [@PrefetchingQueueListener](../../../spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java) annotation
94+
used in a Spring Boot application which will set up a container that will prefetch messages for processing.
95+
96+
```kotlin
97+
prefetchingMessageListener("identifier", sqsAsyncClient, "url") {
98+
concurrencyLevel = { 2 }
99+
desiredPrefetchedMessages = 5
100+
maxPrefetchedMessages = 10
101+
102+
processor = lambdaProcessor {
103+
method { message ->
104+
log.info("Message: {}", message.body())
105+
}
106+
}
107+
}
108+
```
109+
70110
## Example
71111

72112
A full example of using the Kotlin DSL can be found in the [core-kotlin-example](../../../examples/core-kotlin-example/README.md).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package com.jashmore.sqs.core.kotlin.dsl.container
2+
3+
import com.jashmore.sqs.QueueProperties
4+
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties
5+
import com.jashmore.sqs.container.CoreMessageListenerContainer
6+
import com.jashmore.sqs.container.CoreMessageListenerContainerProperties
7+
import com.jashmore.sqs.container.MessageListenerContainer
8+
import com.jashmore.sqs.core.kotlin.dsl.MessageListenerComponentDslMarker
9+
import com.jashmore.sqs.core.kotlin.dsl.retriever.BatchingMessageRetrieverDslBuilder
10+
import com.jashmore.sqs.resolver.batching.BatchingMessageResolverProperties
11+
import com.jashmore.sqs.retriever.MessageRetriever
12+
import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties
13+
import com.jashmore.sqs.retriever.prefetch.PrefetchingMessageRetrieverProperties
14+
import software.amazon.awssdk.services.sqs.SqsAsyncClient
15+
import java.time.Duration
16+
17+
@MessageListenerComponentDslMarker
18+
class BatchingMessageListenerContainerBuilder(identifier: String,
19+
sqsAsyncClient: SqsAsyncClient,
20+
queueProperties: QueueProperties) : MessageListenerContainerBuilder(identifier, sqsAsyncClient, queueProperties) {
21+
/**
22+
* Supplier for getting the number of messages that should be processed concurrently.
23+
*
24+
* Each time a new message is begun to be processed this supplier will be checked and therefore it should be optimised via caching
25+
* or another method. This field may return different values each time it is checked and therefore the rate of concurrency can
26+
* be dynamically changed during runtime.
27+
*
28+
* @see [ConcurrentMessageBrokerProperties.getConcurrencyLevel] for in-depth details about this field
29+
*/
30+
var concurrencyLevel: (() -> Int)? = null
31+
/**
32+
* The batch size for the number of messages to receive at once
33+
*
34+
* @see BatchingMessageRetrieverProperties.getBatchSize for more details about this field
35+
* @see BatchingMessageResolverProperties.getBufferingSizeLimit for more details about this field
36+
*/
37+
var batchSize: (() -> Int) = { 5 }
38+
/**
39+
* The maximum amount of time to wait for the number of messages requested to reach the [BatchingMessageRetrieverDslBuilder.batchSize].
40+
*
41+
* @see BatchingMessageRetrieverProperties.getBatchingPeriod for more details about this field
42+
* @see BatchingMessageResolverProperties.getBufferingTime for more details about this field
43+
*/
44+
var batchingPeriod: (() -> Duration) = { Duration.ofSeconds(2) }
45+
/**
46+
* Function for obtaining the visibility timeout for the message being retrieved.
47+
*
48+
* @see PrefetchingMessageRetrieverProperties.getMessageVisibilityTimeout for more details about this field
49+
*/
50+
var messageVisibility: (() -> Duration?) = { Duration.ofSeconds(30) }
51+
/**
52+
* Set whether any extra messages that may have been internally stored in the [MessageRetriever] should be processed before shutting down.
53+
*
54+
* @see [CoreMessageListenerContainerProperties.shouldProcessAnyExtraRetrievedMessagesOnShutdown] for more details about this field
55+
*/
56+
var processExtraMessagesOnShutdown: Boolean = true
57+
/**
58+
* Set whether the message processing threads should be interrupted when a shutdown is requested.
59+
*
60+
* @see [CoreMessageListenerContainerProperties.shouldInterruptThreadsProcessingMessagesOnShutdown] for more details about this field
61+
*/
62+
var interruptThreadsProcessingMessagesOnShutdown: Boolean = false
63+
64+
override fun invoke(): MessageListenerContainer {
65+
return coreMessageListener(identifier, sqsAsyncClient, queueProperties) {
66+
processor = this@BatchingMessageListenerContainerBuilder.processor
67+
broker = concurrentBroker {
68+
concurrencyLevel = this@BatchingMessageListenerContainerBuilder.concurrencyLevel
69+
}
70+
retriever = batchingMessageRetriever {
71+
batchSize = this@BatchingMessageListenerContainerBuilder.batchSize
72+
batchingPeriod = this@BatchingMessageListenerContainerBuilder.batchingPeriod
73+
messageVisibility = this@BatchingMessageListenerContainerBuilder.messageVisibility
74+
}
75+
resolver = batchingResolver {
76+
batchSize = this@BatchingMessageListenerContainerBuilder.batchSize
77+
batchingPeriod = this@BatchingMessageListenerContainerBuilder.batchingPeriod
78+
}
79+
shutdown {
80+
shouldInterruptThreadsProcessingMessages = this@BatchingMessageListenerContainerBuilder.interruptThreadsProcessingMessagesOnShutdown
81+
shouldProcessAnyExtraRetrievedMessages = this@BatchingMessageListenerContainerBuilder.processExtraMessagesOnShutdown
82+
}
83+
}
84+
}
85+
}
86+
87+
/**
88+
* Build a [CoreMessageListenerContainer] using a Kotlin DSL that will batch requests to retrieve messages to process.
89+
*
90+
* This is equivalent to the @QueueListener annotation in the Spring implementation.
91+
*
92+
* Usage:
93+
*
94+
* ```kotlin
95+
* val container = batchingMessageListener("identifier", sqsAsyncClient, "url") {
96+
* concurrencyLevel = { 2 }
97+
* batchSize = { 5 }
98+
* batchingPeriod = { Duration.ofSeconds(2) }
99+
*
100+
* // other configurations here...
101+
* }
102+
* ```
103+
*
104+
* @param identifier the identifier that uniquely identifies this container
105+
* @param sqsAsyncClient the client for communicating with the SQS server
106+
* @param queueUrl the URL of the queue to listen to this
107+
* @param init the function to configure this container
108+
* @return the message listener container
109+
*/
110+
fun batchingMessageListener(identifier: String,
111+
sqsAsyncClient: SqsAsyncClient,
112+
queueUrl: String,
113+
init: BatchingMessageListenerContainerBuilder.() -> Unit): MessageListenerContainer {
114+
return batchingMessageListener(identifier, sqsAsyncClient, QueueProperties.builder().queueUrl(queueUrl).build(), init)
115+
}
116+
117+
/**
118+
* Build a [CoreMessageListenerContainer] using a Kotlin DSL that will batch requests to retrieve messages to process.
119+
*
120+
* This is equivalent to the @QueueListener annotation in the Spring implementation.
121+
*
122+
* Usage:
123+
*
124+
* ```kotlin
125+
* val container = batchingMessageListener("identifier", sqsAsyncClient, QueueProperties.builder().queueUrl("url").build()) {
126+
* concurrencyLevel = { 2 }
127+
* batchSize = { 5 }
128+
* batchingPeriod = { Duration.ofSeconds(2) }
129+
*
130+
* // other configurations here...
131+
* }
132+
* ```
133+
*
134+
* @param identifier the identifier that uniquely identifies this container
135+
* @param sqsAsyncClient the client for communicating with the SQS server
136+
* @param queueProperties details about the queue that is being listened to
137+
* @param init the function to configure this container
138+
* @return the message listener container
139+
*/
140+
fun batchingMessageListener(identifier: String,
141+
sqsAsyncClient: SqsAsyncClient,
142+
queueProperties: QueueProperties,
143+
init: BatchingMessageListenerContainerBuilder.() -> Unit): MessageListenerContainer {
144+
145+
val listener = BatchingMessageListenerContainerBuilder(identifier, sqsAsyncClient, queueProperties)
146+
listener.init()
147+
return listener()
148+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package com.jashmore.sqs.core.kotlin.dsl.container
2+
3+
import com.jashmore.sqs.QueueProperties
4+
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties
5+
import com.jashmore.sqs.container.CoreMessageListenerContainer
6+
import com.jashmore.sqs.container.CoreMessageListenerContainerProperties
7+
import com.jashmore.sqs.container.MessageListenerContainer
8+
import com.jashmore.sqs.core.kotlin.dsl.MessageListenerComponentDslMarker
9+
import com.jashmore.sqs.core.kotlin.dsl.retriever.PrefetchingMessageRetrieverDslBuilder
10+
import com.jashmore.sqs.retriever.MessageRetriever
11+
import com.jashmore.sqs.retriever.prefetch.PrefetchingMessageRetrieverProperties
12+
import software.amazon.awssdk.services.sqs.SqsAsyncClient
13+
import java.time.Duration
14+
15+
@MessageListenerComponentDslMarker
16+
class PrefetchingMessageListenerContainerBuilder(identifier: String,
17+
sqsAsyncClient: SqsAsyncClient,
18+
queueProperties: QueueProperties) : MessageListenerContainerBuilder(identifier, sqsAsyncClient, queueProperties) {
19+
/**
20+
* Supplier for getting the number of messages that should be processed concurrently.
21+
*
22+
* Each time a new message is begun to be processed this supplier will be checked and therefore it should be optimised via caching
23+
* or another method. This field may return different values each time it is checked and therefore the rate of concurrency can
24+
* be dynamically changed during runtime.
25+
*
26+
* @see [ConcurrentMessageBrokerProperties.getConcurrencyLevel] for in-depth details about this field
27+
*/
28+
var concurrencyLevel: (() -> Int)? = null
29+
/**
30+
* The desired messages to be prefetched.
31+
*
32+
* @see PrefetchingMessageRetrieverProperties.getDesiredMinPrefetchedMessages for more details about this field
33+
*/
34+
var desiredPrefetchedMessages: Int? = null
35+
/**
36+
* The maximum numbers that can be prefetched, must be less than [PrefetchingMessageRetrieverDslBuilder.desiredPrefetchedMessages].
37+
*
38+
* @see PrefetchingMessageRetrieverProperties.getMaxPrefetchedMessages for more details about this field
39+
*/
40+
var maxPrefetchedMessages: Int? = null
41+
/**
42+
* Function for obtaining the visibility timeout for the message being retrieved.
43+
*
44+
* @see PrefetchingMessageRetrieverProperties.getMessageVisibilityTimeout for more details about this field
45+
*/
46+
var messageVisibility: (() -> Duration?) = { Duration.ofSeconds(30) }
47+
/**
48+
* Set whether any extra messages that may have been internally stored in the [MessageRetriever] should be processed before shutting down.
49+
*
50+
* @see [CoreMessageListenerContainerProperties.shouldProcessAnyExtraRetrievedMessagesOnShutdown] for more details about this field
51+
*/
52+
var processExtraMessagesOnShutdown: Boolean = true
53+
/**
54+
* Set whether the message processing threads should be interrupted when a shutdown is requested.
55+
*
56+
* @see [CoreMessageListenerContainerProperties.shouldInterruptThreadsProcessingMessagesOnShutdown] for more details about this field
57+
*/
58+
var interruptThreadsProcessingMessagesOnShutdown: Boolean = false
59+
60+
override fun invoke(): MessageListenerContainer {
61+
return coreMessageListener(identifier, sqsAsyncClient, queueProperties) {
62+
processor = this@PrefetchingMessageListenerContainerBuilder.processor
63+
broker = concurrentBroker {
64+
concurrencyLevel = this@PrefetchingMessageListenerContainerBuilder.concurrencyLevel
65+
}
66+
retriever = prefetchingMessageRetriever {
67+
desiredPrefetchedMessages = this@PrefetchingMessageListenerContainerBuilder.desiredPrefetchedMessages
68+
maxPrefetchedMessages = this@PrefetchingMessageListenerContainerBuilder.maxPrefetchedMessages
69+
messageVisibility = this@PrefetchingMessageListenerContainerBuilder.messageVisibility
70+
}
71+
resolver = batchingResolver()
72+
shutdown {
73+
shouldInterruptThreadsProcessingMessages = this@PrefetchingMessageListenerContainerBuilder.interruptThreadsProcessingMessagesOnShutdown
74+
shouldProcessAnyExtraRetrievedMessages = this@PrefetchingMessageListenerContainerBuilder.processExtraMessagesOnShutdown
75+
}
76+
}
77+
}
78+
}
79+
80+
/**
81+
* Build a [CoreMessageListenerContainer] using a Kotlin DSL that will prefetch messages to process.
82+
*
83+
* This is equivalent to the @PrefetchingQueueListener annotation in the Spring implementation.
84+
*
85+
* Usage:
86+
*
87+
* ```kotlin
88+
* val container = prefetchingMessageListener("identifier", sqsAsyncClient, "url") {
89+
* concurrencyLevel = { 2 }
90+
* desiredPrefetchedMessages = 5
91+
* maxPrefetchedMessages = 10
92+
*
93+
* // other configurations here...
94+
* }
95+
* ```
96+
*
97+
* @param identifier the identifier that uniquely identifies this container
98+
* @param sqsAsyncClient the client for communicating with the SQS server
99+
* @param queueUrl the URL of the queue to listen to this
100+
* @param init the function to configure this container
101+
* @return the message listener container
102+
*/
103+
fun prefetchingMessageListener(identifier: String,
104+
sqsAsyncClient: SqsAsyncClient,
105+
queueUrl: String,
106+
init: PrefetchingMessageListenerContainerBuilder.() -> Unit): MessageListenerContainer {
107+
return prefetchingMessageListener(identifier, sqsAsyncClient, QueueProperties.builder().queueUrl(queueUrl).build(), init)
108+
}
109+
110+
/**
111+
* Build a [CoreMessageListenerContainer] using a Kotlin DSL that will prefetch messages to process.
112+
*
113+
* This is equivalent to the @PrefetchingMessageListener annotation in the Spring implementation.
114+
*
115+
* Usage:
116+
*
117+
* ```kotlin
118+
* val container = prefetchingMessageListener("identifier", sqsAsyncClient, QueueProperties.builder().queueUrl("url").build()) {
119+
* concurrencyLevel = { 2 }
120+
* desiredPrefetchedMessages = 5
121+
* maxPrefetchedMessages = 10
122+
*
123+
* // other configurations here...
124+
* }
125+
* ```
126+
*
127+
* @param identifier the identifier that uniquely identifies this container
128+
* @param sqsAsyncClient the client for communicating with the SQS server
129+
* @param queueProperties details about the queue that is being listened to
130+
* @param init the function to configure this container
131+
* @return the message listener container
132+
*/
133+
fun prefetchingMessageListener(identifier: String,
134+
sqsAsyncClient: SqsAsyncClient,
135+
queueProperties: QueueProperties,
136+
init: PrefetchingMessageListenerContainerBuilder.() -> Unit): MessageListenerContainer {
137+
138+
val listener = PrefetchingMessageListenerContainerBuilder(identifier, sqsAsyncClient, queueProperties)
139+
listener.init()
140+
return listener()
141+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.jashmore.sqs.core.kotlin.dsl.container
2+
3+
import com.jashmore.sqs.container.MessageListenerContainer
4+
import com.jashmore.sqs.elasticmq.ElasticMqSqsAsyncClient
5+
import org.assertj.core.api.Assertions
6+
import org.junit.jupiter.api.AfterEach
7+
import org.junit.jupiter.api.Test
8+
import java.time.Duration
9+
import java.util.concurrent.CountDownLatch
10+
import java.util.concurrent.TimeUnit
11+
12+
class BatchingMessageListenerContainerBuilderTest {
13+
14+
var container: MessageListenerContainer? = null
15+
16+
@AfterEach
17+
internal fun tearDown() {
18+
container?.stop()
19+
}
20+
21+
@Test
22+
fun `minimal configuration`() {
23+
// arrange
24+
val sqsAsyncClient = ElasticMqSqsAsyncClient()
25+
val queueUrl = sqsAsyncClient.createRandomQueue().get().queueUrl()
26+
val countDownLatch = CountDownLatch(1)
27+
28+
// act
29+
container = batchingMessageListener("identifier", sqsAsyncClient, queueUrl) {
30+
concurrencyLevel = { 1 }
31+
batchSize = { 5 }
32+
batchingPeriod = { Duration.ofSeconds(1) }
33+
processor = lambdaProcessor {
34+
method { _ ->
35+
countDownLatch.countDown()
36+
}
37+
}
38+
}
39+
container?.start()
40+
sqsAsyncClient.sendMessage { it.queueUrl(queueUrl).messageBody("body") }
41+
42+
// assert
43+
Assertions.assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue()
44+
}
45+
}

0 commit comments

Comments
 (0)