Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions dp3/task_processing/task_distributor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import contextlib
import faulthandler
import logging
import os
import queue
import sys
import threading
import time
from datetime import datetime
Expand All @@ -12,6 +16,8 @@
from ..common.callback_registrar import CallbackRegistrar
from .task_queue import TaskQueueReader, TaskQueueWriter

SHUTDOWN_TIME = 60.0 # Wait up to 60 seconds for workers to finish their tasks


class TaskDistributor:
"""
Expand Down Expand Up @@ -123,17 +129,35 @@ def stop(self) -> None:

# Signalize stop to worker threads
self.running = False
deadline_ts = time.monotonic() + SHUTDOWN_TIME

# Wait until all workers stopped
# Wait until all workers stopped (bounded)
for worker in self._worker_threads:
worker.join()
remaining = max(0.0, deadline_ts - time.monotonic())
worker.join(timeout=remaining)

self._task_queue_reader.disconnect()
self._task_queue_writer.disconnect()

alive = [w for w in self._worker_threads if w.is_alive()]
if alive:
self._dump_thread_stacks()
self.log.critical("Forcing shutdown with %d workers still alive", len(alive))
with contextlib.suppress(Exception):
logging.shutdown() # flush logs
os._exit(1) # nuke entire process

# Cleanup
self._worker_threads = []

def _dump_thread_stacks(self) -> None:
self.log.critical("=== Graceful shutdown failed, thread stack dump follows ===")
for worker in self._worker_threads:
if not worker.is_alive():
continue
self.log.error("Thread %s (ident=0x%x) still alive", worker.name, worker.ident)
faulthandler.dump_traceback(file=sys.stderr, all_threads=True)

def _distribute_task(self, msg_id, task: DataPointTask):
"""
Puts given task into local queue of the corresponding thread.
Expand Down