diff --git a/build.gradle b/build.gradle index 9c6e4b951b..e68b35fbfe 100644 --- a/build.gradle +++ b/build.gradle @@ -2452,6 +2452,7 @@ project(':storage:inkless') { } implementation libs.metrics implementation libs.caffeine + implementation libs.bucket4jCore testImplementation project(':clients').sourceSets.test.output.classesDirs testImplementation project(':test-common') diff --git a/docs/inkless/configs.rst b/docs/inkless/configs.rst index e05cb53156..860b626c74 100644 --- a/docs/inkless/configs.rst +++ b/docs/inkless/configs.rst @@ -109,6 +109,37 @@ Under ``inkless.`` * Valid Values: [1,...] * Importance: low +``consume.lagging.cache.enabled`` + If true, a secondary cache is enabled for lagging consumers. This prevents lagging consumers from evicting hot data from the primary cache. + + * Type: boolean + * Default: false + * Importance: low + +``consume.lagging.cache.max.count`` + The maximum number of entries in the lagging consumer cache. Should be sized based on the number of concurrent lagging consumers. + + * Type: int + * Default: 150 + * Valid Values: [1,...] + * Importance: low + +``consume.lagging.cache.rate.limit.bytes.per.sec`` + Maximum bytes per second to fetch from remote storage for lagging consumer cache misses. Set to -1 to disable rate limiting. This protects remote storage from being overwhelmed by lagging consumers. + + * Type: long + * Default: 52428800 + * Valid Values: [-1,...] + * Importance: low + +``consume.lagging.cache.ttl.sec`` + Time to live in seconds for entries in the lagging consumer cache. A short TTL (e.g., 5 seconds) is recommended as cached data is only needed briefly for sequential reads. + + * Type: int + * Default: 5 + * Valid Values: [1,...] + * Importance: low + ``fetch.data.thread.pool.size`` Thread pool size to concurrently fetch data files from remote storage diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 2d08e277dd..4397b09213 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -58,6 +58,7 @@ versions += [ awsSdk: "2.29.6", azureSdk: "1.2.28", bcpkix: "1.80", + bucket4j: "8.14.0", caffeine: "3.2.0", bndlib: "7.1.0", checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2", @@ -164,6 +165,7 @@ libs += [ azureSdkBom: "com.azure:azure-sdk-bom:$versions.azureSdk", bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix", bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib", + bucket4jCore: "com.bucket4j:bucket4j_jdk11-core:$versions.bucket4j", caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", commonsBeanutils: "commons-beanutils:commons-beanutils:$versions.commonsBeanutils", diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java index 1d9ce20612..9d379129b1 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java @@ -27,4 +27,22 @@ public interface ObjectCache extends Cache, Closeable { FileExtent computeIfAbsent(CacheKey key, Function mappingFunction); + + /** + * Computes if absent with batch timestamp hint for cache tiering. + * + *

The batch timestamp can be used by tiered cache implementations to route + * requests to the appropriate cache tier (hot vs cold) based on data age.

+ * + *

Default implementation ignores the timestamp and delegates to the regular + * computeIfAbsent method.

+ * + * @param key the cache key + * @param mappingFunction the function to compute the value if absent + * @param batchTimestamp the timestamp of the batch (from BatchMetadata.timestamp()) + * @return the cached or computed file extent + */ + default FileExtent computeIfAbsent(CacheKey key, Function mappingFunction, long batchTimestamp) { + return computeIfAbsent(key, mappingFunction); + } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/TieredObjectCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/TieredObjectCache.java new file mode 100644 index 0000000000..f75691999d --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/TieredObjectCache.java @@ -0,0 +1,250 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.cache; + +import org.apache.kafka.common.utils.Time; + +import java.io.IOException; +import java.time.Duration; +import java.util.function.Function; + +import io.aiven.inkless.generated.CacheKey; +import io.aiven.inkless.generated.FileExtent; +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.BlockingBucket; +import io.github.bucket4j.Bucket; + +/** + * A tiered cache implementation that routes requests to either a hot cache or a lagging cache + * based on the age of the data being requested. + * + *

The hot cache is used for recent data (within the hot cache TTL), while the lagging cache + * is used for historical data. This separation prevents lagging consumers from evicting hot data + * that is being actively used by tail consumers and the write path.

+ * + *

The classification is based on the batch timestamp: if the data is older than the + * hot cache TTL, it cannot possibly be in the hot cache, so we route to the lagging cache.

+ * + *

Rate limiting is applied to lagging cache fetches to protect remote storage from + * being overwhelmed by lagging consumers.

+ */ +public class TieredObjectCache implements ObjectCache { + + private final ObjectCache hotCache; + private final ObjectCache laggingCache; + private final long hotCacheTtlMs; + private final Time time; + private final TieredCacheMetrics metrics; + private final BlockingBucket rateLimiter; + + /** + * Creates a tiered cache without rate limiting. + */ + public TieredObjectCache( + final ObjectCache hotCache, + final ObjectCache laggingCache, + final long hotCacheTtlMs, + final Time time + ) { + this(hotCache, laggingCache, hotCacheTtlMs, time, -1, TieredCacheMetrics.NOOP); + } + + /** + * Creates a tiered cache with optional rate limiting for lagging cache fetches. + * + * @param hotCache the cache for recent data + * @param laggingCache the cache for historical data + * @param hotCacheTtlMs the TTL threshold for routing (data older than this goes to lagging cache) + * @param time time source + * @param rateLimitBytesPerSec rate limit in bytes per second for lagging cache fetches, or -1 to disable + * @param metrics metrics callback + */ + public TieredObjectCache( + final ObjectCache hotCache, + final ObjectCache laggingCache, + final long hotCacheTtlMs, + final Time time, + final long rateLimitBytesPerSec, + final TieredCacheMetrics metrics + ) { + this.hotCache = hotCache; + this.laggingCache = laggingCache; + this.hotCacheTtlMs = hotCacheTtlMs; + this.time = time; + this.metrics = metrics; + this.rateLimiter = rateLimitBytesPerSec > 0 ? createRateLimiter(rateLimitBytesPerSec) : null; + } + + /** + * Creates a rate limiter for lagging cache fetches. + */ + private static BlockingBucket createRateLimiter(final long bytesPerSecond) { + // Uses Bucket4j token bucket algorithm (https://github.com/bucket4j/bucket4j) + // + // Capacity: 2x per-second rate allows short bursts (e.g., 50 MiB/s -> 100 MiB burst) + // Refill: "Greedy" adds tokens continuously for smooth rate limiting + // Blocking: consume() blocks until tokens available, creating backpressure + // + // Example with 50 MiB/s and 16 MiB blocks: ~3 fetches/sec sustained + final Bandwidth bandwidth = Bandwidth.builder() + .capacity(bytesPerSecond * 2) + .refillGreedy(bytesPerSecond, Duration.ofSeconds(1)) + .build(); + + return Bucket.builder() + .addLimit(bandwidth) + .build() + .asBlocking(); + } + + /** + * Computes if absent from the appropriate cache tier based on batch timestamp. + * + *

For lagging cache fetches, rate limiting is applied before invoking the mapping function + * to protect remote storage from being overwhelmed.

+ * + * @param key the cache key + * @param mappingFunction the function to compute the value if absent + * @param batchTimestamp the timestamp of the batch (from BatchMetadata.timestamp()) + * @return the cached or computed file extent + */ + @Override + public FileExtent computeIfAbsent( + final CacheKey key, + final Function mappingFunction, + final long batchTimestamp + ) { + final boolean useLaggingCache = shouldUseLaggingCache(batchTimestamp); + if (useLaggingCache) { + metrics.recordLaggingCacheRouting(); + return laggingCache.computeIfAbsent(key, rateLimitedMappingFunction(mappingFunction, key)); + } else { + metrics.recordHotCacheRouting(); + return hotCache.computeIfAbsent(key, mappingFunction); + } + } + + /** + * Wraps the mapping function with rate limiting for lagging cache fetches. + * Rate limiting is applied based on the expected fetch size (from the cache key's byte range). + */ + private Function rateLimitedMappingFunction( + final Function mappingFunction, + final CacheKey key + ) { + if (rateLimiter == null) { + return mappingFunction; + } + return cacheKey -> { + // Rate limit based on the aligned block size (from cache key) + // This is a conservative estimate; actual fetch may be smaller + final long bytesToFetch = key.range().length(); + if (bytesToFetch > 0) { + try { + rateLimiter.consume(bytesToFetch); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Rate limit wait interrupted", e); + } + } + return mappingFunction.apply(cacheKey); + }; + } + + /** + * Default computeIfAbsent routes to hot cache. + * This is used by the write path which always uses the hot cache. + */ + @Override + public FileExtent computeIfAbsent(final CacheKey key, final Function mappingFunction) { + metrics.recordHotCacheRouting(); + return hotCache.computeIfAbsent(key, mappingFunction); + } + + /** + * Determines if the lagging cache should be used based on batch age. + * If data is older than hot cache TTL, it cannot be in hot cache. + */ + boolean shouldUseLaggingCache(final long batchTimestamp) { + final long batchAge = time.milliseconds() - batchTimestamp; + return batchAge > hotCacheTtlMs; + } + + @Override + public FileExtent get(final CacheKey key) { + // Try hot cache first, then lagging cache + FileExtent result = hotCache.get(key); + if (result == null) { + result = laggingCache.get(key); + } + return result; + } + + /** + * Put always goes to hot cache. + * This is used by the write path (CacheStoreJob) to cache produced data. + */ + @Override + public void put(final CacheKey key, final FileExtent value) { + hotCache.put(key, value); + } + + @Override + public boolean remove(final CacheKey key) { + boolean removedFromHot = hotCache.remove(key); + boolean removedFromLagging = laggingCache.remove(key); + return removedFromHot || removedFromLagging; + } + + @Override + public long size() { + return hotCache.size() + laggingCache.size(); + } + + public long hotCacheSize() { + return hotCache.size(); + } + + public long laggingCacheSize() { + return laggingCache.size(); + } + + @Override + public void close() throws IOException { + try { + hotCache.close(); + } finally { + laggingCache.close(); + } + } + + /** + * Metrics interface for tiered cache operations. + */ + public interface TieredCacheMetrics { + TieredCacheMetrics NOOP = new TieredCacheMetrics() { + @Override + public void recordHotCacheRouting() {} + @Override + public void recordLaggingCacheRouting() {} + }; + + void recordHotCacheRouting(); + void recordLaggingCacheRouting(); + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java index aa817b2bd4..29112e7c62 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java @@ -39,6 +39,7 @@ import io.aiven.inkless.cache.KeyAlignmentStrategy; import io.aiven.inkless.cache.NullBatchCoordinateCache; import io.aiven.inkless.cache.ObjectCache; +import io.aiven.inkless.cache.TieredObjectCache; import io.aiven.inkless.config.InklessConfig; import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.control_plane.MetadataView; @@ -107,6 +108,9 @@ public static SharedState initialize( "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2" ); } + + final ObjectCache objectCache = createObjectCache(config, time); + return new SharedState( time, brokerId, @@ -115,17 +119,43 @@ public static SharedState initialize( controlPlane, ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()), new FixedBlockAlignment(config.fetchCacheBlockBytes()), - new CaffeineCache( - config.cacheMaxCount(), - config.cacheExpirationLifespanSec(), - config.cacheExpirationMaxIdleSec() - ), + objectCache, config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(), brokerTopicStats, defaultTopicConfigs ); } + private static ObjectCache createObjectCache(final InklessConfig config, final Time time) { + final CaffeineCache hotCache = new CaffeineCache( + config.cacheMaxCount(), + config.cacheExpirationLifespanSec(), + config.cacheExpirationMaxIdleSec() + ); + + if (!config.isLaggingCacheEnabled()) { + return hotCache; + } + + final CaffeineCache laggingCache = new CaffeineCache( + config.laggingCacheMaxCount(), + config.laggingCacheTtlSec(), + -1 // No idle expiration for lagging cache + ); + + final long hotCacheTtlMs = config.cacheExpirationLifespanSec() * 1000L; + final long rateLimitBytesPerSec = config.laggingCacheRateLimitBytesPerSec(); + + return new TieredObjectCache( + hotCache, + laggingCache, + hotCacheTtlMs, + time, + rateLimitBytesPerSec, + TieredObjectCache.TieredCacheMetrics.NOOP + ); + } + @Override public void close() throws IOException { try { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java index 98896b797a..c8ed1616a8 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java @@ -102,6 +102,27 @@ public class InklessConfig extends AbstractConfig { "The time to live must be <= than half of the value of of file.cleaner.interval.ms."; private static final int CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_DEFAULT = 5000; + // Lagging cache configuration for lagging/backfill consumers + public static final String CONSUME_LAGGING_CACHE_ENABLED_CONFIG = CONSUME_PREFIX + "lagging.cache.enabled"; + private static final String CONSUME_LAGGING_CACHE_ENABLED_DOC = "If true, a secondary cache is enabled for lagging consumers. " + + "This prevents lagging consumers from evicting hot data from the primary cache."; + private static final boolean CONSUME_LAGGING_CACHE_ENABLED_DEFAULT = false; + + public static final String CONSUME_LAGGING_CACHE_MAX_COUNT_CONFIG = CONSUME_PREFIX + "lagging.cache.max.count"; + private static final String CONSUME_LAGGING_CACHE_MAX_COUNT_DOC = "The maximum number of entries in the lagging consumer cache. " + + "Should be sized based on the number of concurrent lagging consumers."; + private static final int CONSUME_LAGGING_CACHE_MAX_COUNT_DEFAULT = 150; + + public static final String CONSUME_LAGGING_CACHE_TTL_SEC_CONFIG = CONSUME_PREFIX + "lagging.cache.ttl.sec"; + private static final String CONSUME_LAGGING_CACHE_TTL_SEC_DOC = "Time to live in seconds for entries in the lagging consumer cache. " + + "A short TTL (e.g., 5 seconds) is recommended as cached data is only needed briefly for sequential reads."; + private static final int CONSUME_LAGGING_CACHE_TTL_SEC_DEFAULT = 5; + + public static final String CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_CONFIG = CONSUME_PREFIX + "lagging.cache.rate.limit.bytes.per.sec"; + private static final String CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_DOC = "Maximum bytes per second to fetch from remote storage for lagging consumer cache misses. " + + "Set to -1 to disable rate limiting. This protects remote storage from being overwhelmed by lagging consumers."; + private static final long CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_DEFAULT = 50 * 1024 * 1024; // 50 MiB/s + public static final String RETENTION_ENFORCEMENT_INTERVAL_MS_CONFIG = "retention.enforcement.interval.ms"; private static final String RETENTION_ENFORCEMENT_INTERVAL_MS_DOC = "The interval with which to enforce retention policies on a partition. " + "This interval is approximate, because each scheduling event is randomized. " + @@ -354,6 +375,39 @@ public static ConfigDef configDef() { CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_DOC ); + // Lagging cache configuration + configDef.define( + CONSUME_LAGGING_CACHE_ENABLED_CONFIG, + ConfigDef.Type.BOOLEAN, + CONSUME_LAGGING_CACHE_ENABLED_DEFAULT, + ConfigDef.Importance.LOW, + CONSUME_LAGGING_CACHE_ENABLED_DOC + ); + configDef.define( + CONSUME_LAGGING_CACHE_MAX_COUNT_CONFIG, + ConfigDef.Type.INT, + CONSUME_LAGGING_CACHE_MAX_COUNT_DEFAULT, + ConfigDef.Range.atLeast(1), + ConfigDef.Importance.LOW, + CONSUME_LAGGING_CACHE_MAX_COUNT_DOC + ); + configDef.define( + CONSUME_LAGGING_CACHE_TTL_SEC_CONFIG, + ConfigDef.Type.INT, + CONSUME_LAGGING_CACHE_TTL_SEC_DEFAULT, + ConfigDef.Range.atLeast(1), + ConfigDef.Importance.LOW, + CONSUME_LAGGING_CACHE_TTL_SEC_DOC + ); + configDef.define( + CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_CONFIG, + ConfigDef.Type.LONG, + CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_DEFAULT, + ConfigDef.Range.atLeast(-1), + ConfigDef.Importance.LOW, + CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_DOC + ); + return configDef; } @@ -474,4 +528,20 @@ public boolean isBatchCoordinateCacheEnabled() { public Duration batchCoordinateCacheTtl() { return Duration.ofMillis(getInt(CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_CONFIG)); } + + public boolean isLaggingCacheEnabled() { + return getBoolean(CONSUME_LAGGING_CACHE_ENABLED_CONFIG); + } + + public int laggingCacheMaxCount() { + return getInt(CONSUME_LAGGING_CACHE_MAX_COUNT_CONFIG); + } + + public int laggingCacheTtlSec() { + return getInt(CONSUME_LAGGING_CACHE_TTL_SEC_CONFIG); + } + + public long laggingCacheRateLimitBytesPerSec() { + return getLong(CONSUME_LAGGING_CACHE_RATE_LIMIT_BYTES_PER_SEC_CONFIG); + } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java index 978dde3870..741b9e575d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.concurrent.Callable; import java.util.function.Consumer; +import java.util.function.Function; import io.aiven.inkless.cache.ObjectCache; import io.aiven.inkless.common.ByteRange; @@ -32,6 +33,12 @@ public class CacheFetchJob implements Callable { + /** + * Constant indicating no batch timestamp is available. + * When used, the cache will default to hot cache behavior. + */ + public static final long NO_BATCH_TIMESTAMP = Long.MIN_VALUE; + private final ObjectCache cache; private final ObjectKey objectKey; private final ObjectFetcher objectFetcher; @@ -41,6 +48,7 @@ public class CacheFetchJob implements Callable { private final ByteRange byteRange; private final FileFetchJob fallback; private final Consumer fileFetchDurationCallback; + private final long batchTimestamp; public CacheFetchJob( final ObjectCache cache, @@ -50,6 +58,19 @@ public CacheFetchJob( final Time time, final Consumer fileFetchDurationCallback, final Consumer cacheEntrySize + ) { + this(cache, objectFetcher, objectKey, byteRange, time, fileFetchDurationCallback, cacheEntrySize, NO_BATCH_TIMESTAMP); + } + + public CacheFetchJob( + final ObjectCache cache, + final ObjectFetcher objectFetcher, + final ObjectKey objectKey, + final ByteRange byteRange, + final Time time, + final Consumer fileFetchDurationCallback, + final Consumer cacheEntrySize, + final long batchTimestamp ) { this.cache = cache; this.objectKey = objectKey; @@ -59,6 +80,7 @@ public CacheFetchJob( this.fileFetchDurationCallback = fileFetchDurationCallback; this.byteRange = byteRange; this.key = createCacheKey(objectKey, byteRange); + this.batchTimestamp = batchTimestamp; this.fallback = new FileFetchJob(time, objectFetcher, objectKey, byteRange, fileFetchDurationCallback); } @@ -74,24 +96,26 @@ static CacheKey createCacheKey(ObjectKey object, ByteRange byteRange) { @Override public FileExtent call() { - return cache.computeIfAbsent(key, cacheKey -> { + final Function mappingFunction = cacheKey -> { // Let remote storage exceptions bubble up, do not catch the exceptions. final FileExtent freshFile = loadFileExtent(objectKey, byteRange); // TODO: add cache entry size also to produce/file commit cacheEntrySize.accept(freshFile.data().length); return freshFile; - }); + }; + + // Use timestamp-aware method - default implementation ignores timestamp, + // TieredObjectCache uses it for routing to hot/cold cache + return cache.computeIfAbsent(key, mappingFunction, batchTimestamp); } private FileExtent loadFileExtent(final ObjectKey key, final ByteRange batchRange) { - final FileExtent freshFile; - final FileFetchJob fallback = new FileFetchJob(time, objectFetcher, key, batchRange, fileFetchDurationCallback); + final FileFetchJob fetchJob = new FileFetchJob(time, objectFetcher, key, batchRange, fileFetchDurationCallback); try { - freshFile = fallback.call(); + return fetchJob.call(); } catch (Exception e) { throw new FetchException(e); } - return freshFile; } @Override @@ -99,13 +123,14 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; CacheFetchJob that = (CacheFetchJob) o; - return Objects.equals(cache, that.cache) + return batchTimestamp == that.batchTimestamp + && Objects.equals(cache, that.cache) && Objects.equals(key, that.key) && Objects.equals(fallback, that.fallback); } @Override public int hashCode() { - return Objects.hash(cache, key, fallback); + return Objects.hash(cache, key, fallback, batchTimestamp); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java index 5eb88ca200..ebd52d1c93 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java @@ -35,8 +35,6 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -54,14 +52,14 @@ public class FetchCompleter implements Supplier fetchInfos; private final Map coordinates; - private final List> backingData; + private final List backingData; private final Consumer durationCallback; public FetchCompleter(Time time, ObjectKeyCreator objectKeyCreator, Map fetchInfos, Map coordinates, - List> backingData, + List backingData, Consumer durationCallback) { this.time = time; this.objectKeyCreator = objectKeyCreator; @@ -74,21 +72,16 @@ public FetchCompleter(Time time, @Override public Map get() { try { - final Map> files = waitForFileData(); + final Map> files = groupFileData(); return TimeUtils.measureDurationMs(time, () -> serveFetch(coordinates, files), durationCallback); } catch (Exception e) { - // unwrap ExecutionException if the errors comes from dependent futures - if (e instanceof ExecutionException) { - throw new FetchException(e.getCause()); - } throw new FetchException(e); } } - private Map> waitForFileData() throws InterruptedException, ExecutionException { + private Map> groupFileData() { Map> files = new HashMap<>(); - for (Future fileFuture : backingData) { - FileExtent fileExtent = fileFuture.get(); + for (FileExtent fileExtent : backingData) { files.compute(fileExtent.object(), (k, v) -> { if (v == null) { List out = new ArrayList<>(1); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java index 6a36c2b154..da62b47e15 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java @@ -21,12 +21,12 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Time; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -40,7 +40,7 @@ import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; -public class FetchPlanner implements Supplier>> { +public class FetchPlanner implements Supplier>> { private final Time time; private final ObjectKeyCreator objectKeyCreator; @@ -71,30 +71,57 @@ public FetchPlanner( this.metrics = metrics; } - private List> doWork(final Map batchCoordinates) { - final List> jobs = planJobs(batchCoordinates); + private List> doWork(final Map batchCoordinates) { + final List jobs = planJobs(batchCoordinates); return submitAll(jobs); } - private List> planJobs(final Map batchCoordinates) { - final Set>> objectKeysToRanges = batchCoordinates.values().stream() + /** + * Helper class to track byte ranges and the oldest batch timestamp for an object key. + */ + private static class ObjectFetchInfo { + final List ranges = new ArrayList<>(); + long oldestTimestamp = Long.MAX_VALUE; + + void addBatch(BatchInfo batch) { + ranges.add(batch.metadata().range()); + oldestTimestamp = Math.min(oldestTimestamp, batch.metadata().timestamp()); + } + } + + private List planJobs(final Map batchCoordinates) { + // Group batches by object key, tracking ranges and oldest timestamp + final Map objectKeysToFetchInfo = new HashMap<>(); + + batchCoordinates.values().stream() .filter(findBatch -> findBatch.errors() == Errors.NONE) .map(FindBatchResponse::batches) .peek(batches -> metrics.recordFetchBatchSize(batches.size())) .flatMap(List::stream) - // Merge batch requests - .collect(Collectors.groupingBy(BatchInfo::objectKey, Collectors.mapping(b -> b.metadata().range(), Collectors.toList()))) - .entrySet(); - metrics.recordFetchObjectsSize(objectKeysToRanges.size()); - return objectKeysToRanges.stream() - .flatMap(e -> - keyAlignment.align(e.getValue()) + .forEach(batch -> + objectKeysToFetchInfo + .computeIfAbsent(batch.objectKey(), k -> new ObjectFetchInfo()) + .addBatch(batch) + ); + + metrics.recordFetchObjectsSize(objectKeysToFetchInfo.size()); + + return objectKeysToFetchInfo.entrySet().stream() + .flatMap(e -> { + final String objectKey = e.getKey(); + final ObjectFetchInfo fetchInfo = e.getValue(); + // Note: keyAlignment.align() returns fixed-size aligned blocks (e.g., 16 MiB). + // This aligned byteRange is used for both caching and rate limiting (if enabled). + // Rate limiting uses the aligned block size (not actual batch size) as a conservative + // estimate, since the actual fetch size is only known after the fetch completes. + return keyAlignment.align(fetchInfo.ranges) .stream() - .map(byteRange -> getCacheFetchJob(e.getKey(), byteRange))) + .map(byteRange -> getCacheFetchJob(objectKey, byteRange, fetchInfo.oldestTimestamp)); + }) .collect(Collectors.toList()); } - private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange byteRange) { + private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange byteRange, final long batchTimestamp) { return new CacheFetchJob( cache, objectFetcher, @@ -102,19 +129,20 @@ private CacheFetchJob getCacheFetchJob(final String objectKey, final ByteRange b byteRange, time, metrics::fetchFileFinished, - metrics::cacheEntrySize + metrics::cacheEntrySize, + batchTimestamp ); } - private List> submitAll(List> jobs) { + private List> submitAll(List jobs) { return jobs.stream() - .map(dataExecutor::submit) + .map(job -> CompletableFuture.supplyAsync(job::call, dataExecutor)) .collect(Collectors.toList()); } @Override - public List> get() { + public List> get() { return TimeUtils.measureDurationMsSupplier(time, () -> doWork(batchCoordinates), metrics::fetchPlanFinished); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 1b04618951..1714190573 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -45,6 +46,7 @@ import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.metrics.ThreadPoolMonitor; import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; public class Reader implements AutoCloseable { @@ -152,7 +154,10 @@ public CompletableFuture> fetch( fetchMetrics ).get() ) - .thenCombineAsync(batchCoordinates, (fileExtents, coordinates) -> + // flatten the list of file extent futures into a single future with list of file extents + .thenCompose(Reader::allOfFileExtents) + // combine file extents and coordinates to complete the fetch + .thenCombine(batchCoordinates, (fileExtents, coordinates) -> new FetchCompleter( time, objectKeyCreator, @@ -203,6 +208,19 @@ public CompletableFuture> fetch( }); } + // wait for all file extent futures to complete and collect results without blocking threads + @SuppressWarnings("rawtypes") + static CompletableFuture> allOfFileExtents( + List> fileExtentFutures + ) { + final CompletableFuture[] futuresArray = fileExtentFutures.toArray(new CompletableFuture[0]); + return CompletableFuture.allOf(futuresArray) + .thenApply(v -> + fileExtentFutures.stream() + .map(CompletableFuture::join) + .toList()); + } + @Override public void close() throws IOException { ThreadUtils.shutdownExecutorServiceQuietly(metadataExecutor, EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/cache/TieredObjectCacheTest.java b/storage/inkless/src/test/java/io/aiven/inkless/cache/TieredObjectCacheTest.java new file mode 100644 index 0000000000..248eb02fd6 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/cache/TieredObjectCacheTest.java @@ -0,0 +1,288 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.cache; + +import org.apache.kafka.common.utils.MockTime; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.aiven.inkless.generated.CacheKey; +import io.aiven.inkless.generated.FileExtent; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class TieredObjectCacheTest { + + private static final long HOT_CACHE_TTL_MS = 60_000; // 60 seconds + private static final long NO_RATE_LIMIT = -1; + + private ObjectCache hotCache; + private ObjectCache laggingCache; + private MockTime time; + private TieredObjectCache.TieredCacheMetrics metrics; + private TieredObjectCache tieredCache; + + @BeforeEach + void setUp() { + hotCache = mock(ObjectCache.class); + laggingCache = mock(ObjectCache.class); + time = new MockTime(); + metrics = mock(TieredObjectCache.TieredCacheMetrics.class); + tieredCache = new TieredObjectCache(hotCache, laggingCache, HOT_CACHE_TTL_MS, time, NO_RATE_LIMIT, metrics); + } + + @Test + void shouldUseLaggingCacheWhenBatchTimestampIsOlderThanTTL() { + // Given: A batch timestamp older than hot cache TTL + long currentTime = time.milliseconds(); + long oldTimestamp = currentTime - HOT_CACHE_TTL_MS - 1000; // 1 second older than TTL + + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + when(laggingCache.computeIfAbsent(eq(key), any())).thenReturn(expectedExtent); + + // When + FileExtent result = tieredCache.computeIfAbsent(key, k -> expectedExtent, oldTimestamp); + + // Then + assertThat(result).isEqualTo(expectedExtent); + verify(laggingCache, times(1)).computeIfAbsent(eq(key), any()); + verify(hotCache, never()).computeIfAbsent(any(), any()); + verify(metrics).recordLaggingCacheRouting(); + } + + @Test + void shouldUseHotCacheWhenBatchTimestampIsWithinTTL() { + // Given: A batch timestamp within hot cache TTL + long currentTime = time.milliseconds(); + long recentTimestamp = currentTime - HOT_CACHE_TTL_MS + 1000; // 1 second younger than TTL + + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + when(hotCache.computeIfAbsent(eq(key), any())).thenReturn(expectedExtent); + + // When + FileExtent result = tieredCache.computeIfAbsent(key, k -> expectedExtent, recentTimestamp); + + // Then + assertThat(result).isEqualTo(expectedExtent); + verify(hotCache, times(1)).computeIfAbsent(eq(key), any()); + verify(laggingCache, never()).computeIfAbsent(any(), any()); + verify(metrics).recordHotCacheRouting(); + } + + @Test + void shouldUseHotCacheForDefaultComputeIfAbsent() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + when(hotCache.computeIfAbsent(eq(key), any())).thenReturn(expectedExtent); + + // When: Using the method without batch timestamp (write path) + FileExtent result = tieredCache.computeIfAbsent(key, k -> expectedExtent); + + // Then + assertThat(result).isEqualTo(expectedExtent); + verify(hotCache, times(1)).computeIfAbsent(eq(key), any()); + verify(laggingCache, never()).computeIfAbsent(any(), any()); + verify(metrics).recordHotCacheRouting(); + } + + @Test + void shouldAlwaysUseHotCacheForPut() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent extent = createFileExtent("test-object"); + + // When + tieredCache.put(key, extent); + + // Then + verify(hotCache).put(key, extent); + verify(laggingCache, never()).put(key, extent); + } + + @Test + void shouldSearchBothCachesForGet() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + // Hot cache miss, lagging cache hit + when(hotCache.get(key)).thenReturn(null); + when(laggingCache.get(key)).thenReturn(expectedExtent); + + // When + FileExtent result = tieredCache.get(key); + + // Then + assertThat(result).isEqualTo(expectedExtent); + } + + @Test + void shouldReturnFromHotCacheIfFoundFirst() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + FileExtent expectedExtent = createFileExtent("test-object"); + + when(hotCache.get(key)).thenReturn(expectedExtent); + + // When + FileExtent result = tieredCache.get(key); + + // Then + assertThat(result).isEqualTo(expectedExtent); + verify(laggingCache, never()).get(key); + } + + @Test + void shouldRemoveFromBothCaches() { + // Given + CacheKey key = createCacheKey("test-object", 0, 1000); + + when(hotCache.remove(key)).thenReturn(true); + when(laggingCache.remove(key)).thenReturn(false); + + // When + boolean result = tieredCache.remove(key); + + // Then + assertThat(result).isTrue(); + verify(hotCache).remove(key); + verify(laggingCache).remove(key); + } + + @Test + void shouldCombineSizesFromBothCaches() { + // Given + when(hotCache.size()).thenReturn(100L); + when(laggingCache.size()).thenReturn(50L); + + // When/Then + assertThat(tieredCache.size()).isEqualTo(150L); + assertThat(tieredCache.hotCacheSize()).isEqualTo(100L); + assertThat(tieredCache.laggingCacheSize()).isEqualTo(50L); + } + + @Test + void shouldUseLaggingCacheExactlyAtTTLBoundary() { + // Given: A batch timestamp exactly at the TTL boundary + long currentTime = time.milliseconds(); + long boundaryTimestamp = currentTime - HOT_CACHE_TTL_MS; // Exactly at TTL + + // When + boolean shouldUseLagging = tieredCache.shouldUseLaggingCache(boundaryTimestamp); + + // Then: At exactly TTL, it should NOT use lagging cache (age == TTL, not > TTL) + assertThat(shouldUseLagging).isFalse(); + } + + @Test + void shouldUseLaggingCacheJustPastTTLBoundary() { + // Given: A batch timestamp just past the TTL boundary + long currentTime = time.milliseconds(); + long pastBoundaryTimestamp = currentTime - HOT_CACHE_TTL_MS - 1; // 1ms past TTL + + // When + boolean shouldUseLagging = tieredCache.shouldUseLaggingCache(pastBoundaryTimestamp); + + // Then + assertThat(shouldUseLagging).isTrue(); + } + + @Test + void shouldApplyRateLimitingForLaggingCacheFetches() { + // Given: A tiered cache with rate limiting enabled + long rateLimitBytesPerSec = 1000; // 1000 bytes/sec, low for testing + ObjectCache realLaggingCache = new CaffeineCache(100, 60, -1); + TieredObjectCache rateLimitedCache = new TieredObjectCache( + hotCache, + realLaggingCache, + HOT_CACHE_TTL_MS, + time, + rateLimitBytesPerSec, + TieredObjectCache.TieredCacheMetrics.NOOP + ); + + long currentTime = time.milliseconds(); + long oldTimestamp = currentTime - HOT_CACHE_TTL_MS - 1000; + + // When: Multiple fetches that should be rate limited + CacheKey key1 = createCacheKey("test-object-1", 0, 500); + CacheKey key2 = createCacheKey("test-object-2", 0, 500); + + // First fetch should succeed immediately (within burst capacity) + FileExtent result1 = rateLimitedCache.computeIfAbsent(key1, k -> createFileExtent("test-object-1"), oldTimestamp); + assertThat(result1).isNotNull(); + + // Second fetch should also succeed (rate limiter has 2x burst capacity) + FileExtent result2 = rateLimitedCache.computeIfAbsent(key2, k -> createFileExtent("test-object-2"), oldTimestamp); + assertThat(result2).isNotNull(); + } + + @Test + void shouldNotRateLimitHotCacheFetches() { + // Given: A tiered cache with rate limiting enabled + long rateLimitBytesPerSec = 100; // Very low rate limit + ObjectCache realHotCache = new CaffeineCache(100, 60, -1); + TieredObjectCache rateLimitedCache = new TieredObjectCache( + realHotCache, + laggingCache, + HOT_CACHE_TTL_MS, + time, + rateLimitBytesPerSec, + TieredObjectCache.TieredCacheMetrics.NOOP + ); + + long currentTime = time.milliseconds(); + long recentTimestamp = currentTime - 1000; // Recent data, should use hot cache + + // When: Multiple fetches to hot cache (should not be rate limited) + for (int i = 0; i < 10; i++) { + CacheKey key = createCacheKey("test-object-" + i, 0, 1000); + FileExtent result = rateLimitedCache.computeIfAbsent(key, k -> createFileExtent("test-object"), recentTimestamp); + assertThat(result).isNotNull(); + } + // If rate limiting was applied, this would have taken > 10 seconds + // Since it completes quickly, rate limiting is not applied to hot cache + } + + private CacheKey createCacheKey(String object, long offset, long length) { + return new CacheKey() + .setObject(object) + .setRange(new CacheKey.ByteRange().setOffset(offset).setLength(length)); + } + + private FileExtent createFileExtent(String object) { + return new FileExtent() + .setObject(object) + .setRange(new FileExtent.ByteRange().setOffset(0).setLength(100)) + .setData(new byte[100]); + } +} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java new file mode 100644 index 0000000000..f3ce9495aa --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java @@ -0,0 +1,126 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.common; + +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import io.aiven.inkless.cache.CaffeineCache; +import io.aiven.inkless.cache.TieredObjectCache; +import io.aiven.inkless.config.InklessConfig; +import io.aiven.inkless.control_plane.InMemoryControlPlane; +import io.aiven.inkless.control_plane.MetadataView; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +class SharedStateTest { + + private final MockTime time = new MockTime(); + + @Test + void initializeWithLaggingCacheDisabled() throws IOException { + // Given: Config with lagging cache disabled (default) + Map configs = minimalConfig(); + configs.put("consume.lagging.cache.enabled", "false"); + InklessConfig config = new InklessConfig(configs); + + // When + SharedState state = SharedState.initialize( + time, + 1, + config, + mock(MetadataView.class), + new InMemoryControlPlane(time), + new BrokerTopicStats(), + () -> new LogConfig(new Properties()) + ); + + // Then: Cache should be a regular CaffeineCache, not TieredObjectCache + assertThat(state.cache()).isInstanceOf(CaffeineCache.class); + + state.close(); + } + + @Test + void initializeWithLaggingCacheEnabled() throws IOException { + // Given: Config with lagging cache enabled + Map configs = minimalConfig(); + configs.put("consume.lagging.cache.enabled", "true"); + configs.put("consume.lagging.cache.max.count", "100"); + configs.put("consume.lagging.cache.ttl.sec", "10"); + configs.put("consume.lagging.cache.rate.limit.bytes.per.sec", "52428800"); + InklessConfig config = new InklessConfig(configs); + + // When + SharedState state = SharedState.initialize( + time, + 1, + config, + mock(MetadataView.class), + new InMemoryControlPlane(time), + new BrokerTopicStats(), + () -> new LogConfig(new Properties()) + ); + + // Then: Cache should be a TieredObjectCache + assertThat(state.cache()).isInstanceOf(TieredObjectCache.class); + + state.close(); + } + + @Test + void initializeWithLaggingCacheRateLimitDisabled() throws IOException { + // Given: Config with lagging cache enabled but rate limiting disabled + Map configs = minimalConfig(); + configs.put("consume.lagging.cache.enabled", "true"); + configs.put("consume.lagging.cache.rate.limit.bytes.per.sec", "-1"); + InklessConfig config = new InklessConfig(configs); + + // When + SharedState state = SharedState.initialize( + time, + 1, + config, + mock(MetadataView.class), + new InMemoryControlPlane(time), + new BrokerTopicStats(), + () -> new LogConfig(new Properties()) + ); + + // Then: Cache should still be a TieredObjectCache (rate limiting is optional) + assertThat(state.cache()).isInstanceOf(TieredObjectCache.class); + + state.close(); + } + + private Map minimalConfig() { + Map configs = new HashMap<>(); + configs.put("control.plane.class", InMemoryControlPlane.class.getCanonicalName()); + configs.put("storage.backend.class", "io.aiven.inkless.config.ConfigTestStorageBackend"); + return configs; + } +} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java index 6c3b22f12f..1eb91407af 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java @@ -99,6 +99,11 @@ void minimalConfig() { assertThat(config.produceUploadBackoff()).isEqualTo(Duration.ofMillis(10)); assertThat(config.storage(storageMetrics)).isInstanceOf(ConfigTestStorageBackend.class); assertThat(config.fileCleanerInterval()).isEqualTo(Duration.ofMinutes(5)); + // Lagging cache defaults + assertThat(config.isLaggingCacheEnabled()).isFalse(); + assertThat(config.laggingCacheMaxCount()).isEqualTo(150); + assertThat(config.laggingCacheTtlSec()).isEqualTo(5); + assertThat(config.laggingCacheRateLimitBytesPerSec()).isEqualTo(50 * 1024 * 1024); assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMinutes(1)); assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMinutes(1)); assertThat(config.cacheMaxCount()).isEqualTo(1000); @@ -131,6 +136,10 @@ void fullConfig() { configs.put("fetch.data.thread.pool.size", "12"); configs.put("fetch.metadata.thread.pool.size", "14"); configs.put("retention.enforcement.max.batches.per.request", "10"); + configs.put("consume.lagging.cache.enabled", "true"); + configs.put("consume.lagging.cache.max.count", "200"); + configs.put("consume.lagging.cache.ttl.sec", "10"); + configs.put("consume.lagging.cache.rate.limit.bytes.per.sec", "104857600"); final var config = new InklessConfig( configs ); @@ -152,6 +161,10 @@ void fullConfig() { assertThat(config.fetchDataThreadPoolSize()).isEqualTo(12); assertThat(config.fetchMetadataThreadPoolSize()).isEqualTo(14); assertThat(config.maxBatchesPerEnforcementRequest()).isEqualTo(10); + assertThat(config.isLaggingCacheEnabled()).isTrue(); + assertThat(config.laggingCacheMaxCount()).isEqualTo(200); + assertThat(config.laggingCacheTtlSec()).isEqualTo(10); + assertThat(config.laggingCacheRateLimitBytesPerSec()).isEqualTo(104857600); } @Test diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java index f1d92a9a44..c76839e7e6 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java @@ -40,7 +40,10 @@ import io.aiven.inkless.storage_backend.common.ObjectFetcher; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -54,6 +57,15 @@ public class CacheFetchJobTest { final Time time = new MockTime(); final static ObjectKey objectA = PlainObjectKey.create("a", "a"); + private CacheFetchJob cacheFetchJob(ObjectCache cache, ByteRange byteRange) { + return cacheFetchJob(cache, byteRange, CacheFetchJob.NO_BATCH_TIMESTAMP); + } + + private CacheFetchJob cacheFetchJob(ObjectCache cache, ByteRange byteRange, long batchTimestamp) { + return new CacheFetchJob(cache, fetcher, CacheFetchJobTest.objectA, byteRange, time, + durationMs -> {}, cacheEntrySize -> {}, batchTimestamp); + } + @Test public void testCacheMiss() throws Exception { int size = 10; @@ -69,14 +81,14 @@ public void testCacheMiss() throws Exception { when(fetcher.readToByteBuffer(channel)).thenReturn(ByteBuffer.wrap(array)); ObjectCache cache = new NullCache(); - CacheFetchJob cacheFetchJob = cacheFetchJob(cache, objectA, range); + CacheFetchJob cacheFetchJob = cacheFetchJob(cache, range); FileExtent actualFile = cacheFetchJob.call(); assertThat(actualFile).isEqualTo(expectedFile); } @Test - public void testCacheHit() throws Exception { + public void testCacheHit() { int size = 10; byte[] array = new byte[10]; for (int i = 0; i < size; i++) { @@ -87,19 +99,56 @@ public void testCacheHit() throws Exception { ObjectCache cache = new MemoryCache(); cache.put(CacheFetchJob.createCacheKey(objectA, range), expectedFile); - CacheFetchJob cacheFetchJob = cacheFetchJob(cache, objectA, range); + CacheFetchJob cacheFetchJob = cacheFetchJob(cache, range); FileExtent actualFile = cacheFetchJob.call(); assertThat(actualFile).isEqualTo(expectedFile); verifyNoInteractions(fetcher); } - private CacheFetchJob cacheFetchJob( - ObjectCache cache, - ObjectKey objectKey, - ByteRange byteRange - ) { - return new CacheFetchJob(cache, fetcher, objectKey, byteRange, time, - durationMs -> {}, cacheEntrySize -> {}); + @Test + public void testCacheFetchJobWithTimestampUsesTimestampAwareMethod() { + // Given: A mock cache that tracks method calls + ObjectCache mockCache = mock(ObjectCache.class); + + int size = 10; + byte[] array = new byte[size]; + ByteRange range = new ByteRange(0, size); + FileExtent expectedFile = FileFetchJob.createFileExtent(objectA, range, ByteBuffer.wrap(array)); + + long batchTimestamp = 12345L; + + // Mock the timestamp-aware computeIfAbsent to return expected file + when(mockCache.computeIfAbsent(any(), any(), eq(batchTimestamp))).thenReturn(expectedFile); + + // When + CacheFetchJob job = cacheFetchJob(mockCache, range, batchTimestamp); + FileExtent result = job.call(); + + // Then: Should use the timestamp-aware method + assertThat(result).isEqualTo(expectedFile); + verify(mockCache).computeIfAbsent(any(), any(), eq(batchTimestamp)); + } + + @Test + public void testCacheFetchJobWithoutTimestampUsesDefaultTimestamp() { + // Given: A mock cache + ObjectCache mockCache = mock(ObjectCache.class); + + int size = 10; + byte[] array = new byte[size]; + ByteRange range = new ByteRange(0, size); + FileExtent expectedFile = FileFetchJob.createFileExtent(objectA, range, ByteBuffer.wrap(array)); + + // Mock the timestamp-aware computeIfAbsent with NO_BATCH_TIMESTAMP + when(mockCache.computeIfAbsent(any(), any(), eq(CacheFetchJob.NO_BATCH_TIMESTAMP))).thenReturn(expectedFile); + + // When: Using constructor without timestamp + CacheFetchJob job = cacheFetchJob(mockCache, range); + FileExtent result = job.call(); + + // Then: Should use NO_BATCH_TIMESTAMP + assertThat(result).isEqualTo(expectedFile); + verify(mockCache).computeIfAbsent(any(), any(), eq(CacheFetchJob.NO_BATCH_TIMESTAMP)); } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java index 3306ccc700..5ba9ed1b1f 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java @@ -42,10 +42,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.common.ByteRange; @@ -175,9 +171,9 @@ public void testFetchSingleFile() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer()) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -211,10 +207,10 @@ public void testFetchMultipleFiles() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, records.sizeInBytes()), records.buffer()), FileFetchJob.createFileExtent(OBJECT_KEY_B, new ByteRange(0, records.sizeInBytes()), records.buffer()) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -262,14 +258,13 @@ public void testFetchMultipleFilesForSameBatch() { copy.put(records.buffer().duplicate().position(startOffset).limit(endOffset).slice()); fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy)); } - List> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList()); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, fetchInfos, coordinates, - files, + fileExtents, durationMs -> {} ); Map result = job.get(); @@ -310,9 +305,9 @@ public void testFetchMultipleBatches() { ), logStartOffset, highWatermark) ); - List> files = Stream.of( + List files = List.of( FileFetchJob.createFileExtent(OBJECT_KEY_A, new ByteRange(0, totalSize), concatenatedBuffer) - ).map(CompletableFuture::completedFuture).collect(Collectors.toList()); + ); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, @@ -373,14 +368,13 @@ public void testFetchMultipleFilesForMultipleBatches() { copy.put(concatenatedBuffer.duplicate().position(startOffset).limit(endOffset).slice()); fileExtents.add(FileFetchJob.createFileExtent(OBJECT_KEY_A, range, copy)); } - List> files = fileExtents.stream().map(CompletableFuture::completedFuture).collect(Collectors.toList()); FetchCompleter job = new FetchCompleter( new MockTime(), OBJECT_KEY_CREATOR, fetchInfos, coordinates, - files, + fileExtents, durationMs -> {} ); Map result = job.get(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java index d834c9036f..cb8cc2dc61 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java @@ -25,18 +25,17 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.cache.KeyAlignmentStrategy; @@ -49,11 +48,10 @@ import io.aiven.inkless.control_plane.BatchInfo; import io.aiven.inkless.control_plane.BatchMetadata; import io.aiven.inkless.control_plane.FindBatchResponse; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) @@ -68,10 +66,11 @@ public class FetchPlannerTest { @Mock ObjectFetcher fetcher; @Mock - ExecutorService dataExecutor; - @Mock InklessFetchMetrics metrics; + // Direct executor that runs tasks immediately in the same thread + ExecutorService dataExecutor = Executors.newSingleThreadExecutor(); + ObjectCache cache = new NullCache(); KeyAlignmentStrategy keyAlignmentStrategy = new FixedBlockAlignment(Integer.MAX_VALUE); ByteRange requestRange = new ByteRange(0, Integer.MAX_VALUE); @@ -82,12 +81,12 @@ public class FetchPlannerTest { @Test public void planEmptyRequest() { - Map coordinates = new HashMap<>(); + Map coordinates = Map.of(); FetchPlanner job = fetchPlannerJob(coordinates); - job.get(); + List> result = job.get(); - verifyNoInteractions(dataExecutor); + assertEquals(0, result.size()); } @Test @@ -214,14 +213,56 @@ private CacheFetchJob cacheFetchJob( ObjectKey objectKey, ByteRange byte ); } - private void assertBatchPlan(Map coordinates, Set jobs) { - ArgumentCaptor submittedCallables = ArgumentCaptor.captor(); - when(dataExecutor.submit(submittedCallables.capture())).thenReturn(null); + private void assertBatchPlan(Map coordinates, Set expectedJobs) { + FetchPlanner job = fetchPlannerJob(coordinates); + + List> result = job.get(); + + // Verify the number of planned fetch jobs matches expected + assertEquals(expectedJobs.size(), result.size()); + } + + @Test + public void planShouldUseOldestTimestampForSameObject() { + // Given: Two batches for the same object with different timestamps + long olderTimestamp = 1000L; + long newerTimestamp = 2000L; + + Map coordinates = Map.of( + partition0, FindBatchResponse.success(List.of( + new BatchInfo(1L, OBJECT_KEY_A.value(), BatchMetadata.of(partition0, 0, 10, 0, 0, 10, newerTimestamp, TimestampType.CREATE_TIME)) + ), 0, 1), + partition1, FindBatchResponse.success(List.of( + new BatchInfo(2L, OBJECT_KEY_A.value(), BatchMetadata.of(partition1, 30, 10, 0, 0, 11, olderTimestamp, TimestampType.CREATE_TIME)) + ), 0, 1) + ); FetchPlanner job = fetchPlannerJob(coordinates); + List> result = job.get(); + + // Should only have one job for the merged object key + assertEquals(1, result.size()); + // The job should use the older timestamp (1000L) for cache tiering decisions + // This is verified implicitly - FetchPlanner.ObjectFetchInfo tracks min timestamp + } + + @Test + public void planShouldPreserveSeparateTimestampsForDifferentObjects() { + // Given: Two different objects with different timestamps + long timestampA = 1000L; + long timestampB = 5000L; + + Map coordinates = Map.of( + partition0, FindBatchResponse.success(List.of( + new BatchInfo(1L, OBJECT_KEY_A.value(), BatchMetadata.of(partition0, 0, 10, 0, 0, 10, timestampA, TimestampType.CREATE_TIME)), + new BatchInfo(2L, OBJECT_KEY_B.value(), BatchMetadata.of(partition0, 0, 10, 1, 1, 11, timestampB, TimestampType.CREATE_TIME)) + ), 0, 2) + ); - job.get(); + FetchPlanner job = fetchPlannerJob(coordinates); + List> result = job.get(); - assertEquals(jobs, new HashSet<>(submittedCallables.getAllValues())); + // Should have two separate jobs, each with its own timestamp + assertEquals(2, result.size()); } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index 3f6345ea3c..6a3ef552eb 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -33,19 +33,26 @@ import org.mockito.quality.Strictness; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import io.aiven.inkless.cache.FixedBlockAlignment; import io.aiven.inkless.cache.KeyAlignmentStrategy; import io.aiven.inkless.cache.NullCache; import io.aiven.inkless.cache.ObjectCache; +import io.aiven.inkless.common.ByteRange; import io.aiven.inkless.common.ObjectKey; import io.aiven.inkless.common.ObjectKeyCreator; +import io.aiven.inkless.common.PlainObjectKey; import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.generated.FileExtent; import io.aiven.inkless.storage_backend.common.ObjectFetcher; import static org.assertj.core.api.Assertions.assertThat; @@ -92,4 +99,79 @@ public void testClose() throws Exception { verify(metadataExecutor, atLeastOnce()).shutdown(); verify(dataExecutor, atLeastOnce()).shutdown(); } + + /** + * This is to ensure fetch results preserve the request oreder are matched correctly with their requests. + */ + @Test + public void testAllOfFileExtentsPreservesOrderWhenFuturesCompleteOutOfOrder() throws Exception { + // Create file extents with distinct identifiers + final ObjectKey objectKeyA = PlainObjectKey.create("prefix", "object-a"); + final ObjectKey objectKeyB = PlainObjectKey.create("prefix", "object-b"); + final ObjectKey objectKeyC = PlainObjectKey.create("prefix", "object-c"); + + final FileExtent extentA = FileFetchJob.createFileExtent(objectKeyA, new ByteRange(0, 10), ByteBuffer.allocate(10)); + final FileExtent extentB = FileFetchJob.createFileExtent(objectKeyB, new ByteRange(0, 10), ByteBuffer.allocate(10)); + final FileExtent extentC = FileFetchJob.createFileExtent(objectKeyC, new ByteRange(0, 10), ByteBuffer.allocate(10)); + + // Create latches to control completion order + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch futureACanComplete = new CountDownLatch(1); + final CountDownLatch futureBCanComplete = new CountDownLatch(1); + + // Create futures that will complete in reverse order (C, B, A) + final CompletableFuture futureA = CompletableFuture.supplyAsync(() -> { + try { + startLatch.await(); + futureACanComplete.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return extentA; + }); + + final CompletableFuture futureB = CompletableFuture.supplyAsync(() -> { + try { + startLatch.await(); + futureBCanComplete.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return extentB; + }); + + final CompletableFuture futureC = CompletableFuture.supplyAsync(() -> { + try { + startLatch.await(); + // C completes immediately after start + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return extentC; + }); + + // Create the ordered list: A, B, C + final List> orderedFutures = List.of(futureA, futureB, futureC); + + // Call the package-private method directly + final CompletableFuture> resultFuture = Reader.allOfFileExtents(orderedFutures); + + // Start all futures + startLatch.countDown(); + + // Complete in reverse order: C is already completing, then B, then A + Thread.sleep(50); // Give C time to complete + futureBCanComplete.countDown(); + Thread.sleep(50); // Give B time to complete + futureACanComplete.countDown(); + + // Get the result - should maintain original order despite completion order + final List result = resultFuture.get(5, TimeUnit.SECONDS); + + // Verify order is preserved: A, B, C (not C, B, A which was completion order) + assertThat(result).hasSize(3); + assertThat(result.get(0).object()).isEqualTo(objectKeyA.value()); + assertThat(result.get(1).object()).isEqualTo(objectKeyB.value()); + assertThat(result.get(2).object()).isEqualTo(objectKeyC.value()); + } }