FetchCompleter.java
@@ -35,8 +35,6 @@
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -54,14 +52,14 @@ public class FetchCompleter implements Supplier<Map<TopicIdPartition, FetchParti
     private final ObjectKeyCreator objectKeyCreator;
     private final Map<TopicIdPartition, FetchRequest.PartitionData> fetchInfos;
     private final Map<TopicIdPartition, FindBatchResponse> coordinates;
-    private final List<Future<FileExtent>> backingData;
+    private final List<FileExtent> backingData;
     private final Consumer<Long> durationCallback;

     public FetchCompleter(Time time,
                           ObjectKeyCreator objectKeyCreator,
                           Map<TopicIdPartition, FetchRequest.PartitionData> fetchInfos,
                           Map<TopicIdPartition, FindBatchResponse> coordinates,
-                          List<Future<FileExtent>> backingData,
+                          List<FileExtent> backingData,
                           Consumer<Long> durationCallback) {
         this.time = time;
         this.objectKeyCreator = objectKeyCreator;
@@ -74,21 +72,16 @@ public FetchCompleter(Time time,
     @Override
     public Map<TopicIdPartition, FetchPartitionData> get() {
         try {
-            final Map<String, List<FileExtent>> files = waitForFileData();
+            final Map<String, List<FileExtent>> files = groupFileData();
             return TimeUtils.measureDurationMs(time, () -> serveFetch(coordinates, files), durationCallback);
         } catch (Exception e) {
-            // unwrap ExecutionException if the errors comes from dependent futures
-            if (e instanceof ExecutionException) {
-                throw new FetchException(e.getCause());
-            }
             throw new FetchException(e);
         }
     }

-    private Map<String, List<FileExtent>> waitForFileData() throws InterruptedException, ExecutionException {
+    private Map<String, List<FileExtent>> groupFileData() {
         Map<String, List<FileExtent>> files = new HashMap<>();
-        for (Future<FileExtent> fileFuture : backingData) {
-            FileExtent fileExtent = fileFuture.get();
+        for (FileExtent fileExtent : backingData) {
             files.compute(fileExtent.object(), (k, v) -> {
                 if (v == null) {
                     List<FileExtent> out = new ArrayList<>(1);
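The net effect in `FetchCompleter`: `backingData` is now fully materialized before the completer runs, so the blocking `Future.get()` loop and the `ExecutionException` unwrapping disappear; any waiting happens upstream in the `CompletableFuture` pipeline. The `Map.compute` loop in `groupFileData()` is simply grouping extents by object key. A minimal sketch of the same idiom, where `Extent` is a hypothetical stand-in for the generated `FileExtent`:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the generated FileExtent, which exposes object().
record Extent(String object, int offset) {}

class GroupingSketch {
    public static void main(String[] args) {
        List<Extent> backingData = List.of(
                new Extent("obj-a", 0), new Extent("obj-a", 10), new Extent("obj-b", 0));
        // Group extents by the object key they belong to; same result as the
        // Map.compute loop in groupFileData(), preserving encounter order per key.
        Map<String, List<Extent>> files = backingData.stream()
                .collect(Collectors.groupingBy(Extent::object));
        System.out.println(files.get("obj-a").size()); // 2
    }
}
```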
FetchPlanner.java
@@ -24,9 +24,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;

@@ -40,7 +39,7 @@
 import io.aiven.inkless.generated.FileExtent;
 import io.aiven.inkless.storage_backend.common.ObjectFetcher;

-public class FetchPlanner implements Supplier<List<Future<FileExtent>>> {
+public class FetchPlanner implements Supplier<List<CompletableFuture<FileExtent>>> {

     private final Time time;
     private final ObjectKeyCreator objectKeyCreator;
@@ -71,12 +70,13 @@ public FetchPlanner(
         this.metrics = metrics;
     }

-    private List<Future<FileExtent>> doWork(final Map<TopicIdPartition, FindBatchResponse> batchCoordinates) {
-        final List<Callable<FileExtent>> jobs = planJobs(batchCoordinates);
+    private List<CompletableFuture<FileExtent>> doWork(final Map<TopicIdPartition, FindBatchResponse> batchCoordinates) {
+        final List<CacheFetchJob> jobs = planJobs(batchCoordinates);
         return submitAll(jobs);
     }

-    private List<Callable<FileExtent>> planJobs(final Map<TopicIdPartition, FindBatchResponse> batchCoordinates) {
+    // package-private for testing
+    List<CacheFetchJob> planJobs(final Map<TopicIdPartition, FindBatchResponse> batchCoordinates) {
         final Set<Map.Entry<String, List<ByteRange>>> objectKeysToRanges = batchCoordinates.values().stream()
                 .filter(findBatch -> findBatch.errors() == Errors.NONE)
                 .map(FindBatchResponse::batches)
@@ -107,14 +107,14 @@ private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange b

     }

-    private List<Future<FileExtent>> submitAll(List<Callable<FileExtent>> jobs) {
+    private List<CompletableFuture<FileExtent>> submitAll(List<CacheFetchJob> jobs) {
         return jobs.stream()
-                .map(dataExecutor::submit)
+                .map(job -> CompletableFuture.supplyAsync(job::call, dataExecutor))
                 .collect(Collectors.toList());
     }

     @Override
-    public List<Future<FileExtent>> get() {
+    public List<CompletableFuture<FileExtent>> get() {
         return TimeUtils.measureDurationMsSupplier(time, () -> doWork(batchCoordinates), metrics::fetchPlanFinished);
     }
 }
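The pivotal line in this file is in `submitAll`: `ExecutorService#submit` returns a plain `Future`, which consumers can only drain by blocking on `get()`, whereas `CompletableFuture.supplyAsync(job::call, dataExecutor)` runs the same job on the same executor but hands back a future that composes. A self-contained sketch of the difference, where `fetchExtent` is a hypothetical stand-in for `CacheFetchJob#call`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitStyles {
    // Hypothetical stand-in for CacheFetchJob#call.
    static String fetchExtent() { return "file-extent"; }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Before: submit() yields a Future; the only way to the result is a blocking get().
            Future<String> plain = executor.submit(SubmitStyles::fetchExtent);
            System.out.println(plain.get()); // blocks the calling thread

            // After: supplyAsync() yields a CompletableFuture; downstream stages
            // chain onto it (thenApply, thenCompose, allOf) without blocking.
            CompletableFuture<String> composable =
                    CompletableFuture.supplyAsync(SubmitStyles::fetchExtent, executor);
            CompletableFuture<Void> done = composable
                    .thenApply(String::length)
                    .thenAccept(len -> System.out.println("length: " + len));
            done.join(); // demo only: wait so the output appears before shutdown
        } finally {
            executor.shutdown();
        }
    }
}
```

The executor itself is unchanged; only the handle type differs, and that is what lets Reader chain `thenCompose`/`thenCombine` stages instead of blocking inside FetchCompleter.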
storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java (60 additions & 14 deletions)
@@ -31,6 +31,7 @@

 import java.io.IOException;
 import java.time.Instant;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -45,6 +46,7 @@
 import io.aiven.inkless.common.ObjectKeyCreator;
 import io.aiven.inkless.common.metrics.ThreadPoolMonitor;
 import io.aiven.inkless.control_plane.ControlPlane;
+import io.aiven.inkless.generated.FileExtent;
 import io.aiven.inkless.storage_backend.common.ObjectFetcher;

 public class Reader implements AutoCloseable {
@@ -122,12 +124,28 @@ public Reader(
         }
     }

+    /**
+     * Asynchronously fetches data for multiple partitions using a staged pipeline:
+     *
+     * <p>1. Finds batches (metadataExecutor);
+     * <p>2. Plans and submits fetches (calling thread);
+     * <p>3. Fetches data (dataExecutor);
+     * <p>4. Assembles the final response (calling thread).
+     *
+     * <p>Each stage runs either on a dedicated thread pool (blocking work) or on the calling thread (cheap composition).
+     *
+     * @param params fetch parameters
+     * @param fetchInfos partitions and offsets to fetch
+     * @return a CompletableFuture with the fetch results per partition
+     */
     public CompletableFuture<Map<TopicIdPartition, FetchPartitionData>> fetch(
         final FetchParams params,
         final Map<TopicIdPartition, FetchRequest.PartitionData> fetchInfos
     ) {
         final Instant startAt = TimeUtils.durationMeasurementNow(time);
         fetchMetrics.fetchStarted(fetchInfos.size());

+        // Find Batches (metadataExecutor): query the control plane to find which storage objects contain the requested data
         final var batchCoordinates = CompletableFuture.supplyAsync(
             new FindBatchesJob(
                 time,
@@ -139,20 +157,26 @@
             ),
             metadataExecutor
         );
-        return batchCoordinates.thenApply(
-                coordinates ->
-                        new FetchPlanner(
-                                time,
-                                objectKeyCreator,
-                                keyAlignmentStrategy,
-                                cache,
-                                objectFetcher,
-                                dataExecutor,
-                                coordinates,
-                                fetchMetrics
-                        ).get()
-        )
-                .thenCombineAsync(batchCoordinates, (fileExtents, coordinates) ->
+
+        // Plan & Submit (calling thread): create the fetch plan and submit it to the cache (non-blocking)
+        return batchCoordinates.thenApply(coordinates -> {
+            // FetchPlanner creates a plan and submits requests to the cache.
+            // Returns List<CompletableFuture<FileExtent>> immediately.
+            return new FetchPlanner(
+                time,
+                objectKeyCreator,
+                keyAlignmentStrategy,
+                cache,
+                objectFetcher,
+                dataExecutor,
+                coordinates,
+                fetchMetrics
+            ).get();
+        })
+        // Fetch Data (dataExecutor): flatten the list of futures into a single future with all results
+        .thenCompose(Reader::allOfFileExtents)
+        // Complete Fetch (calling thread): combine the fetched data with the batch coordinates to build the final response
+        .thenCombine(batchCoordinates, (fileExtents, coordinates) ->
             new FetchCompleter(
                 time,
                 objectKeyCreator,
@@ -203,6 +227,28 @@ public CompletableFuture<Map<TopicIdPartition, FetchPartitionData>> fetch(
         });
     }

+    /**
+     * Waits for all file extent futures to complete and collects the results in input order.
+     *
+     * @param fileExtentFutures the list of futures to wait for
+     * @return a future that completes with a list of file extents in the same order as the input
+     */
+    static CompletableFuture<List<FileExtent>> allOfFileExtents(
+        List<CompletableFuture<FileExtent>> fileExtentFutures
+    ) {
+        // This does **not** block the calling thread or the fork-join pool:
+        // 1. allOf() returns immediately with a future (non-blocking)
+        // 2. thenApply() registers a callback without waiting (non-blocking)
+        // 3. join() inside the callback is safe - it only runs _after_ allOf completes,
+        //    meaning all futures are already done, so join() returns immediately
+        final CompletableFuture<?>[] futuresArray = fileExtentFutures.toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(futuresArray)
+            .thenApply(v ->
+                fileExtentFutures.stream()
+                    .map(CompletableFuture::join)
+                    .toList());
+    }
+
     @Override
     public void close() throws IOException {
         ThreadUtils.shutdownExecutorServiceQuietly(metadataExecutor, EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
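`allOfFileExtents` is the standard `allOf`-then-`join` idiom for flipping a `List<CompletableFuture<T>>` into a `CompletableFuture<List<T>>` while preserving input order. A self-contained demo of the same pattern, with `String` payloads standing in for `FileExtent`:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AllOfDemo {
    static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
                // join() does not block here: this callback only runs once
                // every input future has already completed.
                .thenApply(v -> futures.stream().map(CompletableFuture::join).toList());
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.supplyAsync(() -> "extent-a"),
                CompletableFuture.supplyAsync(() -> "extent-b"));
        // Result order matches input order, regardless of completion order.
        System.out.println(allOf(futures).join()); // [extent-a, extent-b]
    }
}
```

If any input future fails, the combined future fails too, and `join()` rethrows the cause wrapped in a `CompletionException`, which lines up with the `CompletionException` handling Reader already imports.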
FetchCompleterTest.java
@@ -42,10 +42,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;

 import io.aiven.inkless.cache.FixedBlockAlignment;
 import io.aiven.inkless.common.ByteRange;
@@ -175,9 +171,9 @@ public void testFetchSingleFile() {
                 ), logStartOffset, highWatermark)
         );

-        List<Future<FileExtent>> files = Stream.of(
+        List<FileExtent> files = List.of(
                 FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer())
-        ).map(CompletableFuture::completedFuture).collect(Collectors.toList());
+        );
         FetchCompleter job = new FetchCompleter(
                 new MockTime(),
                 OBJECT_KEY_CREATOR,
@@ -211,10 +207,10 @@ public void testFetchMultipleFiles() {
                 ), logStartOffset, highWatermark)
         );

-        List<Future<FileExtent>> files = Stream.of(
+        List<FileExtent> files = List.of(
                 FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer()),
                 FileFetchJob.createFileExtent(OBJECT_KEY_B, new ByteRange(0, records.sizeInBytes()), records.buffer())
-        ).map(CompletableFuture::completedFuture).collect(Collectors.toList());
+        );
         FetchCompleter job = new FetchCompleter(
                 new MockTime(),
                 OBJECT_KEY_CREATOR,
@@ -262,14 +258,13 @@ public void testFetchMultipleFilesForSameBatch() {
             copy.put(records.buffer().duplicate().position(startOffset).limit(endOffset).slice());
             fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy));
         }
-        List<Future<FileExtent>> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList());

         FetchCompleter job = new FetchCompleter(
                 new MockTime(),
                 OBJECT_KEY_CREATOR,
                 fetchInfos,
                 coordinates,
-                files,
+                fileExtents,
                 durationMs -> {}
         );
         Map<TopicIdPartition, FetchPartitionData> result = job.get();
@@ -310,9 +305,9 @@ public void testFetchMultipleBatches() {
                 ), logStartOffset, highWatermark)
         );

-        List<Future<FileExtent>> files = Stream.of(
+        List<FileExtent> files = List.of(
                 FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, totalSize), concatenatedBuffer)
-        ).map(CompletableFuture::completedFuture).collect(Collectors.toList());
+        );
         FetchCompleter job = new FetchCompleter(
                 new MockTime(),
                 OBJECT_KEY_CREATOR,
@@ -373,14 +368,13 @@ public void testFetchMultipleFilesForMultipleBatches() {
             copy.put(concatenatedBuffer.duplicate().position(startOffset).limit(endOffset).slice());
             fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy));
         }
-        List<Future<FileExtent>> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList());

         FetchCompleter job = new FetchCompleter(
                 new MockTime(),
                 OBJECT_KEY_CREATOR,
                 fetchInfos,
                 coordinates,
-                files,
+                fileExtents,
                 durationMs -> {}
         );
         Map<TopicIdPartition, FetchPartitionData> result = job.get();
FetchPlannerTest.java
@@ -23,20 +23,20 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;

+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;

-import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;

 import io.aiven.inkless.cache.FixedBlockAlignment;
 import io.aiven.inkless.cache.KeyAlignmentStrategy;
@@ -51,9 +51,7 @@
 import io.aiven.inkless.control_plane.FindBatchResponse;
 import io.aiven.inkless.storage_backend.common.ObjectFetcher;

-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.verifyNoInteractions;
-import static org.mockito.Mockito.when;
+import static org.assertj.core.api.Assertions.assertThat;

 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,10 +66,10 @@ public class FetchPlannerTest {
     @Mock
     ObjectFetcher fetcher;
     @Mock
-    ExecutorService dataExecutor;
-    @Mock
     InklessFetchMetrics metrics;

+    ExecutorService dataExecutor = Executors.newSingleThreadExecutor();
+
     ObjectCache cache = new NullCache();
     KeyAlignmentStrategy keyAlignmentStrategy = new FixedBlockAlignment(Integer.MAX_VALUE);
     ByteRange requestRange = new ByteRange(0, Integer.MAX_VALUE);
@@ -80,14 +78,19 @@ public class FetchPlannerTest {
     TopicIdPartition partition0 = new TopicIdPartition(topicId, 0, "diskless-topic");
     TopicIdPartition partition1 = new TopicIdPartition(topicId, 1, "diskless-topic");

+    @AfterEach
+    void tearDown() {
+        dataExecutor.shutdownNow();
+    }
+
     @Test
     public void planEmptyRequest() {
-        Map<TopicIdPartition, FindBatchResponse> coordinates = new HashMap<>();
-        FetchPlanner job = fetchPlannerJob(coordinates);
+        Map<TopicIdPartition, FindBatchResponse> coordinates = Map.of();
+        FetchPlanner planner = fetchPlannerJob(coordinates);

-        job.get();
+        List<CacheFetchJob> result = planner.planJobs(coordinates);

-        verifyNoInteractions(dataExecutor);
+        assertThat(result).isEmpty();
     }

     @Test
@@ -214,14 +217,12 @@ private CacheFetchJob cacheFetchJob( ObjectKey objectKey, ByteRange byte
         );
     }

-    private void assertBatchPlan(Map<TopicIdPartition, FindBatchResponse> coordinates, Set<CacheFetchJob> jobs) {
-        ArgumentCaptor<CacheFetchJob> submittedCallables = ArgumentCaptor.captor();
-        when(dataExecutor.submit(submittedCallables.capture())).thenReturn(null);
-
-        FetchPlanner job = fetchPlannerJob(coordinates);
+    private void assertBatchPlan(Map<TopicIdPartition, FindBatchResponse> coordinates, Set<CacheFetchJob> expectedJobs) {
+        FetchPlanner planner = fetchPlannerJob(coordinates);

-        job.get();
+        // Use the package-private planJobs method to verify the exact jobs planned
+        List<CacheFetchJob> actualJobs = planner.planJobs(coordinates);

-        assertEquals(jobs, new HashSet<>(submittedCallables.getAllValues()));
+        assertThat(new HashSet<>(actualJobs)).isEqualTo(expectedJobs);
     }
 }