Skip to content

Commit d6721bf

Browse files
feat(cli): support mv command
fix: parallel respects block_size feat: lease command supports wait_sec
1 parent 101df2a commit d6721bf

File tree

5 files changed

+96
-19
lines changed

5 files changed

+96
-19
lines changed

taskqueue/aws_queue_api.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,10 @@ def cancel_lease(self, rhandle):
116116
def release_all(self):
117117
raise NotImplementedError()
118118

119-
def _request(self, num_tasks, visibility_timeout):
119+
def lease(self, seconds, num_tasks=1, wait_sec=20):
120+
if wait_sec is None:
121+
wait_sec = 20
122+
120123
resp = self.sqs.receive_message(
121124
QueueUrl=self.qurl,
122125
AttributeNames=[
@@ -126,8 +129,8 @@ def _request(self, num_tasks, visibility_timeout):
126129
MessageAttributeNames=[
127130
'All'
128131
],
129-
VisibilityTimeout=visibility_timeout,
130-
WaitTimeSeconds=20,
132+
VisibilityTimeout=seconds,
133+
WaitTimeSeconds=wait_sec,
131134
)
132135

133136
if 'Messages' not in resp:
@@ -140,11 +143,6 @@ def _request(self, num_tasks, visibility_timeout):
140143
tasks.append(task)
141144
return tasks
142145

143-
def lease(self, seconds, num_tasks=1):
144-
if num_tasks > 1:
145-
raise ValueError("This library (not boto/SQS) only supports fetching one task at a time. Requested: {}.".format(num_tasks))
146-
return self._request(num_tasks, seconds)
147-
148146
def delete(self, task):
149147
if type(task) == str:
150148
rhandle = task
@@ -175,13 +173,13 @@ def purge(self):
175173

176174
while self.enqueued:
177175
# visibility_timeout must be > 0 for delete to work
178-
tasks = self._request(num_tasks=10, visibility_timeout=10)
176+
tasks = self.lease(num_tasks=10, seconds=10)
179177
for task in tasks:
180178
self.delete(task)
181179
return self
182180

183181
def __iter__(self):
184-
return iter(self._request(num_tasks=10, visibility_timeout=0))
182+
return iter(self.lease(num_tasks=10, seconds=0))
185183

186184

187185

taskqueue/file_queue_api.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import functools
33
import itertools
44
import json
5+
import math
56
import operator
67
import os.path
78
import random
@@ -320,7 +321,10 @@ def _lease_filename(self, filename, seconds):
320321

321322
return json.loads(read_file(new_filepath))
322323

323-
def lease(self, seconds, num_tasks):
324+
def lease(self, seconds, num_tasks, wait_sec=None):
325+
if wait_sec is None:
326+
wait_sec = 0
327+
324328
def fmt(direntry):
325329
filename = direntry.name
326330
timestamp, _ = filename.split('--')
@@ -354,7 +358,24 @@ def fmt(direntry):
354358
if lessee is not None:
355359
leases.append(lessee)
356360

357-
return leases
361+
wait_leases = []
362+
if wait_sec > 0 and len(leases) < num_tasks:
363+
# Add a constant b/c this will cascade into shorter and
364+
# shorter checks as wait_sec shrinks and we don't
365+
# want hundreds of workers to accidently synchronize
366+
sleep_amt = random.random() * (wait_sec + 1)
367+
368+
# but we still want to guarantee that wait_sec is not
369+
# exceeded.
370+
sleep_amt = min(sleep_amt, wait_sec)
371+
time.sleep(sleep_amt)
372+
wait_leases = self.lease(
373+
seconds,
374+
num_tasks - len(leases),
375+
wait_sec - sleep_amt
376+
)
377+
378+
return leases + wait_leases
358379

359380
@retry
360381
def delete(self, task):

taskqueue/taskqueue.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def insert(
183183
)
184184

185185
def insertfn(batch):
186-
return self.api.insert(batch, delay_seconds)
186+
return self.api.insert(batch, delay_seconds)
187187

188188
cts = schedule_jobs(
189189
fns=( partial(insertfn, batch) for batch in sip(bodies, batch_size) ),
@@ -219,7 +219,7 @@ def cancel(self, task):
219219
def release_all(self):
220220
return self.api.release_all()
221221

222-
def lease(self, seconds=600, num_tasks=1):
222+
def lease(self, seconds=600, num_tasks=1, wait_sec=None):
223223
"""
224224
Acquires a lease on the topmost N unowned tasks in the specified queue.
225225
Required query parameters: leaseSecs, numTasks
@@ -229,7 +229,7 @@ def lease(self, seconds=600, num_tasks=1):
229229
if seconds < 0:
230230
raise ValueError("lease seconds must be >= 0. Got: " + str(seconds))
231231

232-
tasks = self.api.lease(seconds, num_tasks)
232+
tasks = self.api.lease(seconds, num_tasks, wait_sec)
233233

234234
if not len(tasks):
235235
raise QueueEmptyError()
@@ -562,7 +562,7 @@ def capturing_soloprocess_upload(*args, **kwargs):
562562
ct = 0
563563
with tqdm(desc="Upload", total=total) as pbar:
564564
with pathos.pools.ProcessPool(parallel) as pool:
565-
for num_inserted in pool.imap(uploadfn, sip(tasks, 2000)):
565+
for num_inserted in pool.imap(uploadfn, sip(tasks, block_size)):
566566
pbar.update(num_inserted)
567567
ct += num_inserted
568568

taskqueue_cli/taskqueue_cli.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import click
44

5-
from taskqueue import TaskQueue, __version__
5+
from taskqueue import TaskQueue, __version__, QueueEmptyError
66
from taskqueue.lib import toabs
77
from taskqueue.paths import get_protocol
88

@@ -72,7 +72,8 @@ def cp(src, dest):
7272
process while a queue is being worked.
7373
7474
Currently sqs queues are not copiable,
75-
but you can copy an fq to sqs.
75+
but you can copy an fq to sqs. The mv
76+
command supports sqs queues.
7677
"""
7778
src = normalize_path(src)
7879
dest = normalize_path(dest)
@@ -82,5 +83,36 @@ def cp(src, dest):
8283

8384
tqd.insert(tqs)
8485

86+
@main.command()
87+
@click.argument("src")
88+
@click.argument("dest")
89+
def mv(src, dest):
90+
"""
91+
Moves the contents of a queue to another
92+
service or location. Do not run this
93+
process while a queue is being worked.
94+
95+
Moving an sqs queue to a file queue
96+
may result in duplicated tasks.
97+
"""
98+
src = normalize_path(src)
99+
dest = normalize_path(dest)
100+
101+
tqd = TaskQueue(dest)
102+
tqs = TaskQueue(src)
103+
104+
while True:
105+
try:
106+
tasks = tqs.lease(num_tasks=10, seconds=10)
107+
except QueueEmptyError:
108+
break
109+
110+
tqd.insert(tasks)
111+
tqs.delete(tasks)
112+
113+
114+
115+
116+
85117

86118

test/test_taskqueue.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
import pytest
1010

1111
import taskqueue
12-
from taskqueue import queueable, FunctionTask, RegisteredTask, TaskQueue, MockTask, PrintTask, LocalTaskQueue
12+
from taskqueue import (
13+
queueable, FunctionTask, RegisteredTask,
14+
TaskQueue, MockTask, PrintTask, LocalTaskQueue,
15+
QueueEmptyError
16+
)
1317
from taskqueue.paths import ExtractedPath, mkpath
1418
from taskqueue.queueables import totask
1519
from taskqueue.queueablefns import tofunc, UnregisteredFunctionError, func2task
@@ -154,6 +158,28 @@ def test_get(sqs, protocol):
154158
t = tq.lease()
155159
tq.delete(t)
156160

161+
def test_lease(sqs):
162+
path = getpath("sqs")
163+
tq = TaskQueue(path, n_threads=0)
164+
165+
n_inserts = 20
166+
tq.purge()
167+
tq.insert(( PrintTask(str(x)) for x in range(n_inserts) ))
168+
169+
tasks = tq.lease(num_tasks=10, wait_sec=0)
170+
assert len(tasks) == 10
171+
tq.delete(tasks)
172+
173+
tasks = tq.lease(num_tasks=10, wait_sec=0)
174+
assert len(tasks) == 10
175+
tq.delete(tasks)
176+
177+
try:
178+
tasks = tq.lease(num_tasks=10, wait_sec=0)
179+
assert False
180+
except QueueEmptyError:
181+
pass
182+
157183
@pytest.mark.parametrize('protocol', PROTOCOL)
158184
def test_single_threaded_insertion(sqs, protocol):
159185
path = getpath(protocol)

0 commit comments

Comments
 (0)