11package com .jashmore .sqs .processor ;
22
3+ import static java .util .stream .Collectors .toList ;
4+
35import com .jashmore .sqs .QueueProperties ;
4- import com .jashmore .sqs .argument .ArgumentResolutionException ;
6+ import com .jashmore .sqs .argument .ArgumentResolver ;
57import com .jashmore .sqs .argument .ArgumentResolverService ;
68import com .jashmore .sqs .argument .DefaultMethodParameter ;
79import com .jashmore .sqs .argument .MethodParameter ;
1416import java .lang .reflect .Method ;
1517import java .lang .reflect .Parameter ;
1618import java .util .Arrays ;
19+ import java .util .List ;
1720import java .util .concurrent .CompletableFuture ;
1821import java .util .concurrent .ExecutionException ;
1922import java .util .stream .IntStream ;
2629@ ThreadSafe
2730@ AllArgsConstructor
2831public class DefaultMessageProcessor implements MessageProcessor {
29- private final ArgumentResolverService argumentResolverService ;
3032 private final QueueProperties queueProperties ;
3133 private final MessageResolver messageResolver ;
3234 private final Method messageConsumerMethod ;
3335 private final Object messageConsumerBean ;
36+ private final Class <?> returnType ;
37+ private final List <InternalArgumentResolver > methodArgumentResolvers ;
38+ private final boolean hasAcknowledgeParameter ;
39+
40+ public DefaultMessageProcessor (final ArgumentResolverService argumentResolverService ,
41+ final QueueProperties queueProperties ,
42+ final MessageResolver messageResolver ,
43+ final Method messageConsumerMethod ,
44+ final Object messageConsumerBean ) {
45+ this .queueProperties = queueProperties ;
46+ this .messageResolver = messageResolver ;
47+ this .messageConsumerMethod = messageConsumerMethod ;
48+ this .messageConsumerBean = messageConsumerBean ;
49+ this .hasAcknowledgeParameter = hasAcknowledgeParameter ();
50+ this .returnType = messageConsumerMethod .getReturnType ();
51+ this .methodArgumentResolvers = getArgumentResolvers (argumentResolverService );
52+ }
3453
3554 @ Override
3655 public void processMessage (final Message message ) throws MessageProcessingException {
37- final Acknowledge acknowledge = () -> messageResolver .resolveMessage (message );
38- final Object [] arguments = getArguments (acknowledge , message );
56+ final Object [] arguments = getArguments (message );
3957
4058 final Object result ;
4159 try {
@@ -44,12 +62,11 @@ public void processMessage(final Message message) throws MessageProcessingExcept
4462 throw new MessageProcessingException ("Error processing message" , exception );
4563 }
4664
47- if (hasAcknowledgeParameter () ) {
65+ if (hasAcknowledgeParameter ) {
4866 // If the method has the Acknowledge parameter it is up to them to resolve the message
4967 return ;
5068 }
5169
52- final Class <?> returnType = messageConsumerMethod .getReturnType ();
5370 if (CompletableFuture .class .isAssignableFrom (returnType )) {
5471 final CompletableFuture <?> resultCompletableFuture = (CompletableFuture ) result ;
5572
@@ -59,7 +76,7 @@ public void processMessage(final Message message) throws MessageProcessingExcept
5976
6077 try {
6178 resultCompletableFuture
62- .thenAccept ((ignored ) -> acknowledge . acknowledgeSuccessful ( ))
79+ .thenAccept ((ignored ) -> messageResolver . resolveMessage ( message ))
6380 .get ();
6481 } catch (final InterruptedException interruptedException ) {
6582 Thread .currentThread ().interrupt ();
@@ -68,49 +85,64 @@ public void processMessage(final Message message) throws MessageProcessingExcept
6885 throw new MessageProcessingException ("Error processing message" , executionException .getCause ());
6986 }
7087 } else {
71- acknowledge . acknowledgeSuccessful ( );
88+ messageResolver . resolveMessage ( message );
7289 }
7390 }
7491
75- private boolean hasAcknowledgeParameter () {
76- return Arrays .stream (messageConsumerMethod .getParameters ())
77- .anyMatch (DefaultMessageProcessor ::isAcknowledgeParameter );
78- }
79-
80- private static boolean isAcknowledgeParameter (final Parameter parameter ) {
81- return Acknowledge .class .isAssignableFrom (parameter .getType ());
82- }
83-
8492 /**
8593 * Get the arguments for the method for the message that is being processed.
8694 *
87- * @param acknowledge the acknowledge object that should be used if a parameter is an {@link Acknowledge}
8895 * @param message the message to populate the arguments from
8996 * @return the array of arguments to call the method with
9097 */
91- private Object [] getArguments (final Acknowledge acknowledge , final Message message ) {
98+ private Object [] getArguments (final Message message ) {
99+ return methodArgumentResolvers .stream ()
100+ .map (resolver -> resolver .resolveArgument (message ))
101+ .toArray (Object []::new );
102+ }
103+
104+ private List <InternalArgumentResolver > getArgumentResolvers (final ArgumentResolverService argumentResolverService ) {
92105 final Parameter [] parameters = messageConsumerMethod .getParameters ();
93106 return IntStream .range (0 , parameters .length )
94- .mapToObj (parameterIndex -> {
107+ .< InternalArgumentResolver > mapToObj (parameterIndex -> {
95108 final Parameter parameter = parameters [parameterIndex ];
96109
97- if (isAcknowledgeParameter (parameter )) {
98- return acknowledge ;
99- }
100-
101110 final MethodParameter methodParameter = DefaultMethodParameter .builder ()
102111 .method (messageConsumerMethod )
103112 .parameter (parameter )
104113 .parameterIndex (parameterIndex )
105114 .build ();
106115
107- try {
108- return argumentResolverService .resolveArgument (queueProperties , methodParameter , message );
109- } catch (final ArgumentResolutionException argumentResolutionException ) {
110- throw new MessageProcessingException ("Error resolving arguments for message" , argumentResolutionException );
116+ if (isAcknowledgeParameter (parameter )) {
117+ return message -> (Acknowledge ) () -> messageResolver .resolveMessage (message );
111118 }
119+
120+ final ArgumentResolver <?> argumentResolver = argumentResolverService .getArgumentResolver (methodParameter );
121+ return message -> argumentResolver .resolveArgumentForParameter (queueProperties , methodParameter , message );
112122 })
113- .toArray (Object []::new );
123+ .collect (toList ());
124+ }
125+
126+ private boolean hasAcknowledgeParameter () {
127+ return Arrays .stream (messageConsumerMethod .getParameters ())
128+ .anyMatch (DefaultMessageProcessor ::isAcknowledgeParameter );
129+ }
130+
131+ private static boolean isAcknowledgeParameter (final Parameter parameter ) {
132+ return Acknowledge .class .isAssignableFrom (parameter .getType ());
133+ }
114134
135+ /**
136+ * Internal resolver for resolving the argument given the message.
137+ */
138+ @ FunctionalInterface
139+ interface InternalArgumentResolver {
140+ /**
141+ * Resolve the argument of the method.
142+ *
143+ * @param message the message that is being processed
144+ * @return the argument that should be used for the corresponding parameter
145+ */
146+ Object resolveArgument (final Message message );
115147 }
116148}
0 commit comments