-
Notifications
You must be signed in to change notification settings - Fork 6
feat(inkless:consume): add rate-limited fetch for lagging consumers #467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: jeqo/only-fetch-on-work-thread
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds rate-limiting capabilities for lagging consumers (consumers reading old data) to prevent resource exhaustion when multiple consumers fall behind. The implementation separates recent and lagging consumer data paths, with lagging consumers subjected to request-based rate limiting using the Bucket4j library.
Key changes:
- Adds a new lagging consumer thread pool and rate limiter to manage fetch requests for old data
- Introduces age-based path separation in FetchPlanner using a configurable threshold (defaulting to 2x cache expiration lifespan)
- Implements request-based rate limiting via Bucket4j with configurable limits (default 200 requests/second)
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| FetchPlanner.java | Implements age-based path separation and rate limiting logic; lagging path bypasses cache |
| Reader.java | Adds lagging consumer executor, threshold, and rate limiter initialization; updated constructors |
| InklessFetchMetrics.java | Adds metrics for tracking recent/lagging requests and rate limit wait times |
| InklessConfig.java | Defines configuration for lagging consumer thread pool size and rate limit |
| FetchHandler.java | Passes new configuration parameters to Reader constructor |
| FetchException.java | Adds constructor accepting message and cause for rate limit interruptions |
| FetchPlannerTest.java | Adds tests for lagging consumer path, recent path, and rate limiting behavior |
| ReaderTest.java | Refactors to use helper method for Reader construction with new parameters |
| build.gradle | Adds Bucket4j dependency |
| gradle/dependencies.gradle | Defines Bucket4j version 8.14.0 |
Comments suppressed due to low confidence (1)
storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java:101
- The test reuses the same dataExecutor for both recent and lagging consumer paths, but doesn't verify that the lagging consumer executor is properly shut down. The testClose should verify that all three executors (metadataExecutor, dataExecutor, and laggingConsumerExecutor) are shut down, but since the lagging consumer executor is the same instance as dataExecutor in tests, this doesn't catch potential issues where the lagging consumer executor might not be shut down in production.
@Test
public void testClose() throws Exception {
final var reader = getReader();
reader.close();
verify(metadataExecutor, atLeastOnce()).shutdown();
verify(dataExecutor, atLeastOnce()).shutdown();
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/InklessFetchMetrics.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
6f06aff to
d4c4bc4
Compare
282a419 to
fc5bae4
Compare
31b9aa9 to
f6be2a5
Compare
d768ef9 to
4155e45
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (1)
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java:255
- The javadoc comment states "This method is called by the cache (on dataExecutor) when a cache miss occurs" but this is no longer accurate after the lagging consumer changes. The method is now called from two different paths: (1) by the cache on recentDataExecutor for recent data (line 203), and (2) directly from the lagging consumer executor without caching for lagging data (line 217). The documentation should be updated to reflect both calling contexts.
/**
* Fetches a file extent from remote storage.
* This method is called by the cache (on dataExecutor) when a cache miss occurs.
*
* @param request the fetch request with object key and byte range
* @return the fetched file extent
* @throws FetchException if remote fetch fails
*/
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
4155e45 to
d55d019
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
d55d019 to
9e79e59
Compare
chore(inkless:build): add bucket4j dependency
9e79e59 to
535f9cb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
No description provided.