@@ -26,6 +26,7 @@ pub trait FlowStoreServiceBase {
2626 async fn delete_flow ( & mut self , flow_id : i64 ) -> Result < i64 , RedisError > ;
2727 async fn delete_flows ( & mut self , flow_ids : Vec < i64 > ) -> Result < i64 , RedisError > ;
2828 async fn get_all_flow_ids ( & mut self ) -> Result < Vec < i64 > , RedisError > ;
29+ async fn query_flows ( & mut self , pattern : String ) -> Result < Flows , FlowStoreError > ;
2930}
3031
3132/// Struct representing a service for managing flows in a Redis.
@@ -122,6 +123,59 @@ impl FlowStoreServiceBase for FlowStoreService {
122123
123124 Ok ( int_keys)
124125 }
126+
127+ async fn query_flows ( & mut self , pattern : String ) -> Result < Flows , FlowStoreError > {
128+ let mut connection = self . redis_client_arc . lock ( ) . await ;
129+
130+ let keys: Vec < String > = {
131+ match connection. keys ( pattern) . await {
132+ Ok ( res) => res,
133+ Err ( error) => {
134+ error ! ( "Can't retrieve keys from redis. Reason: {error}" ) ;
135+ return Err ( FlowStoreError {
136+ kind : FlowStoreErrorKind :: RedisOperation ,
137+ flow_id : 0 ,
138+ reason : error. detail ( ) . unwrap ( ) . to_string ( ) ,
139+ } ) ;
140+ }
141+ }
142+ } ;
143+
144+ if keys. is_empty ( ) {
145+ return Ok ( Flows { flows : vec ! [ ] } ) ;
146+ }
147+
148+ match connection
149+ . json_get :: < Vec < String > , & str , Vec < String > > ( keys, "$" )
150+ . await
151+ {
152+ Ok ( json_values) => {
153+ let mut all_flows: Vec < Flow > = Vec :: new ( ) ;
154+
155+ for json_str in json_values {
156+ match serde_json:: from_str :: < Vec < Flow > > ( & json_str) {
157+ Ok ( mut flows) => all_flows. append ( & mut flows) ,
158+ Err ( error) => {
159+ return Err ( FlowStoreError {
160+ kind : FlowStoreErrorKind :: Serialization ,
161+ flow_id : 0 ,
162+ reason : error. to_string ( ) ,
163+ } ) ;
164+ }
165+ }
166+ }
167+
168+ return Ok ( Flows { flows : all_flows } ) ;
169+ }
170+ Err ( error) => {
171+ return Err ( FlowStoreError {
172+ kind : FlowStoreErrorKind :: RedisOperation ,
173+ flow_id : 0 ,
174+ reason : error. detail ( ) . unwrap_or ( "Unknown Redis error" ) . to_string ( ) ,
175+ } ) ;
176+ }
177+ }
178+ }
125179}
126180
127181#[ cfg( test) ]
@@ -210,45 +264,52 @@ mod tests {
210264 } )
211265 ) ;
212266
213- // Broke after switching to redis :( need fix
214- // redis_integration_test!(
215- // insert_will_overwrite_existing_flow,
216- // (|connection: FlowStore, mut service: FlowStoreService| async move {
217- // let flow = Flow {
218- // flow_id: 1,
219- // r#type: "".to_string(),
220- // settings: vec![],
221- // starting_node: None,
222- // };
223- //
224- // match service.insert_flow(flow.clone()).await {
225- // Ok(i) => println!("{}", i),
226- // Err(err) => println!("{}", err.reason),
227- // };
228- //
229- // let flow_overwrite = Flow {
230- // flow_id: 1,
231- // r#type: "ABC".to_string(),
232- // settings: vec![],
233- // starting_node: None,
234- // };
235- //
236- // let _ = service.insert_flow(flow_overwrite).await;
237- // let amount = service.get_all_flow_ids().await;
238- // assert_eq!(amount.unwrap().len(), 1);
239- //
240- // let redis_result: Vec<String> = {
241- // let mut redis_cmd = connection.lock().await;
242- // redis_cmd.json_get("1", "$").await.unwrap()
243- // };
244- //
245- // assert_eq!(redis_result.len(), 1);
246- // let string: &str = &*redis_result[0];
247- // let redis_flow: Flow = serde_json::from_str(string).unwrap();
248- // assert_eq!(redis_flow.r#type, "ABC".to_string());
249- // })
250- // );
251- //
267+ redis_integration_test ! (
268+ insert_will_overwrite_existing_flow,
269+ ( |connection: FlowStore , mut service: FlowStoreService | async move {
270+ let flow = Flow {
271+ flow_id: 1 ,
272+ r#type: "" . to_string( ) ,
273+ settings: vec![ ] ,
274+ data_types: vec![ ] ,
275+ input_type_identifier: None ,
276+ return_type_identifier: None ,
277+ project_id: 1 ,
278+ starting_node: None ,
279+ } ;
280+
281+ match service. insert_flow( flow. clone( ) ) . await {
282+ Ok ( i) => println!( "{}" , i) ,
283+ Err ( err) => println!( "{}" , err. reason) ,
284+ } ;
285+
286+ let flow_overwrite = Flow {
287+ flow_id: 1 ,
288+ r#type: "REST" . to_string( ) ,
289+ settings: vec![ ] ,
290+ data_types: vec![ ] ,
291+ input_type_identifier: None ,
292+ return_type_identifier: None ,
293+ project_id: 1 ,
294+ starting_node: None ,
295+ } ;
296+
297+ let _ = service. insert_flow( flow_overwrite) . await ;
298+ let amount = service. get_all_flow_ids( ) . await ;
299+ assert_eq!( amount. unwrap( ) . len( ) , 1 ) ;
300+
301+ let redis_result: Vec <String > = {
302+ let mut redis_cmd = connection. lock( ) . await ;
303+ redis_cmd. json_get( "1" , "$" ) . await . unwrap( )
304+ } ;
305+
306+ assert_eq!( redis_result. len( ) , 1 ) ;
307+ let string: & str = & * redis_result[ 0 ] ;
308+ let redis_flow: Vec <Flow > = serde_json:: from_str( string) . unwrap( ) ;
309+ assert_eq!( redis_flow[ 0 ] . r#type, "REST" . to_string( ) ) ;
310+ } )
311+ ) ;
312+
252313 redis_integration_test ! (
253314 insert_many_flows,
254315 ( |_connection: FlowStore , mut service: FlowStoreService | async move {
@@ -444,4 +505,122 @@ mod tests {
444505 assert_eq!( flow_ids. unwrap( ) , Vec :: <i64 >:: new( ) ) ;
445506 } )
446507 ) ;
508+
509+ redis_integration_test ! (
510+ query_empty_flow_store,
511+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
512+ let flows = service. query_flows( String :: from( "*" ) ) . await ;
513+ assert!( flows. is_ok( ) ) ;
514+ assert!( flows. unwrap( ) . flows. is_empty( ) ) ;
515+ } )
516+ ) ;
517+
518+ redis_integration_test ! (
519+ query_all_flows,
520+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
521+ let flow_one = Flow {
522+ flow_id: 1 ,
523+ r#type: "" . to_string( ) ,
524+ settings: vec![ ] ,
525+ starting_node: None ,
526+ data_types: vec![ ] ,
527+ input_type_identifier: None ,
528+ return_type_identifier: None ,
529+ project_id: 1 ,
530+ } ;
531+
532+ let flow_two = Flow {
533+ flow_id: 2 ,
534+ r#type: "" . to_string( ) ,
535+ settings: vec![ ] ,
536+ starting_node: None ,
537+ data_types: vec![ ] ,
538+ input_type_identifier: None ,
539+ return_type_identifier: None ,
540+ project_id: 1 ,
541+ } ;
542+
543+ let flow_three = Flow {
544+ flow_id: 3 ,
545+ r#type: "" . to_string( ) ,
546+ settings: vec![ ] ,
547+ starting_node: None ,
548+ data_types: vec![ ] ,
549+ input_type_identifier: None ,
550+ return_type_identifier: None ,
551+ project_id: 1 ,
552+ } ;
553+
554+ let flows = service. query_flows( String :: from( "*" ) ) . await ;
555+ assert!( flows. is_ok( ) ) ;
556+ assert!( flows. unwrap( ) . flows. is_empty( ) ) ;
557+
558+ let flow_vec = vec![ flow_one. clone( ) , flow_two. clone( ) , flow_three. clone( ) ] ;
559+ let flows = Flows { flows: flow_vec } ;
560+
561+ let amount = service. insert_flows( flows. clone( ) ) . await . unwrap( ) ;
562+ assert_eq!( amount, 3 ) ;
563+
564+ let query_flows = service. query_flows( String :: from( "*" ) ) . await ;
565+
566+ println!( "{:?}" , & query_flows) ;
567+
568+ assert!( query_flows. is_ok( ) ) ;
569+
570+ assert_eq!( flows. flows. len( ) , query_flows. unwrap( ) . flows. len( ) )
571+ } )
572+ ) ;
573+
574+ redis_integration_test ! (
575+ query_one_existing_flow,
576+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
577+ let flow_one = Flow {
578+ flow_id: 1 ,
579+ r#type: "" . to_string( ) ,
580+ settings: vec![ ] ,
581+ starting_node: None ,
582+ data_types: vec![ ] ,
583+ input_type_identifier: None ,
584+ return_type_identifier: None ,
585+ project_id: 1 ,
586+ } ;
587+
588+ let flow_two = Flow {
589+ flow_id: 2 ,
590+ r#type: "" . to_string( ) ,
591+ settings: vec![ ] ,
592+ starting_node: None ,
593+ data_types: vec![ ] ,
594+ input_type_identifier: None ,
595+ return_type_identifier: None ,
596+ project_id: 1 ,
597+ } ;
598+
599+ let flow_three = Flow {
600+ flow_id: 3 ,
601+ r#type: "" . to_string( ) ,
602+ settings: vec![ ] ,
603+ starting_node: None ,
604+ data_types: vec![ ] ,
605+ input_type_identifier: None ,
606+ return_type_identifier: None ,
607+ project_id: 1 ,
608+ } ;
609+
610+ let flows = service. query_flows( String :: from( "*" ) ) . await ;
611+ assert!( flows. is_ok( ) ) ;
612+ assert!( flows. unwrap( ) . flows. is_empty( ) ) ;
613+
614+ let flow_vec = vec![ flow_one. clone( ) , flow_two. clone( ) , flow_three. clone( ) ] ;
615+ let flows = Flows { flows: flow_vec } ;
616+
617+ let amount = service. insert_flows( flows. clone( ) ) . await . unwrap( ) ;
618+ assert_eq!( amount, 3 ) ;
619+
620+ let query_flows = service. query_flows( String :: from( "1" ) ) . await ;
621+
622+ assert!( query_flows. is_ok( ) ) ;
623+ assert_eq!( query_flows. unwrap( ) . flows, vec![ flow_one] )
624+ } )
625+ ) ;
447626}
0 commit comments