@@ -187,6 +187,68 @@ impl RabbitmqClient {
187187 Err ( RabbitMqError :: DeserializationError )
188188 }
189189
190+ // Function intended to get used by the runtime
191+ pub async fn consume_message (
192+ & self ,
193+ queue_name : & str ,
194+ ack_on_success : bool ,
195+ ) -> Result < Message , RabbitMqError > {
196+ let mut consumer = {
197+ let channel = self . channel . lock ( ) . await ;
198+
199+ let consumer_res = channel
200+ . basic_consume (
201+ queue_name,
202+ "consumer" ,
203+ lapin:: options:: BasicConsumeOptions :: default ( ) ,
204+ FieldTable :: default ( ) ,
205+ )
206+ . await ;
207+
208+ match consumer_res {
209+ Ok ( consumer) => consumer,
210+ Err ( err) => panic ! ( "{}" , err) ,
211+ }
212+ } ;
213+
214+ debug ! ( "Starting to consume from {}" , queue_name) ;
215+
216+ while let Some ( delivery_result) = consumer. next ( ) . await {
217+ let delivery = match delivery_result {
218+ Ok ( del) => del,
219+ Err ( _) => return Err ( RabbitMqError :: DeserializationError ) ,
220+ } ;
221+ let data = & delivery. data ;
222+ let message_str = match std:: str:: from_utf8 ( & data) {
223+ Ok ( str) => str,
224+ Err ( _) => {
225+ return Err ( RabbitMqError :: DeserializationError ) ;
226+ }
227+ } ;
228+
229+ debug ! ( "Received message: {}" , message_str) ;
230+
231+ // Parse the message
232+ let message = match serde_json:: from_str :: < Message > ( message_str) {
233+ Ok ( m) => m,
234+ Err ( e) => {
235+ log:: error!( "Failed to parse message: {}" , e) ;
236+ return Err ( RabbitMqError :: DeserializationError ) ;
237+ }
238+ } ;
239+
240+ if ack_on_success {
241+ delivery
242+ . ack ( lapin:: options:: BasicAckOptions :: default ( ) )
243+ . await
244+ . expect ( "Failed to acknowledge message" ) ;
245+ }
246+
247+ return Ok ( message) ;
248+ }
249+ Err ( RabbitMqError :: DeserializationError )
250+ }
251+
190252 // Receive messages from a queue with timeout
191253 pub async fn await_message (
192254 & self ,
0 commit comments