Skip to content

Commit c5d2f5e

Browse files
Merge pull request #120 from JaidenAshmore/issue/119_handles_sdk_interrupted_exceptions_in_message_retrievers
refs 119: handles Interruptions when retrieving messages as they don't send InterruptedExceptions
2 parents 346613a + 6b9ca2e commit c5d2f5e

File tree

4 files changed

+76
-1
lines changed

4 files changed

+76
-1
lines changed

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/batching/BatchingMessageRetriever.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import com.jashmore.sqs.util.retriever.RetrieverUtils;
1515
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1616
import lombok.extern.slf4j.Slf4j;
17+
import software.amazon.awssdk.core.exception.SdkClientException;
18+
import software.amazon.awssdk.core.exception.SdkInterruptedException;
1719
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
1820
import software.amazon.awssdk.services.sqs.model.Message;
1921
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
@@ -139,11 +141,25 @@ public void run() {
139141
for (final Message message : response.messages()) {
140142
messagesDownloaded.put(message);
141143
}
142-
} catch (InterruptedException interruptedException) {
144+
} catch (final InterruptedException interruptedException) {
143145
log.debug("Thread interrupted while placing messages on internal queue");
144146
break;
145147
}
146148
} catch (final ExecutionException | RuntimeException exception) {
149+
// Supposedly the SqsAsyncClient can get interrupted and this will remove the interrupted status from the thread and then wrap it
150+
// in it's own version of the interrupted exception...If this happens when the retriever is being shut down it will keep on processing
151+
// because it does not realise it is being shut down, therefore we have to check for this and quit if necessary
152+
if (exception instanceof ExecutionException) {
153+
final Throwable executionExceptionCause = exception.getCause();
154+
if (executionExceptionCause instanceof SdkClientException) {
155+
if (executionExceptionCause.getCause() instanceof SdkInterruptedException) {
156+
log.debug("Thread interrupted while receiving messages");
157+
break;
158+
}
159+
}
160+
}
161+
162+
log.error("Exception thrown when retrieving messages", exception);
147163
try {
148164
final long errorBackoffTimeInMilliseconds = getErrorBackoffTimeInMilliseconds();
149165
log.error("Error thrown while organising threads to process messages. Backing off for {}ms", errorBackoffTimeInMilliseconds, exception);

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/prefetch/PrefetchingMessageRetriever.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import com.jashmore.sqs.util.properties.PropertyUtils;
1212
import com.jashmore.sqs.util.retriever.RetrieverUtils;
1313
import lombok.extern.slf4j.Slf4j;
14+
import software.amazon.awssdk.core.exception.SdkClientException;
15+
import software.amazon.awssdk.core.exception.SdkInterruptedException;
1416
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
1517
import software.amazon.awssdk.services.sqs.model.Message;
1618
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
@@ -134,6 +136,19 @@ public void run() {
134136
break;
135137
}
136138
} catch (final ExecutionException | RuntimeException exception) {
139+
// Supposedly the SqsAsyncClient can get interrupted and this will remove the interrupted status from the thread and then wrap it
140+
// in it's own version of the interrupted exception...If this happens when the retriever is being shut down it will keep on processing
141+
// because it does not realise it is being shut down, therefore we have to check for this and quit if necessary
142+
if (exception instanceof ExecutionException) {
143+
final Throwable executionExceptionCause = exception.getCause();
144+
if (executionExceptionCause instanceof SdkClientException) {
145+
if (executionExceptionCause.getCause() instanceof SdkInterruptedException) {
146+
log.debug("Thread interrupted receiving messages");
147+
break;
148+
}
149+
}
150+
}
151+
137152
log.error("Exception thrown when retrieving messages", exception);
138153

139154
try {

java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/retriever/batching/BatchingMessageRetrieverTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.mockito.ArgumentMatchers.any;
5+
import static org.mockito.Mockito.doThrow;
56
import static org.mockito.Mockito.never;
67
import static org.mockito.Mockito.times;
78
import static org.mockito.Mockito.verify;
@@ -19,6 +20,8 @@
1920
import org.mockito.Mock;
2021
import org.mockito.junit.MockitoJUnit;
2122
import org.mockito.junit.MockitoRule;
23+
import software.amazon.awssdk.core.exception.SdkClientException;
24+
import software.amazon.awssdk.core.exception.SdkInterruptedException;
2225
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2326
import software.amazon.awssdk.services.sqs.model.Message;
2427
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
@@ -559,6 +562,25 @@ public void whenEnoughThreadsRequestMessagesTheRetrievalOfMessagesWillTrigger()
559562
assertThat(actualMessage).isEqualTo(message);
560563
}
561564

565+
@Test
566+
public void whenSqsAsyncClientThrowsSdkInterruptedExceptionTheBackgroundThreadShouldStop() throws Exception {
567+
// arrange
568+
final int threadsRequestingMessages = DEFAULT_PROPERTIES.getNumberOfThreadsWaitingTrigger();
569+
final BatchingMessageRetriever backgroundThread = new BatchingMessageRetriever(QUEUE_PROPERTIES, sqsAsyncClient,
570+
DEFAULT_PROPERTIES, new AtomicInteger(threadsRequestingMessages), new LinkedBlockingQueue<>(), new Object());
571+
572+
doThrow(new ExecutionException(SdkClientException.builder().cause(new SdkInterruptedException()).build())).when(responseThrowingException).get();
573+
when(sqsAsyncClient.receiveMessage(any(ReceiveMessageRequest.class)))
574+
.thenReturn(responseThrowingException);
575+
576+
// act
577+
backgroundThread.run();
578+
579+
// assert
580+
final ArgumentCaptor<ReceiveMessageRequest> receiveMessageRequestArgumentCaptor = ArgumentCaptor.forClass(ReceiveMessageRequest.class);
581+
verify(sqsAsyncClient).receiveMessage(receiveMessageRequestArgumentCaptor.capture());
582+
}
583+
562584
private void responseThrowingException(final CompletableFuture<ReceiveMessageResponse> mockResponse,
563585
final Throwable throwable) {
564586
try {

java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/retriever/prefetch/PrefetchingMessageRetrieverTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.mockito.Mock;
2121
import org.mockito.junit.MockitoJUnit;
2222
import org.mockito.junit.MockitoRule;
23+
import software.amazon.awssdk.core.exception.SdkClientException;
24+
import software.amazon.awssdk.core.exception.SdkInterruptedException;
2325
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2426
import software.amazon.awssdk.services.sqs.model.Message;
2527
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
@@ -631,6 +633,26 @@ public void cancellingRetrieverDuringBackoffWillNotRequestMoreMessages() throws
631633
verify(sqsAsyncClient, times(1)).receiveMessage(any(ReceiveMessageRequest.class));
632634
}
633635

636+
@Test
637+
public void whenSqsAsyncClientThrowsSdkInterruptedExceptionTheBackgroundThreadShouldStop() throws Exception {
638+
// arrange
639+
final LinkedBlockingQueue<Message> internalMessageQueue = new LinkedBlockingQueue<>();
640+
internalMessageQueue.add(Message.builder().build());
641+
doThrow(new ExecutionException(SdkClientException.builder().cause(new SdkInterruptedException()).build())).when(responseThrowingException).get();
642+
when(sqsAsyncClient.receiveMessage(any(ReceiveMessageRequest.class)))
643+
.thenReturn(responseThrowingException);
644+
final PrefetchingMessageRetriever backgroundMessagePrefetcher
645+
= new PrefetchingMessageRetriever(sqsAsyncClient, QUEUE_PROPERTIES, DEFAULT_PREFETCHING_PROPERTIES,
646+
internalMessageQueue, 5);
647+
648+
// act
649+
backgroundMessagePrefetcher.run();
650+
651+
// assert
652+
final ArgumentCaptor<ReceiveMessageRequest> receiveMessageRequestArgumentCaptor = ArgumentCaptor.forClass(ReceiveMessageRequest.class);
653+
verify(sqsAsyncClient).receiveMessage(receiveMessageRequestArgumentCaptor.capture());
654+
}
655+
634656
private void responseThrowingException(final CompletableFuture<ReceiveMessageResponse> mockResponse,
635657
final Throwable throwable) {
636658
try {

0 commit comments

Comments
 (0)