@@ -12,13 +12,15 @@ use error::RuntimeError;
1212use futures_lite:: StreamExt ;
1313use log:: error;
1414use prost:: Message ;
15- use tonic_health:: pb:: health_server:: HealthServer ;
1615use registry:: FunctionStore ;
16+ use std:: collections:: HashMap ;
17+ use tonic_health:: pb:: health_server:: HealthServer ;
1718use tucana:: shared:: value:: Kind ;
1819use tucana:: shared:: { ExecutionFlow , ListValue , NodeFunction , Value } ;
1920
2021fn handle_node_function (
2122 function : NodeFunction ,
23+ node_functions : & HashMap < i64 , NodeFunction > ,
2224 store : & FunctionStore ,
2325 context : & mut Context ,
2426) -> Result < Value , RuntimeError > {
@@ -65,46 +67,26 @@ fn handle_node_function(
6567 }
6668
6769 // Its another function, that result is a direct parameter to this function
68- tucana:: shared:: node_value:: Value :: NodeFunctions ( another_node_function) => {
70+ tucana:: shared:: node_value:: Value :: NodeFunctionId ( another_node_function) => {
6971 // As this is another new indent, a new context will be opened
70- context. next_context ( ) ;
71- let function_result: Vec < _ > = another_node_function
72- . functions
73- . into_iter ( )
74- . map ( |f| handle_node_function ( f, & store, context) )
75- . collect ( ) ;
76-
77- let mut collected = Vec :: new ( ) ;
78- for res in & function_result {
79- if let Ok ( v) = res {
80- collected. push ( v. clone ( ) ) ;
81- }
82- }
8372
84- let list = Value {
85- kind : Some ( Kind :: ListValue ( ListValue { values : collected } ) ) ,
73+ let function_result = match node_functions. get ( & another_node_function) {
74+ Some ( function_result) => {
75+ context. next_context ( ) ;
76+ handle_node_function ( function_result. clone ( ) , node_functions, store, context )
77+ } ,
78+ None => {
79+ todo ! ( "Handle node not found. This should normally not happen" )
80+ }
8681 } ;
8782
88- let is_faulty = function_result. iter ( ) . any ( |res| res. is_err ( ) ) ;
8983
9084 let entry = ContextEntry :: new (
91- Result :: Ok ( list . clone ( ) ) ,
85+ Result :: Ok ( function_result . clone ( ) ? ) ,
9286 parameter_collection. clone ( ) ,
9387 ) ;
9488
9589 context. write_to_current_context ( entry) ;
96-
97- match !is_faulty {
98- true => {
99- // Add the value back to the main parameter
100- parameter_collection. push ( list. clone ( ) ) ;
101- }
102- false => {
103- todo ! (
104- "Reqired function that holds the paramter failed in execution"
105- )
106- }
107- }
10890 }
10991 }
11092 }
@@ -117,14 +99,17 @@ fn handle_node_function(
11799 context. write_to_current_context ( entry) ;
118100
119101 // Check if there is a next node, if not then this was the last one
120- match function. next_node {
121- Some ( ref next_node_function) => {
122- let next = ( * * next_node_function) . clone ( ) ;
102+ match function. next_node_id {
103+ Some ( next_node_function_id) => {
104+ let node = match node_functions. get ( & next_node_function_id) {
105+ Some ( node) => node,
106+ None => todo ! ( "Handle node not found. This should normally not happen" ) ,
107+ } ;
123108
124109 // Increment the context node!
125110 context. next_node ( ) ;
126111
127- return handle_node_function ( next , store, context) ;
112+ return handle_node_function ( node . clone ( ) , node_functions , store, context) ;
128113 }
129114 None => {
130115 if context. is_end ( ) {
@@ -142,22 +127,28 @@ fn handle_node_function(
142127fn handle_message ( flow : ExecutionFlow , store : & FunctionStore ) -> Option < Value > {
143128 let mut context = Context :: new ( ) ;
144129
145- if let Some ( node) = flow. starting_node {
146- match handle_node_function ( node, store, & mut context) {
130+ let node_functions: HashMap < i64 , NodeFunction > = flow
131+ . node_functions
132+ . into_iter ( )
133+ . map ( |node| return ( node. database_id , node) )
134+ . collect ( ) ;
135+
136+ if let Some ( node) = node_functions. get ( & flow. starting_node_id ) {
137+ return match handle_node_function ( node. clone ( ) , & node_functions, store, & mut context) {
147138 Ok ( result) => {
148139 println ! (
149140 "Execution completed successfully: The value is {:?}" ,
150141 result
151142 ) ;
152- return Some ( result) ;
143+ Some ( result)
153144 }
154145 Err ( runtime_error) => {
155146 println ! ( "Runtime Error: {:?}" , runtime_error) ;
156- return None ;
147+ None
157148 }
158- }
159- } ;
160- return None ;
149+ } ;
150+ }
151+ None
161152}
162153
163154#[ tokio:: main]
@@ -180,8 +171,7 @@ async fn main() {
180171 } ;
181172
182173 if config. with_health_service {
183- let health_service =
184- code0_flow:: flow_health:: HealthService :: new ( config. nats_url . clone ( ) ) ;
174+ let health_service = code0_flow:: flow_health:: HealthService :: new ( config. nats_url . clone ( ) ) ;
185175 let address = match format ! ( "{}:{}" , config. grpc_host, config. grpc_port) . parse ( ) {
186176 Ok ( address) => address,
187177 Err ( err) => {
0 commit comments