Skip to content

Commit 6708d4e

Browse files
author
Daniel Bustamante Ospina
authored
Merge pull request #68 from reactive-commons/feature/upgrade-spring
Feature/upgrade spring
2 parents 6c8d4e6 + 4c766f5 commit 6708d4e

File tree

5 files changed

+34
-29
lines changed

5 files changed

+34
-29
lines changed

async/async-rabbit/async-rabbit.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ dependencies {
88
api project(":domain-events-api")
99
api project(":async-commons")
1010

11-
compileOnly 'io.projectreactor:reactor-core'
12-
api ("io.projectreactor.rabbitmq:reactor-rabbitmq")
11+
api 'io.projectreactor:reactor-core'
12+
api 'io.projectreactor.rabbitmq:reactor-rabbitmq'
13+
api 'com.rabbitmq:amqp-client'
1314
api 'com.fasterxml.jackson.core:jackson-databind'
1415
testImplementation 'io.projectreactor:reactor-test'
1516
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
import lombok.extern.java.Log;
55
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
66
import org.reactivecommons.async.commons.DiscardNotifier;
7-
import org.reactivecommons.async.rabbit.HandlerResolver;
8-
import org.reactivecommons.async.commons.communications.Message;
97
import org.reactivecommons.async.commons.QueryExecutor;
8+
import org.reactivecommons.async.commons.communications.Message;
9+
import org.reactivecommons.async.commons.converters.MessageConverter;
10+
import org.reactivecommons.async.commons.ext.CustomReporter;
11+
import org.reactivecommons.async.rabbit.HandlerResolver;
1012
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1113
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
1214
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
13-
import org.reactivecommons.async.commons.converters.MessageConverter;
14-
import org.reactivecommons.async.commons.ext.CustomReporter;
1515
import reactor.core.publisher.Mono;
1616
import reactor.rabbitmq.AcknowledgableDelivery;
1717
import reactor.rabbitmq.BindingSpecification;
@@ -24,9 +24,11 @@
2424
import java.util.function.Function;
2525
import java.util.logging.Level;
2626

27-
import static java.lang.Boolean.TRUE;
2827
import static java.util.Optional.ofNullable;
29-
import static org.reactivecommons.async.commons.Headers.*;
28+
import static org.reactivecommons.async.commons.Headers.CORRELATION_ID;
29+
import static org.reactivecommons.async.commons.Headers.REPLY_ID;
30+
import static org.reactivecommons.async.commons.Headers.REPLY_TIMEOUT_MILLIS;
31+
import static org.reactivecommons.async.commons.Headers.SERVED_QUERY_ID;
3032

3133
@Log
3234
//TODO: Organizar inferencia de tipos de la misma forma que en comandos y eventos
@@ -139,16 +141,15 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
139141
if (signal.isOnError()) {
140142
return Mono.error(ofNullable(signal.getThrowable()).orElseGet(RuntimeException::new));
141143
}
144+
if (signal.isOnComplete()) {
145+
return Mono.empty();
146+
}
142147

143148
final String replyID = msg.getProperties().getHeaders().get(REPLY_ID).toString();
144149
final String correlationID = msg.getProperties().getHeaders().get(CORRELATION_ID).toString();
145150
final HashMap<String, Object> headers = new HashMap<>();
146151
headers.put(CORRELATION_ID, correlationID);
147152

148-
if (!signal.hasValue()) {
149-
headers.put(COMPLETION_ONLY_SIGNAL, TRUE.toString());
150-
}
151-
152153
return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false);
153154
});
154155
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListenerTest.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -121,26 +121,15 @@ void shouldHandleErrorWhenNoQueryHandler() {
121121
}
122122

123123
@Test
124-
@SuppressWarnings("unchecked")
125-
void enrichPostProcessShouldPropagateEmptyResponses() {
126-
Message message = TestStubs.mockMessage();
127-
PublisherProbe<Void> publishProbe = PublisherProbe.empty();
128-
129-
ArgumentCaptor<Map<String, Object>> headersCaptor = ArgumentCaptor.forClass(Map.class);
130-
131-
when(sender.sendNoConfirm(any(), anyString(), anyString(), headersCaptor.capture(), anyBoolean()))
132-
.thenReturn(publishProbe.mono());
133-
134-
Function<Mono<Object>, Mono<Object>> transformer = applicationQueryListener.enrichPostProcess(message);
135-
Mono<Object> result = transformer.apply(empty());
124+
void shouldNotRespondQueryEnrichPostProcess() {
125+
Message message = spy(TestStubs.mockMessage());
126+
Function<Mono<Object>, Mono<Object>> handler = applicationQueryListener.enrichPostProcess(message);
127+
Mono<Object> result = handler.apply(empty());
136128

137129
StepVerifier.create(result)
138130
.verifyComplete();
139131

140-
publishProbe.assertWasSubscribed();
141-
142-
assertThat(headersCaptor.getValue().get(COMPLETION_ONLY_SIGNAL))
143-
.isEqualTo("true");
132+
verify(message, times(0)).getProperties();
144133
}
145134

146135
@Test

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
version=1.0.5
2-
springBootVersion=2.4.2
2+
springBootVersion=2.6.6
33
gradleVersionsVersion=0.36.0
44
toPublish=async-commons,async-commons-api,async-commons-rabbit-standalone,async-commons-rabbit-starter,domain-events-api,async-rabbit
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package sample;
2+
3+
4+
import com.rabbitmq.client.ConnectionFactory;
5+
import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider;
6+
import org.springframework.context.annotation.Bean;
7+
8+
public class Config {
9+
10+
@Bean
11+
public ConnectionFactoryProvider sampleBeanWhenProgrammaticConfiguration() {
12+
return ConnectionFactory::new;
13+
}
14+
}

0 commit comments

Comments
 (0)