diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/BatchCoordinateCacheMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/BatchCoordinateCacheMetrics.java
index 232b031883..ce0b9cfe58 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/BatchCoordinateCacheMetrics.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/BatchCoordinateCacheMetrics.java
@@ -1,3 +1,20 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
package io.aiven.inkless.cache;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java
index 78ed51100d..d4ff236828 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java
@@ -1,5 +1,24 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
package io.aiven.inkless.cache;
+import org.apache.kafka.common.metrics.Metrics;
+
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
@@ -21,32 +40,33 @@ public final class CaffeineCache implements ObjectCache {
*/
private final AsyncCache<CacheKey, FileExtent> cache;
- private final CaffeineCacheMetrics metrics;
-
public CaffeineCache(
final long maxCacheSize,
final long lifespanSeconds,
- final int maxIdleSeconds) {
- cache = Caffeine.newBuilder()
- .maximumSize(maxCacheSize)
- .expireAfterWrite(Duration.ofSeconds(lifespanSeconds))
- .expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds: 180))
- .recordStats()
- .buildAsync();
- metrics = new CaffeineCacheMetrics(cache.synchronous());
+ final int maxIdleSeconds,
+ final Metrics storageMetrics
+ ) {
+ final Caffeine<Object, Object> builder = Caffeine.newBuilder();
+ // size and weight limits
+ builder.maximumSize(maxCacheSize);
+ // expiration policies
+ builder.expireAfterWrite(Duration.ofSeconds(lifespanSeconds));
+ builder.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds : 180));
+ // enable metrics
+ builder.recordStats();
+ cache = builder.buildAsync();
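+ // Sensors are registered on the shared storage Metrics registry, so the CaffeineCacheMetrics instance does not need to be retained here.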
+ new CaffeineCacheMetrics(storageMetrics, cache.synchronous());
}
@Override
public void close() throws IOException {
- metrics.close();
+ // no resources to close
}
@Override
public FileExtent computeIfAbsent(final CacheKey key, final Function<CacheKey, FileExtent> mappingFunction) {
final CompletableFuture<FileExtent> future = new CompletableFuture<>();
- final CompletableFuture<FileExtent> existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> {
- return future;
- });
+ final CompletableFuture<FileExtent> existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> future);
// If existing future is not the same object as created in this function
// there was a pending cache load and this call is required to join the existing future
// and discard the created one.
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java
index d97c830116..0390de2613 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java
@@ -1,27 +1,34 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
package io.aiven.inkless.cache;
import org.apache.kafka.common.MetricNameTemplate;
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.KafkaMetricsContext;
-import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Time;
import com.github.benmanes.caffeine.cache.Cache;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
import java.util.function.Supplier;
import io.aiven.inkless.common.metrics.MeasurableValue;
import io.aiven.inkless.common.metrics.SensorProvider;
-public final class CaffeineCacheMetrics implements Closeable {
-
- private final Metrics metrics;
-
+public final class CaffeineCacheMetrics {
private final Sensor cacheSizeSensor;
private final Sensor cacheHitCountSensor;
private final Sensor cacheHitRateSensor;
@@ -30,13 +37,7 @@ public final class CaffeineCacheMetrics implements Closeable {
private final Sensor cacheAvgLoadPenaltySensor;
private final Sensor cacheEvictionsSensor;
- public CaffeineCacheMetrics(final Cache<?, ?> cache) {
- final JmxReporter reporter = new JmxReporter();
- this.metrics = new Metrics(
- new MetricConfig(), List.of(reporter), Time.SYSTEM,
- new KafkaMetricsContext(CaffeineCacheMetricsRegistry.METRIC_CONTEXT)
- );
-
+ public CaffeineCacheMetrics(final Metrics metrics, final Cache<?, ?> cache) {
final CaffeineCacheMetricsRegistry metricsRegistry = new CaffeineCacheMetricsRegistry();
cacheSizeSensor = registerLongSensor(metrics, metricsRegistry.cacheSizeMetricName, CaffeineCacheMetricsRegistry.CACHE_SIZE, cache::estimatedSize);
cacheHitCountSensor = registerLongSensor(metrics, metricsRegistry.cacheHitCountMetricName, CaffeineCacheMetricsRegistry.CACHE_HIT_COUNT, () -> cache.stats().hitCount());
@@ -62,8 +63,7 @@ static Sensor registerLongSensor(final Metrics metrics, final MetricNameTemplate
@Override
public String toString() {
return "CaffeineCacheMetrics{" +
- "metrics=" + metrics +
- ", cacheSizeSensor=" + cacheSizeSensor +
+ "cacheSizeSensor=" + cacheSizeSensor +
", cacheHitCountSensor=" + cacheHitCountSensor +
", cacheHitRateSensor=" + cacheHitRateSensor +
", cacheMissCountSensor=" + cacheMissCountSensor +
@@ -72,9 +72,4 @@ public String toString() {
", cacheEvictionsSensor=" + cacheEvictionsSensor +
'}';
}
-
- @Override
- public void close() throws IOException {
- metrics.close();
- }
}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java
index 6d57d988c3..42f6027002 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetricsRegistry.java
@@ -20,7 +20,6 @@
import org.apache.kafka.common.MetricNameTemplate;
public class CaffeineCacheMetricsRegistry {
- public static final String METRIC_CONTEXT = "io.aiven.inkless.cache.caffeine";
public static final String METRIC_GROUP = "wal-segment-cache";
public static final String CACHE_SIZE = "size";
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/IncreasedLogStartOffsetException.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/IncreasedLogStartOffsetException.java
index 23eaa6cd77..feb05060f4 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/IncreasedLogStartOffsetException.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/IncreasedLogStartOffsetException.java
@@ -1,3 +1,20 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
package io.aiven.inkless.cache;
public class IncreasedLogStartOffsetException extends StaleLogFragmentException {
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/NullBatchCoordinateCache.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/NullBatchCoordinateCache.java
index 041be0c2cf..d650c0f3aa 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/NullBatchCoordinateCache.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/NullBatchCoordinateCache.java
@@ -1,3 +1,20 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
package io.aiven.inkless.cache;
import org.apache.kafka.common.TopicIdPartition;
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/cache/StaleLogFragmentException.java b/storage/inkless/src/main/java/io/aiven/inkless/cache/StaleLogFragmentException.java
index 272e32e3af..4e06867780 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/cache/StaleLogFragmentException.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/cache/StaleLogFragmentException.java
@@ -1,3 +1,20 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
package io.aiven.inkless.cache;
/* When this Exception is raised by a LogFragment, it means that the data contained in the LogFragment
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
index aa817b2bd4..facc9df792 100644
--- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
+++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
@@ -58,7 +58,8 @@ public final class SharedState implements Closeable {
private final BatchCoordinateCache batchCoordinateCache;
private final BrokerTopicStats brokerTopicStats;
private final Supplier<LogConfig> defaultTopicConfigs;
- private final Metrics storageMetrics;
+ // Accessible for testing
+ final Metrics storageMetrics;
public SharedState(
final Time time,
@@ -71,7 +72,8 @@ public SharedState(
final ObjectCache cache,
final BatchCoordinateCache batchCoordinateCache,
final BrokerTopicStats brokerTopicStats,
- final Supplier<LogConfig> defaultTopicConfigs
+ final Supplier<LogConfig> defaultTopicConfigs,
+ final Metrics storageMetrics
) {
this.time = time;
this.brokerId = brokerId;
@@ -84,12 +86,7 @@ public SharedState(
this.batchCoordinateCache = batchCoordinateCache;
this.brokerTopicStats = brokerTopicStats;
this.defaultTopicConfigs = defaultTopicConfigs;
-
- final MetricsReporter reporter = new JmxReporter();
- this.storageMetrics = new Metrics(
- new MetricConfig(), List.of(reporter), Time.SYSTEM,
- new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
- );
+ this.storageMetrics = storageMetrics;
}
public static SharedState initialize(
@@ -107,6 +104,11 @@ public static SharedState initialize(
"Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2"
);
}
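+ // Build the shared storage Metrics registry (reported via JMX) once, so the object cache and SharedState use the same instance.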
+ final MetricsReporter reporter = new JmxReporter();
+ final Metrics storageMetrics = new Metrics(
+ new MetricConfig(), List.of(reporter), Time.SYSTEM,
+ new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
+ );
return new SharedState(
time,
brokerId,
@@ -118,11 +120,13 @@ public static SharedState initialize(
new CaffeineCache(
config.cacheMaxCount(),
config.cacheExpirationLifespanSec(),
- config.cacheExpirationMaxIdleSec()
+ config.cacheExpirationMaxIdleSec(),
+ storageMetrics
),
config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(),
brokerTopicStats,
- defaultTopicConfigs
+ defaultTopicConfigs,
+ storageMetrics
);
}
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/cache/CaffeineCacheMetricsTest.java b/storage/inkless/src/test/java/io/aiven/inkless/cache/CaffeineCacheMetricsTest.java
new file mode 100644
index 0000000000..a5692a39f8
--- /dev/null
+++ b/storage/inkless/src/test/java/io/aiven/inkless/cache/CaffeineCacheMetricsTest.java
@@ -0,0 +1,140 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.cache;
+
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE;
+import static org.awaitility.Awaitility.await;
+
+class CaffeineCacheMetricsTest {
+
+ @Test
+ void testCacheHitMissMetric() {
+ // Given
+ final Metrics metrics = new Metrics();
+ final Cache<String, String> cache = Caffeine.newBuilder()
+ .recordStats()
+ .build();
+ new CaffeineCacheMetrics(metrics, cache);
+ cache.put("key1", "value1");
+
+ // When there is a cache hit
+ var value = cache.getIfPresent("key1"); // hit
+ assertThat(value).isEqualTo("value1");
+
+ // Then
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_HIT_COUNT).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isOne();
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_HIT_RATE).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isOne();
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_MISS_RATE).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isZero();
+
+ // When there is a cache miss
+ value = cache.getIfPresent("key2"); // miss
+ assertThat(value).isNull();
+
+ // Then
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_MISS_COUNT).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isOne();
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_HIT_RATE).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isEqualTo(0.5); // 1 hit, 1 miss
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_MISS_RATE).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isEqualTo(0.5); // 1 hit, 1 miss
+
+ // overall load is zero
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_AVG_LOAD_PENALTY_NANOSECONDS).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isZero();
+ }
+
+ @Test
+ void testCacheSizeAndEviction() {
+ // Given
+ final Metrics metrics = new Metrics();
+ final Cache<String, String> cache = Caffeine.newBuilder()
+ .maximumSize(1)
+ .recordStats()
+ .build();
+ new CaffeineCacheMetrics(metrics, cache);
+
+ // Initially, cache size should be zero
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_SIZE).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isZero();
+
+ // When adding entries to the cache
+ cache.put("key1", "value1");
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_SIZE).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isOne();
+
+ // Adding another entry should evict the first one due to max size of 1
+ cache.put("key2", "value2"); // This should evict key1
+ await().atMost(Duration.ofSeconds(10)).until(() ->
+ // size should eventually come back to 1
+ (double) getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_SIZE).metricValue() == 1.0
+ );
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_EVICTION_COUNT).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isOne();
+ }
+
+ @Test
+ void testCacheLoadTime() {
+ final Metrics metrics = new Metrics();
+ final Cache<String, String> cache = Caffeine.newBuilder()
+ .recordStats()
+ .build();
+ new CaffeineCacheMetrics(metrics, cache);
+
+ // Simulate cache loads with some computation time
+ cache.get("key1", key -> {
+ try {
+ Thread.sleep(10); // Simulate load time
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return "value1";
+ });
+
+ assertThat(getMetric(metrics, CaffeineCacheMetricsRegistry.CACHE_AVG_LOAD_PENALTY_NANOSECONDS).metricValue())
+ .asInstanceOf(DOUBLE)
+ .isGreaterThan(0.0);
+ }
+
+ private static KafkaMetric getMetric(Metrics metrics, String metricName) {
+ return metrics.metric(metrics.metricName(metricName, CaffeineCacheMetricsRegistry.METRIC_GROUP));
+ }
+}
\ No newline at end of file
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java
new file mode 100644
index 0000000000..835d672500
--- /dev/null
+++ b/storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java
@@ -0,0 +1,108 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.common;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import io.aiven.inkless.config.InklessConfig;
+import io.aiven.inkless.control_plane.InMemoryControlPlane;
+import io.aiven.inkless.control_plane.MetadataView;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SharedStateTest {
+
+ @Test
+ void testStorageMetricName() {
+ // create a shared state instance
+ final MockTime time = new MockTime();
+ final SharedState sharedState = SharedState.initialize(
+ time,
+ 1,
+ new InklessConfig(Map.of()),
+ new NoopMetadataView(),
+ new InMemoryControlPlane(time),
+ new BrokerTopicStats(),
+ () -> null
+ );
+ // Ensure that the JMX reporter is registered
+ assertThat(sharedState.storageMetrics.reporters()).hasSize(1);
+ final MetricsReporter metricsReporter = sharedState.storageMetrics.reporters().get(0);
+ assertThat(metricsReporter).isInstanceOf(JmxReporter.class);
+ final JmxReporter jmxReporter = (JmxReporter) metricsReporter;
+
+ // Ensure the proper metric name is built and registered
+ final MetricName metricName = new MetricName("test-metric", "test-group", "desc", Map.of());
+ sharedState.storageMetrics.addMetric(metricName, (config, now) -> 42.0);
+ final KafkaMetric metric = sharedState.storageMetrics.metric(metricName);
+ jmxReporter.init(List.of(metric));
+ assertThat(jmxReporter.containsMbean("io.aiven.inkless.storage:type=test-group,name=test-metric")).isTrue();
+ }
+
+ private static class NoopMetadataView implements MetadataView {
+ @Override
+ public Map getDefaultConfig() {
+ return Map.of();
+ }
+
+ @Override
+ public Iterable<Node> getAliveBrokerNodes(ListenerName listenerName) {
+ return null;
+ }
+
+ @Override
+ public Integer getBrokerCount() {
+ return 0;
+ }
+
+ @Override
+ public Uuid getTopicId(String topicName) {
+ return null;
+ }
+
+ @Override
+ public boolean isDisklessTopic(String topicName) {
+ return false;
+ }
+
+ @Override
+ public Properties getTopicConfig(String topicName) {
+ return null;
+ }
+
+ @Override
+ public Set<TopicIdPartition> getDisklessTopicPartitions() {
+ return Set.of();
+ }
+ }
+}
\ No newline at end of file
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java
index 43e197dbd2..722eb4e2b9 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -80,6 +81,8 @@ class DeleteRecordsInterceptorTest {
static final Supplier<LogConfig> DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of());
Time time = new MockTime();
+ Metrics storageMetrics = new Metrics();
+
@Mock
InklessConfig disklessConfig;
@Mock
@@ -100,20 +103,7 @@ class DeleteRecordsInterceptorTest {
public void mixingDisklessAndClassicTopicsIsNotAllowed() {
when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true);
when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false);
- final SharedState state = new SharedState(
- time,
- BROKER_ID,
- disklessConfig,
- metadataView,
- controlPlane,
- OBJECT_KEY_CREATOR,
- KEY_ALIGNMENT_STRATEGY,
- OBJECT_CACHE,
- BATCH_COORDINATE_CACHE,
- brokerTopicStats,
- DEFAULT_TOPIC_CONFIGS
- );
- final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state);
+ final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState());
final Map<TopicPartition, Long> entriesPerPartition = Map.of(
new TopicPartition("diskless", 0),
@@ -144,9 +134,7 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() {
@Test
public void notInterceptDeletingRecordsFromClassicTopics() {
when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false);
- final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(
- new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane,
- OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS));
+ final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState());
final Map<TopicPartition, Long> entriesPerPartition = Map.of(
new TopicPartition("non_diskless", 0), 4567L
@@ -182,9 +170,9 @@ public void interceptDeletingRecordsFromDisklessTopics() {
});
final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(
- new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane,
- OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS),
- new SynchronousExecutor());
+ buildSharedState(),
+ new SynchronousExecutor()
+ );
final TopicPartition tp0 = new TopicPartition("diskless", 0);
final TopicPartition tp1 = new TopicPartition("diskless", 1);
@@ -225,9 +213,9 @@ public void controlPlaneException() {
when(controlPlane.deleteRecords(anyList())).thenThrow(new RuntimeException("test"));
final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(
- new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane,
- OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS),
- new SynchronousExecutor());
+ buildSharedState(),
+ new SynchronousExecutor()
+ );
final TopicPartition topicPartition = new TopicPartition("diskless", 1);
final Map<TopicPartition, Long> entriesPerPartition = Map.of(
@@ -266,9 +254,9 @@ public void topicIdNotFound() {
});
final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(
- new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane,
- OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS),
- new SynchronousExecutor());
+ buildSharedState(),
+ new SynchronousExecutor()
+ );
final TopicPartition topicPartition1 = new TopicPartition("diskless1", 1);
final TopicPartition topicPartition2 = new TopicPartition("diskless2", 2);
@@ -295,4 +283,21 @@ public void topicIdNotFound() {
.setLowWatermark(INVALID_LOW_WATERMARK)
));
}
+
+ private SharedState buildSharedState() {
+ return new SharedState(
+ time,
+ BROKER_ID,
+ disklessConfig,
+ metadataView,
+ controlPlane,
+ OBJECT_KEY_CREATOR,
+ KEY_ALIGNMENT_STRATEGY,
+ OBJECT_CACHE,
+ BATCH_COORDINATE_CACHE,
+ brokerTopicStats,
+ DEFAULT_TOPIC_CONFIGS,
+ storageMetrics
+ );
+ }
}
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java
index 6fda9ad1e8..65e29c0521 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
@@ -80,6 +81,7 @@ public class AppendHandlerTest {
Time time = new MockTime();
RequestLocal requestLocal = RequestLocal.noCaching();
+ Metrics storageMetrics = new Metrics();
@Mock
InklessConfig inklessConfig;
@Mock
@@ -113,9 +115,7 @@ public class AppendHandlerTest {
@Test
public void rejectTransactionalProduce() throws Exception {
- try (final AppendHandler interceptor = new AppendHandler(
- new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
- OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) {
+ try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) {
final TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless1");
final TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless2");
@@ -137,9 +137,7 @@ topicIdPartition2, new PartitionResponse(Errors.INVALID_REQUEST)
@Test
public void emptyRequests() throws Exception {
- try (final AppendHandler interceptor = new AppendHandler(
- new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
- OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) {
+ try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) {
final Map<TopicIdPartition, MemoryRecords> entriesPerPartition = Map.of();
@@ -165,9 +163,7 @@ topicIdPartition, new PartitionResponse(Errors.NONE)
);
when(metadataView.getTopicConfig(any())).thenReturn(new Properties());
- try (final AppendHandler interceptor = new AppendHandler(
- new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
- OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) {
+ try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) {
final var result = interceptor.handle(entriesPerPartition, requestLocal).get();
assertThat(result).isEqualTo(writeResult);
@@ -188,21 +184,34 @@ public void writeFutureFailed() throws Exception {
);
when(metadataView.getTopicConfig(any())).thenReturn(new Properties());
- try (final AppendHandler interceptor = new AppendHandler(
- new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
- OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) {
+ try (final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer)) {
assertThatThrownBy(() -> interceptor.handle(entriesPerPartition, requestLocal).get()).hasCause(exception);
}
}
@Test
public void close() throws IOException {
- final AppendHandler interceptor = new AppendHandler(
- new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane,
- OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer);
+ final AppendHandler interceptor = new AppendHandler(buildSharedState(), writer);
interceptor.close();
verify(writer).close();
}
+
+ private SharedState buildSharedState() {
+ return new SharedState(
+ time,
+ BROKER_ID,
+ inklessConfig,
+ metadataView,
+ controlPlane,
+ OBJECT_KEY_CREATOR,
+ KEY_ALIGNMENT_STRATEGY,
+ OBJECT_CACHE,
+ BATCH_COORDINATE_CACHE,
+ brokerTopicStats,
+ DEFAULT_TOPIC_CONFIGS,
+ storageMetrics
+ );
+ }
}