diff --git a/Cargo.lock b/Cargo.lock index aebb5828..d1557557 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1576,6 +1576,7 @@ dependencies = [ "reth-cli-util", "reth-optimism-cli", "reth-optimism-node", + "reth-optimism-payload-builder", ] [[package]] @@ -1597,17 +1598,22 @@ dependencies = [ "base-flashtypes", "base-reth-flashblocks", "base-reth-test-utils", + "chrono", "eyre", "futures-util", "httpmock", + "indexmap 2.12.1", "jsonrpsee", "jsonrpsee-types", "metrics", "metrics-derive", "op-alloy-consensus", + "op-alloy-flz", "op-alloy-network", "op-alloy-rpc-types", + "parking_lot", "rand 0.9.2", + "rdkafka", "reth", "reth-db", "reth-db-common", @@ -1616,6 +1622,7 @@ dependencies = [ "reth-optimism-chainspec", "reth-optimism-evm", "reth-optimism-node", + "reth-optimism-payload-builder", "reth-optimism-primitives", "reth-primitives-traits", "reth-provider", @@ -1637,6 +1644,8 @@ dependencies = [ name = "base-reth-runner" version = "0.2.1" dependencies = [ + "alloy-primitives", + "base-flashtypes", "base-reth-flashblocks", "base-reth-rpc", "base-tracex", @@ -1644,11 +1653,15 @@ dependencies = [ "eyre", "futures-util", "once_cell", + "parking_lot", + "rdkafka", "reth", "reth-db", "reth-exex", "reth-optimism-chainspec", "reth-optimism-node", + "reth-optimism-payload-builder", + "tokio", "tracing", "url", ] @@ -5911,6 +5924,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-src" +version = "300.5.4+3.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507b3792995dae9b0df8a1c1e3771e8418b7c2d9f0baeba32e6fe8b06c7cb72" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.111" @@ -5919,6 +5941,7 @@ checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -6758,6 +6781,37 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rdkafka" +version = "0.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b52c81ac3cac39c9639b95c20452076e74b8d9a71bc6fc4d83407af2ea6fff" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.9.0+2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5230dca48bc354d718269f3e4353280e188b610f7af7e2fcf54b7a79d5802872" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "openssl-sys", + "pkg-config", +] + [[package]] name = "recvmsg" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 6a860dca..53fe7deb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ reth-e2e-test-utils = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9 reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-primitives-traits = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-optimism-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", features = [ "op", @@ -119,6 +120,7 @@ op-alloy-rpc-types = "0.22.0" 
 op-alloy-consensus = "0.22.0"
 op-alloy-rpc-jsonrpsee = "0.22.0"
 op-alloy-rpc-types-engine = "0.22.0"
+op-alloy-flz = "0.13.1"
 
 # tokio
 tokio = "1.48.0"
@@ -158,3 +160,6 @@ derive_more = "2.1.0"
 serde_json = "1.0.145"
 metrics-derive = "0.1.0"
 tracing-subscriber = "0.3.22"
+parking_lot = "0.12.3"
+indexmap = "2.7.0"
+rdkafka = { version = "0.37.0", default-features = false, features = ["tokio", "ssl-vendored", "libz-static"] }
diff --git a/bin/node/Cargo.toml b/bin/node/Cargo.toml
index 2782995f..bc760def 100644
--- a/bin/node/Cargo.toml
+++ b/bin/node/Cargo.toml
@@ -21,6 +21,7 @@ base-reth-runner.workspace = true
 reth-optimism-node.workspace = true
 reth-optimism-cli.workspace = true
 reth-cli-util.workspace = true
+reth-optimism-payload-builder.workspace = true
 
 # misc
 clap.workspace = true
diff --git a/bin/node/src/cli.rs b/bin/node/src/cli.rs
index 8feed7aa..d8909246 100644
--- a/bin/node/src/cli.rs
+++ b/bin/node/src/cli.rs
@@ -2,12 +2,16 @@
 
 use std::sync::Arc;
 
-use base_reth_runner::{BaseNodeConfig, FlashblocksCell, FlashblocksConfig, TracingConfig};
+use base_reth_runner::{
+    BaseNodeConfig, FlashblocksCell, FlashblocksConfig, KafkaConfig, MeteringConfig,
+    ResourceLimitsConfig, TracingConfig,
+};
 use once_cell::sync::OnceCell;
 use reth_optimism_node::args::RollupArgs;
+use reth_optimism_payload_builder::config::OpDAConfig;
 
 /// CLI Arguments
-#[derive(Debug, Clone, PartialEq, Eq, clap::Args)]
+#[derive(Debug, Clone, PartialEq, clap::Args)]
 #[command(next_help_heading = "Rollup")]
 pub struct Args {
     /// Rollup arguments
@@ -40,6 +44,50 @@ pub struct Args {
     /// Enable metering RPC for transaction bundle simulation
     #[arg(long = "enable-metering", value_name = "ENABLE_METERING")]
     pub enable_metering: bool,
+
+    // --- Priority fee estimation args ---
+    /// Path to the Kafka properties file (required for priority fee estimation).
+    /// The properties file should contain rdkafka settings such as bootstrap.servers,
+    /// group.id, session.timeout.ms, etc.
+    #[arg(long = "metering-kafka-properties-file")]
+    pub metering_kafka_properties_file: Option<String>,
+
+    /// Kafka topic for accepted bundle events
+    #[arg(long = "metering-kafka-topic", default_value = "tips-ingress")]
+    pub metering_kafka_topic: String,
+
+    /// Kafka consumer group ID (overrides group.id in the properties file if set)
+    #[arg(long = "metering-kafka-group-id")]
+    pub metering_kafka_group_id: Option<String>,
+
+    /// Gas limit per flashblock for priority fee estimation
+    #[arg(long = "metering-gas-limit", default_value = "30000000")]
+    pub metering_gas_limit: u64,
+
+    /// Execution time budget in microseconds per flashblock
+    #[arg(long = "metering-execution-time-us", default_value = "50000")]
+    pub metering_execution_time_us: u64,
+
+    /// State root time budget in microseconds (optional, disabled by default)
+    #[arg(long = "metering-state-root-time-us")]
+    pub metering_state_root_time_us: Option<u64>,
+
+    /// Default data availability bytes limit per flashblock.
+    /// This value is used when `miner_setMaxDASize` has not been called.
+ #[arg(long = "metering-da-bytes", default_value = "120000")] + pub metering_da_bytes: u64, + + /// Percentile for recommended priority fee (0.0-1.0) + #[arg(long = "metering-priority-fee-percentile", default_value = "0.5")] + pub metering_priority_fee_percentile: f64, + + /// Default priority fee when resource is not congested (in wei) + #[arg(long = "metering-uncongested-priority-fee", default_value = "1")] + pub metering_uncongested_priority_fee: u128, + + /// Number of recent blocks to retain in metering cache + #[arg(long = "metering-cache-size", default_value = "12")] + pub metering_cache_size: usize, } impl Args { @@ -58,6 +106,31 @@ impl From for BaseNodeConfig { max_pending_blocks_depth: args.max_pending_blocks_depth, }); + // Build Kafka config if properties file is provided + let kafka = args.metering_kafka_properties_file.map(|properties_file| KafkaConfig { + properties_file, + topic: args.metering_kafka_topic, + group_id_override: args.metering_kafka_group_id, + }); + + let metering = MeteringConfig { + enabled: args.enable_metering, + kafka, + resource_limits: ResourceLimitsConfig { + gas_limit: args.metering_gas_limit, + execution_time_us: args.metering_execution_time_us, + state_root_time_us: args.metering_state_root_time_us, + da_bytes: args.metering_da_bytes, + }, + priority_fee_percentile: args.metering_priority_fee_percentile, + uncongested_priority_fee: args.metering_uncongested_priority_fee, + cache_size: args.metering_cache_size, + }; + + // Create shared DA config. This is shared between the payload builder and the + // priority fee estimator, allowing miner_setMaxDASize to affect both. + let da_config = OpDAConfig::default(); + Self { rollup_args: args.rollup_args, flashblocks, @@ -66,7 +139,9 @@ impl From for BaseNodeConfig { logs_enabled: args.enable_transaction_tracing_logs, }, metering_enabled: args.enable_metering, + metering, flashblocks_cell, + da_config, } } } diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 390fdf1a..21172426 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -51,8 +51,22 @@ jsonrpsee-types.workspace = true tracing.workspace = true eyre.workspace = true serde.workspace = true +serde_json.workspace = true metrics.workspace = true metrics-derive.workspace = true +parking_lot.workspace = true +indexmap.workspace = true + +# priority fee estimation +reth-optimism-payload-builder.workspace = true + +# Kafka consumer +rdkafka.workspace = true +chrono.workspace = true + +# DA calculation +op-alloy-flz.workspace = true +op-alloy-consensus.workspace = true [dev-dependencies] base-flashtypes.workspace = true diff --git a/crates/rpc/src/base/annotator.rs b/crates/rpc/src/base/annotator.rs new file mode 100644 index 00000000..d61a9fa5 --- /dev/null +++ b/crates/rpc/src/base/annotator.rs @@ -0,0 +1,288 @@ +//! Resource annotator that correlates Kafka metering data with flashblock inclusions. + +use std::{fmt, sync::Arc}; + +use alloy_primitives::TxHash; +use parking_lot::RwLock; +use tokio::sync::mpsc::UnboundedReceiver; +use tracing::{debug, info, warn}; + +use crate::{MeteredTransaction, MeteringCache}; + +/// Message received from the flashblocks websocket feed indicating which +/// transactions were included in a specific flashblock. +#[derive(Debug)] +pub struct FlashblockInclusion { + /// Block number. + pub block_number: u64, + /// Flashblock index within the block. + pub flashblock_index: u64, + /// Tx hashes included in this flashblock. 
+    pub ordered_tx_hashes: Vec<TxHash>,
+}
+
+/// Maximum number of pending transactions before the oldest entries are evicted.
+const MAX_PENDING_TRANSACTIONS: usize = 10_000;
+
+/// Annotates flashblock transactions with their resource usage.
+///
+/// The flow is:
+/// 1. Kafka sends `MeteredTransaction` with resource usage data keyed by tx hash
+/// 2. These are stored in a pending lookup table
+/// 3. The websocket sends `FlashblockInclusion` with the actual (block, flashblock) location
+/// 4. We look up pending transactions and insert them into the cache at the real location
+pub struct ResourceAnnotator {
+    cache: Arc<RwLock<MeteringCache>>,
+    tx_updates_rx: UnboundedReceiver<MeteredTransaction>,
+    flashblock_rx: UnboundedReceiver<FlashblockInclusion>,
+    /// Pending metering data awaiting flashblock inclusion confirmation.
+    /// Uses `IndexMap` to maintain insertion order for FIFO eviction.
+    pending_transactions: indexmap::IndexMap<TxHash, MeteredTransaction>,
+}
+
+impl fmt::Debug for ResourceAnnotator {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ResourceAnnotator")
+            .field("pending_transactions", &self.pending_transactions.len())
+            .finish_non_exhaustive()
+    }
+}
+
+impl ResourceAnnotator {
+    /// Creates a new resource annotator.
+    pub fn new(
+        cache: Arc<RwLock<MeteringCache>>,
+        tx_updates_rx: UnboundedReceiver<MeteredTransaction>,
+        flashblock_rx: UnboundedReceiver<FlashblockInclusion>,
+    ) -> Self {
+        Self {
+            cache,
+            tx_updates_rx,
+            flashblock_rx,
+            pending_transactions: indexmap::IndexMap::new(),
+        }
+    }
+
+    /// Runs the annotator until both channels are closed.
+    pub async fn run(mut self) {
+        info!(target: "metering::annotator", "Starting ResourceAnnotator");
+        loop {
+            tokio::select! {
+                Some(tx_event) = self.tx_updates_rx.recv() => {
+                    self.handle_tx_event(tx_event);
+                }
+                Some(flashblock_event) = self.flashblock_rx.recv() => {
+                    self.handle_flashblock_event(flashblock_event);
+                }
+                else => {
+                    info!(target: "metering::annotator", "ResourceAnnotator terminating");
+                    break;
+                }
+            }
+        }
+    }
+
+    fn handle_tx_event(&mut self, tx: MeteredTransaction) {
+        debug!(
+            tx_hash = %tx.tx_hash,
+            gas_used = tx.gas_used,
+            "Storing metered transaction in pending map"
+        );
+        self.pending_transactions.insert(tx.tx_hash, tx);
+
+        // Evict the oldest entries if we exceed the limit.
+        while self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
+            if let Some((evicted_hash, _)) = self.pending_transactions.shift_remove_index(0) {
+                info!(
+                    tx_hash = %evicted_hash,
+                    "Evicting old transaction from pending map (limit exceeded)"
+                );
+                metrics::counter!("metering.pending.evicted").increment(1);
+            }
+        }
+
+        metrics::gauge!("metering.pending.size").set(self.pending_transactions.len() as f64);
+    }
+
+    fn handle_flashblock_event(&mut self, event: FlashblockInclusion) {
+        // Reorg detection: flashblock_index=0 for an existing block indicates a reorg
+        if event.flashblock_index == 0 && self.cache.read().contains_block(event.block_number) {
+            let cleared = self.cache.write().clear_blocks_from(event.block_number);
+
+            warn!(
+                target: "metering::annotator",
+                block_number = event.block_number,
+                blocks_cleared = cleared,
+                "Reorg detected: cleared cache from block"
+            );
+            metrics::counter!("metering.cache.reorgs_detected").increment(1);
+        }
+
+        let mut matched = 0usize;
+        let mut missed = 0usize;
+
+        {
+            let mut cache = self.cache.write();
+            for tx_hash in &event.ordered_tx_hashes {
+                if let Some(tx) = self.pending_transactions.shift_remove(tx_hash) {
+                    cache.push_transaction(event.block_number, event.flashblock_index, tx);
+                    matched += 1;
+                } else {
+                    missed += 1;
+                }
+            }
+        }
+
+        if matched > 0 {
+            debug!(
+                block_number = event.block_number,
+                flashblock_index = event.flashblock_index,
+                matched,
+                "Inserted transactions into cache from flashblock"
+            );
+        }
+
+        // All transactions should come through as bundles. Any misses indicate
+        // the Kafka event hasn't arrived yet or was lost.
+        if missed > 0 {
+            warn!(
+                block_number = event.block_number,
+                flashblock_index = event.flashblock_index,
+                matched,
+                missed,
+                "Flashblock contained transactions not found in pending map"
+            );
+            metrics::counter!("metering.streams.tx_misses_total").increment(missed as u64);
+        }
+
+        metrics::gauge!("metering.pending.size").set(self.pending_transactions.len() as f64);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use alloy_primitives::{B256, U256};
+    use tokio::sync::mpsc;
+
+    use super::*;
+
+    fn test_tx(hash: u64, priority: u64) -> MeteredTransaction {
+        let mut hash_bytes = [0u8; 32];
+        hash_bytes[24..].copy_from_slice(&hash.to_be_bytes());
+        MeteredTransaction {
+            tx_hash: B256::new(hash_bytes),
+            priority_fee_per_gas: U256::from(priority),
+            gas_used: 10,
+            execution_time_us: 5,
+            state_root_time_us: 7,
+            data_availability_bytes: 20,
+        }
+    }
+
+    fn test_flashblock(
+        block_number: u64,
+        flashblock_index: u64,
+        hashes: Vec<u64>,
+    ) -> FlashblockInclusion {
+        FlashblockInclusion {
+            block_number,
+            flashblock_index,
+            ordered_tx_hashes: hashes
+                .into_iter()
+                .map(|h| {
+                    let mut hash_bytes = [0u8; 32];
+                    hash_bytes[24..].copy_from_slice(&h.to_be_bytes());
+                    B256::new(hash_bytes)
+                })
+                .collect(),
+        }
+    }
+
+    #[tokio::test]
+    async fn reorg_clears_affected_blocks() {
+        let cache = Arc::new(RwLock::new(MeteringCache::new(10)));
+        let (tx_sender, tx_rx) = mpsc::unbounded_channel();
+        let (fb_sender, fb_rx) = mpsc::unbounded_channel();
+
+        let mut annotator = ResourceAnnotator::new(cache.clone(), tx_rx, fb_rx);
+
+        // Pre-populate cache with blocks 100, 101, 102
+        {
+            let mut c = cache.write();
+            c.push_transaction(100, 0, test_tx(1, 10));
+            c.push_transaction(101, 0, test_tx(2, 20));
+            c.push_transaction(102, 0, test_tx(3, 30));
+        }
+
+        assert!(cache.read().contains_block(100));
+        assert!(cache.read().contains_block(101));
+        assert!(cache.read().contains_block(102));
+
+        // Send flashblock_index=0 for existing block 101 (simulates reorg)
+        let event = test_flashblock(101, 0, vec![]);
+        annotator.handle_flashblock_event(event);
+
+        // Blocks 101 and 102 should be cleared, block 100 should remain
+        assert!(cache.read().contains_block(100));
+        assert!(!cache.read().contains_block(101));
+        assert!(!cache.read().contains_block(102));
+
+        drop(tx_sender);
+        drop(fb_sender);
+    }
+
+    #[tokio::test]
+    async fn non_zero_flashblock_does_not_trigger_reorg() {
+        let cache = Arc::new(RwLock::new(MeteringCache::new(10)));
+        let (tx_sender, tx_rx) = mpsc::unbounded_channel();
+        let (fb_sender, fb_rx) = mpsc::unbounded_channel();
+
+        let mut annotator = ResourceAnnotator::new(cache.clone(), tx_rx, fb_rx);
+
+        // Pre-populate cache with block 100
+        {
+            let mut c = cache.write();
+            c.push_transaction(100, 0, test_tx(1, 10));
+        }
+
+        assert!(cache.read().contains_block(100));
+
+        // Send flashblock_index=1 for existing block 100 (not a reorg signal)
+        let event = test_flashblock(100, 1, vec![]);
+        annotator.handle_flashblock_event(event);
+
+        // Block 100 should still exist
+        assert!(cache.read().contains_block(100));
+
+        drop(tx_sender);
+        drop(fb_sender);
+    }
+
+    #[tokio::test]
+    async fn flashblock_zero_for_new_block_does_not_trigger_reorg() {
+        let cache = Arc::new(RwLock::new(MeteringCache::new(10)));
+        let (tx_sender, tx_rx) = mpsc::unbounded_channel();
+        let (fb_sender, fb_rx) = mpsc::unbounded_channel();
+
+        let mut annotator = ResourceAnnotator::new(cache.clone(), tx_rx, fb_rx);
+
+        // Pre-populate cache with block 100
+        {
+            let mut c = cache.write();
+            c.push_transaction(100, 0, test_tx(1, 10));
+        }
+
+        assert!(cache.read().contains_block(100));
+        assert!(!cache.read().contains_block(101));
+
+        // Send flashblock_index=0 for NEW block 101 (not a reorg, just a new block)
+        let event = test_flashblock(101, 0, vec![]);
+        annotator.handle_flashblock_event(event);
+
+        // Block 100 should still exist (no reorg happened)
+        assert!(cache.read().contains_block(100));
+
+        drop(tx_sender);
+        drop(fb_sender);
+    }
+}
diff --git a/crates/rpc/src/base/cache.rs b/crates/rpc/src/base/cache.rs
new file mode 100644
index 00000000..59aae123
--- /dev/null
+++ b/crates/rpc/src/base/cache.rs
@@ -0,0 +1,403 @@
+//! In-memory cache for metering data used by the priority fee estimator.
+//!
+//! Transactions are stored in sequencer order (highest priority fee first) as received
+//! from flashblock events.
+
+use std::collections::{BTreeMap, HashMap, VecDeque};
+
+use alloy_primitives::{B256, U256};
+
+/// A metered transaction with resource consumption data.
+#[derive(Debug, Clone)]
+pub struct MeteredTransaction {
+    /// Transaction hash.
+    pub tx_hash: B256,
+    /// Priority fee per gas for ordering.
+    pub priority_fee_per_gas: U256,
+    /// Gas consumed.
+    pub gas_used: u64,
+    /// Execution time in microseconds.
+    pub execution_time_us: u128,
+    /// State root computation time in microseconds.
+    pub state_root_time_us: u128,
+    /// Data availability bytes.
+    pub data_availability_bytes: u64,
+}
+
+impl MeteredTransaction {
+    /// Creates a zeroed transaction (placeholder with no resource usage).
+    pub const fn zeroed(tx_hash: B256) -> Self {
+        Self {
+            tx_hash,
+            priority_fee_per_gas: U256::ZERO,
+            gas_used: 0,
+            execution_time_us: 0,
+            state_root_time_us: 0,
+            data_availability_bytes: 0,
+        }
+    }
+}
+
+/// Aggregated resource totals.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct ResourceTotals {
+    /// Total gas used.
+    pub gas_used: u64,
+    /// Total execution time in microseconds.
+    pub execution_time_us: u128,
+    /// Total state root time in microseconds.
+    pub state_root_time_us: u128,
+    /// Total data availability bytes.
+    pub data_availability_bytes: u64,
+}
+
+impl ResourceTotals {
+    const fn accumulate(&mut self, tx: &MeteredTransaction) {
+        self.gas_used = self.gas_used.saturating_add(tx.gas_used);
+        self.execution_time_us = self.execution_time_us.saturating_add(tx.execution_time_us);
+        self.state_root_time_us = self.state_root_time_us.saturating_add(tx.state_root_time_us);
+        self.data_availability_bytes =
+            self.data_availability_bytes.saturating_add(tx.data_availability_bytes);
+    }
+}
+
+/// Metrics for a single flashblock within a block.
+///
+/// Transactions are stored in sequencer order (highest priority fee first).
+#[derive(Debug)]
+pub struct FlashblockMetrics {
+    /// Block number.
+    pub block_number: u64,
+    /// Flashblock index within the block.
+    pub flashblock_index: u64,
+    /// Transactions in sequencer order.
+    transactions: Vec<MeteredTransaction>,
+    totals: ResourceTotals,
+}
+
+impl FlashblockMetrics {
+    /// Creates a new flashblock metrics container.
+    pub fn new(block_number: u64, flashblock_index: u64) -> Self {
+        Self {
+            block_number,
+            flashblock_index,
+            transactions: Vec::new(),
+            totals: ResourceTotals::default(),
+        }
+    }
+
+    /// Appends a transaction, preserving sequencer order.
+    pub fn push_transaction(&mut self, tx: MeteredTransaction) {
+        self.totals.accumulate(&tx);
+        self.transactions.push(tx);
+    }
+
+    /// Returns the resource totals for this flashblock.
+    pub const fn totals(&self) -> ResourceTotals {
+        self.totals
+    }
+
+    /// Returns transactions in sequencer order.
+    pub fn transactions(&self) -> &[MeteredTransaction] {
+        &self.transactions
+    }
+
+    /// Returns the number of transactions.
+    pub const fn len(&self) -> usize {
+        self.transactions.len()
+    }
+
+    /// Returns true if empty.
+    pub const fn is_empty(&self) -> bool {
+        self.transactions.is_empty()
+    }
+}
+
+/// Aggregated metrics for a block, including a per-flashblock breakdown.
+#[derive(Debug)]
+pub struct BlockMetrics {
+    /// Block number.
+    pub block_number: u64,
+    flashblocks: BTreeMap<u64, FlashblockMetrics>,
+    totals: ResourceTotals,
+}
+
+impl BlockMetrics {
+    /// Creates a new block metrics container.
+    pub fn new(block_number: u64) -> Self {
+        Self { block_number, flashblocks: BTreeMap::new(), totals: ResourceTotals::default() }
+    }
+
+    /// Returns the number of flashblocks.
+    pub fn flashblock_count(&self) -> usize {
+        self.flashblocks.len()
+    }
+
+    /// Iterates over all flashblocks.
+    pub fn flashblocks(&self) -> impl Iterator<Item = &FlashblockMetrics> {
+        self.flashblocks.values()
+    }
+
+    /// Returns a mutable reference to the flashblock, creating it if necessary.
+    /// Returns `(flashblock, is_new)`.
+    pub fn flashblock_mut(&mut self, flashblock_index: u64) -> (&mut FlashblockMetrics, bool) {
+        let is_new = !self.flashblocks.contains_key(&flashblock_index);
+        let entry = self
+            .flashblocks
+            .entry(flashblock_index)
+            .or_insert_with(|| FlashblockMetrics::new(self.block_number, flashblock_index));
+        (entry, is_new)
+    }
+
+    /// Returns the resource totals for this block.
+    pub const fn totals(&self) -> ResourceTotals {
+        self.totals
+    }
+
+    fn recompute_totals(&mut self) {
+        self.totals = ResourceTotals::default();
+        for flashblock in self.flashblocks.values() {
+            let totals = flashblock.totals();
+            self.totals.gas_used = self.totals.gas_used.saturating_add(totals.gas_used);
+            self.totals.execution_time_us =
+                self.totals.execution_time_us.saturating_add(totals.execution_time_us);
+            self.totals.state_root_time_us =
+                self.totals.state_root_time_us.saturating_add(totals.state_root_time_us);
+            self.totals.data_availability_bytes =
+                self.totals.data_availability_bytes.saturating_add(totals.data_availability_bytes);
+        }
+    }
+}
+
+/// In-memory cache maintaining metering data for the most recent blocks.
+#[derive(Debug)]
+pub struct MeteringCache {
+    max_blocks: usize,
+    blocks: VecDeque<BlockMetrics>,
+    block_index: HashMap<u64, usize>,
+}
+
+impl MeteringCache {
+    /// Creates a new cache retaining at most `max_blocks` recent blocks.
+    pub fn new(max_blocks: usize) -> Self {
+        Self { max_blocks, blocks: VecDeque::new(), block_index: HashMap::new() }
+    }
+
+    /// Returns the maximum number of blocks retained.
+    pub const fn max_blocks(&self) -> usize {
+        self.max_blocks
+    }
+
+    /// Returns the block metrics for the given block number.
+    pub fn block(&self, block_number: u64) -> Option<&BlockMetrics> {
+        self.block_index.get(&block_number).and_then(|&idx| self.blocks.get(idx))
+    }
+
+    /// Returns a mutable reference to the block, creating it if necessary.
+    pub fn block_mut(&mut self, block_number: u64) -> &mut BlockMetrics {
+        if let Some(&idx) = self.block_index.get(&block_number) {
+            return self.blocks.get_mut(idx).expect("block index out of bounds");
+        }
+
+        let block = BlockMetrics::new(block_number);
+        self.blocks.push_back(block);
+        let idx = self.blocks.len() - 1;
+        self.block_index.insert(block_number, idx);
+
+        self.evict_if_needed();
+        self.blocks.get_mut(*self.block_index.get(&block_number).unwrap()).unwrap()
+    }
+
+    /// Appends a transaction to the cache, preserving sequencer order.
+    pub fn push_transaction(
+        &mut self,
+        block_number: u64,
+        flashblock_index: u64,
+        tx: MeteredTransaction,
+    ) {
+        let block = self.block_mut(block_number);
+        let (flashblock, _) = block.flashblock_mut(flashblock_index);
+        flashblock.push_transaction(tx);
+        block.recompute_totals();
+    }
+
+    /// Returns the number of cached blocks.
+    pub fn len(&self) -> usize {
+        self.blocks.len()
+    }
+
+    /// Returns true if the cache is empty.
+    pub fn is_empty(&self) -> bool {
+        self.blocks.is_empty()
+    }
+
+    /// Iterates over blocks in descending order (most recent first).
+    pub fn blocks_desc(&self) -> impl Iterator<Item = &BlockMetrics> {
+        self.blocks.iter().rev()
+    }
+
+    /// Returns true if the specified block_number exists in the cache.
+    pub fn contains_block(&self, block_number: u64) -> bool {
+        self.block_index.contains_key(&block_number)
+    }
+
+    /// Clears all blocks with block_number >= the specified value.
+    /// Returns the number of blocks cleared.
+    pub fn clear_blocks_from(&mut self, block_number: u64) -> usize {
+        let mut cleared = 0;
+
+        // Remove from back to front (blocks are stored oldest first)
+        while let Some(block) = self.blocks.back() {
+            if block.block_number >= block_number {
+                let removed = self.blocks.pop_back().unwrap();
+                self.block_index.remove(&removed.block_number);
+                cleared += 1;
+            } else {
+                break;
+            }
+        }
+
+        cleared
+    }
+
+    fn evict_if_needed(&mut self) {
+        let mut evicted = false;
+        while self.blocks.len() > self.max_blocks {
+            if let Some(oldest) = self.blocks.pop_front() {
+                self.block_index.remove(&oldest.block_number);
+                evicted = true;
+            }
+        }
+        // Rebuild the index once after all evictions to maintain correctness.
+        if evicted {
+            self.rebuild_index();
+        }
+    }
+
+    fn rebuild_index(&mut self) {
+        self.block_index.clear();
+        for (idx, block) in self.blocks.iter().enumerate() {
+            self.block_index.insert(block.block_number, idx);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn test_tx(hash: u64, priority: u64) -> MeteredTransaction {
+        let mut hash_bytes = [0u8; 32];
+        hash_bytes[24..].copy_from_slice(&hash.to_be_bytes());
+        MeteredTransaction {
+            tx_hash: B256::new(hash_bytes),
+            priority_fee_per_gas: U256::from(priority),
+            gas_used: 10,
+            execution_time_us: 5,
+            state_root_time_us: 7,
+            data_availability_bytes: 20,
+        }
+    }
+
+    #[test]
+    fn insert_and_retrieve_transactions() {
+        let mut cache = MeteringCache::new(12);
+        let tx1 = test_tx(1, 2);
+        cache.push_transaction(100, 0, tx1.clone());
+
+        let block = cache.block(100).unwrap();
+        let flashblock = block.flashblocks().next().unwrap();
+        assert_eq!(flashblock.len(), 1);
+        assert_eq!(flashblock.transactions()[0].tx_hash, tx1.tx_hash);
+    }
+
+    #[test]
+    fn transactions_preserve_sequencer_order() {
+        let mut cache = MeteringCache::new(12);
+        // Insert in sequencer order (highest priority first)
+        cache.push_transaction(100, 0, test_tx(1, 30));
+        cache.push_transaction(100, 0, test_tx(2, 20));
+        cache.push_transaction(100, 0, test_tx(3, 10));
+
+        let block = cache.block(100).unwrap();
+        let flashblock = block.flashblocks().next().unwrap();
+        let fees: Vec<_> =
+            flashblock.transactions().iter().map(|tx| tx.priority_fee_per_gas).collect();
+        // Order should be preserved as inserted
+        assert_eq!(fees, vec![U256::from(30u64), U256::from(20u64), U256::from(10u64)]);
+    }
+
+    #[test]
+    fn evicts_old_blocks() {
+        let mut cache = MeteringCache::new(2);
+        for block_number in 0..3u64 {
+            cache.push_transaction(block_number, 0, test_tx(block_number, block_number));
+        }
+        assert!(cache.block(0).is_none());
+        assert!(cache.block(1).is_some());
+        assert!(cache.block(2).is_some());
+    }
+
+    #[test]
+    fn contains_block_returns_correct_values() {
+        let mut cache = MeteringCache::new(10);
+        cache.push_transaction(100, 0, test_tx(1, 10));
+        cache.push_transaction(101, 0, test_tx(2, 20));
+
+        assert!(cache.contains_block(100));
+        assert!(cache.contains_block(101));
+        assert!(!cache.contains_block(99));
+        assert!(!cache.contains_block(102));
+    }
+
+    #[test]
+    fn clear_blocks_from_clears_subsequent_blocks() {
+        let mut cache = MeteringCache::new(10);
+        cache.push_transaction(100, 0, test_tx(1, 10));
+        cache.push_transaction(101, 0, test_tx(2, 20));
+        cache.push_transaction(102, 0, test_tx(3, 30));
+
+        let cleared = cache.clear_blocks_from(101);
+
+        assert_eq!(cleared, 2);
+        assert!(cache.contains_block(100));
+        assert!(!cache.contains_block(101));
+        assert!(!cache.contains_block(102));
+        assert_eq!(cache.len(), 1);
+    }
+
+    #[test]
+    fn clear_blocks_from_returns_zero_when_no_match() {
+        let mut cache = MeteringCache::new(10);
+        cache.push_transaction(100, 0, test_tx(1, 10));
+        cache.push_transaction(101, 0, test_tx(2, 20));
+
+        let cleared = cache.clear_blocks_from(200);
+
+        assert_eq!(cleared, 0);
+        assert_eq!(cache.len(), 2);
+    }
+
+    #[test]
+    fn clear_blocks_from_clears_all_blocks() {
+        let mut cache = MeteringCache::new(10);
+        cache.push_transaction(100, 0, test_tx(1, 10));
+        cache.push_transaction(101, 0, test_tx(2, 20));
+        cache.push_transaction(102, 0, test_tx(3, 30));
+
+        let cleared = cache.clear_blocks_from(100);
+
+        assert_eq!(cleared, 3);
+        assert!(cache.is_empty());
+    }
+
+    #[test]
+    fn clear_blocks_from_handles_empty_cache() {
+        let mut cache = MeteringCache::new(10);
+
+        let cleared = cache.clear_blocks_from(100);
+
+        assert_eq!(cleared, 0);
+        assert!(cache.is_empty());
+    }
+}
diff --git a/crates/rpc/src/base/estimator.rs b/crates/rpc/src/base/estimator.rs
new file mode 100644
index 00000000..e6e29677
--- /dev/null
+++ b/crates/rpc/src/base/estimator.rs
@@ -0,0 +1,902 @@
+//! Priority fee estimation based on resource consumption in flashblocks.
+
+use std::sync::Arc;
+
+use alloy_primitives::U256;
+use parking_lot::RwLock;
+use reth_optimism_payload_builder::config::OpDAConfig;
+
+use crate::base::cache::{MeteredTransaction, MeteringCache};
+
+/// Errors that can occur during priority fee estimation.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum EstimateError {
+    /// The bundle's resource demand exceeds the configured capacity limit.
+    DemandExceedsCapacity {
+        /// The resource that exceeded capacity.
+        resource: ResourceKind,
+        /// The requested demand.
+        demand: u128,
+        /// The configured limit.
+        limit: u128,
+    },
+}
+
+impl std::fmt::Display for EstimateError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::DemandExceedsCapacity { resource, demand, limit } => {
+                write!(
+                    f,
+                    "bundle {} demand ({}) exceeds capacity limit ({})",
+                    resource.as_name(),
+                    demand,
+                    limit
+                )
+            }
+        }
+    }
+}
+
+impl std::error::Error for EstimateError {}
+
+/// Configured capacity limits for each resource type.
+///
+/// These values define the maximum capacity available per flashblock (or per block
+/// for "use-it-or-lose-it" resources). The estimator uses these limits to determine
+/// when resources are congested.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct ResourceLimits {
+    /// Gas limit per flashblock.
+    pub gas_used: Option<u64>,
+    /// Execution time budget in microseconds.
+    pub execution_time_us: Option<u128>,
+    /// State root computation time budget in microseconds.
+    pub state_root_time_us: Option<u128>,
+    /// Data availability bytes limit per flashblock.
+    pub data_availability_bytes: Option<u64>,
+}
+
+impl ResourceLimits {
+    /// Returns the limit for the given resource kind.
+    fn limit_for(&self, resource: ResourceKind) -> Option<u128> {
+        match resource {
+            ResourceKind::GasUsed => self.gas_used.map(|v| v as u128),
+            ResourceKind::ExecutionTime => self.execution_time_us,
+            ResourceKind::StateRootTime => self.state_root_time_us,
+            ResourceKind::DataAvailability => self.data_availability_bytes.map(|v| v as u128),
+        }
+    }
+}
+
+/// Resources that influence flashblock inclusion ordering.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum ResourceKind {
+    /// Gas consumption.
+    GasUsed,
+    /// Execution time.
+    ExecutionTime,
+    /// State root computation time.
+    StateRootTime,
+    /// Data availability bytes.
+    DataAvailability,
+}
+
+impl ResourceKind {
+    /// Returns all resource kinds in a fixed order.
+    pub const fn all() -> [Self; 4] {
+        [Self::GasUsed, Self::ExecutionTime, Self::StateRootTime, Self::DataAvailability]
+    }
+
+    /// Returns `true` if this resource is "use-it-or-lose-it", meaning capacity
+    /// that isn't consumed in one flashblock cannot be reclaimed in later ones.
+    ///
+    /// Execution time is the canonical example: the block builder has a fixed
+    /// time budget per block, and unused time in flashblock 0 doesn't roll over
+    /// to flashblock 1. For these resources, the estimator aggregates usage
+    /// across all flashblocks rather than evaluating each flashblock in isolation.
+    ///
+    /// Other resources like gas and DA bytes are bounded per block but are
+    /// evaluated per flashblock, since their limits apply independently.
+    const fn use_it_or_lose_it(self) -> bool {
+        matches!(self, Self::ExecutionTime)
+    }
+
+    /// Returns a human-readable name for the resource kind.
+    pub const fn as_name(&self) -> &'static str {
+        match self {
+            Self::GasUsed => "gas",
+            Self::ExecutionTime => "execution time",
+            Self::StateRootTime => "state root time",
+            Self::DataAvailability => "data availability",
+        }
+    }
+
+    /// Returns a camelCase name for JSON serialization.
+    pub const fn as_camel_case(&self) -> &'static str {
+        match self {
+            Self::GasUsed => "gasUsed",
+            Self::ExecutionTime => "executionTime",
+            Self::StateRootTime => "stateRootTime",
+            Self::DataAvailability => "dataAvailability",
+        }
+    }
+}
+
+/// Amount of resources required by the bundle being priced.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct ResourceDemand {
+    /// Gas demand.
+    pub gas_used: Option<u64>,
+    /// Execution time demand in microseconds.
+    pub execution_time_us: Option<u128>,
+    /// State root time demand in microseconds.
+    pub state_root_time_us: Option<u128>,
+    /// Data availability bytes demand.
+    pub data_availability_bytes: Option<u64>,
+}
+
+impl ResourceDemand {
+    fn demand_for(&self, resource: ResourceKind) -> Option<u128> {
+        match resource {
+            ResourceKind::GasUsed => self.gas_used.map(|v| v as u128),
+            ResourceKind::ExecutionTime => self.execution_time_us,
+            ResourceKind::StateRootTime => self.state_root_time_us,
+            ResourceKind::DataAvailability => self.data_availability_bytes.map(|v| v as u128),
+        }
+    }
+}
+
+/// Fee estimate for a single resource type.
+///
+/// The estimation algorithm answers: "What priority fee would my bundle need to pay
+/// to displace enough lower-paying transactions to free up the resources I need?"
+#[derive(Debug, Clone)]
+pub struct ResourceEstimate {
+    /// Minimum fee to displace enough capacity for the bundle's resource demand.
+    pub threshold_priority_fee: U256,
+    /// Recommended fee based on a percentile of transactions above the threshold.
+    /// Provides a safety margin over the bare minimum.
+    pub recommended_priority_fee: U256,
+    /// Total resource usage of transactions at or above the threshold fee.
+    pub cumulative_usage: u128,
+    /// Number of transactions at or above `threshold_priority_fee`. These higher-paying
+    /// transactions remain included alongside the bundle; lower-paying ones are displaced.
+    pub threshold_tx_count: usize,
+    /// Total transactions considered in the estimate.
+    pub total_transactions: usize,
+}
+
+/// Per-resource fee estimates.
+///
+/// Each field corresponds to a resource type. `None` indicates the resource
+/// was not requested or could not be estimated (e.g., no capacity limit is configured).
+#[derive(Debug, Clone, Default)]
+pub struct ResourceEstimates {
+    /// Gas usage estimate.
+    pub gas_used: Option<ResourceEstimate>,
+    /// Execution time estimate.
+    pub execution_time: Option<ResourceEstimate>,
+    /// State root time estimate.
+    pub state_root_time: Option<ResourceEstimate>,
+    /// Data availability estimate.
+    pub data_availability: Option<ResourceEstimate>,
+}
+
+impl ResourceEstimates {
+    /// Returns the estimate for the given resource kind.
+    pub const fn get(&self, kind: ResourceKind) -> Option<&ResourceEstimate> {
+        match kind {
+            ResourceKind::GasUsed => self.gas_used.as_ref(),
+            ResourceKind::ExecutionTime => self.execution_time.as_ref(),
+            ResourceKind::StateRootTime => self.state_root_time.as_ref(),
+            ResourceKind::DataAvailability => self.data_availability.as_ref(),
+        }
+    }
+
+    /// Sets the estimate for the given resource kind.
+    pub const fn set(&mut self, kind: ResourceKind, estimate: ResourceEstimate) {
+        match kind {
+            ResourceKind::GasUsed => self.gas_used = Some(estimate),
+            ResourceKind::ExecutionTime => self.execution_time = Some(estimate),
+            ResourceKind::StateRootTime => self.state_root_time = Some(estimate),
+            ResourceKind::DataAvailability => self.data_availability = Some(estimate),
+        }
+    }
+
+    /// Iterates over all present estimates with their resource kind.
+    pub fn iter(&self) -> impl Iterator<Item = (ResourceKind, &ResourceEstimate)> {
+        [
+            (ResourceKind::GasUsed, &self.gas_used),
+            (ResourceKind::ExecutionTime, &self.execution_time),
+            (ResourceKind::StateRootTime, &self.state_root_time),
+            (ResourceKind::DataAvailability, &self.data_availability),
+        ]
+        .into_iter()
+        .filter_map(|(kind, opt)| opt.as_ref().map(|est| (kind, est)))
+    }
+
+    /// Returns true if no estimates are present.
+    pub fn is_empty(&self) -> bool {
+        self.iter().next().is_none()
+    }
+}
+
+/// Estimates for a specific flashblock index.
+#[derive(Debug, Clone)]
+pub struct FlashblockResourceEstimates {
+    /// Flashblock index.
+    pub flashblock_index: u64,
+    /// Per-resource estimates.
+    pub estimates: ResourceEstimates,
+}
+
+/// Aggregated estimates for a block.
+#[derive(Debug, Clone)]
+pub struct BlockPriorityEstimates {
+    /// Block number.
+    pub block_number: u64,
+    /// Per-flashblock estimates.
+    pub flashblocks: Vec<FlashblockResourceEstimates>,
+    /// Minimum recommended fee across all flashblocks (easiest inclusion).
+    pub min_across_flashblocks: ResourceEstimates,
+    /// Maximum recommended fee across all flashblocks (most competitive).
+    pub max_across_flashblocks: ResourceEstimates,
+}
+
+/// Priority fee estimate aggregated across multiple recent blocks.
+#[derive(Debug, Clone)]
+pub struct RollingPriorityEstimate {
+    /// Number of blocks that contributed to this estimate.
+    pub blocks_sampled: usize,
+    /// Per-resource estimates (median across sampled blocks).
+    pub estimates: ResourceEstimates,
+    /// Recommended priority fee: the maximum across all resources.
+    pub priority_fee: U256,
+}
+
+/// Computes resource fee estimates based on cached flashblock metering data.
+#[derive(Debug)]
+pub struct PriorityFeeEstimator {
+    cache: Arc<RwLock<MeteringCache>>,
+    percentile: f64,
+    limits: ResourceLimits,
+    default_priority_fee: U256,
+    /// Optional shared DA config from the miner RPC. When set, the estimator uses
+    /// `max_da_block_size` from this config instead of `limits.data_availability_bytes`.
+    /// This allows dynamic updates via `miner_setMaxDASize`.
+    da_config: Option<OpDAConfig>,
+}
+
+impl PriorityFeeEstimator {
+    /// Creates a new estimator referencing the shared metering cache.
+    ///
+    /// # Parameters
+    /// - `cache`: Shared cache containing recent flashblock metering data.
+    /// - `percentile`: Point in the fee distribution (among transactions above the threshold)
+    ///   to use for the recommended fee.
+    /// - `limits`: Configured resource capacity limits.
+    /// - `default_priority_fee`: Fee to return when a resource is not congested.
+    /// - `da_config`: Optional shared DA config for dynamic DA limit updates.
+    pub const fn new(
+        cache: Arc<RwLock<MeteringCache>>,
+        percentile: f64,
+        limits: ResourceLimits,
+        default_priority_fee: U256,
+        da_config: Option<OpDAConfig>,
+    ) -> Self {
+        Self { cache, percentile, limits, default_priority_fee, da_config }
+    }
+
+    /// Returns the current DA block size limit, preferring the dynamic `OpDAConfig` value
+    /// if available and otherwise falling back to the static limit.
+    pub fn max_da_block_size(&self) -> Option<u64> {
+        self.da_config
+            .as_ref()
+            .and_then(|c| c.max_da_block_size())
+            .or(self.limits.data_availability_bytes)
+    }
+
+    /// Returns the limit for the given resource kind, using the dynamic config where available.
+    fn limit_for(&self, resource: ResourceKind) -> Option<u128> {
+        match resource {
+            ResourceKind::DataAvailability => self.max_da_block_size().map(|v| v as u128),
+            _ => self.limits.limit_for(resource),
+        }
+    }
+
+    /// Returns fee estimates for the provided block. If `block_number` is `None`,
+    /// the most recent block in the cache is used.
+    ///
+    /// Returns `Ok(None)` if the cache is empty, the requested block is not cached,
+    /// or no transactions exist in the cached flashblocks.
+    ///
+    /// Returns `Err` if the bundle's demand exceeds any resource's capacity limit.
+    pub fn estimate_for_block(
+        &self,
+        block_number: Option<u64>,
+        demand: ResourceDemand,
+    ) -> Result<Option<BlockPriorityEstimates>, EstimateError> {
+        let cache_guard = self.cache.read();
+        let block_metrics = block_number
+            .map_or_else(|| cache_guard.blocks_desc().next(), |target| cache_guard.block(target));
+        let Some(block_metrics) = block_metrics else {
+            return Ok(None);
+        };
+
+        let block_number = block_metrics.block_number;
+
+        // Clone transactions per flashblock so we can drop the lock.
+        // Transactions are pre-sorted descending by priority fee in the cache.
+        let mut flashblock_transactions = Vec::new();
+        let mut total_tx_count = 0usize;
+        for flashblock in block_metrics.flashblocks() {
+            let txs: Vec<MeteredTransaction> = flashblock.transactions().to_vec();
+            if txs.is_empty() {
+                continue;
+            }
+            total_tx_count += txs.len();
+            flashblock_transactions.push((flashblock.flashblock_index, txs));
+        }
+        drop(cache_guard);
+
+        if flashblock_transactions.is_empty() {
+            return Ok(None);
+        }
+
+        // Build the aggregate list for use-it-or-lose-it resources. A sort is needed
+        // since we're combining multiple pre-sorted flashblocks.
+        let mut aggregate_refs: Vec<&MeteredTransaction> = Vec::with_capacity(total_tx_count);
+        for (_, txs) in &flashblock_transactions {
+            aggregate_refs.extend(txs.iter());
+        }
+        aggregate_refs.sort_by(|a, b| b.priority_fee_per_gas.cmp(&a.priority_fee_per_gas));
+
+        let mut flashblock_estimates = Vec::new();
+
+        for (flashblock_index, txs) in &flashblock_transactions {
+            // Build a reference slice for this flashblock's transactions.
+            let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect();
+
+            let mut estimates = ResourceEstimates::default();
+            for resource in ResourceKind::all() {
+                let Some(demand_value) = demand.demand_for(resource) else {
+                    continue;
+                };
+                let Some(limit_value) = self.limit_for(resource) else {
+                    continue;
+                };
+
+                let transactions: &[&MeteredTransaction] =
+                    if resource.use_it_or_lose_it() { &aggregate_refs } else { &txs_refs };
+                let estimate = compute_estimate(
+                    resource,
+                    transactions,
+                    demand_value,
+                    limit_value,
+                    usage_extractor(resource),
+                    self.percentile,
+                    self.default_priority_fee,
+                )?;
+
+                estimates.set(resource, estimate);
+            }
+
+            flashblock_estimates.push(FlashblockResourceEstimates {
+                flashblock_index: *flashblock_index,
+                estimates,
+            });
+        }
+
+        let (min_across_flashblocks, max_across_flashblocks) =
+            compute_min_max_estimates(&flashblock_estimates);
+
+        Ok(Some(BlockPriorityEstimates {
+            block_number,
+            flashblocks: flashblock_estimates,
+            min_across_flashblocks,
+            max_across_flashblocks,
+        }))
+    }
+
+    /// Returns rolling fee estimates aggregated across the most recent blocks in the cache.
+    ///
+    /// For each resource, computes estimates per block and takes the median recommended fee
+    /// (the upper median for an even number of blocks). The final `priority_fee` is the
+    /// maximum of those medians across all resources.
+    ///
+    /// Returns `Ok(None)` if the cache is empty or no blocks contain transaction data.
+    ///
+    /// Returns `Err` if the bundle's demand exceeds any resource's capacity limit.
+    pub fn estimate_rolling(
+        &self,
+        demand: ResourceDemand,
+    ) -> Result<Option<RollingPriorityEstimate>, EstimateError> {
+        let cache_guard = self.cache.read();
+        let block_numbers: Vec<u64> = cache_guard.blocks_desc().map(|b| b.block_number).collect();
+        drop(cache_guard);
+
+        if block_numbers.is_empty() {
+            return Ok(None);
+        }
+
+        // Collect per-block max estimates. Propagate any errors.
+        let mut block_estimates = Vec::new();
+        for &n in &block_numbers {
+            if let Some(est) = self.estimate_for_block(Some(n), demand)? {
+                block_estimates.push(est.max_across_flashblocks);
+            }
+        }
+
+        if block_estimates.is_empty() {
+            return Ok(None);
+        }
+
+        // Compute the median fee for each resource across blocks.
+        let mut estimates = ResourceEstimates::default();
+        let mut max_fee = U256::ZERO;
+
+        for resource in ResourceKind::all() {
+            let mut fees: Vec<U256> = block_estimates
+                .iter()
+                .filter_map(|e| e.get(resource))
+                .map(|e| e.recommended_priority_fee)
+                .collect();
+
+            if fees.is_empty() {
+                continue;
+            }
+
+            fees.sort();
+            let median_fee = fees[fees.len() / 2];
+            max_fee = max_fee.max(median_fee);
+
+            estimates.set(
+                resource,
+                ResourceEstimate {
+                    threshold_priority_fee: median_fee,
+                    recommended_priority_fee: median_fee,
+                    cumulative_usage: 0,
+                    threshold_tx_count: 0,
+                    total_transactions: 0,
+                },
+            );
+        }
+
+        if estimates.is_empty() {
+            return Ok(None);
+        }
+
+        Ok(Some(RollingPriorityEstimate {
+            blocks_sampled: block_numbers.len(),
+            estimates,
+            priority_fee: max_fee,
+        }))
+    }
+}
+
+/// Core estimation algorithm (top-down approach).
+///
+/// Given a sorted list of transactions and a resource limit, determines the minimum priority
+/// fee needed to be included alongside enough high-paying transactions while still
+/// leaving room for the bundle's demand.
+///
+/// # Arguments
+///
+/// * `transactions` - Must be sorted by priority fee descending (highest first)
+///
+/// # Algorithm
+///
+/// 1. Walk from the highest-paying transactions, subtracting each transaction's usage from
+///    the remaining capacity.
+/// 2. Stop when including another transaction would leave less capacity than the bundle needs.
+/// 3. The threshold fee is the fee of the last included transaction (the minimum fee
+///    among transactions that would be included alongside the bundle).
+/// 4. If we include all transactions and still have capacity >= demand, the resource is
+///    not congested, so return the configured default fee.
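+///
+/// For example (mirroring the `compute_estimate_congested_resource` test below):
+/// with a limit of 30, a demand of 15, and transactions paying fees 10, 5, and 2
+/// at 10 units of usage each, the fee-10 transaction is included (30 - 10 = 20 >= 15),
+/// but including the fee-5 transaction would leave 20 - 10 = 10 < 15, so the
+/// threshold fee is 10.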
+///
+/// Returns `Err` if the bundle's demand exceeds the resource limit.
+fn compute_estimate(
+    resource: ResourceKind,
+    transactions: &[&MeteredTransaction],
+    demand: u128,
+    limit: u128,
+    usage_fn: fn(&MeteredTransaction) -> u128,
+    percentile: f64,
+    default_fee: U256,
+) -> Result<ResourceEstimate, EstimateError> {
+    // Bundle demand exceeds the resource limit entirely.
+    if demand > limit {
+        return Err(EstimateError::DemandExceedsCapacity { resource, demand, limit });
+    }
+
+    // No transactions or zero demand means no competition for this resource.
+    if transactions.is_empty() || demand == 0 {
+        return Ok(ResourceEstimate {
+            threshold_priority_fee: default_fee,
+            recommended_priority_fee: default_fee,
+            cumulative_usage: 0,
+            threshold_tx_count: 0,
+            total_transactions: 0,
+        });
+    }
+
+    // Walk from the highest-paying transactions, subtracting usage from the remaining
+    // capacity. Stop when we can no longer fit another transaction while leaving room
+    // for the demand.
+    let mut remaining = limit;
+    let mut included_usage = 0u128;
+    let mut last_included_idx: Option<usize> = None;
+
+    for (idx, tx) in transactions.iter().enumerate() {
+        let usage = usage_fn(tx);
+
+        // Check if we can include this transaction and still have room for the bundle.
+        if remaining >= usage && remaining.saturating_sub(usage) >= demand {
+            remaining = remaining.saturating_sub(usage);
+            included_usage = included_usage.saturating_add(usage);
+            last_included_idx = Some(idx);
+        } else {
+            // Can't include this transaction without crowding out the bundle.
+            break;
+        }
+    }
+
+    // If we included all transactions and still have room, the resource is not congested.
+    let is_uncongested = last_included_idx == Some(transactions.len() - 1) && remaining >= demand;
+
+    if is_uncongested {
+        return Ok(ResourceEstimate {
+            threshold_priority_fee: default_fee,
+            recommended_priority_fee: default_fee,
+            cumulative_usage: included_usage,
+            threshold_tx_count: transactions.len(),
+            total_transactions: transactions.len(),
+        });
+    }
+
+    let (supporting_count, threshold_fee, recommended_fee) = last_included_idx.map_or_else(
+        || {
+            // No transactions fit - even the first transaction would crowd out
+            // the bundle. The bundle must beat the highest fee to be included.
+            // Report 0 supporting transactions since none were actually included.
+            let threshold_fee = transactions[0].priority_fee_per_gas;
+            (0, threshold_fee, threshold_fee)
+        },
+        |idx| {
+            // At least one transaction fits alongside the bundle.
+            // The threshold is the fee of the last included transaction.
+            let threshold_fee = transactions[idx].priority_fee_per_gas;
+
+            // For the recommended fee, look at the included transactions (those above
+            // the threshold) and pick one at the specified percentile for a safety margin.
+            let included = &transactions[..=idx];
+            let percentile = percentile.clamp(0.0, 1.0);
+            let recommended_fee = if included.len() <= 1 {
+                threshold_fee
+            } else {
+                // Pick from the higher end of included transactions for safety.
+ let pos = ((included.len() - 1) as f64 * (1.0 - percentile)).round() as usize; + included[pos.min(included.len() - 1)].priority_fee_per_gas + }; + + (idx + 1, threshold_fee, recommended_fee) + }, + ); + + Ok(ResourceEstimate { + threshold_priority_fee: threshold_fee, + recommended_priority_fee: recommended_fee, + cumulative_usage: included_usage, + threshold_tx_count: supporting_count, + total_transactions: transactions.len(), + }) +} + +/// Returns a function that extracts the relevant resource usage from a transaction. +fn usage_extractor(resource: ResourceKind) -> fn(&MeteredTransaction) -> u128 { + match resource { + ResourceKind::GasUsed => |tx: &MeteredTransaction| tx.gas_used as u128, + ResourceKind::ExecutionTime => |tx: &MeteredTransaction| tx.execution_time_us, + ResourceKind::StateRootTime => |tx: &MeteredTransaction| tx.state_root_time_us, + ResourceKind::DataAvailability => { + |tx: &MeteredTransaction| tx.data_availability_bytes as u128 + } + } +} + +/// Computes the minimum and maximum recommended fees across all flashblocks. +/// +/// Returns two `ResourceEstimates`: +/// - First: For each resource, the estimate with the lowest recommended fee (easiest inclusion). +/// - Second: For each resource, the estimate with the highest recommended fee (most competitive). +fn compute_min_max_estimates( + flashblocks: &[FlashblockResourceEstimates], +) -> (ResourceEstimates, ResourceEstimates) { + let mut min_estimates = ResourceEstimates::default(); + let mut max_estimates = ResourceEstimates::default(); + + for flashblock in flashblocks { + for (resource, estimate) in flashblock.estimates.iter() { + // Update min. + let current_min = min_estimates.get(resource); + if current_min.is_none() + || estimate.recommended_priority_fee < current_min.unwrap().recommended_priority_fee + { + min_estimates.set(resource, estimate.clone()); + } + + // Update max. 
+ let current_max = max_estimates.get(resource); + if current_max.is_none() + || estimate.recommended_priority_fee > current_max.unwrap().recommended_priority_fee + { + max_estimates.set(resource, estimate.clone()); + } + } + } + + (min_estimates, max_estimates) +} + +#[cfg(test)] +mod tests { + use alloy_primitives::B256; + + use super::*; + + const DEFAULT_FEE: U256 = U256::from_limbs([1, 0, 0, 0]); // 1 wei + + fn tx(priority: u64, usage: u64) -> MeteredTransaction { + let mut hash_bytes = [0u8; 32]; + hash_bytes[24..].copy_from_slice(&priority.to_be_bytes()); + MeteredTransaction { + tx_hash: B256::new(hash_bytes), + priority_fee_per_gas: U256::from(priority), + gas_used: usage, + execution_time_us: usage as u128, + state_root_time_us: usage as u128, + data_availability_bytes: usage, + } + } + + #[test] + fn compute_estimate_congested_resource() { + // Limit: 30, Demand: 15 + // Transactions: priority=10 (10 gas), priority=5 (10 gas), priority=2 (10 gas) + // Walking from top (highest fee): + // - Include tx priority=10: remaining = 30-10 = 20 >= 15 ok + // - Include tx priority=5: remaining = 20-10 = 10 < 15 stop + // Threshold = 10 (the last included tx's fee) + let txs = vec![tx(10, 10), tx(5, 10), tx(2, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 30, // limit + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, U256::from(10)); + assert_eq!(quote.cumulative_usage, 10); // Only the first tx was included + assert_eq!(quote.threshold_tx_count, 1); + assert_eq!(quote.total_transactions, 3); + } + + #[test] + fn compute_estimate_uncongested_resource() { + // Limit: 100, Demand: 15 + // All transactions fit with room to spare -> return default fee + let txs = vec![tx(10, 10), tx(5, 10), tx(2, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 100, // limit is much larger than total usage + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, DEFAULT_FEE); + assert_eq!(quote.recommended_priority_fee, DEFAULT_FEE); + assert_eq!(quote.cumulative_usage, 30); // All txs included + assert_eq!(quote.threshold_tx_count, 3); + } + + #[test] + fn compute_estimate_demand_exceeds_limit() { + // Demand > Limit -> Error + let txs = vec![tx(10, 10), tx(5, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let result = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 50, // demand + 30, // limit + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ); + assert!(matches!( + result, + Err(EstimateError::DemandExceedsCapacity { + resource: ResourceKind::GasUsed, + demand: 50, + limit: 30, + }) + )); + } + + #[test] + fn compute_estimate_exact_fit() { + // Limit: 30, Demand: 20 + // Transactions: priority=10 (10 gas), priority=5 (10 gas) + // After including tx priority=10: remaining = 20 >= 20 ok + // After including tx priority=5: remaining = 10 < 20 stop + let txs = vec![tx(10, 10), tx(5, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 20, + 30, + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, U256::from(10)); + 
+        assert_eq!(quote.cumulative_usage, 10);
+        assert_eq!(quote.threshold_tx_count, 1);
+    }
+
+    #[test]
+    fn compute_estimate_single_transaction() {
+        // Single tx that fits
+        let txs = vec![tx(10, 10)];
+        let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect();
+        let quote = compute_estimate(
+            ResourceKind::GasUsed,
+            &txs_refs,
+            15,
+            30,
+            usage_extractor(ResourceKind::GasUsed),
+            0.5,
+            DEFAULT_FEE,
+        )
+        .expect("no error");
+        // After including the tx: remaining = 20 >= 15 ok
+        // But we only have 1 tx, so it's uncongested
+        assert_eq!(quote.threshold_priority_fee, DEFAULT_FEE);
+        assert_eq!(quote.recommended_priority_fee, DEFAULT_FEE);
+    }
+
+    #[test]
+    fn compute_estimate_no_room_for_any_tx() {
+        // Limit: 25, Demand: 20
+        // First tx uses 10, remaining = 15 < 20 -> can't even include the first tx
+        let txs = vec![tx(10, 10), tx(5, 10)];
+        let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect();
+        let quote = compute_estimate(
+            ResourceKind::GasUsed,
+            &txs_refs,
+            20,
+            25,
+            usage_extractor(ResourceKind::GasUsed),
+            0.5,
+            DEFAULT_FEE,
+        )
+        .expect("no error");
+        // No transactions can be included; the threshold is the highest fee
+        assert_eq!(quote.threshold_priority_fee, U256::from(10));
+        assert_eq!(quote.threshold_tx_count, 0);
+        assert_eq!(quote.cumulative_usage, 0);
+    }
+
+    #[test]
+    fn compute_estimate_empty_transactions() {
+        // No transactions = uncongested, return the default fee
+        let txs_refs: Vec<&MeteredTransaction> = vec![];
+        let quote = compute_estimate(
+            ResourceKind::GasUsed,
+            &txs_refs,
+            15,
+            30,
+            usage_extractor(ResourceKind::GasUsed),
+            0.5,
+            DEFAULT_FEE,
+        )
+        .expect("no error");
+        assert_eq!(quote.threshold_priority_fee, DEFAULT_FEE);
+        assert_eq!(quote.recommended_priority_fee, DEFAULT_FEE);
+    }
+
+    const DEFAULT_LIMITS: ResourceLimits = ResourceLimits {
+        gas_used: Some(25),
+        execution_time_us: Some(100),
+        state_root_time_us: None,
+        data_availability_bytes: Some(100),
+    };
+
+    fn setup_estimator(
+        limits: ResourceLimits,
+    ) -> (Arc<RwLock<MeteringCache>>, PriorityFeeEstimator) {
+        let cache = Arc::new(RwLock::new(MeteringCache::new(4)));
+        let estimator = PriorityFeeEstimator::new(cache.clone(), 0.5, limits, DEFAULT_FEE, None);
+        (cache, estimator)
+    }
+
+    #[test]
+    fn estimate_for_block_respects_limits() {
+        let (cache, estimator) = setup_estimator(DEFAULT_LIMITS);
+        {
+            let mut guard = cache.write();
+            guard.push_transaction(1, 0, tx(10, 10));
+            guard.push_transaction(1, 0, tx(5, 10));
+        }
+        let mut demand = ResourceDemand::default();
+        demand.gas_used = Some(15);
+
+        let estimates =
+            estimator.estimate_for_block(Some(1), demand).expect("no error").expect("cached block");
+
+        assert_eq!(estimates.block_number, 1);
+        let gas_estimate = estimates.max_across_flashblocks.gas_used.expect("gas estimate present");
+        assert_eq!(gas_estimate.threshold_priority_fee, U256::from(10));
+    }
+
+    #[test]
+    fn estimate_for_block_propagates_limit_errors() {
+        let mut limits = DEFAULT_LIMITS;
+        limits.gas_used = Some(10);
+        let (cache, estimator) = setup_estimator(limits);
+        {
+            let mut guard = cache.write();
+            guard.push_transaction(1, 0, tx(10, 10));
+            guard.push_transaction(1, 0, tx(5, 10));
+        }
+        let mut demand = ResourceDemand::default();
+        demand.gas_used = Some(15);
+
+        let err = estimator
+            .estimate_for_block(Some(1), demand)
+            .expect_err("demand should exceed capacity");
+        assert!(matches!(
+            err,
+            EstimateError::DemandExceedsCapacity {
+                resource: ResourceKind::GasUsed,
+                demand: 15,
+                limit: 10
+            }
+        ));
+    }
+
+    #[test]
+    fn estimate_rolling_aggregates_across_blocks() {
+        let (cache, estimator) = setup_estimator(DEFAULT_LIMITS);
+        {
+            let mut guard = cache.write();
+            // Block 1 → threshold 10
+            guard.push_transaction(1, 0, tx(10, 10));
+            guard.push_transaction(1, 0, tx(5, 10));
+            // Block 2 → threshold 30
+            guard.push_transaction(2, 0, tx(30, 10));
+            guard.push_transaction(2, 0, tx(25, 10));
+        }
+
+        let demand = ResourceDemand { gas_used: Some(15), ..Default::default() };
+
+        let rolling =
+            estimator.estimate_rolling(demand).expect("no error").expect("estimates available");
+
+        assert_eq!(rolling.blocks_sampled, 2);
+        let gas_estimate = rolling.estimates.gas_used.expect("gas estimate present");
+        // Median across [10, 30] = 30 (upper median for even count)
+        assert_eq!(gas_estimate.recommended_priority_fee, U256::from(30));
+        assert_eq!(rolling.priority_fee, U256::from(30));
+    }
+}
diff --git a/crates/rpc/src/base/kafka.rs b/crates/rpc/src/base/kafka.rs
new file mode 100644
index 00000000..f6e6805a
--- /dev/null
+++ b/crates/rpc/src/base/kafka.rs
@@ -0,0 +1,186 @@
+//! Kafka consumer for accepted bundle events.
+
+use std::{fmt, time::Duration};
+
+use alloy_consensus::{Transaction, transaction::Recovered};
+use alloy_eips::Encodable2718;
+use alloy_primitives::U256;
+use chrono::Utc;
+use eyre::Result;
+use op_alloy_consensus::OpTxEnvelope;
+use op_alloy_flz::tx_estimated_size_fjord_bytes;
+use rdkafka::{
+    ClientConfig, Message,
+    consumer::{CommitMode, Consumer, StreamConsumer},
+};
+use tips_core::types::AcceptedBundle;
+use tokio::{sync::mpsc::UnboundedSender, time::sleep};
+use tracing::{debug, error, info, trace, warn};
+
+use crate::MeteredTransaction;
+
+/// Configuration required to connect to the Kafka topic publishing accepted bundles.
+#[derive(Debug)]
+pub struct KafkaBundleConsumerConfig {
+    /// Kafka client configuration.
+    pub client_config: ClientConfig,
+    /// Topic name.
+    pub topic: String,
+}
+
+/// Maximum backoff delay for Kafka receive errors.
+const MAX_BACKOFF_SECS: u64 = 60;
+
+/// Consumes `AcceptedBundle` events from Kafka and publishes transaction-level metering data.
+pub struct KafkaBundleConsumer {
+    consumer: StreamConsumer,
+    tx_sender: UnboundedSender<MeteredTransaction>,
+    topic: String,
+}
+
+impl fmt::Debug for KafkaBundleConsumer {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("KafkaBundleConsumer").field("topic", &self.topic).finish_non_exhaustive()
+    }
+}
+
+impl KafkaBundleConsumer {
+    /// Creates a new Kafka bundle consumer.
+    pub fn new(
+        config: KafkaBundleConsumerConfig,
+        tx_sender: UnboundedSender<MeteredTransaction>,
+    ) -> Result<Self> {
+        let KafkaBundleConsumerConfig { client_config, topic } = config;
+
+        let consumer: StreamConsumer = client_config.create()?;
+        consumer.subscribe(&[topic.as_str()])?;
+
+        Ok(Self { consumer, tx_sender, topic })
+    }
+
+    /// Starts listening for Kafka messages until the task is cancelled.
+    pub async fn run(self) {
+        info!(
+            target: "metering::kafka",
+            topic = %self.topic,
+            "Starting Kafka bundle consumer"
+        );
+
+        let mut backoff_secs = 1u64;
+
+        loop {
+            match self.consumer.recv().await {
+                Ok(message) => {
+                    // Reset backoff on successful receive.
+                    backoff_secs = 1;
+                    if let Err(err) = self.handle_message(message).await {
+                        error!(target: "metering::kafka", error = %err, "Failed to process Kafka message");
+                        metrics::counter!("metering.kafka.errors_total").increment(1);
+                    }
+                }
+                Err(err) => {
+                    error!(
+                        target: "metering::kafka",
+                        error = %err,
+                        backoff_secs,
+                        "Kafka receive error for topic {}. Retrying after backoff",
+                        self.topic
+                    );
+                    metrics::counter!("metering.kafka.errors_total").increment(1);
+                    sleep(Duration::from_secs(backoff_secs)).await;
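+                    // Exponential backoff: 1s, 2s, 4s, ..., capped at MAX_BACKOFF_SECS.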
Retrying after backoff", + self.topic + ); + metrics::counter!("metering.kafka.errors_total").increment(1); + sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS); + } + } + } + } + + async fn handle_message(&self, message: rdkafka::message::BorrowedMessage<'_>) -> Result<()> { + let payload = + message.payload().ok_or_else(|| eyre::eyre!("Kafka message missing payload"))?; + + let bundle: AcceptedBundle = serde_json::from_slice(payload)?; + + if let Some(ts) = message.timestamp().to_millis() { + let now_ms = Utc::now().timestamp_millis(); + let lag_ms = now_ms.saturating_sub(ts); + metrics::gauge!("metering.kafka.lag_ms").set(lag_ms as f64); + } + + debug!( + target: "metering::kafka", + block_number = bundle.block_number, + uuid = %bundle.uuid(), + tx_count = bundle.txs.len(), + "Received accepted bundle from Kafka" + ); + + self.publish_transactions(&bundle)?; + + // Best-effort asynchronous commit. + if let Err(err) = self.consumer.commit_message(&message, CommitMode::Async) { + warn!( + target: "metering::kafka", + error = %err, + "Failed to commit Kafka offset asynchronously" + ); + metrics::counter!("metering.kafka.errors_total").increment(1); + } + + Ok(()) + } + + fn publish_transactions(&self, bundle: &AcceptedBundle) -> Result<()> { + if bundle.txs.len() != bundle.meter_bundle_response.results.len() { + warn!( + target: "metering::kafka", + bundle_uuid = %bundle.uuid(), + tx_count = bundle.txs.len(), + result_count = bundle.meter_bundle_response.results.len(), + "Bundle transactions/results length mismatch; skipping" + ); + metrics::counter!("metering.kafka.messages_skipped").increment(1); + return Ok(()); + } + + for (tx, result) in bundle.txs.iter().zip(bundle.meter_bundle_response.results.iter()) { + let priority_fee_per_gas = calculate_priority_fee(tx); + let data_availability_bytes = tx_estimated_size_fjord_bytes(&tx.encoded_2718()); + + // TODO(metering): Populate state_root_time_us once the TIPS Kafka schema + // includes per-transaction state-root timing. + let metered_tx = MeteredTransaction { + tx_hash: tx.tx_hash(), + priority_fee_per_gas, + gas_used: result.gas_used, + execution_time_us: result.execution_time_us, + state_root_time_us: 0, + data_availability_bytes, + }; + + if let Err(err) = self.tx_sender.send(metered_tx) { + warn!( + target: "metering::kafka", + error = %err, + tx_hash = %tx.tx_hash(), + "Failed to send metered transaction event" + ); + metrics::counter!("metering.kafka.errors_total").increment(1); + } + } + + trace!( + target: "metering::kafka", + bundle_uuid = %bundle.uuid(), + transactions = bundle.txs.len(), + "Published metering events for bundle" + ); + + Ok(()) + } +} + +fn calculate_priority_fee(tx: &Recovered) -> U256 { + tx.max_priority_fee_per_gas() + .map(U256::from) + .unwrap_or_else(|| U256::from(tx.max_fee_per_gas())) +} diff --git a/crates/rpc/src/base/meter_rpc.rs b/crates/rpc/src/base/meter_rpc.rs index 9a99aa32..45181ff2 100644 --- a/crates/rpc/src/base/meter_rpc.rs +++ b/crates/rpc/src/base/meter_rpc.rs @@ -1,19 +1,32 @@ +//! Implementation of the metering RPC API. 
+
+use std::sync::Arc;
+
 use alloy_consensus::Header;
-use alloy_eips::BlockNumberOrTag;
+use alloy_eips::{BlockNumberOrTag, Encodable2718};
 use alloy_primitives::U256;
-use jsonrpsee::core::{RpcResult, async_trait};
+use jsonrpsee::{
+    core::{RpcResult, async_trait},
+    types::{ErrorCode, ErrorObjectOwned},
+};
+use op_alloy_flz::tx_estimated_size_fjord_bytes;
 use reth::providers::BlockReaderIdExt;
 use reth_optimism_chainspec::OpChainSpec;
 use reth_provider::{ChainSpecProvider, StateProviderFactory};
 use tips_core::types::{Bundle, MeterBundleResponse, ParsedBundle};
-use tracing::{error, info};
+use tracing::{debug, error, info};
 
-use crate::{MeteringApiServer, meter_bundle};
+use super::types::{MeteredPriorityFeeResponse, ResourceFeeEstimateResponse};
+use crate::{
+    MeteringApiServer, PriorityFeeEstimator, ResourceDemand, ResourceEstimates,
+    RollingPriorityEstimate, meter_bundle,
+};
 
 /// Implementation of the metering RPC API
 #[derive(Debug)]
 pub struct MeteringApiImpl<Provider> {
     provider: Provider,
+    priority_fee_estimator: Option<Arc<PriorityFeeEstimator>>,
 }
 
 impl<Provider> MeteringApiImpl<Provider>
@@ -23,84 +36,86 @@ where
         + BlockReaderIdExt<Header = Header>
         + Clone,
 {
-    /// Creates a new instance of MeteringApi
+    /// Creates a new instance of MeteringApi without priority fee estimation.
     pub const fn new(provider: Provider) -> Self {
-        Self { provider }
+        Self { provider, priority_fee_estimator: None }
     }
-}
 
-#[async_trait]
-impl<Provider> MeteringApiServer for MeteringApiImpl<Provider>
-where
-    Provider: StateProviderFactory
-        + ChainSpecProvider<ChainSpec = OpChainSpec>
-        + BlockReaderIdExt<Header = Header>
-        + Clone
-        + Send
-        + Sync
-        + 'static,
-{
-    async fn meter_bundle(&self, bundle: Bundle) -> RpcResult<MeterBundleResponse> {
+    /// Creates a new instance of MeteringApi with priority fee estimation enabled.
+    pub const fn with_estimator(
+        provider: Provider,
+        priority_fee_estimator: Arc<PriorityFeeEstimator>,
+    ) -> Self {
+        Self { provider, priority_fee_estimator: Some(priority_fee_estimator) }
+    }
+
+    fn run_metering(
+        &self,
+        bundle: Bundle,
+    ) -> Result<(MeterBundleResponse, ResourceDemand), ErrorObjectOwned> {
         info!(
             num_transactions = &bundle.txs.len(),
             block_number = &bundle.block_number,
             "Starting bundle metering"
         );
 
-        // Get the latest header
         let header = self
             .provider
             .sealed_header_by_number_or_tag(BlockNumberOrTag::Latest)
             .map_err(|e| {
-                jsonrpsee::types::ErrorObjectOwned::owned(
-                    jsonrpsee::types::ErrorCode::InternalError.code(),
-                    format!("Failed to get latest header: {}", e),
+                ErrorObjectOwned::owned(
+                    ErrorCode::InternalError.code(),
+                    format!("Failed to get latest header: {e}"),
                     None::<()>,
                 )
             })?
             .ok_or_else(|| {
-                jsonrpsee::types::ErrorObjectOwned::owned(
-                    jsonrpsee::types::ErrorCode::InternalError.code(),
+                ErrorObjectOwned::owned(
+                    ErrorCode::InternalError.code(),
                     "Latest block not found".to_string(),
                     None::<()>,
                 )
             })?;
 
         let parsed_bundle = ParsedBundle::try_from(bundle).map_err(|e| {
-            jsonrpsee::types::ErrorObjectOwned::owned(
-                jsonrpsee::types::ErrorCode::InvalidParams.code(),
-                format!("Failed to parse bundle: {}", e),
+            ErrorObjectOwned::owned(
+                ErrorCode::InvalidParams.code(),
+                format!("Failed to parse bundle: {e}"),
                 None::<()>,
             )
         })?;
 
-        // Get state provider for the block
+        // Estimated DA footprint of the bundle (Fjord FastLZ size heuristic).
+        let da_usage: u64 = parsed_bundle
+            .txs
+            .iter()
+            .map(|tx| tx_estimated_size_fjord_bytes(&tx.encoded_2718()))
+            .sum();
+
         let state_provider = self.provider.state_by_block_hash(header.hash()).map_err(|e| {
             error!(error = %e, "Failed to get state provider");
-            jsonrpsee::types::ErrorObjectOwned::owned(
-                jsonrpsee::types::ErrorCode::InternalError.code(),
-                format!("Failed to get state provider: {}", e),
+            ErrorObjectOwned::owned(
+                ErrorCode::InternalError.code(),
+                format!("Failed to get state provider: {e}"),
                 None::<()>,
             )
         })?;
 
-        // Meter bundle using utility function
+        let chain_spec = self.provider.chain_spec();
+
         let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) =
-            meter_bundle(state_provider, self.provider.chain_spec(), parsed_bundle, &header)
-                .map_err(|e| {
-                    error!(error = %e, "Bundle metering failed");
-                    jsonrpsee::types::ErrorObjectOwned::owned(
-                        jsonrpsee::types::ErrorCode::InternalError.code(),
-                        format!("Bundle metering failed: {}", e),
-                        None::<()>,
-                    )
-                })?;
-
-        // Calculate average gas price
+            meter_bundle(state_provider, chain_spec, parsed_bundle, &header).map_err(|e| {
+                error!(error = %e, "Bundle metering failed");
+                ErrorObjectOwned::owned(
+                    ErrorCode::InternalError.code(),
+                    format!("Bundle metering failed: {e}"),
+                    None::<()>,
+                )
+            })?;
+
         let bundle_gas_price = if total_gas_used > 0 {
             total_gas_fees / U256::from(total_gas_used)
         } else {
-            U256::from(0)
+            U256::ZERO
         };
 
         info!(
@@ -111,17 +126,107 @@ where
             "Bundle metering completed successfully"
         );
 
-        Ok(MeterBundleResponse {
+        let response = MeterBundleResponse {
             bundle_gas_price,
             bundle_hash,
             coinbase_diff: total_gas_fees,
-            eth_sent_to_coinbase: U256::from(0),
+            eth_sent_to_coinbase: U256::ZERO,
             gas_fees: total_gas_fees,
             results,
             state_block_number: header.number,
             state_flashblock_index: None,
             total_gas_used,
             total_execution_time_us: total_execution_time,
-        })
+        };
+
+        let resource_demand = ResourceDemand {
+            gas_used: Some(total_gas_used),
+            execution_time_us: Some(total_execution_time),
+            state_root_time_us: None, // Populated when state-root metrics become available.
+            data_availability_bytes: Some(da_usage),
+        };
+
+        Ok((response, resource_demand))
     }
 }
+
+#[async_trait]
+impl<Provider> MeteringApiServer for MeteringApiImpl<Provider>
+where
+    Provider: StateProviderFactory
+        + ChainSpecProvider<ChainSpec = OpChainSpec>
+        + BlockReaderIdExt<Header = Header>
+        + Clone
+        + Send
+        + Sync
+        + 'static,
+{
+    async fn meter_bundle(&self, bundle: Bundle) -> RpcResult<MeterBundleResponse> {
+        let (response, _) = self.run_metering(bundle)?;
+        Ok(response)
+    }
+
+    async fn metered_priority_fee_per_gas(
+        &self,
+        bundle: Bundle,
+    ) -> RpcResult<MeteredPriorityFeeResponse> {
+        let (meter_bundle, resource_demand) = self.run_metering(bundle)?;
+
+        let estimator = self.priority_fee_estimator.as_ref().ok_or_else(|| {
+            ErrorObjectOwned::owned(
+                ErrorCode::InternalError.code(),
+                "Priority fee estimation not enabled".to_string(),
+                None::<()>,
+            )
+        })?;
+
+        debug!(?resource_demand, "Computing priority fee estimates");
+
+        let estimates = estimator
+            .estimate_rolling(resource_demand)
+            .map_err(|e| {
+                ErrorObjectOwned::owned(ErrorCode::InvalidParams.code(), e.to_string(), None::<()>)
+            })?
+            .ok_or_else(|| {
+                ErrorObjectOwned::owned(
+                    ErrorCode::InternalError.code(),
+                    "Priority fee data unavailable".to_string(),
+                    None::<()>,
+                )
+            })?;
+
+        let response = build_priority_fee_response(meter_bundle, estimates);
+        Ok(response)
+    }
+}
+
+/// Converts a rolling estimate to the response format.
+fn build_priority_fee_response(
+    meter_bundle: MeterBundleResponse,
+    estimate: RollingPriorityEstimate,
+) -> MeteredPriorityFeeResponse {
+    let resource_estimates = build_resource_estimate_responses(&estimate.estimates);
+
+    MeteredPriorityFeeResponse {
+        meter_bundle,
+        priority_fee: estimate.priority_fee,
+        blocks_sampled: estimate.blocks_sampled as u64,
+        resource_estimates,
+    }
+}
+
+fn build_resource_estimate_responses(
+    estimates: &ResourceEstimates,
+) -> Vec<ResourceFeeEstimateResponse> {
+    estimates
+        .iter()
+        .map(|(kind, est)| ResourceFeeEstimateResponse {
+            resource: kind.as_camel_case().to_string(),
+            threshold_priority_fee: est.threshold_priority_fee,
+            recommended_priority_fee: est.recommended_priority_fee,
+            cumulative_usage: U256::from(est.cumulative_usage),
+            threshold_tx_count: est.threshold_tx_count.try_into().unwrap_or(u64::MAX),
+            total_transactions: est.total_transactions.try_into().unwrap_or(u64::MAX),
+        })
+        .collect()
+}
diff --git a/crates/rpc/src/base/mod.rs b/crates/rpc/src/base/mod.rs
index b4097c40..772cdbce 100644
--- a/crates/rpc/src/base/mod.rs
+++ b/crates/rpc/src/base/mod.rs
@@ -1,3 +1,7 @@
+pub(crate) mod annotator;
+pub(crate) mod cache;
+pub(crate) mod estimator;
+pub(crate) mod kafka;
 pub(crate) mod meter;
 pub(crate) mod meter_rpc;
 pub(crate) mod pubsub;
diff --git a/crates/rpc/src/base/traits.rs b/crates/rpc/src/base/traits.rs
index 4f80de59..c1d94b60 100644
--- a/crates/rpc/src/base/traits.rs
+++ b/crates/rpc/src/base/traits.rs
@@ -3,7 +3,7 @@
 use alloy_primitives::TxHash;
 use jsonrpsee::{core::RpcResult, proc_macros::rpc};
 
-use crate::{Bundle, MeterBundleResponse, TransactionStatusResponse};
+use crate::{Bundle, MeterBundleResponse, MeteredPriorityFeeResponse, TransactionStatusResponse};
 
 /// RPC API for transaction metering
 #[rpc(server, namespace = "base")]
@@ -11,6 +11,14 @@ pub trait MeteringApi {
     /// Simulates and meters a bundle of transactions
     #[method(name = "meterBundle")]
     async fn meter_bundle(&self, bundle: Bundle) -> RpcResult<MeterBundleResponse>;
+
+    /// Estimates the priority fee necessary for a bundle to be included in recently observed
+    /// flashblocks, considering multiple resource constraints.
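+    ///
+    /// A minimal request sketch (the single param is the same `Bundle` object accepted by
+    /// `base_meterBundle`; the placeholder is illustrative, not a literal value):
+    ///
+    /// ```text
+    /// {"jsonrpc": "2.0", "id": 1, "method": "base_meteredPriorityFeePerGas", "params": [<bundle>]}
+    /// ```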
+    #[method(name = "meteredPriorityFeePerGas")]
+    async fn metered_priority_fee_per_gas(
+        &self,
+        bundle: Bundle,
+    ) -> RpcResult<MeteredPriorityFeeResponse>;
 }
 
 /// RPC API for transaction status
diff --git a/crates/rpc/src/base/types.rs b/crates/rpc/src/base/types.rs
index 3340a80e..df8b2c97 100644
--- a/crates/rpc/src/base/types.rs
+++ b/crates/rpc/src/base/types.rs
@@ -1,7 +1,9 @@
-//! Types for the transaction status rpc
+//! Types for the Base RPC extensions.
 
+use alloy_primitives::U256;
 use alloy_rpc_types_eth::pubsub::SubscriptionKind;
 use serde::{Deserialize, Serialize};
+use tips_core::types::MeterBundleResponse;
 
 /// The status of a transaction.
 #[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
@@ -95,3 +97,38 @@ impl From<SubscriptionKind> for ExtendedSubscriptionKind {
         Self::Base(kind)
     }
 }
+
+// --- Metered priority fee types ---
+
+/// Human-friendly representation of a resource fee quote.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ResourceFeeEstimateResponse {
+    /// Resource name (gasUsed, executionTime, etc).
+    pub resource: String,
+    /// Minimum fee to displace enough capacity.
+    pub threshold_priority_fee: U256,
+    /// Recommended fee with safety margin.
+    pub recommended_priority_fee: U256,
+    /// Cumulative resource usage above threshold.
+    pub cumulative_usage: U256,
+    /// Number of transactions above threshold.
+    pub threshold_tx_count: u64,
+    /// Total transactions considered.
+    pub total_transactions: u64,
+}
+
+/// Response payload for `base_meteredPriorityFeePerGas`.
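+///
+/// On the wire the bundle metering fields are flattened alongside the estimate, so a
+/// response looks roughly like (bundle fields elided, values illustrative):
+///
+/// ```text
+/// { ..., "priorityFee": "0x3b9aca00", "blocksSampled": 5,
+///   "resourceEstimates": [{"resource": "gasUsed", ...}] }
+/// ```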
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct MeteredPriorityFeeResponse {
+    /// Bundled metering results.
+    #[serde(flatten)]
+    pub meter_bundle: MeterBundleResponse,
+    /// Recommended priority fee (max across all resources and median across recent blocks).
+    pub priority_fee: U256,
+    /// Number of recent blocks used to compute the rolling estimate.
+    pub blocks_sampled: u64,
+    /// Per-resource estimates (median across sampled blocks).
+    pub resource_estimates: Vec<ResourceFeeEstimateResponse>,
+}
diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs
index 5a5db0a0..96fe8c43 100644
--- a/crates/rpc/src/lib.rs
+++ b/crates/rpc/src/lib.rs
@@ -8,12 +8,23 @@ pub use tips_core::types::{Bundle, MeterBundleResponse, TransactionResult};
 
 mod base;
 pub use base::{
+    annotator::{FlashblockInclusion, ResourceAnnotator},
+    cache::{BlockMetrics, FlashblockMetrics, MeteredTransaction, MeteringCache, ResourceTotals},
+    estimator::{
+        BlockPriorityEstimates, EstimateError, FlashblockResourceEstimates, PriorityFeeEstimator,
+        ResourceDemand, ResourceEstimate, ResourceEstimates, ResourceKind, ResourceLimits,
+        RollingPriorityEstimate,
+    },
+    kafka::{KafkaBundleConsumer, KafkaBundleConsumerConfig},
     meter::meter_bundle,
     meter_rpc::MeteringApiImpl,
     pubsub::{EthPubSub, EthPubSubApiServer},
     traits::{MeteringApiServer, TransactionStatusApiServer},
     transaction_rpc::TransactionStatusApiImpl,
-    types::{BaseSubscriptionKind, ExtendedSubscriptionKind, Status, TransactionStatusResponse},
+    types::{
+        BaseSubscriptionKind, ExtendedSubscriptionKind, MeteredPriorityFeeResponse,
+        ResourceFeeEstimateResponse, Status, TransactionStatusResponse,
+    },
 };
 
 mod eth;
diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml
index 8f48a83e..59b887e2 100644
--- a/crates/runner/Cargo.toml
+++ b/crates/runner/Cargo.toml
@@ -23,6 +23,13 @@ reth-db.workspace = true
 reth-exex.workspace = true
 reth-optimism-node.workspace = true
 reth-optimism-chainspec.workspace = true
+reth-optimism-payload-builder.workspace = true
+
+# alloy
+alloy-primitives.workspace = true
+
+# flashblocks
+base-flashtypes.workspace = true
 
 # misc
 eyre.workspace = true
@@ -30,4 +37,7 @@ futures-util.workspace = true
 once_cell.workspace = true
 tracing.workspace = true
 url.workspace = true
+parking_lot.workspace = true
 derive_more = { workspace = true, features = ["debug"] }
+rdkafka.workspace = true
+tokio.workspace = true
diff --git a/crates/runner/src/config.rs b/crates/runner/src/config.rs
index 58c60a56..52e106a3 100644
--- a/crates/runner/src/config.rs
+++ b/crates/runner/src/config.rs
@@ -1,6 +1,7 @@
 //! Contains the Base node configuration structures.
 
 use reth_optimism_node::args::RollupArgs;
+use reth_optimism_payload_builder::config::OpDAConfig;
 
 use crate::extensions::FlashblocksCell;
 
@@ -15,8 +16,12 @@ pub struct BaseNodeConfig {
     pub tracing: TracingConfig,
     /// Indicates whether the metering RPC surface should be installed.
     pub metering_enabled: bool,
+    /// Configuration for priority fee estimation.
+    pub metering: MeteringConfig,
     /// Shared Flashblocks state cache.
     pub flashblocks_cell: FlashblocksCell,
+    /// Shared DA config for dynamic updates via `miner_setMaxDASize`.
+    pub da_config: OpDAConfig,
 }
 
 impl BaseNodeConfig {
@@ -43,3 +48,48 @@ pub struct TracingConfig {
     /// Emits `info`-level logs for the tracing ExEx when enabled.
     pub logs_enabled: bool,
 }
+
+/// Configuration for priority fee estimation.
+#[derive(Debug, Clone)]
+pub struct MeteringConfig {
+    /// Whether metering is enabled.
+    pub enabled: bool,
+    /// Kafka configuration for bundle events.
+    pub kafka: Option<KafkaConfig>,
+    /// Resource limits for fee estimation.
+    pub resource_limits: ResourceLimitsConfig,
+    /// Percentile for recommended priority fee (0.0-1.0).
+    pub priority_fee_percentile: f64,
+    /// Default priority fee when resource is not congested (in wei).
+    pub uncongested_priority_fee: u128,
+    /// Number of recent blocks to retain in metering cache.
+    pub cache_size: usize,
+}
+
+/// Kafka connection configuration.
+///
+/// All rdkafka settings (bootstrap.servers, group.id, timeouts, etc.) should be
+/// specified in the properties file. The CLI only specifies the path to this file
+/// and the topic name.
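+///
+/// A minimal properties file might look like (values illustrative):
+///
+/// ```text
+/// bootstrap.servers=kafka-0:9092,kafka-1:9092
+/// group.id=base-node-metering
+/// session.timeout.ms=6000
+/// ```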
+#[derive(Debug, Clone)]
+pub struct KafkaConfig {
+    /// Path to the Kafka properties file containing rdkafka settings.
+    pub properties_file: String,
+    /// Topic name for accepted bundle events.
+    pub topic: String,
+    /// Optional consumer group ID override (takes precedence over properties file).
+    pub group_id_override: Option<String>,
+}
+
+/// Resource limits for priority fee estimation.
+#[derive(Debug, Clone, Copy)]
+pub struct ResourceLimitsConfig {
+    /// Gas limit per flashblock.
+    pub gas_limit: u64,
+    /// Execution time budget in microseconds.
+    pub execution_time_us: u64,
+    /// State root time budget in microseconds (optional).
+    pub state_root_time_us: Option<u64>,
+    /// Data availability bytes limit.
+    pub da_bytes: u64,
+}
diff --git a/crates/runner/src/extensions/rpc.rs b/crates/runner/src/extensions/rpc.rs
index 164a3af5..90b3ae10 100644
--- a/crates/runner/src/extensions/rpc.rs
+++ b/crates/runner/src/extensions/rpc.rs
@@ -2,19 +2,113 @@
 
 use std::sync::Arc;
 
-use base_reth_flashblocks::{FlashblocksState, FlashblocksSubscriber};
+use alloy_primitives::{B256, U256, keccak256};
+use base_flashtypes::Flashblock;
+use base_reth_flashblocks::{FlashblocksReceiver, FlashblocksState, FlashblocksSubscriber};
 use base_reth_rpc::{
-    EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, MeteringApiImpl,
-    MeteringApiServer, TransactionStatusApiImpl, TransactionStatusApiServer,
+    EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblockInclusion,
+    KafkaBundleConsumer, KafkaBundleConsumerConfig, MeteredTransaction, MeteringApiImpl,
+    MeteringApiServer, MeteringCache, PriorityFeeEstimator, ResourceAnnotator, ResourceLimits,
+    TransactionStatusApiImpl, TransactionStatusApiServer,
 };
-use tracing::info;
+use parking_lot::RwLock;
+use rdkafka::ClientConfig;
+use reth_optimism_payload_builder::config::OpDAConfig;
+use tokio::sync::mpsc;
+use tracing::{error, info, warn};
 use url::Url;
 
 use crate::{
-    BaseNodeConfig, FlashblocksConfig,
+    BaseNodeConfig, FlashblocksConfig, MeteringConfig,
     extensions::{BaseNodeExtension, ConfigurableBaseNodeExtension, FlashblocksCell, OpBuilder},
 };
 
+/// Runtime state for the metering pipeline.
+#[derive(Clone)]
+struct MeteringRuntime {
+    /// Priority fee estimator.
+    estimator: Arc<PriorityFeeEstimator>,
+    /// Sender for metered transactions from Kafka.
+    tx_sender: mpsc::UnboundedSender<MeteredTransaction>,
+    /// Sender for flashblock inclusions.
+    flashblock_sender: mpsc::UnboundedSender<FlashblockInclusion>,
+}
+
+/// Composite receiver that forwards flashblocks to both FlashblocksState and the metering pipeline.
+struct CompositeFlashblocksReceiver {
+    state: Arc<FlashblocksState>,
+    /// Optional channel for the metering pipeline; flashblocks RPC still needs the stream even
+    /// when metering is disabled, so we only forward inclusions if a sender is provided.
+    metering_sender: Option<mpsc::UnboundedSender<FlashblockInclusion>>,
+}
+
+impl CompositeFlashblocksReceiver {
+    const fn new(
+        state: Arc<FlashblocksState>,
+        metering_sender: Option<mpsc::UnboundedSender<FlashblockInclusion>>,
+    ) -> Self {
+        Self { state, metering_sender }
+    }
+}
+
+impl FlashblocksReceiver for CompositeFlashblocksReceiver
+where
+    FlashblocksState: FlashblocksReceiver,
+{
+    fn on_flashblock_received(&self, flashblock: Flashblock) {
+        // Forward to the state first
+        self.state.on_flashblock_received(flashblock.clone());
+
+        // Then forward to metering if enabled
+        let Some(sender) = &self.metering_sender else {
+            return;
+        };
+        let Some(inclusion) = flashblock_inclusion_from_flashblock(&flashblock) else {
+            return;
+        };
+
+        if sender.send(inclusion).is_err() {
+            warn!(
+                target: "metering::flashblocks",
+                "Failed to forward flashblock inclusion to metering"
+            );
+        }
+    }
+}
+
+/// Converts a flashblock to a FlashblockInclusion for the metering pipeline.
+fn flashblock_inclusion_from_flashblock(flashblock: &Flashblock) -> Option<FlashblockInclusion> {
+    if flashblock.diff.transactions.is_empty() {
+        return None;
+    }
+
+    let ordered_tx_hashes: Vec<B256> =
+        flashblock.diff.transactions.iter().map(keccak256).collect();
+
+    Some(FlashblockInclusion {
+        block_number: flashblock.metadata.block_number,
+        flashblock_index: flashblock.index,
+        ordered_tx_hashes,
+    })
+}
+
+/// Loads Kafka configuration from a properties file.
+fn load_kafka_config_from_file(
+    path: &str,
+) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
+    let content = std::fs::read_to_string(path)?;
+    let mut props = Vec::new();
+    for line in content.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            props.push((key.trim().to_string(), value.trim().to_string()));
+        }
+    }
+    Ok(props)
+}
+
 /// Helper struct that wires the custom RPC modules into the node builder.
 #[derive(Debug, Clone)]
 pub struct BaseRpcExtension {
@@ -22,10 +116,12 @@ pub struct BaseRpcExtension {
     pub flashblocks_cell: FlashblocksCell,
     /// Optional Flashblocks configuration.
     pub flashblocks: Option<FlashblocksConfig>,
-    /// Indicates whether the metering RPC surface should be installed.
-    pub metering_enabled: bool,
+    /// Full metering configuration.
+    pub metering: MeteringConfig,
     /// Sequencer RPC endpoint for transaction status proxying.
     pub sequencer_rpc: Option<Url>,
+    /// Shared DA config for dynamic updates via `miner_setMaxDASize`.
+    pub da_config: OpDAConfig,
 }
 
 impl BaseRpcExtension {
@@ -34,8 +130,9 @@ impl BaseRpcExtension {
         Self {
             flashblocks_cell: config.flashblocks_cell.clone(),
             flashblocks: config.flashblocks.clone(),
-            metering_enabled: config.metering_enabled,
+            metering: config.metering.clone(),
             sequencer_rpc: config.rollup_args.sequencer.clone(),
+            da_config: config.da_config.clone(),
         }
     }
 }
@@ -45,13 +142,116 @@ impl BaseNodeExtension for BaseRpcExtension {
     fn apply(&self, builder: OpBuilder) -> OpBuilder {
         let flashblocks_cell = self.flashblocks_cell.clone();
         let flashblocks = self.flashblocks.clone();
-        let metering_enabled = self.metering_enabled;
+        let metering = self.metering.clone();
         let sequencer_rpc = self.sequencer_rpc.clone();
+        let da_config = self.da_config.clone();
 
         builder.extend_rpc_modules(move |ctx| {
-            if metering_enabled {
-                info!(message = "Starting Metering RPC");
-                let metering_api = MeteringApiImpl::new(ctx.provider().clone());
+            // Warn if metering is enabled but Kafka is not configured
+            if metering.enabled && metering.kafka.is_none() {
+                warn!(
+                    message = "Metering enabled but Kafka not configured",
+                    help = "Priority fee estimation requires --metering-kafka-properties-file"
+                );
+            }
+
+            // Set up metering runtime if enabled with Kafka
+            let metering_runtime = if metering.enabled && metering.kafka.is_some() {
+                info!(message = "Starting Metering RPC with priority fee estimation");
+
+                let cache = Arc::new(RwLock::new(MeteringCache::new(metering.cache_size)));
+                let limits = ResourceLimits {
+                    gas_used: Some(metering.resource_limits.gas_limit),
+                    execution_time_us: Some(metering.resource_limits.execution_time_us as u128),
+                    state_root_time_us: metering
+                        .resource_limits
+                        .state_root_time_us
+                        .map(|v| v as u128),
+                    data_availability_bytes: Some(metering.resource_limits.da_bytes),
+                };
+                let default_fee = U256::from(metering.uncongested_priority_fee);
+                let estimator = Arc::new(PriorityFeeEstimator::new(
+                    cache.clone(),
+                    metering.priority_fee_percentile,
+                    limits,
+                    default_fee,
+                    Some(da_config.clone()),
+                ));
+
+                // Create channels for the annotator
+                let (tx_sender, tx_receiver) = mpsc::unbounded_channel::<MeteredTransaction>();
+                let (flashblock_sender, flashblock_receiver) =
+                    mpsc::unbounded_channel::<FlashblockInclusion>();
+
+                // Spawn the resource annotator
+                tokio::spawn(async move {
+                    ResourceAnnotator::new(cache, tx_receiver, flashblock_receiver).run().await;
+                });
+
+                Some(MeteringRuntime { estimator, tx_sender, flashblock_sender })
+            } else {
+                None
+            };
+
+            // Spawn Kafka consumer if configured
+            if let (Some(runtime), Some(kafka_cfg)) = (&metering_runtime, &metering.kafka) {
+                info!(
+                    message = "Starting Kafka consumer for metering",
+                    properties_file = %kafka_cfg.properties_file,
+                    topic = %kafka_cfg.topic
+                );
+
+                // Load all rdkafka settings from the properties file
+                let props = match load_kafka_config_from_file(&kafka_cfg.properties_file) {
+                    Ok(props) => props,
+                    Err(err) => {
+                        error!(
+                            target: "metering::kafka",
+                            file = %kafka_cfg.properties_file,
+                            %err,
+                            "Failed to load Kafka properties file"
+                        );
+                        return Ok(());
+                    }
+                };
+
+                let mut client_config = ClientConfig::new();
+                for (key, value) in props {
+                    client_config.set(key, value);
+                }
+
+                // Apply CLI override for group.id if specified
+                if let Some(group_id) = &kafka_cfg.group_id_override {
+                    client_config.set("group.id", group_id);
+                }
+
+                let tx_sender = runtime.tx_sender.clone();
+                let topic = kafka_cfg.topic.clone();
+                tokio::spawn(async move {
+                    let config = KafkaBundleConsumerConfig { client_config, topic };
+
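+                    // `run` loops until the node shuts down; if the consumer cannot be
+                    // built, the error is logged and fee estimation simply receives no
+                    // Kafka-derived data.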
+                    match KafkaBundleConsumer::new(config, tx_sender) {
+                        Ok(consumer) => consumer.run().await,
+                        Err(err) => error!(
+                            target: "metering::kafka",
+                            %err,
+                            "Failed to initialize Kafka consumer"
+                        ),
+                    }
+                });
+            }
+
+            // Register metering RPC
+            if metering.enabled {
+                let metering_api = metering_runtime.as_ref().map_or_else(
+                    || MeteringApiImpl::new(ctx.provider().clone()),
+                    |rt| {
+                        MeteringApiImpl::with_estimator(
+                            ctx.provider().clone(),
+                            rt.estimator.clone(),
+                        )
+                    },
+                );
                 ctx.modules.merge_configured(metering_api.into_rpc())?;
             }
 
@@ -74,7 +274,13 @@ impl BaseNodeExtension for BaseRpcExtension {
                 .clone();
             fb.start();
 
-            let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url);
+            // Create composite receiver that forwards to both flashblocks state and metering
+            let metering_sender =
+                metering_runtime.as_ref().map(|rt| rt.flashblock_sender.clone());
+            let receiver =
+                Arc::new(CompositeFlashblocksReceiver::new(fb.clone(), metering_sender));
+
+            let mut flashblocks_client = FlashblocksSubscriber::new(receiver, ws_url);
             flashblocks_client.start();
 
             let api_ext = EthApiExt::new(
@@ -85,8 +291,9 @@
             ctx.modules.replace_configured(api_ext.into_rpc())?;
 
             // Register the eth_subscribe subscription endpoint for flashblocks
-            // Uses replace_configured since eth_subscribe already exists from reth's standard module
-            // Pass eth_api to enable proxying standard subscription types to reth's implementation
+            // Uses replace_configured since eth_subscribe already exists from reth's standard
+            // module. Pass eth_api to enable proxying standard subscription types to
+            // reth's implementation.
             let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb);
             ctx.modules.replace_configured(eth_pubsub.into_rpc())?;
         } else {
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs
index c96fadf1..c4c9587a 100644
--- a/crates/runner/src/lib.rs
+++ b/crates/runner/src/lib.rs
@@ -13,7 +13,10 @@ mod runner;
 pub use runner::BaseNodeRunner;
 
 mod config;
-pub use config::{BaseNodeConfig, FlashblocksConfig, TracingConfig};
+pub use config::{
+    BaseNodeConfig, FlashblocksConfig, KafkaConfig, MeteringConfig, ResourceLimitsConfig,
+    TracingConfig,
+};
 
 mod extensions;
 pub use extensions::{
diff --git a/crates/runner/src/runner.rs b/crates/runner/src/runner.rs
index e33f2ec4..9f0dcaca 100644
--- a/crates/runner/src/runner.rs
+++ b/crates/runner/src/runner.rs
@@ -56,7 +56,8 @@ impl BaseNodeRunner {
     ) -> Result> {
         info!(target: "base-runner", "starting custom Base node");
 
-        let op_node = OpNode::new(config.rollup_args.clone());
+        let op_node =
+            OpNode::new(config.rollup_args.clone()).with_da_config(config.da_config.clone());
 
         let builder = builder
             .with_types_and_provider::>()