feat(inkless): lagging consumer cache #465
base: main
Conversation
Replace blocking Future.get() calls in FetchCompleter with non-blocking CompletableFuture composition. This prevents thread exhaustion when file fetches are slow.

Previously, FetchCompleter.get() was invoked via thenCombineAsync on the common ForkJoinPool, where it blocked waiting for file data futures. Now, the completion stage chain uses:

- thenCompose(Reader::allOfFileExtents), which returns a CompletableFuture that completes only when ALL file fetches are done
- thenCombine() (not async) to invoke FetchCompleter with an already-resolved List<FileExtent>

The FetchCompleter task still runs on the ForkJoinPool common pool (via thenCombine's default executor), but it no longer blocks: it receives pre-fetched data and only performs in-memory data assembly (grouping file extents, copying byte ranges, and constructing MemoryRecords).

Changes:

- FetchPlanner returns List<CompletableFuture<FileExtent>> instead of List<Future<FileExtent>>
- FetchCompleter accepts a resolved List<FileExtent> instead of futures
- Reader.allOfFileExtents() waits for all futures without blocking
- Add a test verifying ordering is preserved when futures complete out of order
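For illustration, a minimal sketch of the resulting completion-stage shape. The types and the combine step are simplified stand-ins, not the real Inkless signatures:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-in types; the real ones live in io.aiven.inkless.consume.
record FileExtent(byte[] data) {}
record FetchResult(int extentCount) {}

class CompletionChainSketch {
    // Stand-in for the planner: one future per file fetch.
    static CompletableFuture<List<CompletableFuture<FileExtent>>> plan() {
        return CompletableFuture.completedFuture(List.of(
            CompletableFuture.supplyAsync(() -> new FileExtent(new byte[16])),
            CompletableFuture.supplyAsync(() -> new FileExtent(new byte[32]))));
    }

    // Flatten without blocking: allOf() completes once every fetch is done,
    // after which join() returns immediately.
    static CompletableFuture<List<FileExtent>> allOfFileExtents(
            List<CompletableFuture<FileExtent>> fs) {
        return CompletableFuture.allOf(fs.toArray(CompletableFuture[]::new))
            .thenApply(v -> fs.stream().map(CompletableFuture::join).toList());
    }

    static CompletableFuture<FetchResult> fetch(CompletableFuture<String> metadata) {
        return plan()
            // Completes only once ALL file fetches are done; no thread parks here.
            .thenCompose(CompletionChainSketch::allOfFileExtents)
            // Plain thenCombine: the completer receives already-resolved extents,
            // so the ForkJoinPool task does in-memory assembly only.
            .thenCombine(metadata, (extents, meta) -> new FetchResult(extents.size()));
    }
}
```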
Adds Bucket4j dependency for rate limiting
Add tiered cache that routes requests based on batch timestamp:

- Recent data (within hot cache TTL) → hot cache
- Old data (beyond hot cache TTL) → lagging cache with rate limiting

Rate limiting uses a Bucket4j token bucket to protect remote storage from being overwhelmed by lagging consumer fetches. It is applied only on lagging cache misses.

Includes an ObjectCache interface extension with a timestamp-aware computeIfAbsent method (the default implementation ignores the timestamp for backwards compatibility); see the sketch below.
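A rough sketch of the interface extension and routing just described. All names and signatures here are simplified assumptions; only the shapes stated in the commit message are taken from the PR:

```java
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical simplified shapes; the real types live in io.aiven.inkless.cache.
record CacheKey(String object, int position, int length) {}
record FileExtent(byte[] data) {}

interface ObjectCache {
    FileExtent computeIfAbsent(CacheKey key, Function<CacheKey, FileExtent> loader);

    // Timestamp-aware overload; the default ignores the timestamp so existing
    // implementations keep working unchanged (backwards compatibility).
    default FileExtent computeIfAbsent(CacheKey key, Function<CacheKey, FileExtent> loader,
                                       long batchTimestamp) {
        return computeIfAbsent(key, loader);
    }
}

// Routing core of a tiered cache: recent batches go to the hot cache, old
// batches to the lagging cache, with rate limiting only on a lagging-cache miss.
final class TieredCacheSketch {
    private final ObjectCache hotCache;
    private final ObjectCache laggingCache;
    private final long hotCacheTtlMs;
    private final LongSupplier clock;        // e.g. System::currentTimeMillis
    private final LongConsumer rateLimiter;  // consumes byte tokens; may block

    TieredCacheSketch(ObjectCache hotCache, ObjectCache laggingCache, long hotCacheTtlMs,
                      LongSupplier clock, LongConsumer rateLimiter) {
        this.hotCache = hotCache;
        this.laggingCache = laggingCache;
        this.hotCacheTtlMs = hotCacheTtlMs;
        this.clock = clock;
        this.rateLimiter = rateLimiter;
    }

    FileExtent computeIfAbsent(CacheKey key, Function<CacheKey, FileExtent> loader,
                               long batchTimestamp) {
        if (clock.getAsLong() - batchTimestamp <= hotCacheTtlMs) {
            return hotCache.computeIfAbsent(key, loader);
        }
        // The limiter runs inside the loader, i.e. only when the key is absent.
        return laggingCache.computeIfAbsent(key, k -> {
            rateLimiter.accept(k.length());
            return loader.apply(k);
        });
    }
}
```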
Wire TieredObjectCache into the fetch path:

- Add configuration options for the lagging cache
- Create the tiered cache in SharedState when enabled
- FetchPlanner tracks the oldest batch timestamp per object
- CacheFetchJob passes the timestamp to the cache for routing

Configuration (an example broker config follows below):

- consume.lagging.cache.enabled (default: false)
- consume.lagging.cache.max.count (default: 150)
- consume.lagging.cache.ttl.sec (default: 5)
- consume.lagging.cache.rate.limit.bytes.per.sec (default: 50 MiB/s)
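For reference, enabling the feature might look as follows in broker configuration. This is a sketch using the option names above, with defaults shown except for the enable flag:

```properties
consume.lagging.cache.enabled=true
consume.lagging.cache.max.count=150
consume.lagging.cache.ttl.sec=5
# 50 MiB/s expressed in bytes
consume.lagging.cache.rate.limit.bytes.per.sec=52428800
```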
Pull request overview
This PR implements a tiered cache architecture to address performance issues caused by lagging consumers. The solution introduces a secondary "lagging cache" that prevents lagging consumers reading historical data from evicting hot data used by tail consumers and the write path. The implementation includes integrated rate limiting using Bucket4j to protect remote storage from burst fetches.
Key Changes:
- Added TieredObjectCache that routes requests to hot/lagging caches based on batch timestamp age
- Integrated rate limiting on lagging cache misses to prevent remote storage overload
- Extended ObjectCache interface with timestamp-aware computeIfAbsent method for cache tier routing
Reviewed changes
Copilot reviewed 16 out of 17 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| storage/inkless/src/main/java/io/aiven/inkless/cache/TieredObjectCache.java | New tiered cache implementation with hot/lagging routing and rate limiting |
| storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java | Added timestamp-aware computeIfAbsent method with default implementation |
| storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java | Factory method to create tiered cache when enabled via configuration |
| storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java | Added four configuration parameters for lagging cache feature |
| storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java | Extended to accept and propagate batch timestamp for cache routing |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java | Tracks oldest timestamp per object and passes it to CacheFetchJob |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java | Changed to accept List<FileExtent> instead of List<Future<FileExtent>> |
| storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java | Added allOfFileExtents helper to flatten CompletableFuture list |
| storage/inkless/src/test/java/io/aiven/inkless/cache/TieredObjectCacheTest.java | Comprehensive tests for tiered cache routing and rate limiting behavior |
| storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java | Tests for cache initialization with different lagging cache configurations |
| storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java | Verification of default and custom lagging cache config values |
| storage/inkless/src/test/java/io/aiven/inkless/consume/CacheFetchJobTest.java | Tests for timestamp propagation in cache fetch operations |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java | Tests for timestamp tracking and merging across batches |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java | Updated to work with List<FileExtent> instead of futures |
| storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java | Test for future completion order preservation |
| gradle/dependencies.gradle | Added Bucket4j dependency version 8.14.0 |
| build.gradle | Added bucket4jCore library to inkless project dependencies |
```java
Thread.sleep(50); // Give C time to complete
futureBCanComplete.countDown();
Thread.sleep(50); // Give B time to complete
futureACanComplete.countDown();
```
Copilot AI · Dec 11, 2025
The Thread.sleep() calls in this test make it flaky and slow. Consider using a more deterministic approach with countdown latches or mock time controls instead of relying on arbitrary sleep durations (50ms) that may not be sufficient under load or slow CI environments.
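A sketch of the suggested deterministic pattern, using a hypothetical helper whose names mirror the test's fixtures: release a future's gate, then wait on the future itself instead of sleeping:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class DeterministicOrderingSketch {
    // Release the gate that lets a stubbed fetch finish, then block on the
    // future itself (with a generous timeout) rather than sleeping a fixed 50 ms.
    static void releaseAndAwait(CountDownLatch canComplete, CompletableFuture<?> future)
            throws Exception {
        canComplete.countDown();
        future.get(5, TimeUnit.SECONDS);
    }
}
```

Releasing C and awaiting its future before releasing B still forces the out-of-order completion the test needs, but without timing assumptions.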
```java
}

/**
 * This is to ensure fetch results preserve the request oreder are matched correctly with their requests.
```
Copilot AI · Dec 11, 2025
The test comment states "This is to ensure fetch results preserve the request oreder are matched correctly with their requests" but the test only verifies that the order is preserved when futures complete out of order. The test doesn't verify that results are "matched correctly with their requests" beyond ordering. Consider clarifying the comment to specifically state it tests "order preservation despite out-of-order completion".
Suggested change:

```diff
- * This is to ensure fetch results preserve the request oreder are matched correctly with their requests.
+ * This test verifies that fetch results preserve the request order,
+ * even when the underlying futures complete out of order.
```
```java
 * <p>For lagging cache fetches, rate limiting is applied before invoking the mapping function
 * to protect remote storage from being overwhelmed.</p>
```
Copilot AI · Dec 11, 2025
The rate limiting only applies when the mapping function is executed (i.e., on a cache miss). However, the computeIfAbsent operation on the laggingCache will only invoke the mapping function if the key is absent. This means rate limiting is correctly applied only to cache misses, but the javadoc at line 118 says "rate limiting is applied before invoking the mapping function" which could be misinterpreted. When there's a cache hit, the mapping function is never invoked and no rate limiting occurs. Consider clarifying the documentation to explicitly state "rate limiting is applied only on cache miss before fetching from remote storage".
Suggested change:

```diff
- * <p>For lagging cache fetches, rate limiting is applied before invoking the mapping function
- * to protect remote storage from being overwhelmed.</p>
+ * <p>For lagging cache fetches, rate limiting is applied <b>only on cache miss</b>,
+ * before invoking the mapping function to fetch from remote storage. This protects remote storage from being overwhelmed.</p>
```
```java
@Test
void shouldApplyRateLimitingForLaggingCacheFetches() {
    // Given: A tiered cache with rate limiting enabled
    long rateLimitBytesPerSec = 1000; // 1000 bytes/sec, low for testing
    ObjectCache realLaggingCache = new CaffeineCache(100, 60, -1);
    TieredObjectCache rateLimitedCache = new TieredObjectCache(
        hotCache,
        realLaggingCache,
        HOT_CACHE_TTL_MS,
        time,
        rateLimitBytesPerSec,
        TieredObjectCache.TieredCacheMetrics.NOOP
    );

    long currentTime = time.milliseconds();
    long oldTimestamp = currentTime - HOT_CACHE_TTL_MS - 1000;

    // When: Multiple fetches that should be rate limited
    CacheKey key1 = createCacheKey("test-object-1", 0, 500);
    CacheKey key2 = createCacheKey("test-object-2", 0, 500);

    // First fetch should succeed immediately (within burst capacity)
    FileExtent result1 = rateLimitedCache.computeIfAbsent(key1, k -> createFileExtent("test-object-1"), oldTimestamp);
    assertThat(result1).isNotNull();

    // Second fetch should also succeed (rate limiter has 2x burst capacity)
    FileExtent result2 = rateLimitedCache.computeIfAbsent(key2, k -> createFileExtent("test-object-2"), oldTimestamp);
    assertThat(result2).isNotNull();
}
```
Copilot AI · Dec 11, 2025
The test for rate limiting (shouldApplyRateLimitingForLaggingCacheFetches) doesn't actually verify that rate limiting is working. It only checks that the fetches succeed, but doesn't verify that they were delayed or that the rate limiter consumed tokens. With a 2x burst capacity (2000 bytes) and two fetches of 500 bytes each (1000 bytes total), both fetches will complete immediately without any rate limiting delay. Consider adding assertions that verify the rate limiter's state (e.g., available tokens) or use timing assertions to verify that rate limiting actually occurs when burst capacity is exceeded.
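One possible shape for such an assertion, reusing the test's own fixtures. This sketch assumes the limiter charges each key's 500-byte range and that burst capacity is 2x the rate (2000 bytes), as the surrounding test comments suggest; neither is confirmed by the PR:

```java
@Test
void shouldDelayFetchOnceBurstCapacityIsExhausted() {
    // Same fixture as above: 1000 bytes/sec with an assumed 2x (2000-byte) burst.
    TieredObjectCache rateLimitedCache = new TieredObjectCache(
        hotCache, new CaffeineCache(100, 60, -1), HOT_CACHE_TTL_MS, time,
        1000L, TieredObjectCache.TieredCacheMetrics.NOOP);
    long oldTimestamp = time.milliseconds() - HOT_CACHE_TTL_MS - 1000;

    // Exhaust the burst with four distinct 500-byte misses (2000 bytes total).
    for (int i = 0; i < 4; i++) {
        String name = "burst-" + i;
        rateLimitedCache.computeIfAbsent(createCacheKey(name, 0, 500),
            k -> createFileExtent(name), oldTimestamp);
    }

    // The next 500-byte miss should wait roughly 500 ms for tokens to refill.
    long startNanos = System.nanoTime();
    rateLimitedCache.computeIfAbsent(createCacheKey("over-burst", 0, 500),
        k -> createFileExtent("over-burst"), oldTimestamp);
    long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
    assertThat(elapsedMs).isGreaterThanOrEqualTo(400); // generous lower bound
}
```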
```java
InklessFetchMetrics metrics;

// Direct executor that runs tasks immediately in the same thread
ExecutorService dataExecutor = Executors.newSingleThreadExecutor();
```
Copilot AI · Dec 11, 2025
The ExecutorService created on line 72 is never shut down. This will cause test resource leaks and may accumulate threads over the test suite execution. Consider adding an @AfterEach method to properly shut down the executor service, or use a try-with-resources pattern if possible.
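A sketch of the suggested cleanup for the test's dataExecutor field:

```java
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;

@AfterEach
void shutDownDataExecutor() throws InterruptedException {
    dataExecutor.shutdown();
    if (!dataExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
        dataExecutor.shutdownNow(); // force-stop anything still running
    }
}
```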
```java
@Test
public void planShouldUseOldestTimestampForSameObject() {
    // Given: Two batches for the same object with different timestamps
    long olderTimestamp = 1000L;
    long newerTimestamp = 2000L;

    Map<TopicIdPartition, FindBatchResponse> coordinates = Map.of(
        partition0, FindBatchResponse.success(List.of(
            new BatchInfo(1L, OBJECT_KEY_A.value(), BatchMetadata.of(partition0, 0, 10, 0, 0, 10, newerTimestamp, TimestampType.CREATE_TIME))
        ), 0, 1),
        partition1, FindBatchResponse.success(List.of(
            new BatchInfo(2L, OBJECT_KEY_A.value(), BatchMetadata.of(partition1, 30, 10, 0, 0, 11, olderTimestamp, TimestampType.CREATE_TIME))
        ), 0, 1)
    );

    FetchPlanner job = fetchPlannerJob(coordinates);
    List<CompletableFuture<FileExtent>> result = job.get();

    // Should only have one job for the merged object key
    assertEquals(1, result.size());
    // The job should use the older timestamp (1000L) for cache tiering decisions
    // This is verified implicitly - FetchPlanner.ObjectFetchInfo tracks min timestamp
}
```
Copilot AI · Dec 11, 2025
The test "planShouldUseOldestTimestampForSameObject" only verifies that one job is created for the merged object key, but doesn't actually verify that the oldest timestamp (1000L) is used. The comment states "This is verified implicitly" but there's no actual assertion to verify this behavior. Consider extracting the created CacheFetchJob and asserting that its batchTimestamp field equals the older timestamp, or add a more explicit test that validates the timestamp is correctly propagated.
```java
}

/**
 * This is to ensure fetch results preserve the request oreder are matched correctly with their requests.
```
Copilot AI · Dec 11, 2025
There is a typo in the comment: "oreder" should be "order".
Suggested change:

```diff
- * This is to ensure fetch results preserve the request oreder are matched correctly with their requests.
+ * This is to ensure fetch results preserve the request order are matched correctly with their requests.
```
```java
// Note: keyAlignment.align() returns fixed-size aligned blocks (e.g., 16 MiB).
// This aligned byteRange is used for both caching and rate limiting (if enabled).
// Rate limiting uses the aligned block size (not actual batch size) as a conservative
// estimate, since the actual fetch size is only known after the fetch completes.
return keyAlignment.align(fetchInfo.ranges)
```
Copilot AI · Dec 11, 2025
The comment mentions that keyAlignment.align() returns "fixed-size aligned blocks (e.g., 16 MiB)" and that this is used as a "conservative estimate" for rate limiting. However, this could be significantly inaccurate if the actual batch range is much smaller than the aligned block size. For example, if a batch is 100 KB but gets aligned to a 16 MiB block, the rate limiter would consume 16 MiB worth of tokens for only 100 KB of actual data transfer. This could make the rate limiting far more restrictive than intended. Consider documenting this limitation more prominently or exploring ways to use actual fetch sizes for rate limiting.
```java
try {
    rateLimiter.consume(bytesToFetch);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Rate limit wait interrupted", e);
```
Copilot AI · Dec 11, 2025
The rate limiter blocks the thread while waiting for tokens (line 159). This could lead to thread pool exhaustion if many lagging consumer requests are rate-limited simultaneously, as they'll hold executor threads while blocked. The dataExecutor thread pool could become saturated with blocked threads, preventing other fetch operations from proceeding. Consider using non-blocking rate limiting (tryConsume with proper backpressure) or documenting this behavior and its implications for thread pool sizing.
Suggested change:

```diff
-try {
-    rateLimiter.consume(bytesToFetch);
-} catch (InterruptedException e) {
-    Thread.currentThread().interrupt();
-    throw new RuntimeException("Rate limit wait interrupted", e);
+if (!rateLimiter.tryConsume(bytesToFetch)) {
+    throw new RuntimeException("Rate limit exceeded for lagging cache fetch: not enough tokens available");
```
```java
@SuppressWarnings("rawtypes")
static CompletableFuture<java.util.List<FileExtent>> allOfFileExtents(
    List<CompletableFuture<FileExtent>> fileExtentFutures
) {
    final CompletableFuture[] futuresArray = fileExtentFutures.toArray(new CompletableFuture[0]);
```
Copilot AI · Dec 11, 2025
The @SuppressWarnings("rawtypes") annotation is used to suppress warnings about the raw type CompletableFuture[], but this could be avoided by using a properly typed approach. Consider using CompletableFuture.allOf(fileExtentFutures.toArray(CompletableFuture[]::new)) or explicitly typing the array as CompletableFuture<FileExtent>[] to eliminate the need for suppression.
Suggested change:

```diff
-@SuppressWarnings("rawtypes")
-static CompletableFuture<java.util.List<FileExtent>> allOfFileExtents(
-    List<CompletableFuture<FileExtent>> fileExtentFutures
-) {
-    final CompletableFuture[] futuresArray = fileExtentFutures.toArray(new CompletableFuture[0]);
+static CompletableFuture<java.util.List<FileExtent>> allOfFileExtents(
+    List<CompletableFuture<FileExtent>> fileExtentFutures
+) {
+    final CompletableFuture<?>[] futuresArray = fileExtentFutures.toArray(CompletableFuture[]::new);
```
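Putting the suggestion together, the whole helper might read as follows (a sketch of the complete flattening pattern; join() is safe at that point because allOf() guarantees every future has already completed):

```java
static CompletableFuture<List<FileExtent>> allOfFileExtents(
    List<CompletableFuture<FileExtent>> fileExtentFutures
) {
    return CompletableFuture.allOf(fileExtentFutures.toArray(CompletableFuture[]::new))
        .thenApply(ignored -> fileExtentFutures.stream()
            .map(CompletableFuture::join) // completes immediately; nothing blocks
            .toList());
}
```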
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
Implement a secondary cache for lagging consumers with integrated rate limiting.
Problem: Lagging consumers reading historical data cause:

- Hot data used by tail consumers and the write path to be evicted from the cache
- Bursts of fetches that can overwhelm remote storage

Solution:

Tiered cache architecture:

- Recent data (within the hot cache TTL) is served from the hot cache
- Older data is served from a separate lagging cache, with rate limiting on misses

Key design: Rate limiting integrated into TieredObjectCache, applied only on cache miss when fetching from remote storage.
Configuration:
| Option | Default |
|---|---|
| consume.lagging.cache.enabled | false |
| consume.lagging.cache.max.count | 150 |
| consume.lagging.cache.ttl.sec | 5 |
| consume.lagging.cache.rate.limit.bytes.per.sec | 52428800 (50 MiB/s) |

Dependencies:
com.bucket4j:bucket4j_jdk11-core:8.14.0