CaffeineCache.java
@@ -1,11 +1,15 @@
package io.aiven.inkless.cache;

import org.apache.kafka.common.metrics.Metrics;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
import com.github.benmanes.caffeine.cache.stats.CacheStats;

import java.io.IOException;
import java.time.Duration;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

@@ -21,32 +25,39 @@ public final class CaffeineCache implements ObjectCache {
*/
private final AsyncCache<CacheKey, FileExtent> cache;

private final CaffeineCacheMetrics metrics;

public CaffeineCache(
final long maxCacheSize,
final OptionalLong maxCacheBytes,
final long lifespanSeconds,
final int maxIdleSeconds) {
cache = Caffeine.newBuilder()
.maximumSize(maxCacheSize)
.expireAfterWrite(Duration.ofSeconds(lifespanSeconds))
.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds: 180))
.recordStats()
.buildAsync();
metrics = new CaffeineCacheMetrics(cache.synchronous());
final int maxIdleSeconds,
final Metrics storageMetrics
) {
final Caffeine<Object, Object> builder = Caffeine.newBuilder();
// size and weight limits (Caffeine does not allow combining maximumSize with maximumWeight,
// so the byte bound replaces the entry-count bound when it is configured)
if (maxCacheBytes.isPresent()) {
    builder.maximumWeight(maxCacheBytes.getAsLong()).weigher(weigher());
} else {
    builder.maximumSize(maxCacheSize);
}
// expiration policies
builder.expireAfterWrite(Duration.ofSeconds(lifespanSeconds));
builder.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds : 180));
// enable metrics
builder.recordStats();
cache = builder.buildAsync();
new CaffeineCacheMetrics(storageMetrics, cache.synchronous());
}

public Weigher<CacheKey, FileExtent> weigher() {
return (key, value) -> value.data().length;
}

@Override
public void close() throws IOException {
metrics.close();
cache.synchronous().cleanUp();
}

@Override
public FileExtent computeIfAbsent(final CacheKey key, final Function<CacheKey, FileExtent> mappingFunction) {
final CompletableFuture<FileExtent> future = new CompletableFuture<>();
final CompletableFuture<FileExtent> existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> {
return future;
});
final CompletableFuture<FileExtent> existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> future);
// If the existing future is not the same object as the one created in this call,
// a cache load was already pending; this call joins the existing future
// and discards the newly created one.
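For context on the new byte-based bound: when consume.cache.max.bytes is set, the weight limit plus the per-entry weigher cap the cache by total payload size rather than by entry count. Below is a minimal standalone sketch of that Caffeine pattern; it is not part of this change set, and the key/value types and sizes are placeholders.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public final class ByteBoundedCacheSketch {
    public static void main(String[] args) {
        // Weigh each entry by its payload size so the bound is expressed in bytes, not entries.
        final Cache<String, byte[]> cache = Caffeine.newBuilder()
                .maximumWeight(1_048_576L) // 1 MiB total
                .weigher((String key, byte[] value) -> value.length)
                .recordStats()
                .build();

        cache.put("a", new byte[700 * 1024]);
        cache.put("b", new byte[700 * 1024]); // total weight now exceeds the 1 MiB limit
        cache.cleanUp();                      // run pending maintenance so the eviction is visible
        System.out.println("entries after eviction: " + cache.estimatedSize());
        System.out.println("evictions: " + cache.stats().evictionCount());
    }
}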
CaffeineCacheMetrics.java
@@ -1,27 +1,17 @@
package io.aiven.inkless.cache;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

import com.github.benmanes.caffeine.cache.Cache;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

import io.aiven.inkless.common.metrics.MeasurableValue;
import io.aiven.inkless.common.metrics.SensorProvider;

public final class CaffeineCacheMetrics implements Closeable {

private final Metrics metrics;

public final class CaffeineCacheMetrics {
private final Sensor cacheSizeSensor;
private final Sensor cacheHitCountSensor;
private final Sensor cacheHitRateSensor;
@@ -30,13 +20,7 @@ public final class CaffeineCacheMetrics implements Closeable {
private final Sensor cacheAvgLoadPenaltySensor;
private final Sensor cacheEvictionsSensor;

public CaffeineCacheMetrics(final Cache<?, ?> cache) {
final JmxReporter reporter = new JmxReporter();
this.metrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(CaffeineCacheMetricsRegistry.METRIC_CONTEXT)
);

public CaffeineCacheMetrics(final Metrics metrics, final Cache<?, ?> cache) {
final CaffeineCacheMetricsRegistry metricsRegistry = new CaffeineCacheMetricsRegistry();
cacheSizeSensor = registerLongSensor(metrics, metricsRegistry.cacheSizeMetricName, CaffeineCacheMetricsRegistry.CACHE_SIZE, cache::estimatedSize);
cacheHitCountSensor = registerLongSensor(metrics, metricsRegistry.cacheHitCountMetricName, CaffeineCacheMetricsRegistry.CACHE_HIT_COUNT, () -> cache.stats().hitCount());
@@ -62,8 +46,7 @@ static Sensor registerLongSensor(final Metrics metrics, final MetricNameTemplate
@Override
public String toString() {
return "CaffeineCacheMetrics{" +
"metrics=" + metrics +
", cacheSizeSensor=" + cacheSizeSensor +
"cacheSizeSensor=" + cacheSizeSensor +
", cacheHitCountSensor=" + cacheHitCountSensor +
", cacheHitRateSensor=" + cacheHitRateSensor +
", cacheMissCountSensor=" + cacheMissCountSensor +
@@ -72,9 +55,4 @@ public String toString() {
", cacheEvictionsSensor=" + cacheEvictionsSensor +
'}';
}

@Override
public void close() throws IOException {
metrics.close();
}
}
CaffeineCacheMetricsRegistry.java
@@ -20,7 +20,6 @@
import org.apache.kafka.common.MetricNameTemplate;

public class CaffeineCacheMetricsRegistry {
public static final String METRIC_CONTEXT = "io.aiven.inkless.cache.caffeine";
public static final String METRIC_GROUP = "wal-segment-cache";

public static final String CACHE_SIZE = "size";
SharedState.java
@@ -71,7 +71,8 @@ public SharedState(
final ObjectCache cache,
final BatchCoordinateCache batchCoordinateCache,
final BrokerTopicStats brokerTopicStats,
final Supplier<LogConfig> defaultTopicConfigs
final Supplier<LogConfig> defaultTopicConfigs,
final Metrics storageMetrics
) {
this.time = time;
this.brokerId = brokerId;
@@ -84,12 +85,7 @@ public SharedState(
this.batchCoordinateCache = batchCoordinateCache;
this.brokerTopicStats = brokerTopicStats;
this.defaultTopicConfigs = defaultTopicConfigs;

final MetricsReporter reporter = new JmxReporter();
this.storageMetrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
);
this.storageMetrics = storageMetrics;
}

public static SharedState initialize(
@@ -107,6 +103,11 @@ public static SharedState initialize(
"Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2"
);
}
final MetricsReporter reporter = new JmxReporter();
final Metrics storageMetrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
);
return new SharedState(
time,
brokerId,
@@ -117,12 +118,15 @@ public static SharedState initialize(
new FixedBlockAlignment(config.fetchCacheBlockBytes()),
new CaffeineCache(
config.cacheMaxCount(),
config.cacheMaxBytes(),
config.cacheExpirationLifespanSec(),
config.cacheExpirationMaxIdleSec()
config.cacheExpirationMaxIdleSec(),
storageMetrics
),
config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(),
brokerTopicStats,
defaultTopicConfigs
defaultTopicConfigs,
storageMetrics
);
}

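For reference, a self-contained sketch of the ownership pattern initialize() now follows: one JMX-reporting Metrics registry is created up front, handed to each storage component so it can register its sensors, and closed once by its owner at shutdown. The metrics context string and sensor name below are placeholders, not values taken from this change.

import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;

import java.util.List;

public final class SharedMetricsSketch {
    public static void main(String[] args) throws Exception {
        // One registry for every storage component, reported over JMX.
        final MetricsReporter reporter = new JmxReporter();
        try (Metrics storageMetrics = new Metrics(
                new MetricConfig(), List.of(reporter), Time.SYSTEM,
                new KafkaMetricsContext("io.aiven.inkless.storage" /* placeholder context */))) {
            // A component registers its sensors on the shared registry instead of owning one.
            final Sensor cacheSize = storageMetrics.sensor("cache-size");
            cacheSize.add(storageMetrics.metricName("size", "wal-segment-cache"), new Value());
            cacheSize.record(42);
        } // closing the owner tears down all sensors and reporters in one place
    }
}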
InklessConfig.java
@@ -25,6 +25,8 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import io.aiven.inkless.common.config.validators.Subclass;
import io.aiven.inkless.control_plane.ControlPlane;
@@ -79,11 +81,12 @@ public class InklessConfig extends AbstractConfig {
private static final int CONSUME_CACHE_BLOCK_BYTES_DEFAULT = 16 * 1024 * 1024; // 16 MiB

public static final String CONSUME_CACHE_MAX_COUNT_CONFIG = CONSUME_PREFIX + "cache.max.count";
private static final String CONSUME_CACHE_MAX_COUNT_DOC = "The maximum number of objects to cache in memory. " +
"If the cache exceeds this limit, and the cache persistence is enabled, " +
"the least recently used objects will be persisted to disk and removed from memory.";
private static final String CONSUME_CACHE_MAX_COUNT_DOC = "The maximum number of objects to cache in memory.";
private static final int CONSUME_CACHE_MAX_COUNT_DEFAULT = 1000;

public static final String CONSUME_CACHE_MAX_BYTES_CONFIG = CONSUME_PREFIX + "cache.max.bytes";
private static final String CONSUME_CACHE_MAX_BYTES_DOC = "The maximum number of bytes to cache in memory.";

public static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG = CONSUME_PREFIX + "cache.expiration.lifespan.sec";
private static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DOC = "The lifespan in seconds of a cache entry before it will be removed from all storages.";
private static final int CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DEFAULT = 60; // Defaults to 1 minute
@@ -282,6 +285,13 @@ public static ConfigDef configDef() {
ConfigDef.Importance.LOW,
CONSUME_CACHE_MAX_COUNT_DOC
);
configDef.define(
CONSUME_CACHE_MAX_BYTES_CONFIG,
ConfigDef.Type.LONG,
null,
ConfigDef.Importance.LOW,
CONSUME_CACHE_MAX_BYTES_DOC
);
configDef.define(
CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG,
ConfigDef.Type.INT,
@@ -474,4 +484,10 @@ public boolean isBatchCoordinateCacheEnabled() {
public Duration batchCoordinateCacheTtl() {
return Duration.ofMillis(getInt(CONSUME_BATCH_COORDINATE_CACHE_TTL_MS_CONFIG));
}

public OptionalLong cacheMaxBytes() {
return Optional.ofNullable(getLong(CONSUME_CACHE_MAX_BYTES_CONFIG))
.map(OptionalLong::of)
.orElse(OptionalLong.empty());
}
}
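A standalone illustration of the accessor above, using only the public Kafka ConfigDef/AbstractConfig API; the class name here is a placeholder and the key is shown without the "inkless." prefix. A LONG setting defined with a null default reads back as a nullable Long, which cacheMaxBytes() wraps into an OptionalLong.

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

public final class MaxBytesConfigSketch extends AbstractConfig {
    static final ConfigDef DEF = new ConfigDef()
            .define("consume.cache.max.bytes", ConfigDef.Type.LONG, null,
                    ConfigDef.Importance.LOW, "The maximum number of bytes to cache in memory.");

    MaxBytesConfigSketch(final Map<?, ?> props) {
        super(DEF, props);
    }

    OptionalLong cacheMaxBytes() {
        // Absent (null) means "no byte limit", mirroring the accessor above.
        return Optional.ofNullable(getLong("consume.cache.max.bytes"))
                .map(OptionalLong::of)
                .orElse(OptionalLong.empty());
    }

    public static void main(String[] args) {
        System.out.println(new MaxBytesConfigSketch(Map.of()).cacheMaxBytes());  // OptionalLong.empty
        System.out.println(new MaxBytesConfigSketch(
                Map.of("consume.cache.max.bytes", "1048576")).cacheMaxBytes());  // OptionalLong[1048576]
    }
}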
InklessConfigTest.java
@@ -53,6 +53,7 @@ void publicConstructor() {
configs.put("inkless.file.cleaner.retention.period.ms", "200");
configs.put("inkless.file.merger.interval.ms", "100");
configs.put("inkless.consume.cache.max.count", "100");
configs.put("inkless.consume.cache.max.bytes", "1048576");
configs.put("inkless.consume.cache.expiration.lifespan.sec", "200");
configs.put("inkless.consume.cache.expiration.max.idle.sec", "100");
configs.put("inkless.produce.upload.thread.pool.size", "16");
@@ -73,6 +74,7 @@ void publicConstructor() {
assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMillis(200));
assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMillis(100));
assertThat(config.cacheMaxCount()).isEqualTo(100);
assertThat(config.cacheMaxBytes()).hasValue(1048576L);
assertThat(config.cacheExpirationLifespanSec()).isEqualTo(200);
assertThat(config.cacheExpirationMaxIdleSec()).isEqualTo(100);
assertThat(config.produceUploadThreadPoolSize()).isEqualTo(16);
@@ -102,6 +104,7 @@ void minimalConfig() {
assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMinutes(1));
assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMinutes(1));
assertThat(config.cacheMaxCount()).isEqualTo(1000);
assertThat(config.cacheMaxBytes()).isEmpty();
assertThat(config.cacheExpirationLifespanSec()).isEqualTo(60);
assertThat(config.cacheExpirationMaxIdleSec()).isEqualTo(-1);
assertThat(config.produceUploadThreadPoolSize()).isEqualTo(8);
@@ -125,6 +128,7 @@ void fullConfig() {
configs.put("file.cleaner.retention.period.ms", "200");
configs.put("file.merger.interval.ms", "100");
configs.put("consume.cache.max.count", "100");
configs.put("consume.cache.max.bytes", "1048576");
configs.put("consume.cache.expiration.lifespan.sec", "200");
configs.put("consume.cache.expiration.max.idle.sec", "100");
configs.put("produce.upload.thread.pool.size", "16");
@@ -146,6 +150,7 @@ void fullConfig() {
assertThat(config.fileCleanerRetentionPeriod()).isEqualTo(Duration.ofMillis(200));
assertThat(config.fileMergerInterval()).isEqualTo(Duration.ofMillis(100));
assertThat(config.cacheMaxCount()).isEqualTo(100);
assertThat(config.cacheMaxBytes()).hasValue(1048576L);
assertThat(config.cacheExpirationLifespanSec()).isEqualTo(200);
assertThat(config.cacheExpirationMaxIdleSec()).isEqualTo(100);
assertThat(config.produceUploadThreadPoolSize()).isEqualTo(16);
DeleteRecordsInterceptorTest.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteRecordsResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -80,6 +81,8 @@ class DeleteRecordsInterceptorTest {
static final Supplier<LogConfig> DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of());

Time time = new MockTime();
Metrics storageMetrics = new Metrics();

@Mock
InklessConfig disklessConfig;
@Mock
@@ -100,20 +103,7 @@ class DeleteRecordsInterceptorTest {
public void mixingDisklessAndClassicTopicsIsNotAllowed() {
when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true);
when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false);
final SharedState state = new SharedState(
time,
BROKER_ID,
disklessConfig,
metadataView,
controlPlane,
OBJECT_KEY_CREATOR,
KEY_ALIGNMENT_STRATEGY,
OBJECT_CACHE,
BATCH_COORDINATE_CACHE,
brokerTopicStats,
DEFAULT_TOPIC_CONFIGS
);
final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state);
final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState());

final Map<TopicPartition, Long> entriesPerPartition = Map.of(
new TopicPartition("diskless", 0),
@@ -144,9 +134,7 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() {
@Test
public void notInterceptDeletingRecordsFromClassicTopics() {
when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false);
final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(
new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane,
OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS));
final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(buildSharedState());

final Map<TopicPartition, Long> entriesPerPartition = Map.of(
new TopicPartition("non_diskless", 0), 4567L
@@ -182,9 +170,9 @@ public void interceptDeletingRecordsFromDisklessTopics() {
});

final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(
new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane,
OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS),
new SynchronousExecutor());
buildSharedState(),
new SynchronousExecutor()
);

final TopicPartition tp0 = new TopicPartition("diskless", 0);
final TopicPartition tp1 = new TopicPartition("diskless", 1);
@@ -225,9 +213,9 @@ public void controlPlaneException() {
when(controlPlane.deleteRecords(anyList())).thenThrow(new RuntimeException("test"));

final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(
new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane,
OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS),
new SynchronousExecutor());
buildSharedState(),
new SynchronousExecutor()
);

final TopicPartition topicPartition = new TopicPartition("diskless", 1);
final Map<TopicPartition, Long> entriesPerPartition = Map.of(
@@ -266,9 +254,9 @@ public void topicIdNotFound() {
});

final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(
new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane,
OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS),
new SynchronousExecutor());
buildSharedState(),
new SynchronousExecutor()
);

final TopicPartition topicPartition1 = new TopicPartition("diskless1", 1);
final TopicPartition topicPartition2 = new TopicPartition("diskless2", 2);
@@ -295,4 +283,21 @@ public void topicIdNotFound() {
.setLowWatermark(INVALID_LOW_WATERMARK)
));
}

private SharedState buildSharedState() {
return new SharedState(
time,
BROKER_ID,
disklessConfig,
metadataView,
controlPlane,
OBJECT_KEY_CREATOR,
KEY_ALIGNMENT_STRATEGY,
OBJECT_CACHE,
BATCH_COORDINATE_CACHE,
brokerTopicStats,
DEFAULT_TOPIC_CONFIGS,
storageMetrics
);
}
}