From d0b46685eb68b4d3fa70076a5750b28638dda4a9 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Mon, 29 Sep 2025 15:05:55 +0200 Subject: [PATCH 01/10] Reconcile QQ node dead during delete and redeclare MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Péter Gömöri --- deps/rabbit/src/amqqueue.erl | 10 -- deps/rabbit/src/rabbit_amqp_management.erl | 2 +- deps/rabbit/src/rabbit_amqqueue.erl | 14 +- deps/rabbit/src/rabbit_core_ff.erl | 7 + deps/rabbit/src/rabbit_queue_location.erl | 2 +- deps/rabbit/src/rabbit_quorum_queue.erl | 131 +++++++++++--- deps/rabbit/src/rabbit_stream_coordinator.erl | 4 +- deps/rabbit/test/quorum_queue_SUITE.erl | 164 +++++++++++++++++- deps/rabbitmq_ct_helpers/src/queue_utils.erl | 2 +- ...etheus_rabbitmq_core_metrics_collector.erl | 3 +- 10 files changed, 282 insertions(+), 57 deletions(-) diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl index 3c958d90fea0..9386c104754d 100644 --- a/deps/rabbit/src/amqqueue.erl +++ b/deps/rabbit/src/amqqueue.erl @@ -30,7 +30,6 @@ % exclusive_owner get_exclusive_owner/1, get_leader_node/1, - get_nodes/1, % name (#resource) get_name/1, set_name/2, @@ -425,15 +424,6 @@ get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader; get_leader_node(#amqqueue{pid = none}) -> none; get_leader_node(#amqqueue{pid = Pid}) -> node(Pid). --spec get_nodes(amqqueue_v2()) -> [node(),...]. - -get_nodes(Q) -> - case amqqueue:get_type_state(Q) of - #{nodes := Nodes} -> - Nodes; - _ -> - [get_leader_node(Q)] - end. % operator_policy diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index eb0178fe5352..486d64d7af12 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -471,7 +471,7 @@ encode_queue(Q, NumMsgs, NumConsumers) -> {Leader :: node() | none, Replicas :: [node(),...]}. queue_topology(Q) -> Leader = amqqueue:get_leader_node(Q), - Replicas = amqqueue:get_nodes(Q), + Replicas = rabbit_amqqueue:get_nodes(Q), {Leader, Replicas}. decode_exchange({map, KVList}) -> diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 652244882b91..bee076b811b9 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -23,6 +23,7 @@ -export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). +-export([get_nodes/1]). -export([count/0]). -export([list_down/1, list_down/2, list_all/1, count/1, list_names/0, list_names/1, list_local_names/0, @@ -1212,6 +1213,12 @@ list() -> count() -> rabbit_db_queue:count(). +-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...]. + +get_nodes(Q) -> + [{members, Nodes}] = info(Q, [members]), + Nodes. + -spec list_names() -> [name()]. list_names() -> @@ -2025,12 +2032,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable) ). get_quorum_nodes(Q) -> - case amqqueue:get_type_state(Q) of - #{nodes := Nodes} -> - Nodes; - _ -> - [] - end. + rabbit_amqqueue:get_nodes(Q). -spec prepend_extra_bcc(Qs) -> Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}]. diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index e1e6a244cb93..d75cf05f4215 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -225,3 +225,10 @@ stability => stable, depends_on => ['rabbitmq_4.2.0'] }}). + +-rabbit_feature_flag( + {'track_qq_members_uids', + #{desc => "Track queue members UIDs in the metadata store", + stability => stable, + depends_on => [] + }}). diff --git a/deps/rabbit/src/rabbit_queue_location.erl b/deps/rabbit/src/rabbit_queue_location.erl index 0f204f97347e..4b63c2b99d2f 100644 --- a/deps/rabbit/src/rabbit_queue_location.erl +++ b/deps/rabbit/src/rabbit_queue_location.erl @@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) -> Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]), Queues = GetQueues(), Counters = lists:foldl(fun(Q, Acc) -> - #{nodes := Nodes} = amqqueue:get_type_state(Q), + Nodes = rabbit_amqqueue:get_nodes(Q), lists:foldl(fun(N, A) when is_map_key(N, A) -> maps:update_with(N, fun(C) -> C+1 end, A); diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 817ee1975bd4..760e6a1c82f6 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -269,9 +269,17 @@ start_cluster(Q) -> {LeaderNode, FollowerNodes} = rabbit_queue_location:select_leader_and_followers(Q, QuorumSize), LeaderId = {RaName, LeaderNode}, + UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))} + || Node <- [LeaderNode | FollowerNodes]]), NewQ0 = amqqueue:set_pid(Q, LeaderId), - NewQ1 = amqqueue:set_type_state(NewQ0, - #{nodes => [LeaderNode | FollowerNodes]}), + NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of + false -> + amqqueue:set_type_state(NewQ0, + #{nodes => [LeaderNode | FollowerNodes]}); + true -> + amqqueue:set_type_state(NewQ0, + #{nodes => UIDs}) + end, Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes, rabbit_fifo, version, [], @@ -717,7 +725,7 @@ repair_amqqueue_nodes(Q0) -> {Name, _} = amqqueue:get_pid(Q0), Members = ra_leaderboard:lookup_members(Name), RaNodes = [N || {_, N} <- Members], - #{nodes := Nodes} = amqqueue:get_type_state(Q0), + Nodes = get_nodes(Q0), case lists:sort(RaNodes) =:= lists:sort(Nodes) of true -> %% up to date @@ -726,7 +734,18 @@ repair_amqqueue_nodes(Q0) -> %% update amqqueue record Fun = fun (Q) -> TS0 = amqqueue:get_type_state(Q), - TS = TS0#{nodes => RaNodes}, + TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) + andalso has_uuid_tracking(TS0) + of + false -> + TS0#{nodes => RaNodes}; + true -> + RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of, + [?RA_SYSTEM, Name], + ?RPC_TIMEOUT)} + || N <- RaNodes]), + TS0#{nodes => RaUids} + end, amqqueue:set_type_state(Q, TS) end, _ = rabbit_amqqueue:update(QName, Fun), @@ -790,6 +809,23 @@ recover(_Vhost, Queues) -> ServerId = {Name, node()}, QName = amqqueue:get_name(Q0), MutConf = make_mutable_config(Q0), + RaUId = ra_directory:uid_of(?RA_SYSTEM, Name), + #{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0), + QTypeState = case Nodes of + List when is_list(List) -> + %% Queue is not aware of node to uid mapping, do nothing + QTypeState0; + #{node() := RaUId} -> + %% Queue is aware and uid for current node is correct, do nothing + QTypeState0; + _ -> + %% Queue is aware but either current node has no UId or it + %% does not match the one returned by ra_directory, regen uid + maybe_delete_data_dir(RaUId), + NewRaUId = ra:new_uid(ra_lib:to_binary(Name)), + QTypeState0#{nodes := Nodes#{node() => NewRaUId}} + end, + Q = amqqueue:set_type_state(Q0, QTypeState), Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of ok -> % queue was restarted, good @@ -802,7 +838,7 @@ recover(_Vhost, Queues) -> [rabbit_misc:rs(QName), Err1]), % queue was never started on this node % so needs to be started from scratch. - case start_server(make_ra_conf(Q0, ServerId)) of + case start_server(make_ra_conf(Q, ServerId)) of ok -> ok; Err2 -> ?LOG_WARNING("recover: quorum queue ~w could not" @@ -824,8 +860,7 @@ recover(_Vhost, Queues) -> %% present in the rabbit_queue table and not just in %% rabbit_durable_queue %% So many code paths are dependent on this. - ok = rabbit_db_queue:set_dirty(Q0), - Q = Q0, + ok = rabbit_db_queue:set_dirty(Q), case Res of ok -> {[Q | R0], F0}; @@ -1205,12 +1240,17 @@ cleanup_data_dir() -> maybe_delete_data_dir(UId) -> _ = ra_directory:unregister_name(?RA_SYSTEM, UId), Dir = ra_env:server_data_dir(?RA_SYSTEM, UId), - {ok, Config} = ra_log:read_config(Dir), - case maps:get(machine, Config) of - {module, rabbit_fifo, _} -> - ra_lib:recursive_delete(Dir); - _ -> - ok + case filelib:is_dir(Dir) of + false -> + ok; + true -> + {ok, Config} = ra_log:read_config(Dir), + case maps:get(machine, Config) of + {module, rabbit_fifo, _} -> + ra_lib:recursive_delete(Dir); + _ -> + ok + end end. policy_changed(Q) -> @@ -1357,16 +1397,29 @@ add_member(Q, Node, Membership) -> do_add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT). -do_add_member(Q, Node, Membership, Timeout) - when ?is_amqqueue(Q) andalso - ?amqqueue_is_quorum(Q) andalso +do_add_member(Q0, Node, Membership, Timeout) + when ?is_amqqueue(Q0) andalso + ?amqqueue_is_quorum(Q0) andalso is_atom(Node) -> - {RaName, _} = amqqueue:get_pid(Q), - QName = amqqueue:get_name(Q), + {RaName, _} = amqqueue:get_pid(Q0), + QName = amqqueue:get_name(Q0), %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, - Members = members(Q), - + Members = members(Q0), + QTypeState0 = #{nodes := Nodes} = amqqueue:get_type_state(Q0), + NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)), + QTypeState = case Nodes of + L when is_list(L) -> + %% Queue is not aware of node to uid mapping, just add the new node + QTypeState0#{nodes => lists:usort([Node | Nodes])}; + #{Node := _} -> + %% Queue is aware and uid for targeted node exists, do nothing + QTypeState0; + _ -> + %% Queue is aware but current node has no UId, regen uid + QTypeState0#{nodes => Nodes#{Node => NewRaUId}} + end, + Q = amqqueue:set_type_state(Q0, QTypeState), MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity), Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion), case ra:start_server(?RA_SYSTEM, Conf) of @@ -1376,8 +1429,12 @@ do_add_member(Q, Node, Membership, Timeout) {ok, {RaIndex, RaTerm}, Leader} -> Fun = fun(Q1) -> Q2 = update_type_state( - Q1, fun(#{nodes := Nodes} = Ts) -> - Ts#{nodes => lists:usort([Node | Nodes])} + Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) -> + Ts#{nodes => lists:usort([Node | NodesList])}; + (#{nodes := #{Node := _}} = Ts) -> + Ts; + (#{nodes := NodesMap} = Ts) when is_map(NodesMap) -> + Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)} end), amqqueue:set_pid(Q2, Leader) end, @@ -1450,8 +1507,10 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> Fun = fun(Q1) -> update_type_state( Q1, - fun(#{nodes := Nodes} = Ts) -> - Ts#{nodes => lists:delete(Node, Nodes)} + fun(#{nodes := Nodes} = Ts) when is_list(Nodes) -> + Ts#{nodes => lists:delete(Node, Nodes)}; + (#{nodes := Nodes} = Ts) when is_map(Nodes) -> + Ts#{nodes => maps:remove(Node, Nodes)} end) end, _ = rabbit_amqqueue:update(QName, Fun), @@ -1971,7 +2030,15 @@ make_ra_conf(Q, ServerId, TickTimeout, #resource{name = QNameBin} = QName, RaMachine = ra_machine(Q), [{ClusterName, _} | _] = Members = members(Q), - UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + {_, Node} = ServerId, + UId = case amqqueue:get_type_state(Q) of + #{nodes := #{Node := Id}} -> + Id; + _ -> + %% Queue was declared on an older version of RabbitMQ + %% and does not have the node to uid mappings + ra:new_uid(ra_lib:to_binary(ClusterName)) + end, FName = rabbit_misc:rs(QName), Formatter = {?MODULE, format_ra_event, [QName]}, LogCfg = #{uid => UId, @@ -2003,7 +2070,12 @@ make_mutable_config(Q) -> get_nodes(Q) when ?is_amqqueue(Q) -> #{nodes := Nodes} = amqqueue:get_type_state(Q), - Nodes. + case Nodes of + List when is_list(List) -> + List; + Map when is_map(Map) -> + maps:keys(Map) + end. get_connected_nodes(Q) when ?is_amqqueue(Q) -> ErlangNodes = [node() | nodes()], @@ -2110,7 +2182,7 @@ force_checkpoint_on_queue(QName) -> {ok, Q} when ?amqqueue_is_quorum(Q) -> {RaName, _} = amqqueue:get_pid(Q), ?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]), - Nodes = amqqueue:get_nodes(Q), + Nodes = rabbit_amqqueue:get_nodes(Q), _ = [ra:cast_aux_command({RaName, Node}, force_checkpoint) || Node <- Nodes], ok; @@ -2383,3 +2455,8 @@ queue_vm_ets() -> tick_interval() -> application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL). + +has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) -> + true; +has_uuid_tracking(_QTypeState) -> + false. diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index ad9bfc1c1803..80defde10ffa 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -153,8 +153,8 @@ stop() -> new_stream(Q, LeaderNode) when ?is_amqqueue(Q) andalso is_atom(LeaderNode) -> - #{name := StreamId, - nodes := Nodes} = amqqueue:get_type_state(Q), + #{name := StreamId} = amqqueue:get_type_state(Q), + Nodes = rabbit_amqqueue:get_nodes(Q), %% assertion leader is in nodes configuration true = lists:member(LeaderNode, Nodes), process_command({new_stream, StreamId, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 980f1d48cac3..3be93850b61e 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -106,7 +106,9 @@ groups() -> force_checkpoint, policy_repair, gh_12635, - replica_states + replica_states, + restart_after_queue_reincarnation, + no_messages_after_queue_reincarnation ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -2966,15 +2968,21 @@ add_member_wrong_type(Config) -> [<<"/">>, SQ, Server, voter, 5000])). add_member_already_a_member(Config) -> - [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + R1 = rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}]), %% idempotent by design ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server, voter, 5000])). + [<<"/">>, QQ, Server, voter, 5000])), + ?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])), + ?assertEqual(ok, + rpc:call(Server, rabbit_quorum_queue, add_member, + [<<"/">>, QQ, Server2, voter, 5000])), + ?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])). add_member_not_found(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -3016,10 +3024,14 @@ add_member_2(Config) -> {<<"x-quorum-initial-group-size">>, long, 1}])), ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member, [<<"/">>, QQ, Server0, 5000])), - Info = rpc:call(Server0, rabbit_quorum_queue, infos, - [rabbit_misc:r(<<"/">>, queue, QQ)]), + #{online := Onlines} = ?awaitMatch(#{online := [_One, _Two]}, + maps:from_list(rpc:call(Server0, + rabbit_quorum_queue, + infos, + [rabbit_misc:r(<<"/">>, queue, QQ)])), + 3000), Servers = lists:sort([Server0, Server1]), - ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))). + ?assertEqual(Servers, lists:sort(Onlines)). delete_member_not_running(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -5109,6 +5121,141 @@ replica_states(Config) -> end end, Result2). +% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/discussions/13131 +restart_after_queue_reincarnation(Config) -> + [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + QName = <<"QQ">>, + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + [Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []), + VHost = amqqueue:get_vhost(Q), + + MessagesPublished = 1000, + publish_many(Ch, QName, MessagesPublished), + + %% Trigger a snapshot by purging the queue. + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, purge, [Q]), + + %% Stop S3 + rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3), + ?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)), + + %% Delete and re-declare queue with the same name. + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + % Now S3 should have the old queue state, and S1 and S2 a new one. + St1 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), + Status0 = [{proplists:get_value(<<"Node Name">>, S), S} || S <- St1], + S3_Status1 = proplists:get_value(S3, Status0), + Others_Status1 = [V || {_K, V} <- proplists:delete(S3, Status0)], + + S3_LastLogIndex = proplists:get_value(<<"Last Log Index">>, S3_Status1), + S3_LastWritten = proplists:get_value(<<"Last Written">>, S3_Status1), + S3_LastApplied = proplists:get_value(<<"Last Applied">>, S3_Status1), + S3_CommitIndex = proplists:get_value(<<"Commit Index">>, S3_Status1), + S3_Term = proplists:get_value(<<"Term">>, S3_Status1), + + ?assertEqual(noproc, proplists:get_value(<<"Raft State">>, S3_Status1)), + ?assertEqual(unknown, proplists:get_value(<<"Membership">>, S3_Status1)), + [begin + ?assert(S3_LastLogIndex > proplists:get_value(<<"Last Log Index">>, O)), + ?assert(S3_LastWritten > proplists:get_value(<<"Last Written">>, O)), + ?assert(S3_LastApplied > proplists:get_value(<<"Last Applied">>, O)), + ?assert(S3_CommitIndex > proplists:get_value(<<"Commit Index">>, O)), + ?assertEqual(S3_Term, proplists:get_value(<<"Term">>, O)) + end || O <- Others_Status1], + + %% Bumping term in online nodes + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]), + + %% Restart S3 + ?assertEqual(ok, rabbit_control_helper:command(start_app, S3)), + + ?awaitMatch(true, begin + %% Now all three nodes should have the new state. + % They are either leader or follower. + Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), + lists:all( + fun(NodeStatus) -> + NodeRaftState = proplists:get_value(<<"Raft State">>, NodeStatus), + lists:member(NodeRaftState, [leader, follower]) + end, Status2) + end, ?DEFAULT_AWAIT), + Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), + + % Remove "Node Name" and "Raft State" from the status. + Status3 = [NE1, NE2, NE3]= [ + begin + R = proplists:delete(<<"Node Name">>, NodeEntry), + proplists:delete(<<"Raft State">>, R) + end || NodeEntry <- Status2], + % Check all other properties have same value on all nodes. + ct:pal("Status3: ~tp", [Status3]), + [ + begin + ?assertEqual(V, proplists:get_value(K, NE2)), + ?assertEqual(V, proplists:get_value(K, NE3)) + end || {K, V} <- NE1 + ]. + +% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/issues/12366 +no_messages_after_queue_reincarnation(Config) -> + [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + QName = <<"QQ">>, + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + [Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []), + + publish(Ch, QName, <<"msg1">>), + publish(Ch, QName, <<"msg2">>), + + %% Stop S3 + rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3), + ?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)), + + qos(Ch, 1, false), + subscribe(Ch, QName, false, <<"tag0">>, [], 500), + DeliveryTag = receive + {#'basic.deliver'{delivery_tag = DT}, #amqp_msg{}} -> + receive + {#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} -> + ct:fail("did not expect the second one") + after 500 -> + DT + end + after 500 -> + ct:fail("Expected some delivery, but got none") + end, + + %% Delete and re-declare queue with the same name. + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Bumping term in online nodes + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]), + + %% Restart S3 + ?assertEqual(ok, rabbit_control_helper:command(start_app, S3)), + + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + %% No message should be delivered after reincarnation + receive + {#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} -> + ct:fail("Expected no deliveries, but got one") + after 500 -> + ok + end. + %%---------------------------------------------------------------------------- same_elements(L1, L2) @@ -5178,7 +5325,10 @@ consume_empty(Ch, Queue, NoAck) -> subscribe(Ch, Queue, NoAck) -> subscribe(Ch, Queue, NoAck, <<"ctag">>, []). + subscribe(Ch, Queue, NoAck, Tag, Args) -> + subscribe(Ch, Queue, NoAck, Tag, Args, ?TIMEOUT). +subscribe(Ch, Queue, NoAck, Tag, Args, Timeout) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, no_ack = NoAck, arguments = Args, @@ -5187,7 +5337,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) -> receive #'basic.consume_ok'{consumer_tag = Tag} -> ok - after ?TIMEOUT -> + after Timeout -> flush(100), exit(subscribe_timeout) end. diff --git a/deps/rabbitmq_ct_helpers/src/queue_utils.erl b/deps/rabbitmq_ct_helpers/src/queue_utils.erl index d2c69792fde0..c4280eb121a3 100644 --- a/deps/rabbitmq_ct_helpers/src/queue_utils.erl +++ b/deps/rabbitmq_ct_helpers/src/queue_utils.erl @@ -208,7 +208,7 @@ assert_number_of_replicas(Config, Server, VHost, QQ, Count) -> begin {ok, Q} = rabbit_ct_broker_helpers:rpc( Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]), - #{nodes := Nodes} = amqqueue:get_type_state(Q), + Nodes = rabbit_amqqueue:get_nodes(Q), length(Nodes) end, 30000). diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index b2e7799b800c..868df0c5d0e5 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -449,8 +449,7 @@ emit_queue_info(Prefix, VHostsFilter, Callback) -> true -> Acc; false -> Type = amqqueue:get_type(Q), - TypeState = amqqueue:get_type_state(Q), - Members = maps:get(nodes, TypeState, []), + Members = rabbit_amqqueue:get_nodes(Q), case membership(amqqueue:get_pid(Q), Members) of not_a_member -> Acc; From b9e29ddc94bfdd3085746c3a6395c1ec5e87f1de Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Wed, 1 Oct 2025 13:34:32 +0200 Subject: [PATCH 02/10] Apply PR suggestions --- deps/rabbit/src/rabbit_amqp_management.erl | 2 +- deps/rabbit/src/rabbit_amqqueue.erl | 9 +- deps/rabbit/src/rabbit_queue_location.erl | 2 +- deps/rabbit/src/rabbit_queue_type.erl | 6 ++ deps/rabbit/src/rabbit_quorum_queue.erl | 97 +++++++++---------- deps/rabbit/src/rabbit_stream_coordinator.erl | 2 +- deps/rabbitmq_ct_helpers/src/queue_utils.erl | 2 +- ...etheus_rabbitmq_core_metrics_collector.erl | 2 +- 8 files changed, 57 insertions(+), 65 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 486d64d7af12..a21cc5e4e350 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -471,7 +471,7 @@ encode_queue(Q, NumMsgs, NumConsumers) -> {Leader :: node() | none, Replicas :: [node(),...]}. queue_topology(Q) -> Leader = amqqueue:get_leader_node(Q), - Replicas = rabbit_amqqueue:get_nodes(Q), + Replicas = rabbit_queue_type:get_nodes(Q), {Leader, Replicas}. decode_exchange({map, KVList}) -> diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index bee076b811b9..0a2c7d2e2b7f 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -23,7 +23,6 @@ -export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). --export([get_nodes/1]). -export([count/0]). -export([list_down/1, list_down/2, list_all/1, count/1, list_names/0, list_names/1, list_local_names/0, @@ -1213,12 +1212,6 @@ list() -> count() -> rabbit_db_queue:count(). --spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...]. - -get_nodes(Q) -> - [{members, Nodes}] = info(Q, [members]), - Nodes. - -spec list_names() -> [name()]. list_names() -> @@ -2032,7 +2025,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable) ). get_quorum_nodes(Q) -> - rabbit_amqqueue:get_nodes(Q). + rabbit_queue_type:get_nodes(Q). -spec prepend_extra_bcc(Qs) -> Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}]. diff --git a/deps/rabbit/src/rabbit_queue_location.erl b/deps/rabbit/src/rabbit_queue_location.erl index 4b63c2b99d2f..f6a5c55e4dd6 100644 --- a/deps/rabbit/src/rabbit_queue_location.erl +++ b/deps/rabbit/src/rabbit_queue_location.erl @@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) -> Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]), Queues = GetQueues(), Counters = lists:foldl(fun(Q, Acc) -> - Nodes = rabbit_amqqueue:get_nodes(Q), + Nodes = rabbit_queue_type:get_nodes(Q), lists:foldl(fun(N, A) when is_map_key(N, A) -> maps:update_with(N, fun(C) -> C+1 end, A); diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 25598de91d6e..32f7d32b8b26 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -38,6 +38,7 @@ format/2, remove/2, info/2, + get_nodes/1, state_info/1, format_status/1, info_down/2, @@ -416,6 +417,11 @@ info(Q, Items) -> Mod = amqqueue:get_type(Q), Mod:info(Q, Items). +-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...]. +get_nodes(Q) -> + [{members, Nodes}] = info(Q, [members]), + Nodes. + fold_state(Fun, Acc, #?STATE{ctxs = Ctxs}) -> maps:fold(Fun, Acc, Ctxs). diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 760e6a1c82f6..131c1506717f 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -273,13 +273,13 @@ start_cluster(Q) -> || Node <- [LeaderNode | FollowerNodes]]), NewQ0 = amqqueue:set_pid(Q, LeaderId), NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of - false -> - amqqueue:set_type_state(NewQ0, - #{nodes => [LeaderNode | FollowerNodes]}); - true -> - amqqueue:set_type_state(NewQ0, - #{nodes => UIDs}) - end, + false -> + amqqueue:set_type_state(NewQ0, + #{nodes => [LeaderNode | FollowerNodes]}); + true -> + amqqueue:set_type_state(NewQ0, + #{nodes => UIDs}) + end, Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes, rabbit_fifo, version, [], @@ -734,16 +734,14 @@ repair_amqqueue_nodes(Q0) -> %% update amqqueue record Fun = fun (Q) -> TS0 = amqqueue:get_type_state(Q), - TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) - andalso has_uuid_tracking(TS0) - of + TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of false -> TS0#{nodes => RaNodes}; true -> RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of, - [?RA_SYSTEM, Name], - ?RPC_TIMEOUT)} - || N <- RaNodes]), + [?RA_SYSTEM, Name], + ?RPC_TIMEOUT)} + || N <- RaNodes]), TS0#{nodes => RaUids} end, amqqueue:set_type_state(Q, TS) @@ -804,28 +802,28 @@ maybe_apply_policies(Q, #{config := CurrentConfig}) -> {[amqqueue:amqqueue()], [amqqueue:amqqueue()]}. recover(_Vhost, Queues) -> lists:foldl( - fun (Q0, {R0, F0}) -> - {Name, _} = amqqueue:get_pid(Q0), + fun (Q, {R0, F0}) -> + {Name, _} = amqqueue:get_pid(Q), ServerId = {Name, node()}, - QName = amqqueue:get_name(Q0), - MutConf = make_mutable_config(Q0), + QName = amqqueue:get_name(Q), + MutConf = make_mutable_config(Q), RaUId = ra_directory:uid_of(?RA_SYSTEM, Name), - #{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0), - QTypeState = case Nodes of + #{nodes := Nodes} = amqqueue:get_type_state(Q), + case Nodes of List when is_list(List) -> %% Queue is not aware of node to uid mapping, do nothing - QTypeState0; + ok; #{node() := RaUId} -> - %% Queue is aware and uid for current node is correct, do nothing - QTypeState0; - _ -> - %% Queue is aware but either current node has no UId or it - %% does not match the one returned by ra_directory, regen uid - maybe_delete_data_dir(RaUId), - NewRaUId = ra:new_uid(ra_lib:to_binary(Name)), - QTypeState0#{nodes := Nodes#{node() => NewRaUId}} + %% Queue is aware and uid for current node is correct, do + %% nothing + ok; + #{node() := _NewRaUId} -> + %% Queue is aware but it does not match the one returned by + %% ra_directory + rabbit_log:info("Quorum queue ~ts: detected node uuid change, " + "deleting old data directory", [rabbit_misc:rs(QName)]), + maybe_delete_data_dir(RaUId) end, - Q = amqqueue:set_type_state(Q0, QTypeState), Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of ok -> % queue was restarted, good @@ -1409,16 +1407,16 @@ do_add_member(Q0, Node, Membership, Timeout) QTypeState0 = #{nodes := Nodes} = amqqueue:get_type_state(Q0), NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)), QTypeState = case Nodes of - L when is_list(L) -> - %% Queue is not aware of node to uid mapping, just add the new node - QTypeState0#{nodes => lists:usort([Node | Nodes])}; - #{Node := _} -> - %% Queue is aware and uid for targeted node exists, do nothing - QTypeState0; - _ -> - %% Queue is aware but current node has no UId, regen uid - QTypeState0#{nodes => Nodes#{Node => NewRaUId}} - end, + L when is_list(L) -> + %% Queue is not aware of node to uid mapping, just add the new node + QTypeState0#{nodes => lists:usort([Node | Nodes])}; + #{Node := _} -> + %% Queue is aware and uid for targeted node exists, do nothing + QTypeState0; + _ -> + %% Queue is aware but current node has no UId, regen uid + QTypeState0#{nodes => Nodes#{Node => NewRaUId}} + end, Q = amqqueue:set_type_state(Q0, QTypeState), MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity), Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion), @@ -2032,13 +2030,13 @@ make_ra_conf(Q, ServerId, TickTimeout, [{ClusterName, _} | _] = Members = members(Q), {_, Node} = ServerId, UId = case amqqueue:get_type_state(Q) of - #{nodes := #{Node := Id}} -> - Id; - _ -> - %% Queue was declared on an older version of RabbitMQ - %% and does not have the node to uid mappings - ra:new_uid(ra_lib:to_binary(ClusterName)) - end, + #{nodes := #{Node := Id}} -> + Id; + _ -> + %% Queue was declared on an older version of RabbitMQ + %% or does not have the node to uid mappings + ra:new_uid(ra_lib:to_binary(ClusterName)) + end, FName = rabbit_misc:rs(QName), Formatter = {?MODULE, format_ra_event, [QName]}, LogCfg = #{uid => UId, @@ -2182,7 +2180,7 @@ force_checkpoint_on_queue(QName) -> {ok, Q} when ?amqqueue_is_quorum(Q) -> {RaName, _} = amqqueue:get_pid(Q), ?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]), - Nodes = rabbit_amqqueue:get_nodes(Q), + Nodes = rabbit_queue_type:get_nodes(Q), _ = [ra:cast_aux_command({RaName, Node}, force_checkpoint) || Node <- Nodes], ok; @@ -2455,8 +2453,3 @@ queue_vm_ets() -> tick_interval() -> application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL). - -has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) -> - true; -has_uuid_tracking(_QTypeState) -> - false. diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 80defde10ffa..e600a444f3de 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -154,7 +154,7 @@ stop() -> new_stream(Q, LeaderNode) when ?is_amqqueue(Q) andalso is_atom(LeaderNode) -> #{name := StreamId} = amqqueue:get_type_state(Q), - Nodes = rabbit_amqqueue:get_nodes(Q), + Nodes = rabbit_queue_type:get_nodes(Q), %% assertion leader is in nodes configuration true = lists:member(LeaderNode, Nodes), process_command({new_stream, StreamId, diff --git a/deps/rabbitmq_ct_helpers/src/queue_utils.erl b/deps/rabbitmq_ct_helpers/src/queue_utils.erl index c4280eb121a3..78f1991a8aa2 100644 --- a/deps/rabbitmq_ct_helpers/src/queue_utils.erl +++ b/deps/rabbitmq_ct_helpers/src/queue_utils.erl @@ -208,7 +208,7 @@ assert_number_of_replicas(Config, Server, VHost, QQ, Count) -> begin {ok, Q} = rabbit_ct_broker_helpers:rpc( Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]), - Nodes = rabbit_amqqueue:get_nodes(Q), + Nodes = rabbit_queue_type:get_nodes(Q), length(Nodes) end, 30000). diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 868df0c5d0e5..faf349693df4 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -449,7 +449,7 @@ emit_queue_info(Prefix, VHostsFilter, Callback) -> true -> Acc; false -> Type = amqqueue:get_type(Q), - Members = rabbit_amqqueue:get_nodes(Q), + Members = rabbit_queue_type:get_nodes(Q), case membership(amqqueue:get_pid(Q), Members) of not_a_member -> Acc; From aff8cd28c0c866ea8719e63afbdc8179a3d80dac Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Wed, 1 Oct 2025 14:09:57 +0200 Subject: [PATCH 03/10] Fix call to get_nodes --- deps/rabbit/src/rabbit_quorum_queue.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 131c1506717f..f0ff5e7f4f18 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -2375,7 +2375,7 @@ transfer_leadership(_CandidateNodes) -> %% wait for leader elections before processing next chunk of queues [begin {RaName, LeaderNode} = amqqueue:get_pid(Q), - MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)), + MemberNodes = lists:delete(LeaderNode, rabbit_queue_type:get_nodes(Q)), %% we don't do any explicit error handling here as it is more %% important to make progress _ = lists:any(fun (N) -> From ea3a471df8d67abfe734e534e01016f7ae1657e7 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Thu, 2 Oct 2025 09:22:05 +0200 Subject: [PATCH 04/10] Handle queue returning no info on members info --- deps/rabbit/src/rabbit_queue_type.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 32f7d32b8b26..d02d9ddf0e5f 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -419,8 +419,12 @@ info(Q, Items) -> -spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...]. get_nodes(Q) -> - [{members, Nodes}] = info(Q, [members]), - Nodes. + case info(Q, [members]) of + [{members, Nodes}] -> + Nodes; + [] -> + [] + end. fold_state(Fun, Acc, #?STATE{ctxs = Ctxs}) -> maps:fold(Fun, Acc, Ctxs). From 9b048e351b1601f6ef4c312c6b24a58a0fed175a Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Thu, 2 Oct 2025 10:15:12 +0200 Subject: [PATCH 05/10] Show status key to compare in tests --- deps/rabbit/test/quorum_queue_SUITE.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 3be93850b61e..f9c32ddeb979 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -5198,8 +5198,8 @@ restart_after_queue_reincarnation(Config) -> ct:pal("Status3: ~tp", [Status3]), [ begin - ?assertEqual(V, proplists:get_value(K, NE2)), - ?assertEqual(V, proplists:get_value(K, NE3)) + ?assertEqual({K, V}, {K, proplists:get_value(K, NE2)}), + ?assertEqual({K, V}, {K, proplists:get_value(K, NE3)}) end || {K, V} <- NE1 ]. From dcdae79c176d1f39a69b15404b3e61a2cefa797d Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Thu, 2 Oct 2025 13:07:31 +0200 Subject: [PATCH 06/10] Use awaitMatch on QQ status fields too I thought that asserting the right membership status on all nodes would guarantee that the other status fields would be the expected ones but it seems like that assumption was wrong. --- deps/rabbit/test/quorum_queue_SUITE.erl | 46 ++++++++++++++++--------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index f9c32ddeb979..e0c770e49529 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -5123,6 +5123,14 @@ replica_states(Config) -> % Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/discussions/13131 restart_after_queue_reincarnation(Config) -> + case rabbit_ct_helpers:is_mixed_versions() of + true -> + {skip, "queue reincarnation protection can't work on mixed mode"}; + false -> + restart_after_queue_reincarnation_(Config) + end. + +restart_after_queue_reincarnation_(Config) -> [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, S1), QName = <<"QQ">>, @@ -5186,25 +5194,31 @@ restart_after_queue_reincarnation(Config) -> lists:member(NodeRaftState, [leader, follower]) end, Status2) end, ?DEFAULT_AWAIT), - Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), - - % Remove "Node Name" and "Raft State" from the status. - Status3 = [NE1, NE2, NE3]= [ - begin - R = proplists:delete(<<"Node Name">>, NodeEntry), - proplists:delete(<<"Raft State">>, R) - end || NodeEntry <- Status2], - % Check all other properties have same value on all nodes. - ct:pal("Status3: ~tp", [Status3]), - [ - begin - ?assertEqual({K, V}, {K, proplists:get_value(K, NE2)}), - ?assertEqual({K, V}, {K, proplists:get_value(K, NE3)}) - end || {K, V} <- NE1 - ]. + ?awaitMatch(true, begin + Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), + + % Remove "Node Name" and "Raft State" from the status. + Status3 = [NE1, NE2, NE3]= [ + begin + R = proplists:delete(<<"Node Name">>, NodeEntry), + proplists:delete(<<"Raft State">>, R) + end || NodeEntry <- Status2], + % Check all other properties have same value on all nodes. + ct:pal("Status3: ~tp", [Status3]), + lists:all(fun({A, B}) -> A == B end, [ {V, proplists:get_value(K, NE2)} || {K, V} <- NE1]) andalso + lists:all(fun({A, B}) -> A == B end, [ {V, proplists:get_value(K, NE3)} || {K, V} <- NE1]) + end, ?DEFAULT_AWAIT). % Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/issues/12366 no_messages_after_queue_reincarnation(Config) -> + case rabbit_ct_helpers:is_mixed_versions() of + true -> + {skip, "queue reincarnation protection can't work on mixed mode"}; + false -> + no_messages_after_queue_reincarnation_(Config) + end. + +no_messages_after_queue_reincarnation_(Config) -> [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, S1), QName = <<"QQ">>, From 75911adea9c6fd0b6aa39732293a3940d5a33775 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Fri, 24 Oct 2025 10:34:25 +0200 Subject: [PATCH 07/10] Properly line up map arrow --- deps/rabbit/src/rabbit_core_ff.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index d75cf05f4215..ff7ffb5c1ac6 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -228,7 +228,7 @@ -rabbit_feature_flag( {'track_qq_members_uids', - #{desc => "Track queue members UIDs in the metadata store", + #{desc => "Track queue members UIDs in the metadata store", stability => stable, depends_on => [] }}). From ac59a8db73b560670f7c6d37dec401f353ded4e3 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Mon, 10 Nov 2025 08:26:37 +0100 Subject: [PATCH 08/10] Warn on `undefined` remote uid while recovering --- deps/rabbit/src/rabbit_quorum_queue.erl | 27 +++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index f0ff5e7f4f18..ee91082e761d 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -738,10 +738,22 @@ repair_amqqueue_nodes(Q0) -> false -> TS0#{nodes => RaNodes}; true -> - RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of, - [?RA_SYSTEM, Name], - ?RPC_TIMEOUT)} - || N <- RaNodes]), + RaUidsList = [begin + Uid = erpc:call(N, ra_directory, uid_of, + [?RA_SYSTEM, Name], + ?RPC_TIMEOUT), + case Uid of + undefined -> + ?LOG_WARNING("Unexpected undefined uuid from node ~p for quorum queue ~ts during repair_amqqueue_nodes", + [N, rabbit_misc:rs(QName)]); + _ -> + ok + end, + {N, Uid} + end + || N <- RaNodes], + + RaUids = maps:from_list(RaUidsList), TS0#{nodes => RaUids} end, amqqueue:set_type_state(Q, TS) @@ -808,6 +820,13 @@ recover(_Vhost, Queues) -> QName = amqqueue:get_name(Q), MutConf = make_mutable_config(Q), RaUId = ra_directory:uid_of(?RA_SYSTEM, Name), + case RaUId of + undefined -> + ?LOG_WARNING("Unexpected undefined uuid for current node for quorum queue ~ts during recover", + [rabbit_misc:rs(QName)]); + _ -> + ok + end, #{nodes := Nodes} = amqqueue:get_type_state(Q), case Nodes of List when is_list(List) -> From fc2e7eae5a02a0f345e7f58e88b2aacd75ed42de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Mon, 15 Dec 2025 16:02:42 +0100 Subject: [PATCH 09/10] rabbit_quorum_queue: Remove duplicate word "queue" from logs --- deps/rabbit/src/rabbit_quorum_queue.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index ee91082e761d..5f2fb8c4f54e 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -286,7 +286,7 @@ start_cluster(Q) -> ?RPC_TIMEOUT)], MinVersion = lists:min([rabbit_fifo:version() | Versions]), - ?LOG_DEBUG("Will start up to ~w replicas for quorum queue ~ts with " + ?LOG_DEBUG("Will start up to ~w replicas for quorum ~ts with " "leader on node '~ts', initial machine version ~b", [QuorumSize, rabbit_misc:rs(QName), LeaderNode, MinVersion]), case rabbit_amqqueue:internal_declare(NewQ1, false) of @@ -663,7 +663,7 @@ handle_tick(QName, ok -> ok; repaired -> - ?LOG_DEBUG("Repaired quorum queue ~ts amqqueue record", + ?LOG_DEBUG("Repaired quorum ~ts amqqueue record", [rabbit_misc:rs(QName)]) end, ExpectedNodes = rabbit_nodes:list_members(), @@ -1376,7 +1376,7 @@ add_member(VHost, Name, Node, Membership, Timeout) is_binary(Name) andalso is_atom(Node) -> QName = #resource{virtual_host = VHost, name = Name, kind = queue}, - ?LOG_DEBUG("Asked to add a replica for queue ~ts on node ~ts", + ?LOG_DEBUG("Asked to add a replica for ~ts on node ~ts", [rabbit_misc:rs(QName), Node]), case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> @@ -2178,7 +2178,7 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), OtherNodes = lists:delete(Node, get_nodes(Q)), - ?LOG_WARNING("Shrinking queue ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]), + ?LOG_WARNING("Shrinking ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]), ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (QQ) -> TS0 = amqqueue:get_type_state(QQ), From bdf7c5bd91596dc4e94ba8bfcf66de4531d42604 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Tue, 25 Nov 2025 08:27:23 +0100 Subject: [PATCH 10/10] Refactor repair_amqqueue_nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Only query node uids via RPC for missing members - Update nodes in state to map if FF is enabled - Properly return whether queue repaired or not Co-authored-by: Péter Gömöri --- deps/rabbit/src/rabbit_quorum_queue.erl | 117 ++++++++++++++++++------ deps/rabbit/test/quorum_queue_SUITE.erl | 64 +++++++++++++ 2 files changed, 151 insertions(+), 30 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 5f2fb8c4f54e..f5854b72893a 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -725,43 +725,100 @@ repair_amqqueue_nodes(Q0) -> {Name, _} = amqqueue:get_pid(Q0), Members = ra_leaderboard:lookup_members(Name), RaNodes = [N || {_, N} <- Members], - Nodes = get_nodes(Q0), - case lists:sort(RaNodes) =:= lists:sort(Nodes) of + case rabbit_feature_flags:is_enabled(track_qq_members_uids) of + false -> + Nodes = get_nodes(Q0), + case lists:sort(RaNodes) =:= lists:sort(Nodes) of + true -> + %% up to date + ok; + false -> + %% update amqqueue record + Fun = fun (Q) -> + TS0 = amqqueue:get_type_state(Q), + TS = TS0#{nodes => RaNodes}, + amqqueue:set_type_state(Q, TS) + end, + _ = rabbit_amqqueue:update(QName, Fun), + repaired + end; + true -> + {ok, Q0} = rabbit_amqqueue:lookup(QName), + OldTypeState = amqqueue:get_type_state(Q0), + case OldTypeState of + #{nodes := List} when is_list(List) -> + repair_with_list_nodes(QName, Name, RaNodes, OldTypeState); + #{nodes := Map} when is_map(Map) -> + repair_with_map_nodes(QName, Name, RaNodes, Map) + end + end. + +%% @doc Repair logic when OldTypeState has a list as nodes value. +%% Only updates the queue state if ALL nodes return valid UIDs. +repair_with_list_nodes(QName, Name, RaNodes, _OldTypeState) -> + case gather_node_uids(QName, Name, RaNodes) of + {NewNodesUids, _ErrorList = []} -> + %% All nodes returned valid UIDs, proceed with update + Fun = fun (Q) -> + Ts0 = amqqueue:get_type_state(Q), + Ts = Ts0#{nodes => NewNodesUids}, + amqqueue:set_type_state(Q, Ts) + end, + _ = rabbit_amqqueue:update(QName, Fun), + repaired; + _ -> + %% Fetching UID for at least some nodes failed + %% Do not update the queue state + ok + end. + +%% @doc Repair logic when OldTypeState has a map as nodes value. +%% Only adds new nodes that return valid UIDs. +repair_with_map_nodes(QName, Name, RaNodes, PreviousUidsMap) -> + PrevNodes = maps:keys(PreviousUidsMap), + case lists:sort(PrevNodes) == lists:sort(RaNodes) of true -> - %% up to date ok; false -> - %% update amqqueue record + NodesToAdd = RaNodes -- PrevNodes, + {AddedNodesUids, _ErrorList} = gather_node_uids(QName, Name, NodesToAdd), + RemainingNodesUids = maps:with(RaNodes, PreviousUidsMap), + NewNodes = maps:merge(RemainingNodesUids, AddedNodesUids), Fun = fun (Q) -> - TS0 = amqqueue:get_type_state(Q), - TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of - false -> - TS0#{nodes => RaNodes}; - true -> - RaUidsList = [begin - Uid = erpc:call(N, ra_directory, uid_of, - [?RA_SYSTEM, Name], - ?RPC_TIMEOUT), - case Uid of - undefined -> - ?LOG_WARNING("Unexpected undefined uuid from node ~p for quorum queue ~ts during repair_amqqueue_nodes", - [N, rabbit_misc:rs(QName)]); - _ -> - ok - end, - {N, Uid} - end - || N <- RaNodes], - - RaUids = maps:from_list(RaUidsList), - TS0#{nodes => RaUids} - end, - amqqueue:set_type_state(Q, TS) + Ts0 = amqqueue:get_type_state(Q), + Ts = Ts0#{nodes => NewNodes}, + amqqueue:set_type_state(Q, Ts) end, _ = rabbit_amqqueue:update(QName, Fun), repaired end. +gather_node_uids(QName, Name, RaNodes) -> + RPCRes = erpc:multicall(RaNodes, ra_directory, uid_of, [?RA_SYSTEM, Name], ?RPC_TIMEOUT), + NewNodesUidsList0 = lists:zip(RaNodes, RPCRes), + + %% Check if all nodes returned valid UIDs + {ValidList, ErrorList} = + lists:partition( + fun({_Node, {ok, UId}}) when UId =/= undefined -> + true; + (_) -> + false + end, NewNodesUidsList0), + NewNodesUidsList = [{Node, UId} || {Node, {ok, UId}} <- ValidList], + + lists:foreach(fun({Node, {ok, undefined}}) -> + ?LOG_WARNING("Unexpected undefined uuid from node ~p " + "for quorum ~ts during repair_amqqueue_nodes", + [Node, rabbit_misc:rs(QName)]); + ({Node, CaughtCallException}) -> + ?LOG_WARNING("Call exception while retrieving uuid from node ~p " + "for quorum ~ts during repair_amqqueue_nodes: ~p", + [Node, rabbit_misc:rs(QName), CaughtCallException]) + end, ErrorList), + + {maps:from_list(NewNodesUidsList), ErrorList}. + reductions(Name) -> try {reductions, R} = process_info(whereis(Name), reductions), @@ -822,7 +879,7 @@ recover(_Vhost, Queues) -> RaUId = ra_directory:uid_of(?RA_SYSTEM, Name), case RaUId of undefined -> - ?LOG_WARNING("Unexpected undefined uuid for current node for quorum queue ~ts during recover", + ?LOG_WARNING("Unexpected undefined uuid for current node for quorum ~ts during recover", [rabbit_misc:rs(QName)]); _ -> ok @@ -839,7 +896,7 @@ recover(_Vhost, Queues) -> #{node() := _NewRaUId} -> %% Queue is aware but it does not match the one returned by %% ra_directory - rabbit_log:info("Quorum queue ~ts: detected node uuid change, " + rabbit_log:info("Quorum ~ts: detected node uuid change, " "deleting old data directory", [rabbit_misc:rs(QName)]), maybe_delete_data_dir(RaUId) end, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index e0c770e49529..f9f3fbbaa2d5 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -105,6 +105,10 @@ groups() -> force_checkpoint_on_queue, force_checkpoint, policy_repair, + repair_metadata_nodes_list_to_map, + repair_metadata_nodes_added_member, + repair_metadata_nodes_removed_member, + repair_metadata_nodes_added_removed_member, gh_12635, replica_states, restart_after_queue_reincarnation, @@ -1527,6 +1531,66 @@ force_checkpoint(Config) -> % Result should only have quorum queue ?assertEqual(ExpectedRes, ForceCheckpointRes). +repair_metadata_nodes_list_to_map(Config) -> + %% After feature flag `track_qq_members_uids` is enabled, quorum + %% queues will convert their type state in metadata store + %% from nodes list to node=>uid mappings + UpdateFun = + fun(QueueRec) -> + #{nodes := NodesMap} = TypeState = amqqueue:get_type_state(QueueRec), + amqqueue:set_type_state(QueueRec, TypeState#{nodes => maps:keys(NodesMap)}) + end, + repair_metadata_nodes(Config, UpdateFun). + +repair_metadata_nodes_added_member(Config) -> + Server1 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + UpdateFun = + fun(QueueRec) -> + #{nodes := NodesMap} = TypeState = amqqueue:get_type_state(QueueRec), + amqqueue:set_type_state(QueueRec, TypeState#{nodes => maps:remove(Server1, NodesMap)}) + end, + repair_metadata_nodes(Config, UpdateFun). + +repair_metadata_nodes_removed_member(Config) -> + UpdateFun = + fun(QueueRec) -> + #{nodes := NodesMap} = TypeState = amqqueue:get_type_state(QueueRec), + amqqueue:set_type_state(QueueRec, TypeState#{nodes => NodesMap#{'rabbit@foo' => <<"dummy_uid">>}}) + end, + repair_metadata_nodes(Config, UpdateFun). + +repair_metadata_nodes_added_removed_member(Config) -> + Server1 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + UpdateFun = + fun(QueueRec) -> + #{nodes := NodesMap} = TypeState = amqqueue:get_type_state(QueueRec), + NewNodeMap = maps:remove(Server1, NodesMap#{'rabbit@foo' => <<"dummy_uid">>}), + amqqueue:set_type_state(QueueRec, TypeState#{nodes => NewNodeMap}) + end, + repair_metadata_nodes(Config, UpdateFun). + +repair_metadata_nodes(Config, UpdateFun) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + QQName = rabbit_misc:r(<<"/">>, queue, QQ), + + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + + + QueueRecBefore = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QQName]), + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, update, [QQName, UpdateFun]), + + ?assertEqual(repaired, rpc:call(Server, rabbit_quorum_queue, repair_amqqueue_nodes, + [QQName])), + + QueueRecAfter = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QQName]), + + ?assertEqual(QueueRecBefore, QueueRecAfter), + ok. + % Tests that, if the process of a QQ is dead in the moment of declaring a policy % that affects such queue, when the process is made available again, the policy % will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863)