Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ run_prelaunch_second_phase() ->
%% 3. Logging.
ok = rabbit_prelaunch_logging:setup(Context),

rabbit_boot_state:set(prelaunch_done),

%% The clustering steps require Khepri to be started to check for
%% consistency. This is the opposite compared to Mnesia which must be
%% stopped. That's why we setup Khepri and the coordination Ra system it
Expand Down Expand Up @@ -481,6 +483,10 @@ stop_boot_marker(Marker) ->
-spec stop() -> 'ok'.

stop() ->
case 1 =:= 1 of
true ->
ok;
false ->
case wait_for_ready_or_stopped() of
ok ->
case rabbit_boot_state:get() of
Expand All @@ -499,7 +505,7 @@ stop() ->
end;
_ ->
ok
end.
end end.

do_stop() ->
Apps0 = ?APPS ++ rabbit_plugins:active(),
Expand Down
41 changes: 41 additions & 0 deletions deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

-export([init/0,
reset/0,
do_reset_v2/0,
force_reset/0,
force_load_on_next_boot/0,
is_virgin_node/0, is_virgin_node/1,
Expand Down Expand Up @@ -129,12 +130,32 @@ clear_init_finished() ->
%% @doc Resets the database and the node.

reset() ->
    %% Hard-wired switch between the two implementations. The comparison is
    %% constant, so reset_v2/0 is the branch that always runs; reset_v1/0 is
    %% kept selectable by editing the condition.
    UseV2 = (1 =:= 1),
    case UseV2 of
        false -> reset_v1();
        true -> reset_v2()
    end.

%% Resets the node with whichever metadata store is enabled, then runs
%% post_reset/0. Crashes with `badmatch' if the backend reset does not
%% return `ok'.
reset_v1() ->
    KhepriEnabled = rabbit_khepri:is_enabled(),
    Ret = case KhepriEnabled of
              false -> reset_using_mnesia();
              true -> reset_using_khepri()
          end,
    ok = Ret,
    post_reset().

%% Resets the node after first running the cluster-change preparation step.
%%
%% The boot state returned by the preparation step is intentionally unused
%% here: the `rabbit' application is stopped afterwards instead of being
%% resumed.
reset_v2() ->
    {ok, _FormerBootState} = rabbit_db_cluster:pre_cluster_changes(),
    try do_reset_v2() of
        Ret -> Ret
    after
        %% Runs whether the reset returned normally or raised; the result of
        %% stopping the application is not inspected.
        application:stop(rabbit)
    end.

%% Performs the reset itself for the "v2" code path.
%%
%% Asserts that the current boot state is exactly `prelaunch_done' (the
%% assertion raises otherwise), then delegates to reset_v1/0 and returns its
%% result. This function is exported so that it can be called without going
%% through reset_v2/0, i.e. without the preparation step and without stopping
%% the `rabbit' application afterwards.
do_reset_v2() ->
?assertEqual(prelaunch_done, rabbit_boot_state:get()),
reset_v1().

reset_using_mnesia() ->
?LOG_INFO(
"DB: resetting node (using Mnesia)",
Expand All @@ -152,12 +173,32 @@ reset_using_khepri() ->
%% @doc Resets the database and the node.

force_reset() ->
    %% Hard-wired switch between the two implementations. The comparison is
    %% constant, so force_reset_v2/0 is the branch that always runs.
    UseV2 = (1 =:= 1),
    case UseV2 of
        false -> force_reset_v1();
        true -> force_reset_v2()
    end.

%% Forcefully resets the node with whichever metadata store is enabled, then
%% runs post_reset/0. Crashes with `badmatch' if the backend reset does not
%% return `ok'.
force_reset_v1() ->
    KhepriEnabled = rabbit_khepri:is_enabled(),
    Ret = case KhepriEnabled of
              false -> force_reset_using_mnesia();
              true -> force_reset_using_khepri()
          end,
    ok = Ret,
    post_reset().

%% Forcefully resets the node after first running the cluster-change
%% preparation step.
%%
%% The boot state returned by the preparation step is intentionally unused
%% here: the `rabbit' application is stopped afterwards instead of being
%% resumed.
force_reset_v2() ->
    {ok, _FormerBootState} = rabbit_db_cluster:pre_cluster_changes(),
    try do_force_reset_v2() of
        Ret -> Ret
    after
        %% Runs whether the reset returned normally or raised; the result of
        %% stopping the application is not inspected.
        application:stop(rabbit)
    end.

%% Performs the forced reset itself for the "v2" code path.
%%
%% Asserts that the current boot state is exactly `prelaunch_done' (the
%% assertion raises otherwise), then delegates to force_reset_v1/0 and
%% returns its result.
do_force_reset_v2() ->
?assertEqual(prelaunch_done, rabbit_boot_state:get()),
force_reset_v1().

force_reset_using_mnesia() ->
?LOG_DEBUG(
"DB: resetting node forcefully (using Mnesia)",
Expand Down
100 changes: 100 additions & 0 deletions deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
-module(rabbit_db_cluster).

-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").

-include_lib("rabbit_common/include/logging.hrl").

-export([ensure_feature_flags_are_in_sync/2,
pre_cluster_changes/0,
post_cluster_changes/1,
join/2,
forget_member/2]).
-export([change_node_type/1]).
Expand All @@ -39,6 +42,58 @@
%% Cluster formation.
%% -------------------------------------------------------------------

%% Prepares the node for a change of cluster membership.
%%
%% Requires the node to have reached at least the `prelaunch_done' boot
%% state. If it has, the function:
%%   1. records the current boot state;
%%   2. if the node had reached `ready', moves the boot state back to
%%      `prelaunch_done' and drains the node (maintenance mode);
%%   3. waits for and stops the Feature flags controller.
%%
%% Returns `{ok, BootState}' where `BootState' is the state recorded in
%% step 1, meant to be handed to post_cluster_changes/1 afterwards.
%%
%% Throws `{error, rabbit_not_running}' if the node has not reached
%% `prelaunch_done'.
pre_cluster_changes() ->
    case rabbit_boot_state:has_reached(prelaunch_done) of
        true ->
            ?LOG_NOTICE(
               "DB: prepare for cluster changes; stopping service",
               #{domain => ?RMQLOG_DOMAIN_DB}),
            %% Capture the state before it is possibly rewound below, so the
            %% caller can restore it later.
            BootState = rabbit_boot_state:get(),
            case rabbit_boot_state:has_reached(ready) of
                true ->
                    rabbit_boot_state:set(prelaunch_done),
                    ?assertNot(rabbit:is_running()),

                    %% The maintenance mode stops network listeners, closes
                    %% all client connections and transfers Ra leaders to
                    %% other nodes.
                    ok = rabbit_maintenance:drain();
                false ->
                    ?assertNot(rabbit:is_running()),
                    ok
            end,

            %% We also need to stop the Feature flags controller to make
            %% sure no feature flags are modified while the cluster
            %% membership is being worked on.
            ok = rabbit_ff_controller:wait_for_task_and_stop(),
            {ok, BootState};
        false ->
            erlang:throw({error, rabbit_not_running})
    end.

%% Resumes service once a cluster membership change is finished.
%%
%% `FormerBootState' is the boot state the node was in before the change.
%% The node must currently be in the `prelaunch_done' state (asserted).
%% Always returns `ok'.
post_cluster_changes(FormerBootState) ->
    ?assertEqual(prelaunch_done, rabbit_boot_state:get()),

    %% Bring the Feature flags controller back under the supervisor.
    ok = rabbit_sup:start_child(rabbit_ff_controller),

    maybe_resume_service(FormerBootState),

    ?LOG_NOTICE(
       "DB: cluster changes finished; service resumed",
       #{domain => ?RMQLOG_DOMAIN_DB}),
    ok.

%% Leaves maintenance mode and marks the node as ready again, but only if the
%% node was `ready' before the cluster change. Any other former state is left
%% alone.
maybe_resume_service(ready) ->
    rabbit_maintenance:revive(),
    rabbit_boot_state:set(ready),
    ?assert(rabbit:is_running());
maybe_resume_service(_FormerBootState) ->
    ok.

ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) ->
Ret = rabbit_feature_flags:sync_feature_flags_with_cluster(
Nodes, NodeIsVirgin),
Expand Down Expand Up @@ -92,6 +147,14 @@ join(ThisNode, _NodeType) when ThisNode =:= node() ->
{error, cannot_cluster_node_with_itself};
join(RemoteNode, NodeType)
when is_atom(RemoteNode) andalso ?IS_NODE_TYPE(NodeType) ->
case 1 =:= 1 of
true ->
join_v2(RemoteNode, NodeType);
false ->
join_v1(RemoteNode, NodeType)
end.

join_v1(RemoteNode, NodeType) ->
case can_join(RemoteNode) of
{ok, ClusterNodes} when is_list(ClusterNodes) ->
%% RabbitMQ and Mnesia must be stopped to modify the cluster. In
Expand Down Expand Up @@ -240,6 +303,43 @@ join(RemoteNode, NodeType)
Error
end.

%% Joins this node to the cluster `RemoteNode' belongs to ("v2" code path).
%%
%% Steps, once can_join/1 has returned the list of cluster nodes:
%%   1. prepare the node with pre_cluster_changes/0;
%%   2. under the feature flags registry state-change lock, reset the local
%%      database, notify that this node left its former cluster and copy the
%%      feature flag states from `RemoteNode';
%%   3. join using Khepri or Mnesia, depending on what is enabled on
%%      `RemoteNode';
%%   4. if the join failed, reset the feature flags states;
%%   5. resume service with post_cluster_changes/1.
%%
%% Returns the result of the join itself (`ok' or `{error, Reason}'), or the
%% `{error, Reason}' returned by can_join/1.
join_v2(RemoteNode, NodeType) ->
    case can_join(RemoteNode) of
        {ok, ClusterNodes} when is_list(ClusterNodes) ->
            {ok, BootState} = pre_cluster_changes(),

            rabbit_ff_registry_factory:acquire_state_change_lock(),
            try
                ok = rabbit_db:do_reset_v2(),
                ok = rabbit_node_monitor:notify_left_cluster(node()),
                rabbit_feature_flags:copy_feature_states_after_reset(
                  RemoteNode)
            after
                rabbit_ff_registry_factory:release_state_change_lock()
            end,

            ?LOG_INFO(
               "DB: joining cluster using remote nodes:~n~tp", [ClusterNodes],
               #{domain => ?RMQLOG_DOMAIN_DB}),
            Ret = case rabbit_khepri:is_enabled(RemoteNode) of
                      true -> join_using_khepri(ClusterNodes, NodeType);
                      false -> join_using_mnesia(ClusterNodes, NodeType)
                  end,

            case Ret of
                ok ->
                    ok;
                {error, _} ->
                    %% We reset feature flags states again and make sure the
                    %% recorded states on disk are deleted.
                    rabbit_feature_flags:reset()
            end,

            %% Service is resumed whether the join succeeded or not, but the
            %% outcome of the join is what the caller must see.
            %% post_cluster_changes/1 always returns `ok', so returning its
            %% value would report a failed join as a success.
            ok = post_cluster_changes(BootState),
            Ret;
        {error, _} = Error ->
            Error
    end.

%% Joins the cluster through the Mnesia backend by delegating to
%% rabbit_mnesia:join_cluster/2 and returning its result unchanged.
%% `ClusterNodes' must be a list (enforced by the guard).
join_using_mnesia(ClusterNodes, NodeType) when is_list(ClusterNodes) ->
rabbit_mnesia:join_cluster(ClusterNodes, NodeType).

Expand Down
14 changes: 14 additions & 0 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,14 @@ await_replication() ->
%% @private

reset() ->
    %% Hard-wired switch between the two implementations. The comparison is
    %% constant, so reset_v2/0 is the branch that always runs.
    UseV2 = (1 =:= 1),
    case UseV2 of
        true -> reset_v2();
        false -> reset_v1()
    end.

reset_v1() ->
case rabbit:is_running() of
false ->
%% Rabbit should be stopped, but Khepri needs to be running.
Expand All @@ -386,6 +394,12 @@ reset() ->
throw({error, rabbitmq_unexpectedly_running})
end.

%% Resets the Khepri store of this node, then removes the GUID file.
%%
%% Unlike reset_v1/0, this variant does not check whether RabbitMQ is
%% running. Crashes with `badmatch' if the Khepri reset does not return `ok'.
reset_v2() ->
    ok = khepri_cluster:reset(?RA_CLUSTER_NAME),

    %% Best effort: the outcome of the deletion is deliberately ignored.
    GuidFile = rabbit_guid:filename(),
    _ = file:delete(GuidFile),
    ok.

-spec dir() -> Dir when
Dir :: file:filename_all().
%% @doc Returns the Khepri store directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForceResetCommand do

use RabbitMQ.CLI.Core.MergesNoDefaults
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
use RabbitMQ.CLI.Core.RequiresRabbitAppStopped

def run([], %{node: node_name}) do
case :rabbit_misc.rpc_call(node_name, :rabbit_db, :force_reset, []) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ResetCommand do

use RabbitMQ.CLI.Core.MergesNoDefaults
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
use RabbitMQ.CLI.Core.RequiresRabbitAppStopped

def run([], %{node: node_name}) do
case :rabbit_misc.rpc_call(node_name, :rabbit_db, :reset, []) do
Expand Down
12 changes: 7 additions & 5 deletions deps/rabbitmq_prelaunch/src/rabbit_boot_state.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

%% Boot states known to this module. They are listed here in the same order
%% as the indices assigned by boot_state_idx/1.
-type boot_state() :: stopped |
booting |
prelaunch_done |
core_started |
ready |
stopping.
Expand Down Expand Up @@ -66,11 +67,12 @@ wait_for(BootState, Timeout)
wait_for(_, _) ->
{error, timeout}.

boot_state_idx(stopped) -> 0;
boot_state_idx(booting) -> 1;
boot_state_idx(core_started) -> 2;
boot_state_idx(ready) -> 3;
boot_state_idx(stopping) -> 4.
%% Maps a boot state to an integer index. Indices grow in the order
%% stopped < booting < prelaunch_done < core_started < ready < stopping.
%% There is no catch-all clause: any other term raises `function_clause'.
%% NOTE(review): presumably these indices are what ordering checks between
%% boot states compare -- confirm against the callers.
boot_state_idx(stopped) -> 0;
boot_state_idx(booting) -> 1;
boot_state_idx(prelaunch_done) -> 2;
boot_state_idx(core_started) -> 3;
boot_state_idx(ready) -> 4;
boot_state_idx(stopping) -> 5.

%% Returns `true' when `BootState' has an index in boot_state_idx/1.
%% NOTE(review): boot_state_idx/1 has no catch-all clause, so an unknown
%% state raises `function_clause' here instead of yielding `false' -- confirm
%% that callers expect this.
is_valid(BootState) ->
    Idx = boot_state_idx(BootState),
    is_integer(Idx).
Expand Down
Loading