From bf9600df9d6efaee41e9a6155be6262092558149 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 11 Dec 2025 17:25:51 +0200 Subject: [PATCH 1/3] refactor(inkless:consume): make fetch completion non-blocking Replace blocking Future.get() calls in FetchCompleter with non-blocking CompletableFuture composition. This prevents thread exhaustion when file fetches are slow. Previously, FetchCompleter.get() was invoked via thenCombineAsync on the common ForkJoinPool, where it blocked waiting for file data futures. Now, the completion stage chain uses: - thenCompose(Reader::allOfFileExtents) which returns a CompletableFuture that completes only when ALL file fetches are done - thenCombine() (not async) to invoke FetchCompleter with already-resolved List The FetchCompleter task still runs on the ForkJoinPool common pool (via thenCombine's default executor), but it no longer blocks - it receives pre-fetched data and only performs in-memory data assembly (grouping file extents, copying byte ranges, and constructing MemoryRecords). Changes: - FetchPlanner returns List> instead of List> - FetchCompleter accepts resolved List instead of futures - Reader.allOfFileExtents() waits for all futures non-blockingly - Add test verifying ordering is preserved when futures complete out of order --- .../aiven/inkless/consume/FetchCompleter.java | 17 +--- .../aiven/inkless/consume/FetchPlanner.java | 18 ++-- .../java/io/aiven/inkless/consume/Reader.java | 72 ++++++++++++--- .../inkless/consume/FetchCompleterTest.java | 22 ++--- .../inkless/consume/FetchPlannerTest.java | 37 ++++---- .../io/aiven/inkless/consume/ReaderTest.java | 91 +++++++++++++++++++ 6 files changed, 190 insertions(+), 67 deletions(-) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java index 5eb88ca200..ebd52d1c93 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java @@ -35,8 +35,6 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -54,14 +52,14 @@ public class FetchCompleter implements Supplier fetchInfos; private final Map coordinates; - private final List> backingData; + private final List backingData; private final Consumer durationCallback; public FetchCompleter(Time time, ObjectKeyCreator objectKeyCreator, Map fetchInfos, Map coordinates, - List> backingData, + List backingData, Consumer durationCallback) { this.time = time; this.objectKeyCreator = objectKeyCreator; @@ -74,21 +72,16 @@ public FetchCompleter(Time time, @Override public Map get() { try { - final Map> files = waitForFileData(); + final Map> files = groupFileData(); return TimeUtils.measureDurationMs(time, () -> serveFetch(coordinates, files), durationCallback); } catch (Exception e) { - // unwrap ExecutionException if the errors comes from dependent futures - if (e instanceof ExecutionException) { - throw new FetchException(e.getCause()); - } throw new FetchException(e); } } - private Map> waitForFileData() throws InterruptedException, ExecutionException { + private Map> groupFileData() { Map> files = new HashMap<>(); - for (Future fileFuture : backingData) { - FileExtent fileExtent = fileFuture.get(); + for (FileExtent fileExtent : backingData) { files.compute(fileExtent.object(), (k, v) -> { if (v == null) { List out = new ArrayList<>(1); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java index 6a36c2b154..b01727c791 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java @@ -24,9 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -40,7 +39,7 @@ import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; -public class FetchPlanner implements Supplier>> { +public class FetchPlanner implements Supplier>> { private final Time time; private final ObjectKeyCreator objectKeyCreator; @@ -71,12 +70,13 @@ public FetchPlanner( this.metrics = metrics; } - private List> doWork(final Map batchCoordinates) { - final List> jobs = planJobs(batchCoordinates); + private List> doWork(final Map batchCoordinates) { + final List jobs = planJobs(batchCoordinates); return submitAll(jobs); } - private List> planJobs(final Map batchCoordinates) { + // package-private for testing + List planJobs(final Map batchCoordinates) { final Set>> objectKeysToRanges = batchCoordinates.values().stream() .filter(findBatch -> findBatch.errors() == Errors.NONE) .map(FindBatchResponse::batches) @@ -107,14 +107,14 @@ private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange b } - private List> submitAll(List> jobs) { + private List> submitAll(List jobs) { return jobs.stream() - .map(dataExecutor::submit) + .map(job -> CompletableFuture.supplyAsync(job::call, dataExecutor)) .collect(Collectors.toList()); } @Override - public List> get() { + public List> get() { return TimeUtils.measureDurationMsSupplier(time, () -> doWork(batchCoordinates), metrics::fetchPlanFinished); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 1b04618951..c515f7e3f4 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -45,6 +46,7 @@ import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.metrics.ThreadPoolMonitor; import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; public class Reader implements AutoCloseable { @@ -122,12 +124,28 @@ public Reader( } } + /** + * Asynchronously fetches data for multiple partitions using a staged pipeline: + * + *

1. Finds batches (metadataExecutor); + *

2. Plans and submits fetches (calling thread); + *

3. Fetches data (dataExecutor); + *

4. Assembles final response (calling thread) + * + *

Each stage uses a thread pool or the calling thread for efficiency. + * + * @param params fetch parameters + * @param fetchInfos partitions and offsets to fetch + * @return CompletableFuture with fetch results per partition + */ public CompletableFuture> fetch( final FetchParams params, final Map fetchInfos ) { final Instant startAt = TimeUtils.durationMeasurementNow(time); fetchMetrics.fetchStarted(fetchInfos.size()); + + // Find Batches (metadataExecutor): Query control plane to find which storage objects contain the requested data final var batchCoordinates = CompletableFuture.supplyAsync( new FindBatchesJob( time, @@ -139,20 +157,26 @@ public CompletableFuture> fetch( ), metadataExecutor ); - return batchCoordinates.thenApply( - coordinates -> - new FetchPlanner( - time, - objectKeyCreator, - keyAlignmentStrategy, - cache, - objectFetcher, - dataExecutor, - coordinates, - fetchMetrics - ).get() - ) - .thenCombineAsync(batchCoordinates, (fileExtents, coordinates) -> + + // Plan & Submit (calling thread): Create fetch plan and submit to cache (non-blocking) + return batchCoordinates.thenApply(coordinates -> { + // FetchPlanner creates a plan and submits requests to cache + // Returns List> immediately + return new FetchPlanner( + time, + objectKeyCreator, + keyAlignmentStrategy, + cache, + objectFetcher, + dataExecutor, + coordinates, + fetchMetrics + ).get(); + }) + // Fetch Data (dataExecutor): Flatten list of futures into single future with all results + .thenCompose(Reader::allOfFileExtents) + // Complete Fetch (calling thread): Combine fetched data with batch coordinates to build final response + .thenCombine(batchCoordinates, (fileExtents, coordinates) -> new FetchCompleter( time, objectKeyCreator, @@ -203,6 +227,26 @@ public CompletableFuture> fetch( }); } + /** + * Waits for all file extent futures to complete and collects results without blocking threads. + *

+ * This method preserves the original order of the input list, regardless of the order + * in which the futures complete. + * + * @param fileExtentFutures the list of futures to wait for + * @return a future that completes with a list of file extents in the same order as the input + */ + static CompletableFuture> allOfFileExtents( + List> fileExtentFutures + ) { + final CompletableFuture[] futuresArray = fileExtentFutures.toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(futuresArray) + .thenApply(v -> + fileExtentFutures.stream() + .map(CompletableFuture::join) + .toList()); + } + @Override public void close() throws IOException { ThreadUtils.shutdownExecutorServiceQuietly(metadataExecutor, EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java index 3306ccc700..5ba9ed1b1f 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java @@ -42,10 +42,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.common.ByteRange; @@ -175,9 +171,9 @@ public void testFetchSingleFile() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer()) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -211,10 +207,10 @@ public void testFetchMultipleFiles() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer()), FileFetchJob.createFileExtent(OBJECT_KEY_B, new ByteRange(0, records.sizeInBytes()), records.buffer()) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -262,14 +258,13 @@ public void testFetchMultipleFilesForSameBatch() { copy.put(records.buffer().duplicate().position(startOffset).limit(endOffset).slice()); fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy)); } - List> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList()); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, fetchInfos, coordinates, - files, + fileExtents, durationMs -> {} ); Map result = job.get(); @@ -310,9 +305,9 @@ public void testFetchMultipleBatches() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, totalSize), concatenatedBuffer) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -373,14 +368,13 @@ public void testFetchMultipleFilesForMultipleBatches() { copy.put(concatenatedBuffer.duplicate().position(startOffset).limit(endOffset).slice()); fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy)); } - List> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList()); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, fetchInfos, coordinates, - files, + fileExtents, durationMs -> {} ); Map result = job.get(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java index d834c9036f..5cbeb6d897 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java @@ -23,20 +23,20 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.cache.KeyAlignmentStrategy; @@ -51,9 +51,7 @@ import io.aiven.inkless.control_plane.FindBatchResponse; import io.aiven.inkless.storage_backend.common.ObjectFetcher; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; +import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) @@ -68,10 +66,10 @@ public class FetchPlannerTest { @Mock ObjectFetcher fetcher; @Mock - ExecutorService dataExecutor; - @Mock InklessFetchMetrics metrics; + ExecutorService dataExecutor = Executors.newSingleThreadExecutor(); + ObjectCache cache = new NullCache(); KeyAlignmentStrategy keyAlignmentStrategy = new FixedBlockAlignment(Integer.MAX_VALUE); ByteRange requestRange = new ByteRange(0, Integer.MAX_VALUE); @@ -80,14 +78,19 @@ public class FetchPlannerTest { TopicIdPartition partition0 = new TopicIdPartition(topicId, 0, "diskless-topic"); TopicIdPartition partition1 = new TopicIdPartition(topicId, 1, "diskless-topic"); + @AfterEach + void tearDown() { + dataExecutor.shutdownNow(); + } + @Test public void planEmptyRequest() { - Map coordinates = new HashMap<>(); - FetchPlanner job = fetchPlannerJob(coordinates); + Map coordinates = Map.of(); + FetchPlanner planner = fetchPlannerJob(coordinates); - job.get(); + List result = planner.planJobs(coordinates); - verifyNoInteractions(dataExecutor); + assertThat(result).isEmpty(); } @Test @@ -214,14 +217,12 @@ private CacheFetchJob cacheFetchJob( ObjectKey objectKey, ByteRange byte ); } - private void assertBatchPlan(Map coordinates, Set jobs) { - ArgumentCaptor submittedCallables = ArgumentCaptor.captor(); - when(dataExecutor.submit(submittedCallables.capture())).thenReturn(null); - - FetchPlanner job = fetchPlannerJob(coordinates); + private void assertBatchPlan(Map coordinates, Set expectedJobs) { + FetchPlanner planner = fetchPlannerJob(coordinates); - job.get(); + // Use the package-private planJobs method to verify the exact jobs planned + List actualJobs = planner.planJobs(coordinates); - assertEquals(jobs, new HashSet<>(submittedCallables.getAllValues())); + assertThat(new HashSet<>(actualJobs)).isEqualTo(expectedJobs); } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index 3f6345ea3c..2751325f3a 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -33,19 +33,26 @@ import org.mockito.quality.Strictness; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.cache.KeyAlignmentStrategy; import io.aiven.inkless.cache.NullCache; import io.aiven.inkless.cache.ObjectCache; +import io.aiven.inkless.common.ByteRange; import io.aiven.inkless.common.ObjectKey; import io.aiven.inkless.common.ObjectKeyCreator; +import io.aiven.inkless.common.PlainObjectKey; import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; import static org.assertj.core.api.Assertions.assertThat; @@ -92,4 +99,88 @@ public void testClose() throws Exception { verify(metadataExecutor, atLeastOnce()).shutdown(); verify(dataExecutor, atLeastOnce()).shutdown(); } + + /** + * This is to ensure fetch results preserve the request order and are matched correctly with their requests. + */ + @Test + public void testAllOfFileExtentsPreservesOrderWhenFuturesCompleteOutOfOrder() throws Exception { + // Create file extents with distinct identifiers + final ObjectKey objectKeyA = PlainObjectKey.create("prefix", "object-a"); + final ObjectKey objectKeyB = PlainObjectKey.create("prefix", "object-b"); + final ObjectKey objectKeyC = PlainObjectKey.create("prefix", "object-c"); + + final FileExtent extentA = FileFetchJob.createFileExtent(objectKeyA, new ByteRange(0, 10), ByteBuffer.allocate(10)); + final FileExtent extentB = FileFetchJob.createFileExtent(objectKeyB, new ByteRange(0, 10), ByteBuffer.allocate(10)); + final FileExtent extentC = FileFetchJob.createFileExtent(objectKeyC, new ByteRange(0, 10), ByteBuffer.allocate(10)); + + // Latches to control when each future can complete + final CountDownLatch futureACanComplete = new CountDownLatch(1); + final CountDownLatch futureBCanComplete = new CountDownLatch(1); + final CountDownLatch futureCCanComplete = new CountDownLatch(1); + + // Latches to signal when each future has actually completed + final CountDownLatch futureACompleted = new CountDownLatch(1); + final CountDownLatch futureBCompleted = new CountDownLatch(1); + final CountDownLatch futureCCompleted = new CountDownLatch(1); + + // Create futures that will complete in reverse order (C, B, A) + final CompletableFuture futureA = CompletableFuture.supplyAsync(() -> { + try { + futureACanComplete.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + futureACompleted.countDown(); + return extentA; + }); + + final CompletableFuture futureB = CompletableFuture.supplyAsync(() -> { + try { + futureBCanComplete.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + futureBCompleted.countDown(); + return extentB; + }); + + final CompletableFuture futureC = CompletableFuture.supplyAsync(() -> { + try { + futureCCanComplete.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + futureCCompleted.countDown(); + return extentC; + }); + + // Create the ordered list: A, B, C + final List> orderedFutures = List.of(futureA, futureB, futureC); + + // Call the package-private method directly + final CompletableFuture> resultFuture = Reader.allOfFileExtents(orderedFutures); + + // Complete futures in reverse order: C, then B, then A + // Allow C to complete and wait for confirmation + futureCCanComplete.countDown(); + assertThat(futureCCompleted.await(5, TimeUnit.SECONDS)).isTrue(); + + // Allow B to complete and wait for confirmation + futureBCanComplete.countDown(); + assertThat(futureBCompleted.await(5, TimeUnit.SECONDS)).isTrue(); + + // Allow A to complete and wait for confirmation + futureACanComplete.countDown(); + assertThat(futureACompleted.await(5, TimeUnit.SECONDS)).isTrue(); + + // Get the result - should maintain original order despite completion order + final List result = resultFuture.get(5, TimeUnit.SECONDS); + + // Verify order is preserved: A, B, C (not C, B, A which was completion order) + assertThat(result).hasSize(3); + assertThat(result.get(0).object()).isEqualTo(objectKeyA.value()); + assertThat(result.get(1).object()).isEqualTo(objectKeyB.value()); + assertThat(result.get(2).object()).isEqualTo(objectKeyC.value()); + } } From 1b5994817a221e268c9268c20f615e5687db7442 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Dec 2025 17:11:55 +0200 Subject: [PATCH 2/3] fixup! refactor(inkless:consume): make fetch completion non-blocking Explains how the proposed approach does not block. --- .../src/main/java/io/aiven/inkless/consume/Reader.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index c515f7e3f4..97c458a49f 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -228,10 +228,7 @@ public CompletableFuture> fetch( } /** - * Waits for all file extent futures to complete and collects results without blocking threads. - *

- * This method preserves the original order of the input list, regardless of the order - * in which the futures complete. + * Waits for all file extent futures to complete and collects results in order. * * @param fileExtentFutures the list of futures to wait for * @return a future that completes with a list of file extents in the same order as the input @@ -239,6 +236,11 @@ public CompletableFuture> fetch( static CompletableFuture> allOfFileExtents( List> fileExtentFutures ) { + // This does **not** block the calling thread or fork-join pool: + // 1. allOf() returns immediately with a future (non-blocking) + // 2. thenApply() registers a callback without waiting (non-blocking) + // 3. join() inside the callback is safe - it only runs _after_ allOf completes, + // meaning all futures are already done, so join() returns immediately final CompletableFuture[] futuresArray = fileExtentFutures.toArray(CompletableFuture[]::new); return CompletableFuture.allOf(futuresArray) .thenApply(v -> From de91a8f3f174cb0d5aa16f76d2a3353cf3fae1b0 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Dec 2025 17:12:53 +0200 Subject: [PATCH 3/3] fixup! refactor(inkless:consume): make fetch completion non-blocking Improves testing approach by separating validations on different tests, and removing the time conditions. --- .../io/aiven/inkless/consume/ReaderTest.java | 97 +++++++++++-------- 1 file changed, 59 insertions(+), 38 deletions(-) diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index 2751325f3a..63243b082e 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -38,10 +38,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.cache.KeyAlignmentStrategy; @@ -56,6 +56,7 @@ import io.aiven.inkless.storage_backend.common.ObjectFetcher; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.verify; @@ -101,10 +102,10 @@ public void testClose() throws Exception { } /** - * This is to ensure fetch results preserve the request order and are matched correctly with their requests. + * Tests that allOfFileExtents preserves the original input order regardless of completion order. */ @Test - public void testAllOfFileExtentsPreservesOrderWhenFuturesCompleteOutOfOrder() throws Exception { + public void testAllOfFileExtentsPreservesOrder() throws Exception { // Create file extents with distinct identifiers final ObjectKey objectKeyA = PlainObjectKey.create("prefix", "object-a"); final ObjectKey objectKeyB = PlainObjectKey.create("prefix", "object-b"); @@ -114,73 +115,93 @@ public void testAllOfFileExtentsPreservesOrderWhenFuturesCompleteOutOfOrder() th final FileExtent extentB = FileFetchJob.createFileExtent(objectKeyB, new ByteRange(0, 10), ByteBuffer.allocate(10)); final FileExtent extentC = FileFetchJob.createFileExtent(objectKeyC, new ByteRange(0, 10), ByteBuffer.allocate(10)); - // Latches to control when each future can complete - final CountDownLatch futureACanComplete = new CountDownLatch(1); - final CountDownLatch futureBCanComplete = new CountDownLatch(1); - final CountDownLatch futureCCanComplete = new CountDownLatch(1); - - // Latches to signal when each future has actually completed - final CountDownLatch futureACompleted = new CountDownLatch(1); - final CountDownLatch futureBCompleted = new CountDownLatch(1); - final CountDownLatch futureCCompleted = new CountDownLatch(1); - - // Create futures that will complete in reverse order (C, B, A) + // Create futures that complete in reverse order: C (100ms), B (200ms), A (300ms) final CompletableFuture futureA = CompletableFuture.supplyAsync(() -> { try { - futureACanComplete.await(5, TimeUnit.SECONDS); + Thread.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - futureACompleted.countDown(); return extentA; }); final CompletableFuture futureB = CompletableFuture.supplyAsync(() -> { try { - futureBCanComplete.await(5, TimeUnit.SECONDS); + Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - futureBCompleted.countDown(); return extentB; }); final CompletableFuture futureC = CompletableFuture.supplyAsync(() -> { try { - futureCCanComplete.await(5, TimeUnit.SECONDS); + Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - futureCCompleted.countDown(); return extentC; }); // Create the ordered list: A, B, C final List> orderedFutures = List.of(futureA, futureB, futureC); - // Call the package-private method directly + // Call allOfFileExtents and wait for result final CompletableFuture> resultFuture = Reader.allOfFileExtents(orderedFutures); + final List result = resultFuture.get(5, TimeUnit.SECONDS); - // Complete futures in reverse order: C, then B, then A - // Allow C to complete and wait for confirmation - futureCCanComplete.countDown(); - assertThat(futureCCompleted.await(5, TimeUnit.SECONDS)).isTrue(); + // Verify result order is preserved as A, B, C (not C, B, A which was the completion order) + assertThat(result) + .hasSize(3) + .extracting(FileExtent::object) + .containsExactly(objectKeyA.value(), objectKeyB.value(), objectKeyC.value()); + } - // Allow B to complete and wait for confirmation - futureBCanComplete.countDown(); - assertThat(futureBCompleted.await(5, TimeUnit.SECONDS)).isTrue(); + /** + * Tests that allOfFileExtents returns immediately without blocking the calling thread. + */ + @Test + public void testAllOfFileExtentsDoesNotBlock() { + // Create file extents + final ObjectKey objectKey = PlainObjectKey.create("prefix", "object"); + final FileExtent extent = FileFetchJob.createFileExtent(objectKey, new ByteRange(0, 10), ByteBuffer.allocate(10)); + + // Create a future that will complete after a delay + final AtomicBoolean futureCompleted = new AtomicBoolean(false); + final CompletableFuture delayedFuture = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1000); // Simulate slow operation + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + futureCompleted.set(true); + return extent; + }); - // Allow A to complete and wait for confirmation - futureACanComplete.countDown(); - assertThat(futureACompleted.await(5, TimeUnit.SECONDS)).isTrue(); + final List> futures = List.of(delayedFuture); - // Get the result - should maintain original order despite completion order - final List result = resultFuture.get(5, TimeUnit.SECONDS); + // Call allOfFileExtents - this should return immediately + final long startTime = System.currentTimeMillis(); + final CompletableFuture> resultFuture = Reader.allOfFileExtents(futures); + final long callDuration = System.currentTimeMillis() - startTime; + + // Verify the call returned immediately (within 100ms) + assertThat(callDuration).isLessThan(100); + + // Verify the future is not yet complete + assertThat(resultFuture).isNotCompleted(); + assertThat(futureCompleted).isFalse(); + + // Wait for the result to actually complete + await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(resultFuture).isCompleted(); + assertThat(futureCompleted).isTrue(); + }); - // Verify order is preserved: A, B, C (not C, B, A which was completion order) - assertThat(result).hasSize(3); - assertThat(result.get(0).object()).isEqualTo(objectKeyA.value()); - assertThat(result.get(1).object()).isEqualTo(objectKeyB.value()); - assertThat(result.get(2).object()).isEqualTo(objectKeyC.value()); + // Verify the result is correct + assertThat(resultFuture.join()) + .hasSize(1) + .extracting(FileExtent::object) + .containsExactly(objectKey.value()); } }