Skip to content

Commit a8c9f85

Browse files
author
Daniel Bustamante Ospina
committed
Implement recovery of lost consumer for tmp queues
1 parent e00fe8b commit a8c9f85

File tree

6 files changed

+101
-55
lines changed

6 files changed

+101
-55
lines changed

async/async-commons/src/main/java/org/reactivecommons/async/commons/ext/CustomErrorReporter.java

Whitespace-only changes.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.reactivecommons.async.commons.utils;
2+
3+
import java.util.ArrayList;
4+
import java.util.Arrays;
5+
6+
public class ArrayUtils {
7+
8+
private ArrayUtils(){}
9+
10+
public static <E> Object[] prefixArray(E head, E[] tail) {
11+
final ArrayList<E> objects = new ArrayList<>(1 + tail.length);
12+
objects.add(head);
13+
objects.addAll(Arrays.asList(tail));
14+
return objects.toArray();
15+
}
16+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.reactivecommons.async.commons.utils;
2+
3+
import lombok.extern.java.Log;
4+
import reactor.core.publisher.BaseSubscriber;
5+
import reactor.core.publisher.SignalType;
6+
7+
import java.util.logging.Level;
8+
9+
import static org.reactivecommons.async.commons.utils.ArrayUtils.prefixArray;
10+
11+
12+
@Log
13+
public class LoggerSubscriber<T> extends BaseSubscriber<T> {
14+
15+
private final String flowName;
16+
private static final String ON_COMPLETE_MSG = "%s: ##On Complete Hook!!";
17+
private static final String ON_ERROR_MSG = "%s: ##On Error Hook!!";
18+
private static final String ON_CANCEL_MSG = "%s: ##On Cancel Hook!!";
19+
private static final String ON_FINALLY_MSG = "%s: ##On Finally Hook! Signal type: %s";
20+
21+
public LoggerSubscriber(String flowName) {
22+
this.flowName = flowName;
23+
}
24+
25+
@Override
26+
protected void hookOnComplete() {
27+
log.warning(format(ON_COMPLETE_MSG));
28+
}
29+
30+
@Override
31+
protected void hookOnError(Throwable throwable) {
32+
log.log(Level.SEVERE, format(ON_ERROR_MSG), throwable);
33+
}
34+
35+
@Override
36+
protected void hookOnCancel() {
37+
log.warning(format(ON_CANCEL_MSG));
38+
}
39+
40+
@Override
41+
protected void hookFinally(SignalType type) {
42+
log.warning(format(ON_FINALLY_MSG, type.name()));
43+
}
44+
45+
private String format(String msg, String... args) {
46+
return String.format(msg, prefixArray(flowName, args));
47+
}
48+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.reactivecommons.async.commons.utils;
2+
3+
import org.junit.jupiter.api.Test;
4+
import reactor.core.publisher.SignalType;
5+
6+
7+
public class LoggerSubscriberTest {
8+
9+
private final LoggerSubscriber<String> subscriber = new LoggerSubscriber<>("testFlow");
10+
11+
@Test
12+
public void shouldPrintOnCancelMessage() {
13+
subscriber.hookOnCancel();
14+
}
15+
16+
@Test
17+
public void shouldPrintOnErrorMessage() {
18+
subscriber.hookOnError(new RuntimeException());
19+
}
20+
21+
@Test
22+
public void shouldPrintOnFinallyMessage() {
23+
subscriber.hookFinally(SignalType.ON_ERROR);
24+
}
25+
26+
@Test
27+
public void shouldPrintOnCompleteMessage() {
28+
subscriber.hookOnComplete();
29+
}
30+
31+
}
Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
11
package org.reactivecommons.async.rabbit.listeners;
22

33
import com.rabbitmq.client.Delivery;
4-
import com.rabbitmq.client.UnblockedCallback;
54
import lombok.extern.java.Log;
5+
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
66
import org.reactivecommons.async.rabbit.RabbitMessage;
77
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
88
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
99
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
10-
import org.reactivestreams.Subscription;
11-
import reactor.core.publisher.BaseSubscriber;
1210
import reactor.core.publisher.Flux;
13-
import reactor.core.publisher.SignalType;
1411
import reactor.rabbitmq.Receiver;
1512

1613
import java.util.logging.Level;
1714

1815
import static org.reactivecommons.async.commons.Headers.*;
19-
import static reactor.core.publisher.Signal.subscribe;
2016
import static reactor.rabbitmq.ResourcesSpecification.*;
2117

2218
@Log
@@ -52,37 +48,12 @@ public void startListening(String routeKey) {
5248
log.log(Level.SEVERE, "Error in reply reception", e);
5349
}
5450
}));
55-
//deliveryFlux.subscribe();
5651
onTerminate();
5752
}
5853

5954

6055
private void onTerminate() {
61-
deliveryFlux.doOnTerminate(this::onTerminate).subscribe(new BaseSubscriber<Delivery>() {
62-
@Override
63-
protected void hookOnNext(Delivery value) {
64-
log.info("#On NextHook");
65-
}
66-
67-
@Override
68-
protected void hookOnComplete() {
69-
log.warning("#On Complete Hook!!");
70-
}
71-
72-
@Override
73-
protected void hookOnError(Throwable throwable) {
74-
log.log(Level.SEVERE, "#Hook On Error!!", throwable);
75-
}
76-
77-
@Override
78-
protected void hookOnCancel() {
79-
log.warning("#On Cancel Hook!!");
80-
}
81-
82-
@Override
83-
protected void hookFinally(SignalType type) {
84-
log.warning("#On Finally Hook!! " + type.name());
85-
}
86-
});
56+
deliveryFlux.doOnTerminate(this::onTerminate)
57+
.subscribe(new LoggerSubscriber<>(getClass().getName()));
8758
}
8859
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import lombok.extern.java.Log;
66
import org.reactivecommons.async.commons.DiscardNotifier;
77
import org.reactivecommons.async.commons.FallbackStrategy;
8+
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
89
import org.reactivecommons.async.rabbit.RabbitMessage;
910
import org.reactivecommons.async.commons.communications.Message;
1011
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
@@ -86,29 +87,8 @@ public void startListener() {
8687
}
8788

8889
private void onTerminate() {
89-
log.info("Hard Subscription 2 to " + this.getClass().getName());
90-
messageFlux.doOnTerminate(this::onTerminate).subscribe(new BaseSubscriber<Delivery>() {
91-
92-
@Override
93-
protected void hookOnComplete() {
94-
log.warning("##On Complete Hook!!");
95-
}
96-
97-
@Override
98-
protected void hookOnError(Throwable throwable) {
99-
log.log(Level.SEVERE, "##Hook On Error!!", throwable);
100-
}
101-
102-
@Override
103-
protected void hookOnCancel() {
104-
log.warning("##On Cancel Hook!!");
105-
}
106-
107-
@Override
108-
protected void hookFinally(SignalType type) {
109-
log.warning("##On Finally Hook!! " + type.name());
110-
}
111-
});
90+
messageFlux.doOnTerminate(this::onTerminate)
91+
.subscribe(new LoggerSubscriber<>(getClass().getName()));
11292
}
11393

11494

0 commit comments

Comments
 (0)