You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
This is the core implementation of the Dynamic SQS Listener API. This contains basic implementations that should cover a
4
-
significant amount of use cases but if it doesn't the consumer can easily implement their own.
3
+
This is the core implementation of the Dynamic SQS Listener API. This contains basic implementations that should cover a significant amount of use cases
4
+
but if it doesn't, the consumer can easily implement their own.
5
5
6
6
## Usage
7
7
8
-
See [java-dynamic-sqs-listener-core-examples](../examples/core-examples) for examples of
9
-
using this core code to listen to a queue.
8
+
See the [core-example](../examples/core-example) for examples of using this core code to listen to a queue in a Java application. If it is a Kotlin application,
9
+
the [core-kotlin-example](../examples/core-kotlin-example) uses a [Core Kotlin DSL](../extensions/core-kotlin-dsl) for constructing a core message listener.
10
10
11
11
## More Information
12
12
13
-
For more information you can look at the root project [README.md](../README.md) which provides more information about
14
-
the architecture of the application. The [API](../api) is also a good location to find more
15
-
information about what each part of the framework is how they interact with each other.
13
+
For more information you can look at the root project [README.md](../README.md) which provides more information about the architecture
14
+
of the application. The [API](../api) is also a good location to find more information about what each part of the framework is how
this will prefetch messages from the queue so that new messages can be processed as soon as possible. This implementation is not appropriate if the
22
-
prefetched message's visibility timeout expires before it can be picked up for process due to the message processing of previous messages taking too long.
23
-
The result is that if the message has a re-drive policy it will be placed back into the queue and processed multiple times. This implementation is appropriate
21
+
this will prefetch messages from the queue so that new messages can be processed as soon as possible. This implementation is not appropriate if the time
22
+
to process messages is long enough for the prefetched message's visibility timeout to expire before it can be processed. In this scenario, the message's
23
+
re-drive policy may place the message back into the queue resulting in it being processed multiple times. This implementation is appropriate
24
24
for high volumes of messages that take little time to process.
This will batch requests for messages from the consumer into a single call out to the SQS queue once a certain threshold of messages were requested or at
27
-
a given period if this threshold is not reached. This reduces the number of calls out to the SQS queue but reduces the performance
28
-
as messages are not being requested while the batch is waiting to be built.
26
+
This will batch requests for messages from the consumer into a single call out to the SQS queue once reaching the threshold of messages, or the batching
27
+
timeout expires. This reduces the number of calls out to the SQS queue but can reduce the performance as no messages processing will occur while waiting
default implementation that calls out to a `ArgumentResolverService` to resolve the arguments and calls the method.
42
+
-[DecoratingMessageProcessor](../core/src/main/java/com/jashmore/sqs/processor/DecoratingMessageProcessor.java): implementation that allows for the
43
+
message processing to be decorated with [MessageProcessingDecorator](../api/src/main/java/com/jashmore/sqs/decorator/MessageProcessingDecorator.java) logic.
44
+
This can be useful for adding tracing, metrics or other extra functionality in the message processing.
42
45
43
46
### ArgumentResolverService
44
47
45
48
The [ArgumentResolverService](../api/src/main/java/com/jashmore/sqs/argument/ArgumentResolverService.java) is used to obtain the
46
49
[ArgumentResolver](../api/src/main/java/com/jashmore/sqs/argument/ArgumentResolver.java) that can be used to populate an argument
47
-
in a method when a message is being processed. For example, a parameter with the
50
+
in a method when processing a message. For example, a parameter with the
48
51
[@Payload](../core/src/main/java/com/jashmore/sqs/argument/payload/Payload.java) annotation will be resolved with the body
49
52
of the message cast to that type.
50
53
@@ -55,7 +58,7 @@ this implementation delegates to specific [ArgumentResolver](../api/src/main/jav
55
58
that have been passed in. See below for the core
56
59
[ArgumentResolver](../api/src/main/java/com/jashmore/sqs/argument/ArgumentResolver.java)s that are available.
57
60
-[CoreArgumentResolverService](../core/src/main/java/com/jashmore/sqs/argument/CoreArgumentResolverService.java): this is
58
-
a helper implementation that uses the DelegatingArgumentResolverService under the hood with all of the core
61
+
a helper implementation that uses the DelegatingArgumentResolverService under the hood with the core
@@ -64,56 +67,58 @@ The core arguments that be resolved include:
64
67
this argument. This is useful if you need to forward this message to other services or want to manually extract information from the service. This is
65
68
provided by the [MessageArgumentResolver](../core/src/main/java/com/jashmore/sqs/argument/message/MessageArgumentResolver.java).
66
69
-[@Payload](../core/src/main/java/com/jashmore/sqs/argument/payload/Payload.java): arguments annotated with this will parse the
67
-
message body into that object. If this is a String a direct transfer of the message contents is passed in, otherwise if it is a Java Bean, an attempt to
70
+
message body into that object. If this is a String, the raw message body will be provided, otherwise if it is a Java Bean, an attempt to
68
71
cast the message body to that bean will be used. This is provided by the
69
-
[PayloadArgumentResolver](../core/src/main/java/com/jashmore/sqs/argument/payload/PayloadArgumentResolver.java), which uses
72
+
[PayloadArgumentResolver](../core/src/main/java/com/jashmore/sqs/argument/payload/PayloadArgumentResolver.java) which uses
70
73
a [PayloadMapper](../core/src/main/java/com/jashmore/sqs/argument/payload/mapper/PayloadMapper.java), such as
71
-
the [JacksonPayloadMapper](../core/src/main/java/com/jashmore/sqs/argument/payload/mapper/JacksonPayloadMapper.java)
72
-
that uses a Jackson `ObjectMapper` to parse the message body.
74
+
the [JacksonPayloadMapper](../core/src/main/java/com/jashmore/sqs/argument/payload/mapper/JacksonPayloadMapper.java), to parse the message body.
73
75
-[@MessageId](../core/src/main/java/com/jashmore/sqs/argument/messageid/MessageId.java): string arguments annotated with this will
74
76
place the message ID of the message into this argument. This is provided by the
-[Acknowledge](../api/src/main/java/com/jashmore/sqs/processor/argument/Acknowledge.java): arguments of this type will be injected
77
-
with an implementation that allows for a message to be manually acknowledged when it is successfully processed. Note that if this is included in the messages
78
-
signature, the [MessageProcessor](../api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java) is not required to
79
-
acknowledge the message after a successful execution. These implementations should be provided by the
80
-
[MessageProcessor](../api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java) being used.
81
-
-[MessageAttribute](../core/src/main/java/com/jashmore/sqs/argument/attribute/MessageAttribute.java): arguments annotated with this
79
+
with an implementation that allows for a message to be manually acknowledged when it is successfully processed. Note that if this is included,
80
+
the [MessageProcessor](../api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java) is not required to
81
+
acknowledge the message after a successful execution and the consumer must acknowledge the message them self. The implementation of the
82
+
[Acknowledge](../api/src/main/java/com/jashmore/sqs/processor/argument/Acknowledge.java) should be provided by the
83
+
[MessageProcessor](../api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java) instead of
84
+
an [ArgumentResolver](../api/src/main/java/com/jashmore/sqs/argument/ArgumentResolver.java).
85
+
-[@MessageAttribute](../core/src/main/java/com/jashmore/sqs/argument/attribute/MessageAttribute.java): arguments annotated with this
82
86
will attempt to parse the contents of the message attribute into this field. For example, if the argument is a String then the attribute will be cast to a
83
87
string where as if the argument is an integer it will try and parse the string into the number. This also works with POJOs in that the resolver will
84
88
attempt to deserialised the message attribute into this POJO shape, e.g. via the Jackson Object Mapper. This is provided by the
with this will attempt to parse the contents of a system message attribute into this field. For example, the `SENT_TIMESTAMP` of the message can be obtained
-[VisibilityExtender](../api/src/main/java/com/jashmore/sqs/processor/argument/VisibilityExtender.java): arguments of this type
91
95
will be injected with an implementation that extends the message visibility of the current message. These implementations should be provided by the
92
-
[MessageProcessor](../api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java) being used.
96
+
[MessageProcessor](../api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java) instead of
97
+
an [ArgumentResolver](../api/src/main/java/com/jashmore/sqs/argument/ArgumentResolver.java).
93
98
94
99
### Message Broker
95
100
96
101
The [MessageBroker](../api/src/main/java/com/jashmore/sqs/broker/MessageBroker.java) is the main container that controls the
97
102
whole flow of messages from the [MessageRetriever](../api/src/main/java/com/jashmore/sqs/retriever) to the
98
103
[MessageProcessor](../api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java). It can provide logic like the rate
99
-
of concurrency of the messages being processed or when messages should be processed.
104
+
of concurrency of messages processing or when messages should be processed.
100
105
101
106
Core implementation include:
102
107
103
108
-[ConcurrentMessageBroker](../core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBroker.java): this
104
-
implementation will run on multiple threads each processing messages. It has dynamic configuration and this allows the rate of concurrency to change
105
-
dynamically while the application is running.
109
+
implementation will run on multiple threads each processing messages. It allows the configuration to be changed dynamically, such as changing the rate of
110
+
concurrency to change while the application is running.
106
111
107
112
### Message Resolver
108
113
109
114
The [MessageResolver](../api/src/main/java/com/jashmore/sqs/resolver/MessageResolver.java) is used when the message has been
110
-
successfully processed and it is needed to be removed from the SQS queue so it isn't processed again.
115
+
successfully processed, and it needs to be removed from the SQS queue.
111
116
112
117
Core implementation include:
113
118
114
119
-[BatchingMessageResolver](../core/src/main/java/com/jashmore/sqs/resolver/batching/BatchingMessageResolver.java): this
115
120
implementation will batch calls to delete messages from the SQS queue into a batch that will go out together once asynchronously. This is useful if you
116
-
are processing many messages at the same time and it is desirable to reduce the number of calls out to SQS. A disadvantage is that the message may
117
-
sit in the batch for enough time that the visibility expires and it is placed onto the queue. To mitigate this, smaller batch
118
-
timeout should be used or by increasing the visibility timeout. Note you can configure this to always delete a message as soon as it is finished by
121
+
are processing many messages at the same time, and it is desirable to reduce the number of calls out to SQS. A disadvantage is that the message may
122
+
sit in the batch for enough time for the visibility timeout to expire, and it is placed onto the queue again. To mitigate this, a smaller batch
123
+
timeout should be used or by increasing the visibility timeout. Note that you can configure this to always delete a message as soon as it is finished by
Once you have built your own retriever you can see how this can be integrated into the framework by looking in the [examples](../../../examples) directory
15
-
and more specifically in the [core-examples module](../../../examples/core-examples).
15
+
and more specifically in the [core-example](../../../examples/core-example) module.
16
16
17
17
### Integrating the new message retriever into the spring app
0 commit comments