1 change: 1 addition & 0 deletions build.gradle
@@ -2452,6 +2452,7 @@ project(':storage:inkless') {
}
implementation libs.metrics
implementation libs.caffeine
implementation libs.bucket4jCore

testImplementation project(':clients').sourceSets.test.output.classesDirs
testImplementation project(':test-common')
31 changes: 31 additions & 0 deletions docs/inkless/configs.rst
@@ -109,6 +109,37 @@ Under ``inkless.``
* Valid Values: [1,...]
* Importance: low

``consume.lagging.cache.enabled``
If true, a secondary cache is enabled for lagging consumers. This prevents lagging consumers from evicting hot data from the primary cache.

* Type: boolean
* Default: false
* Importance: low

``consume.lagging.cache.max.count``
The maximum number of entries in the lagging consumer cache. Should be sized based on the number of concurrent lagging consumers.

* Type: int
* Default: 150
* Valid Values: [1,...]
* Importance: low

``consume.lagging.cache.rate.limit.bytes.per.sec``
Maximum bytes per second to fetch from remote storage for lagging consumer cache misses. Set to -1 to disable rate limiting. This protects remote storage from being overwhelmed by lagging consumers.

* Type: long
* Default: 52428800
* Valid Values: [-1,...]
* Importance: low

``consume.lagging.cache.ttl.sec``
Time to live in seconds for entries in the lagging consumer cache. A short TTL (e.g., 5 seconds) is recommended as cached data is only needed briefly for sequential reads.

* Type: int
* Default: 5
* Valid Values: [1,...]
* Importance: low

``fetch.data.thread.pool.size``
Thread pool size to concurrently fetch data files from remote storage

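For orientation (not part of this diff): with the defaults documented above and the ``inkless.`` prefix from the section heading, enabling the lagging-consumer cache in a broker properties file might look as follows. Treat this as a sketch; the property-file placement is an assumption.

# Hypothetical broker configuration sketch (prefix per "Under ``inkless.``" above)
inkless.consume.lagging.cache.enabled=true
inkless.consume.lagging.cache.max.count=150
# 52428800 bytes/sec = 50 MiB/s (the default); -1 disables rate limiting
inkless.consume.lagging.cache.rate.limit.bytes.per.sec=52428800
inkless.consume.lagging.cache.ttl.sec=5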
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
@@ -58,6 +58,7 @@ versions += [
awsSdk: "2.29.6",
azureSdk: "1.2.28",
bcpkix: "1.80",
bucket4j: "8.14.0",
caffeine: "3.2.0",
bndlib: "7.1.0",
checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
@@ -164,6 +165,7 @@ libs += [
azureSdkBom: "com.azure:azure-sdk-bom:$versions.azureSdk",
bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix",
bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib",
bucket4jCore: "com.bucket4j:bucket4j_jdk11-core:$versions.bucket4j",
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsBeanutils: "commons-beanutils:commons-beanutils:$versions.commonsBeanutils",
ObjectCache.java
@@ -27,4 +27,22 @@

public interface ObjectCache extends Cache<CacheKey, FileExtent>, Closeable {
FileExtent computeIfAbsent(CacheKey key, Function<CacheKey, FileExtent> mappingFunction);

/**
* Computes if absent with batch timestamp hint for cache tiering.
*
* <p>The batch timestamp can be used by tiered cache implementations to route
* requests to the appropriate cache tier (hot vs cold) based on data age.</p>
*
* <p>Default implementation ignores the timestamp and delegates to the regular
* computeIfAbsent method.</p>
*
* @param key the cache key
* @param mappingFunction the function to compute the value if absent
* @param batchTimestamp the timestamp of the batch (from BatchMetadata.timestamp())
* @return the cached or computed file extent
*/
default FileExtent computeIfAbsent(CacheKey key, Function<CacheKey, FileExtent> mappingFunction, long batchTimestamp) {
return computeIfAbsent(key, mappingFunction);
}
}
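To illustrate the intended call pattern (a sketch, not code from this PR; fetchFromRemote and batch are hypothetical names): a read path that has already resolved the batch metadata passes the timestamp through, while non-tiered caches silently fall back to the two-argument overload via the default method above.

// Sketch of a hypothetical call site; tiered implementations use the hint,
// non-tiered implementations ignore it (default method above).
final FileExtent extent = cache.computeIfAbsent(
        key,
        k -> fetchFromRemote(k),  // invoked only on a cache miss
        batch.timestamp()         // routing hint, e.g. BatchMetadata.timestamp()
);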
TieredObjectCache.java (new file)
@@ -0,0 +1,250 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import org.apache.kafka.common.utils.Time;

import java.io.IOException;
import java.time.Duration;
import java.util.function.Function;

import io.aiven.inkless.generated.CacheKey;
import io.aiven.inkless.generated.FileExtent;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.BlockingBucket;
import io.github.bucket4j.Bucket;

/**
* A tiered cache implementation that routes requests to either a hot cache or a lagging cache
* based on the age of the data being requested.
*
* <p>The hot cache is used for recent data (within the hot cache TTL), while the lagging cache
* is used for historical data. This separation prevents lagging consumers from evicting hot data
* that is being actively used by tail consumers and the write path.</p>
*
* <p>The classification is based on the batch timestamp: if the data is older than the
* hot cache TTL, it cannot possibly be in the hot cache, so we route to the lagging cache.</p>
*
* <p>Rate limiting is applied to lagging cache fetches to protect remote storage from
* being overwhelmed by lagging consumers.</p>
*/
public class TieredObjectCache implements ObjectCache {

private final ObjectCache hotCache;
private final ObjectCache laggingCache;
private final long hotCacheTtlMs;
private final Time time;
private final TieredCacheMetrics metrics;
private final BlockingBucket rateLimiter;

/**
* Creates a tiered cache without rate limiting.
*/
public TieredObjectCache(
final ObjectCache hotCache,
final ObjectCache laggingCache,
final long hotCacheTtlMs,
final Time time
) {
this(hotCache, laggingCache, hotCacheTtlMs, time, -1, TieredCacheMetrics.NOOP);
}

/**
* Creates a tiered cache with optional rate limiting for lagging cache fetches.
*
* @param hotCache the cache for recent data
* @param laggingCache the cache for historical data
* @param hotCacheTtlMs the TTL threshold for routing (data older than this goes to lagging cache)
* @param time time source
* @param rateLimitBytesPerSec rate limit in bytes per second for lagging cache fetches, or -1 to disable
* @param metrics metrics callback
*/
public TieredObjectCache(
final ObjectCache hotCache,
final ObjectCache laggingCache,
final long hotCacheTtlMs,
final Time time,
final long rateLimitBytesPerSec,
final TieredCacheMetrics metrics
) {
this.hotCache = hotCache;
this.laggingCache = laggingCache;
this.hotCacheTtlMs = hotCacheTtlMs;
this.time = time;
this.metrics = metrics;
this.rateLimiter = rateLimitBytesPerSec > 0 ? createRateLimiter(rateLimitBytesPerSec) : null;
}

/**
* Creates a rate limiter for lagging cache fetches.
*/
private static BlockingBucket createRateLimiter(final long bytesPerSecond) {
// Uses Bucket4j token bucket algorithm (https://github.com/bucket4j/bucket4j)
//
// Capacity: 2x per-second rate allows short bursts (e.g., 50 MiB/s -> 100 MiB burst)
// Refill: "Greedy" adds tokens continuously for smooth rate limiting
// Blocking: consume() blocks until tokens available, creating backpressure
//
// Example with 50 MiB/s and 16 MiB blocks: ~3 fetches/sec sustained
final Bandwidth bandwidth = Bandwidth.builder()
.capacity(bytesPerSecond * 2)
.refillGreedy(bytesPerSecond, Duration.ofSeconds(1))
.build();

return Bucket.builder()
.addLimit(bandwidth)
.build()
.asBlocking();
}

/**
* Computes if absent from the appropriate cache tier based on batch timestamp.
*
* <p>For lagging cache fetches, rate limiting is applied before invoking the mapping function
* to protect remote storage from being overwhelmed.</p>
Comment on lines +118 to +119 (Copilot AI, Dec 11, 2025):

The rate limiting only applies when the mapping function is executed (i.e., on a cache miss). However, the computeIfAbsent operation on the laggingCache will only invoke the mapping function if the key is absent. This means rate limiting is correctly applied only to cache misses, but the javadoc at line 118 says "rate limiting is applied before invoking the mapping function", which could be misinterpreted: when there is a cache hit, the mapping function is never invoked and no rate limiting occurs. Consider clarifying the documentation to explicitly state "rate limiting is applied only on cache miss before fetching from remote storage".

Suggested change:
- * <p>For lagging cache fetches, rate limiting is applied before invoking the mapping function
- * to protect remote storage from being overwhelmed.</p>
+ * <p>For lagging cache fetches, rate limiting is applied <b>only on cache miss</b>,
+ * before invoking the mapping function to fetch from remote storage. This protects remote storage from being overwhelmed.</p>
*
* @param key the cache key
* @param mappingFunction the function to compute the value if absent
* @param batchTimestamp the timestamp of the batch (from BatchMetadata.timestamp())
* @return the cached or computed file extent
*/
@Override
public FileExtent computeIfAbsent(
final CacheKey key,
final Function<CacheKey, FileExtent> mappingFunction,
final long batchTimestamp
) {
final boolean useLaggingCache = shouldUseLaggingCache(batchTimestamp);
if (useLaggingCache) {
metrics.recordLaggingCacheRouting();
return laggingCache.computeIfAbsent(key, rateLimitedMappingFunction(mappingFunction, key));
} else {
metrics.recordHotCacheRouting();
return hotCache.computeIfAbsent(key, mappingFunction);
}
}

/**
* Wraps the mapping function with rate limiting for lagging cache fetches.
* Rate limiting is applied based on the expected fetch size (from the cache key's byte range).
*/
private Function<CacheKey, FileExtent> rateLimitedMappingFunction(
final Function<CacheKey, FileExtent> mappingFunction,
final CacheKey key
) {
if (rateLimiter == null) {
return mappingFunction;
}
return cacheKey -> {
// Rate limit based on the aligned block size (from cache key)
// This is a conservative estimate; actual fetch may be smaller
final long bytesToFetch = key.range().length();
if (bytesToFetch > 0) {
try {
rateLimiter.consume(bytesToFetch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Rate limit wait interrupted", e);
Comment on lines +158 to +162 (Copilot AI, Dec 11, 2025):

The rate limiter blocks the thread while waiting for tokens (line 159). This could lead to thread pool exhaustion if many lagging consumer requests are rate-limited simultaneously, as they'll hold executor threads while blocked. The dataExecutor thread pool could become saturated with blocked threads, preventing other fetch operations from proceeding. Consider using non-blocking rate limiting (tryConsume with proper backpressure) or documenting this behavior and its implications for thread pool sizing.

Suggested change:
- try {
-     rateLimiter.consume(bytesToFetch);
- } catch (InterruptedException e) {
-     Thread.currentThread().interrupt();
-     throw new RuntimeException("Rate limit wait interrupted", e);
+ if (!rateLimiter.tryConsume(bytesToFetch)) {
+     throw new RuntimeException("Rate limit exceeded for lagging cache fetch: not enough tokens available");
}
}
return mappingFunction.apply(cacheKey);
};
}

/**
* Default computeIfAbsent routes to hot cache.
* This is used by the write path which always uses the hot cache.
*/
@Override
public FileExtent computeIfAbsent(final CacheKey key, final Function<CacheKey, FileExtent> mappingFunction) {
metrics.recordHotCacheRouting();
return hotCache.computeIfAbsent(key, mappingFunction);
}

/**
* Determines if the lagging cache should be used based on batch age.
* If data is older than hot cache TTL, it cannot be in hot cache.
*/
boolean shouldUseLaggingCache(final long batchTimestamp) {
final long batchAge = time.milliseconds() - batchTimestamp;
return batchAge > hotCacheTtlMs;
}

@Override
public FileExtent get(final CacheKey key) {
// Try hot cache first, then lagging cache
FileExtent result = hotCache.get(key);
if (result == null) {
result = laggingCache.get(key);
}
return result;
}

/**
* Put always goes to hot cache.
* This is used by the write path (CacheStoreJob) to cache produced data.
*/
@Override
public void put(final CacheKey key, final FileExtent value) {
hotCache.put(key, value);
}

@Override
public boolean remove(final CacheKey key) {
boolean removedFromHot = hotCache.remove(key);
boolean removedFromLagging = laggingCache.remove(key);
return removedFromHot || removedFromLagging;
}

@Override
public long size() {
return hotCache.size() + laggingCache.size();
}

public long hotCacheSize() {
return hotCache.size();
}

public long laggingCacheSize() {
return laggingCache.size();
}

@Override
public void close() throws IOException {
try {
hotCache.close();
} finally {
laggingCache.close();
}
}

/**
* Metrics interface for tiered cache operations.
*/
public interface TieredCacheMetrics {
TieredCacheMetrics NOOP = new TieredCacheMetrics() {
@Override
public void recordHotCacheRouting() {}
@Override
public void recordLaggingCacheRouting() {}
};

void recordHotCacheRouting();
void recordLaggingCacheRouting();
}
}
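A minimal sketch of how the routing rule could be exercised in a unit test, assuming the test lives in the same package (shouldUseLaggingCache is package-private), uses Kafka's MockTime, and has two in-memory ObjectCache stand-ins called hotCache and laggingCache:

final MockTime time = new MockTime();
final long hotTtlMs = 60_000L;  // 60 s hot-cache TTL for the test
final TieredObjectCache cache =
        new TieredObjectCache(hotCache, laggingCache, hotTtlMs, time);

// A batch produced "now" is within the hot TTL and stays on the hot tier.
assertFalse(cache.shouldUseLaggingCache(time.milliseconds()));

// A batch two minutes old exceeds the TTL and is routed to the lagging tier.
assertTrue(cache.shouldUseLaggingCache(time.milliseconds() - 120_000L));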