From de2254b1253fa20bc3b81a25ad1611a74d928b61 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 11 Dec 2025 17:25:51 +0200 Subject: [PATCH 1/5] refactor(inkless:consume): make fetch completion non-blocking Replace blocking Future.get() calls in FetchCompleter with non-blocking CompletableFuture composition. This prevents thread exhaustion when file fetches are slow. Previously, FetchCompleter.get() was invoked via thenCombineAsync on the common ForkJoinPool, where it blocked waiting for file data futures. Now, the completion stage chain uses: - thenCompose(Reader::allOfFileExtents) which returns a CompletableFuture that completes only when ALL file fetches are done - thenCombine() (not async) to invoke FetchCompleter with already-resolved List The FetchCompleter task still runs on the ForkJoinPool common pool (via thenCombine's default executor), but it no longer blocks - it receives pre-fetched data and only performs in-memory data assembly (grouping file extents, copying byte ranges, and constructing MemoryRecords). Changes: - FetchPlanner returns List> instead of List> - FetchCompleter accepts resolved List instead of futures - Reader.allOfFileExtents() waits for all futures non-blockingly - Add test verifying ordering is preserved when futures complete out of order --- .../aiven/inkless/consume/FetchCompleter.java | 17 ++-- .../aiven/inkless/consume/FetchPlanner.java | 17 ++-- .../java/io/aiven/inkless/consume/Reader.java | 20 ++++- .../inkless/consume/FetchCompleterTest.java | 22 ++--- .../inkless/consume/FetchPlannerTest.java | 29 +++---- .../io/aiven/inkless/consume/ReaderTest.java | 82 +++++++++++++++++++ 6 files changed, 135 insertions(+), 52 deletions(-) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java index 5eb88ca200..ebd52d1c93 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java @@ -35,8 +35,6 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -54,14 +52,14 @@ public class FetchCompleter implements Supplier fetchInfos; private final Map coordinates; - private final List> backingData; + private final List backingData; private final Consumer durationCallback; public FetchCompleter(Time time, ObjectKeyCreator objectKeyCreator, Map fetchInfos, Map coordinates, - List> backingData, + List backingData, Consumer durationCallback) { this.time = time; this.objectKeyCreator = objectKeyCreator; @@ -74,21 +72,16 @@ public FetchCompleter(Time time, @Override public Map get() { try { - final Map> files = waitForFileData(); + final Map> files = groupFileData(); return TimeUtils.measureDurationMs(time, () -> serveFetch(coordinates, files), durationCallback); } catch (Exception e) { - // unwrap ExecutionException if the errors comes from dependent futures - if (e instanceof ExecutionException) { - throw new FetchException(e.getCause()); - } throw new FetchException(e); } } - private Map> waitForFileData() throws InterruptedException, ExecutionException { + private Map> groupFileData() { Map> files = new HashMap<>(); - for (Future fileFuture : backingData) { - FileExtent fileExtent = fileFuture.get(); + for (FileExtent fileExtent : backingData) { files.compute(fileExtent.object(), (k, v) -> { if (v == null) { List out = new ArrayList<>(1); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java index 6a36c2b154..f3efd5e788 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java @@ -24,9 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -40,7 +39,7 @@ import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; -public class FetchPlanner implements Supplier>> { +public class FetchPlanner implements Supplier>> { private final Time time; private final ObjectKeyCreator objectKeyCreator; @@ -71,12 +70,12 @@ public FetchPlanner( this.metrics = metrics; } - private List> doWork(final Map batchCoordinates) { - final List> jobs = planJobs(batchCoordinates); + private List> doWork(final Map batchCoordinates) { + final List jobs = planJobs(batchCoordinates); return submitAll(jobs); } - private List> planJobs(final Map batchCoordinates) { + private List planJobs(final Map batchCoordinates) { final Set>> objectKeysToRanges = batchCoordinates.values().stream() .filter(findBatch -> findBatch.errors() == Errors.NONE) .map(FindBatchResponse::batches) @@ -107,14 +106,14 @@ private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange b } - private List> submitAll(List> jobs) { + private List> submitAll(List jobs) { return jobs.stream() - .map(dataExecutor::submit) + .map(job -> CompletableFuture.supplyAsync(job::call, dataExecutor)) .collect(Collectors.toList()); } @Override - public List> get() { + public List> get() { return TimeUtils.measureDurationMsSupplier(time, () -> doWork(batchCoordinates), metrics::fetchPlanFinished); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 1b04618951..1714190573 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -45,6 +46,7 @@ import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.metrics.ThreadPoolMonitor; import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; public class Reader implements AutoCloseable { @@ -152,7 +154,10 @@ public CompletableFuture> fetch( fetchMetrics ).get() ) - .thenCombineAsync(batchCoordinates, (fileExtents, coordinates) -> + // flatten the list of file extent futures into a single future with list of file extents + .thenCompose(Reader::allOfFileExtents) + // combine file extents and coordinates to complete the fetch + .thenCombine(batchCoordinates, (fileExtents, coordinates) -> new FetchCompleter( time, objectKeyCreator, @@ -203,6 +208,19 @@ public CompletableFuture> fetch( }); } + // wait for all file extent futures to complete and collect results without blocking threads + @SuppressWarnings("rawtypes") + static CompletableFuture> allOfFileExtents( + List> fileExtentFutures + ) { + final CompletableFuture[] futuresArray = fileExtentFutures.toArray(new CompletableFuture[0]); + return CompletableFuture.allOf(futuresArray) + .thenApply(v -> + fileExtentFutures.stream() + .map(CompletableFuture::join) + .toList()); + } + @Override public void close() throws IOException { ThreadUtils.shutdownExecutorServiceQuietly(metadataExecutor, EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java index 3306ccc700..5ba9ed1b1f 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java @@ -42,10 +42,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.common.ByteRange; @@ -175,9 +171,9 @@ public void testFetchSingleFile() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer()) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -211,10 +207,10 @@ public void testFetchMultipleFiles() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer()), FileFetchJob.createFileExtent(OBJECT_KEY_B, new ByteRange(0, records.sizeInBytes()), records.buffer()) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -262,14 +258,13 @@ public void testFetchMultipleFilesForSameBatch() { copy.put(records.buffer().duplicate().position(startOffset).limit(endOffset).slice()); fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy)); } - List> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList()); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, fetchInfos, coordinates, - files, + fileExtents, durationMs -> {} ); Map result = job.get(); @@ -310,9 +305,9 @@ public void testFetchMultipleBatches() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, totalSize), concatenatedBuffer) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -373,14 +368,13 @@ public void testFetchMultipleFilesForMultipleBatches() { copy.put(concatenatedBuffer.duplicate().position(startOffset).limit(endOffset).slice()); fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy)); } - List> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList()); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, fetchInfos, coordinates, - files, + fileExtents, durationMs -> {} ); Map result = job.get(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java index d834c9036f..4523dda130 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java @@ -25,18 +25,17 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.cache.KeyAlignmentStrategy; @@ -49,11 +48,10 @@ import io.aiven.inkless.control_plane.BatchInfo; import io.aiven.inkless.control_plane.BatchMetadata; import io.aiven.inkless.control_plane.FindBatchResponse; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) @@ -68,10 +66,11 @@ public class FetchPlannerTest { @Mock ObjectFetcher fetcher; @Mock - ExecutorService dataExecutor; - @Mock InklessFetchMetrics metrics; + // Direct executor that runs tasks immediately in the same thread + ExecutorService dataExecutor = Executors.newSingleThreadExecutor(); + ObjectCache cache = new NullCache(); KeyAlignmentStrategy keyAlignmentStrategy = new FixedBlockAlignment(Integer.MAX_VALUE); ByteRange requestRange = new ByteRange(0, Integer.MAX_VALUE); @@ -82,12 +81,12 @@ public class FetchPlannerTest { @Test public void planEmptyRequest() { - Map coordinates = new HashMap<>(); + Map coordinates = Map.of(); FetchPlanner job = fetchPlannerJob(coordinates); - job.get(); + List> result = job.get(); - verifyNoInteractions(dataExecutor); + assertEquals(0, result.size()); } @Test @@ -214,14 +213,12 @@ private CacheFetchJob cacheFetchJob( ObjectKey objectKey, ByteRange byte ); } - private void assertBatchPlan(Map coordinates, Set jobs) { - ArgumentCaptor submittedCallables = ArgumentCaptor.captor(); - when(dataExecutor.submit(submittedCallables.capture())).thenReturn(null); - + private void assertBatchPlan(Map coordinates, Set expectedJobs) { FetchPlanner job = fetchPlannerJob(coordinates); - job.get(); + List> result = job.get(); - assertEquals(jobs, new HashSet<>(submittedCallables.getAllValues())); + // Verify the number of planned fetch jobs matches expected + assertEquals(expectedJobs.size(), result.size()); } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index 3f6345ea3c..6a3ef552eb 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -33,19 +33,26 @@ import org.mockito.quality.Strictness; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.cache.KeyAlignmentStrategy; import io.aiven.inkless.cache.NullCache; import io.aiven.inkless.cache.ObjectCache; +import io.aiven.inkless.common.ByteRange; import io.aiven.inkless.common.ObjectKey; import io.aiven.inkless.common.ObjectKeyCreator; +import io.aiven.inkless.common.PlainObjectKey; import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; import static org.assertj.core.api.Assertions.assertThat; @@ -92,4 +99,79 @@ public void testClose() throws Exception { verify(metadataExecutor, atLeastOnce()).shutdown(); verify(dataExecutor, atLeastOnce()).shutdown(); } + + /** + * This is to ensure fetch results preserve the request oreder are matched correctly with their requests. + */ + @Test + public void testAllOfFileExtentsPreservesOrderWhenFuturesCompleteOutOfOrder() throws Exception { + // Create file extents with distinct identifiers + final ObjectKey objectKeyA = PlainObjectKey.create("prefix", "object-a"); + final ObjectKey objectKeyB = PlainObjectKey.create("prefix", "object-b"); + final ObjectKey objectKeyC = PlainObjectKey.create("prefix", "object-c"); + + final FileExtent extentA = FileFetchJob.createFileExtent(objectKeyA, new ByteRange(0, 10), ByteBuffer.allocate(10)); + final FileExtent extentB = FileFetchJob.createFileExtent(objectKeyB, new ByteRange(0, 10), ByteBuffer.allocate(10)); + final FileExtent extentC = FileFetchJob.createFileExtent(objectKeyC, new ByteRange(0, 10), ByteBuffer.allocate(10)); + + // Create latches to control completion order + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch futureACanComplete = new CountDownLatch(1); + final CountDownLatch futureBCanComplete = new CountDownLatch(1); + + // Create futures that will complete in reverse order (C, B, A) + final CompletableFuture futureA = CompletableFuture.supplyAsync(() -> { + try { + startLatch.await(); + futureACanComplete.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return extentA; + }); + + final CompletableFuture futureB = CompletableFuture.supplyAsync(() -> { + try { + startLatch.await(); + futureBCanComplete.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return extentB; + }); + + final CompletableFuture futureC = CompletableFuture.supplyAsync(() -> { + try { + startLatch.await(); + // C completes immediately after start + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return extentC; + }); + + // Create the ordered list: A, B, C + final List> orderedFutures = List.of(futureA, futureB, futureC); + + // Call the package-private method directly + final CompletableFuture> resultFuture = Reader.allOfFileExtents(orderedFutures); + + // Start all futures + startLatch.countDown(); + + // Complete in reverse order: C is already completing, then B, then A + Thread.sleep(50); // Give C time to complete + futureBCanComplete.countDown(); + Thread.sleep(50); // Give B time to complete + futureACanComplete.countDown(); + + // Get the result - should maintain original order despite completion order + final List result = resultFuture.get(5, TimeUnit.SECONDS); + + // Verify order is preserved: A, B, C (not C, B, A which was completion order) + assertThat(result).hasSize(3); + assertThat(result.get(0).object()).isEqualTo(objectKeyA.value()); + assertThat(result.get(1).object()).isEqualTo(objectKeyB.value()); + assertThat(result.get(2).object()).isEqualTo(objectKeyC.value()); + } } From 0d65fd132bb6874d7ce3f7b35d292c2943637685 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 12 Dec 2025 00:28:44 +0200 Subject: [PATCH 2/5] chore(inkless:build): add Bucket4j dependency Adds Bucket4j dependency for rate limiting --- build.gradle | 1 + gradle/dependencies.gradle | 2 ++ 2 files changed, 3 insertions(+) diff --git a/build.gradle b/build.gradle index 9c6e4b951b..e68b35fbfe 100644 --- a/build.gradle +++ b/build.gradle @@ -2452,6 +2452,7 @@ project(':storage:inkless') { } implementation libs.metrics implementation libs.caffeine + implementation libs.bucket4jCore testImplementation project(':clients').sourceSets.test.output.classesDirs testImplementation project(':test-common') diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 2d08e277dd..4397b09213 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -58,6 +58,7 @@ versions += [ awsSdk: "2.29.6", azureSdk: "1.2.28", bcpkix: "1.80", + bucket4j: "8.14.0", caffeine: "3.2.0", bndlib: "7.1.0", checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2", @@ -164,6 +165,7 @@ libs += [ azureSdkBom: "com.azure:azure-sdk-bom:$versions.azureSdk", bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix", bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib", + bucket4jCore: "com.bucket4j:bucket4j_jdk11-core:$versions.bucket4j", caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", commonsBeanutils: "commons-beanutils:commons-beanutils:$versions.commonsBeanutils", From 1ba8adbebd078db74a579f62015ed6aaad93e9c7 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 12 Dec 2025 00:29:36 +0200 Subject: [PATCH 3/5] feat(inkless:cache): implement TieredObjectCache for lagging consumers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add tiered cache that routes requests based on batch timestamp: - Recent data (within hot cache TTL) → hot cache - Old data (beyond hot cache TTL) → lagging cache with rate limiting Rate limiting uses Bucket4j token bucket to protect remote storage from being overwhelmed by lagging consumer fetches. Applied only on lagging cache misses. Includes ObjectCache interface extension with timestamp-aware computeIfAbsent method (default ignores timestamp for backwards compatibility). --- .../io/aiven/inkless/cache/ObjectCache.java | 18 ++ .../inkless/cache/TieredObjectCache.java | 250 +++++++++++++++ .../inkless/cache/TieredObjectCacheTest.java | 288 ++++++++++++++++++ 3 files changed, 556 insertions(+) create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/cache/TieredObjectCache.java create mode 100644 storage/inkless/src/test/java/io/aiven/inkless/cache/TieredObjectCacheTest.java diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java index 1d9ce20612..9d379129b1 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java @@ -27,4 +27,22 @@ public interface ObjectCache extends Cache, Closeable { FileExtent computeIfAbsent(CacheKey key, Function mappingFunction); + + /** + * Computes if absent with batch timestamp hint for cache tiering. + * + *

The batch timestamp can be used by tiered cache implementations to route + * requests to the appropriate cache tier (hot vs cold) based on data age.

+ * + *

Default implementation ignores the timestamp and delegates to the regular + * computeIfAbsent method.

+ * + * @param key the cache key + * @param mappingFunction the function to compute the value if absent + * @param batchTimestamp the timestamp of the batch (from BatchMetadata.timestamp()) + * @return the cached or computed file extent + */ + default FileExtent computeIfAbsent(CacheKey key, Function mappingFunction, long batchTimestamp) { + return computeIfAbsent(key, mappingFunction); + } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/TieredObjectCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/TieredObjectCache.java new file mode 100644 index 0000000000..f75691999d --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/TieredObjectCache.java @@ -0,0 +1,250 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.cache; + +import org.apache.kafka.common.utils.Time; + +import java.io.IOException; +import java.time.Duration; +import java.util.function.Function; + +import io.aiven.inkless.generated.CacheKey; +import io.aiven.inkless.generated.FileExtent; +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.BlockingBucket; +import io.github.bucket4j.Bucket; + +/** + * A tiered cache implementation that routes requests to either a hot cache or a lagging cache + * based on the age of the data being requested. + * + *

The hot cache is used for recent data (within the hot cache TTL), while the lagging cache + * is used for historical data. This separation prevents lagging consumers from evicting hot data + * that is being actively used by tail consumers and the write path.

+ * + *

The classification is based on the batch timestamp: if the data is older than the + * hot cache TTL, it cannot possibly be in the hot cache, so we route to the lagging cache.

+ * + *

Rate limiting is applied to lagging cache fetches to protect remote storage from + * being overwhelmed by lagging consumers.

+ */ +public class TieredObjectCache implements ObjectCache { + + private final ObjectCache hotCache; + private final ObjectCache laggingCache; + private final long hotCacheTtlMs; + private final Time time; + private final TieredCacheMetrics metrics; + private final BlockingBucket rateLimiter; + + /** + * Creates a tiered cache without rate limiting. + */ + public TieredObjectCache( + final ObjectCache hotCache, + final ObjectCache laggingCache, + final long hotCacheTtlMs, + final Time time + ) { + this(hotCache, laggingCache, hotCacheTtlMs, time, -1, TieredCacheMetrics.NOOP); + } + + /** + * Creates a tiered cache with optional rate limiting for lagging cache fetches. + * + * @param hotCache the cache for recent data + * @param laggingCache the cache for historical data + * @param hotCacheTtlMs the TTL threshold for routing (data older than this goes to lagging cache) + * @param time time source + * @param rateLimitBytesPerSec rate limit in bytes per second for lagging cache fetches, or -1 to disable + * @param metrics metrics callback + */ + public TieredObjectCache( + final ObjectCache hotCache, + final ObjectCache laggingCache, + final long hotCacheTtlMs, + final Time time, + final long rateLimitBytesPerSec, + final TieredCacheMetrics metrics + ) { + this.hotCache = hotCache; + this.laggingCache = laggingCache; + this.hotCacheTtlMs = hotCacheTtlMs; + this.time = time; + this.metrics = metrics; + this.rateLimiter = rateLimitBytesPerSec > 0 ? createRateLimiter(rateLimitBytesPerSec) : null; + } + + /** + * Creates a rate limiter for lagging cache fetches. + */ + private static BlockingBucket createRateLimiter(final long bytesPerSecond) { + // Uses Bucket4j token bucket algorithm (https://github.com/bucket4j/bucket4j) + // + // Capacity: 2x per-second rate allows short bursts (e.g., 50 MiB/s -> 100 MiB burst) + // Refill: "Greedy" adds tokens continuously for smooth rate limiting + // Blocking: consume() blocks until tokens available, creating backpressure + // + // Example with 50 MiB/s and 16 MiB blocks: ~3 fetches/sec sustained + final Bandwidth bandwidth = Bandwidth.builder() + .capacity(bytesPerSecond * 2) + .refillGreedy(bytesPerSecond, Duration.ofSeconds(1)) + .build(); + + return Bucket.builder() + .addLimit(bandwidth) + .build() + .asBlocking(); + } + + /** + * Computes if absent from the appropriate cache tier based on batch timestamp. + * + *

For lagging cache fetches, rate limiting is applied before invoking the mapping function + * to protect remote storage from being overwhelmed.

+ * + * @param key the cache key + * @param mappingFunction the function to compute the value if absent + * @param batchTimestamp the timestamp of the batch (from BatchMetadata.timestamp()) + * @return the cached or computed file extent + */ + @Override + public FileExtent computeIfAbsent( + final CacheKey key, + final Function mappingFunction, + final long batchTimestamp + ) { + final boolean useLaggingCache = shouldUseLaggingCache(batchTimestamp); + if (useLaggingCache) { + metrics.recordLaggingCacheRouting(); + return laggingCache.computeIfAbsent(key, rateLimitedMappingFunction(mappingFunction, key)); + } else { + metrics.recordHotCacheRouting(); + return hotCache.computeIfAbsent(key, mappingFunction); + } + } + + /** + * Wraps the mapping function with rate limiting for lagging cache fetches. + * Rate limiting is applied based on the expected fetch size (from the cache key's byte range). + */ + private Function rateLimitedMappingFunction( + final Function mappingFunction, + final CacheKey key + ) { + if (rateLimiter == null) { + return mappingFunction; + } + return cacheKey -> { + // Rate limit based on the aligned block size (from cache key) + // This is a conservative estimate; actual fetch may be smaller + final long bytesToFetch = key.range().length(); + if (bytesToFetch > 0) { + try { + rateLimiter.consume(bytesToFetch); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Rate limit wait interrupted", e); + } + } + return mappingFunction.apply(cacheKey); + }; + } + + /** + * Default computeIfAbsent routes to hot cache. + * This is used by the write path which always uses the hot cache. + */ + @Override + public FileExtent computeIfAbsent(final CacheKey key, final Function mappingFunction) { + metrics.recordHotCacheRouting(); + return hotCache.computeIfAbsent(key, mappingFunction); + } + + /** + * Determines if the lagging cache should be used based on batch age. + * If data is older than hot cache TTL, it cannot be in hot cache. + */ + boolean shouldUseLaggingCache(final long batchTimestamp) { + final long batchAge = time.milliseconds() - batchTimestamp; + return batchAge > hotCacheTtlMs; + } + + @Override + public FileExtent get(final CacheKey key) { + // Try hot cache first, then lagging cache + FileExtent result = hotCache.get(key); + if (result == null) { + result = laggingCache.get(key); + } + return result; + } + + /** + * Put always goes to hot cache. + * This is used by the write path (CacheStoreJob) to cache produced data. + */ + @Override + public void put(final CacheKey key, final FileExtent value) { + hotCache.put(key, value); + } + + @Override + public boolean remove(final CacheKey key) { + boolean removedFromHot = hotCache.remove(key); + boolean removedFromLagging = laggingCache.remove(key); + return removedFromHot || removedFromLagging; + } + + @Override + public long size() { + return hotCache.size() + laggingCache.size(); + } + + public long hotCacheSize() { + return hotCache.size(); + } + + public long laggingCacheSize() { + return laggingCache.size(); + } + + @Override + public void close() throws IOException { + try { + hotCache.close(); + } finally { + laggingCache.close(); + } + } + + /** + * Metrics interface for tiered cache operations. + */ + public interface TieredCacheMetrics { + TieredCacheMetrics NOOP = new TieredCacheMetrics() { + @Override + public void recordHotCacheRouting() {} + @Override + public void recordLaggingCacheRouting() {} + }; + + void recordHotCacheRouting(); + void recordLaggingCacheRouting(); + } +} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/cache/TieredObjectCacheTest.java b/storage/inkless/src/test/java/io/aiven/inkless/cache/TieredObjectCacheTest.java new file mode 100644 index 0000000000..248eb02fd6 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/cache/TieredObjectCacheTest.java @@ -0,0 +1,288 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.cache; + +import org.apache.kafka.common.utils.MockTime; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.aiven.inkless.generated.CacheKey; +import io.aiven.inkless.generated.FileExtent; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class TieredObjectCacheTest { + + private static final long HOT_CACHE_TTL_MS = 60_000; // 60 seconds + private static final long NO_RATE_LIMIT = -1; + + private ObjectCache hotCache; + private ObjectCache laggingCache; + private MockTime time; + private TieredObjectCache.TieredCacheMetrics metrics; + private TieredObjectCache tieredCache; + + @BeforeEach + void setUp() { + hotCache = mock(ObjectCache.class); + laggingCache = mock(ObjectCache.class); + time = new MockTime(); + metrics = mock(TieredObjectCache.TieredCacheMetrics.class); + tieredCache = new TieredObjectCache(hotCache, laggingCache, HOT_CACHE_TTL_MS, time, NO_RATE_LIMIT, metrics); + } + + @Test + void shouldUseLaggingCacheWhenBatchTimestampIsOlderThanTTL() { + // Given: A batch timestamp older than hot cache TTL + long currentTime = time.milliseconds(); + long oldTimestamp = currentTime - HOT_CACHE_TTL_MS - 1000; // 1 second older than TTL + + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + when(laggingCache.computeIfAbsent(eq(key), any())).thenReturn(expectedExtent); + + // When + FileExtent result = tieredCache.computeIfAbsent(key, k -> expectedExtent, oldTimestamp); + + // Then + assertThat(result).isEqualTo(expectedExtent); + verify(laggingCache, times(1)).computeIfAbsent(eq(key), any()); + verify(hotCache, never()).computeIfAbsent(any(), any()); + verify(metrics).recordLaggingCacheRouting(); + } + + @Test + void shouldUseHotCacheWhenBatchTimestampIsWithinTTL() { + // Given: A batch timestamp within hot cache TTL + long currentTime = time.milliseconds(); + long recentTimestamp = currentTime - HOT_CACHE_TTL_MS + 1000; // 1 second younger than TTL + + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + when(hotCache.computeIfAbsent(eq(key), any())).thenReturn(expectedExtent); + + // When + FileExtent result = tieredCache.computeIfAbsent(key, k -> expectedExtent, recentTimestamp); + + // Then + assertThat(result).isEqualTo(expectedExtent); + verify(hotCache, times(1)).computeIfAbsent(eq(key), any()); + verify(laggingCache, never()).computeIfAbsent(any(), any()); + verify(metrics).recordHotCacheRouting(); + } + + @Test + void shouldUseHotCacheForDefaultComputeIfAbsent() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + when(hotCache.computeIfAbsent(eq(key), any())).thenReturn(expectedExtent); + + // When: Using the method without batch timestamp (write path) + FileExtent result = tieredCache.computeIfAbsent(key, k -> expectedExtent); + + // Then + assertThat(result).isEqualTo(expectedExtent); + verify(hotCache, times(1)).computeIfAbsent(eq(key), any()); + verify(laggingCache, never()).computeIfAbsent(any(), any()); + verify(metrics).recordHotCacheRouting(); + } + + @Test + void shouldAlwaysUseHotCacheForPut() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent extent = createFileExtent("test-object"); + + // When + tieredCache.put(key, extent); + + // Then + verify(hotCache).put(key, extent); + verify(laggingCache, never()).put(key, extent); + } + + @Test + void shouldSearchBothCachesForGet() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + // Hot cache miss, lagging cache hit + when(hotCache.get(key)).thenReturn(null); + when(laggingCache.get(key)).thenReturn(expectedExtent); + + // When + FileExtent result = tieredCache.get(key); + + // Then + assertThat(result).isEqualTo(expectedExtent); + } + + @Test + void shouldReturnFromHotCacheIfFoundFirst() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + when(hotCache.get(key)).thenReturn(expectedExtent); + + // When + FileExtent result = tieredCache.get(key); + + // Then + assertThat(result).isEqualTo(expectedExtent); + verify(laggingCache, never()).get(key); + } + + @Test + void shouldRemoveFromBothCaches() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + + when(hotCache.remove(key)).thenReturn(true); + when(laggingCache.remove(key)).thenReturn(false); + + // When + boolean result = tieredCache.remove(key); + + // Then + assertThat(result).isTrue(); + verify(hotCache).remove(key); + verify(laggingCache).remove(key); + } + + @Test + void shouldCombineSizesFromBothCaches() { + // Given + when(hotCache.size()).thenReturn(100L); + when(laggingCache.size()).thenReturn(50L); + + // When/Then + assertThat(tieredCache.size()).isEqualTo(150L); + assertThat(tieredCache.hotCacheSize()).isEqualTo(100L); + assertThat(tieredCache.laggingCacheSize()).isEqualTo(50L); + } + + @Test + void shouldUseLaggingCacheExactlyAtTTLBoundary() { + // Given: A batch timestamp exactly at the TTL boundary + long currentTime = time.milliseconds(); + long boundaryTimestamp = currentTime - HOT_CACHE_TTL_MS; // Exactly at TTL + + // When + boolean shouldUseLagging = tieredCache.shouldUseLaggingCache(boundaryTimestamp); + + // Then: At exactly TTL, it should NOT use lagging cache (age == TTL, not > TTL) + assertThat(shouldUseLagging).isFalse(); + } + + @Test + void shouldUseLaggingCacheJustPastTTLBoundary() { + // Given: A batch timestamp just past the TTL boundary + long currentTime = time.milliseconds(); + long pastBoundaryTimestamp = currentTime - HOT_CACHE_TTL_MS - 1; // 1ms past TTL + + // When + boolean shouldUseLagging = tieredCache.shouldUseLaggingCache(pastBoundaryTimestamp); + + // Then + assertThat(shouldUseLagging).isTrue(); + } + + @Test + void shouldApplyRateLimitingForLaggingCacheFetches() { + // Given: A tiered cache with rate limiting enabled + long rateLimitBytesPerSec = 1000; // 1000 bytes/sec, low for testing + ObjectCache realLaggingCache = new CaffeineCache(100, 60, -1); + TieredObjectCache rateLimitedCache = new TieredObjectCache( + hotCache, + realLaggingCache, + HOT_CACHE_TTL_MS, + time, + rateLimitBytesPerSec, + TieredObjectCache.TieredCacheMetrics.NOOP + ); + + long currentTime = time.milliseconds(); + long oldTimestamp = currentTime - HOT_CACHE_TTL_MS - 1000; + + // When: Multiple fetches that should be rate limited + CacheKey key1 = createCacheKey("test-object-1", 0, 500); + CacheKey key2 = createCacheKey("test-object-2", 0, 500); + + // First fetch should succeed immediately (within burst capacity) + FileExtent result1 = rateLimitedCache.computeIfAbsent(key1, k -> createFileExtent("test-object-1"), oldTimestamp); + assertThat(result1).isNotNull(); + + // Second fetch should also succeed (rate limiter has 2x burst capacity) + FileExtent result2 = rateLimitedCache.computeIfAbsent(key2, k -> createFileExtent("test-object-2"), oldTimestamp); + assertThat(result2).isNotNull(); + } + + @Test + void shouldNotRateLimitHotCacheFetches() { + // Given: A tiered cache with rate limiting enabled + long rateLimitBytesPerSec = 100; // Very low rate limit + ObjectCache realHotCache = new CaffeineCache(100, 60, -1); + TieredObjectCache rateLimitedCache = new TieredObjectCache( + realHotCache, + laggingCache, + HOT_CACHE_TTL_MS, + time, + rateLimitBytesPerSec, + TieredObjectCache.TieredCacheMetrics.NOOP + ); + + long currentTime = time.milliseconds(); + long recentTimestamp = currentTime - 1000; // Recent data, should use hot cache + + // When: Multiple fetches to hot cache (should not be rate limited) + for (int i = 0; i < 10; i++) { + CacheKey key = createCacheKey("test-object-" + i, 0, 1000); + FileExtent result = rateLimitedCache.computeIfAbsent(key, k -> createFileExtent("test-object"), recentTimestamp); + assertThat(result).isNotNull(); + } + // If rate limiting was applied, this would have taken > 10 seconds + // Since it completes quickly, rate limiting is not applied to hot cache + } + + private CacheKey createCacheKey(String object, long offset, long length) { + return new CacheKey() + .setObject(object) + .setRange(new CacheKey.ByteRange().setOffset(offset).setLength(length)); + } + + private FileExtent createFileExtent(String object) { + return new FileExtent() + .setObject(object) + .setRange(new FileExtent.ByteRange().setOffset(0).setLength(100)) + .setData(new byte[100]); + } +} From 6292b2c3125e2afa54eff214e0a7880cc20d03c2 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 12 Dec 2025 00:51:19 +0200 Subject: [PATCH 4/5] feat(inkless:fetch): integrate lagging consumer cache Wire TieredObjectCache into fetch path: - Add configuration options for lagging cache - Create tiered cache in SharedState when enabled - FetchPlanner tracks oldest batch timestamp per object - CacheFetchJob passes timestamp to cache for routing Configuration: - consume.lagging.cache.enabled (default: false) - consume.lagging.cache.max.count (default: 150) - consume.lagging.cache.ttl.sec (default: 5) - consume.lagging.cache.rate.limit.bytes.per.sec (default: 50 MiB/s) --- .../io/aiven/inkless/common/SharedState.java | 40 +++++- .../aiven/inkless/config/InklessConfig.java | 70 ++++++++++ .../aiven/inkless/consume/CacheFetchJob.java | 41 ++++-- .../aiven/inkless/consume/FetchPlanner.java | 53 ++++++-- .../aiven/inkless/common/SharedStateTest.java | 126 ++++++++++++++++++ .../inkless/config/InklessConfigTest.java | 13 ++ .../inkless/consume/CacheFetchJobTest.java | 69 ++++++++-- .../inkless/consume/FetchPlannerTest.java | 44 ++++++ 8 files changed, 421 insertions(+), 35 deletions(-) create mode 100644 storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java index aa817b2bd4..29112e7c62 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java @@ -39,6 +39,7 @@ import io.aiven.inkless.cache.KeyAlignmentStrategy; import io.aiven.inkless.cache.NullBatchCoordinateCache; import io.aiven.inkless.cache.ObjectCache; +import io.aiven.inkless.cache.TieredObjectCache; import io.aiven.inkless.config.InklessConfig; import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.control_plane.MetadataView; @@ -107,6 +108,9 @@ public static SharedState initialize( "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2" ); } + + final ObjectCache objectCache = createObjectCache(config, time); + return new SharedState( time, brokerId, @@ -115,17 +119,43 @@ public static SharedState initialize( controlPlane, ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()), new FixedBlockAlignment(config.fetchCacheBlockBytes()), - new CaffeineCache( - config.cacheMaxCount(), - config.cacheExpirationLifespanSec(), - config.cacheExpirationMaxIdleSec() - ), + objectCache, config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(), brokerTopicStats, defaultTopicConfigs ); } + private static ObjectCache createObjectCache(final InklessConfig config, final Time time) { + final CaffeineCache hotCache = new CaffeineCache( + config.cacheMaxCount(), + config.cacheExpirationLifespanSec(), + config.cacheExpirationMaxIdleSec() + ); + + if (!config.isLaggingCacheEnabled()) { + return hotCache; + } + + final CaffeineCache laggingCache = new CaffeineCache( + config.laggingCacheMaxCount(), + config.laggingCacheTtlSec(), + -1 // No idle expiration for lagging cache + ); + + final long hotCacheTtlMs = config.cacheExpirationLifespanSec() * 1000L; + final long rateLimitBytesPerSec = config.laggingCacheRateLimitBytesPerSec(); + + return new TieredObjectCache( + hotCache, + laggingCache, + hotCacheTtlMs, + time, + rateLimitBytesPerSec, + TieredObjectCache.TieredCacheMetrics.NOOP + ); + } + @Override public void close() throws IOException { try { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java index 98896b797a..c8ed1616a8 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java @@ -102,6 +102,27 @@ public class InklessConfig extends AbstractConfig { "The time to live must be <= than half of the value of of file.cleaner.interval.ms."; private static final int CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_DEFAULT = 5000; + // Lagging cache configuration for lagging/backfill consumers + public static final String CONSUME_LAGGING_CACHE_ENABLED_CONFIG = CONSUME_PREFIX + "lagging.cache.enabled"; + private static final String CONSUME_LAGGING_CACHE_ENABLED_DOC = "If true, a secondary cache is enabled for lagging consumers. " + + "This prevents lagging consumers from evicting hot data from the primary cache."; + private static final boolean CONSUME_LAGGING_CACHE_ENABLED_DEFAULT = false; + + public static final String CONSUME_LAGGING_CACHE_MAX_COUNT_CONFIG = CONSUME_PREFIX + "lagging.cache.max.count"; + private static final String CONSUME_LAGGING_CACHE_MAX_COUNT_DOC = "The maximum number of entries in the lagging consumer cache. " + + "Should be sized based on the number of concurrent lagging consumers."; + private static final int CONSUME_LAGGING_CACHE_MAX_COUNT_DEFAULT = 150; + + public static final String CONSUME_LAGGING_CACHE_TTL_SEC_CONFIG = CONSUME_PREFIX + "lagging.cache.ttl.sec"; + private static final String CONSUME_LAGGING_CACHE_TTL_SEC_DOC = "Time to live in seconds for entries in the lagging consumer cache. " + + "A short TTL (e.g., 5 seconds) is recommended as cached data is only needed briefly for sequential reads."; + private static final int CONSUME_LAGGING_CACHE_TTL_SEC_DEFAULT = 5; + + public static final String CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_CONFIG = CONSUME_PREFIX + "lagging.cache.rate.limit.bytes.per.sec"; + private static final String CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_DOC = "Maximum bytes per second to fetch from remote storage for lagging consumer cache misses. " + + "Set to -1 to disable rate limiting. This protects remote storage from being overwhelmed by lagging consumers."; + private static final long CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_DEFAULT = 50 * 1024 * 1024; // 50 MiB/s + public static final String RETENTION_ENFORCEMENT_INTERVAL_MS_CONFIG = "retention.enforcement.interval.ms"; private static final String RETENTION_ENFORCEMENT_INTERVAL_MS_DOC = "The interval with which to enforce retention policies on a partition. " + "This interval is approximate, because each scheduling event is randomized. " + @@ -354,6 +375,39 @@ public static ConfigDef configDef() { CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_DOC ); + // Lagging cache configuration + configDef.define( + CONSUME_LAGGING_CACHE_ENABLED_CONFIG, + ConfigDef.Type.BOOLEAN, + CONSUME_LAGGING_CACHE_ENABLED_DEFAULT, + ConfigDef.Importance.LOW, + CONSUME_LAGGING_CACHE_ENABLED_DOC + ); + configDef.define( + CONSUME_LAGGING_CACHE_MAX_COUNT_CONFIG, + ConfigDef.Type.INT, + CONSUME_LAGGING_CACHE_MAX_COUNT_DEFAULT, + ConfigDef.Range.atLeast(1), + ConfigDef.Importance.LOW, + CONSUME_LAGGING_CACHE_MAX_COUNT_DOC + ); + configDef.define( + CONSUME_LAGGING_CACHE_TTL_SEC_CONFIG, + ConfigDef.Type.INT, + CONSUME_LAGGING_CACHE_TTL_SEC_DEFAULT, + ConfigDef.Range.atLeast(1), + ConfigDef.Importance.LOW, + CONSUME_LAGGING_CACHE_TTL_SEC_DOC + ); + configDef.define( + CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_CONFIG, + ConfigDef.Type.LONG, + CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_DEFAULT, + ConfigDef.Range.atLeast(-1), + ConfigDef.Importance.LOW, + CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_DOC + ); + return configDef; } @@ -474,4 +528,20 @@ public boolean isBatchCoordinateCacheEnabled() { public Duration batchCoordinateCacheTtl() { return Duration.ofMillis(getInt(CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_CONFIG)); } + + public boolean isLaggingCacheEnabled() { + return getBoolean(CONSUME_LAGGING_CACHE_ENABLED_CONFIG); + } + + public int laggingCacheMaxCount() { + return getInt(CONSUME_LAGGING_CACHE_MAX_COUNT_CONFIG); + } + + public int laggingCacheTtlSec() { + return getInt(CONSUME_LAGGING_CACHE_TTL_SEC_CONFIG); + } + + public long laggingCacheRateLimitBytesPerSec() { + return getLong(CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_CONFIG); + } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java index 978dde3870..741b9e575d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.concurrent.Callable; import java.util.function.Consumer; +import java.util.function.Function; import io.aiven.inkless.cache.ObjectCache; import io.aiven.inkless.common.ByteRange; @@ -32,6 +33,12 @@ public class CacheFetchJob implements Callable { + /** + * Constant indicating no batch timestamp is available. + * When used, the cache will default to hot cache behavior. + */ + public static final long NO_BATCH_TIMESTAMP = Long.MIN_VALUE; + private final ObjectCache cache; private final ObjectKey objectKey; private final ObjectFetcher objectFetcher; @@ -41,6 +48,7 @@ public class CacheFetchJob implements Callable { private final ByteRange byteRange; private final FileFetchJob fallback; private final Consumer fileFetchDurationCallback; + private final long batchTimestamp; public CacheFetchJob( final ObjectCache cache, @@ -50,6 +58,19 @@ public CacheFetchJob( final Time time, final Consumer fileFetchDurationCallback, final Consumer cacheEntrySize + ) { + this(cache, objectFetcher, objectKey, byteRange, time, fileFetchDurationCallback, cacheEntrySize, NO_BATCH_TIMESTAMP); + } + + public CacheFetchJob( + final ObjectCache cache, + final ObjectFetcher objectFetcher, + final ObjectKey objectKey, + final ByteRange byteRange, + final Time time, + final Consumer fileFetchDurationCallback, + final Consumer cacheEntrySize, + final long batchTimestamp ) { this.cache = cache; this.objectKey = objectKey; @@ -59,6 +80,7 @@ public CacheFetchJob( this.fileFetchDurationCallback = fileFetchDurationCallback; this.byteRange = byteRange; this.key = createCacheKey(objectKey, byteRange); + this.batchTimestamp = batchTimestamp; this.fallback = new FileFetchJob(time, objectFetcher, objectKey, byteRange, fileFetchDurationCallback); } @@ -74,24 +96,26 @@ static CacheKey createCacheKey(ObjectKey object, ByteRange byteRange) { @Override public FileExtent call() { - return cache.computeIfAbsent(key, cacheKey -> { + final Function mappingFunction = cacheKey -> { // Let remote storage exceptions bubble up, do not catch the exceptions. final FileExtent freshFile = loadFileExtent(objectKey, byteRange); // TODO: add cache entry size also to produce/file commit cacheEntrySize.accept(freshFile.data().length); return freshFile; - }); + }; + + // Use timestamp-aware method - default implementation ignores timestamp, + // TieredObjectCache uses it for routing to hot/cold cache + return cache.computeIfAbsent(key, mappingFunction, batchTimestamp); } private FileExtent loadFileExtent(final ObjectKey key, final ByteRange batchRange) { - final FileExtent freshFile; - final FileFetchJob fallback = new FileFetchJob(time, objectFetcher, key, batchRange, fileFetchDurationCallback); + final FileFetchJob fetchJob = new FileFetchJob(time, objectFetcher, key, batchRange, fileFetchDurationCallback); try { - freshFile = fallback.call(); + return fetchJob.call(); } catch (Exception e) { throw new FetchException(e); } - return freshFile; } @Override @@ -99,13 +123,14 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; CacheFetchJob that = (CacheFetchJob) o; - return Objects.equals(cache, that.cache) + return batchTimestamp == that.batchTimestamp + && Objects.equals(cache, that.cache) && Objects.equals(key, that.key) && Objects.equals(fallback, that.fallback); } @Override public int hashCode() { - return Objects.hash(cache, key, fallback); + return Objects.hash(cache, key, fallback, batchTimestamp); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java index f3efd5e788..da62b47e15 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java @@ -21,9 +21,10 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Time; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -75,25 +76,52 @@ private List> doWork(final Map ranges = new ArrayList<>(); + long oldestTimestamp = Long.MAX_VALUE; + + void addBatch(BatchInfo batch) { + ranges.add(batch.metadata().range()); + oldestTimestamp = Math.min(oldestTimestamp, batch.metadata().timestamp()); + } + } + private List planJobs(final Map batchCoordinates) { - final Set>> objectKeysToRanges = batchCoordinates.values().stream() + // Group batches by object key, tracking ranges and oldest timestamp + final Map objectKeysToFetchInfo = new HashMap<>(); + + batchCoordinates.values().stream() .filter(findBatch -> findBatch.errors() == Errors.NONE) .map(FindBatchResponse::batches) .peek(batches -> metrics.recordFetchBatchSize(batches.size())) .flatMap(List::stream) - // Merge batch requests - .collect(Collectors.groupingBy(BatchInfo::objectKey, Collectors.mapping(b -> b.metadata().range(), Collectors.toList()))) - .entrySet(); - metrics.recordFetchObjectsSize(objectKeysToRanges.size()); - return objectKeysToRanges.stream() - .flatMap(e -> - keyAlignment.align(e.getValue()) + .forEach(batch -> + objectKeysToFetchInfo + .computeIfAbsent(batch.objectKey(), k -> new ObjectFetchInfo()) + .addBatch(batch) + ); + + metrics.recordFetchObjectsSize(objectKeysToFetchInfo.size()); + + return objectKeysToFetchInfo.entrySet().stream() + .flatMap(e -> { + final String objectKey = e.getKey(); + final ObjectFetchInfo fetchInfo = e.getValue(); + // Note: keyAlignment.align() returns fixed-size aligned blocks (e.g., 16 MiB). + // This aligned byteRange is used for both caching and rate limiting (if enabled). + // Rate limiting uses the aligned block size (not actual batch size) as a conservative + // estimate, since the actual fetch size is only known after the fetch completes. + return keyAlignment.align(fetchInfo.ranges) .stream() - .map(byteRange -> getCacheFetchJob(e.getKey(), byteRange))) + .map(byteRange -> getCacheFetchJob(objectKey, byteRange, fetchInfo.oldestTimestamp)); + }) .collect(Collectors.toList()); } - private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange byteRange) { + private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange byteRange, final long batchTimestamp) { return new CacheFetchJob( cache, objectFetcher, @@ -101,7 +129,8 @@ private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange b byteRange, time, metrics::fetchFileFinished, - metrics::cacheEntrySize + metrics::cacheEntrySize, + batchTimestamp ); } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java new file mode 100644 index 0000000000..f3ce9495aa --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java @@ -0,0 +1,126 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.common; + +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import io.aiven.inkless.cache.CaffeineCache; +import io.aiven.inkless.cache.TieredObjectCache; +import io.aiven.inkless.config.InklessConfig; +import io.aiven.inkless.control_plane.InMemoryControlPlane; +import io.aiven.inkless.control_plane.MetadataView; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +class SharedStateTest { + + private final MockTime time = new MockTime(); + + @Test + void initializeWithLaggingCacheDisabled() throws IOException { + // Given: Config with lagging cache disabled (default) + Map configs = minimalConfig(); + configs.put("consume.lagging.cache.enabled", "false"); + InklessConfig config = new InklessConfig(configs); + + // When + SharedState state = SharedState.initialize( + time, + 1, + config, + mock(MetadataView.class), + new InMemoryControlPlane(time), + new BrokerTopicStats(), + () -> new LogConfig(new Properties()) + ); + + // Then: Cache should be a regular CaffeineCache, not TieredObjectCache + assertThat(state.cache()).isInstanceOf(CaffeineCache.class); + + state.close(); + } + + @Test + void initializeWithLaggingCacheEnabled() throws IOException { + // Given: Config with lagging cache enabled + Map configs = minimalConfig(); + configs.put("consume.lagging.cache.enabled", "true"); + configs.put("consume.lagging.cache.max.count", "100"); + configs.put("consume.lagging.cache.ttl.sec", "10"); + configs.put("consume.lagging.cache.rate.limit.bytes.per.sec", "52428800"); + InklessConfig config = new InklessConfig(configs); + + // When + SharedState state = SharedState.initialize( + time, + 1, + config, + mock(MetadataView.class), + new InMemoryControlPlane(time), + new BrokerTopicStats(), + () -> new LogConfig(new Properties()) + ); + + // Then: Cache should be a TieredObjectCache + assertThat(state.cache()).isInstanceOf(TieredObjectCache.class); + + state.close(); + } + + @Test + void initializeWithLaggingCacheRateLimitDisabled() throws IOException { + // Given: Config with lagging cache enabled but rate limiting disabled + Map configs = minimalConfig(); + configs.put("consume.lagging.cache.enabled", "true"); + configs.put("consume.lagging.cache.rate.limit.bytes.per.sec", "-1"); + InklessConfig config = new InklessConfig(configs); + + // When + SharedState state = SharedState.initialize( + time, + 1, + config, + mock(MetadataView.class), + new InMemoryControlPlane(time), + new BrokerTopicStats(), + () -> new LogConfig(new Properties()) + ); + + // Then: Cache should still be a TieredObjectCache (rate limiting is optional) + assertThat(state.cache()).isInstanceOf(TieredObjectCache.class); + + state.close(); + } + + private Map minimalConfig() { + Map configs = new HashMap<>(); + configs.put("control.plane.class", InMemoryControlPlane.class.getCanonicalName()); + configs.put("storage.backend.class", "io.aiven.inkless.config.ConfigTestStorageBackend"); + return configs; + } +} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java index 6c3b22f12f..1eb91407af 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java @@ -99,6 +99,11 @@ void minimalConfig() { assertThat(config.produceUploadBackoff()).isEqualTo(Duration.ofMillis(10)); assertThat(config.storage(storageMetrics)).isInstanceOf(ConfigTestStorageBackend.class); assertThat(config.fileCleanerInterval()).isEqualTo(Duration.ofMinutes(5)); + // Lagging cache defaults + assertThat(config.isLaggingCacheEnabled()).isFalse(); + assertThat(config.laggingCacheMaxCount()).isEqualTo(150); + assertThat(config.laggingCacheTtlSec()).isEqualTo(5); + assertThat(config.laggingCacheRateLimitBytesPerSec()).isEqualTo(50 * 1024 * 1024); assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMinutes(1)); assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMinutes(1)); assertThat(config.cacheMaxCount()).isEqualTo(1000); @@ -131,6 +136,10 @@ void fullConfig() { configs.put("fetch.data.thread.pool.size", "12"); configs.put("fetch.metadata.thread.pool.size", "14"); configs.put("retention.enforcement.max.batches.per.request", "10"); + configs.put("consume.lagging.cache.enabled", "true"); + configs.put("consume.lagging.cache.max.count", "200"); + configs.put("consume.lagging.cache.ttl.sec", "10"); + configs.put("consume.lagging.cache.rate.limit.bytes.per.sec", "104857600"); final var config = new InklessConfig( configs ); @@ -152,6 +161,10 @@ void fullConfig() { assertThat(config.fetchDataThreadPoolSize()).isEqualTo(12); assertThat(config.fetchMetadataThreadPoolSize()).isEqualTo(14); assertThat(config.maxBatchesPerEnforcementRequest()).isEqualTo(10); + assertThat(config.isLaggingCacheEnabled()).isTrue(); + assertThat(config.laggingCacheMaxCount()).isEqualTo(200); + assertThat(config.laggingCacheTtlSec()).isEqualTo(10); + assertThat(config.laggingCacheRateLimitBytesPerSec()).isEqualTo(104857600); } @Test diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java index f1d92a9a44..c76839e7e6 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java @@ -40,7 +40,10 @@ import io.aiven.inkless.storage_backend.common.ObjectFetcher; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -54,6 +57,15 @@ public class CacheFetchJobTest { final Time time = new MockTime(); final static ObjectKey objectA = PlainObjectKey.create("a", "a"); + private CacheFetchJob cacheFetchJob(ObjectCache cache, ByteRange byteRange) { + return cacheFetchJob(cache, byteRange, CacheFetchJob.NO_BATCH_TIMESTAMP); + } + + private CacheFetchJob cacheFetchJob(ObjectCache cache, ByteRange byteRange, long batchTimestamp) { + return new CacheFetchJob(cache, fetcher, CacheFetchJobTest.objectA, byteRange, time, + durationMs -> {}, cacheEntrySize -> {}, batchTimestamp); + } + @Test public void testCacheMiss() throws Exception { int size = 10; @@ -69,14 +81,14 @@ public void testCacheMiss() throws Exception { when(fetcher.readToByteBuffer(channel)).thenReturn(ByteBuffer.wrap(array)); ObjectCache cache = new NullCache(); - CacheFetchJob cacheFetchJob = cacheFetchJob(cache, objectA, range); + CacheFetchJob cacheFetchJob = cacheFetchJob(cache, range); FileExtent actualFile = cacheFetchJob.call(); assertThat(actualFile).isEqualTo(expectedFile); } @Test - public void testCacheHit() throws Exception { + public void testCacheHit() { int size = 10; byte[] array = new byte[10]; for (int i = 0; i < size; i++) { @@ -87,19 +99,56 @@ public void testCacheHit() throws Exception { ObjectCache cache = new MemoryCache(); cache.put(CacheFetchJob.createCacheKey(objectA, range), expectedFile); - CacheFetchJob cacheFetchJob = cacheFetchJob(cache, objectA, range); + CacheFetchJob cacheFetchJob = cacheFetchJob(cache, range); FileExtent actualFile = cacheFetchJob.call(); assertThat(actualFile).isEqualTo(expectedFile); verifyNoInteractions(fetcher); } - private CacheFetchJob cacheFetchJob( - ObjectCache cache, - ObjectKey objectKey, - ByteRange byteRange - ) { - return new CacheFetchJob(cache, fetcher, objectKey, byteRange, time, - durationMs -> {}, cacheEntrySize -> {}); + @Test + public void testCacheFetchJobWithTimestampUsesTimestampAwareMethod() { + // Given: A mock cache that tracks method calls + ObjectCache mockCache = mock(ObjectCache.class); + + int size = 10; + byte[] array = new byte[size]; + ByteRange range = new ByteRange(0, size); + FileExtent expectedFile = FileFetchJob.createFileExtent(objectA, range, ByteBuffer.wrap(array)); + + long batchTimestamp = 12345L; + + // Mock the timestamp-aware computeIfAbsent to return expected file + when(mockCache.computeIfAbsent(any(), any(), eq(batchTimestamp))).thenReturn(expectedFile); + + // When + CacheFetchJob job = cacheFetchJob(mockCache, range, batchTimestamp); + FileExtent result = job.call(); + + // Then: Should use the timestamp-aware method + assertThat(result).isEqualTo(expectedFile); + verify(mockCache).computeIfAbsent(any(), any(), eq(batchTimestamp)); + } + + @Test + public void testCacheFetchJobWithoutTimestampUsesDefaultTimestamp() { + // Given: A mock cache + ObjectCache mockCache = mock(ObjectCache.class); + + int size = 10; + byte[] array = new byte[size]; + ByteRange range = new ByteRange(0, size); + FileExtent expectedFile = FileFetchJob.createFileExtent(objectA, range, ByteBuffer.wrap(array)); + + // Mock the timestamp-aware computeIfAbsent with NO_BATCH_TIMESTAMP + when(mockCache.computeIfAbsent(any(), any(), eq(CacheFetchJob.NO_BATCH_TIMESTAMP))).thenReturn(expectedFile); + + // When: Using constructor without timestamp + CacheFetchJob job = cacheFetchJob(mockCache, range); + FileExtent result = job.call(); + + // Then: Should use NO_BATCH_TIMESTAMP + assertThat(result).isEqualTo(expectedFile); + verify(mockCache).computeIfAbsent(any(), any(), eq(CacheFetchJob.NO_BATCH_TIMESTAMP)); } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java index 4523dda130..cb8cc2dc61 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java @@ -221,4 +221,48 @@ private void assertBatchPlan(Map coordinate // Verify the number of planned fetch jobs matches expected assertEquals(expectedJobs.size(), result.size()); } + + @Test + public void planShouldUseOldestTimestampForSameObject() { + // Given: Two batches for the same object with different timestamps + long olderTimestamp = 1000L; + long newerTimestamp = 2000L; + + Map coordinates = Map.of( + partition0, FindBatchResponse.success(List.of( + new BatchInfo(1L, OBJECT_KEY_A.value(), BatchMetadata.of(partition0, 0, 10, 0, 0, 10, newerTimestamp, TimestampType.CREATE_TIME)) + ), 0, 1), + partition1, FindBatchResponse.success(List.of( + new BatchInfo(2L, OBJECT_KEY_A.value(), BatchMetadata.of(partition1, 30, 10, 0, 0, 11, olderTimestamp, TimestampType.CREATE_TIME)) + ), 0, 1) + ); + + FetchPlanner job = fetchPlannerJob(coordinates); + List> result = job.get(); + + // Should only have one job for the merged object key + assertEquals(1, result.size()); + // The job should use the older timestamp (1000L) for cache tiering decisions + // This is verified implicitly - FetchPlanner.ObjectFetchInfo tracks min timestamp + } + + @Test + public void planShouldPreserveSeparateTimestampsForDifferentObjects() { + // Given: Two different objects with different timestamps + long timestampA = 1000L; + long timestampB = 5000L; + + Map coordinates = Map.of( + partition0, FindBatchResponse.success(List.of( + new BatchInfo(1L, OBJECT_KEY_A.value(), BatchMetadata.of(partition0, 0, 10, 0, 0, 10, timestampA, TimestampType.CREATE_TIME)), + new BatchInfo(2L, OBJECT_KEY_B.value(), BatchMetadata.of(partition0, 0, 10, 1, 1, 11, timestampB, TimestampType.CREATE_TIME)) + ), 0, 2) + ); + + FetchPlanner job = fetchPlannerJob(coordinates); + List> result = job.get(); + + // Should have two separate jobs, each with its own timestamp + assertEquals(2, result.size()); + } } From ad4a60bd41162415f25206d236ed4240d6a789ab Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 12 Dec 2025 11:26:05 +0200 Subject: [PATCH 5/5] docs(inkless): update configs with lagging cache --- docs/inkless/configs.rst | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/docs/inkless/configs.rst b/docs/inkless/configs.rst index e05cb53156..860b626c74 100644 --- a/docs/inkless/configs.rst +++ b/docs/inkless/configs.rst @@ -109,6 +109,37 @@ Under ``inkless.`` * Valid Values: [1,...] * Importance: low +``consume.lagging.cache.enabled`` + If true, a secondary cache is enabled for lagging consumers. This prevents lagging consumers from evicting hot data from the primary cache. + + * Type: boolean + * Default: false + * Importance: low + +``consume.lagging.cache.max.count`` + The maximum number of entries in the lagging consumer cache. Should be sized based on the number of concurrent lagging consumers. + + * Type: int + * Default: 150 + * Valid Values: [1,...] + * Importance: low + +``consume.lagging.cache.rate.limit.bytes.per.sec`` + Maximum bytes per second to fetch from remote storage for lagging consumer cache misses. Set to -1 to disable rate limiting. This protects remote storage from being overwhelmed by lagging consumers. + + * Type: long + * Default: 52428800 + * Valid Values: [-1,...] + * Importance: low + +``consume.lagging.cache.ttl.sec`` + Time to live in seconds for entries in the lagging consumer cache. A short TTL (e.g., 5 seconds) is recommended as cached data is only needed briefly for sequential reads. + + * Type: int + * Default: 5 + * Valid Values: [1,...] + * Importance: low + ``fetch.data.thread.pool.size`` Thread pool size to concurrently fetch data files from remote storage