Skip to content

Commit cf0b020

Browse files
committed
feat: wip new execution logic
1 parent dc122b0 commit cf0b020

File tree

2 files changed

+117
-128
lines changed

2 files changed

+117
-128
lines changed

src/executor/mod.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use crate::context::Context;
2+
use crate::context::signal::Signal;
3+
use crate::error::RuntimeError;
4+
use crate::registry::FunctionStore;
5+
use std::collections::HashMap;
6+
use tucana::shared::{NodeFunction, NodeParameter};
7+
8+
pub struct Executor<'a> {
9+
functions: &'a FunctionStore,
10+
nodes: HashMap<i64, NodeFunction>,
11+
context: Context,
12+
}
13+
14+
type HandleNodeParameterFn = fn(&mut Executor, node_parameter: &NodeParameter) -> Signal;
15+
16+
impl<'a> Executor<'a> {
17+
18+
pub fn new(functions: &'a FunctionStore, nodes: HashMap<i64, NodeFunction>, context: Context) -> Self {
19+
Executor {
20+
functions,
21+
nodes,
22+
context
23+
}
24+
}
25+
26+
27+
pub fn execute(&mut self, starting_node_id: i64) -> Signal {
28+
let mut current_node_id = starting_node_id;
29+
30+
loop {
31+
let node = match self.nodes.get(&current_node_id) {
32+
None => {
33+
return Signal::Failure(RuntimeError::simple_str(
34+
"NodeNotFound",
35+
"The node with the id was not found",
36+
));
37+
}
38+
Some(n) => n.clone(),
39+
};
40+
41+
let mut parameters = Vec::new();
42+
43+
for node_parameter in &node.parameters {
44+
match Executor::handle_node_parameter(self, node_parameter) {
45+
Signal::Success(value) | Signal::Return(value) => {
46+
parameters.push(value.clone())
47+
}
48+
Signal::Failure(error) => return Signal::Failure(error),
49+
Signal::Stop => return Signal::Stop,
50+
}
51+
}
52+
53+
let execution_result = match self.functions.get(node.runtime_function_id.as_str()) {
54+
Some(handler) => handler(&parameters, &mut self.context),
55+
None => {
56+
return Signal::Failure(RuntimeError::simple_str(
57+
"FunctionNotFound",
58+
"The function was not found",
59+
));
60+
}
61+
};
62+
63+
match execution_result {
64+
Signal::Success(value) => {
65+
if let Some(next_node_id) = node.next_node_id {
66+
current_node_id = next_node_id;
67+
continue;
68+
} else {
69+
return Signal::Success(value);
70+
}
71+
}
72+
Signal::Failure(runtime_error) => return Signal::Failure(runtime_error),
73+
Signal::Return(value) => return Signal::Return(value),
74+
Signal::Stop => return Signal::Stop,
75+
}
76+
}
77+
}
78+
79+
fn handle_node_parameter(&mut self, node_parameter: &NodeParameter) -> Signal {
80+
let node_value = match &node_parameter.value {
81+
Some(v) => v,
82+
None => {
83+
return Signal::Failure(RuntimeError::simple_str(
84+
"NodeValueNotFound",
85+
"An error occurred while executing a flow!",
86+
));
87+
}
88+
};
89+
90+
let value = match &node_value.value {
91+
Some(v) => v,
92+
None => return Signal::Failure(RuntimeError::simple_str("NodeValueNotFound", "An error occurred while executing a flow!"))
93+
};
94+
95+
match value {
96+
tucana::shared::node_value::Value::LiteralValue(value) => return Signal::Success(value.clone()),
97+
tucana::shared::node_value::Value::ReferenceValue(_reference_value) => todo!("implement reference values!"),
98+
tucana::shared::node_value::Value::NodeFunctionId(id) => {
99+
return Executor::execute(self, *id)
100+
},
101+
}
102+
}
103+
}
104+

src/main.rs

Lines changed: 13 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -1,154 +1,39 @@
1-
mod configuration;
21
pub mod context;
32
pub mod error;
43
pub mod implementation;
54
pub mod registry;
5+
mod executor;
6+
mod config;
67

7-
use crate::configuration::Config;
8+
use crate::context::signal::Signal;
9+
use crate::executor::Executor;
810
use crate::implementation::collect;
911
use code0_flow::flow_config::load_env_file;
10-
use context::{Context, ContextEntry, ContextResult};
11-
use error::RuntimeError;
12+
use context::Context;
1213
use futures_lite::StreamExt;
1314
use log::error;
1415
use prost::Message;
1516
use registry::FunctionStore;
1617
use std::collections::HashMap;
1718
use tonic_health::pb::health_server::HealthServer;
1819
use tucana::shared::value::Kind;
19-
use tucana::shared::{ExecutionFlow, ListValue, NodeFunction, Value};
20-
21-
fn handle_node_function(
22-
function: NodeFunction,
23-
node_functions: &HashMap<i64, NodeFunction>,
24-
store: &FunctionStore,
25-
context: &mut Context,
26-
) -> Result<Value, RuntimeError> {
27-
let runtime_function = match store.get(function.runtime_function_id.as_str()) {
28-
Some(fc) => fc,
29-
None => todo!("Retrun if no funtion is present"),
30-
};
31-
32-
let mut parameter_collection: Vec<Value> = vec![];
33-
for parameter in function.parameters {
34-
if let Some(node_value) = parameter.value {
35-
if let Some(value) = node_value.value {
36-
match value {
37-
// Its just a normal value, directly a paramter
38-
tucana::shared::node_value::Value::LiteralValue(v) => {
39-
parameter_collection.push(v)
40-
}
41-
// Its a reference to an already executed function that returns value is the parameter of this function
42-
tucana::shared::node_value::Value::ReferenceValue(reference) => {
43-
let optional_value = context.get(&reference);
44-
45-
// Look if its even present
46-
let context_result = match optional_value {
47-
Some(context_result) => context_result,
48-
None => {
49-
todo!("Required function that holds the parameter wasnt executed")
50-
}
51-
};
52-
53-
// A reference is present. Look up the real value
54-
match context_result {
55-
// The reference is a exeuction result of a node
56-
ContextResult::NodeExecutionResult(node_result) => match node_result {
57-
Ok(value) => {
58-
parameter_collection.push(value.clone());
59-
}
60-
Err(err) => return Err(err),
61-
},
62-
// The reference is a parameter of a node
63-
ContextResult::ParameterResult(parameter_result) => {
64-
parameter_collection.push(parameter_result.clone());
65-
}
66-
}
67-
}
68-
69-
// Its another function, that result is a direct parameter to this function
70-
tucana::shared::node_value::Value::NodeFunctionId(another_node_function) => {
71-
// As this is another new indent, a new context will be opened
72-
73-
let function_result = match node_functions.get(&another_node_function) {
74-
Some(function_result) => {
75-
context.next_context();
76-
handle_node_function(function_result.clone(), node_functions, store, context )
77-
},
78-
None => {
79-
todo!("Handle node not found. This should normally not happen")
80-
}
81-
};
82-
83-
84-
let entry = ContextEntry::new(
85-
Result::Ok(function_result.clone()?),
86-
parameter_collection.clone(),
87-
);
88-
89-
context.write_to_current_context(entry);
90-
}
91-
}
92-
}
93-
}
94-
95-
let result = runtime_function(&parameter_collection, context);
96-
97-
// Result will be added to the current context
98-
let entry = ContextEntry::new(result.clone(), parameter_collection.clone());
99-
context.write_to_current_context(entry);
100-
101-
// Check if there is a next node, if not then this was the last one
102-
match function.next_node_id {
103-
Some(next_node_function_id) => {
104-
let node = match node_functions.get(&next_node_function_id) {
105-
Some(node) => node,
106-
None => todo!("Handle node not found. This should normally not happen"),
107-
};
108-
109-
// Increment the context node!
110-
context.next_node();
111-
112-
return handle_node_function(node.clone(), node_functions, store, context);
113-
}
114-
None => {
115-
if context.is_end() {
116-
return result;
117-
}
118-
119-
context.leave_context();
120-
}
121-
}
122-
}
123-
124-
Err(RuntimeError::default())
125-
}
20+
use tucana::shared::{ExecutionFlow, NodeFunction, Value};
21+
use crate::config::Config;
12622

12723
fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Option<Value> {
128-
let mut context = Context::new();
24+
let context = Context::new();
12925

13026
let node_functions: HashMap<i64, NodeFunction> = flow
13127
.node_functions
13228
.into_iter()
13329
.map(|node| return (node.database_id, node))
13430
.collect();
135-
136-
if let Some(node) = node_functions.get(&flow.starting_node_id) {
137-
return match handle_node_function(node.clone(), &node_functions, store, &mut context) {
138-
Ok(result) => {
139-
println!(
140-
"Execution completed successfully: The value is {:?}",
141-
result
142-
);
143-
Some(result)
144-
}
145-
Err(runtime_error) => {
146-
println!("Runtime Error: {:?}", runtime_error);
147-
None
148-
}
149-
};
31+
32+
let mut executor = Executor::new(store, node_functions, context);
33+
match executor.execute(flow.starting_node_id) {
34+
Signal::Success(v) => Some(v.clone()),
35+
_ => None
15036
}
151-
None
15237
}
15338

15439
#[tokio::main]

0 commit comments

Comments
 (0)