diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index e0c3f839deb2..8e37fc9d82f3 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -369,6 +369,8 @@ run_prelaunch_second_phase() ->
     %% 3. Logging.
     ok = rabbit_prelaunch_logging:setup(Context),
 
+    rabbit_boot_state:set(prelaunch_done), % new boot state, set once logging is set up
+
     %% The clustering steps requires Khepri to be started to check for
     %% consistency. This is the opposite compared to Mnesia which must be
     %% stopped. That's why we setup Khepri and the coordination Ra system it
@@ -481,6 +483,10 @@ stop_boot_marker(Marker) ->
 -spec stop() -> 'ok'.
 
 stop() ->
+    case 1 =:= 1 of % NOTE(review): constant condition, always `true'
+        true ->
+            ok; % stop/0 returns here without stopping anything
+        false -> % unreachable while the condition above is constant
     case wait_for_ready_or_stopped() of
         ok ->
             case rabbit_boot_state:get() of
@@ -499,7 +505,7 @@ stop() ->
             end;
         _ ->
             ok
-    end.
+    end end.
 
 do_stop() ->
     Apps0 = ?APPS ++ rabbit_plugins:active(),
diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl
index a84ff93b9d97..df904719c755 100644
--- a/deps/rabbit/src/rabbit_db.erl
+++ b/deps/rabbit/src/rabbit_db.erl
@@ -17,6 +17,7 @@
 
 -export([init/0,
          reset/0,
+         do_reset_v2/0, % exported for rabbit_db_cluster:join_v2/2
          force_reset/0,
          force_load_on_next_boot/0,
          is_virgin_node/0, is_virgin_node/1,
@@ -129,12 +130,32 @@ clear_init_finished() ->
 %% @doc Resets the database and the node.
 
 reset() ->
+    case 1 =:= 1 of % NOTE(review): constant condition, always `true'
+        true ->
+            reset_v2();
+        false -> % unreachable while the condition above is constant
+            reset_v1()
+    end.
+
+reset_v1() -> % former body of reset/0
     ok = case rabbit_khepri:is_enabled() of
              true  -> reset_using_khepri();
              false -> reset_using_mnesia()
          end,
     post_reset().
 
+reset_v2() ->
+    {ok, _BootState} = rabbit_db_cluster:pre_cluster_changes(), % former boot state is not used
+    try
+        do_reset_v2()
+    after
+        application:stop(rabbit) % runs whether do_reset_v2/0 returns or raises
+    end.
+
+do_reset_v2() ->
+    ?assertEqual(prelaunch_done, rabbit_boot_state:get()), % fails unless the boot state is exactly prelaunch_done
+    reset_v1().
+
 reset_using_mnesia() ->
     ?LOG_INFO(
       "DB: resetting node (using Mnesia)",
@@ -152,12 +173,32 @@ reset_using_khepri() ->
 %% @doc Resets the database and the node.
 
 force_reset() ->
+    case 1 =:= 1 of % NOTE(review): constant condition, always `true'
+        true ->
+            force_reset_v2();
+        false -> % unreachable while the condition above is constant
+            force_reset_v1()
+    end.
+
+force_reset_v1() -> % former body of force_reset/0
     ok = case rabbit_khepri:is_enabled() of
              true  -> force_reset_using_khepri();
              false -> force_reset_using_mnesia()
          end,
     post_reset().
 
+force_reset_v2() ->
+    {ok, _BootState} = rabbit_db_cluster:pre_cluster_changes(), % former boot state is not used
+    try
+        do_force_reset_v2()
+    after
+        application:stop(rabbit) % runs whether do_force_reset_v2/0 returns or raises
+    end.
+
+do_force_reset_v2() ->
+    ?assertEqual(prelaunch_done, rabbit_boot_state:get()), % fails unless the boot state is exactly prelaunch_done
+    force_reset_v1().
+
 force_reset_using_mnesia() ->
     ?LOG_DEBUG(
       "DB: resetting node forcefully (using Mnesia)",
diff --git a/deps/rabbit/src/rabbit_db_cluster.erl b/deps/rabbit/src/rabbit_db_cluster.erl
index 1fd720e527fe..8639da99a956 100644
--- a/deps/rabbit/src/rabbit_db_cluster.erl
+++ b/deps/rabbit/src/rabbit_db_cluster.erl
@@ -9,10 +9,13 @@
 -module(rabbit_db_cluster).
 
 -include_lib("kernel/include/logger.hrl").
+-include_lib("stdlib/include/assert.hrl").
 
 -include_lib("rabbit_common/include/logging.hrl").
 
 -export([ensure_feature_flags_are_in_sync/2,
+         pre_cluster_changes/0,
+         post_cluster_changes/1,
          join/2,
          forget_member/2]).
 -export([change_node_type/1]).
@@ -39,6 +42,70 @@
 %% Cluster formation.
 %% -------------------------------------------------------------------
 
+%% Prepares this node for a cluster membership change.
+%%
+%% Throws `{error, rabbit_not_running}' unless the boot state has reached
+%% `prelaunch_done'. Otherwise, if the `ready' state was reached, the boot
+%% state is set back to `prelaunch_done' and the node is drained. In both
+%% cases, the Feature flags controller is then stopped. Returns
+%% `{ok, BootState}' where `BootState' is the boot state read before any
+%% change; join_v2/2 hands it to post_cluster_changes/1.
+pre_cluster_changes() ->
+    case rabbit_boot_state:has_reached(prelaunch_done) of
+        true ->
+            ?LOG_NOTICE(
+              "DB: prepare for cluster changes; stopping service",
+              #{domain => ?RMQLOG_DOMAIN_DB}),
+            BootState = rabbit_boot_state:get(),
+            case rabbit_boot_state:has_reached(ready) of
+                true ->
+                    rabbit_boot_state:set(prelaunch_done),
+                    ?assertNot(rabbit:is_running()),
+
+                    %% The maintenance mode stops network listeners, closes
+                    %% all client connections and transfers Ra leaders to
+                    %% other nodes.
+                    ok = rabbit_maintenance:drain();
+                false ->
+                    ?assertNot(rabbit:is_running()),
+                    ok
+            end,
+
+            %% We also need to stop the Feature flags controller to make sure
+            %% no feature flags are modified while the cluster membership is
+            %% being worked on.
+            ok = rabbit_ff_controller:wait_for_task_and_stop(),
+            {ok, BootState};
+        false ->
+            erlang:throw({error, rabbit_not_running})
+    end.
+
+%% Resumes service after a cluster membership change. Expects the boot state
+%% to be `prelaunch_done'. Restarts the Feature flags controller and, when
+%% `FormerBootState' is `ready', revives the node and marks it `ready' again.
+%% Always returns `ok'.
+post_cluster_changes(FormerBootState) ->
+    ?assertEqual(prelaunch_done, rabbit_boot_state:get()),
+
+    %% Restart the Feature flags controller and exit from maintenance mode.
+    ok = rabbit_sup:start_child(rabbit_ff_controller),
+
+    case FormerBootState of
+        ready ->
+            rabbit_maintenance:revive(), % NOTE(review): result is not checked, unlike drain/0
+
+            %% We can now mark the node as ready again.
+            rabbit_boot_state:set(ready),
+            ?assert(rabbit:is_running());
+        _ ->
+            ok
+    end,
+
+    ?LOG_NOTICE(
+      "DB: cluster changes finished; service resumed",
+      #{domain => ?RMQLOG_DOMAIN_DB}),
+    ok.
+
 ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) ->
     Ret = rabbit_feature_flags:sync_feature_flags_with_cluster(
             Nodes, NodeIsVirgin),
@@ -92,6 +159,14 @@ join(ThisNode, _NodeType) when ThisNode =:= node() ->
     {error, cannot_cluster_node_with_itself};
 join(RemoteNode, NodeType)
   when is_atom(RemoteNode) andalso ?IS_NODE_TYPE(NodeType) ->
+    case 1 =:= 1 of % NOTE(review): constant condition, always `true'
+        true ->
+            join_v2(RemoteNode, NodeType);
+        false -> % unreachable while the condition above is constant
+            join_v1(RemoteNode, NodeType)
+    end.
+
+join_v1(RemoteNode, NodeType) -> % former body of the join/2 clause above
     case can_join(RemoteNode) of
         {ok, ClusterNodes} when is_list(ClusterNodes) ->
             %% RabbitMQ and Mnesia must be stopped to modify the cluster. In
@@ -240,6 +315,43 @@ join(RemoteNode, NodeType)
             Error
     end.
 
+join_v2(RemoteNode, NodeType) ->
+    case can_join(RemoteNode) of
+        {ok, ClusterNodes} when is_list(ClusterNodes) ->
+            {ok, BootState} = pre_cluster_changes(), % throws {error, rabbit_not_running} before `prelaunch_done'
+
+            rabbit_ff_registry_factory:acquire_state_change_lock(),
+            try
+                ok = rabbit_db:do_reset_v2(),
+                ok = rabbit_node_monitor:notify_left_cluster(node()),
+                rabbit_feature_flags:copy_feature_states_after_reset(
+                  RemoteNode)
+            after
+                rabbit_ff_registry_factory:release_state_change_lock() % released even if the reset raises
+            end,
+
+            ?LOG_INFO(
+              "DB: joining cluster using remote nodes:~n~tp", [ClusterNodes],
+              #{domain => ?RMQLOG_DOMAIN_DB}),
+            Ret = case rabbit_khepri:is_enabled(RemoteNode) of
+                      true  -> join_using_khepri(ClusterNodes, NodeType);
+                      false -> join_using_mnesia(ClusterNodes, NodeType)
+                  end,
+
+            case Ret of
+                ok ->
+                    ok;
+                {error, _} ->
+                    %% We reset feature flags states again and make sure the
+                    %% recorded states on disk are deleted.
+                    rabbit_feature_flags:reset()
+            end,
+            ok = post_cluster_changes(BootState), % service is resumed whether the join worked or not
+            Ret; % report a failed join to the caller instead of always returning `ok'
+        {error, _} = Error ->
+            Error
+    end.
+
 join_using_mnesia(ClusterNodes, NodeType) when is_list(ClusterNodes) ->
     rabbit_mnesia:join_cluster(ClusterNodes, NodeType).
 
diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl
index 6b7f0a6a898e..1180bfe5fc25 100644
--- a/deps/rabbit/src/rabbit_khepri.erl
+++ b/deps/rabbit/src/rabbit_khepri.erl
@@ -372,6 +372,14 @@ await_replication() ->
 %% @private
 
 reset() ->
+    case 1 =:= 1 of % NOTE(review): constant condition, always `true'
+        false -> % unreachable while the condition above is constant
+            reset_v1();
+        true ->
+            reset_v2()
+    end.
+
+reset_v1() -> % former body of reset/0
     case rabbit:is_running() of
         false ->
             %% Rabbit should be stopped, but Khepri needs to be running.
@@ -386,6 +394,12 @@ reset() ->
             throw({error, rabbitmq_unexpectedly_running})
     end.
 
+reset_v2() -> % unlike reset_v1/0, does not check rabbit:is_running() first
+    ok = khepri_cluster:reset(?RA_CLUSTER_NAME),
+
+    _ = file:delete(rabbit_guid:filename()), % result is ignored
+    ok.
+
 -spec dir() -> Dir when
       Dir :: file:filename_all().
 %% @doc Returns the Khepri store directory.
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex
index 3b0a8ec36953..a0da0deb671a 100644
--- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_reset_command.ex
@@ -11,7 +11,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForceResetCommand do
 
   use RabbitMQ.CLI.Core.MergesNoDefaults
   use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
-  use RabbitMQ.CLI.Core.RequiresRabbitAppStopped
 
   def run([], %{node: node_name}) do
     case :rabbit_misc.rpc_call(node_name, :rabbit_db, :force_reset, []) do
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex
index 92a6765ef2d5..0598feccb72e 100644
--- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex
@@ -11,7 +11,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ResetCommand do
 
   use RabbitMQ.CLI.Core.MergesNoDefaults
   use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
-  use RabbitMQ.CLI.Core.RequiresRabbitAppStopped
 
   def run([], %{node: node_name}) do
     case :rabbit_misc.rpc_call(node_name, :rabbit_db, :reset, []) do
diff --git a/deps/rabbitmq_prelaunch/src/rabbit_boot_state.erl b/deps/rabbitmq_prelaunch/src/rabbit_boot_state.erl
index 649e0403a425..040086e09ee5 100644
--- a/deps/rabbitmq_prelaunch/src/rabbit_boot_state.erl
+++ b/deps/rabbitmq_prelaunch/src/rabbit_boot_state.erl
@@ -26,6 +26,7 @@
 
 -type boot_state() :: stopped |
                       booting |
+                      prelaunch_done | % set by rabbit:run_prelaunch_second_phase/0 after logging setup
                       core_started |
                       ready |
                       stopping.
@@ -66,11 +67,12 @@ wait_for(BootState, Timeout)
 wait_for(_, _) ->
     {error, timeout}.
 
-boot_state_idx(stopped)      -> 0;
-boot_state_idx(booting)      -> 1;
-boot_state_idx(core_started) -> 2;
-boot_state_idx(ready)        -> 3;
-boot_state_idx(stopping)     -> 4.
+boot_state_idx(stopped)        -> 0;
+boot_state_idx(booting)        -> 1;
+boot_state_idx(prelaunch_done) -> 2; % new state, inserted between booting and core_started
+boot_state_idx(core_started)   -> 3; % was 2
+boot_state_idx(ready)          -> 4; % was 3
+boot_state_idx(stopping)       -> 5. % was 4
 
 is_valid(BootState) ->
     is_integer(boot_state_idx(BootState)).