Skip to content

Commit 4c4cdf2

Browse files
feat: add purge command to cli, add status support to SQS
1 parent 4b1e56e commit 4c4cdf2

File tree

4 files changed

+32
-16
lines changed

4 files changed

+32
-16
lines changed

taskqueue/aws_queue_api.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,16 @@ def enqueued(self):
5757

5858
@property
5959
def inserted(self):
60-
raise NotImplementedError()
60+
return float('NaN')
6161

6262
@property
6363
def completed(self):
64-
raise NotImplementedError()
64+
return float('NaN')
6565

6666
@property
6767
def leased(self):
68-
raise NotImplementedError()
68+
status = self.status()
69+
return int(status['ApproximateNumberOfMessagesNotVisible'])
6970

7071
def is_empty():
7172
return self.enqueued == 0
@@ -165,18 +166,18 @@ def delete(self, task):
165166
def tally(self):
166167
pass
167168

168-
def purge(self):
169-
# This is more efficient, but it kept freezing
170-
# try:
171-
# self.sqs.purge_queue(QueueUrl=self.qurl)
169+
def purge(self, native=False):
170+
# can throw:
172171
# except botocore.errorfactory.PurgeQueueInProgress:
172+
if native:
173+
self.sqs.purge_queue(QueueUrl=self.qurl)
174+
return
173175

174176
while self.enqueued:
175177
# visibility_timeout must be > 0 for delete to work
176178
tasks = self.lease(num_tasks=10, seconds=10)
177179
for task in tasks:
178180
self.delete(task)
179-
return self
180181

181182
def __iter__(self):
182183
return iter(self.lease(num_tasks=10, seconds=0))

taskqueue/file_queue_api.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,9 @@ def tally(self):
416416
with open(self.completions_path, 'ba') as f:
417417
f.write(b'\0')
418418

419-
def purge(self):
419+
def purge(self, native=False):
420+
# native only has meaning for SQS for now
421+
# but we have to accept it as a parameter.
420422
all_files = itertools.chain(
421423
os.scandir(self.queue_path),
422424
os.scandir(self.movement_path)
@@ -427,6 +429,8 @@ def purge(self):
427429
except FileNotFoundError:
428430
pass
429431

432+
self.rezero()
433+
430434
def is_empty(self):
431435
try:
432436
first(os.scandir(self.queue_path))

taskqueue/taskqueue.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,10 +258,10 @@ def deltask(tid):
258258
green=self.green,
259259
)
260260

261-
def purge(self):
261+
def purge(self, native=False):
262262
"""Deletes all tasks in the queue."""
263263
try:
264-
return self.api.purge()
264+
return self.api.purge(native)
265265
except AttributeError:
266266
while True:
267267
lst = self.list()

taskqueue_cli/taskqueue_cli.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import math
23

34
import click
45
from tqdm import tqdm
@@ -44,13 +45,18 @@ def status(queuepath):
4445
enq = tq.enqueued
4546
comp = tq.completed
4647
leased = tq.leased
47-
print(f"Inserted: {ins}")
48+
49+
if not math.isnan(ins):
50+
print(f"Inserted: {ins}")
51+
4852
if ins > 0:
4953
print(f"Enqueued: {enq} ({enq / ins * 100:.1f}% left)")
50-
print(f"Completed: {comp} ({comp / ins * 100:.1f}%)")
54+
if not math.isnan(comp):
55+
print(f"Completed: {comp} ({comp / ins * 100:.1f}%)")
5156
else:
5257
print(f"Enqueued: {enq} (--% left)")
53-
print(f"Completed: {comp} (--%)")
58+
if not math.isnan(comp):
59+
print(f"Completed: {comp} (--%)")
5460

5561
if enq > 0:
5662
print(f"Leased: {leased} ({leased / enq * 100:.1f}% of queue)")
@@ -118,8 +124,13 @@ def mv(src, dest):
118124
tqs.delete(tasks)
119125
pbar.update(len(tasks))
120126

121-
122-
127+
@main.command()
128+
@click.argument("queuepath")
129+
def purge(queuepath):
130+
"""Delete all queued messages and zero out queue statistics."""
131+
queuepath = normalize_path(queuepath)
132+
tq = TaskQueue(queuepath)
133+
tq.purge()
123134

124135

125136

0 commit comments

Comments
 (0)