Skip to content

Commit 57b125b

Browse files
refs #231: Converts properties to use Durations and not Numbers (#235)
This converts a lot of properties from property values like timeoutInSeconds as timeout using the Java duration type. This makes the code easier to read and reduces errors in submitting the wrong values.
1 parent 851313c commit 57b125b

File tree

40 files changed

+382
-423
lines changed

40 files changed

+382
-423
lines changed

core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBroker.java

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.jashmore.sqs.broker.concurrent;
22

3-
import static com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerConstants.DEFAULT_BACKOFF_TIME_IN_MS;
3+
import static com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerConstants.DEFAULT_BACKOFF_TIME;
4+
import static com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerConstants.DEFAULT_CONCURRENCY_POLLING;
5+
import static com.jashmore.sqs.util.properties.PropertyUtils.safelyGetPositiveOrZeroDuration;
46
import static java.util.concurrent.TimeUnit.MILLISECONDS;
57

68
import com.jashmore.sqs.broker.MessageBroker;
@@ -9,6 +11,7 @@
911
import lombok.extern.slf4j.Slf4j;
1012
import software.amazon.awssdk.services.sqs.model.Message;
1113

14+
import java.time.Duration;
1215
import java.util.concurrent.CancellationException;
1316
import java.util.concurrent.CompletableFuture;
1417
import java.util.concurrent.ExecutorService;
@@ -53,8 +56,8 @@ public void processMessages(final ExecutorService messageProcessingExecutorServi
5356
try {
5457
updateConcurrencyLevelIfChanged(concurrentMessagesBeingProcessedSemaphore);
5558

56-
final long numberOfMillisecondsToObtainPermit = getNumberOfMillisecondsToObtainPermit();
57-
final boolean obtainedPermit = concurrentMessagesBeingProcessedSemaphore.tryAcquire(numberOfMillisecondsToObtainPermit, MILLISECONDS);
59+
final Duration permitWaitTime = getPermitWaitTime();
60+
final boolean obtainedPermit = concurrentMessagesBeingProcessedSemaphore.tryAcquire(permitWaitTime.toMillis(), MILLISECONDS);
5861
if (!obtainedPermit) {
5962
continue;
6063
}
@@ -74,10 +77,11 @@ public void processMessages(final ExecutorService messageProcessingExecutorServi
7477
throw runtimeException;
7578
}
7679
} catch (final RuntimeException runtimeException) {
77-
final long errorBackoffTimeInMilliseconds = getErrorBackoffTimeInMilliseconds();
78-
log.error("Error thrown while organising threads to process messages. Backing off for {}ms", errorBackoffTimeInMilliseconds,
80+
final Duration errorBackoffTime = safelyGetPositiveOrZeroDuration("errorBackoffTime", properties::getErrorBackoffTime, DEFAULT_BACKOFF_TIME);
81+
final long errorBackoffTimeInMs = errorBackoffTime.toMillis();
82+
log.error("Error thrown while organising threads to process messages. Backing off for {}ms", errorBackoffTimeInMs,
7983
runtimeException);
80-
Thread.sleep(errorBackoffTimeInMilliseconds);
84+
Thread.sleep(errorBackoffTimeInMs);
8185
}
8286
}
8387
log.debug("Ending processing of messages");
@@ -86,15 +90,16 @@ public void processMessages(final ExecutorService messageProcessingExecutorServi
8690
/**
8791
* Safely get the number of milliseconds that should wait to get a permit for creating a new thread.
8892
*
89-
* @return the number of milliseconds to wait
90-
* @see ConcurrentMessageBrokerProperties#getConcurrencyPollingRateInMilliseconds() for more information
93+
* @return the duration to wait for an available permit
94+
* @see ConcurrentMessageBrokerProperties#getConcurrencyPollingRate() for more information
9195
*/
92-
private long getNumberOfMillisecondsToObtainPermit() {
93-
return PropertyUtils.safelyGetPositiveLongValue(
94-
"numberOfMillisecondsToObtainPermit",
95-
properties::getConcurrencyPollingRateInMilliseconds,
96-
DEFAULT_BACKOFF_TIME_IN_MS
97-
);
96+
private Duration getPermitWaitTime() {
97+
final Duration pollingRate = properties.getConcurrencyPollingRate();
98+
if (pollingRate != null && !pollingRate.isNegative()) {
99+
return pollingRate;
100+
}
101+
102+
return DEFAULT_CONCURRENCY_POLLING;
98103
}
99104

100105
/**
@@ -123,18 +128,4 @@ private int getConcurrencyLevel() {
123128
0
124129
);
125130
}
126-
127-
/**
128-
* Get the number of seconds that the thread should wait when there was an error trying to organise a thread to process.
129-
*
130-
* @return the backoff time in milliseconds
131-
* @see ConcurrentMessageBrokerProperties#getErrorBackoffTimeInMilliseconds() for more information
132-
*/
133-
private long getErrorBackoffTimeInMilliseconds() {
134-
return PropertyUtils.safelyGetPositiveOrZeroLongValue(
135-
"errorBackoffTimeInMilliseconds",
136-
properties::getErrorBackoffTimeInMilliseconds,
137-
DEFAULT_BACKOFF_TIME_IN_MS
138-
);
139-
}
140131
}

core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBrokerConstants.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22

33
import lombok.experimental.UtilityClass;
44

5+
import java.time.Duration;
6+
57
@UtilityClass
68
class ConcurrentMessageBrokerConstants {
79
/**
810
* The default amount of time to sleep the thread when there was an error organising the message processing threads.
911
*/
10-
static final int DEFAULT_BACKOFF_TIME_IN_MS = 10_000;
12+
static final Duration DEFAULT_BACKOFF_TIME = Duration.ofSeconds(10);
1113

1214
/**
1315
* The default amount of time the thread should wait for a thread to process a message before it tries again and checks the available concurrency.
1416
*/
15-
static final long DEFAULT_CONCURRENCY_POLLING_IN_MS = 60_000L;
17+
static final Duration DEFAULT_CONCURRENCY_POLLING = Duration.ofMinutes(1);
1618
}

core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBrokerProperties.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
import com.jashmore.documentation.annotations.NotThreadSafe;
44
import com.jashmore.documentation.annotations.Nullable;
5+
import com.jashmore.documentation.annotations.Positive;
56
import com.jashmore.documentation.annotations.PositiveOrZero;
67

8+
import java.time.Duration;
9+
710
/**
811
* Properties for dynamically configuring how the {@link ConcurrentMessageBroker} is able to process messages concurrently.
912
*
@@ -25,7 +28,7 @@ public interface ConcurrentMessageBrokerProperties {
2528
* <p>The {@link ConcurrentMessageBroker} will maintain the rate of concurrency by checking that the current concurrency rate is aligned with this value.
2629
* If there is currently less messages being processed than this value, more requests for messages will be made to meet this value. If there are
2730
* more messages being processed or requested than this value, this coordinating thread will block until enough messages have been processed. If a
28-
* permit has not be obtained before the timeout defined by {@link #getConcurrencyPollingRateInMilliseconds()}, it will recalculate this
31+
* permit has not be obtained before the timeout defined by {@link #getConcurrencyPollingRate()}, it will recalculate this
2932
* concurrency rate again and wait for a permit.
3033
*
3134
* <p>Note that the concurrency rate does not get applied instantly and there are multiple attributing factors for when the rate of concurrency will
@@ -40,7 +43,7 @@ public interface ConcurrentMessageBrokerProperties {
4043
* </li>
4144
* <li>
4245
* The delay in the concurrency rate change may be due to transitioning from allowing zero threads to a number of threads. In a worst
43-
* case scenario there would be a delay of {@link #getConcurrencyPollingRateInMilliseconds()} before the rate of concurrency is changed.
46+
* case scenario there would be a delay of {@link #getConcurrencyPollingRate()} before the rate of concurrency is changed.
4447
* </li>
4548
* </ul>
4649
*
@@ -50,7 +53,7 @@ public interface ConcurrentMessageBrokerProperties {
5053
int getConcurrencyLevel();
5154

5255
/**
53-
* The number of milliseconds that the coordinating thread will sleep when the maximum rate of concurrency has been reached before checking the
56+
* The duration that the coordinating thread will sleep when the maximum rate of concurrency has been reached before checking the
5457
* concurrency rate again.
5558
*
5659
* <p>The reason that this property is needed is because during the processing of messages, which could be a significantly long period, the rate
@@ -62,25 +65,27 @@ public interface ConcurrentMessageBrokerProperties {
6265
* that the coordinating thread is awoken and is therefore less CPU intensive. However, decreasing this polling period makes it more responsive to changes
6366
* to the rate of concurrency.
6467
*
65-
* <p>If this value is null or less than zero, {@link ConcurrentMessageBrokerConstants#DEFAULT_CONCURRENCY_POLLING_IN_MS} will be used instead.
68+
* <p>If this duration is null or negative, {@link ConcurrentMessageBrokerConstants#DEFAULT_CONCURRENCY_POLLING} will be used instead. It is not
69+
* recommended to have a low value as that will result in this background thread constantly trying to determine fi the concurrency rate can be
70+
* changed.
6671
*
67-
* @return the number of milliseconds between polls for the concurrency level
72+
* @return the amount of time between polls for the concurrency level
6873
*/
6974
@Nullable
70-
@PositiveOrZero
71-
Long getConcurrencyPollingRateInMilliseconds();
75+
@Positive
76+
Duration getConcurrencyPollingRate();
7277

7378
/**
74-
* The number of milliseconds that the coordinating thread should backoff if there was an error trying to request a message.
79+
* The duration that the coordinating thread should backoff if there was an error trying to request a message.
7580
*
7681
* <p>This is needed to stop the background thread from trying again and again over and over causing a flood of error log messages that may make it
7782
* difficult to debug.
7883
*
79-
* <p>If this value is null or negative, {@link ConcurrentMessageBrokerConstants#DEFAULT_BACKOFF_TIME_IN_MS} will be used as the backoff period.
84+
* <p>If this value is null or negative, {@link ConcurrentMessageBrokerConstants#DEFAULT_BACKOFF_TIME} will be used as the backoff period.
8085
*
81-
* @return the number of milliseconds to sleep the thread after an error is thrown
86+
* @return the amount of time to sleep the thread after an error is thrown
8287
*/
8388
@Nullable
8489
@PositiveOrZero
85-
Long getErrorBackoffTimeInMilliseconds();
90+
Duration getErrorBackoffTime();
8691
}

core/src/main/java/com/jashmore/sqs/broker/concurrent/StaticConcurrentMessageBrokerProperties.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import lombok.EqualsAndHashCode;
99
import lombok.ToString;
1010

11+
import java.time.Duration;
12+
1113
/**
1214
* Implementation that stores the value as non-mutable field values and therefore will return the same value on every call.
1315
*
@@ -21,20 +23,18 @@
2123
@ThreadSafe
2224
public final class StaticConcurrentMessageBrokerProperties implements ConcurrentMessageBrokerProperties {
2325
private final Integer concurrencyLevel;
24-
private final Long preferredConcurrencyPollingRateInMilliseconds;
25-
private final Long errorBackoffTimeInMilliseconds;
26+
private final Duration preferredConcurrencyPollingRate;
27+
private final Duration errorBackoffTime;
2628

2729
public StaticConcurrentMessageBrokerProperties(final Integer concurrencyLevel,
28-
final Long preferredConcurrencyPollingRateInMilliseconds,
29-
final Long errorBackoffTimeInMilliseconds) {
30+
final Duration preferredConcurrencyPollingRate,
31+
final Duration errorBackoffTime) {
3032
Preconditions.checkNotNull(concurrencyLevel, "concurrencyLevel should not be null");
3133
Preconditions.checkPositiveOrZero(concurrencyLevel, "concurrencyLevel should be greater than or equal to zero");
32-
Preconditions.checkArgument(preferredConcurrencyPollingRateInMilliseconds == null || preferredConcurrencyPollingRateInMilliseconds >= 0,
33-
"preferredConcurrencyPollingRateInMilliseconds should null or greater than or equal to zero");
3434

3535
this.concurrencyLevel = concurrencyLevel;
36-
this.preferredConcurrencyPollingRateInMilliseconds = preferredConcurrencyPollingRateInMilliseconds;
37-
this.errorBackoffTimeInMilliseconds = errorBackoffTimeInMilliseconds;
36+
this.preferredConcurrencyPollingRate = preferredConcurrencyPollingRate;
37+
this.errorBackoffTime = errorBackoffTime;
3838
}
3939

4040
@PositiveOrZero
@@ -43,16 +43,17 @@ public int getConcurrencyLevel() {
4343
return concurrencyLevel;
4444
}
4545

46+
@Nullable
4647
@PositiveOrZero
4748
@Override
48-
public Long getConcurrencyPollingRateInMilliseconds() {
49-
return preferredConcurrencyPollingRateInMilliseconds;
49+
public Duration getConcurrencyPollingRate() {
50+
return preferredConcurrencyPollingRate;
5051
}
5152

5253
@Nullable
5354
@PositiveOrZero
5455
@Override
55-
public Long getErrorBackoffTimeInMilliseconds() {
56-
return errorBackoffTimeInMilliseconds;
56+
public Duration getErrorBackoffTime() {
57+
return errorBackoffTime;
5758
}
5859
}

0 commit comments

Comments
 (0)