diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 03b147cb8a1..b48605109c2 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1513,40 +1513,53 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> {ok, pos_integer()} | {error, pos_integer(), term()}}]. shrink_all(Node) -> ?LOG_INFO("Asked to remove all quorum queue replicas from node ~ts", [Node]), - [begin - QName = amqqueue:get_name(Q), - ?LOG_INFO("~ts: removing member (replica) on node ~w", - [rabbit_misc:rs(QName), Node]), - Size = length(get_nodes(Q)), - case delete_member(Q, Node) of - ok -> - {QName, {ok, Size-1}}; - {error, cluster_change_not_permitted} -> - %% this could be timing related and due to a new leader just being - %% elected but it's noop command not been committed yet. - %% lets sleep and retry once - ?LOG_INFO("~ts: failed to remove member (replica) on node ~w " - "as cluster change is not permitted. " - "retrying once in 500ms", - [rabbit_misc:rs(QName), Node]), - timer:sleep(500), - case delete_member(Q, Node) of - ok -> - {QName, {ok, Size-1}}; - {error, Err} -> - ?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w", - [rabbit_misc:rs(QName), Node, Err]), - {QName, {error, Size, Err}} - end; - {error, Err} -> - ?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w", - [rabbit_misc:rs(QName), Node, Err]), - {QName, {error, Size, Err}} - end - end || Q <- rabbit_amqqueue:list(), - amqqueue:get_type(Q) == ?MODULE, - lists:member(Node, get_nodes(Q))]. - + Parent = self(), + %% This operation is mostly bound by I/O so this default is set high: + Size = application:get_env(rabbit, quorum_queue_shrink_batch_size, 64), + Chunks = ra_lib:lists_chunk(Size, [Q || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == ?MODULE, + lists:member(Node, get_nodes(Q))]), + lists:append([begin + Running = [spawn(fun() -> + Res = shrink(Node, Q), + Parent ! {self(), Res} + end) || Q <- Chunk], + [receive + {Pid, Res} -> + Res + end || Pid <- Running] + end || Chunk <- Chunks]). + +shrink(Node, Q) -> + QName = amqqueue:get_name(Q), + ?LOG_INFO("~ts: removing member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + Size = length(get_nodes(Q)), + case delete_member(Q, Node) of + ok -> + {QName, {ok, Size-1}}; + {error, cluster_change_not_permitted} -> + %% this could be timing related and due to a new leader just being + %% elected but it's noop command not been committed yet. + %% lets sleep and retry once + ?LOG_INFO("~ts: failed to remove member (replica) on node ~w " + "as cluster change is not permitted. " + "retrying once in 500ms", + [rabbit_misc:rs(QName), Node]), + timer:sleep(500), + case delete_member(Q, Node) of + ok -> + {QName, {ok, Size-1}}; + {error, Err} -> + ?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end; + {error, Err} -> + ?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end. grow(Node, VhostSpec, QueueSpec, Strategy) -> grow(Node, VhostSpec, QueueSpec, Strategy, promotable).