Skip to content

Commit 0d21d7b

Browse files
committed
feat: added graceful and propper shutdown
1 parent 5c76091 commit 0d21d7b

File tree

3 files changed

+90
-35
lines changed

3 files changed

+90
-35
lines changed

adapter/rest/src/main.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,14 @@ use tucana::shared::{Struct, ValidationFlow, Value};
1616
#[tokio::main]
1717
async fn main() {
1818
let server = HttpServer { http_server: None };
19-
let runner = ServerRunner::new(server).await.unwrap();
20-
runner.serve().await.unwrap();
19+
let runner = match ServerRunner::new(server).await {
20+
Ok(runner) => runner,
21+
Err(err) => panic!("Failed to create server runner: {:?}", err),
22+
};
23+
match runner.serve().await {
24+
Ok(_) => (),
25+
Err(err) => panic!("Failed to start server runner: {:?}", err),
26+
};
2127
}
2228

2329
struct HttpServer {

crates/base/src/runner.rs

Lines changed: 73 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use crate::{
33
store::AdapterStore,
44
traits::{LoadConfig, Server as AdapterServer},
55
};
6-
use code0_flow::flow_definition::FlowUpdateService;
76
use std::sync::Arc;
8-
use tokio::sync::broadcast;
7+
use code0_flow::flow_service::FlowUpdateService;
8+
use tokio::signal;
99
use tonic::transport::Server;
1010
use tonic_health::pb::health_server::HealthServer;
1111

@@ -20,11 +20,14 @@ pub struct ServerContext<C: LoadConfig> {
2020
pub struct ServerRunner<C: LoadConfig> {
2121
context: ServerContext<C>,
2222
server: Box<dyn AdapterServer<C>>,
23-
shutdown_sender: broadcast::Sender<()>,
2423
}
2524

2625
impl<C: LoadConfig> ServerRunner<C> {
2726
pub async fn new<S: AdapterServer<C>>(server: S) -> anyhow::Result<Self> {
27+
env_logger::Builder::from_default_env()
28+
.filter_level(log::LevelFilter::Debug)
29+
.init();
30+
2831
code0_flow::flow_config::load_env_file();
2932

3033
let adapter_config = AdapterConfig::from_env();
@@ -41,17 +44,15 @@ impl<C: LoadConfig> ServerRunner<C> {
4144
server_config: Arc::new(server_config),
4245
};
4346

44-
let (shutdown_tx, _) = broadcast::channel(1);
45-
4647
Ok(Self {
4748
context,
4849
server: Box::new(server),
49-
shutdown_sender: shutdown_tx,
5050
})
5151
}
5252

53-
pub async fn serve(mut self) -> anyhow::Result<()> {
53+
pub async fn serve(self) -> anyhow::Result<()> {
5454
let config = self.context.adapter_config.clone();
55+
log::info!("Starting Draco Variant: {}", config.draco_variant);
5556

5657
if !config.is_static() {
5758
let definition_service = FlowUpdateService::from_url(
@@ -61,42 +62,84 @@ impl<C: LoadConfig> ServerRunner<C> {
6162
definition_service.send().await;
6263
}
6364

64-
if config.with_health_service {
65+
let health_task = if config.with_health_service {
6566
let health_service =
6667
code0_flow::flow_health::HealthService::new(config.nats_url.clone());
6768
let address = format!("{}:{}", config.grpc_host, config.grpc_port).parse()?;
6869

69-
tokio::spawn(async move {
70-
let _ = Server::builder()
71-
.add_service(HealthServer::new(health_service))
72-
.serve(address)
73-
.await;
74-
});
75-
7670
log::info!(
77-
"Health server started at {}:{}",
71+
"Health server starting at {}:{}",
7872
config.grpc_host,
7973
config.grpc_port
8074
);
81-
}
82-
83-
self.server.init(&self.context).await?;
8475

85-
let mut rx = self.shutdown_sender.subscribe();
86-
let context = self.context;
87-
let mut server = self.server;
76+
Some(tokio::spawn(async move {
77+
if let Err(err) = Server::builder()
78+
.add_service(HealthServer::new(health_service))
79+
.serve(address)
80+
.await
81+
{
82+
log::error!("Health server error: {:?}", err);
83+
} else {
84+
log::info!("Health server stopped gracefully");
85+
}
86+
}))
87+
} else {
88+
None
89+
};
8890

89-
let handle = tokio::spawn(async move {
90-
tokio::select! {
91-
result = server.run(&context) => result,
92-
_ = rx.recv() => server.shutdown(&context).await,
91+
let ServerRunner {
92+
mut server,
93+
context,
94+
} = self;
95+
96+
// Init the adapter server (e.g. create underlying HTTP server)
97+
server.init(&context).await?;
98+
log::info!("Draco successfully initialized.");
99+
100+
match health_task {
101+
Some(mut ht) => {
102+
tokio::select! {
103+
// Main adapter server loop finished on its own
104+
res = server.run(&context) => {
105+
log::warn!("Adapter server finished, shutting down");
106+
ht.abort();
107+
res?;
108+
}
109+
110+
// Health server ended first
111+
_ = &mut ht => {
112+
log::warn!("Health server task finished, shutting down adapter");
113+
server.shutdown(&context).await?;
114+
}
115+
116+
// Ctrl+C / SIGINT
117+
_ = signal::ctrl_c() => {
118+
log::info!("Ctrl+C/Exit signal received, shutting down adapter");
119+
server.shutdown(&context).await?;
120+
ht.abort();
121+
}
122+
}
93123
}
94-
});
95124

96-
tokio::signal::ctrl_c().await?;
97-
let _ = self.shutdown_sender.send(());
98-
handle.await??;
125+
None => {
126+
tokio::select! {
127+
// Adapter server loop ends on its own
128+
res = server.run(&context) => {
129+
log::warn!("Adapter server finished");
130+
res?;
131+
}
132+
133+
// Ctrl+C / SIGINT
134+
_ = signal::ctrl_c() => {
135+
log::info!("Ctrl+C/Exit signal received, shutting down adapter");
136+
server.shutdown(&context).await?;
137+
}
138+
}
139+
}
140+
}
99141

142+
log::info!("Draco shutdown complete");
100143
Ok(())
101144
}
102145
}

crates/base/src/store.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ pub enum FlowIdentifyResult {
1919
impl AdapterStore {
2020
pub async fn from_url(url: String, bucket: String) -> Self {
2121
let client = match async_nats::connect(url).await {
22-
Ok(client) => client,
22+
Ok(client) => {
23+
log::info!("Successfully connected to NATS");
24+
client
25+
},
2326
Err(err) => panic!("Failed to connect to NATS server: {:?}", err),
2427
};
2528

@@ -33,13 +36,16 @@ impl AdapterStore {
3336
.await
3437
{
3538
Ok(_) => {
36-
log::info!("Successfully created NATS bucket");
39+
log::info!("Successfully created NATS bucket/bucket already exists");
3740
}
3841
Err(err) => panic!("Failed to create NATS bucket: {:?}", err),
3942
}
4043

4144
let kv = match stream.get_key_value(bucket).await {
42-
Ok(kv) => kv,
45+
Ok(kv) => {
46+
log::info!("Successfully got NATS bucket");
47+
kv
48+
},
4349
Err(err) => panic!("Failed to get key-value store: {}", err),
4450
};
4551

0 commit comments

Comments
 (0)