diff --git a/celerytest/__init__.py b/celerytest/__init__.py index 9607ff5..52b74a6 100644 --- a/celerytest/__init__.py +++ b/celerytest/__init__.py @@ -1,9 +1,10 @@ -from config import CELERY_TEST_CONFIG, CELERY_TEST_CONFIG_MEMORY -from worker import CeleryWorkerThread +from .config import CELERY_TEST_CONFIG, CELERY_TEST_CONFIG_MEMORY +from .worker import CeleryWorkerThread def setup_celery_worker(app, config=CELERY_TEST_CONFIG_MEMORY, concurrency=1): - conf = dict(CELERY_TEST_CONFIG.__dict__.items() + config.__dict__.items()) + conf = dict(CELERY_TEST_CONFIG.__dict__.items()) + conf.update(config.__dict__.items()) conf['CELERYD_CONCURRENCY'] = concurrency app.config_from_object(conf) diff --git a/celerytest/tests.py b/celerytest/tests.py index 7fc4e22..0aabd8f 100644 --- a/celerytest/tests.py +++ b/celerytest/tests.py @@ -98,4 +98,9 @@ def test_scheduled_task(self): t5 = time.time() self.assertTrue(result.ready()) - self.assertTrue(t5-t4 < self.overhead_time) \ No newline at end of file + self.assertTrue(t5-t4 < self.overhead_time) + + def test_wait_for_idle(self): + result = multiply.delay(2, 3) + self.worker.idle.wait() + self.assertTrue(result.ready()) diff --git a/celerytest/worker.py b/celerytest/worker.py index 9000118..979eea9 100644 --- a/celerytest/worker.py +++ b/celerytest/worker.py @@ -17,6 +17,8 @@ def __init__(self, app): self.workers = [] self.consumers = [] self.monitor = CeleryMonitorThread(app) + signals.after_task_publish.connect(self.monitor.task_begin) + signals.task_postrun.connect(self.monitor.task_finished) self.ready = threading.Event() self.active = self.monitor.active @@ -77,28 +79,28 @@ def __init__(self, app): self.stop_requested = False self.pending = 0 + self.pending_lock = threading.Lock() self.idle = threading.Event() self.idle.set() self.active = threading.Event() - def on_event(self, event): - # maintain state - self.state.event(event) - # only need to update state when something relevant to pending tasks is happening - check_states = ['task-received','task-started','task-succeeded','task-failed','task-revoked'] - if not event['type'] in check_states: - return - - active = len(self.immediate_pending_tasks) > 0 - - # switch signals if needed - if active and self.idle.is_set(): + def task_begin(self, *args, **kwargs): + with self.pending_lock: + self.pending += 1 self.idle.clear() self.active.set() - elif not active and self.active.is_set(): - self.idle.set() - self.active.clear() + + def task_finished(self, *args, **kwargs): + with self.pending_lock: + self.pending -= 1 + if self.pending <= 0: + self.active.clear() + self.idle.set() + + def on_event(self, event): + # maintain state + self.state.event(event) @property def pending_tasks(self):