1+ use super :: flow_identifier;
12use crate :: flow_store:: connection:: FlowStore ;
23use async_trait:: async_trait;
34use log:: error;
@@ -15,6 +16,7 @@ pub struct FlowStoreError {
1516pub enum FlowStoreErrorKind {
1617 Serialization ,
1718 RedisOperation ,
19+ NoIdentifier ,
1820}
1921
2022/// Trait representing a service for managing flows in a Redis.
@@ -46,9 +48,18 @@ impl FlowStoreServiceBase for FlowStoreService {
4648 async fn insert_flow ( & mut self , flow : Flow ) -> Result < i64 , FlowStoreError > {
4749 let mut connection = self . redis_client_arc . lock ( ) . await ;
4850
49- let insert_result: RedisResult < ( ) > = connection
50- . json_set ( flow. flow_id . to_string ( ) , "$" , & flow)
51- . await ;
51+ let identifier = match flow_identifier:: get_flow_identifier ( & flow) {
52+ Some ( id) => id,
53+ None => {
54+ return Err ( FlowStoreError {
55+ kind : FlowStoreErrorKind :: NoIdentifier ,
56+ flow_id : flow. flow_id ,
57+ reason : String :: from ( "Identifier can't be determent!" ) ,
58+ } ) ;
59+ }
60+ } ;
61+
62+ let insert_result: RedisResult < ( ) > = connection. json_set ( identifier, "$" , & flow) . await ;
5263
5364 match insert_result {
5465 Err ( redis_error) => {
@@ -78,7 +89,10 @@ impl FlowStoreServiceBase for FlowStoreService {
7889 /// Deletes a flow
7990 async fn delete_flow ( & mut self , flow_id : i64 ) -> Result < i64 , RedisError > {
8091 let mut connection = self . redis_client_arc . lock ( ) . await ;
81- let deleted_flow: RedisResult < i64 > = connection. json_del ( flow_id, "." ) . await ;
92+
93+ let identifier = format ! ( "{}::*" , flow_id) ;
94+ let keys: Vec < String > = connection. keys ( & identifier) . await ?;
95+ let deleted_flow: RedisResult < i64 > = connection. json_del ( keys, "." ) . await ;
8296
8397 match deleted_flow {
8498 Ok ( int) => Ok ( int) ,
@@ -116,7 +130,18 @@ impl FlowStoreServiceBase for FlowStoreService {
116130 }
117131 } ;
118132
119- let int_keys: Vec < i64 > = string_keys
133+ let mut real_keys: Vec < String > = vec ! [ ] ;
134+
135+ for key in string_keys {
136+ if key. contains ( "::" ) {
137+ let number = key. splitn ( 2 , "::" ) . next ( ) ;
138+ if let Some ( real_number) = number {
139+ real_keys. push ( String :: from ( real_number) ) ;
140+ }
141+ }
142+ }
143+
144+ let int_keys: Vec < i64 > = real_keys
120145 . into_iter ( )
121146 . filter_map ( |key| key. parse :: < i64 > ( ) . ok ( ) )
122147 . collect ( ) ;
@@ -180,6 +205,8 @@ impl FlowStoreServiceBase for FlowStoreService {
180205
181206#[ cfg( test) ]
182207mod tests {
208+ use std:: collections:: HashMap ;
209+
183210 use crate :: flow_store:: connection:: create_flow_store_connection;
184211 use crate :: flow_store:: connection:: FlowStore ;
185212 use crate :: flow_store:: service:: FlowStoreService ;
@@ -190,8 +217,50 @@ mod tests {
190217 use testcontainers:: core:: WaitFor ;
191218 use testcontainers:: runners:: AsyncRunner ;
192219 use testcontainers:: GenericImage ;
220+ use tucana:: shared:: FlowSetting ;
221+ use tucana:: shared:: FlowSettingDefinition ;
222+ use tucana:: shared:: Struct ;
193223 use tucana:: shared:: { Flow , Flows } ;
194224
225+ fn get_string_value ( value : & str ) -> tucana:: shared:: Value {
226+ tucana:: shared:: Value {
227+ kind : Some ( tucana:: shared:: value:: Kind :: StringValue ( String :: from (
228+ value,
229+ ) ) ) ,
230+ }
231+ }
232+
233+ fn get_settings ( ) -> Vec < FlowSetting > {
234+ vec ! [
235+ FlowSetting {
236+ definition: Some ( FlowSettingDefinition {
237+ id: String :: from( "1424525" ) ,
238+ key: String :: from( "HTTP_HOST" ) ,
239+ } ) ,
240+ object: Some ( Struct {
241+ fields: {
242+ let mut map = HashMap :: new( ) ;
243+ map. insert( String :: from( "host" ) , get_string_value( "abc.code0.tech" ) ) ;
244+ map
245+ } ,
246+ } ) ,
247+ } ,
248+ FlowSetting {
249+ definition: Some ( FlowSettingDefinition {
250+ id: String :: from( "14245252352" ) ,
251+ key: String :: from( "HTTP_METHOD" ) ,
252+ } ) ,
253+ object: Some ( Struct {
254+ fields: {
255+ let mut map = HashMap :: new( ) ;
256+ map. insert( String :: from( "method" ) , get_string_value( "GET" ) ) ;
257+ map
258+ } ,
259+ } ) ,
260+ } ,
261+ ]
262+ }
263+
195264 macro_rules! redis_integration_test {
196265 ( $test_name: ident, $consumer: expr) => {
197266 #[ tokio:: test]
@@ -237,8 +306,8 @@ mod tests {
237306 ( |connection: FlowStore , mut service: FlowStoreService | async move {
238307 let flow = Flow {
239308 flow_id: 1 ,
240- r#type: "" . to_string( ) ,
241- settings: vec! [ ] ,
309+ r#type: "REST " . to_string( ) ,
310+ settings: get_settings ( ) ,
242311 starting_node: None ,
243312 data_types: vec![ ] ,
244313 input_type_identifier: None ,
@@ -253,7 +322,10 @@ mod tests {
253322
254323 let redis_result: Option <String > = {
255324 let mut redis_cmd = connection. lock( ) . await ;
256- redis_cmd. json_get( "1" , "$" ) . await . unwrap( )
325+ redis_cmd
326+ . json_get( "1::1::abc.code0.tech::GET" , "$" )
327+ . await
328+ . unwrap( )
257329 } ;
258330
259331 println!( "{}" , redis_result. clone( ) . unwrap( ) ) ;
@@ -264,13 +336,31 @@ mod tests {
264336 } )
265337 ) ;
266338
339+ redis_integration_test ! (
340+ insert_one_flow_fails_no_identifier,
341+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
342+ let flow = Flow {
343+ flow_id: 1 ,
344+ r#type: "" . to_string( ) ,
345+ settings: get_settings( ) ,
346+ starting_node: None ,
347+ data_types: vec![ ] ,
348+ input_type_identifier: None ,
349+ return_type_identifier: None ,
350+ project_id: 1 ,
351+ } ;
352+
353+ assert!( !service. insert_flow( flow. clone( ) ) . await . is_ok( ) ) ;
354+ } )
355+ ) ;
356+
267357 redis_integration_test ! (
268358 insert_will_overwrite_existing_flow,
269359 ( |connection: FlowStore , mut service: FlowStoreService | async move {
270360 let flow = Flow {
271361 flow_id: 1 ,
272- r#type: "" . to_string( ) ,
273- settings: vec! [ ] ,
362+ r#type: "REST " . to_string( ) ,
363+ settings: get_settings ( ) ,
274364 data_types: vec![ ] ,
275365 input_type_identifier: None ,
276366 return_type_identifier: None ,
@@ -286,9 +376,9 @@ mod tests {
286376 let flow_overwrite = Flow {
287377 flow_id: 1 ,
288378 r#type: "REST" . to_string( ) ,
289- settings: vec! [ ] ,
379+ settings: get_settings ( ) ,
290380 data_types: vec![ ] ,
291- input_type_identifier: None ,
381+ input_type_identifier: Some ( String :: from ( "ABC" ) ) ,
292382 return_type_identifier: None ,
293383 project_id: 1 ,
294384 starting_node: None ,
@@ -300,13 +390,16 @@ mod tests {
300390
301391 let redis_result: Vec <String > = {
302392 let mut redis_cmd = connection. lock( ) . await ;
303- redis_cmd. json_get( "1" , "$" ) . await . unwrap( )
393+ redis_cmd
394+ . json_get( "1::1::abc.code0.tech::GET" , "$" )
395+ . await
396+ . unwrap( )
304397 } ;
305398
306399 assert_eq!( redis_result. len( ) , 1 ) ;
307400 let string: & str = & * redis_result[ 0 ] ;
308401 let redis_flow: Vec <Flow > = serde_json:: from_str( string) . unwrap( ) ;
309- assert_eq !( redis_flow[ 0 ] . r#type , "REST" . to_string ( ) ) ;
402+ assert !( redis_flow[ 0 ] . r#input_type_identifier . is_some ( ) ) ;
310403 } )
311404 ) ;
312405
@@ -315,8 +408,8 @@ mod tests {
315408 ( |_connection: FlowStore , mut service: FlowStoreService | async move {
316409 let flow_one = Flow {
317410 flow_id: 1 ,
318- r#type: "" . to_string( ) ,
319- settings: vec! [ ] ,
411+ r#type: "REST " . to_string( ) ,
412+ settings: get_settings ( ) ,
320413 data_types: vec![ ] ,
321414 input_type_identifier: None ,
322415 return_type_identifier: None ,
@@ -326,8 +419,8 @@ mod tests {
326419
327420 let flow_two = Flow {
328421 flow_id: 2 ,
329- r#type: "" . to_string( ) ,
330- settings: vec! [ ] ,
422+ r#type: "REST " . to_string( ) ,
423+ settings: get_settings ( ) ,
331424 data_types: vec![ ] ,
332425 input_type_identifier: None ,
333426 return_type_identifier: None ,
@@ -337,8 +430,8 @@ mod tests {
337430
338431 let flow_three = Flow {
339432 flow_id: 3 ,
340- r#type: "" . to_string( ) ,
341- settings: vec! [ ] ,
433+ r#type: "REST " . to_string( ) ,
434+ settings: get_settings ( ) ,
342435 starting_node: None ,
343436 data_types: vec![ ] ,
344437 input_type_identifier: None ,
@@ -359,8 +452,8 @@ mod tests {
359452 ( |connection: FlowStore , mut service: FlowStoreService | async move {
360453 let flow = Flow {
361454 flow_id: 1 ,
362- r#type: "" . to_string( ) ,
363- settings: vec! [ ] ,
455+ r#type: "REST " . to_string( ) ,
456+ settings: get_settings ( ) ,
364457 starting_node: None ,
365458 data_types: vec![ ] ,
366459 input_type_identifier: None ,
@@ -399,8 +492,8 @@ mod tests {
399492 ( |_connection: FlowStore , mut service: FlowStoreService | async move {
400493 let flow_one = Flow {
401494 flow_id: 1 ,
402- r#type: "" . to_string( ) ,
403- settings: vec! [ ] ,
495+ r#type: "REST " . to_string( ) ,
496+ settings: get_settings ( ) ,
404497 starting_node: None ,
405498 data_types: vec![ ] ,
406499 input_type_identifier: None ,
@@ -410,8 +503,8 @@ mod tests {
410503
411504 let flow_two = Flow {
412505 flow_id: 2 ,
413- r#type: "" . to_string( ) ,
414- settings: vec! [ ] ,
506+ r#type: "REST " . to_string( ) ,
507+ settings: get_settings ( ) ,
415508 starting_node: None ,
416509 data_types: vec![ ] ,
417510 input_type_identifier: None ,
@@ -421,8 +514,8 @@ mod tests {
421514
422515 let flow_three = Flow {
423516 flow_id: 3 ,
424- r#type: "" . to_string( ) ,
425- settings: vec! [ ] ,
517+ r#type: "REST " . to_string( ) ,
518+ settings: get_settings ( ) ,
426519 starting_node: None ,
427520 data_types: vec![ ] ,
428521 input_type_identifier: None ,
@@ -454,8 +547,8 @@ mod tests {
454547 ( |_connection: FlowStore , mut service: FlowStoreService | async move {
455548 let flow_one = Flow {
456549 flow_id: 1 ,
457- r#type: "" . to_string( ) ,
458- settings: vec! [ ] ,
550+ r#type: "REST " . to_string( ) ,
551+ settings: get_settings ( ) ,
459552 starting_node: None ,
460553 data_types: vec![ ] ,
461554 input_type_identifier: None ,
@@ -465,8 +558,8 @@ mod tests {
465558
466559 let flow_two = Flow {
467560 flow_id: 2 ,
468- r#type: "" . to_string( ) ,
469- settings: vec! [ ] ,
561+ r#type: "REST " . to_string( ) ,
562+ settings: get_settings ( ) ,
470563 starting_node: None ,
471564 data_types: vec![ ] ,
472565 input_type_identifier: None ,
@@ -476,8 +569,8 @@ mod tests {
476569
477570 let flow_three = Flow {
478571 flow_id: 3 ,
479- r#type: "" . to_string( ) ,
480- settings: vec! [ ] ,
572+ r#type: "REST " . to_string( ) ,
573+ settings: get_settings ( ) ,
481574 starting_node: None ,
482575 data_types: vec![ ] ,
483576 input_type_identifier: None ,
@@ -520,8 +613,8 @@ mod tests {
520613 ( |_connection: FlowStore , mut service: FlowStoreService | async move {
521614 let flow_one = Flow {
522615 flow_id: 1 ,
523- r#type: "" . to_string( ) ,
524- settings: vec! [ ] ,
616+ r#type: "REST " . to_string( ) ,
617+ settings: get_settings ( ) ,
525618 starting_node: None ,
526619 data_types: vec![ ] ,
527620 input_type_identifier: None ,
@@ -531,8 +624,8 @@ mod tests {
531624
532625 let flow_two = Flow {
533626 flow_id: 2 ,
534- r#type: "" . to_string( ) ,
535- settings: vec! [ ] ,
627+ r#type: "REST " . to_string( ) ,
628+ settings: get_settings ( ) ,
536629 starting_node: None ,
537630 data_types: vec![ ] ,
538631 input_type_identifier: None ,
@@ -542,8 +635,8 @@ mod tests {
542635
543636 let flow_three = Flow {
544637 flow_id: 3 ,
545- r#type: "" . to_string( ) ,
546- settings: vec! [ ] ,
638+ r#type: "REST " . to_string( ) ,
639+ settings: get_settings ( ) ,
547640 starting_node: None ,
548641 data_types: vec![ ] ,
549642 input_type_identifier: None ,
@@ -576,8 +669,8 @@ mod tests {
576669 ( |_connection: FlowStore , mut service: FlowStoreService | async move {
577670 let flow_one = Flow {
578671 flow_id: 1 ,
579- r#type: "" . to_string( ) ,
580- settings: vec! [ ] ,
672+ r#type: "REST " . to_string( ) ,
673+ settings: get_settings ( ) ,
581674 starting_node: None ,
582675 data_types: vec![ ] ,
583676 input_type_identifier: None ,
@@ -587,8 +680,8 @@ mod tests {
587680
588681 let flow_two = Flow {
589682 flow_id: 2 ,
590- r#type: "" . to_string( ) ,
591- settings: vec! [ ] ,
683+ r#type: "REST " . to_string( ) ,
684+ settings: get_settings ( ) ,
592685 starting_node: None ,
593686 data_types: vec![ ] ,
594687 input_type_identifier: None ,
@@ -598,8 +691,8 @@ mod tests {
598691
599692 let flow_three = Flow {
600693 flow_id: 3 ,
601- r#type: "" . to_string( ) ,
602- settings: vec! [ ] ,
694+ r#type: "REST " . to_string( ) ,
695+ settings: get_settings ( ) ,
603696 starting_node: None ,
604697 data_types: vec![ ] ,
605698 input_type_identifier: None ,
@@ -617,7 +710,7 @@ mod tests {
617710 let amount = service. insert_flows( flows. clone( ) ) . await . unwrap( ) ;
618711 assert_eq!( amount, 3 ) ;
619712
620- let query_flows = service. query_flows( String :: from( "1" ) ) . await ;
713+ let query_flows = service. query_flows( String :: from( "1::* " ) ) . await ;
621714
622715 assert!( query_flows. is_ok( ) ) ;
623716 assert_eq!( query_flows. unwrap( ) . flows, vec![ flow_one] )
0 commit comments