Skip to content

Commit fa843c2

Browse files
committed
feat: added flow store
1 parent 25ce1fc commit fa843c2

File tree

3 files changed

+155
-0
lines changed

3 files changed

+155
-0
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use redis::Client;
2+
3+
pub fn build_connection(redis_url: String) -> Client {
4+
Client::open(redis_url).unwrap_or_else(|err| {
5+
panic!("Cannot connect to redis instance! Reason: {err}")
6+
})
7+
}

code0-flow/src/flow_store/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod service;
2+
pub mod connection;
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
use async_trait::async_trait;
2+
use log::{debug, error};
3+
use redis::aio::MultiplexedConnection;
4+
use redis::{AsyncCommands, RedisError};
5+
use std::sync::Arc;
6+
use tokio::sync::Mutex;
7+
use tucana::sagittarius::{Flow, Flows};
8+
9+
pub type FlowStore = Arc<Mutex<MultiplexedConnection>>;
10+
11+
#[derive(Debug)]
12+
pub struct FlowStoreError {
13+
kind: FlowStoreErrorKind,
14+
flow_id: i64,
15+
reason: String,
16+
}
17+
18+
#[derive(Debug)]
19+
enum FlowStoreErrorKind {
20+
Serialization,
21+
RedisOperation,
22+
}
23+
24+
/// Trait representing a service for managing flows in a Redis.
25+
#[async_trait]
26+
pub trait FlowStoreService {
27+
async fn new(redis_client_arc: FlowStore) -> Self;
28+
async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError>;
29+
async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError>;
30+
async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError>;
31+
async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError>;
32+
async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError>;
33+
}
34+
35+
/// Struct representing a service for managing flows in a Redis.
36+
struct FlowServiceBase {
37+
pub(crate) redis_client_arc: FlowStore,
38+
}
39+
40+
/// Implementation of a service for managing flows in a Redis.
41+
#[async_trait]
42+
impl FlowStoreService for FlowServiceBase {
43+
async fn new(redis_client_arc: FlowStore) -> Self {
44+
Self { redis_client_arc }
45+
}
46+
47+
/// Insert a list of flows into Redis
48+
async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError> {
49+
let mut connection = self.redis_client_arc.lock().await;
50+
51+
let serialized_flow = match serde_json::to_string(&flow) {
52+
Ok(serialized_flow) => serialized_flow,
53+
Err(parse_error) => {
54+
error!("An Error occurred {}", parse_error);
55+
return Err(FlowStoreError {
56+
flow_id: flow.flow_id,
57+
kind: FlowStoreErrorKind::Serialization,
58+
reason: parse_error.to_string(),
59+
});
60+
}
61+
};
62+
63+
let parsed_flow = connection
64+
.set::<String, String, i64>(flow.flow_id.to_string(), serialized_flow)
65+
.await;
66+
67+
match parsed_flow {
68+
Ok(modified) => {
69+
debug!("Inserted flow");
70+
Ok(modified)
71+
}
72+
Err(redis_error) => {
73+
error!("An Error occurred {}", redis_error);
74+
Err(FlowStoreError {
75+
flow_id: flow.flow_id,
76+
kind: FlowStoreErrorKind::RedisOperation,
77+
reason: redis_error.to_string(),
78+
})
79+
}
80+
}
81+
}
82+
83+
/// Insert a flows into Redis
84+
async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError> {
85+
let mut total_modified = 0;
86+
87+
for flow in flows.flows {
88+
let result = self.insert_flow(flow).await?;
89+
total_modified += result;
90+
}
91+
92+
Ok(total_modified)
93+
}
94+
95+
/// Deletes a flow
96+
async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError> {
97+
let mut connection = self.redis_client_arc.lock().await;
98+
let deleted_flow = connection.del::<i64, i64>(flow_id).await;
99+
100+
match deleted_flow {
101+
Ok(changed_amount) => {
102+
debug!("{} flows where deleted", changed_amount);
103+
deleted_flow
104+
}
105+
Err(redis_error) => {
106+
error!("An Error occurred {}", redis_error);
107+
Err(redis_error)
108+
}
109+
}
110+
}
111+
112+
/// Deletes a list of flows
113+
async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError> {
114+
let mut total_modified = 0;
115+
116+
for id in flow_ids {
117+
let result = self.delete_flow(id).await?;
118+
total_modified += result;
119+
}
120+
121+
Ok(total_modified)
122+
}
123+
124+
/// Queries for all ids in the redis
125+
/// Returns `Result<Vec<i64>, RedisError>`: Result of the flow ids currently in Redis
126+
async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError> {
127+
let mut connection = self.redis_client_arc.lock().await;
128+
129+
let string_keys: Vec<String> = {
130+
match connection.keys("*").await {
131+
Ok(res) => res,
132+
Err(error) => {
133+
print!("Can't retrieve keys from redis. Reason: {error}");
134+
return Err(error);
135+
}
136+
}
137+
};
138+
139+
let int_keys: Vec<i64> = string_keys
140+
.into_iter()
141+
.filter_map(|key| key.parse::<i64>().ok())
142+
.collect();
143+
144+
Ok(int_keys)
145+
}
146+
}

0 commit comments

Comments
 (0)