Skip to content

Commit bddd944

Browse files
committed
wip: add feature flag and put RaUids in nodes
1 parent 8d21428 commit bddd944

File tree

3 files changed

+74
-37
lines changed

3 files changed

+74
-37
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,10 @@
219219
depends_on => ['rabbitmq_4.1.0'],
220220
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
221221
}}).
222+
223+
%% Declares the `track_qq_members_uids` feature flag: marked stable and with
%% no dependencies on other feature flags.
%% In this commit, rabbit_quorum_queue:start_cluster/1 checks the flag: when it
%% is enabled, the queue type state stores a node-to-Ra-UID map under the
%% `nodes` key; when it is disabled, `nodes` stays a plain list of nodes.
-rabbit_feature_flag(
224+
{'track_qq_members_uids',
225+
#{desc => "Track queue members UIDs in the metadata store",
226+
stability => stable,
227+
depends_on => []
228+
}}).

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,14 @@ start_cluster(Q) ->
273273
UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))}
274274
|| Node <- [LeaderNode | FollowerNodes]]),
275275
NewQ0 = amqqueue:set_pid(Q, LeaderId),
276-
NewQ1 = amqqueue:set_type_state(NewQ0,
277-
#{nodes => [LeaderNode | FollowerNodes],
278-
uids => UIDs}),
276+
NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
277+
false ->
278+
amqqueue:set_type_state(NewQ0,
279+
#{nodes => [LeaderNode | FollowerNodes]});
280+
true ->
281+
amqqueue:set_type_state(NewQ0,
282+
#{nodes => UIDs})
283+
end,
279284

280285
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
281286
rabbit_fifo, version, [],
@@ -720,7 +725,7 @@ repair_amqqueue_nodes(Q0) ->
720725
{Name, _} = amqqueue:get_pid(Q0),
721726
Members = ra_leaderboard:lookup_members(Name),
722727
RaNodes = [N || {_, N} <- Members],
723-
#{nodes := Nodes} = amqqueue:get_type_state(Q0),
728+
Nodes = get_nodes(Q0),
724729
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
725730
true ->
726731
%% up to date
@@ -729,7 +734,18 @@ repair_amqqueue_nodes(Q0) ->
729734
%% update amqqueue record
730735
Fun = fun (Q) ->
731736
TS0 = amqqueue:get_type_state(Q),
732-
TS = TS0#{nodes => RaNodes},
737+
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids)
738+
andalso has_uuid_tracking(TS0)
739+
of
740+
false ->
741+
TS0#{nodes => RaNodes};
742+
true ->
743+
RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of,
744+
[?RA_SYSTEM, Name],
745+
?RPC_TIMEOUT)}
746+
|| N <- RaNodes]),
747+
TS0#{nodes => RaUids}
748+
end,
733749
amqqueue:set_type_state(Q, TS)
734750
end,
735751
_ = rabbit_amqqueue:update(QName, Fun),
@@ -794,10 +810,9 @@ recover(_Vhost, Queues) ->
794810
QName = amqqueue:get_name(Q0),
795811
MutConf = make_mutable_config(Q0),
796812
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
797-
QTypeState0 = amqqueue:get_type_state(Q0),
798-
RaUIds = maps:get(uids, QTypeState0, undefined),
799-
QTypeState = case RaUIds of
800-
undefined ->
813+
#{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0),
814+
QTypeState = case Nodes of
815+
List when is_list(List) ->
801816
%% Queue is not aware of node to uid mapping, do nothing
802817
QTypeState0;
803818
#{node() := RaUId} ->
@@ -808,7 +823,7 @@ recover(_Vhost, Queues) ->
808823
%% does not match the one returned by ra_directory, regen uid
809824
maybe_delete_data_dir(RaUId),
810825
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
811-
QTypeState0#{uids := RaUIds#{node() => NewRaUId}}
826+
QTypeState0#{nodes := Nodes#{node() => NewRaUId}}
812827
end,
813828
Q = amqqueue:set_type_state(Q0, QTypeState),
814829
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
@@ -1412,21 +1427,20 @@ do_add_member(Q0, Node, Membership, Timeout)
14121427
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
14131428
ServerId = {RaName, Node},
14141429
Members = members(Q0),
1415-
QTypeState0 = amqqueue:get_type_state(Q0),
1416-
RaUIds = maps:get(uids, QTypeState0, undefined),
1417-
QTypeState = case RaUIds of
1418-
undefined ->
1419-
%% Queue is not aware of node to uid mapping, do nothing
1420-
QTypeState0;
1421-
#{Node := _} ->
1422-
%% Queue is aware and uid for targeted node exists, do nothing
1423-
QTypeState0;
1424-
_ ->
1425-
%% Queue is aware but current node has no UId, regen uid
1426-
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1427-
QTypeState0#{uids := RaUIds#{Node => NewRaUId}}
1428-
end,
1429-
Q = amqqueue:set_type_state(Q0, QTypeState),
1430+
QTypeState0 = #{nodes := _Nodes}= amqqueue:get_type_state(Q0),
1431+
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1432+
%QTypeState = case Nodes of
1433+
% L when is_list(L) ->
1434+
% %% Queue is not aware of node to uid mapping, just add the new node
1435+
% QTypeState0#{nodes := lists:usort([Node | Nodes])};
1436+
% #{Node := _} ->
1437+
% %% Queue is aware and uid for targeted node exists, do nothing
1438+
% QTypeState0;
1439+
% _ ->
1440+
% %% Queue is aware but current node has no UId, regen uid
1441+
% QTypeState0#{nodes := Nodes#{Node => NewRaUId}}
1442+
%end,
1443+
Q = amqqueue:set_type_state(Q0, QTypeState0),
14301444
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
14311445
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
14321446
case ra:start_server(?RA_SYSTEM, Conf) of
@@ -1442,8 +1456,12 @@ do_add_member(Q0, Node, Membership, Timeout)
14421456
{ok, {RaIndex, RaTerm}, Leader} ->
14431457
Fun = fun(Q1) ->
14441458
Q2 = update_type_state(
1445-
Q1, fun(#{nodes := Nodes} = Ts) ->
1446-
Ts#{nodes => lists:usort([Node | Nodes])}
1459+
Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) ->
1460+
Ts#{nodes => lists:usort([Node | NodesList])};
1461+
(#{nodes := #{Node := _} = _NodesMap} = Ts) ->
1462+
Ts;
1463+
(#{nodes := NodesMap} = Ts) when is_map(NodesMap) ->
1464+
Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)}
14471465
end),
14481466
amqqueue:set_pid(Q2, Leader)
14491467
end,
@@ -1516,12 +1534,10 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
15161534
Fun = fun(Q1) ->
15171535
update_type_state(
15181536
Q1,
1519-
fun(#{nodes := Nodes,
1520-
uids := UIds} = Ts) ->
1521-
Ts#{nodes => lists:delete(Node, Nodes),
1522-
uids => maps:remove(Node, UIds)};
1523-
(#{nodes := Nodes} = Ts) ->
1524-
Ts#{nodes => lists:delete(Node, Nodes)}
1537+
fun(#{nodes := Nodes} = Ts) when is_list(Nodes) ->
1538+
Ts#{nodes => lists:delete(Node, Nodes)};
1539+
(#{nodes := Nodes} = Ts) when is_map(Nodes) ->
1540+
Ts#{nodes => maps:remove(Node, Nodes)}
15251541
end)
15261542
end,
15271543
_ = rabbit_amqqueue:update(QName, Fun),
@@ -2082,7 +2098,12 @@ make_mutable_config(Q) ->
20822098

20832099
%% Returns the member nodes recorded in the queue's type state as a list.
%% The `nodes` entry may be either a list (returned unchanged) or a map keyed
%% by node (in which case only the keys are returned), so callers always get
%% a list of nodes regardless of which representation is stored.
%% Crashes with a badmatch if the type state has no `nodes` key.
get_nodes(Q) when ?is_amqqueue(Q) ->
20842100
#{nodes := Nodes} = amqqueue:get_type_state(Q),
2085-
Nodes.
2101+
case Nodes of
2102+
List when is_list(List) ->
2103+
List;
2104+
Map when is_map(Map) ->
2105+
maps:keys(Map)
2106+
end.
20862107

20872108
get_connected_nodes(Q) when ?is_amqqueue(Q) ->
20882109
ErlangNodes = [node() | nodes()],
@@ -2459,3 +2480,8 @@ queue_vm_stats_sups() ->
24592480
queue_vm_ets() ->
24602481
{[quorum_ets],
24612482
[[ra_log_ets]]}.
2483+
2484+
%% Predicate on a queue type state: returns true when its `nodes` entry is a
%% map (the node-to-UID representation), and false for anything else, such as
%% a type state whose `nodes` entry is a plain list or is missing.
has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) ->
2485+
true;
2486+
has_uuid_tracking(_QTypeState) ->
2487+
false.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2855,10 +2855,14 @@ add_member_2(Config) ->
28552855
{<<"x-quorum-initial-group-size">>, long, 1}])),
28562856
?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
28572857
[<<"/">>, QQ, Server0, 5000])),
2858-
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
2859-
[rabbit_misc:r(<<"/">>, queue, QQ)]),
2858+
#{online := Onlines} = ?awaitMatch(#{online := [_One, _Two]},
2859+
maps:from_list(rpc:call(Server0,
2860+
rabbit_quorum_queue,
2861+
infos,
2862+
[rabbit_misc:r(<<"/">>, queue, QQ)])),
2863+
3000),
28602864
Servers = lists:sort([Server0, Server1]),
2861-
?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
2865+
?assertEqual(Servers, lists:sort(Onlines)).
28622866

28632867
delete_member_not_running(Config) ->
28642868
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

0 commit comments

Comments
 (0)