Skip to content

Commit 6826874

Browse files
committed
feat: started working on graceful shutdown
1 parent 3f3c8fa commit 6826874

File tree

1 file changed

+112
-41
lines changed

1 file changed

+112
-41
lines changed

taurus/src/main.rs

Lines changed: 112 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@ use crate::context::executor::Executor;
88
use crate::context::registry::FunctionStore;
99
use crate::context::signal::Signal;
1010
use crate::implementation::collect;
11+
1112
use code0_flow::flow_config::load_env_file;
13+
use code0_flow::flow_config::mode::Mode::DYNAMIC;
14+
use code0_flow::flow_definition::FlowUpdateService;
1215
use context::context::Context;
1316
use futures_lite::StreamExt;
1417
use log::error;
1518
use prost::Message;
1619
use std::collections::HashMap;
20+
use tokio::signal;
1721
use tonic_health::pb::health_server::HealthServer;
1822
use tucana::shared::value::Kind;
1923
use tucana::shared::{ExecutionFlow, NodeFunction, Value};
@@ -43,13 +47,17 @@ async fn main() {
4347
store.populate(collect());
4448

4549
let client = match async_nats::connect(config.nats_url.clone()).await {
46-
Ok(client) => client,
50+
Ok(client) => {
51+
log::info!("Connected to nats server");
52+
client
53+
}
4754
Err(err) => {
4855
panic!("Failed to connect to NATS server: {}", err);
4956
}
5057
};
5158

52-
if config.with_health_service {
59+
// Optional health service task
60+
let health_task = if config.with_health_service {
5361
let health_service = code0_flow::flow_health::HealthService::new(config.nats_url.clone());
5462
let address = match format!("{}:{}", config.grpc_host, config.grpc_port).parse() {
5563
Ok(address) => address,
@@ -59,51 +67,114 @@ async fn main() {
5967
}
6068
};
6169

62-
tokio::spawn(async move {
63-
let _ = tonic::transport::Server::builder()
70+
log::info!("Health server starting at {}", address);
71+
72+
Some(tokio::spawn(async move {
73+
if let Err(err) = tonic::transport::Server::builder()
6474
.add_service(HealthServer::new(health_service))
6575
.serve(address)
66-
.await;
67-
});
76+
.await
77+
{
78+
log::error!("Health server error: {:?}", err);
79+
} else {
80+
log::info!("Health server started!");
81+
}
82+
}))
83+
} else {
84+
None
85+
};
6886

69-
println!("Health server started at {}", address);
87+
// Optional: dynamic mode sync at startup
88+
if config.mode == DYNAMIC {
89+
FlowUpdateService::from_url(
90+
config.aquila_url.clone(),
91+
config.definitions.clone().as_str(),
92+
)
93+
.send()
94+
.await;
7095
}
7196

72-
match client
73-
.queue_subscribe(String::from("execution.*"), "taurus".into())
74-
.await
75-
{
76-
Ok(mut sub) => {
77-
println!("Subscribed to 'execution.*'");
78-
79-
while let Some(msg) = sub.next().await {
80-
let flow: ExecutionFlow = match ExecutionFlow::decode(&*msg.payload) {
81-
Ok(flow) => flow,
82-
Err(err) => {
83-
println!("Failed to deserialize flow: {}, {:?}", err, &msg.payload);
84-
continue;
85-
}
86-
};
87-
88-
let value = match handle_message(flow, &store) {
89-
Signal::Failure(error) => error.as_value(),
90-
Signal::Success(v) => v,
91-
Signal::Return(v) => v,
92-
Signal::Respond(v) => v,
93-
Signal::Stop => Value {
94-
kind: Some(Kind::NullValue(0)),
95-
},
96-
};
97-
98-
// Send a response to the reply subject
99-
if let Some(reply) = msg.reply {
100-
match client.publish(reply, value.encode_to_vec().into()).await {
101-
Ok(_) => println!("Response sent"),
102-
Err(err) => println!("Failed to send response: {}", err),
103-
}
97+
let mut worker_task = tokio::spawn(async move {
98+
let mut sub = match client
99+
.queue_subscribe(String::from("execution.*"), "taurus".into())
100+
.await
101+
{
102+
Ok(sub) => {
103+
log::info!("Subscribed to 'execution.*'");
104+
sub
105+
}
106+
Err(err) => {
107+
log::error!("Failed to subscribe to 'execution.*': {:?}", err);
108+
return;
109+
}
110+
};
111+
112+
while let Some(msg) = sub.next().await {
113+
let flow: ExecutionFlow = match ExecutionFlow::decode(&*msg.payload) {
114+
Ok(flow) => flow,
115+
Err(err) => {
116+
log::error!(
117+
"Failed to deserialize flow: {:?}, payload: {:?}",
118+
err,
119+
&msg.payload
120+
);
121+
continue;
122+
}
123+
};
124+
125+
let value = match handle_message(flow, &store) {
126+
Signal::Failure(error) => error.as_value(),
127+
Signal::Success(v) => v,
128+
Signal::Return(v) => v,
129+
Signal::Respond(v) => v,
130+
Signal::Stop => Value {
131+
kind: Some(Kind::NullValue(0)),
132+
},
133+
};
134+
135+
// Send a response to the reply subject
136+
if let Some(reply) = msg.reply {
137+
match client.publish(reply, value.encode_to_vec().into()).await {
138+
Ok(_) => log::info!("Response sent"),
139+
Err(err) => log::error!("Failed to send response: {:?}", err),
104140
}
105141
}
106142
}
107-
Err(err) => panic!("Failed to subscribe to 'execution.*': {}", err),
108-
};
143+
144+
log::info!("NATS worker loop ended");
145+
});
146+
147+
match health_task {
148+
Some(mut health_task) => {
149+
// both are mutable JoinHandle<()> so we can borrow them in select!
150+
tokio::select! {
151+
_ = &mut worker_task => {
152+
log::warn!("NATS worker task finished, shutting down");
153+
health_task.abort();
154+
}
155+
_ = &mut health_task => {
156+
log::warn!("Health server task finished, shutting down");
157+
worker_task.abort();
158+
}
159+
_ = signal::ctrl_c() => {
160+
log::info!("Ctrl+C/Exit signal received, shutting down");
161+
worker_task.abort();
162+
health_task.abort();
163+
}
164+
}
165+
}
166+
None => {
167+
tokio::select! {
168+
_ = &mut worker_task => {
169+
log::warn!("NATS worker task finished, shutting down");
170+
}
171+
_ = signal::ctrl_c() => {
172+
log::info!("Ctrl+C/Exit signal received, shutting down");
173+
worker_task.abort();
174+
}
175+
}
176+
}
177+
}
178+
179+
log::info!("Taurus shutdown complete");
109180
}

0 commit comments

Comments
 (0)