Skip to content

Commit 7af34ca

Browse files
committed
Refactor commons
1 parent bcd6f62 commit 7af34ca

22 files changed

+673
-0
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
plugins {
2+
id 'java-library'
3+
id 'maven'
4+
id 'maven-publish'
5+
}
6+
7+
test.onlyIf { true }
8+
9+
repositories {
10+
// Use jcenter for resolving dependencies.
11+
// You can declare any Maven/Ivy/file repository here.
12+
jcenter()
13+
}
14+
15+
dependencies {
16+
api project(":async-commons")
17+
18+
testImplementation 'io.projectreactor:reactor-test'
19+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.reactivecommons.async.parent;
2+
3+
4+
import org.reactivecommons.api.domain.Command;
5+
import org.reactivecommons.async.api.handlers.CommandHandler;
6+
import org.reactivecommons.async.impl.communications.Message;
7+
import reactor.core.publisher.Mono;
8+
9+
import java.util.function.Function;
10+
11+
public class CommandExecutor<T> {
12+
private final CommandHandler<T> eventHandler;
13+
private final Function<Message, Command<T>> converter;
14+
15+
public CommandExecutor(CommandHandler<T> eventHandler, Function<Message, Command<T>> converter) {
16+
this.eventHandler = eventHandler;
17+
this.converter = converter;
18+
}
19+
20+
public Mono<Void> execute(Message rawMessage){
21+
return eventHandler.handle(converter.apply(rawMessage));
22+
}
23+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.reactivecommons.async.parent;
2+
3+
import org.reactivecommons.async.impl.communications.Message;
4+
import reactor.core.publisher.Mono;
5+
6+
public interface DiscardNotifier {
7+
8+
Mono<Void> notifyDiscard(Message message);
9+
10+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.reactivecommons.async.parent;
2+
3+
4+
import org.reactivecommons.api.domain.DomainEvent;
5+
import org.reactivecommons.async.api.handlers.EventHandler;
6+
import org.reactivecommons.async.impl.communications.Message;
7+
import reactor.core.publisher.Mono;
8+
9+
import java.util.function.Function;
10+
11+
public class EventExecutor<T> {
12+
private final EventHandler<T> eventHandler;
13+
private final Function<Message, DomainEvent<T>> converter;
14+
15+
public EventExecutor(EventHandler<T> eventHandler, Function<Message, DomainEvent<T>> converter) {
16+
this.eventHandler = eventHandler;
17+
this.converter = converter;
18+
}
19+
20+
public Mono<Void> execute(Message rawMessage){
21+
return eventHandler.handle(converter.apply(rawMessage));
22+
}
23+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.reactivecommons.async.parent;
2+
3+
public enum FallbackStrategy {
4+
FAST_RETRY("ATTENTION!! Fast retry message to same Queue: %s"),
5+
DEFINITIVE_DISCARD("ATTENTION!! DEFINITIVE DISCARD!! of the message: %s"),
6+
RETRY_DLQ("ATTENTION!! Sending message to Retry DLQ: %s");
7+
8+
public final String message;
9+
FallbackStrategy(String message){
10+
this.message = message;
11+
}
12+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package org.reactivecommons.async.parent;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
5+
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
6+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
7+
8+
import java.util.Collection;
9+
import java.util.HashSet;
10+
import java.util.Map;
11+
import java.util.Set;
12+
13+
@RequiredArgsConstructor
14+
public class HandlerResolver {
15+
16+
private final Map<String, RegisteredQueryHandler<?, ?>> queryHandlers;
17+
private final Map<String, RegisteredEventListener<?>> eventListeners;
18+
private final Map<String, RegisteredEventListener<?>> eventNotificationListeners;
19+
private final Map<String, RegisteredEventListener<?>> dynamicEventsHandlers;
20+
private final Map<String, RegisteredCommandHandler<?>> commandHandlers;
21+
22+
@SuppressWarnings("unchecked")
23+
public <T, M> RegisteredQueryHandler<T, M> getQueryHandler(String path) {
24+
return (RegisteredQueryHandler<T, M>) queryHandlers.get(path);
25+
}
26+
27+
@SuppressWarnings("unchecked")
28+
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
29+
return (RegisteredCommandHandler<T>) commandHandlers.get(path);
30+
}
31+
32+
@SuppressWarnings("unchecked")
33+
public <T> RegisteredEventListener<T> getEventListener(String path) {
34+
return (RegisteredEventListener<T>) eventListeners.get(path);
35+
}
36+
37+
@SuppressWarnings("unchecked")
38+
public <T> RegisteredEventListener<T> getDynamicEventsHandler(String path) {
39+
return (RegisteredEventListener<T>) dynamicEventsHandlers.get(path);
40+
}
41+
42+
public Collection<RegisteredEventListener<?>> getNotificationListeners() {
43+
return eventNotificationListeners.values();
44+
}
45+
46+
@SuppressWarnings("unchecked")
47+
public <T> RegisteredEventListener<T> getNotificationListener(String path) {
48+
return (RegisteredEventListener<T>) eventNotificationListeners.get(path);
49+
}
50+
51+
public Collection<RegisteredEventListener<?>> getEventListeners() {
52+
return eventListeners.values();
53+
}
54+
55+
public Set<String> getToListenEventNames() {
56+
Set<String> toListenEventNames = new HashSet<>(eventListeners.size() +
57+
dynamicEventsHandlers.size());
58+
59+
toListenEventNames.addAll(eventListeners.keySet());
60+
toListenEventNames.addAll(dynamicEventsHandlers.keySet());
61+
62+
return toListenEventNames;
63+
}
64+
65+
void addEventListener(RegisteredEventListener listener) {
66+
eventListeners.put(listener.getPath(), listener);
67+
}
68+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.reactivecommons.async.parent;
2+
3+
4+
import org.reactivecommons.async.api.From;
5+
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
6+
import org.reactivecommons.async.impl.communications.Message;
7+
import reactor.core.publisher.Mono;
8+
9+
import java.util.function.Function;
10+
11+
import static org.reactivecommons.async.impl.Headers.CORRELATION_ID;
12+
import static org.reactivecommons.async.impl.Headers.REPLY_ID;
13+
14+
public class QueryExecutor<T, M> {
15+
private final QueryHandlerDelegate<T, M> queryHandler;
16+
private final Function<Message, M> converter;
17+
18+
public QueryExecutor(QueryHandlerDelegate<T, M> queryHandler, Function<Message, M> converter) {
19+
this.queryHandler = queryHandler;
20+
this.converter = converter;
21+
}
22+
23+
public Mono<T> execute(Message rawMessage) {
24+
From from = new From();
25+
from.setCorrelationID(rawMessage.getProperties().getHeaders().getOrDefault(CORRELATION_ID, "").toString());
26+
from.setReplyID(rawMessage.getProperties().getHeaders().getOrDefault(REPLY_ID, "").toString());
27+
return queryHandler.handle(from, converter.apply(rawMessage));
28+
}
29+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.reactivecommons.async.parent.communications;
2+
3+
import java.util.Map;
4+
5+
/**
6+
* Simple Internal Message representation
7+
* @author Daniel Bustamante Ospina
8+
*/
9+
public interface Message {
10+
11+
byte[] getBody();
12+
Properties getProperties();
13+
14+
interface Properties {
15+
String getContentType();
16+
String getContentEncoding();
17+
long getContentLength();
18+
Map<String, Object> getHeaders();
19+
}
20+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package org.reactivecommons.async.parent.config;
2+
3+
import java.time.Duration;
4+
import java.util.UUID;
5+
6+
public class BrokerConfig {
7+
private final String routingKey = UUID.randomUUID().toString().replaceAll("-", "");
8+
private final boolean persistentQueries;
9+
private final boolean persistentCommands;
10+
private final boolean persistentEvents;
11+
private final Duration replyTimeout;
12+
13+
public BrokerConfig() {
14+
this.persistentQueries = false;
15+
this.persistentCommands = true;
16+
this.persistentEvents = true;
17+
this.replyTimeout = Duration.ofSeconds(15);
18+
}
19+
20+
public BrokerConfig(boolean persistentQueries, boolean persistentCommands, boolean persistentEvents, Duration replyTimeout) {
21+
this.persistentQueries = persistentQueries;
22+
this.persistentCommands = persistentCommands;
23+
this.persistentEvents = persistentEvents;
24+
this.replyTimeout = replyTimeout;
25+
}
26+
27+
public boolean isPersistentQueries() {
28+
return persistentQueries;
29+
}
30+
31+
public boolean isPersistentCommands() {
32+
return persistentCommands;
33+
}
34+
35+
public boolean isPersistentEvents() {
36+
return persistentEvents;
37+
}
38+
39+
public Duration getReplyTimeout() {
40+
return replyTimeout;
41+
}
42+
43+
public String getRoutingKey() {
44+
return routingKey;
45+
}
46+
47+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.parent.config;
2+
3+
public interface IBrokerConfigProps {
4+
String getEventsQueue();
5+
6+
String getQueriesQueue();
7+
8+
String getCommandsQueue();
9+
10+
String getReplyQueue();
11+
12+
String getAppName();
13+
14+
String getDomainEventsExchangeName();
15+
16+
String getDirectMessagesExchangeName();
17+
18+
java.util.concurrent.atomic.AtomicReference<String> getReplyQueueName();
19+
}

0 commit comments

Comments
 (0)