Skip to content

Commit 0c6e848

Browse files
feat: add retry to SQS (#39)
* feat: add retry to SQS * install: bump tenacity req for retry_if_not_exception_type
1 parent 2dec0ba commit 0c6e848

File tree

2 files changed

+37
-10
lines changed

2 files changed

+37
-10
lines changed

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ orjson
77
numpy>=1.13.1
88
pathos
99
pbr
10-
tenacity
10+
tenacity>=8.0.1
1111
tqdm
1212
requests>2,<3

taskqueue/aws_queue_api.py

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import types
44

55
import boto3
6+
import botocore.exceptions
67
import botocore.errorfactory
78

89
from .lib import toiter, sip, jsonify
@@ -11,8 +12,20 @@
1112
AWS_DEFAULT_REGION
1213
)
1314

15+
import tenacity
16+
1417
AWS_BATCH_SIZE = 10 # send_message_batch's max batch size is 10
1518

19+
class ClientSideError(Exception):
20+
pass
21+
22+
retry = tenacity.retry(
23+
reraise=True,
24+
stop=tenacity.stop_after_attempt(4),
25+
wait=tenacity.wait_random_exponential(0.5, 60.0),
26+
retry=tenacity.retry_if_not_exception_type(ClientSideError),
27+
)
28+
1629
class AWSTaskQueueAPI(object):
1730
def __init__(self, qurl, region_name=AWS_DEFAULT_REGION, **kwargs):
1831
"""
@@ -42,14 +55,18 @@ def __init__(self, qurl, region_name=AWS_DEFAULT_REGION, **kwargs):
4255
)
4356

4457
if self.qurl is None:
45-
try:
46-
self.qurl = self.sqs.get_queue_url(QueueName=qurl)["QueueUrl"]
47-
except Exception:
48-
print(qurl)
49-
raise
58+
self.qurl = self._get_qurl(qurl)
5059

5160
self.batch_size = AWS_BATCH_SIZE
5261

62+
@retry
63+
def _get_qurl(self, qurl):
64+
try:
65+
return self.sqs.get_queue_url(QueueName=qurl)["QueueUrl"]
66+
except Exception as err:
67+
print(f"Failed to fetch queue URL for: {qurl}")
68+
raise
69+
5370
@property
5471
def enqueued(self):
5572
status = self.status()
@@ -71,13 +88,15 @@ def leased(self):
7188
def is_empty():
7289
return self.enqueued == 0
7390

91+
@retry
7492
def status(self):
7593
resp = self.sqs.get_queue_attributes(
7694
QueueUrl=self.qurl,
7795
AttributeNames=['ApproximateNumberOfMessages', 'ApproximateNumberOfMessagesNotVisible']
7896
)
7997
return resp['Attributes']
8098

99+
@retry
81100
def insert(self, tasks, delay_seconds=0):
82101
tasks = toiter(tasks)
83102

@@ -94,10 +113,18 @@ def insert(self, tasks, delay_seconds=0):
94113
} for j, task in enumerate(batch)
95114
]
96115

97-
resp = self.sqs.send_message_batch(
98-
QueueUrl=self.qurl,
99-
Entries=entries,
100-
)
116+
try:
117+
resp = self.sqs.send_message_batch(
118+
QueueUrl=self.qurl,
119+
Entries=entries,
120+
)
121+
except botocore.exceptions.ClientError as error:
122+
http_code = error.response['ResponseMetadata']['HTTPStatusCode']
123+
if 400 <= int(http_code) < 500:
124+
raise ClientSideError(error)
125+
else:
126+
raise error
127+
101128
total += len(entries)
102129

103130
return total

0 commit comments

Comments
 (0)