Skip to content

Commit b171795

Browse files
author
Daniel Bustamante Ospina
committed
Recover consumer from tmp queue
1 parent e5caada commit b171795

File tree

14 files changed

+192
-157
lines changed

14 files changed

+192
-157
lines changed

async/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void init() throws NoSuchFieldException, IllegalAccessException {
5050
when(creator.declare(any(ExchangeSpecification.class))).thenReturn(Mono.just(mock(AMQP.Exchange.DeclareOk.class)));
5151
when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
5252
when(listener.getTopologyCreator()).thenReturn(creator);
53-
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.empty());
53+
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
5454
when(listener.getReceiver()).thenReturn(receiver);
5555
when(listener.getMaxConcurrency()).thenReturn(20);
5656
}

async/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void init() {
4444
when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4545
when(creator.declareQueue(any(String.class), any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4646
when(listener.getTopologyCreator()).thenReturn(creator);
47-
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.empty());
47+
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
4848
when(listener.getReceiver()).thenReturn(receiver);
4949
when(listener.getMaxConcurrency()).thenReturn(20);
5050
}

async/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void init() {
4444
when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4545
when(creator.declareQueue(any(String.class), any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4646
when(listener.getTopologyCreator()).thenReturn(creator);
47-
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.empty());
47+
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
4848
when(listener.getReceiver()).thenReturn(receiver);
4949
when(listener.getMaxConcurrency()).thenReturn(20);
5050
}

async/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void init() {
4646
when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4747
when(creator.declareQueue(any(String.class), any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4848
when(listener.getTopologyCreator()).thenReturn(creator);
49-
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.empty());
49+
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
5050
when(listener.getReceiver()).thenReturn(receiver);
5151
when(listener.getMaxConcurrency()).thenReturn(20);
5252
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationReplyListener.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
package org.reactivecommons.async.rabbit.listeners;
22

3+
import com.rabbitmq.client.Delivery;
4+
import com.rabbitmq.client.UnblockedCallback;
35
import lombok.extern.java.Log;
46
import org.reactivecommons.async.rabbit.RabbitMessage;
57
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
68
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
79
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
10+
import org.reactivestreams.Subscription;
11+
import reactor.core.publisher.BaseSubscriber;
12+
import reactor.core.publisher.Flux;
13+
import reactor.core.publisher.SignalType;
814
import reactor.rabbitmq.Receiver;
915

1016
import java.util.logging.Level;
1117

1218
import static org.reactivecommons.async.commons.Headers.*;
19+
import static reactor.core.publisher.Signal.subscribe;
1320
import static reactor.rabbitmq.ResourcesSpecification.*;
1421

1522
@Log
@@ -19,6 +26,7 @@ public class ApplicationReplyListener {
1926
private final Receiver receiver;
2027
private final TopologyCreator creator;
2128
private final String queueName;
29+
private volatile Flux<Delivery> deliveryFlux;
2230

2331
public ApplicationReplyListener(ReactiveReplyRouter router, ReactiveMessageListener listener, String queueName) {
2432
this.router = router;
@@ -28,7 +36,7 @@ public ApplicationReplyListener(ReactiveReplyRouter router, ReactiveMessageListe
2836
}
2937

3038
public void startListening(String routeKey) {
31-
creator.declare(exchange("globalReply").type("topic").durable(true))
39+
deliveryFlux = creator.declare(exchange("globalReply").type("topic").durable(true))
3240
.then(creator.declare(queue(queueName).durable(false).autoDelete(true).exclusive(true)))
3341
.then(creator.bind(binding("globalReply", routeKey, queueName)))
3442
.thenMany(receiver.consumeAutoAck(queueName).doOnNext(delivery -> {
@@ -43,6 +51,38 @@ public void startListening(String routeKey) {
4351
} catch (Exception e) {
4452
log.log(Level.SEVERE, "Error in reply reception", e);
4553
}
46-
})).subscribe();
54+
}));
55+
//deliveryFlux.subscribe();
56+
onTerminate();
57+
}
58+
59+
60+
private void onTerminate() {
61+
deliveryFlux.doOnTerminate(this::onTerminate).subscribe(new BaseSubscriber<Delivery>() {
62+
@Override
63+
protected void hookOnNext(Delivery value) {
64+
log.info("#On NextHook");
65+
}
66+
67+
@Override
68+
protected void hookOnComplete() {
69+
log.warning("#On Complete Hook!!");
70+
}
71+
72+
@Override
73+
protected void hookOnError(Throwable throwable) {
74+
log.log(Level.SEVERE, "#Hook On Error!!", throwable);
75+
}
76+
77+
@Override
78+
protected void hookOnCancel() {
79+
log.warning("#On Cancel Hook!!");
80+
}
81+
82+
@Override
83+
protected void hookFinally(SignalType type) {
84+
log.warning("#On Finally Hook!! " + type.name());
85+
}
86+
});
4787
}
4888
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.reactivecommons.async.rabbit.listeners;
22

33
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Delivery;
45
import lombok.extern.java.Log;
56
import org.reactivecommons.async.commons.DiscardNotifier;
67
import org.reactivecommons.async.commons.FallbackStrategy;
@@ -9,8 +10,10 @@
910
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1011
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1112
import org.reactivecommons.async.commons.ext.CustomReporter;
13+
import reactor.core.publisher.BaseSubscriber;
1214
import reactor.core.publisher.Flux;
1315
import reactor.core.publisher.Mono;
16+
import reactor.core.publisher.SignalType;
1417
import reactor.core.scheduler.Scheduler;
1518
import reactor.core.scheduler.Schedulers;
1619
import reactor.rabbitmq.AcknowledgableDelivery;
@@ -46,6 +49,7 @@ public abstract class GenericMessageListener {
4649
private final DiscardNotifier discardNotifier;
4750
private final String objectType;
4851
private final CustomReporter customReporter;
52+
private volatile Flux<AcknowledgableDelivery> messageFlux;
4953

5054
public GenericMessageListener(String queueName, ReactiveMessageListener listener, boolean useDLQRetries,
5155
long maxRetries, DiscardNotifier discardNotifier, String objectType, CustomReporter customReporter) {
@@ -74,10 +78,37 @@ public void startListener() {
7478
ConsumeOptions consumeOptions = new ConsumeOptions();
7579
consumeOptions.qos(messageListener.getPrefetchCount());
7680

77-
setUpBindings(messageListener.getTopologyCreator()).thenMany(
78-
receiver.consumeManualAck(queueName, consumeOptions)
79-
.transform(this::consumeFaultTolerant))
80-
.subscribe();
81+
this.messageFlux = setUpBindings(messageListener.getTopologyCreator()).thenMany(
82+
receiver.consumeManualAck(queueName, consumeOptions)
83+
.transform(this::consumeFaultTolerant));
84+
onTerminate();
85+
86+
}
87+
88+
private void onTerminate() {
89+
log.info("Hard Subscription 2 to " + this.getClass().getName());
90+
messageFlux.doOnTerminate(this::onTerminate).subscribe(new BaseSubscriber<Delivery>() {
91+
92+
@Override
93+
protected void hookOnComplete() {
94+
log.warning("##On Complete Hook!!");
95+
}
96+
97+
@Override
98+
protected void hookOnError(Throwable throwable) {
99+
log.log(Level.SEVERE, "##Hook On Error!!", throwable);
100+
}
101+
102+
@Override
103+
protected void hookOnCancel() {
104+
log.warning("##On Cancel Hook!!");
105+
}
106+
107+
@Override
108+
protected void hookFinally(SignalType type) {
109+
log.warning("##On Finally Hook!! " + type.name());
110+
}
111+
});
81112
}
82113

83114

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@
3232
import java.time.Duration;
3333
import java.util.*;
3434
import java.util.concurrent.*;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3536
import java.util.stream.Collectors;
3637
import java.util.stream.IntStream;
3738

3839
import static org.mockito.ArgumentMatchers.any;
3940
import static org.mockito.Mockito.mock;
4041
import static org.mockito.Mockito.when;
42+
import static reactor.core.publisher.Flux.defer;
4143
import static reactor.core.publisher.Flux.range;
4244
import static reactor.core.publisher.Mono.just;
4345

@@ -95,7 +97,14 @@ public void shouldProcessMessagesInOptimalTime() throws JsonProcessingException,
9597
);
9698
messageListener = new StubGenericMessageListener("test-queue", reactiveMessageListener, true, 10, discardNotifier, "command", handlerResolver, messageConverter, errorReporter);
9799
Flux<AcknowledgableDelivery> messageFlux = createSource(messageCount);
98-
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenReturn(messageFlux);
100+
final AtomicBoolean flag = new AtomicBoolean(true);
101+
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenAnswer(invocation -> defer(() -> {
102+
if (flag.getAndSet(false)) {
103+
return messageFlux;
104+
} else {
105+
return Flux.never();
106+
}
107+
}));
99108

100109
messageListener.startListener();
101110
final long init = System.currentTimeMillis();
@@ -124,7 +133,14 @@ public void shouldProcessAsyncMessagesConcurrent() throws JsonProcessingExceptio
124133
);
125134
messageListener = new StubGenericMessageListener("test-queue", reactiveMessageListener, true, 10, discardNotifier, "command", handlerResolver, messageConverter, errorReporter);
126135
Flux<AcknowledgableDelivery> messageFlux = createSource(messageCount);
127-
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenReturn(messageFlux);
136+
final AtomicBoolean flag = new AtomicBoolean(true);
137+
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenAnswer(invocation -> defer(() -> {
138+
if (flag.getAndSet(false)) {
139+
return messageFlux;
140+
} else {
141+
return Flux.never();
142+
}
143+
}));
128144
System.out.println("Permits before: " + semaphore.availablePermits());
129145
final long init = System.currentTimeMillis();
130146
messageListener.startListener();
@@ -189,7 +205,14 @@ public void shouldProcessCPUMessagesInParallel() throws JsonProcessingException,
189205
reactiveMessageListener = new ReactiveMessageListener(receiver, topologyCreator, 250, 250);
190206
messageListener = new StubGenericMessageListener("test-queue", reactiveMessageListener, true, 10, discardNotifier, "command", handlerResolver, messageConverter, errorReporter);
191207
Flux<AcknowledgableDelivery> messageFlux = createSource(messageCount);
192-
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenReturn(messageFlux);
208+
final AtomicBoolean flag = new AtomicBoolean(true);
209+
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenAnswer(invocation -> defer(() -> {
210+
if (flag.getAndSet(false)) {
211+
return messageFlux;
212+
} else {
213+
return Flux.never();
214+
}
215+
}));
193216
System.out.println("Permits before: " + semaphore.availablePermits());
194217
final long init = System.currentTimeMillis();
195218
messageListener.startListener();
@@ -215,7 +238,14 @@ public void shouldProcessCPUWorkMessagesInParallel() throws JsonProcessingExcept
215238
reactiveMessageListener = new ReactiveMessageListener(receiver, topologyCreator, 500, 250);
216239
messageListener = new StubGenericMessageListener("test-queue", reactiveMessageListener, true, 10, discardNotifier, "command", handlerResolver, messageConverter, errorReporter);
217240
Flux<AcknowledgableDelivery> messageFlux = createSource(messageCount);
218-
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenReturn(messageFlux);
241+
final AtomicBoolean flag = new AtomicBoolean(true);
242+
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenAnswer(invocation -> defer(() -> {
243+
if (flag.getAndSet(false)) {
244+
return messageFlux;
245+
} else {
246+
return Flux.never();
247+
}
248+
}));
219249
System.out.println("Permits before: " + semaphore.availablePermits());
220250
final long init = System.currentTimeMillis();
221251
messageListener.startListener();
@@ -241,7 +271,14 @@ public void shouldProcessPasiveBlockingMessagesInParallel() throws JsonProcessin
241271
reactiveMessageListener = new ReactiveMessageListener(receiver, topologyCreator, 500, 250);
242272
messageListener = new StubGenericMessageListener("test-queue", reactiveMessageListener, true, 10, discardNotifier, "command", handlerResolver, messageConverter, errorReporter);
243273
Flux<AcknowledgableDelivery> messageFlux = createSource(messageCount);
244-
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenReturn(messageFlux);
274+
final AtomicBoolean flag = new AtomicBoolean(true);
275+
when(receiver.consumeManualAck(Mockito.anyString(), any(ConsumeOptions.class))).thenAnswer(invocation -> defer(() -> {
276+
if (flag.getAndSet(false)) {
277+
return messageFlux;
278+
} else {
279+
return Flux.never();
280+
}
281+
}));
245282
System.out.println("Permits before: " + semaphore.availablePermits());
246283
final long init = System.currentTimeMillis();
247284
messageListener.startListener();

samples/async/receiver-responder/async-receiver-sample.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ apply plugin: 'org.springframework.boot'
22

33
dependencies {
44
compile project(":async-commons-rabbit-starter")
5-
// compile 'org.reactivecommons:async-commons-starter:1.0.0-beta7'
65
compile('org.springframework.boot:spring-boot-starter')
76
runtime('org.springframework.boot:spring-boot-devtools')
87
}

samples/async/receiver-responder/src/main/java/sample/AddMemberCommand.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

samples/async/receiver-responder/src/main/java/sample/EmptyReceiver.java

Lines changed: 0 additions & 14 deletions
This file was deleted.

0 commit comments

Comments
 (0)