From 75368750947764ca0592c32e611298c49e0dc8bd Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 20 Nov 2025 14:12:49 +0200 Subject: [PATCH 1/2] refactor(inkless): reuse storage metrics instance on cache Instead of creating yet another metrics instance, reuse the storage metrics instance used by storage layer, as caching is a related module. This will change the metric name prefix from: `io.aiven.inkless.cache.caffeine` to `io.aiven.inkless.storage` --- .../io/aiven/inkless/cache/CaffeineCache.java | 31 +++++----- .../inkless/cache/CaffeineCacheMetrics.java | 28 +-------- .../cache/CaffeineCacheMetricsRegistry.java | 1 - .../io/aiven/inkless/common/SharedState.java | 21 ++++--- .../delete/DeleteRecordsInterceptorTest.java | 57 ++++++++++--------- .../inkless/produce/AppendHandlerTest.java | 39 ++++++++----- 6 files changed, 87 insertions(+), 90 deletions(-) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java index 78ed51100d..c9ef26f7ce 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java @@ -1,5 +1,7 @@ package io.aiven.inkless.cache; +import org.apache.kafka.common.metrics.Metrics; + import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.stats.CacheStats; @@ -21,32 +23,33 @@ public final class CaffeineCache implements ObjectCache { */ private final AsyncCache cache; - private final CaffeineCacheMetrics metrics; - public CaffeineCache( final long maxCacheSize, final long lifespanSeconds, - final int maxIdleSeconds) { - cache = Caffeine.newBuilder() - .maximumSize(maxCacheSize) - .expireAfterWrite(Duration.ofSeconds(lifespanSeconds)) - .expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds: 180)) - .recordStats() - .buildAsync(); - metrics = new CaffeineCacheMetrics(cache.synchronous()); + final int maxIdleSeconds, + final Metrics storageMetrics + ) { + final Caffeine builder = Caffeine.newBuilder(); + // size and weight limits + builder.maximumSize(maxCacheSize); + // expiration policies + builder.expireAfterWrite(Duration.ofSeconds(lifespanSeconds)); + builder.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds : 180)); + // enable metrics + builder.recordStats(); + cache = builder.buildAsync(); + new CaffeineCacheMetrics(storageMetrics, cache.synchronous()); } @Override public void close() throws IOException { - metrics.close(); + cache.synchronous().cleanUp(); } @Override public FileExtent computeIfAbsent(final CacheKey key, final Function mappingFunction) { final CompletableFuture future = new CompletableFuture<>(); - final CompletableFuture existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> { - return future; - }); + final CompletableFuture existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> future); // If existing future is not the same object as created in this function // there was a pending cache load and this call is required to join the existing future // and discard the created one. diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java index d97c830116..9a27588d00 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java @@ -1,27 +1,17 @@ package io.aiven.inkless.cache; import org.apache.kafka.common.MetricNameTemplate; -import org.apache.kafka.common.metrics.JmxReporter; -import org.apache.kafka.common.metrics.KafkaMetricsContext; -import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.utils.Time; import com.github.benmanes.caffeine.cache.Cache; -import java.io.Closeable; -import java.io.IOException; -import java.util.List; import java.util.function.Supplier; import io.aiven.inkless.common.metrics.MeasurableValue; import io.aiven.inkless.common.metrics.SensorProvider; -public final class CaffeineCacheMetrics implements Closeable { - - private final Metrics metrics; - +public final class CaffeineCacheMetrics { private final Sensor cacheSizeSensor; private final Sensor cacheHitCountSensor; private final Sensor cacheHitRateSensor; @@ -30,13 +20,7 @@ public final class CaffeineCacheMetrics implements Closeable { private final Sensor cacheAvgLoadPenaltySensor; private final Sensor cacheEvictionsSensor; - public CaffeineCacheMetrics(final Cache cache) { - final JmxReporter reporter = new JmxReporter(); - this.metrics = new Metrics( - new MetricConfig(), List.of(reporter), Time.SYSTEM, - new KafkaMetricsContext(CaffeineCacheMetricsRegistry.METRIC_CONTEXT) - ); - + public CaffeineCacheMetrics(final Metrics metrics, final Cache cache) { final CaffeineCacheMetricsRegistry metricsRegistry = new CaffeineCacheMetricsRegistry(); cacheSizeSensor = registerLongSensor(metrics, metricsRegistry.cacheSizeMetricName, CaffeineCacheMetricsRegistry.CACHE_SIZE, cache::estimatedSize); cacheHitCountSensor = registerLongSensor(metrics, metricsRegistry.cacheHitCountMetricName, CaffeineCacheMetricsRegistry.CACHE_HIT_COUNT, () -> cache.stats().hitCount()); @@ -62,8 +46,7 @@ static Sensor registerLongSensor(final Metrics metrics, final MetricNameTemplate @Override public String toString() { return "CaffeineCacheMetrics{" + - "metrics=" + metrics + - ", cacheSizeSensor=" + cacheSizeSensor + + "cacheSizeSensor=" + cacheSizeSensor + ", cacheHitCountSensor=" + cacheHitCountSensor + ", cacheHitRateSensor=" + cacheHitRateSensor + ", cacheMissCountSensor=" + cacheMissCountSensor + @@ -72,9 +55,4 @@ public String toString() { ", cacheEvictionsSensor=" + cacheEvictionsSensor + '}'; } - - @Override - public void close() throws IOException { - metrics.close(); - } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java index 6d57d988c3..42f6027002 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.MetricNameTemplate; public class CaffeineCacheMetricsRegistry { - public static final String METRIC_CONTEXT = "io.aiven.inkless.cache.caffeine"; public static final String METRIC_GROUP = "wal-segment-cache"; public static final String CACHE_SIZE = "size"; diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java index aa817b2bd4..93fb086b97 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java @@ -71,7 +71,8 @@ public SharedState( final ObjectCache cache, final BatchCoordinateCache batchCoordinateCache, final BrokerTopicStats brokerTopicStats, - final Supplier defaultTopicConfigs + final Supplier defaultTopicConfigs, + final Metrics storageMetrics ) { this.time = time; this.brokerId = brokerId; @@ -84,12 +85,7 @@ public SharedState( this.batchCoordinateCache = batchCoordinateCache; this.brokerTopicStats = brokerTopicStats; this.defaultTopicConfigs = defaultTopicConfigs; - - final MetricsReporter reporter = new JmxReporter(); - this.storageMetrics = new Metrics( - new MetricConfig(), List.of(reporter), Time.SYSTEM, - new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) - ); + this.storageMetrics = storageMetrics; } public static SharedState initialize( @@ -107,6 +103,11 @@ public static SharedState initialize( "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2" ); } + final MetricsReporter reporter = new JmxReporter(); + final Metrics storageMetrics = new Metrics( + new MetricConfig(), List.of(reporter), Time.SYSTEM, + new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) + ); return new SharedState( time, brokerId, @@ -118,11 +119,13 @@ public static SharedState initialize( new CaffeineCache( config.cacheMaxCount(), config.cacheExpirationLifespanSec(), - config.cacheExpirationMaxIdleSec() + config.cacheExpirationMaxIdleSec(), + storageMetrics ), config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(), brokerTopicStats, - defaultTopicConfigs + defaultTopicConfigs, + storageMetrics ); } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java index 43e197dbd2..722eb4e2b9 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteRecordsResponseData; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -80,6 +81,8 @@ class DeleteRecordsInterceptorTest { static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of()); Time time = new MockTime(); + Metrics storageMetrics = new Metrics(); + @Mock InklessConfig disklessConfig; @Mock @@ -100,20 +103,7 @@ class DeleteRecordsInterceptorTest { public void mixingDisklessAndClassicTopicsIsNotAllowed() { when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final SharedState state = new SharedState( - time, - BROKER_ID, - disklessConfig, - metadataView, - controlPlane, - OBJECT_KEY_CREATOR, - KEY_ALIGNMENT_STRATEGY, - OBJECT_CACHE, - BATCH_COORDINATE_CACHE, - brokerTopicStats, - DEFAULT_TOPIC_CONFIGS - ); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState()); final Map entriesPerPartition = Map.of( new TopicPartition("diskless", 0), @@ -144,9 +134,7 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() { @Test public void notInterceptDeletingRecordsFromClassicTopics() { when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS)); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState()); final Map entriesPerPartition = Map.of( new TopicPartition("non_diskless", 0), 4567L @@ -182,9 +170,9 @@ public void interceptDeletingRecordsFromDisklessTopics() { }); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + buildSharedState(), + new SynchronousExecutor() + ); final TopicPartition tp0 = new TopicPartition("diskless", 0); final TopicPartition tp1 = new TopicPartition("diskless", 1); @@ -225,9 +213,9 @@ public void controlPlaneException() { when(controlPlane.deleteRecords(anyList())).thenThrow(new RuntimeException("test")); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + buildSharedState(), + new SynchronousExecutor() + ); final TopicPartition topicPartition = new TopicPartition("diskless", 1); final Map entriesPerPartition = Map.of( @@ -266,9 +254,9 @@ public void topicIdNotFound() { }); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + buildSharedState(), + new SynchronousExecutor() + ); final TopicPartition topicPartition1 = new TopicPartition("diskless1", 1); final TopicPartition topicPartition2 = new TopicPartition("diskless2", 2); @@ -295,4 +283,21 @@ public void topicIdNotFound() { .setLowWatermark(INVALID_LOW_WATERMARK) )); } + + private SharedState buildSharedState() { + return new SharedState( + time, + BROKER_ID, + disklessConfig, + metadataView, + controlPlane, + OBJECT_KEY_CREATOR, + KEY_ALIGNMENT_STRATEGY, + OBJECT_CACHE, + BATCH_COORDINATE_CACHE, + brokerTopicStats, + DEFAULT_TOPIC_CONFIGS, + storageMetrics + ); + } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java index 6fda9ad1e8..65e29c0521 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.SimpleRecord; @@ -80,6 +81,7 @@ public class AppendHandlerTest { Time time = new MockTime(); RequestLocal requestLocal = RequestLocal.noCaching(); + Metrics storageMetrics = new Metrics(); @Mock InklessConfig inklessConfig; @Mock @@ -113,9 +115,7 @@ public class AppendHandlerTest { @Test public void rejectTransactionalProduce() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { final TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless1"); final TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless2"); @@ -137,9 +137,7 @@ topicIdPartition2, new PartitionResponse(Errors.INVALID_REQUEST) @Test public void emptyRequests() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { final Map entriesPerPartition = Map.of(); @@ -165,9 +163,7 @@ topicIdPartition, new PartitionResponse(Errors.NONE) ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { final var result = interceptor.handle(entriesPerPartition, requestLocal).get(); assertThat(result).isEqualTo(writeResult); @@ -188,21 +184,34 @@ public void writeFutureFailed() throws Exception { ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { assertThatThrownBy(() -> interceptor.handle(entriesPerPartition, requestLocal).get()).hasCause(exception); } } @Test public void close() throws IOException { - final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer); + final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer); interceptor.close(); verify(writer).close(); } + + private SharedState buildSharedState() { + return new SharedState( + time, + BROKER_ID, + inklessConfig, + metadataView, + controlPlane, + OBJECT_KEY_CREATOR, + KEY_ALIGNMENT_STRATEGY, + OBJECT_CACHE, + BATCH_COORDINATE_CACHE, + brokerTopicStats, + DEFAULT_TOPIC_CONFIGS, + storageMetrics + ); + } } From 083bea8f91a337fb36c6b3bf18f7f75576ebd07e Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 20 Nov 2025 12:03:38 +0200 Subject: [PATCH 2/2] feat(inkless): add bytes limit to cache Add a new limiting configuration for cache sizing based on bytes instead of entries. Historically, previous caching library only supported sizing based on entries. With Caffeine, sizing can now be defined by both entries and max bytes using weigther. --- .../io/aiven/inkless/cache/CaffeineCache.java | 8 +++++++ .../io/aiven/inkless/common/SharedState.java | 1 + .../aiven/inkless/config/InklessConfig.java | 22 ++++++++++++++++--- .../inkless/config/InklessConfigTest.java | 5 +++++ 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java index c9ef26f7ce..1376320889 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java @@ -4,10 +4,12 @@ import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Weigher; import com.github.benmanes.caffeine.cache.stats.CacheStats; import java.io.IOException; import java.time.Duration; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -25,6 +27,7 @@ public final class CaffeineCache implements ObjectCache { public CaffeineCache( final long maxCacheSize, + final OptionalLong maxCacheBytes, final long lifespanSeconds, final int maxIdleSeconds, final Metrics storageMetrics @@ -32,6 +35,7 @@ public CaffeineCache( final Caffeine builder = Caffeine.newBuilder(); // size and weight limits builder.maximumSize(maxCacheSize); + maxCacheBytes.ifPresent(max -> builder.maximumWeight(max).weigher(weigher())); // expiration policies builder.expireAfterWrite(Duration.ofSeconds(lifespanSeconds)); builder.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds : 180)); @@ -41,6 +45,10 @@ public CaffeineCache( new CaffeineCacheMetrics(storageMetrics, cache.synchronous()); } + public Weigher weigher() { + return (key, value) -> value.data().length; + } + @Override public void close() throws IOException { cache.synchronous().cleanUp(); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java index 93fb086b97..a768bd1f52 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java @@ -118,6 +118,7 @@ public static SharedState initialize( new FixedBlockAlignment(config.fetchCacheBlockBytes()), new CaffeineCache( config.cacheMaxCount(), + config.cacheMaxBytes(), config.cacheExpirationLifespanSec(), config.cacheExpirationMaxIdleSec(), storageMetrics diff --git a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java index 98896b797a..dd328a1921 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java @@ -25,6 +25,8 @@ import java.nio.file.Path; import java.time.Duration; import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; import io.aiven.inkless.common.config.validators.Subclass; import io.aiven.inkless.control_plane.ControlPlane; @@ -79,11 +81,12 @@ public class InklessConfig extends AbstractConfig { private static final int CONSUME_CACHE_BLOCK_BYTES_DEFAULT = 16 * 1024 * 1024; // 16 MiB public static final String CONSUME_CACHE_MAX_COUNT_CONFIG = CONSUME_PREFIX + "cache.max.count"; - private static final String CONSUME_CACHE_MAX_COUNT_DOC = "The maximum number of objects to cache in memory. " + - "If the cache exceeds this limit, and the cache persistence is enabled, " + - "the least recently used objects will be persisted to disk and removed from memory."; + private static final String CONSUME_CACHE_MAX_COUNT_DOC = "The maximum number of objects to cache in memory."; private static final int CONSUME_CACHE_MAX_COUNT_DEFAULT = 1000; + public static final String CONSUME_CACHE_MAX_BYTES_CONFIG = CONSUME_PREFIX + "cache.max.bytes"; + private static final String CONSUME_CACHE_MAX_BYTES_DOC = "The maximum number of bytes to cache in memory."; + public static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG = CONSUME_PREFIX + "cache.expiration.lifespan.sec"; private static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DOC = "The lifespan in seconds of a cache entry before it will be removed from all storages."; private static final int CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DEFAULT = 60; // Defaults to 1 minute @@ -282,6 +285,13 @@ public static ConfigDef configDef() { ConfigDef.Importance.LOW, CONSUME_CACHE_MAX_COUNT_DOC ); + configDef.define( + CONSUME_CACHE_MAX_BYTES_CONFIG, + ConfigDef.Type.LONG, + null, + ConfigDef.Importance.LOW, + CONSUME_CACHE_MAX_BYTES_DOC + ); configDef.define( CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG, ConfigDef.Type.INT, @@ -474,4 +484,10 @@ public boolean isBatchCoordinateCacheEnabled() { public Duration batchCoordinateCacheTtl() { return Duration.ofMillis(getInt(CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_CONFIG)); } + + public OptionalLong cacheMaxBytes() { + return Optional.ofNullable(getLong(CONSUME_CACHE_MAX_BYTES_CONFIG)) + .map(OptionalLong::of) + .orElse(OptionalLong.empty()); + } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java index 6c3b22f12f..74e91a8fcd 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java @@ -53,6 +53,7 @@ void publicConstructor() { configs.put("inkless.file.cleaner.retention.period.ms", "200"); configs.put("inkless.file.merger.interval.ms", "100"); configs.put("inkless.consume.cache.max.count", "100"); + configs.put("inkless.consume.cache.max.bytes", "1048576"); configs.put("inkless.consume.cache.expiration.lifespan.sec", "200"); configs.put("inkless.consume.cache.expiration.max.idle.sec", "100"); configs.put("inkless.produce.upload.thread.pool.size", "16"); @@ -73,6 +74,7 @@ void publicConstructor() { assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMillis(200)); assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMillis(100)); assertThat(config.cacheMaxCount()).isEqualTo(100); + assertThat(config.cacheMaxBytes()).hasValue(1048576L); assertThat(config.cacheExpirationLifespanSec()).isEqualTo(200); assertThat(config.cacheExpirationMaxIdleSec()).isEqualTo(100); assertThat(config.produceUploadThreadPoolSize()).isEqualTo(16); @@ -102,6 +104,7 @@ void minimalConfig() { assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMinutes(1)); assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMinutes(1)); assertThat(config.cacheMaxCount()).isEqualTo(1000); + assertThat(config.cacheMaxBytes()).isEmpty(); assertThat(config.cacheExpirationLifespanSec()).isEqualTo(60); assertThat(config.cacheExpirationMaxIdleSec()).isEqualTo(-1); assertThat(config.produceUploadThreadPoolSize()).isEqualTo(8); @@ -125,6 +128,7 @@ void fullConfig() { configs.put("file.cleaner.retention.period.ms", "200"); configs.put("file.merger.interval.ms", "100"); configs.put("consume.cache.max.count", "100"); + configs.put("consume.cache.max.bytes", "1048576"); configs.put("consume.cache.expiration.lifespan.sec", "200"); configs.put("consume.cache.expiration.max.idle.sec", "100"); configs.put("produce.upload.thread.pool.size", "16"); @@ -146,6 +150,7 @@ void fullConfig() { assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMillis(200)); assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMillis(100)); assertThat(config.cacheMaxCount()).isEqualTo(100); + assertThat(config.cacheMaxBytes()).hasValue(1048576L); assertThat(config.cacheExpirationLifespanSec()).isEqualTo(200); assertThat(config.cacheExpirationMaxIdleSec()).isEqualTo(100); assertThat(config.produceUploadThreadPoolSize()).isEqualTo(16);