Skip to content

Commit 87f06d8

Browse files
refs #254: Added Ktor support (#261)
Can now register message listeners in a Ktor application by using the Ktor Core module. Usage would look like: ```kotlin val server = embeddedServer(Netty, 8080) { prefetchingMessageListener("prefetching-listener", sqsAsyncClient, "queueUrl") { concurrencyLevel = { 2 } desiredPrefetchedMessages = 5 maxPrefetchedMessages = 10 processor = lambdaProcessor { method { message, _ -> log.info("Message: {}", message.body()) } } } } ```
1 parent c2f79e9 commit 87f06d8

File tree

13 files changed

+493
-4
lines changed

13 files changed

+493
-4
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ See the [build.gradle.kts](build.gradle.kts) for the specific versions of these
137137
across multiple AWS Accounts
138138
1. [How to version message payload schemas](doc/how-to-guides/spring/spring-how-to-version-payload-schemas-using-spring-cloud-schema-registry.md): guide
139139
for versioning payloads using Avro and the Spring Cloud Schema Registry.
140+
1. Ktor How to Guides
141+
1. [How to Register Message Listeners](doc/how-to-guides/ktor/ktor-how-to-register-message-listeners.md): guide for include message listeners into a
142+
Ktor application.
140143

141144
## Common Use Cases/Explanations
142145

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ allprojects {
2020
}
2121

2222
subprojects {
23-
val isKotlinProject = project.name.contains("kotlin")
23+
val isKotlinProject = project.name.contains("kotlin") || project.name.contains("ktor")
2424
apply(plugin = "java-library")
2525
apply(plugin = "jacoco")
2626
if (!isKotlinProject) {

doc/how-to-guides/core/core-how-to-use-kotlin-dsl.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ Check out the [Core Kotlin DSL](../../../extensions/core-kotlin-dsl) for more de
5959
```kotlin
6060
val container = coreMessageListener("identifier", sqsAsyncClient, queueUrl) {
6161
processor = lambdaProcessor {
62-
method { message ->
62+
method { message ->
6363
log.info("Message received: {}", message.body())
6464
}
6565
}
@@ -74,7 +74,7 @@ the [@QueueListener](../../../spring/spring-core/src/main/java/com/jashmore/sqs/
7474
used in a Spring Boot application which will set up a container that will request for messages in batches.
7575

7676
```kotlin
77-
batchingMessageListener("identifier", sqsAsyncClient, "url") {
77+
val container = batchingMessageListener("identifier", sqsAsyncClient, "url") {
7878
concurrencyLevel = { 10 }
7979
batchSize = { 5 }
8080
batchingPeriod = { Duration.ofSeconds(5) }
@@ -94,7 +94,7 @@ the [@PrefetchingQueueListener](../../../spring/spring-core/src/main/java/com/ja
9494
used in a Spring Boot application which will set up a container that will prefetch messages for processing.
9595

9696
```kotlin
97-
prefetchingMessageListener("identifier", sqsAsyncClient, "url") {
97+
val container = prefetchingMessageListener("identifier", sqsAsyncClient, "url") {
9898
concurrencyLevel = { 2 }
9999
desiredPrefetchedMessages = 5
100100
maxPrefetchedMessages = 10
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Ktor - How to Register Message Listeners
2+
3+
The [Ktor Core](../../../ktor/core) module adds the ability to register Message Listeners into the web application.
4+
5+
## Steps
6+
7+
1. Include the [Ktor Core](../../../ktor/core) dependency.
8+
9+
```xml
10+
<dependency>
11+
<groupId>com.jashmore</groupId>
12+
<artifactId>java-dynamic-sqs-listener-ktor-core</artifactId>
13+
<version>${dynamic-sqs-listener.version}</version>
14+
</dependency>
15+
```
16+
17+
1. Include one of the message listener [extension functions](../../../ktor/core/src/main/kotlin/com/jashmore/sqs/ktor/container/KtorCoreExtension.kt) imports.
18+
19+
```kotlin
20+
import com.jashmore.sqs.ktor.container.* // Change * to the specific function
21+
```
22+
23+
Make sure to use this `ktor` import instead of the `com.jashmore.sqs.core.kotlin.dsl.container.*` import as the `ktor` version will attach
24+
to the lifecycle of the server.
25+
26+
1. Integrate the message listener into the web application
27+
28+
```kotlin
29+
val server = embeddedServer(Netty, 8080) {
30+
batchingMessageListener("batching-listener", sqsAsyncClient, queeuUrl) {
31+
concurrencyLevel = { 2 }
32+
33+
processor = lambdaProcessor {
34+
method { message ->
35+
log.info("Processing message: ${message.body()}")
36+
}
37+
}
38+
}
39+
}
40+
```
41+
42+
## Shutting down the container on server shutdown
43+
44+
To make sure the message listener is gracefully shutdown when the web server is shutdown, you will need to make sure to
45+
attach to a `ShutdownHook`:
46+
47+
```kotlin
48+
val server = embeddedServer(Netty, 8080) {
49+
// setup message listeners here
50+
}
51+
server.start()
52+
Runtime.getRuntime().addShutdownHook(Thread {
53+
server.stop(1, 30_000)
54+
})
55+
Thread.currentThread().join()
56+
```

examples/ktor-example/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Ktor Example
2+
3+
This example shows the usage of the [Ktor Core](../../ktor/core) module which uses the [Core Kotlin DSL](../../extensions/core-kotlin-dsl) to build
4+
the message listeners. It creates multiple message listeners that use different methods to process messages, such as prefetching messages, etc.
5+
6+
## Usage
7+
8+
```bash
9+
gradle runApp
10+
```
11+
12+
Perform a GET request to [http://localhost:8080/message/{queueIdentifier}](http://localhost:8080/message/one) where the queueIdentifier is one
13+
of `one`, `two`, or `three`. Any unknown queue will return 404. You can also start and stop the message listeners by navigating to
14+
[http://localhost:8080/start/{queueIdentifier}](http://localhost:8080/start/one) and
15+
[http://localhost:8080/stop/{queueIdentifier}](http://localhost:8080/stop/one) respectively.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
2+
3+
description = "Contains examples for connection to SQS in an Ktor Application"
4+
5+
plugins {
6+
kotlin("jvm") version "1.3.72"
7+
}
8+
9+
repositories {
10+
jcenter()
11+
}
12+
13+
apply(plugin = "kotlin")
14+
15+
dependencies {
16+
implementation(project(":java-dynamic-sqs-listener-core"))
17+
implementation(project(":core-kotlin-dsl"))
18+
implementation(project(":elasticmq-sqs-client"))
19+
api(project(":java-dynamic-sqs-listener-ktor-core"))
20+
implementation("io.ktor:ktor-server-netty:1.3.2")
21+
implementation("ch.qos.logback:logback-core")
22+
implementation("ch.qos.logback:logback-classic")
23+
}
24+
25+
tasks.withType<KotlinCompile>().configureEach {
26+
kotlinOptions.jvmTarget = "1.8"
27+
}
28+
29+
tasks.create<JavaExec>("runApp") {
30+
classpath = sourceSets.main.get().runtimeClasspath
31+
32+
main = "com.jashmore.sqs.examples.KtorApplicationExample"
33+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
@file:JvmName("KtorApplicationExample")
2+
package com.jashmore.sqs.examples
3+
4+
import com.jashmore.sqs.container.MessageListenerContainer
5+
import com.jashmore.sqs.elasticmq.ElasticMqSqsAsyncClient
6+
import com.jashmore.sqs.ktor.container.batchingMessageListener
7+
import com.jashmore.sqs.ktor.container.messageListener
8+
import com.jashmore.sqs.ktor.container.prefetchingMessageListener
9+
import io.ktor.application.call
10+
import io.ktor.application.log
11+
import io.ktor.http.ContentType
12+
import io.ktor.http.HttpStatusCode
13+
import io.ktor.response.respond
14+
import io.ktor.response.respondText
15+
import io.ktor.routing.get
16+
import io.ktor.routing.routing
17+
import io.ktor.server.engine.embeddedServer
18+
import io.ktor.server.netty.Netty
19+
import kotlinx.coroutines.GlobalScope
20+
import kotlinx.coroutines.delay
21+
import kotlinx.coroutines.future.await
22+
import kotlinx.coroutines.future.future
23+
import kotlinx.coroutines.launch
24+
import software.amazon.awssdk.services.sqs.model.Message
25+
import java.time.Duration
26+
27+
fun main() {
28+
val sqsAsyncClient = ElasticMqSqsAsyncClient()
29+
30+
val server = embeddedServer(Netty, 8080) {
31+
val firstQueueUrl = sqsAsyncClient.createRandomQueue().get().queueUrl()
32+
val firstContainer = messageListener("core-listener", sqsAsyncClient, firstQueueUrl) {
33+
processor = lambdaProcessor {
34+
method { message ->
35+
log.info("Message: {}", message.body())
36+
}
37+
}
38+
retriever = batchingMessageRetriever {
39+
batchSize = { 1 }
40+
batchingPeriod = { Duration.ofSeconds(30) }
41+
}
42+
resolver = batchingResolver {
43+
batchSize = { 1 }
44+
batchingPeriod = { Duration.ofSeconds(5) }
45+
}
46+
broker = concurrentBroker {
47+
concurrencyLevel = { 5 }
48+
concurrencyPollingRate = { Duration.ofMinutes(1) }
49+
}
50+
}
51+
52+
val secondQueueUrl = sqsAsyncClient.createRandomQueue().get().queueUrl()
53+
val secondContainer = prefetchingMessageListener("prefetching-listener", sqsAsyncClient, secondQueueUrl) {
54+
concurrencyLevel = { 2 }
55+
desiredPrefetchedMessages = 5
56+
maxPrefetchedMessages = 10
57+
58+
processor = lambdaProcessor {
59+
methodWithVisibilityExtender { message, _ ->
60+
log.info("Message: {}", message.body())
61+
}
62+
}
63+
}
64+
65+
val thirdQueueUrl = sqsAsyncClient.createRandomQueue().get().queueUrl()
66+
val thirdContainer = batchingMessageListener("batching-listener", sqsAsyncClient, thirdQueueUrl) {
67+
concurrencyLevel = { 2 }
68+
batchSize = { 5 }
69+
batchingPeriod = { Duration.ofSeconds(5) }
70+
71+
processor = asyncLambdaProcessor {
72+
method { message ->
73+
log.info("Processing message: ${message.body()}")
74+
future {
75+
something(message)
76+
}
77+
}
78+
}
79+
}
80+
81+
routing {
82+
get("/message/{queue}") {
83+
val queueIdentifier = call.parameters["queue"]
84+
val queueUrl: String? = when (queueIdentifier) {
85+
"one" -> firstQueueUrl
86+
"two" -> secondQueueUrl
87+
"three" -> thirdQueueUrl
88+
else -> null
89+
}
90+
91+
if (queueUrl == null) {
92+
call.respond(HttpStatusCode.NotFound, "Unknown queue: $queueIdentifier")
93+
return@get
94+
}
95+
96+
val response = sqsAsyncClient.sendMessage {
97+
it.queueUrl(queueUrl).messageBody("body")
98+
}.await()
99+
100+
call.respondText("Hello, world! ${response.messageId()}", ContentType.Text.Html)
101+
}
102+
103+
get("/start/{queue}") {
104+
val queueIdentifier = call.parameters["queue"]
105+
val container: MessageListenerContainer? = when (queueIdentifier) {
106+
"one" -> firstContainer
107+
"two" -> secondContainer
108+
"three" -> thirdContainer
109+
else -> null
110+
}
111+
112+
if (container == null) {
113+
call.respond(HttpStatusCode.NotFound, "Unknown queue: $queueIdentifier")
114+
return@get
115+
}
116+
117+
container.start()
118+
119+
call.respondText("Container ${queueIdentifier} started", ContentType.Text.Html)
120+
}
121+
122+
123+
get("/stop/{queue}") {
124+
val queueIdentifier = call.parameters["queue"]
125+
val container: MessageListenerContainer? = when (queueIdentifier) {
126+
"one" -> firstContainer
127+
"two" -> secondContainer
128+
"three" -> thirdContainer
129+
else -> null
130+
}
131+
132+
if (container == null) {
133+
call.respond(HttpStatusCode.NotFound, "Unknown queue: $queueIdentifier")
134+
return@get
135+
}
136+
137+
container.stop()
138+
139+
call.respondText("Container ${queueIdentifier} stopped", ContentType.Text.Html)
140+
}
141+
}
142+
}
143+
server.start()
144+
Runtime.getRuntime().addShutdownHook(Thread {
145+
server.stop(1, 30_000)
146+
})
147+
Thread.currentThread().join()
148+
}
149+
150+
suspend fun something(@Suppress("UNUSED_PARAMETER") message: Message) = GlobalScope.launch {
151+
delay(5000)
152+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<configuration>
2+
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<encoder>
5+
<pattern>%d{dd-MM-yyyy HH:mm:ss.SSS} %magenta([%thread]) %highlight(%-5level) %logger{36}.%M - %msg %yellow(%mdc) %n</pattern>
6+
</encoder>
7+
</appender>
8+
9+
<!-- We reduce the log messages from ElasticMQ, Akka and Netty to reduce the amount of unnecessary log messages being published -->
10+
<logger name="org.elasticmq" level="OFF" />
11+
<logger name="akka" level="OFF" />
12+
<logger name="io.netty" level="ERROR" />
13+
14+
<logger name="com.jashmore" level="INFO" />
15+
16+
<root level="INFO">
17+
<appender-ref ref="STDOUT" />
18+
</root>
19+
</configuration>

ktor/core/build.gradle.kts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
2+
3+
description = "Library for integrating the Java Dynamic Sqs Listener in a Ktor application"
4+
5+
plugins {
6+
kotlin("jvm") version "1.3.72"
7+
}
8+
9+
repositories {
10+
jcenter()
11+
}
12+
13+
apply(plugin = "kotlin")
14+
15+
dependencies {
16+
implementation(kotlin("stdlib-jdk8"))
17+
api(project(":java-dynamic-sqs-listener-core"))
18+
api(project(":core-kotlin-dsl"))
19+
implementation("io.ktor:ktor-server-core:1.3.2")
20+
21+
testImplementation(project(":elasticmq-sqs-client"))
22+
testImplementation(project(":expected-test-exception"))
23+
testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0")
24+
testImplementation("io.ktor:ktor-server-test-host:1.3.2")
25+
}
26+
27+
tasks.withType<KotlinCompile>().configureEach {
28+
kotlinOptions.jvmTarget = "1.8"
29+
}

0 commit comments

Comments
 (0)