diff --git a/dp3/task_processing/task_distributor.py b/dp3/task_processing/task_distributor.py
index 537f45c7..e7f86f48 100644
--- a/dp3/task_processing/task_distributor.py
+++ b/dp3/task_processing/task_distributor.py
@@ -1,5 +1,9 @@
+import contextlib
+import faulthandler
 import logging
+import os
 import queue
+import sys
 import threading
 import time
 from datetime import datetime
@@ -12,6 +16,8 @@
 from ..common.callback_registrar import CallbackRegistrar
 from .task_queue import TaskQueueReader, TaskQueueWriter
 
+SHUTDOWN_TIME = 60.0  # Wait up to 60 seconds for workers to finish their tasks
+
 
 class TaskDistributor:
     """
@@ -123,17 +129,35 @@ def stop(self) -> None:
         # Signalize stop to worker threads
         self.running = False
+        deadline_ts = time.monotonic() + SHUTDOWN_TIME
 
-        # Wait until all workers stopped
+        # Wait until all workers stopped (bounded)
         for worker in self._worker_threads:
-            worker.join()
+            remaining = max(0.0, deadline_ts - time.monotonic())
+            worker.join(timeout=remaining)
 
         self._task_queue_reader.disconnect()
         self._task_queue_writer.disconnect()
 
+        alive = [w for w in self._worker_threads if w.is_alive()]
+        if alive:
+            self._dump_thread_stacks()
+            self.log.critical("Forcing shutdown with %d workers still alive", len(alive))
+            with contextlib.suppress(Exception):
+                logging.shutdown()  # flush logs
+            os._exit(1)  # nuke entire process
+
         # Cleanup
         self._worker_threads = []
 
+    def _dump_thread_stacks(self) -> None:
+        self.log.critical("=== Graceful shutdown failed, thread stack dump follows ===")
+        for worker in self._worker_threads:
+            if not worker.is_alive():
+                continue
+            self.log.error("Thread %s (ident=0x%x) still alive", worker.name, worker.ident)
+        faulthandler.dump_traceback(file=sys.stderr, all_threads=True)
+
     def _distribute_task(self, msg_id, task: DataPointTask):
         """
         Puts given task into local queue of the corresponding thread.
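
For reference, here is a minimal standalone sketch of the same shutdown pattern the patch introduces (shared deadline, bounded `join`, stack dump via `faulthandler`, then forced exit). It is not part of dp3; the `worker`, `stop_event`, and `stop` names are illustrative only, and the deadline is shortened so the demo finishes quickly.

```python
import faulthandler
import logging
import os
import sys
import threading
import time

SHUTDOWN_TIME = 2.0  # short deadline for the demo; the patch uses 60 s

log = logging.getLogger("shutdown-demo")
logging.basicConfig(level=logging.INFO)

stop_event = threading.Event()


def worker() -> None:
    # Simulated task loop; a stuck worker would simply never observe stop_event.
    while not stop_event.is_set():
        time.sleep(0.1)


def stop(workers: list[threading.Thread]) -> None:
    stop_event.set()
    deadline_ts = time.monotonic() + SHUTDOWN_TIME

    # Bounded join: each worker gets at most the time remaining until the shared deadline.
    for w in workers:
        remaining = max(0.0, deadline_ts - time.monotonic())
        w.join(timeout=remaining)

    alive = [w for w in workers if w.is_alive()]
    if alive:
        # Dump all thread stacks for post-mortem debugging, flush logs, then force-exit.
        faulthandler.dump_traceback(file=sys.stderr, all_threads=True)
        log.critical("Forcing shutdown with %d workers still alive", len(alive))
        logging.shutdown()
        os._exit(1)


if __name__ == "__main__":
    threads = [threading.Thread(target=worker, daemon=True) for _ in range(3)]
    for t in threads:
        t.start()
    time.sleep(0.5)
    stop(threads)
    log.info("Clean shutdown")
```

Note the design choice mirrored from the patch: the deadline is shared across all workers rather than applied per worker, so total shutdown time stays bounded by SHUTDOWN_TIME even when several workers hang, and os._exit(1) bypasses normal interpreter teardown once the stacks have been dumped.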