Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions celerytest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from config import CELERY_TEST_CONFIG, CELERY_TEST_CONFIG_MEMORY
from worker import CeleryWorkerThread
from .config import CELERY_TEST_CONFIG, CELERY_TEST_CONFIG_MEMORY
from .worker import CeleryWorkerThread


def setup_celery_worker(app, config=CELERY_TEST_CONFIG_MEMORY, concurrency=1):
conf = dict(CELERY_TEST_CONFIG.__dict__.items() + config.__dict__.items())
conf = dict(CELERY_TEST_CONFIG.__dict__.items())
conf.update(config.__dict__.items())
conf['CELERYD_CONCURRENCY'] = concurrency
app.config_from_object(conf)

Expand Down
7 changes: 6 additions & 1 deletion celerytest/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,9 @@ def test_scheduled_task(self):
t5 = time.time()
self.assertTrue(result.ready())

self.assertTrue(t5-t4 < self.overhead_time)
self.assertTrue(t5-t4 < self.overhead_time)

def test_wait_for_idle(self):
result = multiply.delay(2, 3)
self.worker.idle.wait()
self.assertTrue(result.ready())
32 changes: 17 additions & 15 deletions celerytest/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def __init__(self, app):
self.workers = []
self.consumers = []
self.monitor = CeleryMonitorThread(app)
signals.after_task_publish.connect(self.monitor.task_begin)
signals.task_postrun.connect(self.monitor.task_finished)

self.ready = threading.Event()
self.active = self.monitor.active
Expand Down Expand Up @@ -77,28 +79,28 @@ def __init__(self, app):
self.stop_requested = False

self.pending = 0
self.pending_lock = threading.Lock()
self.idle = threading.Event()
self.idle.set()
self.active = threading.Event()

def on_event(self, event):
# maintain state
self.state.event(event)

# only need to update state when something relevant to pending tasks is happening
check_states = ['task-received','task-started','task-succeeded','task-failed','task-revoked']
if not event['type'] in check_states:
return

active = len(self.immediate_pending_tasks) > 0

# switch signals if needed
if active and self.idle.is_set():
def task_begin(self, *args, **kwargs):
with self.pending_lock:
self.pending += 1
self.idle.clear()
self.active.set()
elif not active and self.active.is_set():
self.idle.set()
self.active.clear()

def task_finished(self, *args, **kwargs):
with self.pending_lock:
self.pending -= 1
if self.pending <= 0:
self.active.clear()
self.idle.set()

def on_event(self, event):
# maintain state
self.state.event(event)

@property
def pending_tasks(self):
Expand Down