From eb5e7511bca0e63577654cb1c908a04175fdfd6a Mon Sep 17 00:00:00 2001 From: Roman Zimmermann Date: Sat, 14 Mar 2015 10:04:31 +0100 Subject: [PATCH 1/3] Python3 compatibility. RentMethod/celerytest#2 --- celerytest/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/celerytest/__init__.py b/celerytest/__init__.py index 9607ff5..52b74a6 100644 --- a/celerytest/__init__.py +++ b/celerytest/__init__.py @@ -1,9 +1,10 @@ -from config import CELERY_TEST_CONFIG, CELERY_TEST_CONFIG_MEMORY -from worker import CeleryWorkerThread +from .config import CELERY_TEST_CONFIG, CELERY_TEST_CONFIG_MEMORY +from .worker import CeleryWorkerThread def setup_celery_worker(app, config=CELERY_TEST_CONFIG_MEMORY, concurrency=1): - conf = dict(CELERY_TEST_CONFIG.__dict__.items() + config.__dict__.items()) + conf = dict(CELERY_TEST_CONFIG.__dict__.items()) + conf.update(config.__dict__.items()) conf['CELERYD_CONCURRENCY'] = concurrency app.config_from_object(conf) From 8f4c8e40243152490f2b4f2881c08b53d4414b98 Mon Sep 17 00:00:00 2001 From: Roman Zimmermann Date: Sun, 15 Mar 2015 11:02:39 +0100 Subject: [PATCH 2/3] Fix race-condition for idle and active events. RentMethod/celerytest#6 --- celerytest/worker.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/celerytest/worker.py b/celerytest/worker.py index 9000118..979eea9 100644 --- a/celerytest/worker.py +++ b/celerytest/worker.py @@ -17,6 +17,8 @@ def __init__(self, app): self.workers = [] self.consumers = [] self.monitor = CeleryMonitorThread(app) + signals.after_task_publish.connect(self.monitor.task_begin) + signals.task_postrun.connect(self.monitor.task_finished) self.ready = threading.Event() self.active = self.monitor.active @@ -77,28 +79,28 @@ def __init__(self, app): self.stop_requested = False self.pending = 0 + self.pending_lock = threading.Lock() self.idle = threading.Event() self.idle.set() self.active = threading.Event() - def on_event(self, event): - # maintain state - self.state.event(event) - # only need to update state when something relevant to pending tasks is happening - check_states = ['task-received','task-started','task-succeeded','task-failed','task-revoked'] - if not event['type'] in check_states: - return - - active = len(self.immediate_pending_tasks) > 0 - - # switch signals if needed - if active and self.idle.is_set(): + def task_begin(self, *args, **kwargs): + with self.pending_lock: + self.pending += 1 self.idle.clear() self.active.set() - elif not active and self.active.is_set(): - self.idle.set() - self.active.clear() + + def task_finished(self, *args, **kwargs): + with self.pending_lock: + self.pending -= 1 + if self.pending <= 0: + self.active.clear() + self.idle.set() + + def on_event(self, event): + # maintain state + self.state.event(event) @property def pending_tasks(self): From b8a033fc3268544cbecb7efe98c814765576c43a Mon Sep 17 00:00:00 2001 From: Roman Zimmermann Date: Mon, 18 May 2015 09:37:53 +0200 Subject: [PATCH 3/3] Test for issue #6. --- celerytest/tests.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/celerytest/tests.py b/celerytest/tests.py index 7fc4e22..0aabd8f 100644 --- a/celerytest/tests.py +++ b/celerytest/tests.py @@ -98,4 +98,9 @@ def test_scheduled_task(self): t5 = time.time() self.assertTrue(result.ready()) - self.assertTrue(t5-t4 < self.overhead_time) \ No newline at end of file + self.assertTrue(t5-t4 < self.overhead_time) + + def test_wait_for_idle(self): + result = multiply.delay(2, 3) + self.worker.idle.wait() + self.assertTrue(result.ready())