diff --git a/rebar.config b/rebar.config index 03ca19b31..80c1cd9ac 100644 --- a/rebar.config +++ b/rebar.config @@ -3,6 +3,7 @@ {profiles, [ {no_events, [{erl_opts, [{d, 'NO_EVENTS', true}]}]}, + {top, [{deps, [observer_cli]}, {erl_opts, [{d, 'AO_TOP', true}]}]}, {store_events, [{erl_opts, [{d, 'STORE_EVENTS', true}]}]}, {ao_profiling, [{erl_opts, [{d, 'AO_PROFILING', true}]}]}, {eflame, diff --git a/src/dev_codec_httpsig.erl b/src/dev_codec_httpsig.erl index 5a3e0ae46..8e9635a13 100644 --- a/src/dev_codec_httpsig.erl +++ b/src/dev_codec_httpsig.erl @@ -230,7 +230,8 @@ commit(BaseMsg, Req = #{ <<"type">> := <<"hmac-sha256">> }, RawOpts) -> normalize_for_encoding(Msg, UnauthedCommitment, Opts), SigBase = signature_base(EncMsg, EncComm, Opts), HMac = hb_util:human_id(crypto:mac(hmac, sha256, Key, SigBase)), - ?event(httpsig_commit, + ?event( + debug_commitments, {hmac_commit, {type, <<"hmac-sha256">>}, {keyid, KeyID}, @@ -255,7 +256,7 @@ commit(BaseMsg, Req = #{ <<"type">> := <<"hmac-sha256">> }, RawOpts) -> } } }, - ?event({hmac_generation_complete, Res}), + ?event(debug_commitments, {hmac_generation_complete, Res}), Res. %% @doc Annotate the commitment with the `bundle' key if the request contains diff --git a/src/dev_message.erl b/src/dev_message.erl index 5396cd7b7..a87ab4c1d 100644 --- a/src/dev_message.erl +++ b/src/dev_message.erl @@ -99,7 +99,7 @@ id(RawBase, Req, NodeOpts) -> % filtering for the committers specified in the request. #{ <<"commitments">> := Commitments } = with_relevant_commitments(Base, Req, IDOpts), - ?event(debug_commitments, + ?event(debug_id, {generating_ids, {selected_commitments, Commitments}, {req, Req}, @@ -109,7 +109,7 @@ id(RawBase, Req, NodeOpts) -> case hb_maps:keys(Commitments) of [] -> % If there are no commitments, we must (re)calculate the ID. 
- ?event(debug_id, no_commitments_found_in_id_call), + ?event(ids, regenerating_id), calculate_id(hb_maps:without([<<"commitments">>], Base), Req, IDOpts); IDs -> % Accumulate the relevant IDs into a single value. This is performed @@ -124,7 +124,7 @@ id(RawBase, Req, NodeOpts) -> % accumulation function starts with a buffer of zero encoded as a % 256-bit binary. Subsequently, a single ID on its own 'accumulates' % to itself. - ?event(debug_id, {accumulating_existing_ids, IDs}), + ?event(ids, returning_existing_ids), {ok, hb_util:human_id( hb_crypto:accumulate( @@ -137,13 +137,13 @@ id(RawBase, Req, NodeOpts) -> calculate_id(RawBase, Req, NodeOpts) -> % Find the ID device for the message. Base = hb_message:convert(RawBase, tabm, NodeOpts), - ?event(linkify, {calculate_ids, {base, Base}}), + ?event(debug_id, {calculate_ids, {base, Base}}), IDMod = case id_device(Base, NodeOpts) of {ok, IDDev} -> IDDev; {error, Error} -> throw({id, Error}) end, - ?event(linkify, {generating_id, {idmod, IDMod}, {base, Base}}), + ?event(debug_id, {generating_id, {idmod, IDMod}, {base, Base}}), % Get the device module from the message, or use the default if it is not % set. We can tell if the device is not set (or is the default) by checking % whether the device module is the same as this module. @@ -450,7 +450,7 @@ committed(Self, Req, Opts) -> OnlyCommittedKeys ) end, - ?event({only_committed_keys, CommittedNormalizedKeys}), + ?event(debug_commitments, {only_committed_keys, CommittedNormalizedKeys}), {ok, CommittedNormalizedKeys}. %% @doc Return a message with only the relevant commitments for a given request. 
@@ -497,14 +497,17 @@ commitment_ids_from_request(Base, Req, Opts) -> FromCommitterAddrs = case ReqCommitters of <<"none">> -> - ?event(no_commitment_ids_for_committers), + ?event(debug_commitments, no_commitment_ids_for_committers), []; <<"all">> -> {ok, Committers} = committers(Base, Req, Opts), ?event(debug_commitments, {commitment_ids_from_committers, Committers}), commitment_ids_from_committers(Committers, Commitments, Opts); RawCommitterAddrs -> - ?event({getting_commitment_ids_for_committers, RawCommitterAddrs}), + ?event( + debug_commitments, + {getting_commitment_ids_for_committers, RawCommitterAddrs} + ), CommitterAddrs = if is_list(RawCommitterAddrs) -> RawCommitterAddrs; true -> [RawCommitterAddrs] @@ -531,7 +534,10 @@ commitment_ids_from_request(Base, Req, Opts) -> ); FinalCommitmentIDs -> FinalCommitmentIDs end, - ?event({commitment_ids_from_request, {base, Base}, {req, Req}, {res, Res}}), + ?event( + debug_commitments, + {commitment_ids_from_request, {base, Base}, {req, Req}, {res, Res}} + ), Res. %% @doc Ensure that the `commitments` submessage of a base message is fully diff --git a/src/dev_process.erl b/src/dev_process.erl index c1ef336d8..b9ac0c395 100644 --- a/src/dev_process.erl +++ b/src/dev_process.erl @@ -269,42 +269,19 @@ compute(Base, Req, Opts) -> %% @doc Continually get and apply the next assignment from the scheduler until %% we reach the target slot that the user has requested. compute_to_slot(ProcID, Base, Req, TargetSlot, Opts) -> - CurrentSlot = hb_ao:get(<<"at-slot">>, Base, Opts#{ hashpath => ignore }), - ?event(compute_short, - {starting_compute, - {proc_id, ProcID}, - {current, CurrentSlot}, - {target, TargetSlot} - } - ), - case CurrentSlot of - CurrentSlot when CurrentSlot > TargetSlot -> - % The cache should already have the result, so we should never end up - % here. Depending on the type of process, 'rewinding' may require - % re-computing from a significantly earlier checkpoint, so for now - % we throw an error. 
- ?event(compute, {error_already_calculated_slot, {target, TargetSlot}, {current, CurrentSlot}}), - throw( - {error, - {already_calculated_slot, - {target, TargetSlot}, - {current, CurrentSlot} - } - } - ); + case hb_ao:get(<<"at-slot">>, Base, Opts#{ hashpath => ignore }) of CurrentSlot when CurrentSlot == TargetSlot -> % We reached the target height so we force a snapshot and return. - ?event(compute, {reached_target_slot_returning_state, TargetSlot}), - store_result( - true, - ProcID, - TargetSlot, - Base, - Req, + ?event(compute_short, + {reached_target_slot_returning_state, + {proc_id, ProcID}, + {slot, TargetSlot} + }, Opts ), + store_result(true, ProcID, TargetSlot, Base, Req, Opts), {ok, without_snapshot(dev_process_lib:as_process(Base, Opts), Opts)}; - CurrentSlot -> + CurrentSlot when CurrentSlot < TargetSlot -> % Compute the next state transition. NextSlot = CurrentSlot + 1, % Get the next input message from the scheduler device. @@ -312,20 +289,24 @@ compute_to_slot(ProcID, Base, Req, TargetSlot, Opts) -> {error, Res} -> % If the scheduler device cannot provide a next message, % we return its error details, along with the current slot. - ?event(compute, - {error_getting_schedule, - {error, Res}, - {phase, <<"get-schedule">>}, - {attempted_slot, NextSlot} + ?event(compute_short, + {error_getting_assignment, + {proc_id, ProcID}, + {attempted_slot, NextSlot}, + {target_slot, TargetSlot}, + {error, Res} } ), - {error, Res#{ - <<"phase">> => <<"get-schedule">>, - <<"attempted-slot">> => NextSlot - }}; + {error, + Res#{ + <<"phase">> => <<"get-schedule">>, + <<"attempted-slot">> => NextSlot, + <<"process-id">> => ProcID + } + }; {ok, #{ <<"body">> := SlotMsg, <<"state">> := State }} -> % Compute the next single state transition. - case compute_slot(ProcID, State, SlotMsg, Req, Opts) of + case compute_slot(ProcID, State, SlotMsg, Req, TargetSlot, Opts) of {ok, NewState} -> % Continue computing to the target slot. 
compute_to_slot( @@ -336,77 +317,153 @@ compute_to_slot(ProcID, Base, Req, TargetSlot, Opts) -> Opts ); {error, Error} -> - % If the compute_slot function returns an error, - % we return the error details, along with the current - % slot. - ErrMsg = - if is_map(Error) -> - Error; - true -> #{ <<"error">> => Error } - end, - ?event(compute, - {error_computing_slot, - {error, ErrMsg}, - {phase, <<"compute">>}, - {attempted_slot, NextSlot} - } - ), - {error, - ErrMsg#{ - <<"phase">> => <<"compute">>, - <<"attempted-slot">> => NextSlot - } - } + % Forward error details back to the caller. + {error, Error} end - end + end; + CurrentSlot when CurrentSlot > TargetSlot -> + % The cache should already have the result, so we should never end up + % here. Depending on the type of process, 'rewinding' may require + % re-computing from a significantly earlier checkpoint, so for now + % we throw an error. + ?event( + compute, + {error_already_calculated_slot, + {target, TargetSlot}, + {current, CurrentSlot} + }, + Opts + ), + throw( + {error, + {already_calculated_slot, + {target, TargetSlot}, + {current, CurrentSlot} + } + } + ) end. %% @doc Compute a single slot for a process, given an initialized state. -compute_slot(ProcID, State, RawInputMsg, ReqMsg, Opts) -> - % Ensure that the next slot is the slot that we are expecting, just - % in case there is a scheduler device error. - NextSlot = hb_util:int(hb_ao:get(<<"slot">>, RawInputMsg, Opts)), - % If the input message does not have a path, set it to `compute'. - InputMsg = - case hb_path:from_message(request, RawInputMsg, Opts) of - undefined -> RawInputMsg#{ <<"path">> => <<"compute">> }; - _ -> RawInputMsg - end, - ?event(compute, {input_msg, InputMsg}), - ?event(compute, {executing, {proc_id, ProcID}, {slot, NextSlot}}, Opts), - % Unset the previous results. 
- UnsetResults = hb_ao:set(State, #{ <<"results">> => unset }, Opts), - Res = dev_process_lib:run_as(<<"execution">>, UnsetResults, InputMsg, Opts), +compute_slot(ProcID, State, RawInputMsg, InitReq, TargetSlot, Opts) -> + {PrepTimeMicroSecs, {ok, Slot, PreparedState, Req}} = + timer:tc( + fun() -> + prepare_next_slot(ProcID, State, RawInputMsg, Opts) + end + ), + ?event( + compute, + {prepared_slot, + {proc_id, ProcID}, + {slot, Slot}, + {prep_time_microsecs, PrepTimeMicroSecs} + }, + Opts + ), + {RuntimeMicroSecs, Res} = + timer:tc( + fun() -> + dev_process_lib:run_as(<<"execution">>, PreparedState, Req, Opts) + end + ), + ?event( + compute, + {computed_slot, + {proc_id, ProcID}, + {slot, Slot}, + {runtime_microsecs, RuntimeMicroSecs} + }, + Opts + ), case Res of {ok, NewProcStateMsg} -> % We have now transformed slot n -> n + 1. Increment the current slot. NewProcStateMsgWithSlot = hb_ao:set( NewProcStateMsg, - #{ <<"device">> => <<"process@1.0">>, <<"at-slot">> => NextSlot }, + #{ <<"device">> => <<"process@1.0">>, <<"at-slot">> => Slot }, Opts ), % Notify any waiters that the result for a slot is now available. dev_process_worker:notify_compute( ProcID, - NextSlot, + Slot, {ok, NewProcStateMsgWithSlot}, Opts ), - ProcStateWithSnapshot = - store_result( - false, - ProcID, - NextSlot, - NewProcStateMsgWithSlot, - ReqMsg, - Opts + {StoreTimeMicroSecs, ProcStateWithSnapshot} = + timer:tc( + fun() -> + store_result( + false, + ProcID, + Slot, + NewProcStateMsgWithSlot, + InitReq, + Opts + ) + end ), + ?event(compute_short, + {computed_slot, + {proc_id, ProcID}, + {slot, Slot}, + {target_slot, TargetSlot}, + {prep_ms, PrepTimeMicroSecs div 1000}, + {execution_ms, RuntimeMicroSecs div 1000}, + {store_ms, StoreTimeMicroSecs div 1000}, + {action, + hb_ao:get( + <<"body/action">>, + Req, + no_action_set, + Opts#{ hashpath => ignore } + ) + } + } + ), {ok, ProcStateWithSnapshot}; {error, Error} -> - {error, Error} + % An error occurred while computing the slot. 
Return the details. + ErrMsg = + if is_map(Error) -> Error; + true -> #{ <<"error">> => Error } + end, + ?event(compute_short, + {error_computing_slot, + {proc_id, ProcID}, + {attempted_slot, Slot}, + {target_slot, TargetSlot}, + {prep_ms, PrepTimeMicroSecs div 1000}, + {execution_ms, RuntimeMicroSecs div 1000}, + {error, ErrMsg} + } + ), + {error, + ErrMsg#{ + <<"phase">> => <<"compute">>, + <<"attempted-slot">> => Slot + } + } end. +%% @doc Prepare the process state message for computing the next slot. +prepare_next_slot(ProcID, State, RawReq, Opts) -> + Slot = hb_util:int(hb_ao:get(<<"slot">>, RawReq, Opts)), + ?event(compute, {next_slot, Slot}), + % If the input message does not have a path, set it to `compute'. + Req = + case hb_path:from_message(request, RawReq, Opts) of + undefined -> RawReq#{ <<"path">> => <<"compute">> }; + _ -> RawReq + end, + ?event(compute, {input_msg, Req}), + ?event(compute, {executing, {proc_id, ProcID}, {slot, Slot}}, Opts), + % Unset the previous results. + PreparedState = hb_ao:set(State, #{ <<"results">> => unset }, Opts), + {ok, Slot, PreparedState, Req}. + %% @doc Store the resulting state in the cache, potentially with the snapshot %% key. store_result(ForceSnapshot, ProcID, Slot, Res, Req, Opts) -> @@ -469,8 +526,7 @@ store_result(ForceSnapshot, ProcID, Slot, Res, Req, Opts) -> %% `process_snapshot_slots' option. If it is set, we check if the slot is %% a multiple of the interval. If either are true, we must snapshot. should_snapshot(Slot, Res, Opts) -> - should_snapshot_slots(Slot, Opts) - orelse should_snapshot_time(Res, Opts). + should_snapshot_slots(Slot, Opts) orelse should_snapshot_time(Res, Opts). %% @doc Calculate if we should snapshot based on the number of slots. 
should_snapshot_slots(Slot, Opts) -> @@ -539,19 +595,13 @@ now(RawBase, Req, Opts) -> case LatestKnown of {ok, LatestSlot, RawLatestMsg} -> LatestMsg = without_snapshot(RawLatestMsg, Opts), - ?event(compute_short, + ?event(compute_cache, {serving_latest_cached_state, {proc_id, ProcessID}, {slot, LatestSlot} }, Opts ), - ?event( - {serving_from_cache, - {proc_id, ProcessID}, - {slot, LatestSlot}, - {msg, LatestMsg} - }), dev_process_worker:notify_compute( ProcessID, LatestSlot, @@ -609,7 +659,8 @@ ensure_loaded(Base, Req, Opts) -> {proc_id, ProcID}, {res, LoadRes}, {target, TargetSlot} - } + }, + Opts ), case LoadRes of {ok, MaybeLoadedSlot, MaybeLoadedSnapshotMsg} -> @@ -644,7 +695,13 @@ ensure_loaded(Base, Req, Opts) -> <<"initialized">> => <<"true">> }, LoadedSlot = hb_cache:ensure_all_loaded(MaybeLoadedSlot, Opts), - ?event(compute, {found_state_checkpoint, ProcID, LoadedSnapshotReq}), + ?event(compute, + {found_state_checkpoint, + {proc_id, ProcID}, + {loaded_snapshot_req, LoadedSnapshotReq} + }, + Opts + ), {ok, Normalized} = dev_process_lib:run_as( <<"execution">>, diff --git a/src/hb.erl b/src/hb.erl index 5fedadeac..3b42ac733 100644 --- a/src/hb.erl +++ b/src/hb.erl @@ -82,7 +82,7 @@ %%% modules of the hyperbeam node. -module(hb). %%% Configuration and environment: --export([init/0, now/0, build/0, deploy_scripts/0]). +-export([init/0, top/0, now/0, build/0, deploy_scripts/0]). %%% Base start configurations: -export([start_simple_pay/0, start_simple_pay/1, start_simple_pay/2]). -export([topup/3, topup/4]). @@ -102,6 +102,14 @@ init() -> ?event({old_system_stack_depth, Old}), ok. +-ifdef(AO_TOP). +%% @doc Start a monitoring interface for the node. Presently this is offered +%% with the `observer_cli' module atop `Recon'. +top() -> observer_cli:start(). +-else. +top() -> not_available. +-endif. + %% @doc Start a mainnet server without payments. start_mainnet() -> start_mainnet(hb_opts:get(port)). 
diff --git a/src/hb_event.erl b/src/hb_event.erl index 02a3c9a2e..c21f1d900 100644 --- a/src/hb_event.erl +++ b/src/hb_event.erl @@ -7,6 +7,7 @@ -include_lib("eunit/include/eunit.hrl"). -define(OVERLOAD_QUEUE_LENGTH, 10000). +-define(MAX_MEMORY, 1_000_000_000). % 1GB -ifdef(NO_EVENTS). log(_X) -> ok. @@ -66,6 +67,10 @@ should_print(Topic, Opts) -> increment(Topic, Message, Opts) -> increment(Topic, Message, Opts, 1). increment(global, _Message, _Opts, _Count) -> ignored; +increment(linkify, _Message, _Opts, _Count) -> ignored; +increment(debug_linkify, _Message, _Opts, _Count) -> ignored; +increment(debug_id, _Message, _Opts, _Count) -> ignored; +increment(debug_commitments, _Message, _Opts, _Count) -> ignored; increment(ao_core, _Message, _Opts, _Count) -> ignored; increment(ao_internal, _Message, _Opts, _Count) -> ignored; increment(ao_devices, _Message, _Opts, _Count) -> ignored; @@ -184,16 +189,33 @@ handle_events() -> {message_queue_len, Len} when Len > ?OVERLOAD_QUEUE_LENGTH -> % Print a warning, but do so less frequently the more % overloaded the system is. + {memory, MemorySize} = erlang:process_info(self(), memory), case rand:uniform(max(1000, Len - ?OVERLOAD_QUEUE_LENGTH)) of 1 -> ?debug_print( {warning, prometheus_event_queue_overloading, {queue, Len}, - {current_message, EventName} + {current_message, EventName}, + {memory_bytes, MemorySize} } ); _ -> ignored + end, + % If the size of this process is too large, exit such that + % we can be restarted by the next caller. + case MemorySize of + MemorySize when MemorySize > ?MAX_MEMORY -> + ?debug_print( + {error, + prometheus_event_queue_terminating_on_memory_overload, + {queue, Len}, + {memory_bytes, MemorySize}, + {current_message, EventName} + } + ), + exit(memory_overload); + _ -> no_action end; _ -> ignored end,