diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java index 5eb88ca200..ebd52d1c93 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java @@ -35,8 +35,6 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -54,14 +52,14 @@ public class FetchCompleter implements Supplier fetchInfos; private final Map coordinates; - private final List> backingData; + private final List backingData; private final Consumer durationCallback; public FetchCompleter(Time time, ObjectKeyCreator objectKeyCreator, Map fetchInfos, Map coordinates, - List> backingData, + List backingData, Consumer durationCallback) { this.time = time; this.objectKeyCreator = objectKeyCreator; @@ -74,21 +72,16 @@ public FetchCompleter(Time time, @Override public Map get() { try { - final Map> files = waitForFileData(); + final Map> files = groupFileData(); return TimeUtils.measureDurationMs(time, () -> serveFetch(coordinates, files), durationCallback); } catch (Exception e) { - // unwrap ExecutionException if the errors comes from dependent futures - if (e instanceof ExecutionException) { - throw new FetchException(e.getCause()); - } throw new FetchException(e); } } - private Map> waitForFileData() throws InterruptedException, ExecutionException { + private Map> groupFileData() { Map> files = new HashMap<>(); - for (Future fileFuture : backingData) { - FileExtent fileExtent = fileFuture.get(); + for (FileExtent fileExtent : backingData) { files.compute(fileExtent.object(), (k, v) -> { if (v == null) { List out = new ArrayList<>(1); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java index 6a36c2b154..b01727c791 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java @@ -24,9 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -40,7 +39,7 @@ import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; -public class FetchPlanner implements Supplier>> { +public class FetchPlanner implements Supplier>> { private final Time time; private final ObjectKeyCreator objectKeyCreator; @@ -71,12 +70,13 @@ public FetchPlanner( this.metrics = metrics; } - private List> doWork(final Map batchCoordinates) { - final List> jobs = planJobs(batchCoordinates); + private List> doWork(final Map batchCoordinates) { + final List jobs = planJobs(batchCoordinates); return submitAll(jobs); } - private List> planJobs(final Map batchCoordinates) { + // package-private for testing + List planJobs(final Map batchCoordinates) { final Set>> objectKeysToRanges = batchCoordinates.values().stream() .filter(findBatch -> findBatch.errors() == Errors.NONE) .map(FindBatchResponse::batches) @@ -107,14 +107,14 @@ private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange b } - private List> submitAll(List> jobs) { + private List> submitAll(List jobs) { return jobs.stream() - .map(dataExecutor::submit) + .map(job -> CompletableFuture.supplyAsync(job::call, dataExecutor)) .collect(Collectors.toList()); } @Override - public List> get() { + public List> get() { return TimeUtils.measureDurationMsSupplier(time, () -> doWork(batchCoordinates), metrics::fetchPlanFinished); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 1b04618951..97c458a49f 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -45,6 +46,7 @@ import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.metrics.ThreadPoolMonitor; import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; public class Reader implements AutoCloseable { @@ -122,12 +124,28 @@ public Reader( } } + /** + * Asynchronously fetches data for multiple partitions using a staged pipeline: + * + *

1. Finds batches (metadataExecutor); + *

2. Plans and submits fetches (calling thread); + *

3. Fetches data (dataExecutor); + *

4. Assembles final response (calling thread) + * + *

Each stage uses a thread pool or the calling thread for efficiency. + * + * @param params fetch parameters + * @param fetchInfos partitions and offsets to fetch + * @return CompletableFuture with fetch results per partition + */ public CompletableFuture> fetch( final FetchParams params, final Map fetchInfos ) { final Instant startAt = TimeUtils.durationMeasurementNow(time); fetchMetrics.fetchStarted(fetchInfos.size()); + + // Find Batches (metadataExecutor): Query control plane to find which storage objects contain the requested data final var batchCoordinates = CompletableFuture.supplyAsync( new FindBatchesJob( time, @@ -139,20 +157,26 @@ public CompletableFuture> fetch( ), metadataExecutor ); - return batchCoordinates.thenApply( - coordinates -> - new FetchPlanner( - time, - objectKeyCreator, - keyAlignmentStrategy, - cache, - objectFetcher, - dataExecutor, - coordinates, - fetchMetrics - ).get() - ) - .thenCombineAsync(batchCoordinates, (fileExtents, coordinates) -> + + // Plan & Submit (calling thread): Create fetch plan and submit to cache (non-blocking) + return batchCoordinates.thenApply(coordinates -> { + // FetchPlanner creates a plan and submits requests to cache + // Returns List> immediately + return new FetchPlanner( + time, + objectKeyCreator, + keyAlignmentStrategy, + cache, + objectFetcher, + dataExecutor, + coordinates, + fetchMetrics + ).get(); + }) + // Fetch Data (dataExecutor): Flatten list of futures into single future with all results + .thenCompose(Reader::allOfFileExtents) + // Complete Fetch (calling thread): Combine fetched data with batch coordinates to build final response + .thenCombine(batchCoordinates, (fileExtents, coordinates) -> new FetchCompleter( time, objectKeyCreator, @@ -203,6 +227,28 @@ public CompletableFuture> fetch( }); } + /** + * Waits for all file extent futures to complete and collects results in order. + * + * @param fileExtentFutures the list of futures to wait for + * @return a future that completes with a list of file extents in the same order as the input + */ + static CompletableFuture> allOfFileExtents( + List> fileExtentFutures + ) { + // This does **not** block the calling thread or fork-join pool: + // 1. allOf() returns immediately with a future (non-blocking) + // 2. thenApply() registers a callback without waiting (non-blocking) + // 3. join() inside the callback is safe - it only runs _after_ allOf completes, + // meaning all futures are already done, so join() returns immediately + final CompletableFuture[] futuresArray = fileExtentFutures.toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(futuresArray) + .thenApply(v -> + fileExtentFutures.stream() + .map(CompletableFuture::join) + .toList()); + } + @Override public void close() throws IOException { ThreadUtils.shutdownExecutorServiceQuietly(metadataExecutor, EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java index 3306ccc700..5ba9ed1b1f 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java @@ -42,10 +42,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.common.ByteRange; @@ -175,9 +171,9 @@ public void testFetchSingleFile() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer()) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -211,10 +207,10 @@ public void testFetchMultipleFiles() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer()), FileFetchJob.createFileExtent(OBJECT_KEY_B, new ByteRange(0, records.sizeInBytes()), records.buffer()) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -262,14 +258,13 @@ public void testFetchMultipleFilesForSameBatch() { copy.put(records.buffer().duplicate().position(startOffset).limit(endOffset).slice()); fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy)); } - List> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList()); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, fetchInfos, coordinates, - files, + fileExtents, durationMs -> {} ); Map result = job.get(); @@ -310,9 +305,9 @@ public void testFetchMultipleBatches() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, totalSize), concatenatedBuffer) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -373,14 +368,13 @@ public void testFetchMultipleFilesForMultipleBatches() { copy.put(concatenatedBuffer.duplicate().position(startOffset).limit(endOffset).slice()); fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy)); } - List> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList()); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, fetchInfos, coordinates, - files, + fileExtents, durationMs -> {} ); Map result = job.get(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java index d834c9036f..5cbeb6d897 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java @@ -23,20 +23,20 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.cache.KeyAlignmentStrategy; @@ -51,9 +51,7 @@ import io.aiven.inkless.control_plane.FindBatchResponse; import io.aiven.inkless.storage_backend.common.ObjectFetcher; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; +import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) @@ -68,10 +66,10 @@ public class FetchPlannerTest { @Mock ObjectFetcher fetcher; @Mock - ExecutorService dataExecutor; - @Mock InklessFetchMetrics metrics; + ExecutorService dataExecutor = Executors.newSingleThreadExecutor(); + ObjectCache cache = new NullCache(); KeyAlignmentStrategy keyAlignmentStrategy = new FixedBlockAlignment(Integer.MAX_VALUE); ByteRange requestRange = new ByteRange(0, Integer.MAX_VALUE); @@ -80,14 +78,19 @@ public class FetchPlannerTest { TopicIdPartition partition0 = new TopicIdPartition(topicId, 0, "diskless-topic"); TopicIdPartition partition1 = new TopicIdPartition(topicId, 1, "diskless-topic"); + @AfterEach + void tearDown() { + dataExecutor.shutdownNow(); + } + @Test public void planEmptyRequest() { - Map coordinates = new HashMap<>(); - FetchPlanner job = fetchPlannerJob(coordinates); + Map coordinates = Map.of(); + FetchPlanner planner = fetchPlannerJob(coordinates); - job.get(); + List result = planner.planJobs(coordinates); - verifyNoInteractions(dataExecutor); + assertThat(result).isEmpty(); } @Test @@ -214,14 +217,12 @@ private CacheFetchJob cacheFetchJob( ObjectKey objectKey, ByteRange byte ); } - private void assertBatchPlan(Map coordinates, Set jobs) { - ArgumentCaptor submittedCallables = ArgumentCaptor.captor(); - when(dataExecutor.submit(submittedCallables.capture())).thenReturn(null); - - FetchPlanner job = fetchPlannerJob(coordinates); + private void assertBatchPlan(Map coordinates, Set expectedJobs) { + FetchPlanner planner = fetchPlannerJob(coordinates); - job.get(); + // Use the package-private planJobs method to verify the exact jobs planned + List actualJobs = planner.planJobs(coordinates); - assertEquals(jobs, new HashSet<>(submittedCallables.getAllValues())); + assertThat(new HashSet<>(actualJobs)).isEqualTo(expectedJobs); } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index 3f6345ea3c..63243b082e 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -33,22 +33,30 @@ import org.mockito.quality.Strictness; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.cache.KeyAlignmentStrategy; import io.aiven.inkless.cache.NullCache; import io.aiven.inkless.cache.ObjectCache; +import io.aiven.inkless.common.ByteRange; import io.aiven.inkless.common.ObjectKey; import io.aiven.inkless.common.ObjectKeyCreator; +import io.aiven.inkless.common.PlainObjectKey; import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.verify; @@ -92,4 +100,108 @@ public void testClose() throws Exception { verify(metadataExecutor, atLeastOnce()).shutdown(); verify(dataExecutor, atLeastOnce()).shutdown(); } + + /** + * Tests that allOfFileExtents preserves the original input order regardless of completion order. + */ + @Test + public void testAllOfFileExtentsPreservesOrder() throws Exception { + // Create file extents with distinct identifiers + final ObjectKey objectKeyA = PlainObjectKey.create("prefix", "object-a"); + final ObjectKey objectKeyB = PlainObjectKey.create("prefix", "object-b"); + final ObjectKey objectKeyC = PlainObjectKey.create("prefix", "object-c"); + + final FileExtent extentA = FileFetchJob.createFileExtent(objectKeyA, new ByteRange(0, 10), ByteBuffer.allocate(10)); + final FileExtent extentB = FileFetchJob.createFileExtent(objectKeyB, new ByteRange(0, 10), ByteBuffer.allocate(10)); + final FileExtent extentC = FileFetchJob.createFileExtent(objectKeyC, new ByteRange(0, 10), ByteBuffer.allocate(10)); + + // Create futures that complete in reverse order: C (100ms), B (200ms), A (300ms) + final CompletableFuture futureA = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(300); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return extentA; + }); + + final CompletableFuture futureB = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return extentB; + }); + + final CompletableFuture futureC = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return extentC; + }); + + // Create the ordered list: A, B, C + final List> orderedFutures = List.of(futureA, futureB, futureC); + + // Call allOfFileExtents and wait for result + final CompletableFuture> resultFuture = Reader.allOfFileExtents(orderedFutures); + final List result = resultFuture.get(5, TimeUnit.SECONDS); + + // Verify result order is preserved as A, B, C (not C, B, A which was the completion order) + assertThat(result) + .hasSize(3) + .extracting(FileExtent::object) + .containsExactly(objectKeyA.value(), objectKeyB.value(), objectKeyC.value()); + } + + /** + * Tests that allOfFileExtents returns immediately without blocking the calling thread. + */ + @Test + public void testAllOfFileExtentsDoesNotBlock() { + // Create file extents + final ObjectKey objectKey = PlainObjectKey.create("prefix", "object"); + final FileExtent extent = FileFetchJob.createFileExtent(objectKey, new ByteRange(0, 10), ByteBuffer.allocate(10)); + + // Create a future that will complete after a delay + final AtomicBoolean futureCompleted = new AtomicBoolean(false); + final CompletableFuture delayedFuture = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1000); // Simulate slow operation + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + futureCompleted.set(true); + return extent; + }); + + final List> futures = List.of(delayedFuture); + + // Call allOfFileExtents - this should return immediately + final long startTime = System.currentTimeMillis(); + final CompletableFuture> resultFuture = Reader.allOfFileExtents(futures); + final long callDuration = System.currentTimeMillis() - startTime; + + // Verify the call returned immediately (within 100ms) + assertThat(callDuration).isLessThan(100); + + // Verify the future is not yet complete + assertThat(resultFuture).isNotCompleted(); + assertThat(futureCompleted).isFalse(); + + // Wait for the result to actually complete + await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(resultFuture).isCompleted(); + assertThat(futureCompleted).isTrue(); + }); + + // Verify the result is correct + assertThat(resultFuture.join()) + .hasSize(1) + .extracting(FileExtent::object) + .containsExactly(objectKey.value()); + } }