From ab35acf4d81b1c0d9803b4601422bea0f9c1999f Mon Sep 17 00:00:00 2001
From: Sam Williams
Date: Sun, 21 Dec 2025 12:07:18 +0500
Subject: [PATCH 1/5] impr: tidy noisy event groups and improve prometheus
 management

Removes some particularly spammy event groups from the prometheus counts, so
that excessive logging does not cause issues on busy nodes. Additionally, if
the memory of the prometheus logging server exceeds a standardized bound
(1 GB for now, matching the new ?MAX_MEMORY define), the Erlang process exits
and is restarted by the next caller. This loses the events queued at the time,
but is preferable to letting the server significantly malfunction. (A minimal
sketch of this restart pattern is appended after the final patch in the
series.)
---
 src/dev_codec_httpsig.erl |  5 +++--
 src/dev_message.erl       | 24 +++++++++++++++---------
 src/hb_event.erl          | 23 ++++++++++++++++++++++-
 3 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/src/dev_codec_httpsig.erl b/src/dev_codec_httpsig.erl
index 5a3e0ae46..8e9635a13 100644
--- a/src/dev_codec_httpsig.erl
+++ b/src/dev_codec_httpsig.erl
@@ -230,7 +230,8 @@ commit(BaseMsg, Req = #{ <<"type">> := <<"hmac-sha256">> }, RawOpts) ->
         normalize_for_encoding(Msg, UnauthedCommitment, Opts),
     SigBase = signature_base(EncMsg, EncComm, Opts),
     HMac = hb_util:human_id(crypto:mac(hmac, sha256, Key, SigBase)),
-    ?event(httpsig_commit,
+    ?event(
+        debug_commitments,
         {hmac_commit,
             {type, <<"hmac-sha256">>},
             {keyid, KeyID},
@@ -255,7 +256,7 @@ commit(BaseMsg, Req = #{ <<"type">> := <<"hmac-sha256">> }, RawOpts) ->
             }
         }
     },
-    ?event({hmac_generation_complete, Res}),
+    ?event(debug_commitments, {hmac_generation_complete, Res}),
     Res.
 
 %% @doc Annotate the commitment with the `bundle' key if the request contains
diff --git a/src/dev_message.erl b/src/dev_message.erl
index 5396cd7b7..a87ab4c1d 100644
--- a/src/dev_message.erl
+++ b/src/dev_message.erl
@@ -99,7 +99,7 @@ id(RawBase, Req, NodeOpts) ->
     % filtering for the committers specified in the request.
     #{ <<"commitments">> := Commitments } =
         with_relevant_commitments(Base, Req, IDOpts),
-    ?event(debug_commitments,
+    ?event(debug_id,
         {generating_ids,
             {selected_commitments, Commitments},
             {req, Req},
@@ -109,7 +109,7 @@ id(RawBase, Req, NodeOpts) ->
     case hb_maps:keys(Commitments) of
         [] ->
             % If there are no commitments, we must (re)calculate the ID.
-            ?event(debug_id, no_commitments_found_in_id_call),
+            ?event(ids, regenerating_id),
            calculate_id(hb_maps:without([<<"commitments">>], Base), Req, IDOpts);
         IDs ->
             % Accumulate the relevant IDs into a single value. This is performed
@@ -124,7 +124,7 @@ id(RawBase, Req, NodeOpts) ->
             % accumulation function starts with a buffer of zero encoded as a
             % 256-bit binary. Subsequently, a single ID on its own 'accumulates'
             % to itself.
-            ?event(debug_id, {accumulating_existing_ids, IDs}),
+            ?event(ids, returning_existing_ids),
             {ok,
                 hb_util:human_id(
                     hb_crypto:accumulate(
@@ -137,13 +137,13 @@ id(RawBase, Req, NodeOpts) ->
 calculate_id(RawBase, Req, NodeOpts) ->
     % Find the ID device for the message.
     Base = hb_message:convert(RawBase, tabm, NodeOpts),
-    ?event(linkify, {calculate_ids, {base, Base}}),
+    ?event(debug_id, {calculate_ids, {base, Base}}),
     IDMod =
         case id_device(Base, NodeOpts) of
             {ok, IDDev} -> IDDev;
             {error, Error} -> throw({id, Error})
         end,
-    ?event(linkify, {generating_id, {idmod, IDMod}, {base, Base}}),
+    ?event(debug_id, {generating_id, {idmod, IDMod}, {base, Base}}),
     % Get the device module from the message, or use the default if it is not
     % set. We can tell if the device is not set (or is the default) by checking
     % whether the device module is the same as this module.
@@ -450,7 +450,7 @@ committed(Self, Req, Opts) ->
                 OnlyCommittedKeys
             )
         end,
-    ?event({only_committed_keys, CommittedNormalizedKeys}),
+    ?event(debug_commitments, {only_committed_keys, CommittedNormalizedKeys}),
     {ok, CommittedNormalizedKeys}.
 
 %% @doc Return a message with only the relevant commitments for a given request.
@@ -497,14 +497,17 @@ commitment_ids_from_request(Base, Req, Opts) ->
     FromCommitterAddrs =
         case ReqCommitters of
             <<"none">> ->
-                ?event(no_commitment_ids_for_committers),
+                ?event(debug_commitments, no_commitment_ids_for_committers),
                 [];
             <<"all">> ->
                 {ok, Committers} = committers(Base, Req, Opts),
                 ?event(debug_commitments, {commitment_ids_from_committers, Committers}),
                 commitment_ids_from_committers(Committers, Commitments, Opts);
             RawCommitterAddrs ->
-                ?event({getting_commitment_ids_for_committers, RawCommitterAddrs}),
+                ?event(
+                    debug_commitments,
+                    {getting_commitment_ids_for_committers, RawCommitterAddrs}
+                ),
                 CommitterAddrs =
                     if is_list(RawCommitterAddrs) -> RawCommitterAddrs;
                        true -> [RawCommitterAddrs]
@@ -531,7 +534,10 @@ commitment_ids_from_request(Base, Req, Opts) ->
             );
         FinalCommitmentIDs -> FinalCommitmentIDs
     end,
-    ?event({commitment_ids_from_request, {base, Base}, {req, Req}, {res, Res}}),
+    ?event(
+        debug_commitments,
+        {commitment_ids_from_request, {base, Base}, {req, Req}, {res, Res}}
+    ),
     Res.
 
 %% @doc Ensure that the `commitments` submessage of a base message is fully
diff --git a/src/hb_event.erl b/src/hb_event.erl
index 02a3c9a2e..279f175c2 100644
--- a/src/hb_event.erl
+++ b/src/hb_event.erl
@@ -7,6 +7,7 @@
 -include_lib("eunit/include/eunit.hrl").
 
 -define(OVERLOAD_QUEUE_LENGTH, 10000).
+-define(MAX_MEMORY, 1_000_000_000). % 1GB
 
 -ifdef(NO_EVENTS).
 log(_X) -> ok.
@@ -66,6 +67,9 @@ should_print(Topic, Opts) ->
 increment(Topic, Message, Opts) -> increment(Topic, Message, Opts, 1).
 increment(global, _Message, _Opts, _Count) -> ignored;
+increment(linkify, _Message, _Opts, _Count) -> ignored;
+increment(debug_id, _Message, _Opts, _Count) -> ignored;
+increment(debug_commitments, _Message, _Opts, _Count) -> ignored;
 increment(ao_core, _Message, _Opts, _Count) -> ignored;
 increment(ao_internal, _Message, _Opts, _Count) -> ignored;
 increment(ao_devices, _Message, _Opts, _Count) -> ignored;
@@ -184,16 +188,33 @@ handle_events() ->
         {message_queue_len, Len} when Len > ?OVERLOAD_QUEUE_LENGTH ->
             % Print a warning, but do so less frequently the more
             % overloaded the system is.
+            {memory, MemorySize} = erlang:process_info(self(), memory),
             case rand:uniform(max(1000, Len - ?OVERLOAD_QUEUE_LENGTH)) of
                 1 ->
                     ?debug_print(
                         {warning,
                             prometheus_event_queue_overloading,
                             {queue, Len},
-                            {current_message, EventName}
+                            {current_message, EventName},
+                            {memory_bytes, MemorySize}
                         }
                     );
                 _ -> ignored
+            end,
+            % If the size of this process is too large, exit such that
+            % we can be restarted by the next caller.
+            case MemorySize of
+                MemorySize when MemorySize > ?MAX_MEMORY ->
+                    ?debug_print(
+                        {error,
+                            prometheus_event_queue_terminating_on_memory_overload,
+                            {queue, Len},
+                            {memory_bytes, MemorySize},
+                            {current_message, EventName}
+                        }
+                    ),
+                    exit(memory_overload);
+                _ -> no_action
             end;
         _ -> ignored
     end,

From cb47f606bd933babad4d766224b5ab2de678ab3c Mon Sep 17 00:00:00 2001
From: Sam Williams
Date: Sun, 21 Dec 2025 12:51:16 +0500
Subject: [PATCH 2/5] chore: do not log `debug_linkify`

---
 src/hb_event.erl | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/hb_event.erl b/src/hb_event.erl
index 279f175c2..c21f1d900 100644
--- a/src/hb_event.erl
+++ b/src/hb_event.erl
@@ -68,6 +68,7 @@ increment(Topic, Message, Opts) -> increment(Topic, Message, Opts, 1).
 increment(global, _Message, _Opts, _Count) -> ignored;
 increment(linkify, _Message, _Opts, _Count) -> ignored;
+increment(debug_linkify, _Message, _Opts, _Count) -> ignored;
 increment(debug_id, _Message, _Opts, _Count) -> ignored;
 increment(debug_commitments, _Message, _Opts, _Count) -> ignored;
 increment(ao_core, _Message, _Opts, _Count) -> ignored;

From 788a2b83da17b0e0469d77134148b744613844ea Mon Sep 17 00:00:00 2001
From: Sam Williams
Date: Sun, 21 Dec 2025 17:53:55 +0500
Subject: [PATCH 3/5] chore: do not print for every cached result served

---
 src/dev_process.erl | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/src/dev_process.erl b/src/dev_process.erl
index c1ef336d8..3bb7e6ab4 100644
--- a/src/dev_process.erl
+++ b/src/dev_process.erl
@@ -539,19 +539,13 @@ now(RawBase, Req, Opts) ->
     case LatestKnown of
         {ok, LatestSlot, RawLatestMsg} ->
             LatestMsg = without_snapshot(RawLatestMsg, Opts),
-            ?event(compute_short,
+            ?event(compute_cache,
                 {serving_latest_cached_state,
                     {proc_id, ProcessID},
                     {slot, LatestSlot}
                 },
                 Opts
             ),
-            ?event(
-                {serving_from_cache,
-                    {proc_id, ProcessID},
-                    {slot, LatestSlot},
-                    {msg, LatestMsg}
-                }),
             dev_process_worker:notify_compute(
                 ProcessID,
                 LatestSlot,

From f71e188b6bdae2c4c47f54be7b9f4dc8dd6628a9 Mon Sep 17 00:00:00 2001
From: Sam Williams
Date: Sun, 21 Dec 2025 18:14:38 +0500
Subject: [PATCH 4/5] impr: timing and logging improvements for `~process@1.0`

---
 src/dev_process.erl | 249 +++++++++++++++++++++++++++-----------------
 1 file changed, 156 insertions(+), 93 deletions(-)

diff --git a/src/dev_process.erl b/src/dev_process.erl
index 3bb7e6ab4..b9ac0c395 100644
--- a/src/dev_process.erl
+++ b/src/dev_process.erl
@@ -269,42 +269,19 @@ compute(Base, Req, Opts) ->
 %% @doc Continually get and apply the next assignment from the scheduler until
 %% we reach the target slot that the user has requested.
 compute_to_slot(ProcID, Base, Req, TargetSlot, Opts) ->
-    CurrentSlot = hb_ao:get(<<"at-slot">>, Base, Opts#{ hashpath => ignore }),
-    ?event(compute_short,
-        {starting_compute,
-            {proc_id, ProcID},
-            {current, CurrentSlot},
-            {target, TargetSlot}
-        }
-    ),
-    case CurrentSlot of
-        CurrentSlot when CurrentSlot > TargetSlot ->
-            % The cache should already have the result, so we should never end up
-            % here. Depending on the type of process, 'rewinding' may require
-            % re-computing from a significantly earlier checkpoint, so for now
-            % we throw an error.
-            ?event(compute, {error_already_calculated_slot, {target, TargetSlot}, {current, CurrentSlot}}),
-            throw(
-                {error,
-                    {already_calculated_slot,
-                        {target, TargetSlot},
-                        {current, CurrentSlot}
-                    }
-                }
-            );
+    case hb_ao:get(<<"at-slot">>, Base, Opts#{ hashpath => ignore }) of
         CurrentSlot when CurrentSlot == TargetSlot ->
             % We reached the target height so we force a snapshot and return.
-            ?event(compute, {reached_target_slot_returning_state, TargetSlot}),
-            store_result(
-                true,
-                ProcID,
-                TargetSlot,
-                Base,
-                Req,
+            ?event(compute_short,
+                {reached_target_slot_returning_state,
+                    {proc_id, ProcID},
+                    {slot, TargetSlot}
+                },
                 Opts
             ),
+            store_result(true, ProcID, TargetSlot, Base, Req, Opts),
             {ok, without_snapshot(dev_process_lib:as_process(Base, Opts), Opts)};
-        CurrentSlot ->
+        CurrentSlot when CurrentSlot < TargetSlot ->
             % Compute the next state transition.
             NextSlot = CurrentSlot + 1,
             % Get the next input message from the scheduler device.
                 {error, Res} ->
                     % If the scheduler device cannot provide a next message,
                     % we return its error details, along with the current slot.
-                    ?event(compute,
-                        {error_getting_schedule,
-                            {error, Res},
-                            {phase, <<"get-schedule">>},
-                            {attempted_slot, NextSlot}
+                    ?event(compute_short,
+                        {error_getting_assignment,
+                            {proc_id, ProcID},
+                            {attempted_slot, NextSlot},
+                            {target_slot, TargetSlot},
+                            {error, Res}
                         }
                     ),
-                    {error, Res#{
-                        <<"phase">> => <<"get-schedule">>,
-                        <<"attempted-slot">> => NextSlot
-                    }};
+                    {error,
+                        Res#{
+                            <<"phase">> => <<"get-schedule">>,
+                            <<"attempted-slot">> => NextSlot,
+                            <<"process-id">> => ProcID
+                        }
+                    };
                 {ok, #{ <<"body">> := SlotMsg, <<"state">> := State }} ->
                     % Compute the next single state transition.
-                    case compute_slot(ProcID, State, SlotMsg, Req, Opts) of
+                    case compute_slot(ProcID, State, SlotMsg, Req, TargetSlot, Opts) of
                         {ok, NewState} ->
                             % Continue computing to the target slot.
                             compute_to_slot(
                                 ProcID,
                                 NewState,
                                 Req,
                                 TargetSlot,
                                 Opts
                             );
                         {error, Error} ->
-                            % If the compute_slot function returns an error,
-                            % we return the error details, along with the current
-                            % slot.
-                            ErrMsg =
-                                if is_map(Error) ->
-                                    Error;
-                                true -> #{ <<"error">> => Error }
-                                end,
-                            ?event(compute,
-                                {error_computing_slot,
-                                    {error, ErrMsg},
-                                    {phase, <<"compute">>},
-                                    {attempted_slot, NextSlot}
-                                }
-                            ),
-                            {error,
-                                ErrMsg#{
-                                    <<"phase">> => <<"compute">>,
-                                    <<"attempted-slot">> => NextSlot
-                                }
-                            }
+                            % Forward error details back to the caller.
+                            {error, Error}
                     end
-            end
+            end;
+        CurrentSlot when CurrentSlot > TargetSlot ->
+            % The cache should already have the result, so we should never end up
+            % here. Depending on the type of process, 'rewinding' may require
+            % re-computing from a significantly earlier checkpoint, so for now
+            % we throw an error.
+            ?event(
+                compute,
+                {error_already_calculated_slot,
+                    {target, TargetSlot},
+                    {current, CurrentSlot}
+                },
+                Opts
+            ),
+            throw(
+                {error,
+                    {already_calculated_slot,
+                        {target, TargetSlot},
+                        {current, CurrentSlot}
+                    }
+                }
+            )
     end.
 
 %% @doc Compute a single slot for a process, given an initialized state.
-compute_slot(ProcID, State, RawInputMsg, ReqMsg, Opts) ->
-    % Ensure that the next slot is the slot that we are expecting, just
-    % in case there is a scheduler device error.
-    NextSlot = hb_util:int(hb_ao:get(<<"slot">>, RawInputMsg, Opts)),
-    % If the input message does not have a path, set it to `compute'.
-    InputMsg =
-        case hb_path:from_message(request, RawInputMsg, Opts) of
-            undefined -> RawInputMsg#{ <<"path">> => <<"compute">> };
-            _ -> RawInputMsg
-        end,
-    ?event(compute, {input_msg, InputMsg}),
-    ?event(compute, {executing, {proc_id, ProcID}, {slot, NextSlot}}, Opts),
-    % Unset the previous results.
-    UnsetResults = hb_ao:set(State, #{ <<"results">> => unset }, Opts),
-    Res = dev_process_lib:run_as(<<"execution">>, UnsetResults, InputMsg, Opts),
+compute_slot(ProcID, State, RawInputMsg, InitReq, TargetSlot, Opts) ->
+    {PrepTimeMicroSecs, {ok, Slot, PreparedState, Req}} =
+        timer:tc(
+            fun() ->
+                prepare_next_slot(ProcID, State, RawInputMsg, Opts)
+            end
+        ),
+    ?event(
+        compute,
+        {prepared_slot,
+            {proc_id, ProcID},
+            {slot, Slot},
+            {prep_time_microsecs, PrepTimeMicroSecs}
+        },
+        Opts
+    ),
+    {RuntimeMicroSecs, Res} =
+        timer:tc(
+            fun() ->
+                dev_process_lib:run_as(<<"execution">>, PreparedState, Req, Opts)
+            end
+        ),
+    ?event(
+        compute,
+        {computed_slot,
+            {proc_id, ProcID},
+            {slot, Slot},
+            {runtime_microsecs, RuntimeMicroSecs}
+        },
+        Opts
+    ),
     case Res of
         {ok, NewProcStateMsg} ->
             % We have now transformed slot n -> n + 1. Increment the current slot.
             NewProcStateMsgWithSlot =
                 hb_ao:set(
                     NewProcStateMsg,
-                    #{ <<"device">> => <<"process@1.0">>, <<"at-slot">> => NextSlot },
+                    #{ <<"device">> => <<"process@1.0">>, <<"at-slot">> => Slot },
                     Opts
                 ),
             % Notify any waiters that the result for a slot is now available.
             dev_process_worker:notify_compute(
                 ProcID,
-                NextSlot,
+                Slot,
                 {ok, NewProcStateMsgWithSlot},
                 Opts
             ),
-            ProcStateWithSnapshot =
-                store_result(
-                    false,
-                    ProcID,
-                    NextSlot,
-                    NewProcStateMsgWithSlot,
-                    ReqMsg,
-                    Opts
+            {StoreTimeMicroSecs, ProcStateWithSnapshot} =
+                timer:tc(
+                    fun() ->
+                        store_result(
+                            false,
+                            ProcID,
+                            Slot,
+                            NewProcStateMsgWithSlot,
+                            InitReq,
+                            Opts
+                        )
+                    end
                 ),
+            ?event(compute_short,
+                {computed_slot,
+                    {proc_id, ProcID},
+                    {slot, Slot},
+                    {target_slot, TargetSlot},
+                    {prep_ms, PrepTimeMicroSecs div 1000},
+                    {execution_ms, RuntimeMicroSecs div 1000},
+                    {store_ms, StoreTimeMicroSecs div 1000},
+                    {action,
+                        hb_ao:get(
+                            <<"body/action">>,
+                            Req,
+                            no_action_set,
+                            Opts#{ hashpath => ignore }
+                        )
+                    }
+                }
+            ),
             {ok, ProcStateWithSnapshot};
         {error, Error} ->
-            {error, Error}
+            % An error occurred while computing the slot. Return the details.
+            ErrMsg =
+                if is_map(Error) -> Error;
+                   true -> #{ <<"error">> => Error }
+                end,
+            ?event(compute_short,
+                {error_computing_slot,
+                    {proc_id, ProcID},
+                    {attempted_slot, Slot},
+                    {target_slot, TargetSlot},
+                    {prep_ms, PrepTimeMicroSecs div 1000},
+                    {execution_ms, RuntimeMicroSecs div 1000},
+                    {error, ErrMsg}
+                }
+            ),
+            {error,
+                ErrMsg#{
+                    <<"phase">> => <<"compute">>,
+                    <<"attempted-slot">> => Slot
+                }
+            }
     end.
 
+%% @doc Prepare the process state message for computing the next slot.
+prepare_next_slot(ProcID, State, RawReq, Opts) ->
+    Slot = hb_util:int(hb_ao:get(<<"slot">>, RawReq, Opts)),
+    ?event(compute, {next_slot, Slot}),
+    % If the input message does not have a path, set it to `compute'.
+    Req =
+        case hb_path:from_message(request, RawReq, Opts) of
+            undefined -> RawReq#{ <<"path">> => <<"compute">> };
+            _ -> RawReq
+        end,
+    ?event(compute, {input_msg, Req}),
+    ?event(compute, {executing, {proc_id, ProcID}, {slot, Slot}}, Opts),
+    % Unset the previous results.
+    PreparedState = hb_ao:set(State, #{ <<"results">> => unset }, Opts),
+    {ok, Slot, PreparedState, Req}.
+
 %% @doc Store the resulting state in the cache, potentially with the snapshot
 %% key.
 store_result(ForceSnapshot, ProcID, Slot, Res, Req, Opts) ->
@@ -469,8 +526,7 @@ store_result(ForceSnapshot, ProcID, Slot, Res, Req, Opts) ->
 %% `process_snapshot_slots' option. If it is set, we check if the slot is
 %% a multiple of the interval. If either are true, we must snapshot.
 should_snapshot(Slot, Res, Opts) ->
-    should_snapshot_slots(Slot, Opts)
-        orelse should_snapshot_time(Res, Opts).
+    should_snapshot_slots(Slot, Opts) orelse should_snapshot_time(Res, Opts).
 
 %% @doc Calculate if we should snapshot based on the number of slots.
 should_snapshot_slots(Slot, Opts) ->
@@ -603,7 +659,8 @@ ensure_loaded(Base, Req, Opts) ->
             {proc_id, ProcID},
             {res, LoadRes},
             {target, TargetSlot}
-        }
+        },
+        Opts
     ),
     case LoadRes of
         {ok, MaybeLoadedSlot, MaybeLoadedSnapshotMsg} ->
@@ -638,7 +695,13 @@ ensure_loaded(Base, Req, Opts) ->
                 <<"initialized">> => <<"true">>
             },
             LoadedSlot = hb_cache:ensure_all_loaded(MaybeLoadedSlot, Opts),
-            ?event(compute, {found_state_checkpoint, ProcID, LoadedSnapshotReq}),
+            ?event(compute,
+                {found_state_checkpoint,
+                    {proc_id, ProcID},
+                    {loaded_snapshot_req, LoadedSnapshotReq}
+                },
+                Opts
+            ),
             {ok, Normalized} =
                 dev_process_lib:run_as(
                     <<"execution">>,

From f427225a1b6541034fd35c28d15b1af5b20a2367 Mon Sep 17 00:00:00 2001
From: Sam Williams
Date: Sun, 21 Dec 2025 18:31:58 +0500
Subject: [PATCH 5/5] feat: add `top`

---
 rebar.config |  1 +
 src/hb.erl   | 10 +++++++++-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/rebar.config b/rebar.config
index 03ca19b31..80c1cd9ac 100644
--- a/rebar.config
+++ b/rebar.config
@@ -3,6 +3,7 @@
 {profiles, [
     {no_events, [{erl_opts, [{d, 'NO_EVENTS', true}]}]},
+    {top, [{deps, [observer_cli]}, {erl_opts, [{d, 'AO_TOP', true}]}]},
     {store_events, [{erl_opts, [{d, 'STORE_EVENTS', true}]}]},
     {ao_profiling, [{erl_opts, [{d, 'AO_PROFILING', true}]}]},
     {eflame,
diff --git a/src/hb.erl b/src/hb.erl
index 5fedadeac..3b42ac733 100644
--- a/src/hb.erl
+++ b/src/hb.erl
@@ -82,7 +82,7 @@
 %%% modules of the hyperbeam node.
 -module(hb).
 %%% Configuration and environment:
--export([init/0, now/0, build/0, deploy_scripts/0]).
+-export([init/0, top/0, now/0, build/0, deploy_scripts/0]).
 %%% Base start configurations:
 -export([start_simple_pay/0, start_simple_pay/1, start_simple_pay/2]).
 -export([topup/3, topup/4]).
@@ -102,6 +102,14 @@ init() ->
     ?event({old_system_stack_depth, Old}),
     ok.
 
+-ifdef(AO_TOP).
+%% @doc Start a monitoring interface for the node. Presently this is offered
+%% with the `observer_cli' module atop `Recon'.
+top() -> observer_cli:start().
+-else.
+top() -> not_available.
+-endif.
+
 %% @doc Start a mainnet server without payments.
 start_mainnet() ->
     start_mainnet(hb_opts:get(port)).
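
Usage note for PATCH 5/5: the `top' profile is selected in the usual rebar3
manner, for example `rebar3 as top shell', after which `hb:top().' starts the
observer_cli dashboard. When the node is compiled without the profile, the
call simply returns `not_available'.

The lazy-restart behaviour described in PATCH 1/5 can also be illustrated in
isolation. The following is a minimal, self-contained sketch, not code from
the series itself: the module and function names (`lazy_restart_sketch',
`get_server/0', `loop/0') are hypothetical stand-ins, while the memory check
mirrors the real one in `hb_event' (`erlang:process_info(self(), memory)'
against ?MAX_MEMORY, followed by `exit(memory_overload)').

    -module(lazy_restart_sketch).
    -export([get_server/0]).

    -define(MAX_MEMORY, 1_000_000_000). % 1 GB, matching hb_event.

    %% Find the running server, or boot a fresh one if the previous
    %% instance has exited (for example, after a memory overload).
    get_server() ->
        case whereis(sketch_server) of
            undefined ->
                Pid = spawn(fun loop/0),
                try register(sketch_server, Pid), Pid
                catch error:badarg ->
                    % Another caller won the registration race; defer to
                    % their server and discard ours.
                    exit(Pid, kill),
                    whereis(sketch_server)
                end;
            Pid -> Pid
        end.

    %% Consume events until the process grows past the bound, then exit.
    %% Queued events are lost, but the next `get_server/0' call boots a
    %% fresh, small process.
    loop() ->
        receive _Event -> ok end,
        {memory, Mem} = erlang:process_info(self(), memory),
        case Mem > ?MAX_MEMORY of
            true -> exit(memory_overload);
            false -> loop()
        end.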