Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions configs/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ token_contract_address = "0x2000000000000000000000000000000000000001"
recipient_distribution_factor = 20 # 1/20 of accounts receive transfers.
max_transfer_amount = 10

batch_size = 1000 # Number of transactions to generate before pushing to queue.

[rate_limiting]
initial_ratelimit = 100 # txs/s

Expand Down
8 changes: 4 additions & 4 deletions crescendo/src/bin/generate_genesis_alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
const NUM_ACCOUNTS: u32 = 50_000;
const MNEMONIC: &str = "test test test test test test test test test test test junk";

println!("Generating {} accounts...", NUM_ACCOUNTS);
println!("Generating {NUM_ACCOUNTS} accounts...");

let genesis_alloc: BTreeMap<String, AccountBalance> = (0..NUM_ACCOUNTS)
.into_par_iter()
Expand All @@ -27,17 +27,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let signer =
MnemonicBuilder::<English>::default().phrase(MNEMONIC).index(worker_id).unwrap().build().unwrap();

let address = secret_key_to_address(&signer.credential());
let address = secret_key_to_address(signer.credential());

(format!("{:?}", address), AccountBalance { balance: "0xD3C21BCECCEDA1000000".to_string() })
(format!("{address:?}"), AccountBalance { balance: "0xD3C21BCECCEDA1000000".to_string() })
})
.collect();

let output_path = Path::new("genesis-alloc.json");
let json = serde_json::to_string_pretty(&genesis_alloc)?;
fs::write(output_path, json)?;

println!("\nSuccessfully generated {} accounts!", NUM_ACCOUNTS);
println!("\nSuccessfully generated {NUM_ACCOUNTS} accounts!");
println!("Accounts saved to: {}", output_path.display());

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions crescendo/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub struct TxGenWorkerConfig {
pub token_contract_address: String,
pub recipient_distribution_factor: u32,
pub max_transfer_amount: u64,

pub batch_size: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
3 changes: 1 addition & 2 deletions crescendo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::thread;
use std::time::Duration;

use clap::Parser;
use core_affinity;
use mimalloc::MiMalloc;

mod config;
Expand Down Expand Up @@ -70,7 +69,7 @@ async fn main() {

let connections_per_network_worker =
config::get().network_worker.total_connections / worker_counts[&WorkerType::Network];
println!("[*] Connections per network worker: {}", connections_per_network_worker);
println!("[*] Connections per network worker: {connections_per_network_worker}");

// TODO: Having the assign_workers function do this would be cleaner.
let mut tx_gen_worker_id = 0;
Expand Down
24 changes: 17 additions & 7 deletions crescendo/src/tx_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,35 @@ impl TxQueue {
pub static TX_QUEUE: std::sync::LazyLock<TxQueue> = std::sync::LazyLock::new(TxQueue::new);

impl TxQueue {
pub fn push_tx(&self, tx: Vec<u8>) {
self.total_added.fetch_add(1, Ordering::Relaxed);
self.queue.lock().push_back(tx);
pub fn push_txs(&self, txs: Vec<Vec<u8>>) {
self.total_added.fetch_add(txs.len() as u64, Ordering::Relaxed);
self.queue.lock().extend(txs);
}

pub fn queue_len(&self) -> usize {
self.queue.lock().len()
}

pub async fn pop_at_most(&self, max_count: usize) -> Option<Vec<Vec<u8>>> {
let mut queue = self.queue.lock();
let allowed = (0..queue.len().min(max_count)).take_while(|_| self.rate_limiter.try_wait().is_ok()).count();
// Assume the queue has sufficient items for now.
let allowed = (0..max_count).take_while(|_| self.rate_limiter.try_wait().is_ok()).count();
if allowed == 0 {
return None;
}

// Scope to release lock asap.
let drained = {
let mut queue = self.queue.lock();
let to_drain = allowed.min(queue.len());
if to_drain == 0 {
return None;
}
queue.drain(..to_drain).collect::<Vec<_>>()
};

self.total_popped.fetch_add(allowed as u64, Ordering::Relaxed);
self.total_popped.fetch_add(drained.len() as u64, Ordering::Relaxed);

Some(queue.drain(..allowed).collect())
Some(drained)
}

pub async fn start_reporter(&self, measurement_interval: std::time::Duration) {
Expand Down
4 changes: 2 additions & 2 deletions crescendo/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub fn increase_nofile_limit(min_limit: u64) -> io::Result<u64> {
println!("[*] At startup, file descriptor limit: soft = {soft}, hard = {hard}");

if hard < min_limit {
panic!("[!] File descriptor hard limit is too low. Please increase it to at least {}.", min_limit);
panic!("[!] File descriptor hard limit is too low. Please increase it to at least {min_limit}.");
}

if soft != hard {
Expand Down Expand Up @@ -61,7 +61,7 @@ pub fn format_ranges(nums: &[usize]) -> String {
if start == end {
ranges.push(start.to_string());
} else {
ranges.push(format!("{}-{}", start, end));
ranges.push(format!("{start}-{end}"));
}

i += 1;
Expand Down
14 changes: 7 additions & 7 deletions crescendo/src/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub fn assign_workers(
if let Some(core_id) = core_ids.pop() {
result.push((core_id, worker_type));
*worker_counts.entry(worker_type).or_insert(0) += 1;
worker_cores.entry(worker_type).or_insert_with(Vec::new).push(core_id);
worker_cores.entry(worker_type).or_default().push(core_id);
remaining_cores -= 1;
}
}
Expand All @@ -74,7 +74,7 @@ pub fn assign_workers(
if let Some(core_id) = core_ids.pop() {
result.push((core_id, *worker_type));
*worker_counts.entry(*worker_type).or_insert(0) += 1;
worker_cores.entry(*worker_type).or_insert_with(Vec::new).push(core_id);
worker_cores.entry(*worker_type).or_default().push(core_id);
remaining_cores -= 1;
}
}
Expand All @@ -87,26 +87,26 @@ pub fn assign_workers(
if let Some(core_id) = core_ids.pop() {
result.push((core_id, worker_type));
*worker_counts.entry(worker_type).or_insert(0) += 1;
worker_cores.entry(worker_type).or_insert_with(Vec::new).push(core_id);
worker_cores.entry(worker_type).or_default().push(core_id);
}
}
}

println!("[+] Spawning {} workers:", total_starting_cores);
println!("[+] Spawning {total_starting_cores} workers:");
for (worker_type, count) in worker_counts.clone() {
if log_core_ranges {
if let Some(cores) = worker_cores.get(&worker_type) {
let mut core_ids: Vec<usize> = cores.iter().map(|c| c.id).collect();
core_ids.sort();

let core_str = match core_ids.as_slice() {
[single] => format!("core {}", single),
[single] => format!("core {single}"),
ids => format!("cores {}", format_ranges(ids)),
};
println!("- {:?}: {} ({})", worker_type, count, core_str);
println!("- {worker_type:?}: {count} ({core_str})");
}
} else {
println!("- {:?}: {}", worker_type, count);
println!("- {worker_type:?}: {count}");
}
}

Expand Down
6 changes: 3 additions & 3 deletions crescendo/src/workers/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,19 @@ pub async fn network_worker(worker_id: usize) {
NETWORK_STATS.inc_requests_by(txs.len() - error_count);
}
Err(e) => {
eprintln!("[!] Failed to read response body: {:?}", e);
eprintln!("[!] Failed to read response body: {e:?}");
NETWORK_STATS.inc_errors_by(txs.len());
tokio::time::sleep(Duration::from_millis(config.error_sleep_ms)).await;
}
}
} else {
println!("[!] Request did not have OK status: {:?}", res);
println!("[!] Request did not have OK status: {res:?}");
NETWORK_STATS.inc_errors_by(txs.len());
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
Err(e) => {
eprintln!("[!] Request failed: {:?}", e);
eprintln!("[!] Request failed: {e:?}");
NETWORK_STATS.inc_errors_by(txs.len());
tokio::time::sleep(Duration::from_millis(100)).await;
}
Expand Down
12 changes: 9 additions & 3 deletions crescendo/src/workers/tx_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ sol! {
}

pub fn tx_gen_worker(_worker_id: u32) {
let config = &config::get().tx_gen_worker;

let mut rng = rand::rng();
let mut tx_batch = Vec::with_capacity(config.batch_size as usize);

loop {
let config = &config::get().tx_gen_worker;

let account_index = rng.random_range(0..config.num_accounts); // Acount we'll be sending from.

// Get and increment nonce atomically.
Expand Down Expand Up @@ -83,7 +84,12 @@ pub fn tx_gen_worker(_worker_id: u32) {
},
);

TX_QUEUE.push_tx(tx);
tx_batch.push(tx);

// Once we've accumulated batch_size transactions, drain them all to the queue.
if tx_batch.len() >= config.batch_size as usize {
TX_QUEUE.push_txs(std::mem::take(&mut tx_batch));
}
}
}

Expand Down