|
4 | 4 | import lombok.extern.java.Log; |
5 | 5 | import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; |
6 | 6 | import org.reactivecommons.async.commons.DiscardNotifier; |
7 | | -import org.reactivecommons.async.rabbit.HandlerResolver; |
8 | | -import org.reactivecommons.async.commons.communications.Message; |
9 | 7 | import org.reactivecommons.async.commons.QueryExecutor; |
| 8 | +import org.reactivecommons.async.commons.communications.Message; |
| 9 | +import org.reactivecommons.async.commons.converters.MessageConverter; |
| 10 | +import org.reactivecommons.async.commons.ext.CustomReporter; |
| 11 | +import org.reactivecommons.async.rabbit.HandlerResolver; |
10 | 12 | import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; |
11 | 13 | import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; |
12 | 14 | import org.reactivecommons.async.rabbit.communications.TopologyCreator; |
13 | | -import org.reactivecommons.async.commons.converters.MessageConverter; |
14 | | -import org.reactivecommons.async.commons.ext.CustomReporter; |
15 | 15 | import reactor.core.publisher.Mono; |
16 | 16 | import reactor.rabbitmq.AcknowledgableDelivery; |
17 | 17 | import reactor.rabbitmq.BindingSpecification; |
|
24 | 24 | import java.util.function.Function; |
25 | 25 | import java.util.logging.Level; |
26 | 26 |
|
27 | | -import static java.lang.Boolean.TRUE; |
28 | 27 | import static java.util.Optional.ofNullable; |
29 | | -import static org.reactivecommons.async.commons.Headers.*; |
| 28 | +import static org.reactivecommons.async.commons.Headers.CORRELATION_ID; |
| 29 | +import static org.reactivecommons.async.commons.Headers.REPLY_ID; |
| 30 | +import static org.reactivecommons.async.commons.Headers.REPLY_TIMEOUT_MILLIS; |
| 31 | +import static org.reactivecommons.async.commons.Headers.SERVED_QUERY_ID; |
30 | 32 |
|
31 | 33 | @Log |
32 | 34 | //TODO: Organizar inferencia de tipos de la misma forma que en comandos y eventos |
@@ -139,16 +141,15 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) { |
139 | 141 | if (signal.isOnError()) { |
140 | 142 | return Mono.error(ofNullable(signal.getThrowable()).orElseGet(RuntimeException::new)); |
141 | 143 | } |
| 144 | + if (signal.isOnComplete()) { |
| 145 | + return Mono.empty(); |
| 146 | + } |
142 | 147 |
|
143 | 148 | final String replyID = msg.getProperties().getHeaders().get(REPLY_ID).toString(); |
144 | 149 | final String correlationID = msg.getProperties().getHeaders().get(CORRELATION_ID).toString(); |
145 | 150 | final HashMap<String, Object> headers = new HashMap<>(); |
146 | 151 | headers.put(CORRELATION_ID, correlationID); |
147 | 152 |
|
148 | | - if (!signal.hasValue()) { |
149 | | - headers.put(COMPLETION_ONLY_SIGNAL, TRUE.toString()); |
150 | | - } |
151 | | - |
152 | 153 | return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false); |
153 | 154 | }); |
154 | 155 | } |
|
0 commit comments