Skip to content

Commit 57bf533

Browse files
refs #111: allow the naming of the DLQ queue names
1 parent 37d6bb2 commit 57bf533

File tree

4 files changed

+181
-10
lines changed

4 files changed

+181
-10
lines changed

util/local-amazon-sqs/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@
3232
<groupId>org.slf4j</groupId>
3333
<artifactId>slf4j-api</artifactId>
3434
</dependency>
35+
36+
<dependency>
37+
<groupId>org.elasticmq</groupId>
38+
<artifactId>elasticmq-rest-sqs_2.11</artifactId>
39+
<version>0.13.9</version>
40+
<scope>test</scope>
41+
</dependency>
3542
</dependencies>
3643

3744
</project>

util/local-amazon-sqs/src/main/java/com/jashmore/sqs/util/LocalSqsAsyncClient.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.jashmore.sqs.util;
22

33
import static com.jashmore.sqs.util.SqsQueuesConfig.DEFAULT_SQS_SERVER_URL;
4+
import static com.jashmore.sqs.util.SqsQueuesConfig.QueueConfig.DEFAULT_MAX_RECEIVE_COUNT;
45

56
import com.google.common.collect.ImmutableMap;
67
import com.google.common.collect.Maps;
@@ -126,11 +127,14 @@ public void buildQueues() {
126127
attributesBuilder.put(QueueAttributeName.VISIBILITY_TIMEOUT, String.valueOf(queueConfig.getVisibilityTimeout()));
127128
}
128129

129-
if (queueConfig.getMaxReceiveCount() != null) {
130-
final String deadLetterQueueArn = createDeadLetterQueue(queueConfig.getQueueName());
130+
if (queueConfig.getMaxReceiveCount() != null || queueConfig.getDeadLetterQueueName() != null) {
131+
final String deadLetterQueueName = Optional.ofNullable(queueConfig.getDeadLetterQueueName()).orElse(queueConfig.getQueueName() + "-dlq");
132+
final int maxReceiveCount = Optional.ofNullable(queueConfig.getMaxReceiveCount())
133+
.orElse(DEFAULT_MAX_RECEIVE_COUNT);
134+
final String deadLetterQueueArn = createDeadLetterQueue(deadLetterQueueName);
131135
attributesBuilder.put(
132136
QueueAttributeName.REDRIVE_POLICY,
133-
String.format("{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":\"%d\"}", deadLetterQueueArn, queueConfig.getMaxReceiveCount())
137+
String.format("{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":\"%d\"}", deadLetterQueueArn, maxReceiveCount)
134138
);
135139
}
136140

@@ -148,15 +152,13 @@ public void buildQueues() {
148152
/**
149153
* Create a Dead Letter Queue that should be used for the queue with the provided name.
150154
*
151-
* <p>This will create a queue with "-dlq" appended to the original queue's name.
152-
*
153155
* @param queueName the name of the queue that this dead letter queue is for
154156
* @return the queue ARN of the dead letter queue created
155157
*/
156158
private String createDeadLetterQueue(final String queueName) {
157159
try {
158-
log.debug("Creating local queue: {}-dlq", queueName);
159-
return delegate.createQueue((builder -> builder.queueName(queueName + "-dlq")))
160+
log.debug("Creating dead letter queue: {}", queueName);
161+
return delegate.createQueue((builder -> builder.queueName(queueName)))
160162
.thenCompose(createQueueResponse -> delegate.getQueueAttributes(builder -> builder
161163
.queueUrl(createQueueResponse.queueUrl())
162164
.attributeNames(QueueAttributeName.QUEUE_ARN))

util/local-amazon-sqs/src/main/java/com/jashmore/sqs/util/SqsQueuesConfig.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,31 @@ public class SqsQueuesConfig {
3434
@Value
3535
@Builder
3636
public static class QueueConfig {
37+
public static final int DEFAULT_MAX_RECEIVE_COUNT = 3;
3738

3839
/**
3940
* The name of the queue.
4041
*/
4142
private final String queueName;
4243

44+
/**
45+
* The name of the dead letter queue that should be created and linked to the queue.
46+
*
47+
* <p>If this value is null a dead letter queue will not be created. However if {@link #maxReceiveCount} is not null than a dead letter queue will
48+
* be created with a default name of "{queueName}-dlq".
49+
*/
50+
private final String deadLetterQueueName;
51+
4352
/**
4453
* The amount of time messages should be visible before putting back onto the queue.
4554
*/
4655
private final Integer visibilityTimeout;
4756

4857
/**
49-
* The amount of times that a message can be retrieved before it should be plcaed into the Dead Letter Queue.
58+
* The amount of times that a message can be retrieved before it should be placed into the Dead Letter Queue.
5059
*
51-
* <p>If this value is non null a dead letter queue will be created and a redrive policy linked to it with this max receive count. If this value is null
52-
* no dead letter queue will be created and attached.
60+
* <p>If this value is non null a dead letter queue with a name of {@link #deadLetterQueueName} or "{queueName}-dlq" will be created and a
61+
* re-drive policy linked to it with this max receive count.
5362
*/
5463
private final Integer maxReceiveCount;
5564
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package com.jashmore.sqs.util;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import akka.http.scaladsl.Http;
6+
import org.elasticmq.rest.sqs.SQSRestServer;
7+
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
8+
import org.junit.Before;
9+
import org.junit.Rule;
10+
import org.junit.Test;
11+
import org.mockito.junit.MockitoJUnit;
12+
import org.mockito.junit.MockitoRule;
13+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
14+
import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
15+
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
16+
17+
public class LocalSqsAsyncClientTest {
18+
@Rule
19+
public MockitoRule mockitoRule = MockitoJUnit.rule();
20+
21+
private String queueServerUrl;
22+
23+
@Before
24+
public void setUp() {
25+
final SQSRestServer sqsRestServer = SQSRestServerBuilder
26+
.withInterface("localhost")
27+
.withDynamicPort()
28+
.start();
29+
30+
final Http.ServerBinding serverBinding = sqsRestServer.waitUntilStarted();
31+
queueServerUrl = "http://localhost:" + serverBinding.localAddress().getPort();
32+
}
33+
34+
@Test
35+
public void whenBuildingLocalSqsAsyncClientQueuesAndDlqsWillBeCreatedAutomatically() throws Exception {
36+
// arrange
37+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
38+
.sqsServerUrl(queueServerUrl)
39+
.queue(SqsQueuesConfig.QueueConfig.builder()
40+
.queueName("queueName")
41+
.deadLetterQueueName("queueNameDlq")
42+
.build())
43+
.queue(SqsQueuesConfig.QueueConfig.builder()
44+
.queueName("queueName2")
45+
.deadLetterQueueName("queueName2Dlq")
46+
.build())
47+
.build();
48+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
49+
50+
// act
51+
sqsAsyncClient.buildQueues();
52+
53+
// assert
54+
final ListQueuesResponse listQueuesResponse = sqsAsyncClient.listQueues().get();
55+
assertThat(listQueuesResponse.queueUrls()).hasSize(4);
56+
}
57+
58+
@Test
59+
public void whenDeadLetterQueueIsBuiltItIsLinkedToCorrespondingQueue() throws Exception {
60+
// arrange
61+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
62+
.sqsServerUrl(queueServerUrl)
63+
.queue(SqsQueuesConfig.QueueConfig.builder()
64+
.queueName("queueName")
65+
.deadLetterQueueName("queueNameDlq")
66+
.build())
67+
.build();
68+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
69+
70+
// act
71+
sqsAsyncClient.buildQueues();
72+
73+
// assert
74+
final String reDrivePolicy = sqsAsyncClient.getQueueAttributes(GetQueueAttributesRequest.builder()
75+
.queueUrl(queueServerUrl + "/queue/queueName")
76+
.attributeNames(QueueAttributeName.REDRIVE_POLICY)
77+
.build())
78+
.thenApply(getQueueAttributesResponse -> getQueueAttributesResponse.attributes().get(QueueAttributeName.REDRIVE_POLICY))
79+
.get();
80+
assertThat(reDrivePolicy).contains("queueNameDlq");
81+
}
82+
83+
@Test
84+
public void whenMaxReceiveCountUsedAndNoDeadLetterQueueNameIsIncludedTheDefaultNameIsused() throws Exception {
85+
// arrange
86+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
87+
.sqsServerUrl(queueServerUrl)
88+
.queue(SqsQueuesConfig.QueueConfig.builder()
89+
.queueName("queueName")
90+
.maxReceiveCount(3)
91+
.build())
92+
.build();
93+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
94+
95+
// act
96+
sqsAsyncClient.buildQueues();
97+
98+
// assert
99+
final ListQueuesResponse listQueuesResponse = sqsAsyncClient.listQueues().get();
100+
assertThat(listQueuesResponse.queueUrls()).hasSize(2);
101+
assertThat(listQueuesResponse.queueUrls()).contains(queueServerUrl + "/queue/queueName-dlq");
102+
}
103+
104+
@Test
105+
public void maxReceiveCountIsIncludedInQueue() throws Exception {
106+
// arrange
107+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
108+
.sqsServerUrl(queueServerUrl)
109+
.queue(SqsQueuesConfig.QueueConfig.builder()
110+
.queueName("queueName")
111+
.maxReceiveCount(3)
112+
.build())
113+
.build();
114+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
115+
116+
// act
117+
sqsAsyncClient.buildQueues();
118+
119+
// assert
120+
final String reDrivePolicy = sqsAsyncClient.getQueueAttributes(GetQueueAttributesRequest.builder()
121+
.queueUrl(queueServerUrl + "/queue/queueName")
122+
.attributeNames(QueueAttributeName.REDRIVE_POLICY)
123+
.build())
124+
.thenApply(getQueueAttributesResponse -> getQueueAttributesResponse.attributes().get(QueueAttributeName.REDRIVE_POLICY))
125+
.get();
126+
assertThat(reDrivePolicy).contains("\"maxReceiveCount\":3");
127+
}
128+
129+
@Test
130+
public void visibilityTimeoutIsIncludedInQueueWhenBuilt() throws Exception {
131+
// arrange
132+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
133+
.sqsServerUrl(queueServerUrl)
134+
.queue(SqsQueuesConfig.QueueConfig.builder()
135+
.queueName("queueName")
136+
.visibilityTimeout(60)
137+
.build())
138+
.build();
139+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
140+
141+
// act
142+
sqsAsyncClient.buildQueues();
143+
144+
// assert
145+
final String visibilityTimeout = sqsAsyncClient.getQueueAttributes(GetQueueAttributesRequest.builder()
146+
.queueUrl(queueServerUrl + "/queue/queueName")
147+
.attributeNames(QueueAttributeName.VISIBILITY_TIMEOUT)
148+
.build())
149+
.thenApply(getQueueAttributesResponse -> getQueueAttributesResponse.attributes().get(QueueAttributeName.VISIBILITY_TIMEOUT))
150+
.get();
151+
assertThat(visibilityTimeout).contains("60");
152+
}
153+
}

0 commit comments

Comments
 (0)