Skip to content

Commit 25ce1fc

Browse files
committed
feat: added flow queue
1 parent 7aabdd3 commit 25ce1fc

File tree

3 files changed

+101
-0
lines changed

3 files changed

+101
-0
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use rabbitmq_stream_client::Environment;
2+
use std::sync::Arc;
3+
use tokio::sync::Mutex;
4+
5+
pub type FlowQueue = Arc<Mutex<Box<Environment>>>;
6+
7+
pub struct RedisConfiguration {
8+
host: String,
9+
port: u16,
10+
username: String,
11+
password: String,
12+
}
13+
14+
impl RedisConfiguration {
15+
pub fn new(host: String, port: u16, username: String, password: String) -> Self {
16+
Self {
17+
host,
18+
port,
19+
username,
20+
password,
21+
}
22+
}
23+
}
24+
25+
pub async fn init_rabbitmq(redis_configuration: RedisConfiguration) -> FlowQueue {
26+
Arc::new(Mutex::new(Box::new(connect(redis_configuration).await)))
27+
}
28+
29+
async fn connect(redis_configuration: RedisConfiguration) -> Environment {
30+
match Environment::builder()
31+
.host(&*redis_configuration.host)
32+
.port(redis_configuration.port)
33+
.username(&*redis_configuration.username)
34+
.password(&*redis_configuration.password)
35+
.build()
36+
.await
37+
{
38+
Ok(env) => env,
39+
Err(error) => panic!("Cannot connect to redis instance! Reason: {:?}", error),
40+
}
41+
}

code0-flow/src/flow_queue/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod connection;
2+
pub mod service;
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use std::ops::Add;
2+
3+
/// # Queue Prefix
4+
/// - Every incoming message will have the `Send` prefix
5+
/// - Every processed message (answer) from taurus will have the `Receive` prefix
6+
pub enum QueuePrefix {
7+
Send,
8+
Receive,
9+
}
10+
11+
/// Supported Protocols
12+
pub enum QueueProtocol {
13+
Rest,
14+
WebSocket,
15+
}
16+
17+
/// Implementation to turn a protocol into a str
18+
impl QueueProtocol {
19+
20+
/// Function to turn a protocol into a str
21+
///
22+
/// # Example:
23+
/// ```
24+
/// use code0_flow::flow_queue::service::QueueProtocol;
25+
/// let proto_str = QueueProtocol::Rest.as_str().to_string();
26+
/// let result = "REST".to_string();
27+
///
28+
/// assert_eq!(result, proto_str);
29+
/// ```
30+
pub fn as_str(&self) -> &'static str {
31+
match self {
32+
QueueProtocol::Rest => "REST",
33+
QueueProtocol::WebSocket => "WS",
34+
}
35+
}
36+
}
37+
38+
/// Implementation to add a prefix and a protocol to a queue name
39+
impl Add<QueueProtocol> for QueuePrefix {
40+
type Output = String;
41+
42+
/// Function to add a prefix and a protocol to a queue name
43+
///
44+
/// # Example:
45+
/// ```
46+
/// use code0_flow::flow_queue::service::{QueuePrefix, QueueProtocol};
47+
/// let send_rest_queue_name = QueuePrefix::Send + QueueProtocol::Rest;
48+
/// let result = "S_REST".to_string();
49+
///
50+
/// assert_eq!(result, send_rest_queue_name);
51+
/// ```
52+
fn add(self, rhs: QueueProtocol) -> Self::Output {
53+
match self {
54+
QueuePrefix::Send => "S_".to_string() + rhs.as_str(),
55+
QueuePrefix::Receive => "R_".to_string() + rhs.as_str(),
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)