2424import org .reactivecommons .async .commons .converters .json .DefaultObjectMapperSupplier ;
2525import org .reactivecommons .async .rabbit .converters .json .JacksonMessageConverter ;
2626import org .reactivecommons .async .commons .ext .CustomReporter ;
27+ import org .reactivecommons .async .utils .TestUtils ;
2728import reactor .core .publisher .Flux ;
2829import reactor .core .publisher .Mono ;
2930import reactor .rabbitmq .*;
@@ -97,14 +98,7 @@ public void shouldProcessMessagesInOptimalTime() throws JsonProcessingException,
9798 );
9899 messageListener = new StubGenericMessageListener ("test-queue" , reactiveMessageListener , true , 10 , discardNotifier , "command" , handlerResolver , messageConverter , errorReporter );
99100 Flux <AcknowledgableDelivery > messageFlux = createSource (messageCount );
100- final AtomicBoolean flag = new AtomicBoolean (true );
101- when (receiver .consumeManualAck (Mockito .anyString (), any (ConsumeOptions .class ))).thenAnswer (invocation -> defer (() -> {
102- if (flag .getAndSet (false )) {
103- return messageFlux ;
104- } else {
105- return Flux .never ();
106- }
107- }));
101+ TestUtils .instructSafeReceiverMock (receiver , messageFlux );
108102
109103 messageListener .startListener ();
110104 final long init = System .currentTimeMillis ();
@@ -133,14 +127,7 @@ public void shouldProcessAsyncMessagesConcurrent() throws JsonProcessingExceptio
133127 );
134128 messageListener = new StubGenericMessageListener ("test-queue" , reactiveMessageListener , true , 10 , discardNotifier , "command" , handlerResolver , messageConverter , errorReporter );
135129 Flux <AcknowledgableDelivery > messageFlux = createSource (messageCount );
136- final AtomicBoolean flag = new AtomicBoolean (true );
137- when (receiver .consumeManualAck (Mockito .anyString (), any (ConsumeOptions .class ))).thenAnswer (invocation -> defer (() -> {
138- if (flag .getAndSet (false )) {
139- return messageFlux ;
140- } else {
141- return Flux .never ();
142- }
143- }));
130+ TestUtils .instructSafeReceiverMock (receiver , messageFlux );
144131 System .out .println ("Permits before: " + semaphore .availablePermits ());
145132 final long init = System .currentTimeMillis ();
146133 messageListener .startListener ();
@@ -205,14 +192,9 @@ public void shouldProcessCPUMessagesInParallel() throws JsonProcessingException,
205192 reactiveMessageListener = new ReactiveMessageListener (receiver , topologyCreator , 250 , 250 );
206193 messageListener = new StubGenericMessageListener ("test-queue" , reactiveMessageListener , true , 10 , discardNotifier , "command" , handlerResolver , messageConverter , errorReporter );
207194 Flux <AcknowledgableDelivery > messageFlux = createSource (messageCount );
208- final AtomicBoolean flag = new AtomicBoolean (true );
209- when (receiver .consumeManualAck (Mockito .anyString (), any (ConsumeOptions .class ))).thenAnswer (invocation -> defer (() -> {
210- if (flag .getAndSet (false )) {
211- return messageFlux ;
212- } else {
213- return Flux .never ();
214- }
215- }));
195+
196+ TestUtils .instructSafeReceiverMock (receiver , messageFlux );
197+
216198 System .out .println ("Permits before: " + semaphore .availablePermits ());
217199 final long init = System .currentTimeMillis ();
218200 messageListener .startListener ();
@@ -238,14 +220,9 @@ public void shouldProcessCPUWorkMessagesInParallel() throws JsonProcessingExcept
238220 reactiveMessageListener = new ReactiveMessageListener (receiver , topologyCreator , 500 , 250 );
239221 messageListener = new StubGenericMessageListener ("test-queue" , reactiveMessageListener , true , 10 , discardNotifier , "command" , handlerResolver , messageConverter , errorReporter );
240222 Flux <AcknowledgableDelivery > messageFlux = createSource (messageCount );
241- final AtomicBoolean flag = new AtomicBoolean (true );
242- when (receiver .consumeManualAck (Mockito .anyString (), any (ConsumeOptions .class ))).thenAnswer (invocation -> defer (() -> {
243- if (flag .getAndSet (false )) {
244- return messageFlux ;
245- } else {
246- return Flux .never ();
247- }
248- }));
223+
224+ TestUtils .instructSafeReceiverMock (receiver , messageFlux );
225+
249226 System .out .println ("Permits before: " + semaphore .availablePermits ());
250227 final long init = System .currentTimeMillis ();
251228 messageListener .startListener ();
@@ -271,14 +248,9 @@ public void shouldProcessPasiveBlockingMessagesInParallel() throws JsonProcessin
271248 reactiveMessageListener = new ReactiveMessageListener (receiver , topologyCreator , 500 , 250 );
272249 messageListener = new StubGenericMessageListener ("test-queue" , reactiveMessageListener , true , 10 , discardNotifier , "command" , handlerResolver , messageConverter , errorReporter );
273250 Flux <AcknowledgableDelivery > messageFlux = createSource (messageCount );
274- final AtomicBoolean flag = new AtomicBoolean (true );
275- when (receiver .consumeManualAck (Mockito .anyString (), any (ConsumeOptions .class ))).thenAnswer (invocation -> defer (() -> {
276- if (flag .getAndSet (false )) {
277- return messageFlux ;
278- } else {
279- return Flux .never ();
280- }
281- }));
251+
252+ TestUtils .instructSafeReceiverMock (receiver , messageFlux );
253+
282254 System .out .println ("Permits before: " + semaphore .availablePermits ());
283255 final long init = System .currentTimeMillis ();
284256 messageListener .startListener ();
0 commit comments