-
Notifications
You must be signed in to change notification settings - Fork 6
refactor(inkless:consume): make fetch completion non-blocking #464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
dc8508c to
de2254b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR refactors the fetch completion pipeline to use non-blocking CompletableFuture composition instead of blocking Future.get() calls, preventing thread exhaustion when file fetches are slow.
Key Changes:
- Replaced
Future<FileExtent>withCompletableFuture<FileExtent>throughout the fetch pipeline - Introduced
Reader.allOfFileExtents()to wait for all file fetch futures non-blockingly - Changed fetch completion to receive pre-resolved file extents instead of blocking on futures
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java | Added allOfFileExtents() method to compose futures non-blockingly; changed thenCombineAsync to thenCombine for FetchCompleter |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java | Changed return type from List<Future> to List<CompletableFuture>; replaced executor.submit() with CompletableFuture.supplyAsync() |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java | Changed to accept List<FileExtent> instead of List<Future>; removed blocking Future.get() calls and ExecutionException unwrapping |
| storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java | Added test to verify order preservation when futures complete out of order |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java | Updated tests to work with CompletableFuture; replaced mock executor with real executor; simplified assertions |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java | Updated tests to pass resolved FileExtent instances instead of Future wrappers |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java
Outdated
Show resolved
Hide resolved
6be17b6 to
a5e838e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java
Outdated
Show resolved
Hide resolved
a5e838e to
be6b6f3
Compare
Replace blocking Future.get() calls in FetchCompleter with non-blocking CompletableFuture composition. This prevents thread exhaustion when file fetches are slow. Previously, FetchCompleter.get() was invoked via thenCombineAsync on the common ForkJoinPool, where it blocked waiting for file data futures. Now, the completion stage chain uses: - thenCompose(Reader::allOfFileExtents) which returns a CompletableFuture that completes only when ALL file fetches are done - thenCombine() (not async) to invoke FetchCompleter with already-resolved List<FileExtent> The FetchCompleter task still runs on the ForkJoinPool common pool (via thenCombine's default executor), but it no longer blocks - it receives pre-fetched data and only performs in-memory data assembly (grouping file extents, copying byte ranges, and constructing MemoryRecords). Changes: - FetchPlanner returns List<CompletableFuture<FileExtent>> instead of List<Future<FileExtent>> - FetchCompleter accepts resolved List<FileExtent> instead of futures - Reader.allOfFileExtents() waits for all futures non-blockingly - Add test verifying ordering is preserved when futures complete out of order
be6b6f3 to
bf9600d
Compare
Explains how the proposed approach does not block.
Improves testing approach by separating validations on different tests, and removing the time conditions.
Replace blocking Future.get() calls in FetchCompleter with non-blocking CompletableFuture composition. This prevents thread exhaustion when file fetches are slow.
Previously, FetchCompleter.get() was invoked via thenCombineAsync on the common ForkJoinPool, where it blocked waiting for file data futures.
Now, the completion stage chain uses:
The FetchCompleter task still runs on the ForkJoinPool common pool (via thenCombine's default executor), but it no longer blocks - it receives pre-fetched data and only performs in-memory data assembly (grouping file extents, copying byte ranges, and constructing MemoryRecords).
Changes: