Skip to content

Conversation

@jeqo
Copy link
Contributor

@jeqo jeqo commented Dec 11, 2025

Replace blocking Future.get() calls in FetchCompleter with non-blocking CompletableFuture composition. This prevents thread exhaustion when file fetches are slow.

Previously, FetchCompleter.get() was invoked via thenCombineAsync on the common ForkJoinPool, where it blocked waiting for file data futures.

Now, the completion stage chain uses:

  • thenCompose(Reader::allOfFileExtents) which returns a CompletableFuture that completes only when ALL file fetches are done
  • thenCombine() (not async) to invoke FetchCompleter with already-resolved List

The FetchCompleter task still runs on the ForkJoinPool common pool (via thenCombine's default executor), but it no longer blocks - it receives pre-fetched data and only performs in-memory data assembly (grouping file extents, copying byte ranges, and constructing MemoryRecords).

Changes:

  • FetchPlanner returns List<CompletableFuture> instead of List<Future>
  • FetchCompleter accepts resolved List instead of futures
  • Reader.allOfFileExtents() waits for all futures non-blockingly
  • Add test verifying ordering is preserved when futures complete out of order

@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch from dc8508c to de2254b Compare December 11, 2025 16:27
@jeqo jeqo requested a review from Copilot December 11, 2025 22:27
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the fetch completion pipeline to use non-blocking CompletableFuture composition instead of blocking Future.get() calls, preventing thread exhaustion when file fetches are slow.

Key Changes:

  • Replaced Future<FileExtent> with CompletableFuture<FileExtent> throughout the fetch pipeline
  • Introduced Reader.allOfFileExtents() to wait for all file fetch futures non-blockingly
  • Changed fetch completion to receive pre-resolved file extents instead of blocking on futures

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java Added allOfFileExtents() method to compose futures non-blockingly; changed thenCombineAsync to thenCombine for FetchCompleter
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java Changed return type from List<Future> to List<CompletableFuture>; replaced executor.submit() with CompletableFuture.supplyAsync()
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java Changed to accept List<FileExtent> instead of List<Future>; removed blocking Future.get() calls and ExecutionException unwrapping
storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java Added test to verify order preservation when futures complete out of order
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java Updated tests to work with CompletableFuture; replaced mock executor with real executor; simplified assertions
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java Updated tests to pass resolved FileExtent instances instead of Future wrappers

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch 2 times, most recently from 6be17b6 to a5e838e Compare December 12, 2025 09:47
@jeqo jeqo requested a review from Copilot December 12, 2025 09:47
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch from a5e838e to be6b6f3 Compare December 12, 2025 09:58
@jeqo jeqo marked this pull request as ready for review December 15, 2025 20:19
@jeqo jeqo requested a review from AnatolyPopov December 15, 2025 20:19
Replace blocking Future.get() calls in FetchCompleter with non-blocking
CompletableFuture composition. This prevents thread exhaustion when file
fetches are slow.

Previously, FetchCompleter.get() was invoked via thenCombineAsync on the
common ForkJoinPool, where it blocked waiting for file data futures.

Now, the completion stage chain uses:
- thenCompose(Reader::allOfFileExtents) which returns a CompletableFuture
  that completes only when ALL file fetches are done
- thenCombine() (not async) to invoke FetchCompleter with already-resolved
  List<FileExtent>

The FetchCompleter task still runs on the ForkJoinPool common pool (via
thenCombine's default executor), but it no longer blocks - it receives
pre-fetched data and only performs in-memory data assembly (grouping file
extents, copying byte ranges, and constructing MemoryRecords).

Changes:
- FetchPlanner returns List<CompletableFuture<FileExtent>> instead of List<Future<FileExtent>>
- FetchCompleter accepts resolved List<FileExtent> instead of futures
- Reader.allOfFileExtents() waits for all futures non-blockingly
- Add test verifying ordering is preserved when futures complete out of order
@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch from be6b6f3 to bf9600d Compare December 23, 2025 09:24
jeqo added 2 commits December 23, 2025 17:11
Explains how the proposed approach does not block.
Improves testing approach by separating validations on different tests,
and removing the time conditions.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants