@@ -2,9 +2,9 @@ use std::{sync::Arc, time::Duration};
22
33use futures_lite:: StreamExt ;
44use lapin:: {
5+ Channel ,
56 options:: { BasicConsumeOptions , QueueDeclareOptions } ,
67 types:: FieldTable ,
7- Channel ,
88} ;
99use log:: debug;
1010use serde:: { Deserialize , Serialize } ;
@@ -44,6 +44,7 @@ pub enum RabbitMqError {
4444 ConnectionError ( String ) ,
4545 TimeoutError ,
4646 DeserializationError ,
47+ SerializationError ,
4748}
4849
4950impl From < lapin:: Error > for RabbitMqError {
@@ -65,6 +66,7 @@ impl std::fmt::Display for RabbitMqError {
6566 RabbitMqError :: ConnectionError ( msg) => write ! ( f, "Connection error: {}" , msg) ,
6667 RabbitMqError :: TimeoutError => write ! ( f, "Operation timed out" ) ,
6768 RabbitMqError :: DeserializationError => write ! ( f, "Failed to deserialize message" ) ,
69+ RabbitMqError :: SerializationError => write ! ( f, "Failed to serialize message" ) ,
6870 }
6971 }
7072}
@@ -83,8 +85,10 @@ impl RabbitmqClient {
8385 )
8486 . await
8587 {
86- Ok ( _) => ( ) ,
87- Err ( err) => log:: error!( "Failed to declare send_queue: {}" , err) ,
88+ Ok ( _) => {
89+ log:: info!( "Successfully declared send_queue" ) ;
90+ }
91+ Err ( err) => log:: error!( "Failed to declare send_queue: {:?}" , err) ,
8892 }
8993
9094 match channel
@@ -95,8 +99,10 @@ impl RabbitmqClient {
9599 )
96100 . await
97101 {
98- Ok ( _) => ( ) ,
99- Err ( err) => log:: error!( "Failed to declare recieve_queue: {}" , err) ,
102+ Ok ( _) => {
103+ log:: info!( "Successfully declared recieve_queue" ) ;
104+ }
105+ Err ( err) => log:: error!( "Failed to declare recieve_queue: {:?}" , err) ,
100106 }
101107
102108 RabbitmqClient {
@@ -109,23 +115,27 @@ impl RabbitmqClient {
109115 & self ,
110116 message_json : String ,
111117 queue_name : & str ,
112- ) -> Result < ( ) , lapin :: Error > {
118+ ) -> Result < ( ) , RabbitMqError > {
113119 let channel = self . channel . lock ( ) . await ;
114120
115- channel
121+ match channel
116122 . basic_publish (
117123 "" , // exchange
118124 queue_name, // routing key (queue name)
119125 lapin:: options:: BasicPublishOptions :: default ( ) ,
120126 message_json. as_bytes ( ) ,
121127 lapin:: BasicProperties :: default ( ) ,
122128 )
123- . await ?;
124-
125- Ok ( ( ) )
129+ . await
130+ {
131+ Err ( err) => {
132+ log:: error!( "Failed to publish message: {:?}" , err) ;
133+ Err ( RabbitMqError :: LapinError ( err) )
134+ }
135+ Ok ( _) => Ok ( ( ) ) ,
136+ }
126137 }
127138
128- // Receive messages from a queue
129139 // Receive messages from a queue with no timeout
130140 pub async fn await_message_no_timeout (
131141 & self ,
@@ -146,8 +156,14 @@ impl RabbitmqClient {
146156 . await ;
147157
148158 match consumer_res {
149- Ok ( consumer) => consumer,
150- Err ( err) => panic ! ( "{}" , err) ,
159+ Ok ( consumer) => {
160+ log:: info!( "Established queue connection to {}" , queue_name) ;
161+ consumer
162+ }
163+ Err ( err) => {
164+ log:: error!( "Cannot create consumer for queue {}: {:?}" , queue_name, err) ;
165+ return Err ( RabbitMqError :: LapinError ( err) ) ;
166+ }
151167 }
152168 } ;
153169
@@ -172,17 +188,19 @@ impl RabbitmqClient {
172188 let message = match serde_json:: from_str :: < Message > ( message_str) {
173189 Ok ( m) => m,
174190 Err ( e) => {
175- log:: error!( "Failed to parse message: {}" , e) ;
191+ log:: error!( "Failed to parse message: {:? }" , e) ;
176192 return Err ( RabbitMqError :: DeserializationError ) ;
177193 }
178194 } ;
179195
180196 if message. message_id == message_id {
181197 if ack_on_success {
182- delivery
198+ if let Err ( delivery_error ) = delivery
183199 . ack ( lapin:: options:: BasicAckOptions :: default ( ) )
184200 . await
185- . expect ( "Failed to acknowledge message" ) ;
201+ {
202+ log:: error!( "Failed to acknowledge message: {:?}" , delivery_error) ;
203+ }
186204 }
187205
188206 return Ok ( message) ;
@@ -196,7 +214,7 @@ impl RabbitmqClient {
196214 & self ,
197215 queue_name : & str ,
198216 handle_message : fn ( Message ) -> Result < Message , lapin:: Error > ,
199- ) -> Result < ( ) , lapin :: Error > {
217+ ) -> Result < ( ) , RabbitMqError > {
200218 let mut consumer = {
201219 let channel = self . channel . lock ( ) . await ;
202220
@@ -210,8 +228,14 @@ impl RabbitmqClient {
210228 . await ;
211229
212230 match consumer_res {
213- Ok ( consumer) => consumer,
214- Err ( err) => panic ! ( "Cannot consume messages: {}" , err) ,
231+ Ok ( consumer) => {
232+ log:: info!( "Established queue connection to {}" , queue_name) ;
233+ consumer
234+ }
235+ Err ( err) => {
236+ log:: error!( "Cannot create consumer for queue {}: {:?}" , queue_name, err) ;
237+ return Err ( RabbitMqError :: LapinError ( err) ) ;
238+ }
215239 }
216240 } ;
217241
@@ -221,8 +245,8 @@ impl RabbitmqClient {
221245 let delivery = match delivery {
222246 Ok ( del) => del,
223247 Err ( err) => {
224- log:: error!( "Error receiving message: {}" , err) ;
225- return Err ( err) ;
248+ log:: error!( "Error receiving message: {:? }" , err) ;
249+ return Err ( RabbitMqError :: LapinError ( err) ) ;
226250 }
227251 } ;
228252
@@ -233,32 +257,32 @@ impl RabbitmqClient {
233257 str
234258 }
235259 Err ( err) => {
236- log:: error!( "Error decoding message: {}" , err) ;
237- return Ok ( ( ) ) ;
260+ log:: error!( "Error decoding message: {:? }" , err) ;
261+ return Err ( RabbitMqError :: DeserializationError ) ;
238262 }
239263 } ;
240264 // Parse the message
241265 let inc_message = match serde_json:: from_str :: < Message > ( message_str) {
242266 Ok ( mess) => mess,
243267 Err ( err) => {
244- log:: error!( "Error parsing message: {}" , err) ;
245- return Ok ( ( ) ) ;
268+ log:: error!( "Error parsing message: {:? }" , err) ;
269+ return Err ( RabbitMqError :: DeserializationError ) ;
246270 }
247271 } ;
248272
249273 let message = match handle_message ( inc_message) {
250274 Ok ( mess) => mess,
251275 Err ( err) => {
252- log:: error!( "Error handling message: {}" , err) ;
253- return Ok ( ( ) ) ;
276+ log:: error!( "Error handling message: {:? }" , err) ;
277+ return Err ( RabbitMqError :: DeserializationError ) ;
254278 }
255279 } ;
256280
257281 let message_json = match serde_json:: to_string ( & message) {
258282 Ok ( json) => json,
259283 Err ( err) => {
260- log:: error!( "Error serializing message: {}" , err) ;
261- return Ok ( ( ) ) ;
284+ log:: error!( "Error serializing message: {:? }" , err) ;
285+ return Err ( RabbitMqError :: SerializationError ) ;
262286 }
263287 } ;
264288
@@ -267,10 +291,12 @@ impl RabbitmqClient {
267291 }
268292
269293 // Acknowledge the message
270- delivery
294+ if let Err ( delivery_error ) = delivery
271295 . ack ( lapin:: options:: BasicAckOptions :: default ( ) )
272296 . await
273- . expect ( "Failed to acknowledge message" ) ;
297+ {
298+ log:: error!( "Failed to acknowledge message: {:?}" , delivery_error) ;
299+ }
274300 }
275301
276302 Ok ( ( ) )
0 commit comments