Skip to content

Commit 45aa000

Browse files
committed
feat: added flow_definition_client
1 parent 650e6b3 commit 45aa000

File tree

1 file changed

+148
-0
lines changed

1 file changed

+148
-0
lines changed

src/flow_definition/mod.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use tucana::{
2+
aquila::{
3+
data_type_service_client::DataTypeServiceClient,
4+
flow_type_service_client::FlowTypeServiceClient,
5+
runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
6+
DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
7+
},
8+
shared::{DataType, FlowType, RuntimeFunctionDefinition},
9+
};
10+
11+
pub struct FlowUpdateService {
12+
aquila_url: String,
13+
data_types: Vec<DataType>,
14+
runtime_definitions: Vec<RuntimeFunctionDefinition>,
15+
flow_types: Vec<FlowType>,
16+
}
17+
18+
impl FlowUpdateService {
19+
pub fn from_url(aquila_url: String) -> Self {
20+
Self {
21+
aquila_url,
22+
data_types: Vec::new(),
23+
runtime_definitions: Vec::new(),
24+
flow_types: Vec::new(),
25+
}
26+
}
27+
28+
pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
29+
self.flow_types = flow_types;
30+
self
31+
}
32+
33+
pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
34+
self.data_types = data_types;
35+
self
36+
}
37+
38+
pub fn with_runtime_definitions(
39+
mut self,
40+
runtime_definitions: Vec<RuntimeFunctionDefinition>,
41+
) -> Self {
42+
self.runtime_definitions = runtime_definitions;
43+
self
44+
}
45+
46+
pub async fn send(&self) {
47+
self.update_data_types().await;
48+
self.update_runtime_definitions().await;
49+
self.update_flow_types().await;
50+
}
51+
52+
async fn update_data_types(&self) {
53+
if self.data_types.is_empty() {
54+
return;
55+
}
56+
57+
log::info!("Updating the current DataTypes!");
58+
let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
59+
Ok(client) => client,
60+
Err(err) => {
61+
log::error!("Failed to connect to DataTypeService: {}", err);
62+
return;
63+
}
64+
};
65+
66+
let request = DataTypeUpdateRequest {
67+
data_types: self.data_types.clone(),
68+
};
69+
70+
match client.update(request).await {
71+
Ok(response) => {
72+
log::info!(
73+
"Was the update of the DataTypes accepted by Sagittarius? {}",
74+
response.into_inner().success
75+
);
76+
}
77+
Err(err) => {
78+
log::error!("Failed to update data types: {}", err);
79+
}
80+
}
81+
}
82+
83+
async fn update_runtime_definitions(&self) {
84+
if self.runtime_definitions.is_empty() {
85+
return;
86+
}
87+
88+
log::info!("Updating the current RuntimeDefinitions!");
89+
let mut client =
90+
match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
91+
Ok(client) => client,
92+
Err(err) => {
93+
log::error!(
94+
"Failed to connect to RuntimeFunctionDefinitionService: {}",
95+
err
96+
);
97+
return;
98+
}
99+
};
100+
101+
let request = RuntimeFunctionDefinitionUpdateRequest {
102+
runtime_functions: self.runtime_definitions.clone(),
103+
};
104+
105+
match client.update(request).await {
106+
Ok(response) => {
107+
log::info!(
108+
"Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
109+
response.into_inner().success
110+
);
111+
}
112+
Err(err) => {
113+
log::error!("Failed to update runtime function definitions: {}", err);
114+
}
115+
}
116+
}
117+
118+
async fn update_flow_types(&self) {
119+
if self.flow_types.is_empty() {
120+
return;
121+
}
122+
123+
log::info!("Updating the current FlowTypes!");
124+
let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
125+
Ok(client) => client,
126+
Err(err) => {
127+
log::error!("Failed to connect to FlowTypeService: {}", err);
128+
return;
129+
}
130+
};
131+
132+
let request = FlowTypeUpdateRequest {
133+
flow_types: self.flow_types.clone(),
134+
};
135+
136+
match client.update(request).await {
137+
Ok(response) => {
138+
log::info!(
139+
"Was the update of the FlowTypes accepted by Sagittarius? {}",
140+
response.into_inner().success
141+
);
142+
}
143+
Err(err) => {
144+
log::error!("Failed to update flow types: {}", err);
145+
}
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)