Skip to content

Commit f32b330

Browse files
author
Juan Marín
authored
Merge pull request #67 from juanpmarin/fix/propagate-empty-responses
fix: propagate empty response
2 parents 7b72337 + d76f438 commit f32b330

File tree

2 files changed

+25
-9
lines changed

2 files changed

+25
-9
lines changed

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.function.Function;
2525
import java.util.logging.Level;
2626

27+
import static java.lang.Boolean.TRUE;
2728
import static java.util.Optional.ofNullable;
2829
import static org.reactivecommons.async.commons.Headers.*;
2930

@@ -138,15 +139,16 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
138139
if (signal.isOnError()) {
139140
return Mono.error(ofNullable(signal.getThrowable()).orElseGet(RuntimeException::new));
140141
}
141-
if (signal.isOnComplete()) {
142-
return Mono.empty();
143-
}
144142

145143
final String replyID = msg.getProperties().getHeaders().get(REPLY_ID).toString();
146144
final String correlationID = msg.getProperties().getHeaders().get(CORRELATION_ID).toString();
147145
final HashMap<String, Object> headers = new HashMap<>();
148146
headers.put(CORRELATION_ID, correlationID);
149147

148+
if (!signal.hasValue()) {
149+
headers.put(COMPLETION_ONLY_SIGNAL, TRUE.toString());
150+
}
151+
150152
return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false);
151153
});
152154
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListenerTest.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.junit.jupiter.api.BeforeEach;
99
import org.junit.jupiter.api.Test;
1010
import org.junit.jupiter.api.extension.ExtendWith;
11+
import org.mockito.ArgumentCaptor;
1112
import org.mockito.Mock;
1213
import org.mockito.junit.jupiter.MockitoExtension;
1314
import org.reactivecommons.async.api.AsyncQuery;
@@ -29,6 +30,7 @@
2930
import reactor.rabbitmq.AcknowledgableDelivery;
3031
import reactor.rabbitmq.Receiver;
3132
import reactor.test.StepVerifier;
33+
import reactor.test.publisher.PublisherProbe;
3234

3335
import java.time.Instant;
3436
import java.util.Date;
@@ -37,6 +39,7 @@
3739
import java.util.Optional;
3840
import java.util.function.Function;
3941

42+
import static org.assertj.core.api.Assertions.assertThat;
4043
import static org.mockito.Mockito.when;
4144
import static org.mockito.Mockito.*;
4245
import static org.reactivecommons.async.commons.Headers.*;
@@ -80,7 +83,7 @@ public void setUp() {
8083
QueryHandler<String, SampleClass> handler = (message) -> just("OK");
8184
handlers.put("queryDirect", new RegisteredQueryHandler<>("queryDirect",
8285
(from, message) -> handler.handle(message), SampleClass.class));
83-
HandlerResolver resolver = new HandlerResolver(handlers, null,null, null, null);
86+
HandlerResolver resolver = new HandlerResolver(handlers, null, null, null, null);
8487
applicationQueryListener = new ApplicationQueryListener(reactiveMessageListener, "queue", resolver, sender,
8588
"directExchange", messageConverter, "replyExchange", false,
8689
1, 100, maxLengthBytes, true, discardNotifier, errorReporter);
@@ -118,15 +121,26 @@ void shouldHandleErrorWhenNoQueryHandler() {
118121
}
119122

120123
@Test
121-
void shouldNotRespondQueryEnrichPostProcess() {
122-
Message message = spy(TestStubs.mockMessage());
123-
Function<Mono<Object>, Mono<Object>> handler = applicationQueryListener.enrichPostProcess(message);
124-
Mono<Object> result = handler.apply(empty());
124+
@SuppressWarnings("unchecked")
125+
void enrichPostProcessShouldPropagateEmptyResponses() {
126+
Message message = TestStubs.mockMessage();
127+
PublisherProbe<Void> publishProbe = PublisherProbe.empty();
128+
129+
ArgumentCaptor<Map<String, Object>> headersCaptor = ArgumentCaptor.forClass(Map.class);
130+
131+
when(sender.sendNoConfirm(any(), anyString(), anyString(), headersCaptor.capture(), anyBoolean()))
132+
.thenReturn(publishProbe.mono());
133+
134+
Function<Mono<Object>, Mono<Object>> transformer = applicationQueryListener.enrichPostProcess(message);
135+
Mono<Object> result = transformer.apply(empty());
125136

126137
StepVerifier.create(result)
127138
.verifyComplete();
128139

129-
verify(message, times(0)).getProperties();
140+
publishProbe.assertWasSubscribed();
141+
142+
assertThat(headersCaptor.getValue().get(COMPLETION_ONLY_SIGNAL))
143+
.isEqualTo("true");
130144
}
131145

132146
@Test

0 commit comments

Comments
 (0)