@@ -188,19 +188,19 @@ impl RabbitmqClient {
188188 }
189189
190190 // Function intended to get used by the runtime
191- pub async fn consume_message (
191+ pub async fn receive_messages (
192192 & self ,
193193 queue_name : & str ,
194- ack_on_success : bool ,
195- ) -> Result < Message , RabbitMqError > {
194+ handle_message : fn ( Message ) -> Result < Message , lapin :: Error > ,
195+ ) -> Result < ( ) , lapin :: Error > {
196196 let mut consumer = {
197197 let channel = self . channel . lock ( ) . await ;
198198
199199 let consumer_res = channel
200200 . basic_consume (
201201 queue_name,
202202 "consumer" ,
203- lapin :: options :: BasicConsumeOptions :: default ( ) ,
203+ BasicConsumeOptions :: default ( ) ,
204204 FieldTable :: default ( ) ,
205205 )
206206 . await ;
@@ -211,42 +211,67 @@ impl RabbitmqClient {
211211 }
212212 } ;
213213
214- debug ! ( "Starting to consume from {}" , queue_name) ;
214+ println ! ( "Starting to consume from {}" , queue_name) ;
215215
216- while let Some ( delivery_result ) = consumer. next ( ) . await {
217- let delivery = match delivery_result {
216+ while let Some ( delivery ) = consumer. next ( ) . await {
217+ let delivery = match delivery {
218218 Ok ( del) => del,
219- Err ( _) => return Err ( RabbitMqError :: DeserializationError ) ,
219+ Err ( err) => {
220+ println ! ( "Error receiving message: {}" , err) ;
221+ return Err ( err) ;
222+ }
220223 } ;
224+
221225 let data = & delivery. data ;
222226 let message_str = match std:: str:: from_utf8 ( & data) {
223- Ok ( str) => str,
224- Err ( _) => {
225- return Err ( RabbitMqError :: DeserializationError ) ;
227+ Ok ( str) => {
228+ println ! ( "Received message: {}" , str ) ;
229+ str
230+ }
231+ Err ( err) => {
232+ println ! ( "Error decoding message: {}" , err) ;
233+ return Ok ( ( ) ) ;
226234 }
227235 } ;
228-
229- debug ! ( "Received message: {}" , message_str) ;
230-
231236 // Parse the message
232237 let message = match serde_json:: from_str :: < Message > ( message_str) {
233- Ok ( m) => m,
234- Err ( e) => {
235- log:: error!( "Failed to parse message: {}" , e) ;
236- return Err ( RabbitMqError :: DeserializationError ) ;
238+ Ok ( mess) => {
239+ println ! ( "Parsed message with telegram_id: {}" , mess. message_id) ;
240+ mess
241+ }
242+ Err ( err) => {
243+ println ! ( "Error parsing message: {}" , err) ;
244+ return Ok ( ( ) ) ;
237245 }
238246 } ;
239247
240- if ack_on_success {
241- delivery
242- . ack ( lapin:: options:: BasicAckOptions :: default ( ) )
243- . await
244- . expect ( "Failed to acknowledge message" ) ;
248+ let message = match handle_message ( message) {
249+ Ok ( mess) => {
250+ println ! ( "Handled message with telegram_id: {}" , mess. message_id) ;
251+ mess
252+ }
253+ Err ( err) => {
254+ println ! ( "Error handling message: {}" , err) ;
255+ return Ok ( ( ) ) ;
256+ }
257+ } ;
258+
259+ let message_json = serde_json:: to_string ( & message) . unwrap ( ) ;
260+
261+ println ! ( "{}" , message_json) ;
262+
263+ {
264+ self . send_message ( message_json, "recieve_queue" ) . await ;
245265 }
246266
247- return Ok ( message) ;
267+ // Acknowledge the message
268+ delivery
269+ . ack ( BasicAckOptions :: default ( ) )
270+ . await
271+ . expect ( "Failed to acknowledge message" ) ;
248272 }
249- Err ( RabbitMqError :: DeserializationError )
273+
274+ Ok ( ( ) )
250275 }
251276
252277 // Receive messages from a queue with timeout
0 commit comments