Skip to content

Commit f8c2d15

Browse files
refs #255: Add Lambda and Async Lamba Message Processors (#257)
Instead of using reflection to invoke the methods, you can now use lambda (Java functional APIs) to process the message. The usage using the Java core API looks like: ``` new LambdaMessageProcessor(client, queueProperties, (message) -> {}); ``` Usage in the Kotlin DSL looks like: ``` coreMessageListener("identifier" client, queueUrl) { processor = lambdaProcessor { method { message -> } } } ```
1 parent e0f731b commit f8c2d15

File tree

28 files changed

+1735
-364
lines changed

28 files changed

+1735
-364
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package com.jashmore.sqs.processor;
2+
3+
import com.jashmore.sqs.QueueProperties;
4+
import com.jashmore.sqs.argument.visibility.DefaultVisibilityExtender;
5+
import com.jashmore.sqs.processor.argument.Acknowledge;
6+
import com.jashmore.sqs.processor.argument.VisibilityExtender;
7+
import com.jashmore.sqs.util.concurrent.CompletableFutureUtils;
8+
import lombok.extern.slf4j.Slf4j;
9+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
10+
import software.amazon.awssdk.services.sqs.model.Message;
11+
12+
import java.util.concurrent.CompletableFuture;
13+
import java.util.function.BiFunction;
14+
import java.util.function.Function;
15+
import java.util.function.Supplier;
16+
17+
/**
18+
* {@link MessageProcessor} that takes a lambda/function for asynchronous processing of a message.
19+
*/
20+
@Slf4j
21+
public class AsyncLambdaMessageProcessor implements MessageProcessor {
22+
private final SqsAsyncClient sqsAsyncClient;
23+
private final QueueProperties queueProperties;
24+
private final MessageProcessingFunction messageProcessingFunction;
25+
private final boolean usesAcknowledgeParameter;
26+
27+
/**
28+
* Constructor.
29+
*
30+
* @param sqsAsyncClient the client to communicate with SQS
31+
* @param queueProperties the properties of the queue
32+
* @param messageProcessor the function to consume a message and return the future
33+
*/
34+
public AsyncLambdaMessageProcessor(final SqsAsyncClient sqsAsyncClient,
35+
final QueueProperties queueProperties,
36+
final Function<Message, CompletableFuture<?>> messageProcessor) {
37+
this.sqsAsyncClient = sqsAsyncClient;
38+
this.queueProperties = queueProperties;
39+
this.usesAcknowledgeParameter = false;
40+
41+
this.messageProcessingFunction = (message, acknowledge, visibilityExtender) -> messageProcessor.apply(message);
42+
}
43+
44+
/**
45+
* Constructor.
46+
*
47+
* @param sqsAsyncClient the client to communicate with SQS
48+
* @param queueProperties the properties of the queue
49+
* @param messageProcessor the function to consume a message and acknowledge and return the future
50+
*/
51+
public AsyncLambdaMessageProcessor(final SqsAsyncClient sqsAsyncClient,
52+
final QueueProperties queueProperties,
53+
final BiFunction<Message, Acknowledge, CompletableFuture<?>> messageProcessor) {
54+
this.sqsAsyncClient = sqsAsyncClient;
55+
this.queueProperties = queueProperties;
56+
this.usesAcknowledgeParameter = true;
57+
58+
this.messageProcessingFunction = (message, acknowledge, visibilityExtender) -> messageProcessor.apply(message, acknowledge);
59+
}
60+
61+
/**
62+
* Constructor.
63+
*
64+
* @param sqsAsyncClient the client to communicate with SQS
65+
* @param queueProperties the properties of the queue
66+
* @param messageProcessor the function to consume a message, acknowledge and visibility extender and return the future
67+
*/
68+
public AsyncLambdaMessageProcessor(final SqsAsyncClient sqsAsyncClient,
69+
final QueueProperties queueProperties,
70+
final MessageProcessingFunction messageProcessor) {
71+
this.sqsAsyncClient = sqsAsyncClient;
72+
this.queueProperties = queueProperties;
73+
this.usesAcknowledgeParameter = true;
74+
75+
this.messageProcessingFunction = messageProcessor;
76+
}
77+
78+
/**
79+
* Constructor.
80+
*
81+
* <p>As Java generics has type erasure and will convert <code>BiFunction&lt;A, B, C&gt;</code> to <code>BiFunction</code> we need to change
82+
* the type signature to distinguish the function that consumes a message and an acknowledge compared to the function that consumes a message
83+
* and a visibility extender. As the visibility extender use case seems less common, this one has the unused parameter.
84+
*
85+
* @param sqsAsyncClient the client to communicate with SQS
86+
* @param queueProperties the properties of the queue
87+
* @param ignoredForTypeErasure field needed due to type erasure
88+
* @param messageProcessor the function to consume a message and visibility extender and return the future
89+
*/
90+
public AsyncLambdaMessageProcessor(final SqsAsyncClient sqsAsyncClient,
91+
final QueueProperties queueProperties,
92+
@SuppressWarnings("unused") final boolean ignoredForTypeErasure,
93+
final BiFunction<Message, VisibilityExtender, CompletableFuture<?>> messageProcessor) {
94+
this.sqsAsyncClient = sqsAsyncClient;
95+
this.queueProperties = queueProperties;
96+
this.usesAcknowledgeParameter = false;
97+
98+
this.messageProcessingFunction = (message, acknowledge, visibilityExtender) -> messageProcessor.apply(message, visibilityExtender);
99+
}
100+
101+
@Override
102+
public CompletableFuture<?> processMessage(Message message, Supplier<CompletableFuture<?>> resolveMessageCallback) {
103+
final Acknowledge acknowledge = resolveMessageCallback::get;
104+
final VisibilityExtender visibilityExtender = new DefaultVisibilityExtender(sqsAsyncClient, queueProperties, message);
105+
final CompletableFuture<?> result;
106+
try {
107+
result = messageProcessingFunction.processMessage(message, acknowledge, visibilityExtender);
108+
} catch (RuntimeException runtimeException) {
109+
return CompletableFutureUtils.completedExceptionally(new MessageProcessingException(runtimeException));
110+
}
111+
112+
if (result == null) {
113+
return CompletableFutureUtils.completedExceptionally(new MessageProcessingException("Method returns CompletableFuture but null was returned"));
114+
}
115+
116+
if (usesAcknowledgeParameter) {
117+
return result;
118+
}
119+
120+
final Runnable resolveCallbackLoggingErrorsOnly = () -> {
121+
try {
122+
resolveMessageCallback.get()
123+
.handle((i, throwable) -> {
124+
if (throwable != null) {
125+
log.error("Error resolving successfully processed message", throwable);
126+
}
127+
return null;
128+
});
129+
} catch (RuntimeException runtimeException) {
130+
log.error("Failed to trigger message resolving", runtimeException);
131+
}
132+
};
133+
134+
return result
135+
.thenAccept((ignored) -> resolveCallbackLoggingErrorsOnly.run());
136+
}
137+
138+
/**
139+
* Represents a message processing function that consumes the {@link Message}, {@link Acknowledge} and {@link VisibilityExtender}.
140+
*/
141+
@FunctionalInterface
142+
public interface MessageProcessingFunction {
143+
CompletableFuture<?> processMessage(Message message, Acknowledge acknowledge, VisibilityExtender visibilityExtender);
144+
}
145+
}

core/src/main/java/com/jashmore/sqs/processor/CoreMessageProcessor.java

Lines changed: 80 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
import static java.util.stream.Collectors.toList;
44

5+
import com.jashmore.documentation.annotations.Nullable;
56
import com.jashmore.documentation.annotations.ThreadSafe;
67
import com.jashmore.sqs.QueueProperties;
78
import com.jashmore.sqs.argument.ArgumentResolver;
89
import com.jashmore.sqs.argument.ArgumentResolverService;
910
import com.jashmore.sqs.argument.DefaultMethodParameter;
1011
import com.jashmore.sqs.argument.MethodParameter;
11-
import com.jashmore.sqs.argument.visibility.DefaultVisibilityExtender;
1212
import com.jashmore.sqs.processor.argument.Acknowledge;
1313
import com.jashmore.sqs.processor.argument.VisibilityExtender;
1414
import com.jashmore.sqs.util.concurrent.CompletableFutureUtils;
@@ -22,6 +22,8 @@
2222
import java.util.Arrays;
2323
import java.util.List;
2424
import java.util.concurrent.CompletableFuture;
25+
import java.util.function.Consumer;
26+
import java.util.function.Function;
2527
import java.util.function.Supplier;
2628
import java.util.stream.IntStream;
2729

@@ -32,122 +34,99 @@
3234
@Slf4j
3335
@ThreadSafe
3436
public class CoreMessageProcessor implements MessageProcessor {
35-
private final QueueProperties queueProperties;
36-
private final SqsAsyncClient sqsAsyncClient;
37-
private final Method messageConsumerMethod;
38-
private final Object messageConsumerBean;
39-
40-
// These are calculated in the constructor so that it is not recalculated each time a message is processed
41-
private final List<InternalArgumentResolver> methodArgumentResolvers;
42-
private final boolean hasAcknowledgeParameter;
43-
private final boolean returnsCompletableFuture;
37+
private final MessageProcessor delegate;
4438

4539
public CoreMessageProcessor(final ArgumentResolverService argumentResolverService,
4640
final QueueProperties queueProperties,
4741
final SqsAsyncClient sqsAsyncClient,
4842
final Method messageConsumerMethod,
4943
final Object messageConsumerBean) {
50-
this.queueProperties = queueProperties;
51-
this.sqsAsyncClient = sqsAsyncClient;
52-
this.messageConsumerMethod = messageConsumerMethod;
53-
this.messageConsumerBean = messageConsumerBean;
54-
55-
this.methodArgumentResolvers = getArgumentResolvers(argumentResolverService);
56-
this.hasAcknowledgeParameter = hasAcknowledgeParameter();
57-
this.returnsCompletableFuture = CompletableFuture.class.isAssignableFrom(messageConsumerMethod.getReturnType());
58-
}
59-
60-
@Override
61-
public CompletableFuture<?> processMessage(final Message message, final Supplier<CompletableFuture<?>> resolveMessageCallback) {
62-
final Object[] arguments;
63-
try {
64-
arguments = getArguments(message, resolveMessageCallback);
65-
} catch (RuntimeException runtimeException) {
66-
throw new MessageProcessingException("Error building arguments for the message listener", runtimeException);
67-
}
68-
69-
final Object result;
70-
try {
71-
result = messageConsumerMethod.invoke(messageConsumerBean, arguments);
72-
} catch (IllegalAccessException exception) {
73-
return CompletableFutureUtils.completedExceptionally(new MessageProcessingException(exception));
74-
} catch (InvocationTargetException exception) {
75-
return CompletableFutureUtils.completedExceptionally(new MessageProcessingException(exception.getCause()));
76-
}
77-
78-
if (hasAcknowledgeParameter) {
79-
// If the method has the Acknowledge parameter it is up to them to resolve the message
80-
return CompletableFuture.completedFuture(null);
81-
}
82-
83-
final CompletableFuture<?> resultCompletableFuture;
84-
if (returnsCompletableFuture) {
85-
if (result == null) {
86-
return CompletableFutureUtils.completedExceptionally(new MessageProcessingException("Method returns CompletableFuture but null was returned"));
44+
final boolean hasAcknowledgeParameter = hasAcknowledgeParameter(messageConsumerMethod);
45+
final boolean isAsynchronous = CompletableFuture.class.isAssignableFrom(messageConsumerMethod.getReturnType());
46+
final ArgumentResolvers argumentResolvers = determineArgumentResolvers(argumentResolverService, queueProperties, messageConsumerMethod);
47+
48+
if (isAsynchronous) {
49+
final Function<Object[], CompletableFuture<?>> messageExecutor = (arguments) -> {
50+
try {
51+
return (CompletableFuture<?>) messageConsumerMethod.invoke(messageConsumerBean, arguments);
52+
} catch (IllegalAccessException exception) {
53+
return CompletableFutureUtils.completedExceptionally(new MessageProcessingException(exception));
54+
} catch (InvocationTargetException exception) {
55+
return CompletableFutureUtils.completedExceptionally(new MessageProcessingException(exception.getCause()));
56+
}
57+
};
58+
59+
if (hasAcknowledgeParameter) {
60+
this.delegate = new AsyncLambdaMessageProcessor(sqsAsyncClient, queueProperties, (message, acknowledge, visibilityExtender) -> {
61+
final Object[] arguments = argumentResolvers.resolveArgument(message, acknowledge, visibilityExtender);
62+
return messageExecutor.apply(arguments);
63+
});
64+
} else {
65+
this.delegate = new AsyncLambdaMessageProcessor(sqsAsyncClient, queueProperties, false, (message, visibilityExtender) -> {
66+
final Object[] arguments = argumentResolvers.resolveArgument(message, null, visibilityExtender);
67+
return messageExecutor.apply(arguments);
68+
});
8769
}
88-
resultCompletableFuture = (CompletableFuture<?>) result;
8970
} else {
90-
resultCompletableFuture = CompletableFuture.completedFuture(null);
91-
}
92-
93-
final Runnable resolveCallbackLoggingErrorsOnly = () -> {
94-
try {
95-
resolveMessageCallback.get()
96-
.handle((i, throwable) -> {
97-
if (throwable != null) {
98-
log.error("Error resolving successfully processed message", throwable);
99-
}
100-
return null;
101-
});
102-
} catch (RuntimeException runtimeException) {
103-
log.error("Failed to trigger message resolving", runtimeException);
71+
final Consumer<Object[]> messageExecutor = (arguments) -> {
72+
try {
73+
messageConsumerMethod.invoke(messageConsumerBean, arguments);
74+
} catch (IllegalAccessException illegalAccessException) {
75+
throw new MessageProcessingException(illegalAccessException);
76+
} catch (InvocationTargetException exception) {
77+
throw new MessageProcessingException(exception.getCause());
78+
}
79+
};
80+
81+
if (hasAcknowledgeParameter) {
82+
this.delegate = new LambdaMessageProcessor(sqsAsyncClient, queueProperties, (message, acknowledge, visibilityExtender) -> {
83+
final Object[] arguments = argumentResolvers.resolveArgument(message, acknowledge, visibilityExtender);
84+
messageExecutor.accept(arguments);
85+
});
86+
} else {
87+
this.delegate = new LambdaMessageProcessor(sqsAsyncClient, queueProperties, false, (message, visibilityExtender) -> {
88+
final Object[] arguments = argumentResolvers.resolveArgument(message, null, visibilityExtender);
89+
messageExecutor.accept(arguments);
90+
});
10491
}
105-
};
106-
107-
return resultCompletableFuture
108-
.thenAccept((ignored) -> resolveCallbackLoggingErrorsOnly.run());
109-
}
110-
111-
/**
112-
* Get the arguments for the method for the message that is being processed.
113-
*
114-
* @param message the message to populate the arguments from
115-
* @return the array of arguments to call the method with
116-
*/
117-
private Object[] getArguments(final Message message, final Supplier<CompletableFuture<?>> resolveMessageCallback) {
118-
return methodArgumentResolvers.stream()
119-
.map(resolver -> resolver.resolveArgument(message, resolveMessageCallback))
120-
.toArray(Object[]::new);
92+
}
12193
}
12294

123-
private List<InternalArgumentResolver> getArgumentResolvers(final ArgumentResolverService argumentResolverService) {
124-
final Parameter[] parameters = messageConsumerMethod.getParameters();
125-
return IntStream.range(0, parameters.length)
95+
private static ArgumentResolvers determineArgumentResolvers(final ArgumentResolverService argumentResolverService,
96+
final QueueProperties queueProperties,
97+
final Method method) {
98+
final Parameter[] parameters = method.getParameters();
99+
List<InternalArgumentResolver> argumentResolvers = IntStream.range(0, parameters.length)
126100
.<InternalArgumentResolver>mapToObj(parameterIndex -> {
127101
final Parameter parameter = parameters[parameterIndex];
128102

129103
final MethodParameter methodParameter = DefaultMethodParameter.builder()
130-
.method(messageConsumerMethod)
104+
.method(method)
131105
.parameter(parameter)
132106
.parameterIndex(parameterIndex)
133107
.build();
134108

135109
if (isAcknowledgeParameter(parameter)) {
136-
return (message, resolveMessageCallback) -> (Acknowledge) resolveMessageCallback::get;
110+
return (message, acknowledge, visibilityExtender) -> acknowledge;
137111
}
138112

139113
if (isVisibilityExtenderParameter(parameter)) {
140-
return (message, resolveMessageCallback) -> new DefaultVisibilityExtender(sqsAsyncClient, queueProperties, message);
114+
return (message, acknowledge, visibilityExtender) -> visibilityExtender;
141115
}
142116

143117
final ArgumentResolver<?> argumentResolver = argumentResolverService.getArgumentResolver(methodParameter);
144-
return (message, resolveMessageCallback) -> argumentResolver.resolveArgumentForParameter(queueProperties, methodParameter, message);
118+
return (message, acknowledge, visibilityExtender)
119+
-> argumentResolver.resolveArgumentForParameter(queueProperties, methodParameter, message);
145120
})
146121
.collect(toList());
122+
123+
return (message, acknowledge, visibilityExtender) -> argumentResolvers.stream()
124+
.map(argumentResolver -> argumentResolver.resolveArgument(message, acknowledge, visibilityExtender))
125+
.toArray(Object[]::new);
147126
}
148127

149-
private boolean hasAcknowledgeParameter() {
150-
return Arrays.stream(messageConsumerMethod.getParameters())
128+
private static boolean hasAcknowledgeParameter(final Method method) {
129+
return Arrays.stream(method.getParameters())
151130
.anyMatch(CoreMessageProcessor::isAcknowledgeParameter);
152131
}
153132

@@ -159,18 +138,24 @@ private static boolean isVisibilityExtenderParameter(final Parameter parameter)
159138
return VisibilityExtender.class.isAssignableFrom(parameter.getType());
160139
}
161140

141+
@Override
142+
public CompletableFuture<?> processMessage(final Message message, final Supplier<CompletableFuture<?>> resolveMessageCallback) {
143+
return delegate.processMessage(message, resolveMessageCallback);
144+
}
145+
162146
/**
163147
* Internal resolver for resolving the argument given the message.
164148
*/
165149
@FunctionalInterface
166150
interface InternalArgumentResolver {
167-
/**
168-
* Resolve the argument of the method.
169-
*
170-
* @param message the message that is being processed
171-
* @param resolveMessageCallback the callback that should be executed when the message has successfully been processed
172-
* @return the argument that should be used for the corresponding parameter
173-
*/
174-
Object resolveArgument(final Message message, final Supplier<CompletableFuture<?>> resolveMessageCallback);
151+
Object resolveArgument(final Message message, @Nullable final Acknowledge acknowledge, @Nullable final VisibilityExtender visibilityExtender);
152+
}
153+
154+
/**
155+
* Resolve all of the arguments for this message.
156+
*/
157+
@FunctionalInterface
158+
interface ArgumentResolvers {
159+
Object[] resolveArgument(final Message message, @Nullable final Acknowledge acknowledge, @Nullable final VisibilityExtender visibilityExtender);
175160
}
176161
}

0 commit comments

Comments
 (0)