diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java
index 78ed51100d..1376320889 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java
@@ -1,11 +1,15 @@
 package io.aiven.inkless.cache;
 
+import org.apache.kafka.common.metrics.Metrics;
+
 import com.github.benmanes.caffeine.cache.AsyncCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Weigher;
 import com.github.benmanes.caffeine.cache.stats.CacheStats;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
@@ -21,32 +25,39 @@ public final class CaffeineCache implements ObjectCache {
      */
     private final AsyncCache<CacheKey, FileExtent> cache;
 
-    private final CaffeineCacheMetrics metrics;
-
     public CaffeineCache(
         final long maxCacheSize,
+        final OptionalLong maxCacheBytes,
         final long lifespanSeconds,
-        final int maxIdleSeconds) {
-        cache = Caffeine.newBuilder()
-            .maximumSize(maxCacheSize)
-            .expireAfterWrite(Duration.ofSeconds(lifespanSeconds))
-            .expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds: 180))
-            .recordStats()
-            .buildAsync();
-        metrics = new CaffeineCacheMetrics(cache.synchronous());
+        final int maxIdleSeconds,
+        final Metrics storageMetrics
+    ) {
+        final Caffeine<Object, Object> builder = Caffeine.newBuilder();
+        // Caffeine forbids combining maximumSize with maximumWeight (it throws
+        // IllegalStateException), so the byte-based weight bound, when
+        // configured, replaces the entry-count bound.
+        maxCacheBytes.ifPresentOrElse(
+            max -> builder.maximumWeight(max).weigher(weigher()),
+            () -> builder.maximumSize(maxCacheSize)
+        );
+        // expiration policies
+        builder.expireAfterWrite(Duration.ofSeconds(lifespanSeconds));
+        builder.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds : 180));
+        // enable metrics
+        builder.recordStats();
+        cache = builder.buildAsync();
+        new CaffeineCacheMetrics(storageMetrics, cache.synchronous());
+    }
+
+    public Weigher<CacheKey, FileExtent> weigher() {
+        return (key, value) -> value.data().length;
     }
 
     @Override
     public void close() throws IOException {
-        metrics.close();
+        cache.synchronous().cleanUp();
     }
 
     @Override
     public FileExtent computeIfAbsent(final CacheKey key, final Function<CacheKey, FileExtent> mappingFunction) {
         final CompletableFuture<FileExtent> future = new CompletableFuture<>();
-        final CompletableFuture<FileExtent> existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> {
-            return future;
-        });
+        final CompletableFuture<FileExtent> existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> future);
         // If existing future is not the same object as created in this function
         // there was a pending cache load and this call is required to join the existing future
         // and discard the created one.
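Review note: Caffeine treats `maximumSize` and `maximumWeight` as mutually exclusive (setting both throws `IllegalStateException`), which is why the constructor above picks one bound or the other. The sketch below is a self-contained illustration of the byte-weighted eviction this change enables; `Extent` is a hypothetical stand-in for the project's `FileExtent`, not its real definition.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class WeighedCacheSketch {
    // Hypothetical stand-in for FileExtent: a value that knows its payload size.
    record Extent(byte[] data) {}

    public static void main(String[] args) {
        // Bound the cache by total bytes rather than entry count: each entry's
        // weight is its payload length, and the policy evicts once the summed
        // weight exceeds maximumWeight.
        Cache<String, Extent> cache = Caffeine.newBuilder()
            .maximumWeight(1024 * 1024) // 1 MiB budget
            .weigher((String key, Extent value) -> value.data().length)
            .recordStats()
            .build();

        cache.put("a", new Extent(new byte[512 * 1024]));
        cache.put("b", new Extent(new byte[768 * 1024])); // pushes the total over budget
        cache.cleanUp(); // run pending maintenance so the eviction is visible
        System.out.println("entries: " + cache.estimatedSize());
        System.out.println("evicted weight: " + cache.stats().evictionWeight());
    }
}
```

With a weigher in place, `estimatedSize()` still counts entries; only the eviction policy reasons in bytes.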
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java
index d97c830116..9a27588d00 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java
@@ -1,27 +1,17 @@
 package io.aiven.inkless.cache;
 
 import org.apache.kafka.common.MetricNameTemplate;
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.KafkaMetricsContext;
-import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Time;
 
 import com.github.benmanes.caffeine.cache.Cache;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
 import java.util.function.Supplier;
 
 import io.aiven.inkless.common.metrics.MeasurableValue;
 import io.aiven.inkless.common.metrics.SensorProvider;
 
-public final class CaffeineCacheMetrics implements Closeable {
-
-    private final Metrics metrics;
-
+public final class CaffeineCacheMetrics {
     private final Sensor cacheSizeSensor;
     private final Sensor cacheHitCountSensor;
     private final Sensor cacheHitRateSensor;
@@ -30,13 +20,7 @@ public final class CaffeineCacheMetrics implements Closeable {
     private final Sensor cacheMissCountSensor;
     private final Sensor cacheAvgLoadPenaltySensor;
     private final Sensor cacheEvictionsSensor;
 
-    public CaffeineCacheMetrics(final Cache cache) {
-        final JmxReporter reporter = new JmxReporter();
-        this.metrics = new Metrics(
-            new MetricConfig(), List.of(reporter), Time.SYSTEM,
-            new KafkaMetricsContext(CaffeineCacheMetricsRegistry.METRIC_CONTEXT)
-        );
-
+    public CaffeineCacheMetrics(final Metrics metrics, final Cache cache) {
         final CaffeineCacheMetricsRegistry metricsRegistry = new CaffeineCacheMetricsRegistry();
         cacheSizeSensor = registerLongSensor(metrics, metricsRegistry.cacheSizeMetricName, CaffeineCacheMetricsRegistry.CACHE_SIZE, cache::estimatedSize);
         cacheHitCountSensor = registerLongSensor(metrics, metricsRegistry.cacheHitCountMetricName, CaffeineCacheMetricsRegistry.CACHE_HIT_COUNT, () -> cache.stats().hitCount());
@@ -62,8 +46,7 @@ static Sensor registerLongSensor(final Metrics metrics, final MetricNameTemplate
     @Override
     public String toString() {
         return "CaffeineCacheMetrics{" +
-            "metrics=" + metrics +
-            ", cacheSizeSensor=" + cacheSizeSensor +
+            "cacheSizeSensor=" + cacheSizeSensor +
             ", cacheHitCountSensor=" + cacheHitCountSensor +
             ", cacheHitRateSensor=" + cacheHitRateSensor +
             ", cacheMissCountSensor=" + cacheMissCountSensor +
@@ -72,9 +55,4 @@ public String toString() {
             ", cacheEvictionsSensor=" + cacheEvictionsSensor +
             '}';
     }
-
-    @Override
-    public void close() throws IOException {
-        metrics.close();
-    }
 }
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java
index 6d57d988c3..42f6027002 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java
@@ -20,7 +20,6 @@
 import org.apache.kafka.common.MetricNameTemplate;
 
 public class CaffeineCacheMetricsRegistry {
-    public static final String METRIC_CONTEXT = "io.aiven.inkless.cache.caffeine";
     public static final String METRIC_GROUP = "wal-segment-cache";
     public static final String CACHE_SIZE = "size";
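Review note: with this change `CaffeineCacheMetrics` no longer creates its own `Metrics`/`JmxReporter` pair, so it no longer needs to be `Closeable`; it only registers sensors on a `Metrics` instance whose lifecycle belongs to the caller. The project's `SensorProvider`/`MeasurableValue` helpers are not reproduced here; the sketch below shows the same ownership pattern with the plain kafka-clients API, and all names in it are hypothetical.

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class CacheGaugeSketch {
    public static void main(String[] args) {
        // The caller owns the Metrics instance; the cache code only registers on it.
        Metrics metrics = new Metrics();
        Cache<String, String> cache = Caffeine.newBuilder().recordStats().build();

        MetricName sizeName = metrics.metricName("size", "wal-segment-cache");
        Measurable size = (config, now) -> cache.estimatedSize(); // re-read on every poll
        metrics.addMetric(sizeName, size);

        cache.put("k", "v");
        System.out.println(metrics.metric(sizeName).metricValue()); // 1.0

        metrics.close(); // lifecycle stays with the owner, not with the cache metrics
    }
}
```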
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
index aa817b2bd4..a768bd1f52 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
@@ -71,7 +71,8 @@ public SharedState(
         final ObjectCache cache,
         final BatchCoordinateCache batchCoordinateCache,
         final BrokerTopicStats brokerTopicStats,
-        final Supplier<LogConfig> defaultTopicConfigs
+        final Supplier<LogConfig> defaultTopicConfigs,
+        final Metrics storageMetrics
     ) {
         this.time = time;
         this.brokerId = brokerId;
@@ -84,12 +85,7 @@ public SharedState(
         this.batchCoordinateCache = batchCoordinateCache;
         this.brokerTopicStats = brokerTopicStats;
         this.defaultTopicConfigs = defaultTopicConfigs;
-
-        final MetricsReporter reporter = new JmxReporter();
-        this.storageMetrics = new Metrics(
-            new MetricConfig(), List.of(reporter), Time.SYSTEM,
-            new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
-        );
+        this.storageMetrics = storageMetrics;
     }
 
     public static SharedState initialize(
@@ -107,6 +103,11 @@ public static SharedState initialize(
                 "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2"
            );
         }
+        final MetricsReporter reporter = new JmxReporter();
+        final Metrics storageMetrics = new Metrics(
+            new MetricConfig(), List.of(reporter), Time.SYSTEM,
+            new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
+        );
         return new SharedState(
             time,
             brokerId,
@@ -117,12 +118,15 @@ public static SharedState initialize(
             new FixedBlockAlignment(config.fetchCacheBlockBytes()),
             new CaffeineCache(
                 config.cacheMaxCount(),
+                config.cacheMaxBytes(),
                 config.cacheExpirationLifespanSec(),
-                config.cacheExpirationMaxIdleSec()
+                config.cacheExpirationMaxIdleSec(),
+                storageMetrics
             ),
             config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(),
             brokerTopicStats,
-            defaultTopicConfigs
+            defaultTopicConfigs,
+            storageMetrics
         );
     }
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
index 98896b797a..dd328a1921 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
@@ -25,6 +25,8 @@
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
 
 import io.aiven.inkless.common.config.validators.Subclass;
 import io.aiven.inkless.control_plane.ControlPlane;
@@ -79,11 +81,12 @@ public class InklessConfig extends AbstractConfig {
     private static final int CONSUME_CACHE_BLOCK_BYTES_DEFAULT = 16 * 1024 * 1024; // 16 MiB
 
     public static final String CONSUME_CACHE_MAX_COUNT_CONFIG = CONSUME_PREFIX + "cache.max.count";
-    private static final String CONSUME_CACHE_MAX_COUNT_DOC = "The maximum number of objects to cache in memory. " +
-        "If the cache exceeds this limit, and the cache persistence is enabled, " +
-        "the least recently used objects will be persisted to disk and removed from memory.";
+    private static final String CONSUME_CACHE_MAX_COUNT_DOC = "The maximum number of objects to cache in memory.";
     private static final int CONSUME_CACHE_MAX_COUNT_DEFAULT = 1000;
 
+    public static final String CONSUME_CACHE_MAX_BYTES_CONFIG = CONSUME_PREFIX + "cache.max.bytes";
+    private static final String CONSUME_CACHE_MAX_BYTES_DOC = "The maximum number of bytes to cache in memory.";
+
     public static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG = CONSUME_PREFIX + "cache.expiration.lifespan.sec";
     private static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DOC = "The lifespan in seconds of a cache entry before it will be removed from all storages.";
     private static final int CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DEFAULT = 60; // Defaults to 1 minute
@@ -282,6 +285,13 @@ public static ConfigDef configDef() {
             ConfigDef.Importance.LOW,
             CONSUME_CACHE_MAX_COUNT_DOC
         );
+        configDef.define(
+            CONSUME_CACHE_MAX_BYTES_CONFIG,
+            ConfigDef.Type.LONG,
+            null,
+            ConfigDef.Importance.LOW,
+            CONSUME_CACHE_MAX_BYTES_DOC
+        );
         configDef.define(
             CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG,
             ConfigDef.Type.INT,
@@ -474,4 +484,10 @@ public boolean isBatchCoordinateCacheEnabled() {
     public Duration batchCoordinateCacheTtl() {
         return Duration.ofMillis(getInt(CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_CONFIG));
     }
+
+    public OptionalLong cacheMaxBytes() {
+        return Optional.ofNullable(getLong(CONSUME_CACHE_MAX_BYTES_CONFIG))
+            .map(OptionalLong::of)
+            .orElse(OptionalLong.empty());
+    }
 }
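Review note: `cache.max.bytes` follows the nullable-default pattern: `ConfigDef.Type.LONG` with a `null` default makes `getLong(...)` return `null` when the setting is absent, which `cacheMaxBytes()` then converts into an empty `OptionalLong`. The two-step `Optional.ofNullable(...).map(OptionalLong::of).orElse(...)` dance is needed because `Optional` has no direct mapping into the primitive optionals. A minimal self-contained illustration, with hypothetical names:

```java
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class OptionalLongConfigSketch {
    // Hypothetical mini-config mirroring the cache.max.bytes definition above.
    static class BytesConfig extends AbstractConfig {
        static final String MAX_BYTES = "cache.max.bytes";
        static final ConfigDef DEF = new ConfigDef()
            .define(MAX_BYTES, ConfigDef.Type.LONG, null, ConfigDef.Importance.LOW, "Optional byte bound.");

        BytesConfig(Map<String, ?> props) {
            super(DEF, props);
        }

        OptionalLong maxBytes() {
            // getLong returns null when the setting is absent and the default is null.
            return Optional.ofNullable(getLong(MAX_BYTES))
                .map(OptionalLong::of)
                .orElse(OptionalLong.empty());
        }
    }

    public static void main(String[] args) {
        System.out.println(new BytesConfig(Map.of()).maxBytes());                             // empty
        System.out.println(new BytesConfig(Map.of("cache.max.bytes", "1048576")).maxBytes()); // 1048576
    }
}
```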
" + - "If the cache exceeds this limit, and the cache persistence is enabled, " + - "the least recently used objects will be persisted to disk and removed from memory."; + private static final String CONSUME_CACHE_MAX_COUNT_DOC = "The maximum number of objects to cache in memory."; private static final int CONSUME_CACHE_MAX_COUNT_DEFAULT = 1000; + public static final String CONSUME_CACHE_MAX_BYTES_CONFIG = CONSUME_PREFIX + "cache.max.bytes"; + private static final String CONSUME_CACHE_MAX_BYTES_DOC = "The maximum number of bytes to cache in memory."; + public static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG = CONSUME_PREFIX + "cache.expiration.lifespan.sec"; private static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DOC = "The lifespan in seconds of a cache entry before it will be removed from all storages."; private static final int CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DEFAULT = 60; // Defaults to 1 minute @@ -282,6 +285,13 @@ public static ConfigDef configDef() { ConfigDef.Importance.LOW, CONSUME_CACHE_MAX_COUNT_DOC ); + configDef.define( + CONSUME_CACHE_MAX_BYTES_CONFIG, + ConfigDef.Type.LONG, + null, + ConfigDef.Importance.LOW, + CONSUME_CACHE_MAX_BYTES_DOC + ); configDef.define( CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG, ConfigDef.Type.INT, @@ -474,4 +484,10 @@ public boolean isBatchCoordinateCacheEnabled() { public Duration batchCoordinateCacheTtl() { return Duration.ofMillis(getInt(CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_CONFIG)); } + + public OptionalLong cacheMaxBytes() { + return Optional.ofNullable(getLong(CONSUME_CACHE_MAX_BYTES_CONFIG)) + .map(OptionalLong::of) + .orElse(OptionalLong.empty()); + } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java index 6c3b22f12f..74e91a8fcd 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java @@ -53,6 +53,7 @@ void publicConstructor() { configs.put("inkless.file.cleaner.retention.period.ms", "200"); configs.put("inkless.file.merger.interval.ms", "100"); configs.put("inkless.consume.cache.max.count", "100"); + configs.put("inkless.consume.cache.max.bytes", "1048576"); configs.put("inkless.consume.cache.expiration.lifespan.sec", "200"); configs.put("inkless.consume.cache.expiration.max.idle.sec", "100"); configs.put("inkless.produce.upload.thread.pool.size", "16"); @@ -73,6 +74,7 @@ void publicConstructor() { assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMillis(200)); assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMillis(100)); assertThat(config.cacheMaxCount()).isEqualTo(100); + assertThat(config.cacheMaxBytes()).hasValue(1048576L); assertThat(config.cacheExpirationLifespanSec()).isEqualTo(200); assertThat(config.cacheExpirationMaxIdleSec()).isEqualTo(100); assertThat(config.produceUploadThreadPoolSize()).isEqualTo(16); @@ -102,6 +104,7 @@ void minimalConfig() { assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMinutes(1)); assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMinutes(1)); assertThat(config.cacheMaxCount()).isEqualTo(1000); + assertThat(config.cacheMaxBytes()).isEmpty(); assertThat(config.cacheExpirationLifespanSec()).isEqualTo(60); assertThat(config.cacheExpirationMaxIdleSec()).isEqualTo(-1); assertThat(config.produceUploadThreadPoolSize()).isEqualTo(8); @@ -125,6 +128,7 @@ void fullConfig() { 
configs.put("file.cleaner.retention.period.ms", "200"); configs.put("file.merger.interval.ms", "100"); configs.put("consume.cache.max.count", "100"); + configs.put("consume.cache.max.bytes", "1048576"); configs.put("consume.cache.expiration.lifespan.sec", "200"); configs.put("consume.cache.expiration.max.idle.sec", "100"); configs.put("produce.upload.thread.pool.size", "16"); @@ -146,6 +150,7 @@ void fullConfig() { assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMillis(200)); assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMillis(100)); assertThat(config.cacheMaxCount()).isEqualTo(100); + assertThat(config.cacheMaxBytes()).hasValue(1048576L); assertThat(config.cacheExpirationLifespanSec()).isEqualTo(200); assertThat(config.cacheExpirationMaxIdleSec()).isEqualTo(100); assertThat(config.produceUploadThreadPoolSize()).isEqualTo(16); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java index 43e197dbd2..722eb4e2b9 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteRecordsResponseData; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -80,6 +81,8 @@ class DeleteRecordsInterceptorTest { static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of()); Time time = new MockTime(); + Metrics storageMetrics = new Metrics(); + @Mock InklessConfig disklessConfig; @Mock @@ -100,20 +103,7 @@ class DeleteRecordsInterceptorTest { public void mixingDisklessAndClassicTopicsIsNotAllowed() { when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final SharedState state = new SharedState( - time, - BROKER_ID, - disklessConfig, - metadataView, - controlPlane, - OBJECT_KEY_CREATOR, - KEY_ALIGNMENT_STRATEGY, - OBJECT_CACHE, - BATCH_COORDINATE_CACHE, - brokerTopicStats, - DEFAULT_TOPIC_CONFIGS - ); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState()); final Map entriesPerPartition = Map.of( new TopicPartition("diskless", 0), @@ -144,9 +134,7 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() { @Test public void notInterceptDeletingRecordsFromClassicTopics() { when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS)); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState()); final Map entriesPerPartition = Map.of( new TopicPartition("non_diskless", 0), 4567L @@ -182,9 +170,9 @@ public void interceptDeletingRecordsFromDisklessTopics() { }); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, 
disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + buildSharedState(), + new SynchronousExecutor() + ); final TopicPartition tp0 = new TopicPartition("diskless", 0); final TopicPartition tp1 = new TopicPartition("diskless", 1); @@ -225,9 +213,9 @@ public void controlPlaneException() { when(controlPlane.deleteRecords(anyList())).thenThrow(new RuntimeException("test")); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + buildSharedState(), + new SynchronousExecutor() + ); final TopicPartition topicPartition = new TopicPartition("diskless", 1); final Map entriesPerPartition = Map.of( @@ -266,9 +254,9 @@ public void topicIdNotFound() { }); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + buildSharedState(), + new SynchronousExecutor() + ); final TopicPartition topicPartition1 = new TopicPartition("diskless1", 1); final TopicPartition topicPartition2 = new TopicPartition("diskless2", 2); @@ -295,4 +283,21 @@ public void topicIdNotFound() { .setLowWatermark(INVALID_LOW_WATERMARK) )); } + + private SharedState buildSharedState() { + return new SharedState( + time, + BROKER_ID, + disklessConfig, + metadataView, + controlPlane, + OBJECT_KEY_CREATOR, + KEY_ALIGNMENT_STRATEGY, + OBJECT_CACHE, + BATCH_COORDINATE_CACHE, + brokerTopicStats, + DEFAULT_TOPIC_CONFIGS, + storageMetrics + ); + } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java index 6fda9ad1e8..65e29c0521 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.SimpleRecord; @@ -80,6 +81,7 @@ public class AppendHandlerTest { Time time = new MockTime(); RequestLocal requestLocal = RequestLocal.noCaching(); + Metrics storageMetrics = new Metrics(); @Mock InklessConfig inklessConfig; @Mock @@ -113,9 +115,7 @@ public class AppendHandlerTest { @Test public void rejectTransactionalProduce() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { final TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless1"); final TopicIdPartition topicIdPartition2 = new 
TopicIdPartition(Uuid.randomUuid(), 0, "inkless2"); @@ -137,9 +137,7 @@ topicIdPartition2, new PartitionResponse(Errors.INVALID_REQUEST) @Test public void emptyRequests() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { final Map entriesPerPartition = Map.of(); @@ -165,9 +163,7 @@ topicIdPartition, new PartitionResponse(Errors.NONE) ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { final var result = interceptor.handle(entriesPerPartition, requestLocal).get(); assertThat(result).isEqualTo(writeResult); @@ -188,21 +184,34 @@ public void writeFutureFailed() throws Exception { ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { assertThatThrownBy(() -> interceptor.handle(entriesPerPartition, requestLocal).get()).hasCause(exception); } } @Test public void close() throws IOException { - final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer); + final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer); interceptor.close(); verify(writer).close(); } + + private SharedState buildSharedState() { + return new SharedState( + time, + BROKER_ID, + inklessConfig, + metadataView, + controlPlane, + OBJECT_KEY_CREATOR, + KEY_ALIGNMENT_STRATEGY, + OBJECT_CACHE, + BATCH_COORDINATE_CACHE, + brokerTopicStats, + DEFAULT_TOPIC_CONFIGS, + storageMetrics + ); + } }
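Review note: end to end, the wiring after this change looks roughly like the sketch below (the argument values are illustrative defaults from `InklessConfig`, and the snippet assumes the patched classes are on the classpath). The behavioral wrinkle worth calling out: `CaffeineCache.close()` now only runs `cleanUp()`, so the JMX-reporting `Metrics` instance created in `SharedState.initialize()` must be closed by whoever owns the `SharedState`.

```java
import java.util.OptionalLong;

import org.apache.kafka.common.metrics.Metrics;

import io.aiven.inkless.cache.CaffeineCache;

public class CacheWiringSketch {
    public static void main(String[] args) throws Exception {
        // One shared Metrics instance, as built in SharedState.initialize().
        try (Metrics storageMetrics = new Metrics()) {
            final CaffeineCache cache = new CaffeineCache(
                1000,                      // consume.cache.max.count default
                OptionalLong.of(1 << 20),  // consume.cache.max.bytes; OptionalLong.empty() when unset
                60,                        // consume.cache.expiration.lifespan.sec default
                -1,                        // consume.cache.expiration.max.idle.sec (-1 falls back to 180s)
                storageMetrics
            );
            cache.close(); // cleans up the cache only; Metrics is closed by try-with-resources
        }
    }
}
```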