From 7737ac150f9fc43ed64f7219149f0cf3c5a96bbe Mon Sep 17 00:00:00 2001 From: Dieter Weber Date: Wed, 30 Apr 2025 18:12:51 +0200 Subject: [PATCH 1/9] Stress test for cluster scaling Scaling can fail when discrepancies between Cluster.scheduler_info and delayed incoming messages develop: The delayed message references a worker that is not in scheduler_info anymore since that has been updated by wait_for_workers(). --- distributed/tests/test_stress.py | 39 +++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index 565f3d97922..035591bd2eb 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -5,13 +5,14 @@ from contextlib import suppress from operator import add from time import sleep +import copy import pytest from tlz import concat, sliding_window from dask import delayed -from distributed import Client, Nanny, Worker, wait +from distributed import Client, Nanny, Worker, wait, SpecCluster from distributed.chaos import KillWorker from distributed.compatibility import WINDOWS from distributed.metrics import time @@ -337,3 +338,39 @@ async def test_chaos_rechunk(c, s, *workers): await asyncio.sleep(0.1) await z.cancel() + + +@pytest.mark.slow +def test_stress_scale(): + for n in range(30): + cluster_kwargs = {} + client_kwargs = { + 'set_as_default': False, + } + spec = {} + template = {'cls': Nanny} + N = 24 + for i in range(N): + w = spec[f'worker-{i}'] = copy.copy(template) + cluster = SpecCluster( + workers=spec, + worker=template, # <- template for newly scaled up workers + **cluster_kwargs, + ) + client = Client(cluster, **client_kwargs) + client.wait_for_workers(N) + + # scale down: + print("down") + client.cluster.scale(n=1) + + # scale up again: + print("up") + # client.cluster.worker_spec = copy.copy(spec) + client.cluster.scale(n=len(spec)) + client.wait_for_workers(len(spec)) + + # shutdown: + print("shutdown") + 
client.close() + client.cluster.close(timeout=30) From 87c7d78d24036c1fb3bb69d09bc1faedbb37adfc Mon Sep 17 00:00:00 2001 From: Dieter Weber Date: Thu, 8 May 2025 12:41:22 +0200 Subject: [PATCH 2/9] Make triggering the error more likely Monkeypatch a slight delay and an opening to run other async tasks into worker status message processing. In tests, this made it much more likely to trigger the error: Usually first time with five workers on a 24 core EPYC. That means hopefully much faster and no repetitions needed. To be confirmed on a quad core laptop! --- distributed/tests/test_stress.py | 56 ++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index 035591bd2eb..371ac994a47 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -12,7 +12,7 @@ from dask import delayed -from distributed import Client, Nanny, Worker, wait, SpecCluster +from distributed import Client, Nanny, Scheduler, SpecCluster, Worker, wait from distributed.chaos import KillWorker from distributed.compatibility import WINDOWS from distributed.metrics import time @@ -341,22 +341,44 @@ async def test_chaos_rechunk(c, s, *workers): @pytest.mark.slow -def test_stress_scale(): - for n in range(30): - cluster_kwargs = {} - client_kwargs = { - 'set_as_default': False, - } - spec = {} - template = {'cls': Nanny} - N = 24 - for i in range(N): - w = spec[f'worker-{i}'] = copy.copy(template) - cluster = SpecCluster( - workers=spec, - worker=template, # <- template for newly scaled up workers - **cluster_kwargs, - ) +def test_stress_scale(monkeypatch): + cluster_kwargs = {} + client_kwargs = { + "set_as_default": False, + } + # No idea how else to handle contention of port 8787 + scheduler_spec = { + "cls": Scheduler, + "options": {"dashboard": False, "dashboard_address": 9876}, + } + spec = {} + template = {"cls": Nanny} + N = 5 + for i in range(N): + w = 
spec[f"worker-{i}"] = copy.copy(template) + with SpecCluster( + scheduler=scheduler_spec, + workers=spec, + worker=template, # <- template for newly scaled up workers + **cluster_kwargs, + ) as cluster: + # Introduce a delay in worker status message processing and allow + # other async code to run in the meantime by monkeypatching the + # read() function with an asyncio.sleep(). This slight delay greatly + # increases the likelihood of discrepancies in worker inventory + # tracking. The chosen time is an empirical compromise between enough + # delay to cause discrepancies and not too much delay so that + # messages still arrive in time. + comm = cluster._watch_worker_status_comm + old_read = comm.read + + async def new_read(): + res = await old_read() + await asyncio.sleep(0.2) + return res + + monkeypatch.setattr(comm, "read", new_read) + client = Client(cluster, **client_kwargs) client.wait_for_workers(N) From ce115464a5da579f2db2bc321c9b2e47373544be Mon Sep 17 00:00:00 2001 From: Dieter Weber Date: Fri, 9 May 2025 10:34:21 +0200 Subject: [PATCH 3/9] Prevent lockup For some reason the context manager may lock up the test run. This way it terminates. Confirmed that the current test is likely to trigger the issue also on a quad-core laptop. 
--- distributed/tests/test_stress.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index 371ac994a47..8028cb2d026 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -356,12 +356,13 @@ def test_stress_scale(monkeypatch): N = 5 for i in range(N): w = spec[f"worker-{i}"] = copy.copy(template) - with SpecCluster( - scheduler=scheduler_spec, - workers=spec, - worker=template, # <- template for newly scaled up workers - **cluster_kwargs, - ) as cluster: + try: + cluster = SpecCluster( + scheduler=scheduler_spec, + workers=spec, + worker=template, # <- template for newly scaled up workers + **cluster_kwargs, + ) # Introduce a delay in worker status message processing and allow # other async code to run in the meantime by monkeypatching the # read() function with an asyncio.sleep(). This slight delay greatly @@ -395,4 +396,5 @@ async def new_read(): # shutdown: print("shutdown") client.close() + finally: client.cluster.close(timeout=30) From f472ab782e54f4512040c08e0ba8a9b05f30e3e7 Mon Sep 17 00:00:00 2001 From: Dieter Weber Date: Wed, 30 Apr 2025 17:35:59 +0200 Subject: [PATCH 4/9] Fix race condition between messages and scheduler info SpecCluster.scheduler_info might be updated by Cluster._wait_for_workers() while messages are in flight. When scaling down this can lead to a KeyError upon trying to get the worker's name from scheduler_info if that worker is not present in scheduler_info anymore. As a fix, this sends both worker key and name in the message queue so that the name doesn't need to be looked up. 
--- distributed/deploy/cluster.py | 3 ++- distributed/deploy/spec.py | 4 ++-- distributed/scheduler.py | 6 +++++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 233fa0d969c..6b84d9b027a 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -252,7 +252,8 @@ def _update_worker_status(self, op, msg): self.scheduler_info["workers"].update(workers) self.scheduler_info.update(msg) elif op == "remove": - del self.scheduler_info["workers"][msg] + worker = msg['worker'] + del self.scheduler_info["workers"][worker] else: # pragma: no cover raise ValueError("Invalid op", op, msg) diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 7da310a2e61..6b4cd4637b8 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -397,12 +397,12 @@ async def _correct_state_internal(self) -> None: def _update_worker_status(self, op, msg): if op == "remove": - name = self.scheduler_info["workers"][msg]["name"] + name = msg['name'] def f(): if ( name in self.workers - and msg not in self.scheduler_info["workers"] + and msg['worker'] not in self.scheduler_info["workers"] and not any( d["name"] == name for d in self.scheduler_info["workers"].values() diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 64c943f0908..f5b082a68a0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -9412,7 +9412,11 @@ def add_worker(self, scheduler: Scheduler, worker: str) -> None: def remove_worker(self, scheduler: Scheduler, worker: str, **kwargs: Any) -> None: try: - self.bcomm.send(["remove", worker]) + msg = { + 'worker': worker, + 'name': scheduler.workers[worker].name, + } + self.bcomm.send(["remove", msg]) except CommClosedError: scheduler.remove_plugin(name=self.name) From 45efc069171c16d5cb4fe093220d3138d8a08508 Mon Sep 17 00:00:00 2001 From: Dieter Weber Date: Wed, 30 Apr 2025 18:04:31 +0200 Subject: [PATCH 5/9] Worker is 
already removed by parent function at this point TODO: Fully understand impact of adding the name parameter on other functions. --- distributed/scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f5b082a68a0..43030f5ac9f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5563,7 +5563,7 @@ async def remove_worker( try: try: result = plugin.remove_worker( - scheduler=self, worker=address, stimulus_id=stimulus_id + scheduler=self, worker=address, name=ws.name, stimulus_id=stimulus_id ) except TypeError: parameters = inspect.signature(plugin.remove_worker).parameters @@ -9410,11 +9410,11 @@ def add_worker(self, scheduler: Scheduler, worker: str) -> None: except CommClosedError: scheduler.remove_plugin(name=self.name) - def remove_worker(self, scheduler: Scheduler, worker: str, **kwargs: Any) -> None: + def remove_worker(self, scheduler: Scheduler, worker: str, name: str, **kwargs: Any) -> None: try: msg = { 'worker': worker, - 'name': scheduler.workers[worker].name, + 'name': name, } self.bcomm.send(["remove", msg]) except CommClosedError: From 8f98e4e10ce55ff31ce1052e108769a9d1492d0a Mon Sep 17 00:00:00 2001 From: Dieter Weber Date: Wed, 30 Apr 2025 18:30:36 +0200 Subject: [PATCH 6/9] Avoid KeyError del raises KeyError, the two-argument pop does not. 
--- distributed/deploy/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 6b84d9b027a..d11d5849522 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -253,7 +253,7 @@ def _update_worker_status(self, op, msg): self.scheduler_info.update(msg) elif op == "remove": worker = msg['worker'] - del self.scheduler_info["workers"][worker] + self.scheduler_info["workers"].pop(worker, None) else: # pragma: no cover raise ValueError("Invalid op", op, msg) From a0d310aaf593f84dfd3ae6348b61789b0c0ec8cd Mon Sep 17 00:00:00 2001 From: Dieter Weber Date: Wed, 30 Apr 2025 18:55:43 +0200 Subject: [PATCH 7/9] Hopefully make linters happier --- distributed/deploy/spec.py | 4 ++-- distributed/diagnostics/plugin.py | 2 +- distributed/scheduler.py | 8 +++++--- distributed/tests/test_stress.py | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 6b4cd4637b8..113847a2d0c 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -397,12 +397,12 @@ async def _correct_state_internal(self) -> None: def _update_worker_status(self, op, msg): if op == "remove": - name = msg['name'] + name = msg["name"] def f(): if ( name in self.workers - and msg['worker'] not in self.scheduler_info["workers"] + and msg["worker"] not in self.scheduler_info["workers"] and not any( d["name"] == name for d in self.scheduler_info["workers"].values() diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index 2c74e494e6a..f7bdacc5221 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -190,7 +190,7 @@ def add_worker(self, scheduler: Scheduler, worker: str) -> None | Awaitable[None """ def remove_worker( - self, scheduler: Scheduler, worker: str, *, stimulus_id: str, **kwargs: Any + self, scheduler: Scheduler, worker: str, name: str, *, 
stimulus_id: str, **kwargs: Any ) -> None | Awaitable[None]: """Run when a worker leaves the cluster diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 43030f5ac9f..2dfd6b92aa8 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -9410,11 +9410,13 @@ def add_worker(self, scheduler: Scheduler, worker: str) -> None: except CommClosedError: scheduler.remove_plugin(name=self.name) - def remove_worker(self, scheduler: Scheduler, worker: str, name: str, **kwargs: Any) -> None: + def remove_worker( + self, scheduler: Scheduler, worker: str, name: str, **kwargs: Any + ) -> None: try: msg = { - 'worker': worker, - 'name': name, + "worker": worker, + "name": name, } self.bcomm.send(["remove", msg]) except CommClosedError: diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index 8028cb2d026..ac0eba4a683 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -1,11 +1,11 @@ from __future__ import annotations import asyncio +import copy import random from contextlib import suppress from operator import add from time import sleep -import copy import pytest from tlz import concat, sliding_window From aa70d524f7682906104f70b8932f22a982be3b79 Mon Sep 17 00:00:00 2001 From: Dieter Weber Date: Mon, 5 May 2025 10:04:28 +0200 Subject: [PATCH 8/9] Update distributed/diagnostics/plugin.py Co-authored-by: Jacob Tomlinson --- distributed/diagnostics/plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index f7bdacc5221..fe1daa3314f 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -190,7 +190,7 @@ def add_worker(self, scheduler: Scheduler, worker: str) -> None | Awaitable[None """ def remove_worker( - self, scheduler: Scheduler, worker: str, name: str, *, stimulus_id: str, **kwargs: Any + self, scheduler: Scheduler, worker: str, *, name: str, stimulus_id: str, 
**kwargs: Any ) -> None | Awaitable[None]: """Run when a worker leaves the cluster From b9401b50236f39c9e6952fe57cecb8cda7911fa9 Mon Sep 17 00:00:00 2001 From: Dieter Weber Date: Thu, 8 May 2025 12:39:57 +0200 Subject: [PATCH 9/9] Make pre-commit happy mypy and black mostly --- distributed/deploy/cluster.py | 2 +- distributed/diagnostics/plugin.py | 10 ++++++++-- distributed/scheduler.py | 9 ++++++--- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index d11d5849522..11391df2b6d 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -252,7 +252,7 @@ def _update_worker_status(self, op, msg): self.scheduler_info["workers"].update(workers) self.scheduler_info.update(msg) elif op == "remove": - worker = msg['worker'] + worker = msg["worker"] self.scheduler_info["workers"].pop(worker, None) else: # pragma: no cover raise ValueError("Invalid op", op, msg) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index fe1daa3314f..2e37e36afea 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -11,7 +11,7 @@ import tempfile import uuid import zipfile -from collections.abc import Awaitable +from collections.abc import Awaitable, Hashable from typing import TYPE_CHECKING, Any, Callable, ClassVar from dask.typing import Key @@ -190,7 +190,13 @@ def add_worker(self, scheduler: Scheduler, worker: str) -> None | Awaitable[None """ def remove_worker( - self, scheduler: Scheduler, worker: str, *, name: str, stimulus_id: str, **kwargs: Any + self, + scheduler: Scheduler, + worker: str, + *, + name: Hashable, + stimulus_id: str, + **kwargs: Any, ) -> None | Awaitable[None]: """Run when a worker leaves the cluster diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2dfd6b92aa8..642d897f98c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5563,7 +5563,10 @@ async def 
remove_worker( try: try: result = plugin.remove_worker( - scheduler=self, worker=address, name=ws.name, stimulus_id=stimulus_id + scheduler=self, + worker=address, + name=ws.name, + stimulus_id=stimulus_id, ) except TypeError: parameters = inspect.signature(plugin.remove_worker).parameters @@ -9411,8 +9414,8 @@ def add_worker(self, scheduler: Scheduler, worker: str) -> None: scheduler.remove_plugin(name=self.name) def remove_worker( - self, scheduler: Scheduler, worker: str, name: str, **kwargs: Any - ) -> None: + self, scheduler: Scheduler, worker: str, name: Hashable, **kwargs: Any + ) -> None: try: msg = { "worker": worker,