Skip to content

Commit 03aa804

Browse files
author
Daniel Bustamante Ospina
committed
Merge branch 'recovery'
2 parents b61060b + a8c9f85 commit 03aa804

File tree

12 files changed

+172
-16
lines changed

12 files changed

+172
-16
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,4 +642,6 @@ MigrationBackup/
642642
.ionide/
643643

644644
# End of https://www.toptal.com/developers/gitignore/api/macos,linux,windows,gradle,java,intellij,visualstudio,eclipse
645-
contiperf-report
645+
contiperf-report
646+
647+
samples/async/local-example/

async/async-commons/src/main/java/org/reactivecommons/async/commons/ext/CustomErrorReporter.java

Whitespace-only changes.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.reactivecommons.async.commons.utils;
2+
3+
import java.util.ArrayList;
4+
import java.util.Arrays;
5+
6+
public class ArrayUtils {
7+
8+
private ArrayUtils(){}
9+
10+
public static <E> Object[] prefixArray(E head, E[] tail) {
11+
final ArrayList<E> objects = new ArrayList<>(1 + tail.length);
12+
objects.add(head);
13+
objects.addAll(Arrays.asList(tail));
14+
return objects.toArray();
15+
}
16+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.reactivecommons.async.commons.utils;
2+
3+
import lombok.extern.java.Log;
4+
import reactor.core.publisher.BaseSubscriber;
5+
import reactor.core.publisher.SignalType;
6+
7+
import java.util.logging.Level;
8+
9+
import static org.reactivecommons.async.commons.utils.ArrayUtils.prefixArray;
10+
11+
12+
@Log
13+
public class LoggerSubscriber<T> extends BaseSubscriber<T> {
14+
15+
private final String flowName;
16+
private static final String ON_COMPLETE_MSG = "%s: ##On Complete Hook!!";
17+
private static final String ON_ERROR_MSG = "%s: ##On Error Hook!!";
18+
private static final String ON_CANCEL_MSG = "%s: ##On Cancel Hook!!";
19+
private static final String ON_FINALLY_MSG = "%s: ##On Finally Hook! Signal type: %s";
20+
21+
public LoggerSubscriber(String flowName) {
22+
this.flowName = flowName;
23+
}
24+
25+
@Override
26+
protected void hookOnComplete() {
27+
log.warning(format(ON_COMPLETE_MSG));
28+
}
29+
30+
@Override
31+
protected void hookOnError(Throwable throwable) {
32+
log.log(Level.SEVERE, format(ON_ERROR_MSG), throwable);
33+
}
34+
35+
@Override
36+
protected void hookOnCancel() {
37+
log.warning(format(ON_CANCEL_MSG));
38+
}
39+
40+
@Override
41+
protected void hookFinally(SignalType type) {
42+
log.warning(format(ON_FINALLY_MSG, type.name()));
43+
}
44+
45+
private String format(String msg, String... args) {
46+
return String.format(msg, prefixArray(flowName, args));
47+
}
48+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.reactivecommons.async.commons.utils;
2+
3+
import org.junit.jupiter.api.Test;
4+
import reactor.core.publisher.SignalType;
5+
6+
7+
public class LoggerSubscriberTest {
8+
9+
private final LoggerSubscriber<String> subscriber = new LoggerSubscriber<>("testFlow");
10+
11+
@Test
12+
public void shouldPrintOnCancelMessage() {
13+
subscriber.hookOnCancel();
14+
}
15+
16+
@Test
17+
public void shouldPrintOnErrorMessage() {
18+
subscriber.hookOnError(new RuntimeException());
19+
}
20+
21+
@Test
22+
public void shouldPrintOnFinallyMessage() {
23+
subscriber.hookFinally(SignalType.ON_ERROR);
24+
}
25+
26+
@Test
27+
public void shouldPrintOnCompleteMessage() {
28+
subscriber.hookOnComplete();
29+
}
30+
31+
}

async/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void init() throws NoSuchFieldException, IllegalAccessException {
5050
when(creator.declare(any(ExchangeSpecification.class))).thenReturn(Mono.just(mock(AMQP.Exchange.DeclareOk.class)));
5151
when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
5252
when(listener.getTopologyCreator()).thenReturn(creator);
53-
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.empty());
53+
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
5454
when(listener.getReceiver()).thenReturn(receiver);
5555
when(listener.getMaxConcurrency()).thenReturn(20);
5656
}

async/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void init() {
4444
when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4545
when(creator.declareQueue(any(String.class), any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4646
when(listener.getTopologyCreator()).thenReturn(creator);
47-
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.empty());
47+
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
4848
when(listener.getReceiver()).thenReturn(receiver);
4949
when(listener.getMaxConcurrency()).thenReturn(20);
5050
}

async/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void init() {
4444
when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4545
when(creator.declareQueue(any(String.class), any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4646
when(listener.getTopologyCreator()).thenReturn(creator);
47-
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.empty());
47+
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
4848
when(listener.getReceiver()).thenReturn(receiver);
4949
when(listener.getMaxConcurrency()).thenReturn(20);
5050
}

async/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void init() {
4646
when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4747
when(creator.declareQueue(any(String.class), any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class)));
4848
when(listener.getTopologyCreator()).thenReturn(creator);
49-
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.empty());
49+
when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never());
5050
when(listener.getReceiver()).thenReturn(receiver);
5151
when(listener.getMaxConcurrency()).thenReturn(20);
5252
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationReplyListener.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package org.reactivecommons.async.rabbit.listeners;
22

3+
import com.rabbitmq.client.Delivery;
34
import lombok.extern.java.Log;
5+
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
46
import org.reactivecommons.async.rabbit.RabbitMessage;
57
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
68
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
79
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
10+
import reactor.core.publisher.Flux;
811
import reactor.rabbitmq.Receiver;
912

1013
import java.util.logging.Level;
@@ -19,6 +22,7 @@ public class ApplicationReplyListener {
1922
private final Receiver receiver;
2023
private final TopologyCreator creator;
2124
private final String queueName;
25+
private volatile Flux<Delivery> deliveryFlux;
2226

2327
public ApplicationReplyListener(ReactiveReplyRouter router, ReactiveMessageListener listener, String queueName) {
2428
this.router = router;
@@ -28,7 +32,7 @@ public ApplicationReplyListener(ReactiveReplyRouter router, ReactiveMessageListe
2832
}
2933

3034
public void startListening(String routeKey) {
31-
creator.declare(exchange("globalReply").type("topic").durable(true))
35+
deliveryFlux = creator.declare(exchange("globalReply").type("topic").durable(true))
3236
.then(creator.declare(queue(queueName).durable(false).autoDelete(true).exclusive(true)))
3337
.then(creator.bind(binding("globalReply", routeKey, queueName)))
3438
.thenMany(receiver.consumeAutoAck(queueName).doOnNext(delivery -> {
@@ -43,6 +47,13 @@ public void startListening(String routeKey) {
4347
} catch (Exception e) {
4448
log.log(Level.SEVERE, "Error in reply reception", e);
4549
}
46-
})).subscribe();
50+
}));
51+
onTerminate();
52+
}
53+
54+
55+
private void onTerminate() {
56+
deliveryFlux.doOnTerminate(this::onTerminate)
57+
.subscribe(new LoggerSubscriber<>(getClass().getName()));
4758
}
4859
}

0 commit comments

Comments
 (0)