diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/BatchCoordinateCacheMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/BatchCoordinateCacheMetrics.java index 232b031883..ce0b9cfe58 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/BatchCoordinateCacheMetrics.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/BatchCoordinateCacheMetrics.java @@ -1,3 +1,20 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ package io.aiven.inkless.cache; import org.apache.kafka.server.metrics.KafkaMetricsGroup; diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java index 78ed51100d..d4ff236828 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java @@ -1,5 +1,24 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ package io.aiven.inkless.cache; +import org.apache.kafka.common.metrics.Metrics; + import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.stats.CacheStats; @@ -21,32 +40,33 @@ public final class CaffeineCache implements ObjectCache { */ private final AsyncCache cache; - private final CaffeineCacheMetrics metrics; - public CaffeineCache( final long maxCacheSize, final long lifespanSeconds, - final int maxIdleSeconds) { - cache = Caffeine.newBuilder() - .maximumSize(maxCacheSize) - .expireAfterWrite(Duration.ofSeconds(lifespanSeconds)) - .expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds: 180)) - .recordStats() - .buildAsync(); - metrics = new CaffeineCacheMetrics(cache.synchronous()); + final int maxIdleSeconds, + final Metrics storageMetrics + ) { + final Caffeine builder = Caffeine.newBuilder(); + // size and weight limits + builder.maximumSize(maxCacheSize); + // expiration policies + builder.expireAfterWrite(Duration.ofSeconds(lifespanSeconds)); + builder.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds : 180)); + // enable metrics + builder.recordStats(); + cache = builder.buildAsync(); + new CaffeineCacheMetrics(storageMetrics, cache.synchronous()); } @Override public void close() throws IOException { - metrics.close(); + // no resources to close } @Override public FileExtent computeIfAbsent(final CacheKey key, final Function mappingFunction) { final CompletableFuture future = new CompletableFuture<>(); - final CompletableFuture existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> { - return future; - }); + final CompletableFuture existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> future); // If existing future is not the same object as created in this function // there was a pending cache load and this call is required to join the existing future // and discard the created one. diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java index d97c830116..0390de2613 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java @@ -1,27 +1,34 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ package io.aiven.inkless.cache; import org.apache.kafka.common.MetricNameTemplate; -import org.apache.kafka.common.metrics.JmxReporter; -import org.apache.kafka.common.metrics.KafkaMetricsContext; -import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.utils.Time; import com.github.benmanes.caffeine.cache.Cache; -import java.io.Closeable; -import java.io.IOException; -import java.util.List; import java.util.function.Supplier; import io.aiven.inkless.common.metrics.MeasurableValue; import io.aiven.inkless.common.metrics.SensorProvider; -public final class CaffeineCacheMetrics implements Closeable { - - private final Metrics metrics; - +public final class CaffeineCacheMetrics { private final Sensor cacheSizeSensor; private final Sensor cacheHitCountSensor; private final Sensor cacheHitRateSensor; @@ -30,13 +37,7 @@ public final class CaffeineCacheMetrics implements Closeable { private final Sensor cacheAvgLoadPenaltySensor; private final Sensor cacheEvictionsSensor; - public CaffeineCacheMetrics(final Cache cache) { - final JmxReporter reporter = new JmxReporter(); - this.metrics = new Metrics( - new MetricConfig(), List.of(reporter), Time.SYSTEM, - new KafkaMetricsContext(CaffeineCacheMetricsRegistry.METRIC_CONTEXT) - ); - + public CaffeineCacheMetrics(final Metrics metrics, final Cache cache) { final CaffeineCacheMetricsRegistry metricsRegistry = new CaffeineCacheMetricsRegistry(); cacheSizeSensor = registerLongSensor(metrics, metricsRegistry.cacheSizeMetricName, CaffeineCacheMetricsRegistry.CACHE_SIZE, cache::estimatedSize); cacheHitCountSensor = registerLongSensor(metrics, metricsRegistry.cacheHitCountMetricName, CaffeineCacheMetricsRegistry.CACHE_HIT_COUNT, () -> cache.stats().hitCount()); @@ -62,8 +63,7 @@ static Sensor registerLongSensor(final Metrics metrics, final MetricNameTemplate @Override public String toString() { return "CaffeineCacheMetrics{" + - "metrics=" + metrics + - ", cacheSizeSensor=" + cacheSizeSensor + + "cacheSizeSensor=" + cacheSizeSensor + ", cacheHitCountSensor=" + cacheHitCountSensor + ", cacheHitRateSensor=" + cacheHitRateSensor + ", cacheMissCountSensor=" + cacheMissCountSensor + @@ -72,9 +72,4 @@ public String toString() { ", cacheEvictionsSensor=" + cacheEvictionsSensor + '}'; } - - @Override - public void close() throws IOException { - metrics.close(); - } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java index 6d57d988c3..42f6027002 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.MetricNameTemplate; public class CaffeineCacheMetricsRegistry { - public static final String METRIC_CONTEXT = "io.aiven.inkless.cache.caffeine"; public static final String METRIC_GROUP = "wal-segment-cache"; public static final String CACHE_SIZE = "size"; diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/IncreasedLogStartOffsetException.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/IncreasedLogStartOffsetException.java index 23eaa6cd77..feb05060f4 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/IncreasedLogStartOffsetException.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/IncreasedLogStartOffsetException.java @@ -1,3 +1,20 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ package io.aiven.inkless.cache; public class IncreasedLogStartOffsetException extends StaleLogFragmentException { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/NullBatchCoordinateCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/NullBatchCoordinateCache.java index 041be0c2cf..d650c0f3aa 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/NullBatchCoordinateCache.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/NullBatchCoordinateCache.java @@ -1,3 +1,20 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ package io.aiven.inkless.cache; import org.apache.kafka.common.TopicIdPartition; diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/StaleLogFragmentException.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/StaleLogFragmentException.java index 272e32e3af..4e06867780 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/cache/StaleLogFragmentException.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/StaleLogFragmentException.java @@ -1,3 +1,20 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ package io.aiven.inkless.cache; /* When this Exception is raised by a LogFragment, it means that the data contained in the LogFragment diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java index aa817b2bd4..facc9df792 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java @@ -58,7 +58,8 @@ public final class SharedState implements Closeable { private final BatchCoordinateCache batchCoordinateCache; private final BrokerTopicStats brokerTopicStats; private final Supplier defaultTopicConfigs; - private final Metrics storageMetrics; + // Accessible for testing + final Metrics storageMetrics; public SharedState( final Time time, @@ -71,7 +72,8 @@ public SharedState( final ObjectCache cache, final BatchCoordinateCache batchCoordinateCache, final BrokerTopicStats brokerTopicStats, - final Supplier defaultTopicConfigs + final Supplier defaultTopicConfigs, + final Metrics storageMetrics ) { this.time = time; this.brokerId = brokerId; @@ -84,12 +86,7 @@ public SharedState( this.batchCoordinateCache = batchCoordinateCache; this.brokerTopicStats = brokerTopicStats; this.defaultTopicConfigs = defaultTopicConfigs; - - final MetricsReporter reporter = new JmxReporter(); - this.storageMetrics = new Metrics( - new MetricConfig(), List.of(reporter), Time.SYSTEM, - new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) - ); + this.storageMetrics = storageMetrics; } public static SharedState initialize( @@ -107,6 +104,11 @@ public static SharedState initialize( "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2" ); } + final MetricsReporter reporter = new JmxReporter(); + final Metrics storageMetrics = new Metrics( + new MetricConfig(), List.of(reporter), Time.SYSTEM, + new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) + ); return new SharedState( time, brokerId, @@ -118,11 +120,13 @@ public static SharedState initialize( new CaffeineCache( config.cacheMaxCount(), config.cacheExpirationLifespanSec(), - config.cacheExpirationMaxIdleSec() + config.cacheExpirationMaxIdleSec(), + storageMetrics ), config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(), brokerTopicStats, - defaultTopicConfigs + defaultTopicConfigs, + storageMetrics ); } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/cache/CaffeineCacheMetricsTest.java b/storage/inkless/src/test/java/io/aiven/inkless/cache/CaffeineCacheMetricsTest.java new file mode 100644 index 0000000000..a5692a39f8 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/cache/CaffeineCacheMetricsTest.java @@ -0,0 +1,140 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.cache; + +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Metrics; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE; +import static org.awaitility.Awaitility.await; + +class CaffeineCacheMetricsTest { + + @Test + void testCacheHitMissMetric() { + // Given + final Metrics metrics = new Metrics(); + final Cache cache = Caffeine.newBuilder() + .recordStats() + .build(); + new CaffeineCacheMetrics(metrics, cache); + cache.put("key1", "value1"); + + // When there is a cache hit + var value = cache.getIfPresent("key1"); // hit + assertThat(value).isEqualTo("value1"); + + // Then + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_HIT_COUNT).metricValue()) + .asInstanceOf(DOUBLE) + .isOne(); + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_HIT_RATE).metricValue()) + .asInstanceOf(DOUBLE) + .isOne(); + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_MISS_RATE).metricValue()) + .asInstanceOf(DOUBLE) + .isZero(); + + // When there is a cache miss + value = cache.getIfPresent("key2"); // miss + assertThat(value).isNull(); + + // Then + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_MISS_COUNT).metricValue()) + .asInstanceOf(DOUBLE) + .isOne(); + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_HIT_RATE).metricValue()) + .asInstanceOf(DOUBLE) + .isEqualTo(0.5); // 1 hit, 1 miss + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_MISS_RATE).metricValue()) + .asInstanceOf(DOUBLE) + .isEqualTo(0.5); // 1 hit, 1 miss + + // overall load is zero + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_AVG_LOAD_PENALTY_NANOSECONDS).metricValue()) + .asInstanceOf(DOUBLE) + .isZero(); + } + + @Test + void testCacheSizeAndEviction() { + // Given + final Metrics metrics = new Metrics(); + final Cache cache = Caffeine.newBuilder() + .maximumSize(1) + .recordStats() + .build(); + new CaffeineCacheMetrics(metrics, cache); + + // Initially, cache size should be zero + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_SIZE).metricValue()) + .asInstanceOf(DOUBLE) + .isZero(); + + // When adding entries to the cache + cache.put("key1", "value1"); + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_SIZE).metricValue()) + .asInstanceOf(DOUBLE) + .isOne(); + + // Adding another entry should evict the first one due to max size of 1 + cache.put("key2", "value2"); // This should evict key1 + await().atMost(Duration.ofSeconds(10)).until(() -> + // size should eventually come back to 1 + (double) getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_SIZE).metricValue() == 1.0 + ); + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_EVICTION_COUNT).metricValue()) + .asInstanceOf(DOUBLE) + .isOne(); + } + + @Test + void testCacheLoadTime() { + final Metrics metrics = new Metrics(); + final Cache cache = Caffeine.newBuilder() + .recordStats() + .build(); + new CaffeineCacheMetrics(metrics, cache); + + // Simulate cache loads with some computation time + cache.get("key1", key -> { + try { + Thread.sleep(10); // Simulate load time + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return "value1"; + }); + + assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_AVG_LOAD_PENALTY_NANOSECONDS).metricValue()) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + } + + private static KafkaMetric getMetric(Metrics metrics, String metricName) { + return metrics.metric(metrics.metricName(metricName, CaffeineCacheMetricsRegistry.METRIC_GROUP)); + } +} \ No newline at end of file diff --git a/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java new file mode 100644 index 0000000000..835d672500 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java @@ -0,0 +1,108 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.common; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import io.aiven.inkless.config.InklessConfig; +import io.aiven.inkless.control_plane.InMemoryControlPlane; +import io.aiven.inkless.control_plane.MetadataView; + +import static org.assertj.core.api.Assertions.assertThat; + +class SharedStateTest { + + @Test void testStorageMetricName() { + // create a shared state instance + final MockTime time = new MockTime(); + final SharedState sharedState = SharedState.initialize( + time, + 1, + new InklessConfig(Map.of()), + new NoopMetadataView(), + new InMemoryControlPlane(time), + new BrokerTopicStats(), + () -> null + ); + // Ensure that the JMX reporter is registered + assertThat(sharedState.storageMetrics.reporters()).hasSize(1); + final MetricsReporter metricsReporter = sharedState.storageMetrics.reporters().get(0); + assertThat(metricsReporter).isInstanceOf(JmxReporter.class); + final JmxReporter jmxReporter = (JmxReporter) metricsReporter; + + // Ensure the proper metric name is built and registered + final MetricName metricName = new MetricName("test-metric", "test-group", "desc", Map.of()); + sharedState.storageMetrics.addMetric(metricName, (config, now) -> 42.0); + final KafkaMetric metric = sharedState.storageMetrics.metric(metricName); + jmxReporter.init(List.of(metric)); + jmxReporter.containsMbean("io.aiven.inkless.storage:type=test-group,name=test-metric"); + } + + private static class NoopMetadataView implements MetadataView { + @Override + public Map getDefaultConfig() { + return Map.of(); + } + + @Override + public Iterable getAliveBrokerNodes(ListenerName listenerName) { + return null; + } + + @Override + public Integer getBrokerCount() { + return 0; + } + + @Override + public Uuid getTopicId(String topicName) { + return null; + } + + @Override + public boolean isDisklessTopic(String topicName) { + return false; + } + + @Override + public Properties getTopicConfig(String topicName) { + return null; + } + + @Override + public Set getDisklessTopicPartitions() { + return Set.of(); + } + } +} \ No newline at end of file diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java index 43e197dbd2..722eb4e2b9 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteRecordsResponseData; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -80,6 +81,8 @@ class DeleteRecordsInterceptorTest { static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of()); Time time = new MockTime(); + Metrics storageMetrics = new Metrics(); + @Mock InklessConfig disklessConfig; @Mock @@ -100,20 +103,7 @@ class DeleteRecordsInterceptorTest { public void mixingDisklessAndClassicTopicsIsNotAllowed() { when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final SharedState state = new SharedState( - time, - BROKER_ID, - disklessConfig, - metadataView, - controlPlane, - OBJECT_KEY_CREATOR, - KEY_ALIGNMENT_STRATEGY, - OBJECT_CACHE, - BATCH_COORDINATE_CACHE, - brokerTopicStats, - DEFAULT_TOPIC_CONFIGS - ); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState()); final Map entriesPerPartition = Map.of( new TopicPartition("diskless", 0), @@ -144,9 +134,7 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() { @Test public void notInterceptDeletingRecordsFromClassicTopics() { when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS)); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState()); final Map entriesPerPartition = Map.of( new TopicPartition("non_diskless", 0), 4567L @@ -182,9 +170,9 @@ public void interceptDeletingRecordsFromDisklessTopics() { }); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + buildSharedState(), + new SynchronousExecutor() + ); final TopicPartition tp0 = new TopicPartition("diskless", 0); final TopicPartition tp1 = new TopicPartition("diskless", 1); @@ -225,9 +213,9 @@ public void controlPlaneException() { when(controlPlane.deleteRecords(anyList())).thenThrow(new RuntimeException("test")); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + buildSharedState(), + new SynchronousExecutor() + ); final TopicPartition topicPartition = new TopicPartition("diskless", 1); final Map entriesPerPartition = Map.of( @@ -266,9 +254,9 @@ public void topicIdNotFound() { }); final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + buildSharedState(), + new SynchronousExecutor() + ); final TopicPartition topicPartition1 = new TopicPartition("diskless1", 1); final TopicPartition topicPartition2 = new TopicPartition("diskless2", 2); @@ -295,4 +283,21 @@ public void topicIdNotFound() { .setLowWatermark(INVALID_LOW_WATERMARK) )); } + + private SharedState buildSharedState() { + return new SharedState( + time, + BROKER_ID, + disklessConfig, + metadataView, + controlPlane, + OBJECT_KEY_CREATOR, + KEY_ALIGNMENT_STRATEGY, + OBJECT_CACHE, + BATCH_COORDINATE_CACHE, + brokerTopicStats, + DEFAULT_TOPIC_CONFIGS, + storageMetrics + ); + } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java index 6fda9ad1e8..65e29c0521 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.SimpleRecord; @@ -80,6 +81,7 @@ public class AppendHandlerTest { Time time = new MockTime(); RequestLocal requestLocal = RequestLocal.noCaching(); + Metrics storageMetrics = new Metrics(); @Mock InklessConfig inklessConfig; @Mock @@ -113,9 +115,7 @@ public class AppendHandlerTest { @Test public void rejectTransactionalProduce() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { final TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless1"); final TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless2"); @@ -137,9 +137,7 @@ topicIdPartition2, new PartitionResponse(Errors.INVALID_REQUEST) @Test public void emptyRequests() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { final Map entriesPerPartition = Map.of(); @@ -165,9 +163,7 @@ topicIdPartition, new PartitionResponse(Errors.NONE) ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { final var result = interceptor.handle(entriesPerPartition, requestLocal).get(); assertThat(result).isEqualTo(writeResult); @@ -188,21 +184,34 @@ public void writeFutureFailed() throws Exception { ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) { assertThatThrownBy(() -> interceptor.handle(entriesPerPartition, requestLocal).get()).hasCause(exception); } } @Test public void close() throws IOException { - final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer); + final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer); interceptor.close(); verify(writer).close(); } + + private SharedState buildSharedState() { + return new SharedState( + time, + BROKER_ID, + inklessConfig, + metadataView, + controlPlane, + OBJECT_KEY_CREATOR, + KEY_ALIGNMENT_STRATEGY, + OBJECT_CACHE, + BATCH_COORDINATE_CACHE, + brokerTopicStats, + DEFAULT_TOPIC_CONFIGS, + storageMetrics + ); + } }