
Conversation

Contributor

@jeqo jeqo commented Dec 11, 2025

Implement a secondary cache for lagging consumers with integrated rate limiting.

Problem: Lagging consumers reading historical data cause:

  • Cache eviction of hot data used by tail consumers and write path
  • Potential remote storage overload from burst fetches

Solution:
Tiered cache architecture:

  • Hot cache: Tail consumers and writes (unchanged behavior)
  • Lagging cache: Consumers reading data older than hot cache TTL
    • Smaller size (150 entries), shorter TTL (5s)
    • Rate-limited fetches (50 MiB/s default) using Bucket4j

Key design: Rate limiting integrated into TieredObjectCache, applied
only on cache miss when fetching from remote storage.
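
A minimal sketch of this design, with the limiter inside the lagging tier's loader (illustrative only: the real TieredObjectCache operates on the project's CacheKey/FileExtent types and derives fetch sizes from aligned byte ranges; plain maps and byte arrays stand in here):

```java
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;

import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class TieredCacheSketch {
    private final ConcurrentHashMap<String, byte[]> hotCache = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, byte[]> laggingCache = new ConcurrentHashMap<>();
    private final long hotCacheTtlMs;
    private final Bucket rateLimiter; // Bucket4j token bucket

    TieredCacheSketch(long hotCacheTtlMs, long rateLimitBytesPerSec) {
        this.hotCacheTtlMs = hotCacheTtlMs;
        // Refill rateLimitBytesPerSec tokens per second (52428800 = 50 MiB/s default).
        this.rateLimiter = Bucket.builder()
            .addLimit(Bandwidth.simple(rateLimitBytesPerSec, Duration.ofSeconds(1)))
            .build();
    }

    byte[] computeIfAbsent(String key, Function<String, byte[]> remoteFetch,
                           long batchTimestampMs, long fetchSizeBytes) {
        long ageMs = System.currentTimeMillis() - batchTimestampMs;
        if (ageMs <= hotCacheTtlMs) {
            // Tail consumers and the write path: behavior unchanged.
            return hotCache.computeIfAbsent(key, remoteFetch);
        }
        return laggingCache.computeIfAbsent(key, k -> {
            try {
                // Only reached on a lagging-cache miss: block until tokens are available.
                rateLimiter.asBlocking().consume(fetchSizeBytes);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Rate limit wait interrupted", e);
            }
            return remoteFetch.apply(k);
        });
    }
}
```

Because the limiter sits inside the loader, cache hits cost nothing; only misses that actually reach remote storage consume tokens.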

Configuration:

| Config | Default | Description |
| --- | --- | --- |
| `consume.lagging.cache.enabled` | false | Enable feature |
| `consume.lagging.cache.max.count` | 150 | Max entries |
| `consume.lagging.cache.ttl.sec` | 5 | Entry TTL |
| `consume.lagging.cache.rate.limit.bytes.per.sec` | 52428800 | Rate limit (-1 to disable) |

Dependencies:

  • com.bucket4j:bucket4j_jdk11-core:8.14.0

jeqo added 4 commits December 11, 2025 18:27

Replace blocking Future.get() calls in FetchCompleter with non-blocking
CompletableFuture composition. This prevents thread exhaustion when file
fetches are slow.

Previously, FetchCompleter.get() was invoked via thenCombineAsync on the
common ForkJoinPool, where it blocked waiting for file data futures.

Now, the completion stage chain uses:
- thenCompose(Reader::allOfFileExtents) which returns a CompletableFuture
  that completes only when ALL file fetches are done
- thenCombine() (not async) to invoke FetchCompleter with already-resolved
  List<FileExtent>

The FetchCompleter task still runs on the ForkJoinPool common pool (via
thenCombine's default executor), but it no longer blocks: it receives
pre-fetched data and only performs in-memory data assembly (grouping file
extents, copying byte ranges, and constructing MemoryRecords).

Changes:
- FetchPlanner returns List<CompletableFuture<FileExtent>> instead of List<Future<FileExtent>>
- FetchCompleter accepts resolved List<FileExtent> instead of futures
- Reader.allOfFileExtents() awaits all futures without blocking a thread (see the sketch after this list)
- Add test verifying ordering is preserved when futures complete out of order
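
The shape of that non-blocking wait, sketched against plain java.util.concurrent (the real helper lives in Reader and is specialized to FileExtent):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {
    // Completes only when every input future has completed; output order matches input order.
    static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            // join() cannot block here: allOf guarantees each future is already done.
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}
```

Callers then chain `plan.thenCompose(AllOfSketch::allOf).thenCombine(...)` instead of parking a pool thread on Future.get().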
Adds Bucket4j dependency for rate limiting

Add tiered cache that routes requests based on batch timestamp:
- Recent data (within hot cache TTL) → hot cache
- Old data (beyond hot cache TTL) → lagging cache with rate limiting

Rate limiting uses Bucket4j token bucket to protect remote storage
from being overwhelmed by lagging consumer fetches. Applied only
on lagging cache misses.

Includes ObjectCache interface extension with timestamp-aware
computeIfAbsent method (default ignores timestamp for backwards
compatibility).
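
The backwards-compatible extension reduces to a default method, roughly as follows (generic sketch; the real ObjectCache is specialized to CacheKey/FileExtent):

```java
import java.util.function.Function;

interface ObjectCacheSketch<K, V> {
    V computeIfAbsent(K key, Function<K, V> loader);

    // Timestamp-aware variant the tiered cache uses for routing. The default
    // ignores the timestamp, so existing implementations need no changes.
    default V computeIfAbsent(K key, Function<K, V> loader, long batchTimestampMs) {
        return computeIfAbsent(key, loader);
    }
}
```
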
Wire TieredObjectCache into fetch path:
- Add configuration options for lagging cache
- Create tiered cache in SharedState when enabled
- FetchPlanner tracks oldest batch timestamp per object (see the sketch after this list)
- CacheFetchJob passes timestamp to cache for routing

Configuration:
- consume.lagging.cache.enabled (default: false)
- consume.lagging.cache.max.count (default: 150)
- consume.lagging.cache.ttl.sec (default: 5)
- consume.lagging.cache.rate.limit.bytes.per.sec (default: 50 MiB/s)
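
Tracking the oldest timestamp per object reduces to a min-merge during planning; a sketch with assumed accessor names:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OldestTimestampSketch {
    interface BatchRef {           // stand-in for the planner's batch metadata
        String objectKey();
        long timestampMs();
    }

    // The cache tier decision should be driven by the oldest data a fetch touches,
    // so keep the minimum batch timestamp seen per object.
    static Map<String, Long> oldestPerObject(List<BatchRef> batches) {
        Map<String, Long> oldest = new HashMap<>();
        for (BatchRef b : batches) {
            oldest.merge(b.objectKey(), b.timestampMs(), Math::min);
        }
        return oldest;
    }
}
```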
Contributor

Copilot AI left a comment

Pull request overview

This PR implements a tiered cache architecture to address performance issues caused by lagging consumers. The solution introduces a secondary "lagging cache" that prevents lagging consumers reading historical data from evicting hot data used by tail consumers and the write path. The implementation includes integrated rate limiting using Bucket4j to protect remote storage from burst fetches.

Key Changes:

  • Added TieredObjectCache that routes requests to hot/lagging caches based on batch timestamp age
  • Integrated rate limiting on lagging cache misses to prevent remote storage overload
  • Extended ObjectCache interface with timestamp-aware computeIfAbsent method for cache tier routing

Reviewed changes

Copilot reviewed 16 out of 17 changed files in this pull request and generated 11 comments.

| File | Description |
| --- | --- |
| storage/inkless/src/main/java/io/aiven/inkless/cache/TieredObjectCache.java | New tiered cache implementation with hot/lagging routing and rate limiting |
| storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java | Added timestamp-aware computeIfAbsent method with default implementation |
| storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java | Factory method to create tiered cache when enabled via configuration |
| storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java | Added four configuration parameters for lagging cache feature |
| storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java | Extended to accept and propagate batch timestamp for cache routing |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java | Tracks oldest timestamp per object and passes it to CacheFetchJob |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java | Changed to accept `List<FileExtent>` instead of `List<Future<FileExtent>>` |
| storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java | Added allOfFileExtents helper to flatten CompletableFuture list |
| storage/inkless/src/test/java/io/aiven/inkless/cache/TieredObjectCacheTest.java | Comprehensive tests for tiered cache routing and rate limiting behavior |
| storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java | Tests for cache initialization with different lagging cache configurations |
| storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java | Verification of default and custom lagging cache config values |
| storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java | Tests for timestamp propagation in cache fetch operations |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java | Tests for timestamp tracking and merging across batches |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java | Updated to work with `List<FileExtent>` instead of futures |
| storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java | Test for future completion order preservation |
| gradle/dependencies.gradle | Added Bucket4j dependency version 8.14.0 |
| build.gradle | Added bucket4jCore library to inkless project dependencies |


Comment on lines +163 to +166
Thread.sleep(50); // Give C time to complete
futureBCanComplete.countDown();
Thread.sleep(50); // Give B time to complete
futureACanComplete.countDown();

Copilot AI Dec 11, 2025

The Thread.sleep() calls in this test make it flaky and slow. Consider using a more deterministic approach with countdown latches or mock time controls instead of relying on arbitrary sleep durations (50ms) that may not be sufficient under load or slow CI environments.
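
One deterministic alternative, sketched (the helper and futureC/futureB are hypothetical stand-ins; the *CanComplete latches come from the test's own setup):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertTrue;

// Replace each sleep with a bounded wait on the previous future's completion.
static void awaitThenRelease(CompletableFuture<?> previous, CountDownLatch next)
        throws InterruptedException {
    CountDownLatch done = new CountDownLatch(1);
    previous.whenComplete((result, error) -> done.countDown());
    assertTrue(done.await(5, TimeUnit.SECONDS)); // deterministic, no guessed duration
    next.countDown();
}

// In the test body:
//   awaitThenRelease(futureC, futureBCanComplete); // C finished -> release B
//   awaitThenRelease(futureB, futureACanComplete); // B finished -> release A
```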

}

/**
* This is to ensure fetch results preserve the request oreder are matched correctly with their requests.

Copilot AI Dec 11, 2025

The test comment states "This is to ensure fetch results preserve the request oreder are matched correctly with their requests" but the test only verifies that the order is preserved when futures complete out of order. The test doesn't verify that results are "matched correctly with their requests" beyond ordering. Consider clarifying the comment to specifically state it tests "order preservation despite out-of-order completion".

Suggested change
* This is to ensure fetch results preserve the request oreder are matched correctly with their requests.
* This test verifies that fetch results preserve the request order,
* even when the underlying futures complete out of order.

Comment on lines +118 to +119
* <p>For lagging cache fetches, rate limiting is applied before invoking the mapping function
* to protect remote storage from being overwhelmed.</p>

Copilot AI Dec 11, 2025

The rate limiting only applies when the mapping function is executed (i.e., on a cache miss). However, the computeIfAbsent operation on the laggingCache will only invoke the mapping function if the key is absent. This means rate limiting is correctly applied only to cache misses, but the javadoc at line 118 says "rate limiting is applied before invoking the mapping function" which could be misinterpreted. When there's a cache hit, the mapping function is never invoked and no rate limiting occurs. Consider clarifying the documentation to explicitly state "rate limiting is applied only on cache miss before fetching from remote storage".

Suggested change
* <p>For lagging cache fetches, rate limiting is applied before invoking the mapping function
* to protect remote storage from being overwhelmed.</p>
* <p>For lagging cache fetches, rate limiting is applied <b>only on cache miss</b>,
* before invoking the mapping function to fetch from remote storage. This protects remote storage from being overwhelmed.</p>

Comment on lines +219 to +247
@Test
void shouldApplyRateLimitingForLaggingCacheFetches() {
// Given: A tiered cache with rate limiting enabled
long rateLimitBytesPerSec = 1000; // 1000 bytes/sec, low for testing
ObjectCache realLaggingCache = new CaffeineCache(100, 60, -1);
TieredObjectCache rateLimitedCache = new TieredObjectCache(
hotCache,
realLaggingCache,
HOT_CACHE_TTL_MS,
time,
rateLimitBytesPerSec,
TieredObjectCache.TieredCacheMetrics.NOOP
);

long currentTime = time.milliseconds();
long oldTimestamp = currentTime - HOT_CACHE_TTL_MS - 1000;

// When: Multiple fetches that should be rate limited
CacheKey key1 = createCacheKey("test-object-1", 0, 500);
CacheKey key2 = createCacheKey("test-object-2", 0, 500);

// First fetch should succeed immediately (within burst capacity)
FileExtent result1 = rateLimitedCache.computeIfAbsent(key1, k -> createFileExtent("test-object-1"), oldTimestamp);
assertThat(result1).isNotNull();

// Second fetch should also succeed (rate limiter has 2x burst capacity)
FileExtent result2 = rateLimitedCache.computeIfAbsent(key2, k -> createFileExtent("test-object-2"), oldTimestamp);
assertThat(result2).isNotNull();
}

Copilot AI Dec 11, 2025

The test for rate limiting (shouldApplyRateLimitingForLaggingCacheFetches) doesn't actually verify that rate limiting is working. It only checks that the fetches succeed, but doesn't verify that they were delayed or that the rate limiter consumed tokens. With a 2x burst capacity (2000 bytes) and two fetches of 500 bytes each (1000 bytes total), both fetches will complete immediately without any rate limiting delay. Consider adding assertions that verify the rate limiter's state (e.g., available tokens) or use timing assertions to verify that rate limiting actually occurs when burst capacity is exceeded.
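
A sketch of making the limiter observable, under the assumptions stated here (1000 bytes/sec, a ~2000-token burst of which ~1000 is already spent, and a hypothetical third key reusing the test's fixtures):

```java
// Force a miss larger than the remaining tokens and assert a measurable delay.
CacheKey key3 = createCacheKey("test-object-3", 0, 2000);
long startNs = System.nanoTime();
rateLimitedCache.computeIfAbsent(key3, k -> createFileExtent("test-object-3"), oldTimestamp);
long elapsedMs = (System.nanoTime() - startNs) / 1_000_000;
// ~1000 missing tokens at 1000 bytes/sec refill means roughly a one-second wait.
assertThat(elapsedMs).isGreaterThanOrEqualTo(900); // slack for scheduler jitter
```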

InklessFetchMetrics metrics;

// Direct executor that runs tasks immediately in the same thread
ExecutorService dataExecutor = Executors.newSingleThreadExecutor();

Copilot AI Dec 11, 2025

The ExecutorService created on line 72 is never shut down. This will cause test resource leaks and may accumulate threads over the test suite execution. Consider adding an @AfterEach method to properly shut down the executor service, or use a try-with-resources pattern if possible.
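
The suggested cleanup is small (sketch; `dataExecutor` is the field from the snippet above):

```java
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterEach;

@AfterEach
void shutdownExecutor() throws InterruptedException {
    dataExecutor.shutdown();
    // Escalate if tasks do not finish promptly, so test threads never leak.
    if (!dataExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
        dataExecutor.shutdownNow();
    }
}
```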

Comment on lines +225 to +247
@Test
public void planShouldUseOldestTimestampForSameObject() {
// Given: Two batches for the same object with different timestamps
long olderTimestamp = 1000L;
long newerTimestamp = 2000L;

Map<TopicIdPartition, FindBatchResponse> coordinates = Map.of(
partition0, FindBatchResponse.success(List.of(
new BatchInfo(1L, OBJECT_KEY_A.value(), BatchMetadata.of(partition0, 0, 10, 0, 0, 10, newerTimestamp, TimestampType.CREATE_TIME))
), 0, 1),
partition1, FindBatchResponse.success(List.of(
new BatchInfo(2L, OBJECT_KEY_A.value(), BatchMetadata.of(partition1, 30, 10, 0, 0, 11, olderTimestamp, TimestampType.CREATE_TIME))
), 0, 1)
);

FetchPlanner job = fetchPlannerJob(coordinates);
List<CompletableFuture<FileExtent>> result = job.get();

// Should only have one job for the merged object key
assertEquals(1, result.size());
// The job should use the older timestamp (1000L) for cache tiering decisions
// This is verified implicitly - FetchPlanner.ObjectFetchInfo tracks min timestamp
}

Copilot AI Dec 11, 2025

The test "planShouldUseOldestTimestampForSameObject" only verifies that one job is created for the merged object key, but doesn't actually verify that the oldest timestamp (1000L) is used. The comment states "This is verified implicitly" but there's no actual assertion to verify this behavior. Consider extracting the created CacheFetchJob and asserting that its batchTimestamp field equals the older timestamp, or add a more explicit test that validates the timestamp is correctly propagated.

}

/**
* This is to ensure fetch results preserve the request oreder are matched correctly with their requests.

Copilot AI Dec 11, 2025

There is a typo in the comment: "oreder" should be "order".

Suggested change
* This is to ensure fetch results preserve the request oreder are matched correctly with their requests.
* This is to ensure fetch results preserve the request order are matched correctly with their requests.

Comment on lines +113 to +117
// Note: keyAlignment.align() returns fixed-size aligned blocks (e.g., 16 MiB).
// This aligned byteRange is used for both caching and rate limiting (if enabled).
// Rate limiting uses the aligned block size (not actual batch size) as a conservative
// estimate, since the actual fetch size is only known after the fetch completes.
return keyAlignment.align(fetchInfo.ranges)

Copilot AI Dec 11, 2025

The comment mentions that keyAlignment.align() returns "fixed-size aligned blocks (e.g., 16 MiB)" and that this is used as a "conservative estimate" for rate limiting. However, this could be significantly inaccurate if the actual batch range is much smaller than the aligned block size. For example, if a batch is 100 KB but gets aligned to a 16 MiB block, the rate limiter would consume 16 MiB worth of tokens for only 100 KB of actual data transfer. This could make the rate limiting far more restrictive than intended. Consider documenting this limitation more prominently or exploring ways to use actual fetch sizes for rate limiting.

Comment on lines +158 to +162
try {
rateLimiter.consume(bytesToFetch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Rate limit wait interrupted", e);

Copilot AI Dec 11, 2025

The rate limiter blocks the thread while waiting for tokens (line 159). This could lead to thread pool exhaustion if many lagging consumer requests are rate-limited simultaneously, as they'll hold executor threads while blocked. The dataExecutor thread pool could become saturated with blocked threads, preventing other fetch operations from proceeding. Consider using non-blocking rate limiting (tryConsume with proper backpressure) or documenting this behavior and its implications for thread pool sizing.

Suggested change
try {
rateLimiter.consume(bytesToFetch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Rate limit wait interrupted", e);
if (!rateLimiter.tryConsume(bytesToFetch)) {
throw new RuntimeException("Rate limit exceeded for lagging cache fetch: not enough tokens available");

Comment on lines +212 to +216
@SuppressWarnings("rawtypes")
static CompletableFuture<java.util.List<FileExtent>> allOfFileExtents(
List<CompletableFuture<FileExtent>> fileExtentFutures
) {
final CompletableFuture[] futuresArray = fileExtentFutures.toArray(new CompletableFuture[0]);

Copilot AI Dec 11, 2025

The @SuppressWarnings("rawtypes") annotation is used to suppress warnings about the raw type CompletableFuture[], but this could be avoided by using a properly typed approach. Consider using CompletableFuture.allOf(fileExtentFutures.toArray(CompletableFuture[]::new)) or explicitly typing the array as CompletableFuture<FileExtent>[] to eliminate the need for suppression.

Suggested change
@SuppressWarnings("rawtypes")
static CompletableFuture<java.util.List<FileExtent>> allOfFileExtents(
List<CompletableFuture<FileExtent>> fileExtentFutures
) {
final CompletableFuture[] futuresArray = fileExtentFutures.toArray(new CompletableFuture[0]);
static CompletableFuture<java.util.List<FileExtent>> allOfFileExtents(
List<CompletableFuture<FileExtent>> fileExtentFutures
) {
final CompletableFuture<?>[] futuresArray = fileExtentFutures.toArray(CompletableFuture[]::new);

Contributor

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
