From 349d948cf6a3ee29ede7a014df421fbe0782bb93 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Mon, 15 Dec 2025 19:04:36 -0500 Subject: [PATCH 01/24] chore: impleement tests --- .../account-abstraction-core/core/src/lib.rs | 1 + .../core/src/mempool/mempool_impl.rs | 422 ++++++++++++++++++ .../core/src/mempool/mod.rs | 1 + .../core/src/types.rs | 9 + 4 files changed, 433 insertions(+) create mode 100644 crates/account-abstraction-core/core/src/mempool/mempool_impl.rs create mode 100644 crates/account-abstraction-core/core/src/mempool/mod.rs diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs index fe08aa7..b3aa2f7 100644 --- a/crates/account-abstraction-core/core/src/lib.rs +++ b/crates/account-abstraction-core/core/src/lib.rs @@ -3,3 +3,4 @@ pub mod entrypoints; pub mod types; pub use account_abstraction_service::{AccountAbstractionService, AccountAbstractionServiceImpl}; pub use types::{SendUserOperationResponse, VersionedUserOperation}; +pub mod mempool; diff --git a/crates/account-abstraction-core/core/src/mempool/mempool_impl.rs b/crates/account-abstraction-core/core/src/mempool/mempool_impl.rs new file mode 100644 index 0000000..c338b9d --- /dev/null +++ b/crates/account-abstraction-core/core/src/mempool/mempool_impl.rs @@ -0,0 +1,422 @@ +use crate::types::VersionedUserOperation; +use alloy_primitives::FixedBytes; +use std::collections::{BTreeSet, HashMap}; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +pub struct PoolConfig { + minimum_required_pvg_gas: u128, +} + +pub type UserOpHash = FixedBytes<32>; + +#[derive(Eq, PartialEq, Clone, Debug)] +pub struct PoolOperation { + pub operation: VersionedUserOperation, + pub hash: UserOpHash, +} + +impl PoolOperation { + pub fn should_replace(&self, other: &PoolOperation) -> bool { + self.operation.max_fee_per_gas() < other.operation.max_fee_per_gas() + } +} + +#[derive(Eq, PartialEq, Clone, Debug)] +pub struct 
OrderedPoolOperation { + pub operation: PoolOperation, + pub submission_id: u64, + pub priority_order: u64, +} + +impl Ord for OrderedPoolOperation { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other + .operation.operation + .max_fee_per_gas() + .cmp(&self.operation.operation.max_fee_per_gas()) + .then_with(|| self.submission_id.cmp(&other.submission_id)) + } +} + +impl PartialOrd for OrderedPoolOperation { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl OrderedPoolOperation { + pub fn create_from_pool_operation(operation: &PoolOperation, submission_id: u64) -> Self { + Self { + operation: operation.clone(), + priority_order: submission_id, + submission_id, + } + } +} + +pub trait Mempool { + fn add_operation( + &mut self, + operation: &PoolOperation, + ) -> Result, anyhow::Error>; + fn get_top_operations(&self, n: usize) -> impl Iterator>; + + fn remove_operation( + &mut self, + operation_hash: &UserOpHash, + ) -> Result, anyhow::Error>; +} + +pub struct MempoolImpl { + config: PoolConfig, + best: BTreeSet, + hash_to_operation: HashMap, + submission_id_counter: AtomicU64, +} + +impl Mempool for MempoolImpl { + fn add_operation( + &mut self, + operation: &PoolOperation, + ) -> Result, anyhow::Error> { + if operation.operation.max_fee_per_gas() < self.config.minimum_required_pvg_gas { + return Err(anyhow::anyhow!( + "Gas price is below the minimum required PVG gas" + )); + } + let ordered_operation_result = self.handle_add_operation(operation)?; + Ok(ordered_operation_result) + } + + fn get_top_operations(&self, n: usize) -> impl Iterator> { + self.best + .iter() + .take(n) + .map(|o| Arc::new(o.operation.clone())) + } + + fn remove_operation( + &mut self, + operation_hash: &UserOpHash, + ) -> Result, anyhow::Error> { + if let Some(ordered_operation) = self.hash_to_operation.remove(operation_hash) { + self.best.remove(&ordered_operation); + Ok(Some(ordered_operation.operation)) + } else { + Ok(None) + } + } +} + 
+impl MempoolImpl { + fn handle_add_operation( + &mut self, + operation: &PoolOperation, + ) -> Result, anyhow::Error> { + if let Some(old_ordered_operation) = self.hash_to_operation.get(&operation.hash) { + if operation.should_replace(&old_ordered_operation.operation) { + self.best.remove(old_ordered_operation); + self.hash_to_operation.remove(&operation.hash); + } else { + return Ok(None); + } + } + + let order = self.get_next_order_id(); + let ordered_operation = OrderedPoolOperation::create_from_pool_operation(operation, order); + + self.best.insert(ordered_operation.clone()); + self.hash_to_operation + .insert(operation.hash, ordered_operation.clone()); + Ok(Some(ordered_operation)) + } + + fn get_next_order_id(&self) -> u64 { + self.submission_id_counter.fetch_add(1, Ordering::SeqCst) + } + + pub fn new(config: PoolConfig) -> Self { + Self { + config, + best: BTreeSet::new(), + hash_to_operation: HashMap::new(), + submission_id_counter: AtomicU64::new(0), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{Address, Uint}; + use alloy_rpc_types::erc4337; + + fn create_test_user_operation(max_fee_per_gas: u128) -> VersionedUserOperation { + VersionedUserOperation::UserOperation(erc4337::UserOperation { + sender: Address::ZERO, + nonce: Uint::from(0), + init_code: Default::default(), + call_data: Default::default(), + call_gas_limit: Uint::from(100000), + verification_gas_limit: Uint::from(100000), + pre_verification_gas: Uint::from(21000), + max_fee_per_gas: Uint::from(max_fee_per_gas), + max_priority_fee_per_gas: Uint::from(max_fee_per_gas), + paymaster_and_data: Default::default(), + signature: Default::default(), + }) + } + + fn create_pool_operation(max_fee_per_gas: u128, hash: UserOpHash) -> PoolOperation { + PoolOperation { + operation: create_test_user_operation(max_fee_per_gas), + hash, + } + } + + fn create_test_mempool(minimum_required_pvg_gas: u128) -> MempoolImpl { + MempoolImpl::new(PoolConfig { + 
minimum_required_pvg_gas, + }) + } + + // Tests successfully adding a valid operation to the mempool + #[test] + fn test_add_operation_success() { + let mut mempool = create_test_mempool(1000); + let hash = FixedBytes::from([1u8; 32]); + let operation = create_pool_operation(2000, hash); + + let result = mempool.add_operation(&operation); + + assert!(result.is_ok()); + let ordered_op = result.unwrap(); + assert!(ordered_op.is_some()); + let ordered_op = ordered_op.unwrap(); + assert_eq!(ordered_op.operation.hash, hash); + assert_eq!( + ordered_op.operation.operation.max_fee_per_gas(), + Uint::from(2000) + ); + } + + // Tests adding an operation with a gas price below the minimum required PVG gas + #[test] + fn test_add_operation_below_minimum_gas() { + let mut mempool = create_test_mempool(2000); + let hash = FixedBytes::from([1u8; 32]); + let operation = create_pool_operation(1000, hash); + + let result = mempool.add_operation(&operation); + + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Gas price is below the minimum required PVG gas") + ); + } + + // Tests adding an operation with the same hash but higher gas price + #[test] + fn test_add_operation_duplicate_hash_higher_gas() { + let mut mempool = create_test_mempool(1000); + let hash = FixedBytes::from([1u8; 32]); + + let operation1 = create_pool_operation(2000, hash); + let result1 = mempool.add_operation(&operation1); + assert!(result1.is_ok()); + assert!(result1.unwrap().is_some()); + + let operation2 = create_pool_operation(3000, hash); + let result2 = mempool.add_operation(&operation2); + assert!(result2.is_ok()); + assert!(result2.unwrap().is_none()); + } + + // Tests adding an operation with the same hash but lower gas price + #[test] + fn test_add_operation_duplicate_hash_lower_gas() { + let mut mempool = create_test_mempool(1000); + let hash = FixedBytes::from([1u8; 32]); + + let operation1 = create_pool_operation(3000, hash); + let result1 = 
mempool.add_operation(&operation1); + assert!(result1.is_ok()); + assert!(result1.unwrap().is_some()); + + let operation2 = create_pool_operation(2000, hash); + let result2 = mempool.add_operation(&operation2); + assert!(result2.is_ok()); + let ordered_op = result2.unwrap(); + assert!(ordered_op.is_some()); + let ordered_op = ordered_op.unwrap(); + assert_eq!( + ordered_op.operation.operation.max_fee_per_gas(), + Uint::from(2000) + ); + } + + // Tests adding an operation with the same hash and equal gas price + #[test] + fn test_add_operation_duplicate_hash_equal_gas() { + let mut mempool = create_test_mempool(1000); + let hash = FixedBytes::from([1u8; 32]); + + let operation1 = create_pool_operation(2000, hash); + let result1 = mempool.add_operation(&operation1); + assert!(result1.is_ok()); + assert!(result1.unwrap().is_some()); + + let operation2 = create_pool_operation(2000, hash); + let result2 = mempool.add_operation(&operation2); + assert!(result2.is_ok()); + assert!(result2.unwrap().is_none()); + } + + // Tests adding multiple operations with different hashes + #[test] + fn test_add_multiple_operations_with_different_hashes() { + let mut mempool = create_test_mempool(1000); + + let hash1 = FixedBytes::from([1u8; 32]); + let operation1 = create_pool_operation(2000, hash1); + let result1 = mempool.add_operation(&operation1); + assert!(result1.is_ok()); + assert!(result1.unwrap().is_some()); + + let hash2 = FixedBytes::from([2u8; 32]); + let operation2 = create_pool_operation(3000, hash2); + let result2 = mempool.add_operation(&operation2); + assert!(result2.is_ok()); + assert!(result2.unwrap().is_some()); + + let hash3 = FixedBytes::from([3u8; 32]); + let operation3 = create_pool_operation(1500, hash3); + let result3 = mempool.add_operation(&operation3); + assert!(result3.is_ok()); + assert!(result3.unwrap().is_some()); + + assert_eq!(mempool.hash_to_operation.len(), 3); + assert_eq!(mempool.best.len(), 3); + } + + // Tests removing an operation that is not in 
the mempool + #[test] + fn test_remove_operation_not_in_mempool() { + let mut mempool = create_test_mempool(1000); + let hash = FixedBytes::from([1u8; 32]); + + let result = mempool.remove_operation(&hash); + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); + } + + // Tests removing an operation that exists in the mempool + #[test] + fn test_remove_operation_exists() { + let mut mempool = create_test_mempool(1000); + let hash = FixedBytes::from([1u8; 32]); + let operation = create_pool_operation(2000, hash); + + mempool.add_operation(&operation).unwrap(); + + let result = mempool.remove_operation(&hash); + assert!(result.is_ok()); + let removed = result.unwrap(); + assert!(removed.is_some()); + let removed_op = removed.unwrap(); + assert_eq!(removed_op.hash, hash); + assert_eq!(removed_op.operation.max_fee_per_gas(), Uint::from(2000)); + } + + // Tests removing an operation and checking the best operations + #[test] + fn test_remove_operation_and_check_best() { + let mut mempool = create_test_mempool(1000); + let hash = FixedBytes::from([1u8; 32]); + let operation = create_pool_operation(2000, hash); + + mempool.add_operation(&operation).unwrap(); + + let best_before: Vec<_> = mempool.get_top_operations(10).collect(); + assert_eq!(best_before.len(), 1); + assert_eq!(best_before[0].hash, hash); + + let result = mempool.remove_operation(&hash); + assert!(result.is_ok()); + assert!(result.unwrap().is_some()); + + let best_after: Vec<_> = mempool.get_top_operations(10).collect(); + assert_eq!(best_after.len(), 0); + } + + // Tests getting the top operations with ordering + #[test] + fn test_get_top_operations_ordering() { + let mut mempool = create_test_mempool(1000); + + let hash1 = FixedBytes::from([1u8; 32]); + let operation1 = create_pool_operation(2000, hash1); + mempool.add_operation(&operation1).unwrap(); + + let hash2 = FixedBytes::from([2u8; 32]); + let operation2 = create_pool_operation(3000, hash2); + 
mempool.add_operation(&operation2).unwrap(); + + let hash3 = FixedBytes::from([3u8; 32]); + let operation3 = create_pool_operation(1500, hash3); + mempool.add_operation(&operation3).unwrap(); + + let best: Vec<_> = mempool.get_top_operations(10).collect(); + assert_eq!(best.len(), 3); + assert_eq!(best[0].operation.max_fee_per_gas(), Uint::from(3000)); + assert_eq!(best[1].operation.max_fee_per_gas(), Uint::from(2000)); + assert_eq!(best[2].operation.max_fee_per_gas(), Uint::from(1500)); + } + + // Tests getting the top operations with a limit + #[test] + fn test_get_top_operations_limit() { + let mut mempool = create_test_mempool(1000); + + let hash1 = FixedBytes::from([1u8; 32]); + let operation1 = create_pool_operation(2000, hash1); + mempool.add_operation(&operation1).unwrap(); + + let hash2 = FixedBytes::from([2u8; 32]); + let operation2 = create_pool_operation(3000, hash2); + mempool.add_operation(&operation2).unwrap(); + + let hash3 = FixedBytes::from([3u8; 32]); + let operation3 = create_pool_operation(1500, hash3); + mempool.add_operation(&operation3).unwrap(); + + let best: Vec<_> = mempool.get_top_operations(2).collect(); + assert_eq!(best.len(), 2); + assert_eq!(best[0].operation.max_fee_per_gas(), Uint::from(3000)); + assert_eq!(best[1].operation.max_fee_per_gas(), Uint::from(2000)); + } + + // Tests top opperations tie breaker with submission id + #[test] + fn test_get_top_operations_submission_id_tie_breaker() { + let mut mempool = create_test_mempool(1000); + + let hash1 = FixedBytes::from([1u8; 32]); + let operation1 = create_pool_operation(2000, hash1); + mempool.add_operation(&operation1).unwrap().unwrap(); + + let hash2 = FixedBytes::from([2u8; 32]); + let operation2 = create_pool_operation(2000, hash2); + mempool.add_operation(&operation2).unwrap().unwrap(); + + let best: Vec<_> = mempool.get_top_operations(2).collect(); + assert_eq!(best.len(), 2); + assert_eq!(best[0].hash, hash1); + assert_eq!(best[1].hash, hash2); + } +} diff --git 
a/crates/account-abstraction-core/core/src/mempool/mod.rs b/crates/account-abstraction-core/core/src/mempool/mod.rs new file mode 100644 index 0000000..92cedb1 --- /dev/null +++ b/crates/account-abstraction-core/core/src/mempool/mod.rs @@ -0,0 +1 @@ +pub mod mempool_impl; diff --git a/crates/account-abstraction-core/core/src/types.rs b/crates/account-abstraction-core/core/src/types.rs index 03e3eb2..900c7a2 100644 --- a/crates/account-abstraction-core/core/src/types.rs +++ b/crates/account-abstraction-core/core/src/types.rs @@ -11,6 +11,15 @@ pub enum VersionedUserOperation { UserOperation(erc4337::UserOperation), PackedUserOperation(erc4337::PackedUserOperation), } + +impl VersionedUserOperation { + pub fn max_fee_per_gas(&self) -> U256 { + match self { + VersionedUserOperation::UserOperation(op) => op.max_fee_per_gas, + VersionedUserOperation::PackedUserOperation(op) => op.max_fee_per_gas, + } + } +} #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct UserOperationRequest { pub user_operation: VersionedUserOperation, From 8f19d0fee7dbba00bb6d2ec0679a814d5e003c91 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Mon, 15 Dec 2025 19:19:52 -0500 Subject: [PATCH 02/24] chore: create pool mod --- .../core/src/mempool/mod.rs | 2 +- .../src/mempool/{mempool_impl.rs => pool.rs} | 39 ++++++------------- .../core/src/types.rs | 17 +++++++- 3 files changed, 29 insertions(+), 29 deletions(-) rename crates/account-abstraction-core/core/src/mempool/{mempool_impl.rs => pool.rs} (93%) diff --git a/crates/account-abstraction-core/core/src/mempool/mod.rs b/crates/account-abstraction-core/core/src/mempool/mod.rs index 92cedb1..c9e2bb1 100644 --- a/crates/account-abstraction-core/core/src/mempool/mod.rs +++ b/crates/account-abstraction-core/core/src/mempool/mod.rs @@ -1 +1 @@ -pub mod mempool_impl; +pub mod pool; diff --git a/crates/account-abstraction-core/core/src/mempool/mempool_impl.rs b/crates/account-abstraction-core/core/src/mempool/pool.rs similarity 
index 93% rename from crates/account-abstraction-core/core/src/mempool/mempool_impl.rs rename to crates/account-abstraction-core/core/src/mempool/pool.rs index c338b9d..a30e0a7 100644 --- a/crates/account-abstraction-core/core/src/mempool/mempool_impl.rs +++ b/crates/account-abstraction-core/core/src/mempool/pool.rs @@ -1,5 +1,5 @@ -use crate::types::VersionedUserOperation; -use alloy_primitives::FixedBytes; +use crate::types::{PoolOperation, UserOpHash, VersionedUserOperation}; +use alloy_primitives::{FixedBytes}; use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; use std::sync::atomic::AtomicU64; @@ -9,23 +9,9 @@ pub struct PoolConfig { minimum_required_pvg_gas: u128, } -pub type UserOpHash = FixedBytes<32>; - -#[derive(Eq, PartialEq, Clone, Debug)] -pub struct PoolOperation { - pub operation: VersionedUserOperation, - pub hash: UserOpHash, -} - -impl PoolOperation { - pub fn should_replace(&self, other: &PoolOperation) -> bool { - self.operation.max_fee_per_gas() < other.operation.max_fee_per_gas() - } -} - #[derive(Eq, PartialEq, Clone, Debug)] pub struct OrderedPoolOperation { - pub operation: PoolOperation, + pub pool_operation: PoolOperation, pub submission_id: u64, pub priority_order: u64, } @@ -33,9 +19,9 @@ pub struct OrderedPoolOperation { impl Ord for OrderedPoolOperation { fn cmp(&self, other: &Self) -> std::cmp::Ordering { other - .operation.operation + .pool_operation.operation .max_fee_per_gas() - .cmp(&self.operation.operation.max_fee_per_gas()) + .cmp(&self.pool_operation.operation.max_fee_per_gas()) .then_with(|| self.submission_id.cmp(&other.submission_id)) } } @@ -49,7 +35,7 @@ impl PartialOrd for OrderedPoolOperation { impl OrderedPoolOperation { pub fn create_from_pool_operation(operation: &PoolOperation, submission_id: u64) -> Self { Self { - operation: operation.clone(), + pool_operation: operation.clone(), priority_order: submission_id, submission_id, } @@ -62,7 +48,6 @@ pub trait Mempool { operation: &PoolOperation, ) -> 
Result, anyhow::Error>; fn get_top_operations(&self, n: usize) -> impl Iterator>; - fn remove_operation( &mut self, operation_hash: &UserOpHash, @@ -94,7 +79,7 @@ impl Mempool for MempoolImpl { self.best .iter() .take(n) - .map(|o| Arc::new(o.operation.clone())) + .map(|o| Arc::new(o.pool_operation.clone())) } fn remove_operation( @@ -103,7 +88,7 @@ impl Mempool for MempoolImpl { ) -> Result, anyhow::Error> { if let Some(ordered_operation) = self.hash_to_operation.remove(operation_hash) { self.best.remove(&ordered_operation); - Ok(Some(ordered_operation.operation)) + Ok(Some(ordered_operation.pool_operation)) } else { Ok(None) } @@ -116,7 +101,7 @@ impl MempoolImpl { operation: &PoolOperation, ) -> Result, anyhow::Error> { if let Some(old_ordered_operation) = self.hash_to_operation.get(&operation.hash) { - if operation.should_replace(&old_ordered_operation.operation) { + if operation.should_replace(&old_ordered_operation.pool_operation) { self.best.remove(old_ordered_operation); self.hash_to_operation.remove(&operation.hash); } else { @@ -195,9 +180,9 @@ mod tests { let ordered_op = result.unwrap(); assert!(ordered_op.is_some()); let ordered_op = ordered_op.unwrap(); - assert_eq!(ordered_op.operation.hash, hash); + assert_eq!(ordered_op.pool_operation.hash, hash); assert_eq!( - ordered_op.operation.operation.max_fee_per_gas(), + ordered_op.pool_operation.operation.max_fee_per_gas(), Uint::from(2000) ); } @@ -255,7 +240,7 @@ mod tests { assert!(ordered_op.is_some()); let ordered_op = ordered_op.unwrap(); assert_eq!( - ordered_op.operation.operation.max_fee_per_gas(), + ordered_op.pool_operation.operation.max_fee_per_gas(), Uint::from(2000) ); } diff --git a/crates/account-abstraction-core/core/src/types.rs b/crates/account-abstraction-core/core/src/types.rs index 900c7a2..4cabf23 100644 --- a/crates/account-abstraction-core/core/src/types.rs +++ b/crates/account-abstraction-core/core/src/types.rs @@ -1,5 +1,5 @@ use crate::entrypoints::{v06, v07, 
version::EntryPointVersion}; -use alloy_primitives::{Address, B256, ChainId, U256}; +use alloy_primitives::{Address, B256, ChainId, U256, FixedBytes}; use alloy_rpc_types::erc4337; pub use alloy_rpc_types::erc4337::SendUserOperationResponse; use anyhow::Result; @@ -116,6 +116,21 @@ pub struct AggregatorInfo { pub stake_info: EntityStakeInfo, } + +pub type UserOpHash = FixedBytes<32>; + +#[derive(Eq, PartialEq, Clone, Debug)] +pub struct PoolOperation { + pub operation: VersionedUserOperation, + pub hash: UserOpHash, +} + +impl PoolOperation { + pub fn should_replace(&self, other: &PoolOperation) -> bool { + self.operation.max_fee_per_gas() < other.operation.max_fee_per_gas() + } +} + // Tests #[cfg(test)] mod tests { From d45aced3eb93f42baa8124c5e2408bc8da9c80a4 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Mon, 15 Dec 2025 19:37:56 -0500 Subject: [PATCH 03/24] feaet: expose mempool --- crates/account-abstraction-core/core/src/lib.rs | 2 +- .../core/src/{mempool/pool.rs => mempool.rs} | 0 crates/account-abstraction-core/core/src/mempool/mod.rs | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) rename crates/account-abstraction-core/core/src/{mempool/pool.rs => mempool.rs} (100%) delete mode 100644 crates/account-abstraction-core/core/src/mempool/mod.rs diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs index b3aa2f7..73cf5f3 100644 --- a/crates/account-abstraction-core/core/src/lib.rs +++ b/crates/account-abstraction-core/core/src/lib.rs @@ -3,4 +3,4 @@ pub mod entrypoints; pub mod types; pub use account_abstraction_service::{AccountAbstractionService, AccountAbstractionServiceImpl}; pub use types::{SendUserOperationResponse, VersionedUserOperation}; -pub mod mempool; +pub mod mempool; diff --git a/crates/account-abstraction-core/core/src/mempool/pool.rs b/crates/account-abstraction-core/core/src/mempool.rs similarity index 100% rename from crates/account-abstraction-core/core/src/mempool/pool.rs 
rename to crates/account-abstraction-core/core/src/mempool.rs diff --git a/crates/account-abstraction-core/core/src/mempool/mod.rs b/crates/account-abstraction-core/core/src/mempool/mod.rs deleted file mode 100644 index c9e2bb1..0000000 --- a/crates/account-abstraction-core/core/src/mempool/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod pool; From dee8e1de00363230960d4af809ca4a673ce9c8cf Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 09:38:47 -0500 Subject: [PATCH 04/24] chore: update mempool --- .../core/src/mempool.rs | 30 ++++++++++--------- .../core/src/types.rs | 7 +++++ 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index a30e0a7..2888e4e 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -1,5 +1,4 @@ -use crate::types::{PoolOperation, UserOpHash, VersionedUserOperation}; -use alloy_primitives::{FixedBytes}; +use crate::types::{PoolOperation, UserOpHash}; use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; use std::sync::atomic::AtomicU64; @@ -17,11 +16,14 @@ pub struct OrderedPoolOperation { } impl Ord for OrderedPoolOperation { + /// TODO: There can be invalid opperations, where base fee, + expected gas price + /// is greater that the maximum gas, in that case we don't include it in the mempool as such mempool changes. 
fn cmp(&self, other: &Self) -> std::cmp::Ordering { other - .pool_operation.operation - .max_fee_per_gas() - .cmp(&self.pool_operation.operation.max_fee_per_gas()) + .pool_operation + .operation + .max_priority_fee_per_gas() + .cmp(&self.pool_operation.operation.max_priority_fee_per_gas()) .then_with(|| self.submission_id.cmp(&other.submission_id)) } } @@ -51,7 +53,7 @@ pub trait Mempool { fn remove_operation( &mut self, operation_hash: &UserOpHash, - ) -> Result, anyhow::Error>; + ) -> Result, anyhow::Error> } pub struct MempoolImpl { @@ -135,10 +137,10 @@ impl MempoolImpl { #[cfg(test)] mod tests { use super::*; - use alloy_primitives::{Address, Uint}; - use alloy_rpc_types::erc4337; - - fn create_test_user_operation(max_fee_per_gas: u128) -> VersionedUserOperation { + use alloy_primitives::{Address, FixedBytes, Uint}; + use alloy_rpc_types::{erc4337}; + use crate::types::VersionedUserOperation; + fn create_test_user_operation(max_priority_fee_per_gas: u128) -> VersionedUserOperation { VersionedUserOperation::UserOperation(erc4337::UserOperation { sender: Address::ZERO, nonce: Uint::from(0), @@ -147,16 +149,16 @@ mod tests { call_gas_limit: Uint::from(100000), verification_gas_limit: Uint::from(100000), pre_verification_gas: Uint::from(21000), - max_fee_per_gas: Uint::from(max_fee_per_gas), - max_priority_fee_per_gas: Uint::from(max_fee_per_gas), + max_fee_per_gas: Uint::from(max_priority_fee_per_gas), + max_priority_fee_per_gas: Uint::from(max_priority_fee_per_gas), paymaster_and_data: Default::default(), signature: Default::default(), }) } - fn create_pool_operation(max_fee_per_gas: u128, hash: UserOpHash) -> PoolOperation { + fn create_pool_operation(max_priority_fee_per_gas: u128, hash: UserOpHash) -> PoolOperation { PoolOperation { - operation: create_test_user_operation(max_fee_per_gas), + operation: create_test_user_operation(max_priority_fee_per_gas), hash, } } diff --git a/crates/account-abstraction-core/core/src/types.rs 
b/crates/account-abstraction-core/core/src/types.rs index 4cabf23..465bea0 100644 --- a/crates/account-abstraction-core/core/src/types.rs +++ b/crates/account-abstraction-core/core/src/types.rs @@ -19,6 +19,13 @@ impl VersionedUserOperation { VersionedUserOperation::PackedUserOperation(op) => op.max_fee_per_gas, } } + + pub fn max_priority_fee_per_gas(&self) -> U256 { + match self { + VersionedUserOperation::UserOperation(op) => op.max_priority_fee_per_gas, + VersionedUserOperation::PackedUserOperation(op) => op.max_priority_fee_per_gas, + } + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct UserOperationRequest { From c973a842b02258adf428f86611f231d628b14877 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 09:52:18 -0500 Subject: [PATCH 05/24] chore: update --- .../account-abstraction-core/core/src/lib.rs | 2 +- .../core/src/mempool.rs | 24 +++++++------------ .../core/src/types.rs | 5 ++-- 3 files changed, 12 insertions(+), 19 deletions(-) diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs index 73cf5f3..b3aa2f7 100644 --- a/crates/account-abstraction-core/core/src/lib.rs +++ b/crates/account-abstraction-core/core/src/lib.rs @@ -3,4 +3,4 @@ pub mod entrypoints; pub mod types; pub use account_abstraction_service::{AccountAbstractionService, AccountAbstractionServiceImpl}; pub use types::{SendUserOperationResponse, VersionedUserOperation}; -pub mod mempool; +pub mod mempool; diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index 2888e4e..3cc274f 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -5,7 +5,7 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; pub struct PoolConfig { - minimum_required_pvg_gas: u128, + minimum_max_fee_per_gas: u128, } #[derive(Eq, PartialEq, Clone, Debug)] @@ -16,7 +16,7 
@@ pub struct OrderedPoolOperation { } impl Ord for OrderedPoolOperation { - /// TODO: There can be invalid opperations, where base fee, + expected gas price + /// TODO: There can be invalid opperations, where base fee, + expected gas price /// is greater that the maximum gas, in that case we don't include it in the mempool as such mempool changes. fn cmp(&self, other: &Self) -> std::cmp::Ordering { other @@ -53,7 +53,7 @@ pub trait Mempool { fn remove_operation( &mut self, operation_hash: &UserOpHash, - ) -> Result, anyhow::Error> + ) -> Result, anyhow::Error>; } pub struct MempoolImpl { @@ -68,7 +68,7 @@ impl Mempool for MempoolImpl { &mut self, operation: &PoolOperation, ) -> Result, anyhow::Error> { - if operation.operation.max_fee_per_gas() < self.config.minimum_required_pvg_gas { + if operation.operation.max_fee_per_gas() < self.config.minimum_max_fee_per_gas { return Err(anyhow::anyhow!( "Gas price is below the minimum required PVG gas" )); @@ -137,9 +137,9 @@ impl MempoolImpl { #[cfg(test)] mod tests { use super::*; - use alloy_primitives::{Address, FixedBytes, Uint}; - use alloy_rpc_types::{erc4337}; use crate::types::VersionedUserOperation; + use alloy_primitives::{Address, FixedBytes, Uint}; + use alloy_rpc_types::erc4337; fn create_test_user_operation(max_priority_fee_per_gas: u128) -> VersionedUserOperation { VersionedUserOperation::UserOperation(erc4337::UserOperation { sender: Address::ZERO, @@ -165,7 +165,7 @@ mod tests { fn create_test_mempool(minimum_required_pvg_gas: u128) -> MempoolImpl { MempoolImpl::new(PoolConfig { - minimum_required_pvg_gas, + minimum_max_fee_per_gas: minimum_required_pvg_gas, }) } @@ -221,7 +221,7 @@ mod tests { let operation2 = create_pool_operation(3000, hash); let result2 = mempool.add_operation(&operation2); assert!(result2.is_ok()); - assert!(result2.unwrap().is_none()); + assert!(result2.unwrap().is_some()); } // Tests adding an operation with the same hash but lower gas price @@ -238,13 +238,7 @@ mod tests { let 
operation2 = create_pool_operation(2000, hash); let result2 = mempool.add_operation(&operation2); assert!(result2.is_ok()); - let ordered_op = result2.unwrap(); - assert!(ordered_op.is_some()); - let ordered_op = ordered_op.unwrap(); - assert_eq!( - ordered_op.pool_operation.operation.max_fee_per_gas(), - Uint::from(2000) - ); + assert!(result2.unwrap().is_none()); } // Tests adding an operation with the same hash and equal gas price diff --git a/crates/account-abstraction-core/core/src/types.rs b/crates/account-abstraction-core/core/src/types.rs index 465bea0..8b8f031 100644 --- a/crates/account-abstraction-core/core/src/types.rs +++ b/crates/account-abstraction-core/core/src/types.rs @@ -1,5 +1,5 @@ use crate::entrypoints::{v06, v07, version::EntryPointVersion}; -use alloy_primitives::{Address, B256, ChainId, U256, FixedBytes}; +use alloy_primitives::{Address, B256, ChainId, FixedBytes, U256}; use alloy_rpc_types::erc4337; pub use alloy_rpc_types::erc4337::SendUserOperationResponse; use anyhow::Result; @@ -123,7 +123,6 @@ pub struct AggregatorInfo { pub stake_info: EntityStakeInfo, } - pub type UserOpHash = FixedBytes<32>; #[derive(Eq, PartialEq, Clone, Debug)] @@ -134,7 +133,7 @@ pub struct PoolOperation { impl PoolOperation { pub fn should_replace(&self, other: &PoolOperation) -> bool { - self.operation.max_fee_per_gas() < other.operation.max_fee_per_gas() + self.operation.max_fee_per_gas() > other.operation.max_fee_per_gas() } } From 040fa8fd149f9f6652430bd0036ce82cdefc8e5f Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 12:19:35 -0500 Subject: [PATCH 06/24] chore: implement mempool --- .../core/src/mempool.rs | 250 ++++++++++++++---- .../core/src/types.rs | 21 +- 2 files changed, 211 insertions(+), 60 deletions(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index 3cc274f..33ecd12 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ 
b/crates/account-abstraction-core/core/src/mempool.rs @@ -1,8 +1,9 @@ -use crate::types::{PoolOperation, UserOpHash}; +use crate::types::{UserOpHash, WrappedUserOperation}; +use alloy_primitives::Address; +use std::cmp::Ordering; use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; pub struct PoolConfig { minimum_max_fee_per_gas: u128, @@ -10,63 +11,113 @@ pub struct PoolConfig { #[derive(Eq, PartialEq, Clone, Debug)] pub struct OrderedPoolOperation { - pub pool_operation: PoolOperation, + pub pool_operation: WrappedUserOperation, pub submission_id: u64, - pub priority_order: u64, } -impl Ord for OrderedPoolOperation { +impl OrderedPoolOperation { + pub fn create_from_pool_operation( + operation: &WrappedUserOperation, + submission_id: u64, + ) -> Self { + Self { + pool_operation: operation.clone(), + submission_id, + } + } + + pub fn sender(&self) -> Address { + self.pool_operation.operation.sender() + } +} + +/// Ordering by max priority fee (desc) then submission id, then hash to ensure total order +#[derive(Clone, Debug)] +pub struct ByMaxFeeAndSubmissionId(pub OrderedPoolOperation); + +impl PartialEq for ByMaxFeeAndSubmissionId { + fn eq(&self, other: &Self) -> bool { + self.0.pool_operation.hash == other.0.pool_operation.hash + } +} +impl Eq for ByMaxFeeAndSubmissionId {} + +impl PartialOrd for ByMaxFeeAndSubmissionId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ByMaxFeeAndSubmissionId { /// TODO: There can be invalid opperations, where base fee, + expected gas price /// is greater that the maximum gas, in that case we don't include it in the mempool as such mempool changes. 
- fn cmp(&self, other: &Self) -> std::cmp::Ordering { + fn cmp(&self, other: &Self) -> Ordering { other + .0 .pool_operation .operation .max_priority_fee_per_gas() - .cmp(&self.pool_operation.operation.max_priority_fee_per_gas()) - .then_with(|| self.submission_id.cmp(&other.submission_id)) + .cmp(&self.0.pool_operation.operation.max_priority_fee_per_gas()) + .then_with(|| self.0.submission_id.cmp(&other.0.submission_id)) + .then_with(|| self.0.pool_operation.hash.cmp(&other.0.pool_operation.hash)) + } +} + +/// Ordering by nonce (asc), then submission id, then hash to ensure total order +#[derive(Clone, Debug)] +pub struct ByNonce(pub OrderedPoolOperation); + +impl PartialEq for ByNonce { + fn eq(&self, other: &Self) -> bool { + self.0.pool_operation.hash == other.0.pool_operation.hash } } +impl Eq for ByNonce {} -impl PartialOrd for OrderedPoolOperation { - fn partial_cmp(&self, other: &Self) -> Option { +impl PartialOrd for ByNonce { + fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl OrderedPoolOperation { - pub fn create_from_pool_operation(operation: &PoolOperation, submission_id: u64) -> Self { - Self { - pool_operation: operation.clone(), - priority_order: submission_id, - submission_id, - } +impl Ord for ByNonce { + /// TODO: There can be invalid opperations, where base fee, + expected gas price + /// is greater that the maximum gas, in that case we don't include it in the mempool as such mempool changes. 
+ fn cmp(&self, other: &Self) -> Ordering { + self.0 + .pool_operation + .operation + .nonce() + .cmp(&other.0.pool_operation.operation.nonce()) + .then_with(|| self.0.submission_id.cmp(&other.0.submission_id)) + .then_with(|| self.0.pool_operation.hash.cmp(&other.0.pool_operation.hash)) } } pub trait Mempool { fn add_operation( &mut self, - operation: &PoolOperation, + operation: &WrappedUserOperation, ) -> Result, anyhow::Error>; - fn get_top_operations(&self, n: usize) -> impl Iterator>; + fn get_top_operations(&self, n: usize) -> impl Iterator>; fn remove_operation( &mut self, operation_hash: &UserOpHash, - ) -> Result, anyhow::Error>; + ) -> Result, anyhow::Error>; } pub struct MempoolImpl { config: PoolConfig, - best: BTreeSet, + best: BTreeSet, hash_to_operation: HashMap, + account_to_operation: HashMap>, submission_id_counter: AtomicU64, } impl Mempool for MempoolImpl { fn add_operation( &mut self, - operation: &PoolOperation, + operation: &WrappedUserOperation, ) -> Result, anyhow::Error> { if operation.operation.max_fee_per_gas() < self.config.minimum_max_fee_per_gas { return Err(anyhow::anyhow!( @@ -77,19 +128,50 @@ impl Mempool for MempoolImpl { Ok(ordered_operation_result) } - fn get_top_operations(&self, n: usize) -> impl Iterator> { + fn get_top_operations(&self, n: usize) -> impl Iterator> { + // Filter so we only surface an account's lowest-nonce op (no gaps) self.best .iter() + .filter_map(|op_by_fee| { + let lowest = self + .account_to_operation + .get(&op_by_fee.0.sender()) + .and_then(|set| set.first()); + + match lowest { + Some(lowest) + if lowest.0.pool_operation.hash == op_by_fee.0.pool_operation.hash => + { + Some(Arc::new(op_by_fee.0.pool_operation.clone())) + } + Some(_) => None, + None => { + println!( + "No operations found for account: {} but one was found in the best set", + op_by_fee.0.sender() + ); + None + } + } + }) .take(n) - .map(|o| Arc::new(o.pool_operation.clone())) } fn remove_operation( &mut self, operation_hash: 
&UserOpHash, - ) -> Result, anyhow::Error> { + ) -> Result, anyhow::Error> { if let Some(ordered_operation) = self.hash_to_operation.remove(operation_hash) { - self.best.remove(&ordered_operation); + self.best + .remove(&ByMaxFeeAndSubmissionId(ordered_operation.clone())); + + let sender = ordered_operation.sender(); + if let Some(by_nonce_set) = self.account_to_operation.get_mut(&sender) { + by_nonce_set.remove(&ByNonce(ordered_operation.clone())); + if by_nonce_set.is_empty() { + self.account_to_operation.remove(&sender); + } + } Ok(Some(ordered_operation.pool_operation)) } else { Ok(None) @@ -97,14 +179,25 @@ impl Mempool for MempoolImpl { } } +// When user opperation is added to the mempool we need to check + impl MempoolImpl { fn handle_add_operation( &mut self, - operation: &PoolOperation, + operation: &WrappedUserOperation, ) -> Result, anyhow::Error> { + // Account if let Some(old_ordered_operation) = self.hash_to_operation.get(&operation.hash) { - if operation.should_replace(&old_ordered_operation.pool_operation) { - self.best.remove(old_ordered_operation); + if operation.has_higher_max_fee(&old_ordered_operation.pool_operation) { + self.best + .remove(&ByMaxFeeAndSubmissionId(old_ordered_operation.clone())); + let sender = old_ordered_operation.sender(); + if let Some(by_nonce_set) = self.account_to_operation.get_mut(&sender) { + by_nonce_set.remove(&ByNonce(old_ordered_operation.clone())); + if by_nonce_set.is_empty() { + self.account_to_operation.remove(&sender); + } + } self.hash_to_operation.remove(&operation.hash); } else { return Ok(None); @@ -114,14 +207,20 @@ impl MempoolImpl { let order = self.get_next_order_id(); let ordered_operation = OrderedPoolOperation::create_from_pool_operation(operation, order); - self.best.insert(ordered_operation.clone()); + self.best + .insert(ByMaxFeeAndSubmissionId(ordered_operation.clone())); + self.account_to_operation + .entry(ordered_operation.sender()) + .or_default() + 
.insert(ByNonce(ordered_operation.clone())); self.hash_to_operation .insert(operation.hash, ordered_operation.clone()); Ok(Some(ordered_operation)) } fn get_next_order_id(&self) -> u64 { - self.submission_id_counter.fetch_add(1, Ordering::SeqCst) + self.submission_id_counter + .fetch_add(1, AtomicOrdering::SeqCst) } pub fn new(config: PoolConfig) -> Self { @@ -129,6 +228,7 @@ impl MempoolImpl { config, best: BTreeSet::new(), hash_to_operation: HashMap::new(), + account_to_operation: HashMap::new(), submission_id_counter: AtomicU64::new(0), } } @@ -142,7 +242,7 @@ mod tests { use alloy_rpc_types::erc4337; fn create_test_user_operation(max_priority_fee_per_gas: u128) -> VersionedUserOperation { VersionedUserOperation::UserOperation(erc4337::UserOperation { - sender: Address::ZERO, + sender: Address::random(), nonce: Uint::from(0), init_code: Default::default(), call_data: Default::default(), @@ -156,8 +256,8 @@ mod tests { }) } - fn create_pool_operation(max_priority_fee_per_gas: u128, hash: UserOpHash) -> PoolOperation { - PoolOperation { + fn create_wrapped_operation(max_priority_fee_per_gas: u128, hash: UserOpHash) -> WrappedUserOperation { + WrappedUserOperation { operation: create_test_user_operation(max_priority_fee_per_gas), hash, } @@ -174,7 +274,7 @@ mod tests { fn test_add_operation_success() { let mut mempool = create_test_mempool(1000); let hash = FixedBytes::from([1u8; 32]); - let operation = create_pool_operation(2000, hash); + let operation = create_wrapped_operation(2000, hash); let result = mempool.add_operation(&operation); @@ -194,7 +294,7 @@ mod tests { fn test_add_operation_below_minimum_gas() { let mut mempool = create_test_mempool(2000); let hash = FixedBytes::from([1u8; 32]); - let operation = create_pool_operation(1000, hash); + let operation = create_wrapped_operation(1000, hash); let result = mempool.add_operation(&operation); @@ -213,12 +313,12 @@ mod tests { let mut mempool = create_test_mempool(1000); let hash = FixedBytes::from([1u8; 
32]); - let operation1 = create_pool_operation(2000, hash); + let operation1 = create_wrapped_operation(2000, hash); let result1 = mempool.add_operation(&operation1); assert!(result1.is_ok()); assert!(result1.unwrap().is_some()); - let operation2 = create_pool_operation(3000, hash); + let operation2 = create_wrapped_operation(3000, hash); let result2 = mempool.add_operation(&operation2); assert!(result2.is_ok()); assert!(result2.unwrap().is_some()); @@ -230,12 +330,12 @@ mod tests { let mut mempool = create_test_mempool(1000); let hash = FixedBytes::from([1u8; 32]); - let operation1 = create_pool_operation(3000, hash); + let operation1 = create_wrapped_operation(3000, hash); let result1 = mempool.add_operation(&operation1); assert!(result1.is_ok()); assert!(result1.unwrap().is_some()); - let operation2 = create_pool_operation(2000, hash); + let operation2 = create_wrapped_operation(2000, hash); let result2 = mempool.add_operation(&operation2); assert!(result2.is_ok()); assert!(result2.unwrap().is_none()); @@ -247,12 +347,12 @@ mod tests { let mut mempool = create_test_mempool(1000); let hash = FixedBytes::from([1u8; 32]); - let operation1 = create_pool_operation(2000, hash); + let operation1 = create_wrapped_operation(2000, hash); let result1 = mempool.add_operation(&operation1); assert!(result1.is_ok()); assert!(result1.unwrap().is_some()); - let operation2 = create_pool_operation(2000, hash); + let operation2 = create_wrapped_operation(2000, hash); let result2 = mempool.add_operation(&operation2); assert!(result2.is_ok()); assert!(result2.unwrap().is_none()); @@ -264,19 +364,19 @@ mod tests { let mut mempool = create_test_mempool(1000); let hash1 = FixedBytes::from([1u8; 32]); - let operation1 = create_pool_operation(2000, hash1); + let operation1 = create_wrapped_operation(2000, hash1); let result1 = mempool.add_operation(&operation1); assert!(result1.is_ok()); assert!(result1.unwrap().is_some()); let hash2 = FixedBytes::from([2u8; 32]); - let operation2 = 
create_pool_operation(3000, hash2); + let operation2 = create_wrapped_operation(3000, hash2); let result2 = mempool.add_operation(&operation2); assert!(result2.is_ok()); assert!(result2.unwrap().is_some()); let hash3 = FixedBytes::from([3u8; 32]); - let operation3 = create_pool_operation(1500, hash3); + let operation3 = create_wrapped_operation(1500, hash3); let result3 = mempool.add_operation(&operation3); assert!(result3.is_ok()); assert!(result3.unwrap().is_some()); @@ -301,7 +401,7 @@ mod tests { fn test_remove_operation_exists() { let mut mempool = create_test_mempool(1000); let hash = FixedBytes::from([1u8; 32]); - let operation = create_pool_operation(2000, hash); + let operation = create_wrapped_operation(2000, hash); mempool.add_operation(&operation).unwrap(); @@ -319,7 +419,7 @@ mod tests { fn test_remove_operation_and_check_best() { let mut mempool = create_test_mempool(1000); let hash = FixedBytes::from([1u8; 32]); - let operation = create_pool_operation(2000, hash); + let operation = create_wrapped_operation(2000, hash); mempool.add_operation(&operation).unwrap(); @@ -341,15 +441,15 @@ mod tests { let mut mempool = create_test_mempool(1000); let hash1 = FixedBytes::from([1u8; 32]); - let operation1 = create_pool_operation(2000, hash1); + let operation1 = create_wrapped_operation(2000, hash1); mempool.add_operation(&operation1).unwrap(); let hash2 = FixedBytes::from([2u8; 32]); - let operation2 = create_pool_operation(3000, hash2); + let operation2 = create_wrapped_operation(3000, hash2); mempool.add_operation(&operation2).unwrap(); let hash3 = FixedBytes::from([3u8; 32]); - let operation3 = create_pool_operation(1500, hash3); + let operation3 = create_wrapped_operation(1500, hash3); mempool.add_operation(&operation3).unwrap(); let best: Vec<_> = mempool.get_top_operations(10).collect(); @@ -365,15 +465,15 @@ mod tests { let mut mempool = create_test_mempool(1000); let hash1 = FixedBytes::from([1u8; 32]); - let operation1 = create_pool_operation(2000, 
hash1); + let operation1 = create_wrapped_operation(2000, hash1); mempool.add_operation(&operation1).unwrap(); let hash2 = FixedBytes::from([2u8; 32]); - let operation2 = create_pool_operation(3000, hash2); + let operation2 = create_wrapped_operation(3000, hash2); mempool.add_operation(&operation2).unwrap(); let hash3 = FixedBytes::from([3u8; 32]); - let operation3 = create_pool_operation(1500, hash3); + let operation3 = create_wrapped_operation(1500, hash3); mempool.add_operation(&operation3).unwrap(); let best: Vec<_> = mempool.get_top_operations(2).collect(); @@ -388,11 +488,11 @@ mod tests { let mut mempool = create_test_mempool(1000); let hash1 = FixedBytes::from([1u8; 32]); - let operation1 = create_pool_operation(2000, hash1); + let operation1 = create_wrapped_operation(2000, hash1); mempool.add_operation(&operation1).unwrap().unwrap(); let hash2 = FixedBytes::from([2u8; 32]); - let operation2 = create_pool_operation(2000, hash2); + let operation2 = create_wrapped_operation(2000, hash2); mempool.add_operation(&operation2).unwrap().unwrap(); let best: Vec<_> = mempool.get_top_operations(2).collect(); @@ -400,4 +500,42 @@ mod tests { assert_eq!(best[0].hash, hash1); assert_eq!(best[1].hash, hash2); } + + #[test] + fn test_get_top_operations_should_return_the_lowest_nonce_operation_for_each_account() { + let mut mempool = create_test_mempool(1000); + let hash1 = FixedBytes::from([1u8; 32]); + let test_user_operation = create_test_user_operation(2000); + + // Destructure to the inner struct, then update nonce + let base_op = match test_user_operation.clone() { + VersionedUserOperation::UserOperation(op) => op, + _ => panic!("expected UserOperation variant"), + }; + + let operation1 = WrappedUserOperation { + operation: VersionedUserOperation::UserOperation(erc4337::UserOperation { + nonce: Uint::from(0), + max_fee_per_gas: Uint::from(2000), + ..base_op.clone() + }), + hash: hash1, + }; + + mempool.add_operation(&operation1).unwrap().unwrap(); + let hash2 = 
FixedBytes::from([2u8; 32]); + let operation2 = WrappedUserOperation { + operation: VersionedUserOperation::UserOperation(erc4337::UserOperation { + nonce: Uint::from(1), + max_fee_per_gas: Uint::from(10_000), + ..base_op.clone() + }), + hash: hash2, + }; + mempool.add_operation(&operation2).unwrap().unwrap(); + + let best: Vec<_> = mempool.get_top_operations(2).collect(); + assert_eq!(best.len(), 1); + assert_eq!(best[0].operation.nonce(), Uint::from(0)); + } } diff --git a/crates/account-abstraction-core/core/src/types.rs b/crates/account-abstraction-core/core/src/types.rs index 8b8f031..4600839 100644 --- a/crates/account-abstraction-core/core/src/types.rs +++ b/crates/account-abstraction-core/core/src/types.rs @@ -26,6 +26,19 @@ impl VersionedUserOperation { VersionedUserOperation::PackedUserOperation(op) => op.max_priority_fee_per_gas, } } + pub fn nonce(&self) -> U256 { + match self { + VersionedUserOperation::UserOperation(op) => op.nonce, + VersionedUserOperation::PackedUserOperation(op) => op.nonce, + } + } + + pub fn sender(&self) -> Address { + match self { + VersionedUserOperation::UserOperation(op) => op.sender, + VersionedUserOperation::PackedUserOperation(op) => op.sender, + } + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct UserOperationRequest { @@ -126,14 +139,14 @@ pub struct AggregatorInfo { pub type UserOpHash = FixedBytes<32>; #[derive(Eq, PartialEq, Clone, Debug)] -pub struct PoolOperation { +pub struct WrappedUserOperation { pub operation: VersionedUserOperation, pub hash: UserOpHash, } -impl PoolOperation { - pub fn should_replace(&self, other: &PoolOperation) -> bool { - self.operation.max_fee_per_gas() > other.operation.max_fee_per_gas() +impl WrappedUserOperation { + pub fn has_higher_max_fee(&self, other: &WrappedUserOperation) -> bool { + self.operation.max_fee_per_gas() > other.operation.max_fee_per_gas() } } From 80e1d413aa52ccdbe654d82e9b7db8f284119477 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: 
Tue, 16 Dec 2025 12:25:51 -0500 Subject: [PATCH 07/24] chore: rename mempool internals
Some(by_nonce_set) = self.operations_by_account.get_mut(&sender) { by_nonce_set.remove(&ByNonce(old_ordered_operation.clone())); if by_nonce_set.is_empty() { - self.account_to_operation.remove(&sender); + self.operations_by_account.remove(&sender); } } self.hash_to_operation.remove(&operation.hash); @@ -205,11 +200,11 @@ impl MempoolImpl { } let order = self.get_next_order_id(); - let ordered_operation = OrderedPoolOperation::create_from_pool_operation(operation, order); + let ordered_operation = OrderedPoolOperation::from_wrapped(operation, order); self.best .insert(ByMaxFeeAndSubmissionId(ordered_operation.clone())); - self.account_to_operation + self.operations_by_account .entry(ordered_operation.sender()) .or_default() .insert(ByNonce(ordered_operation.clone())); @@ -228,7 +223,7 @@ impl MempoolImpl { config, best: BTreeSet::new(), hash_to_operation: HashMap::new(), - account_to_operation: HashMap::new(), + operations_by_account: HashMap::new(), submission_id_counter: AtomicU64::new(0), } } From 52b75bf946ec71b994b03b7a6e6d33001d7e155d Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 13:47:43 -0500 Subject: [PATCH 08/24] chore: add todo comment --- crates/account-abstraction-core/core/src/mempool.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index f430b97..b968cf3 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -129,7 +129,9 @@ impl Mempool for MempoolImpl { } fn get_top_operations(&self, n: usize) -> impl Iterator> { - // Filter so we only surface an account's lowest-nonce op (no gaps) + // TODO: There is a case where we skip operations that are not the lowest nonce for an account. + // But we still have not given the N number of operations, meaning we don't return those operations. 
+ self.best .iter() .filter_map(|op_by_fee| { From 361e90758f381d5d401aa4b83ac4ed6d957389fe Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 13:49:57 -0500 Subject: [PATCH 09/24] chore: run format --- .../core/src/mempool.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index b968cf3..c6010fa 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -16,10 +16,7 @@ pub struct OrderedPoolOperation { } impl OrderedPoolOperation { - pub fn from_wrapped( - operation: &WrappedUserOperation, - submission_id: u64, - ) -> Self { + pub fn from_wrapped(operation: &WrappedUserOperation, submission_id: u64) -> Self { Self { pool_operation: operation.clone(), submission_id, @@ -129,7 +126,7 @@ impl Mempool for MempoolImpl { } fn get_top_operations(&self, n: usize) -> impl Iterator> { - // TODO: There is a case where we skip operations that are not the lowest nonce for an account. + // TODO: There is a case where we skip operations that are not the lowest nonce for an account. // But we still have not given the N number of operations, meaning we don't return those operations. 
self.best @@ -253,7 +250,10 @@ mod tests { }) } - fn create_wrapped_operation(max_priority_fee_per_gas: u128, hash: UserOpHash) -> WrappedUserOperation { + fn create_wrapped_operation( + max_priority_fee_per_gas: u128, + hash: UserOpHash, + ) -> WrappedUserOperation { WrappedUserOperation { operation: create_test_user_operation(max_priority_fee_per_gas), hash, @@ -503,13 +503,13 @@ mod tests { let mut mempool = create_test_mempool(1000); let hash1 = FixedBytes::from([1u8; 32]); let test_user_operation = create_test_user_operation(2000); - + // Destructure to the inner struct, then update nonce let base_op = match test_user_operation.clone() { VersionedUserOperation::UserOperation(op) => op, _ => panic!("expected UserOperation variant"), }; - + let operation1 = WrappedUserOperation { operation: VersionedUserOperation::UserOperation(erc4337::UserOperation { nonce: Uint::from(0), @@ -518,7 +518,7 @@ mod tests { }), hash: hash1, }; - + mempool.add_operation(&operation1).unwrap().unwrap(); let hash2 = FixedBytes::from([2u8; 32]); let operation2 = WrappedUserOperation { From 7980fdbc11d1c7321f37fb858790735f61ca1890 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 13:52:40 -0500 Subject: [PATCH 10/24] chore: add mempool --- .../core/src/mempool.rs | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index c6010fa..3298d9f 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -181,21 +181,8 @@ impl MempoolImpl { operation: &WrappedUserOperation, ) -> Result, anyhow::Error> { // Account - if let Some(old_ordered_operation) = self.hash_to_operation.get(&operation.hash) { - if operation.has_higher_max_fee(&old_ordered_operation.pool_operation) { - self.best - .remove(&ByMaxFeeAndSubmissionId(old_ordered_operation.clone())); - let sender = 
old_ordered_operation.sender(); - if let Some(by_nonce_set) = self.operations_by_account.get_mut(&sender) { - by_nonce_set.remove(&ByNonce(old_ordered_operation.clone())); - if by_nonce_set.is_empty() { - self.operations_by_account.remove(&sender); - } - } - self.hash_to_operation.remove(&operation.hash); - } else { - return Ok(None); - } + if self.hash_to_operation.contains_key(&operation.hash) { + return Ok(None); } let order = self.get_next_order_id(); From 8f15c64b14c412d1a6e35827158141230a7e13a0 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 14:01:53 -0500 Subject: [PATCH 11/24] chore: remove mempool --- .../core/src/mempool.rs | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index 3298d9f..08f9490 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -291,23 +291,7 @@ mod tests { ); } - // Tests adding an operation with the same hash but higher gas price - #[test] - fn test_add_operation_duplicate_hash_higher_gas() { - let mut mempool = create_test_mempool(1000); - let hash = FixedBytes::from([1u8; 32]); - - let operation1 = create_wrapped_operation(2000, hash); - let result1 = mempool.add_operation(&operation1); - assert!(result1.is_ok()); - assert!(result1.unwrap().is_some()); - - let operation2 = create_wrapped_operation(3000, hash); - let result2 = mempool.add_operation(&operation2); - assert!(result2.is_ok()); - assert!(result2.unwrap().is_some()); - } - + // Tests adding an operation with the same hash but lower gas price #[test] fn test_add_operation_duplicate_hash_lower_gas() { From edbe4b92749c129dd84388263abd4cd42a670601 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 14:19:49 -0500 Subject: [PATCH 12/24] chore: fix formatting --- crates/account-abstraction-core/core/src/mempool.rs | 1 - 1 file changed, 
1 deletion(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index 08f9490..d5f2fe0 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -291,7 +291,6 @@ mod tests { ); } - // Tests adding an operation with the same hash but lower gas price #[test] fn test_add_operation_duplicate_hash_lower_gas() { From 37f65515bb282cda18ec0070e0c51149ffe822f2 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 17:29:51 -0500 Subject: [PATCH 13/24] chore: begining kafka engine --- Cargo.lock | 1 + crates/account-abstraction-core/Cargo.toml | 3 +- .../core/src/kafka_mempool_engine.rs | 62 +++++++++++++++++++ .../account-abstraction-core/core/src/lib.rs | 1 + .../core/src/types.rs | 2 +- 5 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 crates/account-abstraction-core/core/src/kafka_mempool_engine.rs diff --git a/Cargo.lock b/Cargo.lock index 49c509f..829adc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,6 +15,7 @@ dependencies = [ "async-trait", "jsonrpsee", "op-alloy-network", + "rdkafka", "reth-rpc-eth-types", "serde", "serde_json", diff --git a/crates/account-abstraction-core/Cargo.toml b/crates/account-abstraction-core/Cargo.toml index 55277ca..a63ebb9 100644 --- a/crates/account-abstraction-core/Cargo.toml +++ b/crates/account-abstraction-core/Cargo.toml @@ -22,8 +22,9 @@ jsonrpsee.workspace = true async-trait = { workspace = true } alloy-sol-types.workspace= true anyhow.workspace = true +rdkafka.workspace = true +serde_json.workspace = true [dev-dependencies] alloy-primitives.workspace = true -serde_json.workspace = true wiremock.workspace = true diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs new file mode 100644 index 0000000..f2af9fe --- /dev/null +++ 
b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -0,0 +1,62 @@ +use crate::mempool::{self, Mempool}; +use crate::types::WrappedUserOperation; +use rdkafka::{consumer::StreamConsumer, Message}; +use serde::{Deserialize, Serialize}; +use serde_json; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "event", content = "data")] +pub enum KafkaEvent { + UserOpAdded { + user_op: WrappedUserOperation, + }, + UserOpIncluded { + user_op: WrappedUserOperation, + }, + UserOpDropped { + user_op: WrappedUserOperation, + reason: String, + }, +} + +pub struct KafkaMempoolEngine { + mempool: Arc>, + kafka_producer: StreamConsumer, +} + +impl KafkaMempoolEngine { + pub fn new( + mempool: Arc>, + kafka_producer: StreamConsumer, + ) -> Self { + Self { + mempool, + kafka_producer, + } + } + + pub async fn run(&self) -> anyhow::Result<()> { + loop { + let msg = self.kafka_producer.recv().await?.detach(); + let payload = msg + .payload() + .ok_or_else(|| anyhow::anyhow!("Kafka message missing payload"))?; + let event: KafkaEvent = serde_json::from_slice(payload).map_err(|e| anyhow::anyhow!("Failed to parse Kafka event: {e}"))?; + + match event { + KafkaEvent::UserOpAdded { user_op } => { + self.mempool.write().await.add_operation(&user_op)?; + } + KafkaEvent::UserOpIncluded { user_op } => { + self.mempool.write().await.remove_operation(&user_op.hash)?; + } + KafkaEvent::UserOpDropped { user_op, reason: _ } => { + self.mempool.write().await.remove_operation(&user_op.hash)?; + } + } + } + } +} + diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs index b3aa2f7..e6f0ffb 100644 --- a/crates/account-abstraction-core/core/src/lib.rs +++ b/crates/account-abstraction-core/core/src/lib.rs @@ -3,4 +3,5 @@ pub mod entrypoints; pub mod types; pub use account_abstraction_service::{AccountAbstractionService, AccountAbstractionServiceImpl}; pub use 
types::{SendUserOperationResponse, VersionedUserOperation}; +pub mod kafka_mempool_engine; pub mod mempool; diff --git a/crates/account-abstraction-core/core/src/types.rs b/crates/account-abstraction-core/core/src/types.rs index 4600839..3d79564 100644 --- a/crates/account-abstraction-core/core/src/types.rs +++ b/crates/account-abstraction-core/core/src/types.rs @@ -138,7 +138,7 @@ pub struct AggregatorInfo { pub type UserOpHash = FixedBytes<32>; -#[derive(Eq, PartialEq, Clone, Debug)] +#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct WrappedUserOperation { pub operation: VersionedUserOperation, pub hash: UserOpHash, From e5d1e449033a498074ff40db1d151e63d8cff8c6 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 18:03:54 -0500 Subject: [PATCH 14/24] chore: create small mempool engine --- .../core/src/kafka_mempool_engine.rs | 191 ++++++++++++++++-- .../core/src/mempool.rs | 8 + 2 files changed, 179 insertions(+), 20 deletions(-) diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index f2af9fe..029a137 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -1,6 +1,7 @@ use crate::mempool::{self, Mempool}; use crate::types::WrappedUserOperation; -use rdkafka::{consumer::StreamConsumer, Message}; +use async_trait::async_trait; +use rdkafka::{Message, consumer::StreamConsumer, message::OwnedMessage}; use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; @@ -21,42 +22,192 @@ pub enum KafkaEvent { }, } +#[async_trait] +pub trait KafkaConsumer { + async fn recv_msg(&self) -> anyhow::Result; +} + +#[async_trait] +impl KafkaConsumer for StreamConsumer { + async fn recv_msg(&self) -> anyhow::Result { + Ok(self.recv().await?.detach()) + } +} + pub struct KafkaMempoolEngine { mempool: Arc>, - kafka_producer: StreamConsumer, + 
kafka_consumer: Arc, } impl KafkaMempoolEngine { pub fn new( mempool: Arc>, - kafka_producer: StreamConsumer, + kafka_consumer: Arc, ) -> Self { Self { mempool, - kafka_producer, + kafka_consumer, } } pub async fn run(&self) -> anyhow::Result<()> { loop { - let msg = self.kafka_producer.recv().await?.detach(); - let payload = msg - .payload() - .ok_or_else(|| anyhow::anyhow!("Kafka message missing payload"))?; - let event: KafkaEvent = serde_json::from_slice(payload).map_err(|e| anyhow::anyhow!("Failed to parse Kafka event: {e}"))?; - - match event { - KafkaEvent::UserOpAdded { user_op } => { - self.mempool.write().await.add_operation(&user_op)?; - } - KafkaEvent::UserOpIncluded { user_op } => { - self.mempool.write().await.remove_operation(&user_op.hash)?; - } - KafkaEvent::UserOpDropped { user_op, reason: _ } => { - self.mempool.write().await.remove_operation(&user_op.hash)?; - } + self.process_next().await?; + } + } + + /// Process a single Kafka message (useful for tests and controlled loops) + pub async fn process_next(&self) -> anyhow::Result<()> { + let msg = self.kafka_consumer.recv_msg().await?; + let payload = msg + .payload() + .ok_or_else(|| anyhow::anyhow!("Kafka message missing payload"))?; + let event: KafkaEvent = serde_json::from_slice(payload) + .map_err(|e| anyhow::anyhow!("Failed to parse Kafka event: {e}"))?; + + self.handle_event(event).await + } + + async fn handle_event(&self, event: KafkaEvent) -> anyhow::Result<()> { + println!("Handling Kafka event: {:?}", event); + match event { + KafkaEvent::UserOpAdded { user_op } => { + self.mempool.write().await.add_operation(&user_op)?; + } + KafkaEvent::UserOpIncluded { user_op } => { + self.mempool.write().await.remove_operation(&user_op.hash)?; + } + KafkaEvent::UserOpDropped { user_op, reason: _ } => { + self.mempool.write().await.remove_operation(&user_op.hash)?; } } + Ok(()) } } +#[cfg(test)] +mod tests { + use super::*; + use crate::mempool::PoolConfig; + use 
crate::types::VersionedUserOperation; + use alloy_primitives::{Address, FixedBytes, Uint}; + use alloy_rpc_types::erc4337; + use rdkafka::Timestamp; + use tokio::sync::Mutex; + + fn make_wrapped_op(max_fee: u128, hash: [u8; 32]) -> WrappedUserOperation { + let op = VersionedUserOperation::UserOperation(erc4337::UserOperation { + sender: Address::ZERO, + nonce: Uint::from(0u64), + init_code: Default::default(), + call_data: Default::default(), + call_gas_limit: Uint::from(100_000u64), + verification_gas_limit: Uint::from(100_000u64), + pre_verification_gas: Uint::from(21_000u64), + max_fee_per_gas: Uint::from(max_fee), + max_priority_fee_per_gas: Uint::from(max_fee), + paymaster_and_data: Default::default(), + signature: Default::default(), + }); + + WrappedUserOperation { + operation: op, + hash: FixedBytes::from(hash), + } + } + + #[tokio::test] + async fn handle_add_operation() { + let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::new(0)))); + + let op_hash = [1u8; 32]; + let wrapped = make_wrapped_op(1_000, op_hash); + + let add_event = KafkaEvent::UserOpAdded { + user_op: wrapped.clone(), + }; + let mock_consumer = Arc::new(MockConsumer::new(vec![OwnedMessage::new( + Some(serde_json::to_vec(&add_event).unwrap()), + None, + "topic".to_string(), + Timestamp::NotAvailable, + 0, + 0, + None, + )])); + + let engine = KafkaMempoolEngine::new(mempool.clone(), mock_consumer); + + // Process add then remove deterministically + engine.process_next().await.unwrap(); + let items: Vec<_> = mempool.read().await.get_top_operations(10).collect(); + assert_eq!(items.len(), 1); + assert_eq!(items[0].hash, FixedBytes::from(op_hash)); + } + + #[tokio::test] + async fn remove_opperation_should_remove_from_mempool() { + let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::new(0)))); + let op_hash = [1u8; 32]; + let wrapped = make_wrapped_op(1_000, op_hash); + let add_mempool = KafkaEvent::UserOpAdded { + user_op: wrapped.clone(), + }; + let 
remove_mempool = KafkaEvent::UserOpDropped { + user_op: wrapped.clone(), + reason: "test".to_string(), + }; + let mock_consumer = Arc::new(MockConsumer::new(vec![ + OwnedMessage::new( + Some(serde_json::to_vec(&add_mempool).unwrap()), + None, + "topic".to_string(), + Timestamp::NotAvailable, + 0, + 0, + None, + ), + OwnedMessage::new( + Some(serde_json::to_vec(&remove_mempool).unwrap()), + None, + "topic".to_string(), + Timestamp::NotAvailable, + 0, + 0, + None, + ), + ])); + + let engine = KafkaMempoolEngine::new(mempool.clone(), mock_consumer); + engine.process_next().await.unwrap(); + let items: Vec<_> = mempool.read().await.get_top_operations(10).collect(); + assert_eq!(items.len(), 1); + assert_eq!(items[0].hash, FixedBytes::from(op_hash)); + engine.process_next().await.unwrap(); + let items: Vec<_> = mempool.read().await.get_top_operations(10).collect(); + assert_eq!(items.len(), 0); + } + struct MockConsumer { + msgs: Mutex>, + } + + impl MockConsumer { + fn new(msgs: Vec) -> Self { + Self { + msgs: Mutex::new(msgs), + } + } + } + + #[async_trait] + impl KafkaConsumer for MockConsumer { + async fn recv_msg(&self) -> anyhow::Result { + let mut guard = self.msgs.lock().await; + if guard.is_empty() { + Err(anyhow::anyhow!("no more messages")) + } else { + Ok(guard.remove(0)) + } + } + } +} diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index d5f2fe0..b2fb881 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -9,6 +9,14 @@ pub struct PoolConfig { minimum_max_fee_per_gas: u128, } +impl PoolConfig { + pub fn new(minimum_max_fee_per_gas: u128) -> Self { + Self { + minimum_max_fee_per_gas, + } + } +} + #[derive(Eq, PartialEq, Clone, Debug)] pub struct OrderedPoolOperation { pub pool_operation: WrappedUserOperation, From f95239fba945550bc2755768e83abdd128174198 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 
2025 10:10:52 -0500 Subject: [PATCH 15/24] chore: create kafka engine --- .../core/src/kafka_mempool_engine.rs | 19 ++++++++-- .../account-abstraction-core/core/src/lib.rs | 1 + .../core/src/mempool.rs | 4 +-- .../core/src/reputation_service.rs | 35 +++++++++++++++++++ crates/ingress-rpc/src/service.rs | 16 ++++++++- 5 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 crates/account-abstraction-core/core/src/reputation_service.rs diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 029a137..2279f25 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; use tokio::sync::RwLock; +use crate::mempool::PoolConfig; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data")] @@ -50,6 +51,20 @@ impl KafkaMempoolEngine { } } + pub fn with_kafka_consumer(kafka_consumer: Arc, pool_config: Option) -> Self { + let pool_config = pool_config.unwrap_or(PoolConfig::default()); + let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(pool_config))); + Self { + mempool, + kafka_consumer, + } + + } + + pub fn get_mempool(&self) -> Arc> { + self.mempool.clone() + } + pub async fn run(&self) -> anyhow::Result<()> { loop { self.process_next().await?; @@ -118,7 +133,7 @@ mod tests { #[tokio::test] async fn handle_add_operation() { - let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::new(0)))); + let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::default()))); let op_hash = [1u8; 32]; let wrapped = make_wrapped_op(1_000, op_hash); @@ -147,7 +162,7 @@ mod tests { #[tokio::test] async fn remove_opperation_should_remove_from_mempool() { - let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::new(0)))); 
+ let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::default()))); let op_hash = [1u8; 32]; let wrapped = make_wrapped_op(1_000, op_hash); let add_mempool = KafkaEvent::UserOpAdded { diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs index e6f0ffb..6a609a9 100644 --- a/crates/account-abstraction-core/core/src/lib.rs +++ b/crates/account-abstraction-core/core/src/lib.rs @@ -5,3 +5,4 @@ pub use account_abstraction_service::{AccountAbstractionService, AccountAbstract pub use types::{SendUserOperationResponse, VersionedUserOperation}; pub mod kafka_mempool_engine; pub mod mempool; +pub mod reputation_service; \ No newline at end of file diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index b2fb881..7feaaef 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -10,9 +10,9 @@ pub struct PoolConfig { } impl PoolConfig { - pub fn new(minimum_max_fee_per_gas: u128) -> Self { + pub fn default() -> Self { Self { - minimum_max_fee_per_gas, + minimum_max_fee_per_gas: 0, } } } diff --git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs new file mode 100644 index 0000000..40bc6de --- /dev/null +++ b/crates/account-abstraction-core/core/src/reputation_service.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; +use tokio::sync::RwLock; +use alloy_primitives::Address; +use crate::mempool; + +/// Reputation status for an entity +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ReputationStatus { + /// Entity is not throttled or banned + Ok, + /// Entity is throttled + Throttled, + /// Entity is banned + Banned, +} + +pub trait ReputationService { + fn get_reputation(&self, entity: &Address) -> ReputationStatus; +} + +pub struct ReputationServiceImpl { + mempool: Arc>, +} + +impl 
ReputationServiceImpl { + pub fn new(mempool: Arc>) -> Self { + Self { mempool } + } +} + +impl ReputationService for ReputationServiceImpl { + fn get_reputation(&self, entity: &Address) -> ReputationStatus { + ReputationStatus::Ok + } +} \ No newline at end of file diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index f0d3341..ce8a6c3 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,3 +1,5 @@ +use account_abstraction_core::kafka_mempool_engine::KafkaMempoolEngine; +use account_abstraction_core::reputation_service::ReputationStatus; use alloy_consensus::transaction::Recovered; use alloy_consensus::{Transaction, transaction::SignerRecoverable}; use alloy_primitives::{Address, B256, Bytes, FixedBytes}; @@ -25,8 +27,9 @@ use crate::validation::validate_bundle; use crate::{Config, TxSubmissionMethod}; use account_abstraction_core::entrypoints::version::EntryPointVersion; use account_abstraction_core::types::{UserOperationRequest, VersionedUserOperation}; -use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl}; +use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl, reputation_service::{ReputationService, ReputationServiceImpl}, mempool::{Mempool, PoolConfig}}; use std::sync::Arc; +use tokio::sync::RwLock; /// RPC providers for different endpoints pub struct Providers { @@ -69,6 +72,7 @@ pub struct IngressService { tx_submission_method: TxSubmissionMethod, bundle_queue_publisher: BundleQueuePublisher, user_op_queue_publisher: UserOpQueuePublisher, + reputation_service: Arc, audit_channel: mpsc::UnboundedSender, send_transaction_default_lifetime_seconds: u64, metrics: Metrics, @@ -97,6 +101,8 @@ impl IngressService { config.validate_user_operation_timeout_ms, ); let queue_connection = Arc::new(queue); + + let mempool = KafkaMempoolEngine::with_kafka_consumer(kafka_consumer, None); Self { mempool_provider, simulation_provider, 
@@ -111,6 +117,7 @@ impl IngressService { queue_connection.clone(), config.ingress_topic, ), + reputation_service: Arc::new(ReputationServiceImpl::new(mempool.get_mempool())), audit_channel, send_transaction_default_lifetime_seconds: config .send_transaction_default_lifetime_seconds, @@ -347,6 +354,13 @@ impl IngressApiServer for IngressService { EthApiError::InvalidParams(e.to_string()).into_rpc_err() })?; + let reputation = self.reputation_service.get_reputation(&request.user_operation.sender()); + if reputation == ReputationStatus::Banned { + return Err(EthApiError::InvalidParams("User operation sender is banned".into()).into_rpc_err()); + }else if reputation == ReputationStatus::Throttled { + return Err(EthApiError::InvalidParams("User operation sender is throttled".into()).into_rpc_err()); + } + let _ = self .account_abstraction_service .validate_user_operation(&request.user_operation, &entry_point) From e789cb40018a3d697e05c4c68cc980e997ee0af8 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 10:56:30 -0500 Subject: [PATCH 16/24] chore: init kafka mempool engine --- .../core/src/kafka_mempool_engine.rs | 16 +- .../account-abstraction-core/core/src/lib.rs | 2 +- .../core/src/reputation_service.rs | 6 +- crates/ingress-rpc/src/bin/main.rs | 21 +++ .../ingress-rpc/src/kafka_mempool_consumer.rs | 177 ++++++++++++++++++ crates/ingress-rpc/src/lib.rs | 16 ++ crates/ingress-rpc/src/service.rs | 66 ++++++- 7 files changed, 286 insertions(+), 18 deletions(-) create mode 100644 crates/ingress-rpc/src/kafka_mempool_consumer.rs diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 2279f25..5a153ae 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -1,3 +1,4 @@ +use crate::mempool::PoolConfig; use crate::mempool::{self, Mempool}; use 
crate::types::WrappedUserOperation; use async_trait::async_trait; @@ -6,7 +7,6 @@ use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; use tokio::sync::RwLock; -use crate::mempool::PoolConfig; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data")] @@ -51,14 +51,16 @@ impl KafkaMempoolEngine { } } - pub fn with_kafka_consumer(kafka_consumer: Arc, pool_config: Option) -> Self { + pub fn with_kafka_consumer( + kafka_consumer: Arc, + pool_config: Option, + ) -> Self { let pool_config = pool_config.unwrap_or(PoolConfig::default()); let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(pool_config))); Self { mempool, kafka_consumer, } - } pub fn get_mempool(&self) -> Arc> { @@ -133,7 +135,9 @@ mod tests { #[tokio::test] async fn handle_add_operation() { - let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::default()))); + let mempool = Arc::new(RwLock::new( + mempool::MempoolImpl::new(PoolConfig::default()), + )); let op_hash = [1u8; 32]; let wrapped = make_wrapped_op(1_000, op_hash); @@ -162,7 +166,9 @@ mod tests { #[tokio::test] async fn remove_opperation_should_remove_from_mempool() { - let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::default()))); + let mempool = Arc::new(RwLock::new( + mempool::MempoolImpl::new(PoolConfig::default()), + )); let op_hash = [1u8; 32]; let wrapped = make_wrapped_op(1_000, op_hash); let add_mempool = KafkaEvent::UserOpAdded { diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs index 6a609a9..4fda184 100644 --- a/crates/account-abstraction-core/core/src/lib.rs +++ b/crates/account-abstraction-core/core/src/lib.rs @@ -5,4 +5,4 @@ pub use account_abstraction_service::{AccountAbstractionService, AccountAbstract pub use types::{SendUserOperationResponse, VersionedUserOperation}; pub mod kafka_mempool_engine; pub mod mempool; -pub mod reputation_service; \ No newline at end of 
file +pub mod reputation_service; diff --git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs index 40bc6de..49b4aec 100644 --- a/crates/account-abstraction-core/core/src/reputation_service.rs +++ b/crates/account-abstraction-core/core/src/reputation_service.rs @@ -1,7 +1,7 @@ +use crate::mempool; +use alloy_primitives::Address; use std::sync::Arc; use tokio::sync::RwLock; -use alloy_primitives::Address; -use crate::mempool; /// Reputation status for an entity #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -32,4 +32,4 @@ impl ReputationService for ReputationServiceImpl { fn get_reputation(&self, entity: &Address) -> ReputationStatus { ReputationStatus::Ok } -} \ No newline at end of file +} diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 67da612..9e6e138 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -4,6 +4,7 @@ use jsonrpsee::server::Server; use op_alloy_network::Optimism; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; +use std::sync::Arc; use tips_audit::{BundleEvent, KafkaBundleEventPublisher, connect_audit_to_publisher}; use tips_core::kafka::load_kafka_config_from_file; use tips_core::logger::init_logger_with_format; @@ -11,6 +12,7 @@ use tips_core::{Bundle, MeterBundleResponse}; use tips_ingress_rpc::Config; use tips_ingress_rpc::connect_ingress_to_builder; use tips_ingress_rpc::health::bind_health_server; +use tips_ingress_rpc::kafka_mempool_consumer::{create_mempool_engine, run_mempool_engine}; use tips_ingress_rpc::metrics::init_prometheus_exporter; use tips_ingress_rpc::queue::KafkaMessageQueue; use tips_ingress_rpc::service::{IngressApiServer, IngressService, Providers}; @@ -73,6 +75,23 @@ async fn main() -> anyhow::Result<()> { let (audit_tx, audit_rx) = mpsc::unbounded_channel::(); connect_audit_to_publisher(audit_rx, audit_publisher); + let user_op_properties_file 
= config + .user_operation_consumer_properties + .as_deref() + .unwrap_or(&config.ingress_kafka_properties); + + let mempool_engine = create_mempool_engine( + user_op_properties_file, + &config.user_operation_topic, + &config.user_operation_consumer_group_id, + None, + )?; + + let mempool_engine_handle = { + let engine = mempool_engine.clone(); + tokio::spawn(async move { run_mempool_engine(engine).await }) + }; + let (builder_tx, _) = broadcast::channel::(config.max_buffered_meter_bundle_responses); let (builder_backrun_tx, _) = broadcast::channel::(config.max_buffered_backrun_bundles); @@ -95,6 +114,7 @@ async fn main() -> anyhow::Result<()> { audit_tx, builder_tx, builder_backrun_tx, + mempool_engine.clone(), cfg, ); let bind_addr = format!("{}:{}", config.address, config.port); @@ -110,6 +130,7 @@ async fn main() -> anyhow::Result<()> { handle.stopped().await; health_handle.abort(); + mempool_engine_handle.abort(); Ok(()) } diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs new file mode 100644 index 0000000..9148a3e --- /dev/null +++ b/crates/ingress-rpc/src/kafka_mempool_consumer.rs @@ -0,0 +1,177 @@ +use std::sync::Arc; +use std::time::Duration; + +use account_abstraction_core::{kafka_mempool_engine::KafkaMempoolEngine, mempool::PoolConfig}; +use backon::{ExponentialBuilder, Retryable}; +use rdkafka::{ + ClientConfig, + consumer::{Consumer, StreamConsumer}, +}; +use tips_core::kafka::load_kafka_config_from_file; +use tracing::warn; + +/// Build a Kafka consumer for the user operation topic. +/// Ensures the consumer group id is set per deployment and subscribes to the topic. +fn create_user_operation_consumer( + properties_file: &str, + topic: &str, + consumer_group_id: &str, +) -> anyhow::Result { + let mut client_config = ClientConfig::from_iter(load_kafka_config_from_file(properties_file)?); + + // Allow deployments to control group id even if the properties file omits it. 
+ client_config.set("group.id", consumer_group_id); + // Rely on Kafka for at-least-once; we keep auto commit enabled unless overridden in the file. + client_config.set("enable.auto.commit", "true"); + + let consumer: StreamConsumer = client_config.create()?; + consumer.subscribe(&[topic])?; + + Ok(consumer) +} + +/// Factory function that creates a fully configured KafkaMempoolEngine. +/// Handles consumer creation, engine instantiation, and Arc wrapping. +pub fn create_mempool_engine( + properties_file: &str, + topic: &str, + consumer_group_id: &str, + pool_config: Option, +) -> anyhow::Result> { + let consumer = create_user_operation_consumer(properties_file, topic, consumer_group_id)?; + Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer( + Arc::new(consumer), + pool_config, + ))) +} + +/// Process a single Kafka message with exponential backoff retries. +pub async fn process_next_with_backoff(engine: &KafkaMempoolEngine) -> anyhow::Result<()> { + let process = || async { engine.process_next().await }; + + process + .retry( + &ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(100)) + .with_max_delay(Duration::from_secs(5)) + .with_max_times(5), + ) + .notify(|err: &anyhow::Error, dur: Duration| { + warn!( + error = %err, + retry_in_ms = dur.as_millis(), + "Retrying Kafka mempool engine step" + ); + }) + .await +} + +/// Run the mempool engine forever, applying backoff on individual message failures. +pub async fn run_mempool_engine(engine: Arc) { + loop { + if let Err(err) = process_next_with_backoff(&engine).await { + // We log and continue to avoid stalling the consumer; repeated failures + // will still observe backoff inside `process_next_with_backoff`. 
+ warn!(error = %err, "Kafka mempool engine exhausted retries, continuing"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use account_abstraction_core::{ + kafka_mempool_engine::{KafkaConsumer, KafkaEvent}, + mempool::PoolConfig, + types::{VersionedUserOperation, WrappedUserOperation}, + }; + use alloy_primitives::{Address, FixedBytes, Uint}; + use alloy_rpc_types::erc4337; + use rdkafka::{Message, Timestamp, message::OwnedMessage}; + use tokio::sync::Mutex; + + fn make_wrapped_op(max_fee: u128, hash: [u8; 32]) -> WrappedUserOperation { + let op = VersionedUserOperation::UserOperation(erc4337::UserOperation { + sender: Address::ZERO, + nonce: Uint::from(0u64), + init_code: Default::default(), + call_data: Default::default(), + call_gas_limit: Uint::from(100_000u64), + verification_gas_limit: Uint::from(100_000u64), + pre_verification_gas: Uint::from(21_000u64), + max_fee_per_gas: Uint::from(max_fee), + max_priority_fee_per_gas: Uint::from(max_fee), + paymaster_and_data: Default::default(), + signature: Default::default(), + }); + + WrappedUserOperation { + operation: op, + hash: FixedBytes::from(hash), + } + } + + struct MockConsumer { + msgs: Mutex>>, + } + + impl MockConsumer { + fn new(msgs: Vec>) -> Self { + Self { + msgs: Mutex::new(msgs), + } + } + } + + #[async_trait::async_trait] + impl KafkaConsumer for MockConsumer { + async fn recv_msg(&self) -> anyhow::Result { + let mut guard = self.msgs.lock().await; + if guard.is_empty() { + Err(anyhow::anyhow!("no more messages")) + } else { + guard.remove(0) + } + } + } + + #[tokio::test] + async fn process_next_with_backoff_recovers_after_error() { + let add_event = KafkaEvent::UserOpAdded { + user_op: make_wrapped_op(1_000, [1u8; 32]), + }; + + let payload = serde_json::to_vec(&add_event).unwrap(); + let good_msg = OwnedMessage::new( + Some(payload), + None, + "topic".to_string(), + Timestamp::NotAvailable, + 0, + 0, + None, + ); + + // First call fails, second succeeds. 
+ let consumer = Arc::new(MockConsumer::new(vec![ + Err(anyhow::anyhow!("transient error")), + Ok(good_msg), + ])); + + let engine = KafkaMempoolEngine::with_kafka_consumer(consumer, Some(PoolConfig::default())); + + // Should succeed after retrying the transient failure. + let result = process_next_with_backoff(&engine).await; + assert!(result.is_ok()); + + // The mempool should contain the added op. + let items: Vec<_> = engine + .get_mempool() + .read() + .await + .get_top_operations(10) + .collect(); + assert_eq!(items.len(), 1); + assert_eq!(items[0].hash, FixedBytes::from([1u8; 32])); + } +} diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 649e120..fa0a607 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -1,4 +1,5 @@ pub mod health; +pub mod kafka_mempool_consumer; pub mod metrics; pub mod queue; pub mod service; @@ -85,6 +86,21 @@ pub struct Config { )] pub audit_topic: String, + /// Kafka properties file for the user operation consumer (defaults to ingress properties if unset) + #[arg( + long, + env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE" + )] + pub user_operation_consumer_properties: Option, + + /// Consumer group id for user operation topic (set uniquely per deployment) + #[arg( + long, + env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_GROUP_ID", + default_value = "tips-user-operation" + )] + pub user_operation_consumer_group_id: String, + /// User operation topic for pushing valid user operations #[arg( long, diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index ce8a6c3..65ae703 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -27,7 +27,11 @@ use crate::validation::validate_bundle; use crate::{Config, TxSubmissionMethod}; use account_abstraction_core::entrypoints::version::EntryPointVersion; use account_abstraction_core::types::{UserOperationRequest, VersionedUserOperation}; -use 
account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl, reputation_service::{ReputationService, ReputationServiceImpl}, mempool::{Mempool, PoolConfig}}; +use account_abstraction_core::{ + AccountAbstractionService, AccountAbstractionServiceImpl, + mempool::{Mempool, PoolConfig}, + reputation_service::{ReputationService, ReputationServiceImpl}, +}; use std::sync::Arc; use tokio::sync::RwLock; @@ -73,6 +77,7 @@ pub struct IngressService { bundle_queue_publisher: BundleQueuePublisher, user_op_queue_publisher: UserOpQueuePublisher, reputation_service: Arc, + mempool_engine: Arc, audit_channel: mpsc::UnboundedSender, send_transaction_default_lifetime_seconds: u64, metrics: Metrics, @@ -90,6 +95,7 @@ impl IngressService { audit_channel: mpsc::UnboundedSender, builder_tx: broadcast::Sender, builder_backrun_tx: broadcast::Sender, + mempool_engine: Arc, config: Config, ) -> Self { let mempool_provider = Arc::new(providers.mempool); @@ -102,7 +108,6 @@ impl IngressService { ); let queue_connection = Arc::new(queue); - let mempool = KafkaMempoolEngine::with_kafka_consumer(kafka_consumer, None); Self { mempool_provider, simulation_provider, @@ -117,7 +122,8 @@ impl IngressService { queue_connection.clone(), config.ingress_topic, ), - reputation_service: Arc::new(ReputationServiceImpl::new(mempool.get_mempool())), + reputation_service: Arc::new(ReputationServiceImpl::new(mempool_engine.get_mempool())), + mempool_engine, audit_channel, send_transaction_default_lifetime_seconds: config .send_transaction_default_lifetime_seconds, @@ -354,11 +360,18 @@ impl IngressApiServer for IngressService { EthApiError::InvalidParams(e.to_string()).into_rpc_err() })?; - let reputation = self.reputation_service.get_reputation(&request.user_operation.sender()); + let reputation = self + .reputation_service + .get_reputation(&request.user_operation.sender()); if reputation == ReputationStatus::Banned { - return Err(EthApiError::InvalidParams("User operation sender is 
banned".into()).into_rpc_err()); - }else if reputation == ReputationStatus::Throttled { - return Err(EthApiError::InvalidParams("User operation sender is throttled".into()).into_rpc_err()); + return Err( + EthApiError::InvalidParams("User operation sender is banned".into()).into_rpc_err(), + ); + } else if reputation == ReputationStatus::Throttled { + return Err( + EthApiError::InvalidParams("User operation sender is throttled".into()) + .into_rpc_err(), + ); } let _ = self @@ -507,6 +520,7 @@ impl IngressService { mod tests { use super::*; use crate::{Config, TxSubmissionMethod, queue::MessageQueue}; + use account_abstraction_core::kafka_mempool_engine::KafkaConsumer; use alloy_provider::RootProvider; use anyhow::Result; use async_trait::async_trait; @@ -514,6 +528,7 @@ mod tests { use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; use jsonrpsee::server::{ServerBuilder, ServerHandle}; use mockall::mock; + use rdkafka::message::OwnedMessage; use serde_json::json; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; @@ -530,6 +545,15 @@ mod tests { } } + struct NoopConsumer; + + #[async_trait] + impl KafkaConsumer for NoopConsumer { + async fn recv_msg(&self) -> anyhow::Result { + Err(anyhow::anyhow!("no messages")) + } + } + fn create_test_config(mock_server: &MockServer) -> Config { Config { address: IpAddr::from([127, 0, 0, 1]), @@ -540,6 +564,8 @@ mod tests { ingress_topic: String::new(), audit_kafka_properties: String::new(), audit_topic: String::new(), + user_operation_consumer_properties: None, + user_operation_consumer_group_id: "tips-user-operation".to_string(), log_level: String::from("info"), log_format: tips_core::logger::LogFormat::Pretty, send_transaction_default_lifetime_seconds: 300, @@ -666,8 +692,19 @@ mod tests { let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let mempool_engine = Arc::new(KafkaMempoolEngine::with_kafka_consumer( + Arc::new(NoopConsumer), + None, + )); 
+ let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + mempool_engine, + config, ); let bundle = Bundle::default(); @@ -725,8 +762,19 @@ mod tests { let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let mempool_engine = Arc::new(KafkaMempoolEngine::with_kafka_consumer( + Arc::new(NoopConsumer), + None, + )); + let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + mempool_engine, + config, ); // Valid signed transaction bytes From 9b4fbf0b6e96e5228a8c7dd3e488f48405a88d5c Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 10:57:45 -0500 Subject: [PATCH 17/24] chore: create kafka mempool engine --- .../account-abstraction-core/core/src/kafka_mempool_engine.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 5a153ae..25304e4 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -24,7 +24,7 @@ pub enum KafkaEvent { } #[async_trait] -pub trait KafkaConsumer { +pub trait KafkaConsumer: Send + Sync { async fn recv_msg(&self) -> anyhow::Result; } From e111e7ae21acfba541cdecaf405175e60fd913a7 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 11:07:10 -0500 Subject: [PATCH 18/24] chore: kafka event factory creation --- .../core/src/reputation_service.rs | 2 +- crates/ingress-rpc/src/bin/main.rs | 1 - .../ingress-rpc/src/kafka_mempool_consumer.rs | 101 +----------------- crates/ingress-rpc/src/queue.rs | 16 ++- crates/ingress-rpc/src/service.rs | 2 - 5 files changed, 16 insertions(+), 106 deletions(-) diff 
--git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs index 49b4aec..f0a5e85 100644 --- a/crates/account-abstraction-core/core/src/reputation_service.rs +++ b/crates/account-abstraction-core/core/src/reputation_service.rs @@ -29,7 +29,7 @@ impl ReputationServiceImpl { } impl ReputationService for ReputationServiceImpl { - fn get_reputation(&self, entity: &Address) -> ReputationStatus { + fn get_reputation(&self, _entity: &Address) -> ReputationStatus { ReputationStatus::Ok } } diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 9e6e138..ac9f14b 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -4,7 +4,6 @@ use jsonrpsee::server::Server; use op_alloy_network::Optimism; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; -use std::sync::Arc; use tips_audit::{BundleEvent, KafkaBundleEventPublisher, connect_audit_to_publisher}; use tips_core::kafka::load_kafka_config_from_file; use tips_core::logger::init_logger_with_format; diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs index 9148a3e..f62c673 100644 --- a/crates/ingress-rpc/src/kafka_mempool_consumer.rs +++ b/crates/ingress-rpc/src/kafka_mempool_consumer.rs @@ -75,103 +75,4 @@ pub async fn run_mempool_engine(engine: Arc) { warn!(error = %err, "Kafka mempool engine exhausted retries, continuing"); } } -} - -#[cfg(test)] -mod tests { - use super::*; - use account_abstraction_core::{ - kafka_mempool_engine::{KafkaConsumer, KafkaEvent}, - mempool::PoolConfig, - types::{VersionedUserOperation, WrappedUserOperation}, - }; - use alloy_primitives::{Address, FixedBytes, Uint}; - use alloy_rpc_types::erc4337; - use rdkafka::{Message, Timestamp, message::OwnedMessage}; - use tokio::sync::Mutex; - - fn make_wrapped_op(max_fee: u128, hash: [u8; 32]) -> WrappedUserOperation { - let op = 
VersionedUserOperation::UserOperation(erc4337::UserOperation { - sender: Address::ZERO, - nonce: Uint::from(0u64), - init_code: Default::default(), - call_data: Default::default(), - call_gas_limit: Uint::from(100_000u64), - verification_gas_limit: Uint::from(100_000u64), - pre_verification_gas: Uint::from(21_000u64), - max_fee_per_gas: Uint::from(max_fee), - max_priority_fee_per_gas: Uint::from(max_fee), - paymaster_and_data: Default::default(), - signature: Default::default(), - }); - - WrappedUserOperation { - operation: op, - hash: FixedBytes::from(hash), - } - } - - struct MockConsumer { - msgs: Mutex>>, - } - - impl MockConsumer { - fn new(msgs: Vec>) -> Self { - Self { - msgs: Mutex::new(msgs), - } - } - } - - #[async_trait::async_trait] - impl KafkaConsumer for MockConsumer { - async fn recv_msg(&self) -> anyhow::Result { - let mut guard = self.msgs.lock().await; - if guard.is_empty() { - Err(anyhow::anyhow!("no more messages")) - } else { - guard.remove(0) - } - } - } - - #[tokio::test] - async fn process_next_with_backoff_recovers_after_error() { - let add_event = KafkaEvent::UserOpAdded { - user_op: make_wrapped_op(1_000, [1u8; 32]), - }; - - let payload = serde_json::to_vec(&add_event).unwrap(); - let good_msg = OwnedMessage::new( - Some(payload), - None, - "topic".to_string(), - Timestamp::NotAvailable, - 0, - 0, - None, - ); - - // First call fails, second succeeds. - let consumer = Arc::new(MockConsumer::new(vec![ - Err(anyhow::anyhow!("transient error")), - Ok(good_msg), - ])); - - let engine = KafkaMempoolEngine::with_kafka_consumer(consumer, Some(PoolConfig::default())); - - // Should succeed after retrying the transient failure. - let result = process_next_with_backoff(&engine).await; - assert!(result.is_ok()); - - // The mempool should contain the added op. 
- let items: Vec<_> = engine - .get_mempool() - .read() - .await - .get_top_operations(10) - .collect(); - assert_eq!(items.len(), 1); - assert_eq!(items[0].hash, FixedBytes::from([1u8; 32])); - } -} +} \ No newline at end of file diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index ee6063c..4f2daad 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -1,4 +1,4 @@ -use account_abstraction_core::types::VersionedUserOperation; +use account_abstraction_core::{kafka_mempool_engine::KafkaEvent, types::{VersionedUserOperation, WrappedUserOperation}}; use alloy_primitives::B256; use anyhow::Result; use async_trait::async_trait; @@ -79,9 +79,21 @@ impl UserOpQueuePublisher { pub async fn publish(&self, user_op: &VersionedUserOperation, hash: &B256) -> Result<()> { let key = hash.to_string(); - let payload = serde_json::to_vec(&user_op)?; + let event = self.create_user_op_added_event(user_op, hash); + let payload = serde_json::to_vec(&event)?; self.queue.publish(&self.topic, &key, &payload).await } + + fn create_user_op_added_event(&self, user_op: &VersionedUserOperation, hash: &B256) -> KafkaEvent { + let wrapped_user_op = WrappedUserOperation { + operation: user_op.clone(), + hash: hash.clone(), + }; + let event = KafkaEvent::UserOpAdded { + user_op: wrapped_user_op, + }; + event + } } pub struct BundleQueuePublisher { diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 65ae703..c7dec6a 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -29,11 +29,9 @@ use account_abstraction_core::entrypoints::version::EntryPointVersion; use account_abstraction_core::types::{UserOperationRequest, VersionedUserOperation}; use account_abstraction_core::{ AccountAbstractionService, AccountAbstractionServiceImpl, - mempool::{Mempool, PoolConfig}, reputation_service::{ReputationService, ReputationServiceImpl}, }; use std::sync::Arc; -use 
tokio::sync::RwLock; /// RPC providers for different endpoints pub struct Providers { From e100f1ef0f7233b81c8664349f179dd823f0fdcf Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 11:52:27 -0500 Subject: [PATCH 19/24] chore: create consumer logic --- .env.example | 1 + crates/ingress-rpc/src/bin/main.rs | 5 +---- .../ingress-rpc/src/kafka_mempool_consumer.rs | 2 +- crates/ingress-rpc/src/lib.rs | 4 ++-- crates/ingress-rpc/src/queue.rs | 19 +++++++++++++------ crates/ingress-rpc/src/service.rs | 2 +- docker-compose.tips.yml | 7 +++++++ ...s-user-operation-consumer-kafka-properties | 9 +++++++++ 8 files changed, 35 insertions(+), 14 deletions(-) create mode 100644 docker/ingress-user-operation-consumer-kafka-properties diff --git a/.env.example b/.env.example index bb4a66a..4058bd9 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,7 @@ TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE=/app/docker/ingress-bundles-kafka-pro TIPS_INGRESS_KAFKA_INGRESS_TOPIC=tips-ingress TIPS_INGRESS_KAFKA_AUDIT_PROPERTIES_FILE=/app/docker/ingress-audit-kafka-properties TIPS_INGRESS_KAFKA_AUDIT_TOPIC=tips-audit +TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE=/app/docker/ingress-user-operation-consumer-kafka-properties TIPS_INGRESS_LOG_LEVEL=info TIPS_INGRESS_LOG_FORMAT=pretty TIPS_INGRESS_SEND_TRANSACTION_DEFAULT_LIFETIME_SECONDS=10800 diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index ac9f14b..a3f1ce8 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -74,10 +74,7 @@ async fn main() -> anyhow::Result<()> { let (audit_tx, audit_rx) = mpsc::unbounded_channel::(); connect_audit_to_publisher(audit_rx, audit_publisher); - let user_op_properties_file = config - .user_operation_consumer_properties - .as_deref() - .unwrap_or(&config.ingress_kafka_properties); + let user_op_properties_file = &config.user_operation_consumer_properties; let mempool_engine = create_mempool_engine( 
user_op_properties_file, diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs index f62c673..63efd80 100644 --- a/crates/ingress-rpc/src/kafka_mempool_consumer.rs +++ b/crates/ingress-rpc/src/kafka_mempool_consumer.rs @@ -75,4 +75,4 @@ pub async fn run_mempool_engine(engine: Arc) { warn!(error = %err, "Kafka mempool engine exhausted retries, continuing"); } } -} \ No newline at end of file +} diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index fa0a607..cc58640 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -86,12 +86,12 @@ pub struct Config { )] pub audit_topic: String, - /// Kafka properties file for the user operation consumer (defaults to ingress properties if unset) + /// Kafka properties file for the user operation consumer #[arg( long, env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE" )] - pub user_operation_consumer_properties: Option, + pub user_operation_consumer_properties: String, /// Consumer group id for user operation topic (set uniquely per deployment) #[arg( diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 4f2daad..d5b0a91 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -1,4 +1,7 @@ -use account_abstraction_core::{kafka_mempool_engine::KafkaEvent, types::{VersionedUserOperation, WrappedUserOperation}}; +use account_abstraction_core::{ + kafka_mempool_engine::KafkaEvent, + types::{VersionedUserOperation, WrappedUserOperation}, +}; use alloy_primitives::B256; use anyhow::Result; use async_trait::async_trait; @@ -84,15 +87,19 @@ impl UserOpQueuePublisher { self.queue.publish(&self.topic, &key, &payload).await } - fn create_user_op_added_event(&self, user_op: &VersionedUserOperation, hash: &B256) -> KafkaEvent { + fn create_user_op_added_event( + &self, + user_op: &VersionedUserOperation, + hash: &B256, + ) -> KafkaEvent { let wrapped_user_op 
= WrappedUserOperation { operation: user_op.clone(), - hash: hash.clone(), + hash: *hash, }; - let event = KafkaEvent::UserOpAdded { + + KafkaEvent::UserOpAdded { user_op: wrapped_user_op, - }; - event + } } } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index c7dec6a..7442562 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -562,7 +562,7 @@ mod tests { ingress_topic: String::new(), audit_kafka_properties: String::new(), audit_topic: String::new(), - user_operation_consumer_properties: None, + user_operation_consumer_properties: String::new(), user_operation_consumer_group_id: "tips-user-operation".to_string(), log_level: String::from("info"), log_format: tips_core::logger::LogFormat::Pretty, diff --git a/docker-compose.tips.yml b/docker-compose.tips.yml index 666802a..c73c226 100644 --- a/docker-compose.tips.yml +++ b/docker-compose.tips.yml @@ -15,6 +15,10 @@ services: volumes: - ./docker/ingress-bundles-kafka-properties:/app/docker/ingress-bundles-kafka-properties:ro - ./docker/ingress-audit-kafka-properties:/app/docker/ingress-audit-kafka-properties:ro + - ./docker/ingress-user-operation-consumer-kafka-properties:/app/docker/ingress-user-operation-consumer-kafka-properties:ro + depends_on: + kafka-setup: + condition: service_completed_successfully restart: unless-stopped audit: @@ -28,6 +32,9 @@ services: - .env.docker volumes: - ./docker/audit-kafka-properties:/app/docker/audit-kafka-properties:ro + depends_on: + kafka-setup: + condition: service_completed_successfully restart: unless-stopped ui: diff --git a/docker/ingress-user-operation-consumer-kafka-properties b/docker/ingress-user-operation-consumer-kafka-properties new file mode 100644 index 0000000..3bb02bf --- /dev/null +++ b/docker/ingress-user-operation-consumer-kafka-properties @@ -0,0 +1,9 @@ +# Kafka configuration properties for ingress user operation consumer +bootstrap.servers=host.docker.internal:9094 
+message.timeout.ms=5000 +enable.partition.eof=false +session.timeout.ms=6000 +fetch.wait.max.ms=100 +fetch.min.bytes=1 +# Note: group.id and enable.auto.commit are set programmatically + From a21c15d5135585f7d3aea3d5632396e33ac3bcab Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 15:15:20 -0500 Subject: [PATCH 20/24] chore: update tracing --- Cargo.lock | 1 + crates/account-abstraction-core/Cargo.toml | 1 + .../core/src/kafka_mempool_engine.rs | 6 +++++- crates/ingress-rpc/src/kafka_mempool_consumer.rs | 3 ++- 4 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 829adc7..cca3b77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,6 +20,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tracing", "wiremock", ] diff --git a/crates/account-abstraction-core/Cargo.toml b/crates/account-abstraction-core/Cargo.toml index a63ebb9..e2a28f6 100644 --- a/crates/account-abstraction-core/Cargo.toml +++ b/crates/account-abstraction-core/Cargo.toml @@ -24,6 +24,7 @@ alloy-sol-types.workspace= true anyhow.workspace = true rdkafka.workspace = true serde_json.workspace = true +tracing.workspace=true [dev-dependencies] alloy-primitives.workspace = true diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 25304e4..3d567ca 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; use tokio::sync::RwLock; +use tracing::info; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data")] @@ -86,7 +87,10 @@ impl KafkaMempoolEngine { } async fn handle_event(&self, event: KafkaEvent) -> anyhow::Result<()> { - println!("Handling Kafka event: {:?}", event); + info!( + event = ?event, + "Kafka mempool engine handling event" + ); match 
event { KafkaEvent::UserOpAdded { user_op } => { self.mempool.write().await.add_operation(&user_op)?; diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs index 63efd80..2e510c9 100644 --- a/crates/ingress-rpc/src/kafka_mempool_consumer.rs +++ b/crates/ingress-rpc/src/kafka_mempool_consumer.rs @@ -38,7 +38,8 @@ pub fn create_mempool_engine( consumer_group_id: &str, pool_config: Option, ) -> anyhow::Result> { - let consumer = create_user_operation_consumer(properties_file, topic, consumer_group_id)?; + let consumer: StreamConsumer = + create_user_operation_consumer(properties_file, topic, consumer_group_id)?; Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer( Arc::new(consumer), pool_config, From 9a4221ca0caa0b44d4aa4764c64851e352e33943 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 15:22:01 -0500 Subject: [PATCH 21/24] chore: update comments --- .../core/src/mempool.rs | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index 7feaaef..7fd6dd5 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -64,7 +64,6 @@ impl Ord for ByMaxFeeAndSubmissionId { .max_priority_fee_per_gas() .cmp(&self.0.pool_operation.operation.max_priority_fee_per_gas()) .then_with(|| self.0.submission_id.cmp(&other.0.submission_id)) - .then_with(|| self.0.pool_operation.hash.cmp(&other.0.pool_operation.hash)) } } @@ -299,39 +298,6 @@ mod tests { ); } - // Tests adding an operation with the same hash but lower gas price - #[test] - fn test_add_operation_duplicate_hash_lower_gas() { - let mut mempool = create_test_mempool(1000); - let hash = FixedBytes::from([1u8; 32]); - - let operation1 = create_wrapped_operation(3000, hash); - let result1 = mempool.add_operation(&operation1); - assert!(result1.is_ok()); - 
assert!(result1.unwrap().is_some()); - - let operation2 = create_wrapped_operation(2000, hash); - let result2 = mempool.add_operation(&operation2); - assert!(result2.is_ok()); - assert!(result2.unwrap().is_none()); - } - - // Tests adding an operation with the same hash and equal gas price - #[test] - fn test_add_operation_duplicate_hash_equal_gas() { - let mut mempool = create_test_mempool(1000); - let hash = FixedBytes::from([1u8; 32]); - - let operation1 = create_wrapped_operation(2000, hash); - let result1 = mempool.add_operation(&operation1); - assert!(result1.is_ok()); - assert!(result1.unwrap().is_some()); - - let operation2 = create_wrapped_operation(2000, hash); - let result2 = mempool.add_operation(&operation2); - assert!(result2.is_ok()); - assert!(result2.unwrap().is_none()); - } // Tests adding multiple operations with different hashes #[test] From 08b105522f32d9d1fdbd20474b7ede7a0862d745 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 15:33:44 -0500 Subject: [PATCH 22/24] chore: format --- crates/account-abstraction-core/core/src/mempool.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index 7fd6dd5..30988e2 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -298,7 +298,6 @@ mod tests { ); } - // Tests adding multiple operations with different hashes #[test] fn test_add_multiple_operations_with_different_hashes() { From 8449cafb3810ca740f38086dcdd7ea633b162abc Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 16:19:13 -0500 Subject: [PATCH 23/24] chore: setup lock and implement services --- Cargo.lock | 1 + crates/account-abstraction-core/Cargo.toml | 1 + .../core/src/kafka_mempool_engine.rs | 45 ++++++++++- crates/ingress-rpc/src/bin/main.rs | 4 +- .../ingress-rpc/src/kafka_mempool_consumer.rs | 79 ------------------- 
crates/ingress-rpc/src/lib.rs | 1 - 6 files changed, 45 insertions(+), 86 deletions(-) delete mode 100644 crates/ingress-rpc/src/kafka_mempool_consumer.rs diff --git a/Cargo.lock b/Cargo.lock index cca3b77..079de98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,6 +19,7 @@ dependencies = [ "reth-rpc-eth-types", "serde", "serde_json", + "tips-core", "tokio", "tracing", "wiremock", diff --git a/crates/account-abstraction-core/Cargo.toml b/crates/account-abstraction-core/Cargo.toml index e2a28f6..39b1591 100644 --- a/crates/account-abstraction-core/Cargo.toml +++ b/crates/account-abstraction-core/Cargo.toml @@ -24,6 +24,7 @@ alloy-sol-types.workspace= true anyhow.workspace = true rdkafka.workspace = true serde_json.workspace = true +tips-core.workspace = true tracing.workspace=true [dev-dependencies] diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 3d567ca..7165ef4 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -2,12 +2,17 @@ use crate::mempool::PoolConfig; use crate::mempool::{self, Mempool}; use crate::types::WrappedUserOperation; use async_trait::async_trait; -use rdkafka::{Message, consumer::StreamConsumer, message::OwnedMessage}; +use rdkafka::{ + ClientConfig, Message, + consumer::{Consumer, StreamConsumer}, + message::OwnedMessage, +}; use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; +use tips_core::kafka::load_kafka_config_from_file; use tokio::sync::RwLock; -use tracing::info; +use tracing::{info, warn}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data")] @@ -68,9 +73,11 @@ impl KafkaMempoolEngine { self.mempool.clone() } - pub async fn run(&self) -> anyhow::Result<()> { + pub async fn run(&self) { loop { - self.process_next().await?; + if let Err(err) = self.process_next().await { + warn!(error 
= %err, "Kafka mempool engine error, continuing"); + } } } @@ -106,6 +113,36 @@ impl KafkaMempoolEngine { } } +fn create_user_operation_consumer( + properties_file: &str, + topic: &str, + consumer_group_id: &str, +) -> anyhow::Result { + let mut client_config = ClientConfig::from_iter(load_kafka_config_from_file(properties_file)?); + + client_config.set("group.id", consumer_group_id); + client_config.set("enable.auto.commit", "true"); + + let consumer: StreamConsumer = client_config.create()?; + consumer.subscribe(&[topic])?; + + Ok(consumer) +} + +pub fn create_mempool_engine( + properties_file: &str, + topic: &str, + consumer_group_id: &str, + pool_config: Option, +) -> anyhow::Result> { + let consumer: StreamConsumer = + create_user_operation_consumer(properties_file, topic, consumer_group_id)?; + Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer( + Arc::new(consumer), + pool_config, + ))) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index a3f1ce8..ef49082 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -1,3 +1,4 @@ +use account_abstraction_core::kafka_mempool_engine::create_mempool_engine; use alloy_provider::ProviderBuilder; use clap::Parser; use jsonrpsee::server::Server; @@ -11,7 +12,6 @@ use tips_core::{Bundle, MeterBundleResponse}; use tips_ingress_rpc::Config; use tips_ingress_rpc::connect_ingress_to_builder; use tips_ingress_rpc::health::bind_health_server; -use tips_ingress_rpc::kafka_mempool_consumer::{create_mempool_engine, run_mempool_engine}; use tips_ingress_rpc::metrics::init_prometheus_exporter; use tips_ingress_rpc::queue::KafkaMessageQueue; use tips_ingress_rpc::service::{IngressApiServer, IngressService, Providers}; @@ -85,7 +85,7 @@ async fn main() -> anyhow::Result<()> { let mempool_engine_handle = { let engine = mempool_engine.clone(); - tokio::spawn(async move { run_mempool_engine(engine).await }) + 
tokio::spawn(async move { engine.run().await }) }; let (builder_tx, _) = diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs deleted file mode 100644 index 2e510c9..0000000 --- a/crates/ingress-rpc/src/kafka_mempool_consumer.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use account_abstraction_core::{kafka_mempool_engine::KafkaMempoolEngine, mempool::PoolConfig}; -use backon::{ExponentialBuilder, Retryable}; -use rdkafka::{ - ClientConfig, - consumer::{Consumer, StreamConsumer}, -}; -use tips_core::kafka::load_kafka_config_from_file; -use tracing::warn; - -/// Build a Kafka consumer for the user operation topic. -/// Ensures the consumer group id is set per deployment and subscribes to the topic. -fn create_user_operation_consumer( - properties_file: &str, - topic: &str, - consumer_group_id: &str, -) -> anyhow::Result { - let mut client_config = ClientConfig::from_iter(load_kafka_config_from_file(properties_file)?); - - // Allow deployments to control group id even if the properties file omits it. - client_config.set("group.id", consumer_group_id); - // Rely on Kafka for at-least-once; we keep auto commit enabled unless overridden in the file. - client_config.set("enable.auto.commit", "true"); - - let consumer: StreamConsumer = client_config.create()?; - consumer.subscribe(&[topic])?; - - Ok(consumer) -} - -/// Factory function that creates a fully configured KafkaMempoolEngine. -/// Handles consumer creation, engine instantiation, and Arc wrapping. 
-pub fn create_mempool_engine( - properties_file: &str, - topic: &str, - consumer_group_id: &str, - pool_config: Option, -) -> anyhow::Result> { - let consumer: StreamConsumer = - create_user_operation_consumer(properties_file, topic, consumer_group_id)?; - Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer( - Arc::new(consumer), - pool_config, - ))) -} - -/// Process a single Kafka message with exponential backoff retries. -pub async fn process_next_with_backoff(engine: &KafkaMempoolEngine) -> anyhow::Result<()> { - let process = || async { engine.process_next().await }; - - process - .retry( - &ExponentialBuilder::default() - .with_min_delay(Duration::from_millis(100)) - .with_max_delay(Duration::from_secs(5)) - .with_max_times(5), - ) - .notify(|err: &anyhow::Error, dur: Duration| { - warn!( - error = %err, - retry_in_ms = dur.as_millis(), - "Retrying Kafka mempool engine step" - ); - }) - .await -} - -/// Run the mempool engine forever, applying backoff on individual message failures. -pub async fn run_mempool_engine(engine: Arc) { - loop { - if let Err(err) = process_next_with_backoff(&engine).await { - // We log and continue to avoid stalling the consumer; repeated failures - // will still observe backoff inside `process_next_with_backoff`. 
- warn!(error = %err, "Kafka mempool engine exhausted retries, continuing"); - } - } -} diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index cc58640..b694d03 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -1,5 +1,4 @@ pub mod health; -pub mod kafka_mempool_consumer; pub mod metrics; pub mod queue; pub mod service; From 44d41b2f34b0d0607bfb5d65fb71eb462bd24d71 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Thu, 18 Dec 2025 14:42:35 -0500 Subject: [PATCH 24/24] chore: update reputation service --- crates/account-abstraction-core/core/src/mempool.rs | 6 +++--- .../account-abstraction-core/core/src/reputation_service.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index 30988e2..ecb46be 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -103,7 +103,7 @@ pub trait Mempool { &mut self, operation: &WrappedUserOperation, ) -> Result, anyhow::Error>; - fn get_top_operations(&self, n: usize) -> impl Iterator>; + fn get_top_operations(&self, n: usize) -> impl Iterator; fn remove_operation( &mut self, operation_hash: &UserOpHash, @@ -132,7 +132,7 @@ impl Mempool for MempoolImpl { Ok(ordered_operation_result) } - fn get_top_operations(&self, n: usize) -> impl Iterator> { + fn get_top_operations(&self, n: usize) -> impl Iterator { // TODO: There is a case where we skip operations that are not the lowest nonce for an account. // But we still have not given the N number of operations, meaning we don't return those operations. 
@@ -148,7 +148,7 @@ impl Mempool for MempoolImpl { Some(lowest) if lowest.0.pool_operation.hash == op_by_fee.0.pool_operation.hash => { - Some(Arc::new(op_by_fee.0.pool_operation.clone())) + Some(op_by_fee.0.pool_operation.clone()) } Some(_) => None, None => { diff --git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs index f0a5e85..c946073 100644 --- a/crates/account-abstraction-core/core/src/reputation_service.rs +++ b/crates/account-abstraction-core/core/src/reputation_service.rs @@ -23,7 +23,7 @@ pub struct ReputationServiceImpl { } impl ReputationServiceImpl { - pub fn new(mempool: Arc>) -> Self { + pub async fn new(mempool: Arc>) -> Self { Self { mempool } } }