From 82085499bad5158fd0d6423aa136b9c56c8b3423 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 5 Dec 2025 17:05:20 -0500 Subject: [PATCH] rabbit_quorum_queue: Shrink batches of QQs in parallel Shrinking a member node off of a QQ can be parallelized. The operation involves * removing the node from the QQ's cluster membership (appending a command to the log and committing it) with `ra:remove_member/3` * updating the metadata store to remove the member from the QQ type state with `rabbit_amqqueue:update/2` * deleting the queue data from the node with `ra:force_delete_server/2` if the node can be reached All of these operations are I/O bound. Updating the cluster membership and metadata store involves appending commands to those logs and replicating them. Writing commands to Ra synchronously in serial is fairly slow - sending many commands in parallel is much more efficient. By parallelizing these steps we can write larger chunks of commands to WAL(s). `ra:force_delete_server/2` benefits from parallelizing if the node being shrunk off is no longer reachable, for example in some hardware failures. The underlying `rpc:call/4` will attempt to auto-connect to the node and this can take some time to time out. By parallelizing this, each `rpc:call/4` reuses the same underlying distribution entry and all calls fail together once the connection fails to establish. --- deps/rabbit/src/rabbit_quorum_queue.erl | 81 ++++++++++++++----------- 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 03b147cb8a1..b48605109c2 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1513,40 +1513,53 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> {ok, pos_integer()} | {error, pos_integer(), term()}}]. 
shrink_all(Node) ->
     ?LOG_INFO("Asked to remove all quorum queue replicas from node ~ts", [Node]),
-    [begin
-         QName = amqqueue:get_name(Q),
-         ?LOG_INFO("~ts: removing member (replica) on node ~w",
-                   [rabbit_misc:rs(QName), Node]),
-         Size = length(get_nodes(Q)),
-         case delete_member(Q, Node) of
-             ok ->
-                 {QName, {ok, Size-1}};
-             {error, cluster_change_not_permitted} ->
-                 %% this could be timing related and due to a new leader just being
-                 %% elected but it's noop command not been committed yet.
-                 %% lets sleep and retry once
-                 ?LOG_INFO("~ts: failed to remove member (replica) on node ~w "
-                           "as cluster change is not permitted. "
-                           "retrying once in 500ms",
-                           [rabbit_misc:rs(QName), Node]),
-                 timer:sleep(500),
-                 case delete_member(Q, Node) of
-                     ok ->
-                         {QName, {ok, Size-1}};
-                     {error, Err} ->
-                         ?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
-                                      [rabbit_misc:rs(QName), Node, Err]),
-                         {QName, {error, Size, Err}}
-                 end;
-             {error, Err} ->
-                 ?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
-                              [rabbit_misc:rs(QName), Node, Err]),
-                 {QName, {error, Size, Err}}
-         end
-     end || Q <- rabbit_amqqueue:list(),
-            amqqueue:get_type(Q) == ?MODULE,
-            lists:member(Node, get_nodes(Q))].
-
+    Parent = self(),
+    %% This operation is mostly bound by I/O so this default is set high:
+    Size = application:get_env(rabbit, quorum_queue_shrink_batch_size, 64),
+    Chunks = ra_lib:lists_chunk(Size, [Q || Q <- rabbit_amqqueue:list(),
+                                            amqqueue:get_type(Q) == ?MODULE,
+                                            lists:member(Node, get_nodes(Q))]),
+    %% One chunk at a time; within a chunk each queue gets its own worker.
+    lists:append([begin
+                      Running = [spawn_monitor(fun() -> Parent ! {self(), shrink(Node, Q)} end)
+                                 || Q <- Chunk],
+                      %% Monitored: a worker dying without a reply must not hang this receive.
+                      [receive
+                           {Pid, Res} -> erlang:demonitor(MRef, [flush]), Res;
+                           {'DOWN', MRef, process, Pid, Reason} -> exit(Reason)
+                       end || {Pid, MRef} <- Running]
+                  end || Chunk <- Chunks]).
+
+%% Removes the replica hosted on Node from the quorum queue Q. Returns
+%% {QName, {ok, Size-1}} on success or {QName, {error, Size, Err}} on failure,
+%% where Size is the length of get_nodes(Q) taken before the removal attempt.
+shrink(Node, Q) ->
+    QName = amqqueue:get_name(Q),
+    ?LOG_INFO("~ts: removing member (replica) on node ~w",
+              [rabbit_misc:rs(QName), Node]),
+    Size = length(get_nodes(Q)),
+    Outcome = case delete_member(Q, Node) of
+                  {error, cluster_change_not_permitted} ->
+                      %% Possibly timing related: a newly elected leader may
+                      %% not have committed its noop command yet, so sleep
+                      %% and retry once.
+                      ?LOG_INFO("~ts: failed to remove member (replica) on node ~w "
+                                "as cluster change is not permitted. "
+                                "retrying once in 500ms",
+                                [rabbit_misc:rs(QName), Node]),
+                      timer:sleep(500),
+                      delete_member(Q, Node);
+                  FirstAttempt ->
+                      FirstAttempt
+              end,
+    case Outcome of
+        ok ->
+            {QName, {ok, Size-1}};
+        {error, Err} ->
+            ?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
+                         [rabbit_misc:rs(QName), Node, Err]),
+            {QName, {error, Size, Err}}
+    end.
 
 grow(Node, VhostSpec, QueueSpec, Strategy) ->
     grow(Node, VhostSpec, QueueSpec, Strategy, promotable).