Skip to content

Commit 1c27c39

Browse files
refs #91: Add some more safety for BatchingMessageResolver and PrefetchingMessageRetriever
1 parent 08784e5 commit 1c27c39

File tree

2 files changed

+19
-21
lines changed

2 files changed

+19
-21
lines changed

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/resolver/batching/BatchingMessageResolver.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,20 @@ public CompletableFuture<?> resolveMessage(final Message message) {
8383
public void run() {
8484
boolean continueProcessing = true;
8585
while (continueProcessing) {
86-
final List<MessageResolutionBean> batchOfMessagesToResolve = new LinkedList<>();
8786
try {
88-
Queues.drain(messagesToBeResolved, batchOfMessagesToResolve, getBatchSize(), getBufferingTimeInMs(), TimeUnit.MILLISECONDS);
89-
} catch (final InterruptedException interruptedException) {
90-
// Do nothing, we still want to send the current batch of messages
91-
continueProcessing = false;
92-
}
93-
94-
if (!batchOfMessagesToResolve.isEmpty()) {
95-
submitMessageDeletionBatch(batchOfMessagesToResolve);
87+
final List<MessageResolutionBean> batchOfMessagesToResolve = new LinkedList<>();
88+
try {
89+
Queues.drain(messagesToBeResolved, batchOfMessagesToResolve, getBatchSize(), getBufferingTimeInMs(), TimeUnit.MILLISECONDS);
90+
} catch (final InterruptedException interruptedException) {
91+
// Do nothing, we still want to send the current batch of messages
92+
continueProcessing = false;
93+
}
94+
95+
if (!batchOfMessagesToResolve.isEmpty()) {
96+
submitMessageDeletionBatch(batchOfMessagesToResolve);
97+
}
98+
} catch (final Throwable throwable) {
99+
log.error("Exception thrown when retrieving messages", throwable);
96100
}
97101
}
98102
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/prefetch/PrefetchingMessageRetriever.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
import com.jashmore.sqs.QueueProperties;
99
import com.jashmore.sqs.aws.AwsConstants;
1010
import com.jashmore.sqs.retriever.AsyncMessageRetriever;
11+
import com.jashmore.sqs.util.properties.PropertyUtils;
1112
import com.jashmore.sqs.util.retriever.RetrieverUtils;
1213
import lombok.extern.slf4j.Slf4j;
1314
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
1415
import software.amazon.awssdk.services.sqs.model.Message;
1516
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
1617
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
1718

18-
import java.util.Optional;
1919
import java.util.concurrent.BlockingDeque;
2020
import java.util.concurrent.BlockingQueue;
2121
import java.util.concurrent.LinkedBlockingQueue;
@@ -181,16 +181,10 @@ private ReceiveMessageRequest buildReceiveMessageRequest() {
181181
@SuppressWarnings("Duplicates")
182182
@VisibleForTesting()
183183
int getBackoffTimeInMs() {
184-
return Optional.ofNullable(properties.getErrorBackoffTimeInMilliseconds())
185-
.filter(backoffPeriod -> {
186-
if (backoffPeriod < 0) {
187-
log.warn("Non-positive errorBackoffTimeInMilliseconds provided({}), using default({}) instead", backoffPeriod,
188-
DEFAULT_ERROR_BACKOFF_TIMEOUT_IN_MILLISECONDS);
189-
return false;
190-
}
191-
192-
return true;
193-
})
194-
.orElse(DEFAULT_ERROR_BACKOFF_TIMEOUT_IN_MILLISECONDS);
184+
return PropertyUtils.safelyGetPositiveIntegerValue(
185+
"errorBackoffTimeInMilliseconds",
186+
properties::getErrorBackoffTimeInMilliseconds,
187+
DEFAULT_ERROR_BACKOFF_TIMEOUT_IN_MILLISECONDS
188+
);
195189
}
196190
}

0 commit comments

Comments
 (0)