Skip to content

Commit d0a0034

Browse files
authored
Add micronaut integration (#407)
1 parent 0f4482a commit d0a0034

File tree

50 files changed

+3051
-6
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3051
-6
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,6 @@ hs_err_pid*
4848
!.yarn/sdks
4949
!.yarn/versions
5050

51-
!gradle-wrapper.jar
51+
!gradle-wrapper.jar
52+
53+
.micronaut/

.prettierignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@ lombok.config
2424
.husky
2525

2626
org.springframework.boot.autoconfigure.AutoConfiguration.imports
27+
io.micronaut.inject.annotation.AnnotationTransformer
2728

2829
doc/resources/**/*.xml

README.md

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,81 @@ See the [Core Kotlin Example](examples/core-kotlin-example) for a full example r
223223

224224
See the [Ktor Core Example](examples/ktor-example) for a full example running a Ktor framework that listens to a local ElasticMQ SQS Server.
225225

226+
### Micronaut Quick Guide
227+
228+
1. Include the Micronaut core dependency with Maven `<dependencies>`:
229+
230+
```xml
231+
<dependency>
232+
<groupId>com.jashmore</groupId>
233+
<artifactId>java-dynamic-sqs-listener-micronaut-core</artifactId>
234+
<version>${sqs.listener.version}</version>
235+
</dependency>
236+
```
237+
238+
Or with Gradle:
239+
240+
```kotlin
241+
dependencies {
242+
implementation("com.jashmore:java-dynamic-sqs-listener-micronaut-core:${sqs.listener.version}")
243+
}
244+
```
245+
246+
1. Also, include the Micronaut annotation processor with Maven:
247+
248+
```xml
249+
<pluginManagement>
250+
<plugins>
251+
<plugin>
252+
<groupId>org.apache.maven.plugins</groupId>
253+
<artifactId>maven-compiler-plugin</artifactId>
254+
<version>${maven.compiler.version}</version>
255+
<configuration>
256+
<annotationProcessorPaths>
257+
<annotationProcessorPath>
258+
<groupId>com.jashmore</groupId>
259+
<artifactId>java-dynamic-sqs-listener-micronaut-inject-java</artifactId>
260+
<version>${sqs.listener.version}</version>
261+
</annotationProcessorPath>
262+
</annotationProcessorPaths>
263+
</configuration>
264+
</plugin>
265+
</plugins>
266+
</pluginManagement>
267+
```
268+
269+
Or with Gradle:
270+
271+
```kotlin
272+
dependencies {
273+
annotationProcessor("com.jashmore:java-dynamic-sqs-listener-micronaut-inject-java:${sqs.listener.version}")
274+
}
275+
```
276+
277+
Micronaut will use this at compile time to transform usages of core listener annotations to enable
278+
method processors to register annotated methods as message listeners.
279+
280+
1. In one of your beans, attach a
281+
[@QueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListener.java) or other supported annotation
282+
to a method indicating that it should process messages from a queue.
283+
284+
```java
285+
@Singleton
286+
public class MyMessageListener {
287+
288+
// The queue here can point to your SQS server, e.g. a
289+
// local SQS server or one on AWS
290+
@QueueListener("${insert.queue.url.here}")
291+
public void processMessage(@Payload final String payload) {
292+
// process the message payload here
293+
}
294+
}
295+
296+
```
297+
298+
This will use any configured `SqsAsyncClient` in the application context for connecting to the queue, otherwise a default
299+
will be provided that will look for AWS credentials/region from multiple areas, like the environment variables.
300+
226301
## Core Infrastructure
227302

228303
This library has been divided into isolated components each with distinct responsibilities. The following is a diagram describing a simple flow of a
@@ -261,14 +336,16 @@ for compatibility.
261336
- [AWS SQS SDK](https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/welcome.html)
262337
- [Jackson Databind](https://github.com/FasterXML/jackson-databind)
263338
- [SLF4J API](https://github.com/qos-ch/slf4j)
339+
340+
The following require all the core dependencies above.
341+
264342
- [Spring Framework](./spring)
265-
- All the core dependencies above
266343
- [Spring Boot](https://github.com/spring-projects/spring-boot)
267344
- [Ktor Framework](./ktor)
268-
269-
- All the core dependencies above
270345
- [Kotlin](https://github.com/JetBrains/kotlin)
271346
- [Ktor](https://github.com/ktorio/ktor)
347+
- [Micronaut Framework](./micronaut)
348+
- [Micronaut](https://github.com/micronaut-projects/micronaut-core)
272349

273350
See the [gradle.properties](gradle.properties) for the specific versions of these dependencies.
274351

@@ -363,6 +440,11 @@ public class MyMessageListener {
363440

364441
```
365442

443+
### Micronaut
444+
445+
The `micronaut-core` library is applied pretty much the same way as `spring-starter`,
446+
so for Micronaut it will be useful to look through the Spring guides and examples.
447+
366448
### Spring - Adding a custom argument resolver
367449

368450
There are some core [ArgumentResolvers](./api/src/main/java/com/jashmore/sqs/argument/ArgumentResolver.java) provided in the
@@ -576,7 +658,7 @@ finish in 10 milliseconds but one takes 10 seconds no other messages will be pic
576658
provides the same basic functionality, but it also provides a timeout where it will eventually request for more messages when there are threads that are
577659
ready for another message.
578660
579-
#### Core/Spring Boot
661+
#### Core/Spring Boot/Micronaut
580662
581663
```java
582664
public class MyMessageListener {

build.gradle.kts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,10 @@ subprojects {
118118
// these classes are better handled by integration tests
119119
"com.jashmore.sqs.container.fifo*" ,
120120
"com.jashmore.sqs.container.batching*",
121-
"com.jashmore.sqs.container.prefetching*"
121+
"com.jashmore.sqs.container.prefetching*",
122+
"com.jashmore.sqs.micronaut.configuration*",
123+
"com.jashmore.sqs.micronaut.container*",
124+
"com.jashmore.sqs.micronaut.placeholder*"
122125
)
123126
element = "PACKAGE"
124127
limit {
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Micronaut Integration Test Example
2+
3+
There are many examples in this codebase to run integration tests for this application (look in the src/test/java/integrationTest folder of
4+
the module) but this shows a minimal use case to copy from.
5+
6+
## Usage
7+
8+
```bash
9+
gradle integrationTest
10+
```
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
2+
description = "Contains an example for writing an integration test for the SQS Listener"
3+
4+
plugins {
5+
id("io.micronaut.minimal.application") version "4.4.2"
6+
}
7+
8+
val micronautVersion: String by project
9+
10+
dependencies {
11+
implementation("io.micronaut:micronaut-http-server-netty")
12+
runtimeOnly("ch.qos.logback:logback-classic")
13+
runtimeOnly("org.yaml:snakeyaml")
14+
implementation(project(":java-dynamic-sqs-listener-micronaut-core"))
15+
annotationProcessor(project(":java-dynamic-sqs-listener-micronaut-inject-java"))
16+
17+
testImplementation(project(":elasticmq-sqs-client"))
18+
testImplementation(project(":expected-test-exception"))
19+
testImplementation("io.micronaut.test:micronaut-test-junit5")
20+
testImplementation("org.mockito:mockito-core")
21+
testImplementation("org.mockito:mockito-junit-jupiter")
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.jashmore.sqs.examples.integrationtests;
2+
3+
import com.jashmore.sqs.annotations.core.basic.QueueListener;
4+
import com.jashmore.sqs.argument.payload.Payload;
5+
import io.micronaut.runtime.Micronaut;
6+
import jakarta.inject.Singleton;
7+
import lombok.AllArgsConstructor;
8+
9+
public class TestApplication {
10+
11+
public static void main(String[] args) {
12+
Micronaut.build(args).mainClass(TestApplication.class).start();
13+
}
14+
15+
@Singleton
16+
public static class SomeService {
17+
18+
/**
19+
* Process a message payload, no-op.
20+
*
21+
* @param payload the payload of the message
22+
*/
23+
public void run(final String payload) {
24+
// do nothing
25+
}
26+
}
27+
28+
@Singleton
29+
@AllArgsConstructor
30+
public static class MessageListener {
31+
32+
private final SomeService someService;
33+
34+
/**
35+
* We specifically override the visibility timeout here from the default of 30 to decrease the time
36+
* for the tests to run.
37+
*
38+
* @param message the payload of the message
39+
*/
40+
41+
@QueueListener(value = "${sqs.queues.integrationTestingQueue}", messageVisibilityTimeoutInSeconds = 2)
42+
public void messageListener(@Payload final String message) {
43+
someService.run(message);
44+
}
45+
}
46+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
java-dynamic-sqs-listener-micronaut:
2+
auto-start-containers-enabled: true # default
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<configuration>
2+
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<encoder>
5+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
6+
</encoder>
7+
</appender>
8+
9+
<!-- We reduce the log messages from ElasticMQ, Akka and Netty to reduce the amount of unnecessary log messages being published -->
10+
<logger name="org.elasticmq" level="OFF" />
11+
<logger name="akka" level="OFF" />
12+
<logger name="io.netty" level="ERROR" />
13+
14+
<logger name="com.jashmore" level="DEBUG" />
15+
16+
<root level="info">
17+
<appender-ref ref="STDOUT" />
18+
</root>
19+
</configuration>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package com.jashmore.sqs.examples.integrationtests;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.ArgumentMatchers.anyString;
5+
import static org.mockito.Mockito.*;
6+
7+
import com.jashmore.sqs.elasticmq.ElasticMqSqsAsyncClient;
8+
import com.jashmore.sqs.util.ExpectedTestException;
9+
import com.jashmore.sqs.util.LocalSqsAsyncClient;
10+
import com.jashmore.sqs.util.SqsQueuesConfig;
11+
import io.micronaut.context.annotation.Factory;
12+
import io.micronaut.context.annotation.Property;
13+
import io.micronaut.test.annotation.MockBean;
14+
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
15+
import jakarta.inject.Inject;
16+
import jakarta.inject.Singleton;
17+
import java.time.Duration;
18+
import java.util.Collections;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import org.junit.jupiter.api.AfterEach;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.TestInstance;
27+
28+
@MicronautTest
29+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
30+
@Property(name = "sqs.queues.integrationTestingQueue", value = SqsListenerExampleIntegrationTest.QUEUE_NAME)
31+
class SqsListenerExampleIntegrationTest {
32+
33+
static final String QUEUE_NAME = "SqsListenerExampleIntegrationTest";
34+
private static final int QUEUE_MAX_RECEIVE_COUNT = 3;
35+
private static final int VISIBILITY_TIMEOUT_IN_SECONDS = 2;
36+
37+
@Inject
38+
private LocalSqsAsyncClient localSqsAsyncClient;
39+
40+
@MockBean(TestApplication.SomeService.class)
41+
TestApplication.SomeService someService() {
42+
return mock(TestApplication.SomeService.class);
43+
}
44+
45+
@Inject
46+
private TestApplication.SomeService mockSomeService;
47+
48+
@Factory
49+
public static class TestConfig {
50+
51+
@Singleton
52+
public LocalSqsAsyncClient localSqsAsyncClient() {
53+
return new ElasticMqSqsAsyncClient(
54+
Collections.singletonList(
55+
SqsQueuesConfig.QueueConfig
56+
.builder()
57+
.queueName(QUEUE_NAME)
58+
.maxReceiveCount(QUEUE_MAX_RECEIVE_COUNT)
59+
.visibilityTimeout(VISIBILITY_TIMEOUT_IN_SECONDS)
60+
.build()
61+
)
62+
);
63+
}
64+
}
65+
66+
@AfterEach
67+
void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
68+
localSqsAsyncClient.purgeAllQueues().get(5, TimeUnit.SECONDS);
69+
}
70+
71+
@Test
72+
void messagesPlacedOntoQueueArePickedUpMessageListener() throws Exception {
73+
// arrange
74+
final CountDownLatch messageReceivedCountDownLatch = new CountDownLatch(1);
75+
doAnswer(invocationOnMock -> {
76+
messageReceivedCountDownLatch.countDown();
77+
return null;
78+
})
79+
.when(mockSomeService)
80+
.run(anyString());
81+
82+
// act
83+
localSqsAsyncClient.sendMessage(QUEUE_NAME, "my message");
84+
messageReceivedCountDownLatch.await(VISIBILITY_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
85+
86+
// assert
87+
verify(mockSomeService).run("my message");
88+
}
89+
90+
@Test
91+
void messageFailingToProcessWillBeProcessedAgain() throws Exception {
92+
// arrange
93+
final CountDownLatch messageReceivedCountDownLatch = new CountDownLatch(1);
94+
final AtomicBoolean processedMessageOnce = new AtomicBoolean();
95+
doAnswer(invocationOnMock -> {
96+
if (!processedMessageOnce.getAndSet(true)) {
97+
throw new ExpectedTestException();
98+
}
99+
messageReceivedCountDownLatch.countDown();
100+
return null;
101+
})
102+
.when(mockSomeService)
103+
.run(anyString());
104+
105+
// act
106+
localSqsAsyncClient.sendMessage(QUEUE_NAME, "my message");
107+
messageReceivedCountDownLatch.await(3 * VISIBILITY_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
108+
109+
// assert
110+
verify(mockSomeService, times(2)).run("my message");
111+
}
112+
113+
@Test
114+
void messageThatContinuesToFailWillBePlacedIntoDlq() throws Exception {
115+
// arrange
116+
final CountDownLatch messageReceivedCountDownLatch = new CountDownLatch(QUEUE_MAX_RECEIVE_COUNT);
117+
doAnswer(invocationOnMock -> {
118+
messageReceivedCountDownLatch.countDown();
119+
throw new ExpectedTestException();
120+
})
121+
.when(mockSomeService)
122+
.run(anyString());
123+
124+
// act
125+
localSqsAsyncClient.sendMessage(QUEUE_NAME, "my message");
126+
messageReceivedCountDownLatch.await(VISIBILITY_TIMEOUT_IN_SECONDS * (QUEUE_MAX_RECEIVE_COUNT + 1), TimeUnit.SECONDS);
127+
waitForMessageVisibilityToExpire();
128+
129+
// assert
130+
final int numberOfMessages = localSqsAsyncClient.getApproximateMessages(QUEUE_NAME + "-dlq").get();
131+
assertThat(numberOfMessages).isEqualTo(1);
132+
}
133+
134+
private void waitForMessageVisibilityToExpire() throws InterruptedException {
135+
Thread.sleep(Duration.ofSeconds(VISIBILITY_TIMEOUT_IN_SECONDS + 1).toMillis());
136+
}
137+
}

0 commit comments

Comments
 (0)