Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ where
let store = store.ok_or("beacon_chain_start_method requires a store")?;
let runtime_context =
runtime_context.ok_or("beacon_chain_start_method requires a runtime context")?;
let context = runtime_context.service_context("beacon".into());
let context = runtime_context.clone();
let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?;
let event_handler = if self.http_api_config.enabled {
Some(ServerSentEventHandler::new(
Expand All @@ -178,7 +178,7 @@ where
};

let execution_layer = if let Some(config) = config.execution_layer.clone() {
let context = runtime_context.service_context("exec".into());
let context = runtime_context.clone();
let execution_layer = ExecutionLayer::from_config(config, context.executor.clone())
.map_err(|e| format!("unable to start execution layer endpoints: {:?}", e))?;
Some(execution_layer)
Expand Down Expand Up @@ -521,7 +521,7 @@ where
.runtime_context
.as_ref()
.ok_or("node timer requires a runtime_context")?
.service_context("node_timer".into());
.clone();
let beacon_chain = self
.beacon_chain
.clone()
Expand Down Expand Up @@ -561,7 +561,7 @@ where
.runtime_context
.as_ref()
.ok_or("slasher requires a runtime_context")?
.service_context("slasher_service_ctxt".into());
.clone();
SlasherService::new(beacon_chain, network_senders.network_send()).run(&context.executor)
}

Expand All @@ -572,7 +572,7 @@ where
.runtime_context
.as_ref()
.ok_or("monitoring_client requires a runtime_context")?
.service_context("monitoring_client".into());
.clone();
let monitoring_client = MonitoringHttpClient::new(config)?;
monitoring_client.auto_update(
context.executor,
Expand All @@ -587,7 +587,7 @@ where
.runtime_context
.as_ref()
.ok_or("slot_notifier requires a runtime_context")?
.service_context("slot_notifier".into());
.clone();
let beacon_chain = self
.beacon_chain
.clone()
Expand Down Expand Up @@ -695,7 +695,7 @@ where

if let Some(beacon_chain) = self.beacon_chain.as_ref() {
if let Some(network_globals) = &self.network_globals {
let beacon_processor_context = runtime_context.service_context("bproc".into());
let beacon_processor_context = runtime_context.clone();
BeaconProcessor {
network_globals: network_globals.clone(),
executor: beacon_processor_context.executor.clone(),
Expand All @@ -718,7 +718,7 @@ where
)?;
}

let state_advance_context = runtime_context.service_context("state_advance".into());
let state_advance_context = runtime_context.clone();
spawn_state_advance_timer(state_advance_context.executor, beacon_chain.clone());

if let Some(execution_layer) = beacon_chain.execution_layer.as_ref() {
Expand Down Expand Up @@ -770,8 +770,7 @@ where
// Spawn service to publish light_client updates at some interval into the slot.
if let Some(light_client_server_rv) = self.light_client_server_rv {
let inner_chain = beacon_chain.clone();
let light_client_update_context =
runtime_context.service_context("lc_update".to_string());
let light_client_update_context = runtime_context.clone();
light_client_update_context.executor.spawn(
async move {
compute_light_client_updates(
Expand Down
16 changes: 2 additions & 14 deletions beacon_node/lighthouse_network/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ pub async fn build_libp2p_instance(
boot_nodes: Vec<Enr>,
fork_name: ForkName,
chain_spec: Arc<ChainSpec>,
service_name: String,
disable_peer_scoring: bool,
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
) -> Libp2pInstance {
Expand All @@ -129,7 +128,7 @@ pub async fn build_libp2p_instance(

let (signal, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx, service_name);
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx);
let custody_group_count = chain_spec.custody_requirement;
let libp2p_context = lighthouse_network::Context {
config,
Expand Down Expand Up @@ -179,7 +178,6 @@ pub async fn build_node_pair(
vec![],
fork_name,
spec.clone(),
"sender".to_string(),
disable_peer_scoring,
inbound_rate_limiter.clone(),
)
Expand All @@ -189,7 +187,6 @@ pub async fn build_node_pair(
vec![],
fork_name,
spec.clone(),
"receiver".to_string(),
disable_peer_scoring,
inbound_rate_limiter,
)
Expand Down Expand Up @@ -268,16 +265,7 @@ pub async fn build_linear(
let mut nodes = Vec::with_capacity(n);
for _ in 0..n {
nodes.push(
build_libp2p_instance(
rt.clone(),
vec![],
fork_name,
spec.clone(),
"linear".to_string(),
false,
None,
)
.await,
build_libp2p_instance(rt.clone(), vec![], fork_name, spec.clone(), false, None).await,
);
}

Expand Down
15 changes: 3 additions & 12 deletions beacon_node/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ fn test_dht_persistence() {

let (signal, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
Arc::downgrade(&runtime),
exit,
shutdown_tx,
"test-dht-persistence".to_string(),
);
let executor = task_executor::TaskExecutor::new(Arc::downgrade(&runtime), exit, shutdown_tx);

let mut config = NetworkConfig::default();
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212, 21213);
Expand Down Expand Up @@ -115,12 +110,8 @@ fn test_removing_topic_weight_on_old_topics() {
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
let (_, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
Arc::downgrade(&runtime),
exit,
shutdown_tx,
"test-removing-topic-weight-on-old-topics".to_string(),
);
let executor =
task_executor::TaskExecutor::new(Arc::downgrade(&runtime), exit, shutdown_tx);

let mut config = NetworkConfig::default();
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21214, 21214, 21215);
Expand Down
18 changes: 0 additions & 18 deletions common/task_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,6 @@ pub struct TaskExecutor {
/// The task must provide a reason for shutting down.
signal_tx: Sender<ShutdownReason>,

/// The name of the service for inclusion in the logger output.
// FIXME(sproul): delete?
#[allow(dead_code)]
service_name: String,

rayon_pool_provider: Arc<RayonPoolProvider>,
}

Expand All @@ -103,28 +98,15 @@ impl TaskExecutor {
handle: T,
exit: async_channel::Receiver<()>,
signal_tx: Sender<ShutdownReason>,
service_name: String,
) -> Self {
Self {
handle_provider: handle.into(),
exit,
signal_tx,
service_name,
rayon_pool_provider: Arc::new(RayonPoolProvider::default()),
}
}

/// Clones the task executor adding a service name.
pub fn clone_with_name(&self, service_name: String) -> Self {
TaskExecutor {
handle_provider: self.handle_provider.clone(),
exit: self.exit.clone(),
signal_tx: self.signal_tx.clone(),
service_name,
rayon_pool_provider: self.rayon_pool_provider.clone(),
}
}

/// A convenience wrapper for `Self::spawn` which ignores a `Result` as long as both `Ok`/`Err`
/// are of type `()`.
///
Expand Down
2 changes: 1 addition & 1 deletion common/task_executor/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Default for TestRuntime {
(Some(runtime), handle)
};

let task_executor = TaskExecutor::new(handle, exit, shutdown_tx, "test".to_string());
let task_executor = TaskExecutor::new(handle, exit, shutdown_tx);

Self {
runtime,
Expand Down
30 changes: 0 additions & 30 deletions lighthouse/environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,6 @@ pub struct RuntimeContext<E: EthSpec> {
}

impl<E: EthSpec> RuntimeContext<E> {
/// Returns a sub-context of this context.
///
/// The generated service will have the `service_name` in all it's logs.
pub fn service_context(&self, service_name: String) -> Self {
Self {
executor: self.executor.clone_with_name(service_name),
eth_spec_instance: self.eth_spec_instance.clone(),
eth2_config: self.eth2_config.clone(),
eth2_network_config: self.eth2_network_config.clone(),
sse_logging_components: self.sse_logging_components.clone(),
}
}

/// Returns the `eth2_config` for this service.
pub fn eth2_config(&self) -> &Eth2Config {
&self.eth2_config
Expand Down Expand Up @@ -349,23 +336,6 @@ impl<E: EthSpec> Environment<E> {
Arc::downgrade(self.runtime()),
self.exit.clone(),
self.signal_tx.clone(),
"core".to_string(),
),
eth_spec_instance: self.eth_spec_instance.clone(),
eth2_config: self.eth2_config.clone(),
eth2_network_config: self.eth2_network_config.clone(),
sse_logging_components: self.sse_logging_components.clone(),
}
}

/// Returns a `Context` where the `service_name` is added to the logger output.
pub fn service_context(&self, service_name: String) -> RuntimeContext<E> {
RuntimeContext {
executor: TaskExecutor::new(
Arc::downgrade(self.runtime()),
self.exit.clone(),
self.signal_tx.clone(),
service_name,
),
eth_spec_instance: self.eth_spec_instance.clone(),
eth2_config: self.eth2_config.clone(),
Expand Down
7 changes: 1 addition & 6 deletions testing/execution_engine_integration/src/test_rig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,7 @@ impl<Engine: GenericExecutionEngine> TestRig<Engine> {
);
let (runtime_shutdown, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = TaskExecutor::new(
Arc::downgrade(&runtime),
exit,
shutdown_tx,
"test".to_string(),
);
let executor = TaskExecutor::new(Arc::downgrade(&runtime), exit, shutdown_tx);
let mut spec = TEST_FORK.make_genesis_spec(MainnetEthSpec::default_spec());
spec.terminal_total_difficulty = Uint256::ZERO;

Expand Down
1 change: 0 additions & 1 deletion testing/simulator/src/basic_sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ pub fn run_basic_sim(matches: &ArgMatches) -> Result<(), String> {
network_1
.add_validator_client_with_fallbacks(
validator_config,
i,
beacon_nodes,
files,
)
Expand Down
7 changes: 1 addition & 6 deletions testing/simulator/src/fallback_sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,7 @@ pub fn run_fallback_sim(matches: &ArgMatches) -> Result<(), String> {
Some(SUGGESTED_FEE_RECIPIENT.into());
println!("Adding validator client {}", i);
network_1
.add_validator_client_with_fallbacks(
validator_config,
i,
beacon_nodes,
files,
)
.add_validator_client_with_fallbacks(validator_config, beacon_nodes, files)
.await
.expect("should add validator");
},
Expand Down
31 changes: 6 additions & 25 deletions testing/simulator/src/local_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ impl<E: EthSpec> LocalNetwork<E> {
beacon_config.network.enr_tcp4_port = Some(BOOTNODE_PORT.try_into().expect("non zero"));
beacon_config.network.discv5_config.table_filter = |_| true;

let execution_node = LocalExecutionNode::new(
self.context.service_context("boot_node_el".into()),
mock_execution_config,
);
let execution_node = LocalExecutionNode::new(self.context.clone(), mock_execution_config);

beacon_config.execution_layer = Some(execution_layer::Config {
execution_endpoint: Some(SensitiveUrl::parse(&execution_node.server.url()).unwrap()),
Expand All @@ -218,11 +215,7 @@ impl<E: EthSpec> LocalNetwork<E> {
..Default::default()
});

let beacon_node = LocalBeaconNode::production(
self.context.service_context("boot_node".into()),
beacon_config,
)
.await?;
let beacon_node = LocalBeaconNode::production(self.context.clone(), beacon_config).await?;

Ok((beacon_node, execution_node))
}
Expand Down Expand Up @@ -252,10 +245,7 @@ impl<E: EthSpec> LocalNetwork<E> {
mock_execution_config.server_config.listen_port = EXECUTION_PORT + count;

// Construct execution node.
let execution_node = LocalExecutionNode::new(
self.context.service_context(format!("node_{}_el", count)),
mock_execution_config,
);
let execution_node = LocalExecutionNode::new(self.context.clone(), mock_execution_config);

// Pair the beacon node and execution node.
beacon_config.execution_layer = Some(execution_layer::Config {
Expand All @@ -266,11 +256,7 @@ impl<E: EthSpec> LocalNetwork<E> {
});

// Construct beacon node using the config,
let beacon_node = LocalBeaconNode::production(
self.context.service_context(format!("node_{}", count)),
beacon_config,
)
.await?;
let beacon_node = LocalBeaconNode::production(self.context.clone(), beacon_config).await?;

Ok((beacon_node, execution_node))
}
Expand Down Expand Up @@ -343,9 +329,7 @@ impl<E: EthSpec> LocalNetwork<E> {
beacon_node: usize,
validator_files: ValidatorFiles,
) -> Result<(), String> {
let context = self
.context
.service_context(format!("validator_{}", beacon_node));
let context = self.context.clone();
let self_1 = self.clone();
let socket_addr = {
let read_lock = self.beacon_nodes.read();
Expand Down Expand Up @@ -401,13 +385,10 @@ impl<E: EthSpec> LocalNetwork<E> {
pub async fn add_validator_client_with_fallbacks(
&self,
mut validator_config: ValidatorConfig,
validator_index: usize,
beacon_nodes: Vec<usize>,
validator_files: ValidatorFiles,
) -> Result<(), String> {
let context = self
.context
.service_context(format!("validator_{}", validator_index));
let context = self.context.clone();
let self_1 = self.clone();
let mut beacon_node_urls = vec![];
for beacon_node in beacon_nodes {
Expand Down
7 changes: 1 addition & 6 deletions testing/web3signer_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,7 @@ mod tests {
);
let (runtime_shutdown, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = TaskExecutor::new(
Arc::downgrade(&runtime),
exit,
shutdown_tx,
"test".to_string(),
);
let executor = TaskExecutor::new(Arc::downgrade(&runtime), exit, shutdown_tx);

let slashing_db_path = validator_dir.path().join(SLASHING_PROTECTION_FILENAME);
let slashing_protection = SlashingDatabase::open_or_create(&slashing_db_path).unwrap();
Expand Down
Loading