Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ import java.net.InetAddress
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, TimeUnit}
import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.function.BiConsumer
import java.util.stream.IntStream
import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties}
Expand Down Expand Up @@ -7148,6 +7148,7 @@ class ReplicaManagerTest {
}
disklessTopics.foreach(t => when(inklessMetadata.isDisklessTopic(t)).thenReturn(true))
when(sharedState.metadata()).thenReturn(inklessMetadata)
when(sharedState.produceCommitExecutor()).thenReturn(Executors.newFixedThreadPool(1))
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mock returns a new executor that is never shut down, creating a resource leak in tests. This executor should either be tracked and cleaned up in test teardown, or the mock should return a mock ExecutorService instead of a real one.

Copilot uses AI. Check for mistakes.

val logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

Expand Down
231 changes: 208 additions & 23 deletions storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,23 @@
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import io.aiven.inkless.cache.BatchCoordinateCache;
Expand All @@ -39,12 +48,15 @@
import io.aiven.inkless.cache.KeyAlignmentStrategy;
import io.aiven.inkless.cache.NullBatchCoordinateCache;
import io.aiven.inkless.cache.ObjectCache;
import io.aiven.inkless.common.metrics.ThreadPoolMonitor;
import io.aiven.inkless.config.InklessConfig;
import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.control_plane.MetadataView;
import io.aiven.inkless.storage_backend.common.StorageBackend;

public final class SharedState implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(SharedState.class);

public static final String STORAGE_METRIC_CONTEXT = "io.aiven.inkless.storage";

private final Time time;
Expand All @@ -59,8 +71,17 @@ public final class SharedState implements Closeable {
private final BrokerTopicStats brokerTopicStats;
private final Supplier<LogConfig> defaultTopicConfigs;
private final Metrics storageMetrics;
private final ExecutorService fetchMetadataExecutor;
private final ExecutorService fetchDataExecutor;
private final ScheduledExecutorService produceCommitTickScheduler;
private final ExecutorService produceUploadExecutor;
private final ExecutorService produceCommitExecutor;

private final ThreadPoolMonitor fetchMetadataThreadPoolMonitor;
private final ThreadPoolMonitor fetchDataThreadPoolMonitor;
private final ThreadPoolMonitor produceUploaderThreadPoolMonitor;

public SharedState(
SharedState(
final Time time,
final int brokerId,
final InklessConfig config,
Expand All @@ -71,7 +92,15 @@ public SharedState(
final ObjectCache cache,
final BatchCoordinateCache batchCoordinateCache,
final BrokerTopicStats brokerTopicStats,
final Supplier<LogConfig> defaultTopicConfigs
final Supplier<LogConfig> defaultTopicConfigs,
final ExecutorService fetchMetadataExecutor,
final ExecutorService fetchDataExecutor,
final ScheduledExecutorService produceCommitTickScheduler,
final ExecutorService produceUploadExecutor,
final ExecutorService produceCommitExecutor,
final ThreadPoolMonitor fetchMetadataThreadPoolMonitor,
final ThreadPoolMonitor fetchDataThreadPoolMonitor,
final ThreadPoolMonitor produceUploaderThreadPoolMonitor
) {
this.time = time;
this.brokerId = brokerId;
Expand All @@ -84,6 +113,14 @@ public SharedState(
this.batchCoordinateCache = batchCoordinateCache;
this.brokerTopicStats = brokerTopicStats;
this.defaultTopicConfigs = defaultTopicConfigs;
this.fetchMetadataExecutor = fetchMetadataExecutor;
this.fetchDataExecutor = fetchDataExecutor;
this.produceCommitTickScheduler = produceCommitTickScheduler;
this.produceUploadExecutor = produceUploadExecutor;
this.produceCommitExecutor = produceCommitExecutor;
this.fetchMetadataThreadPoolMonitor = fetchMetadataThreadPoolMonitor;
this.fetchDataThreadPoolMonitor = fetchDataThreadPoolMonitor;
this.produceUploaderThreadPoolMonitor = produceUploaderThreadPoolMonitor;

final MetricsReporter reporter = new JmxReporter();
this.storageMetrics = new Metrics(
Expand All @@ -107,34 +144,162 @@ public static SharedState initialize(
"Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2"
);
}
return new SharedState(
time,
brokerId,
config,
metadata,
controlPlane,
ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()),
new FixedBlockAlignment(config.fetchCacheBlockBytes()),
new CaffeineCache(

// Track created executors to clean up on failure
ExecutorService fetchMetadataExecutor = null;
ExecutorService fetchDataExecutor = null;
ScheduledExecutorService commitTickScheduler = null;
ExecutorService produceUploadExecutor = null;
ExecutorService produceCommitExecutor = null;
ThreadPoolMonitor fetchMetadataThreadPoolMonitor = null;
ThreadPoolMonitor fetchDataThreadPoolMonitor = null;
ThreadPoolMonitor produceUploaderThreadPoolMonitor = null;

CaffeineCache objectCache = null;
BatchCoordinateCache batchMetadataCache = null;

try {
// Create dedicated thread pools for fetch operations (metadata and data)
fetchMetadataExecutor = Executors.newFixedThreadPool(
config.fetchMetadataThreadPoolSize(),
new InklessThreadFactory("inkless-fetch-metadata-", false)
);

fetchDataExecutor = Executors.newFixedThreadPool(
config.fetchDataThreadPoolSize(),
new InklessThreadFactory("inkless-fetch-data-", false)
);

// Create scheduled executor for commit interval timer (triggers commits in Writer)
commitTickScheduler = Executors.newScheduledThreadPool(
1,
new InklessThreadFactory("inkless-produce-commit-ticker-", true)
);

// Create thread pools for file operations (upload and commit to control plane)
produceUploadExecutor = Executors.newFixedThreadPool(
config.produceUploadThreadPoolSize(),
new InklessThreadFactory("inkless-produce-uploader-", false)
);

// Single thread executor for sequential commits (preserves ordering)
// Note: We use newFixedThreadPool(1) instead of newSingleThreadExecutor() to allow
// validation at construction time in FileCommitter constructor. Both create single-threaded executors,
// but newSingleThreadExecutor() returns a DelegatedExecutorService wrapper that would
// require fragile reflection-based introspection to validate. newFixedThreadPool(1) returns
// a ThreadPoolExecutor directly, enabling simple instanceof checks.
// Tradeoff: We lose the immutability wrapper that prevents pool size reconfiguration, but
// this is acceptable because: (1) we control executor creation and lifecycle, (2) the
// executor is not exposed to external code, and (3) validation prevents the real bug
// (multithreaded executor causing ordering violations) vs theoretical misconfiguration
// that would be immediately obvious.
Comment on lines +194 to +195
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment states "validation prevents the real bug (multithreaded executor causing ordering violations) vs theoretical misconfiguration that would be immediately obvious." The phrase "vs theoretical misconfiguration" is unclear. Consider rephrasing to "over theoretical misconfiguration" or "compared to theoretical misconfiguration" for better clarity.

Suggested change
// (multithreaded executor causing ordering violations) vs theoretical misconfiguration
// that would be immediately obvious.
// (multithreaded executor causing ordering violations) compared to a theoretical
// misconfiguration that would be immediately obvious.

Copilot uses AI. Check for mistakes.
produceCommitExecutor = Executors.newFixedThreadPool(
1,
new InklessThreadFactory("inkless-produce-committer-", false)
);

// Create thread pool monitors - must be created before SharedState constructor
// so they're in scope for exception handling if construction fails
fetchMetadataThreadPoolMonitor = new ThreadPoolMonitor("inkless-fetch-metadata", fetchMetadataExecutor);
fetchDataThreadPoolMonitor = new ThreadPoolMonitor("inkless-fetch-data", fetchDataExecutor);
produceUploaderThreadPoolMonitor = new ThreadPoolMonitor("inkless-produce-uploader", produceUploadExecutor);

objectCache = new CaffeineCache(
config.cacheMaxCount(),
config.cacheExpirationLifespanSec(),
config.cacheExpirationMaxIdleSec()
),
config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(),
brokerTopicStats,
defaultTopicConfigs
);
);
batchMetadataCache = config.isBatchCoordinateCacheEnabled()
? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl())
: new NullBatchCoordinateCache();

return new SharedState(
time,
brokerId,
config,
metadata,
controlPlane,
ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()),
new FixedBlockAlignment(config.fetchCacheBlockBytes()),
objectCache,
batchMetadataCache,
brokerTopicStats,
defaultTopicConfigs,
fetchMetadataExecutor,
fetchDataExecutor,
commitTickScheduler,
produceUploadExecutor,
produceCommitExecutor,
fetchMetadataThreadPoolMonitor,
fetchDataThreadPoolMonitor,
produceUploaderThreadPoolMonitor
);
} catch (Exception e) {
// Clean up any executors and monitors that were created before the failure
LOGGER.error("Failed to initialize SharedState, shutting down any created executors", e);
cleanupExecutorsAndMonitors(
fetchMetadataExecutor,
fetchDataExecutor,
commitTickScheduler,
produceUploadExecutor,
produceCommitExecutor,
fetchMetadataThreadPoolMonitor,
fetchDataThreadPoolMonitor,
produceUploaderThreadPoolMonitor
);
// Also close caches that may have been created before the failure
Utils.closeQuietly(objectCache, "objectCache", LOGGER);
Utils.closeQuietly(batchMetadataCache, "batchMetadataCache", LOGGER);
throw new RuntimeException("Failed to initialize SharedState", e);
}
}

/**
* Cleanup helper for shutting down executors and closing monitors.
* Safe to call with null references - all utility methods handle null gracefully.
*/
private static void cleanupExecutorsAndMonitors(
ExecutorService fetchMetadataExecutor,
ExecutorService fetchDataExecutor,
ScheduledExecutorService commitTickScheduler,
ExecutorService produceUploadExecutor,
ExecutorService produceCommitExecutor,
ThreadPoolMonitor fetchMetadataThreadPoolMonitor,
ThreadPoolMonitor fetchDataThreadPoolMonitor,
ThreadPoolMonitor produceUploaderThreadPoolMonitor
) {
// Shutdown executors - write path first, then read path
ThreadUtils.shutdownExecutorServiceQuietly(commitTickScheduler, 5, TimeUnit.SECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(produceUploadExecutor, 5, TimeUnit.SECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(produceCommitExecutor, 5, TimeUnit.SECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(fetchMetadataExecutor, 5, TimeUnit.SECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(fetchDataExecutor, 5, TimeUnit.SECONDS);

// Close monitors
Utils.closeQuietly(fetchMetadataThreadPoolMonitor, "fetchMetadataThreadPoolMonitor", LOGGER);
Utils.closeQuietly(fetchDataThreadPoolMonitor, "fetchDataThreadPoolMonitor", LOGGER);
Utils.closeQuietly(produceUploaderThreadPoolMonitor, "produceUploaderThreadPoolMonitor", LOGGER);
}

@Override
public void close() throws IOException {
try {
cache.close();
controlPlane.close();
storageMetrics.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
// Shutdown executors and close monitors
cleanupExecutorsAndMonitors(
fetchMetadataExecutor,
fetchDataExecutor,
produceCommitTickScheduler,
produceUploadExecutor,
produceCommitExecutor,
fetchMetadataThreadPoolMonitor,
fetchDataThreadPoolMonitor,
produceUploaderThreadPoolMonitor
);

// Close remaining resources
Utils.closeQuietly(cache, "cache", LOGGER);
Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache", LOGGER);
Utils.closeQuietly(controlPlane, "controlPlane", LOGGER);
Utils.closeQuietly(storageMetrics, "storageMetrics", LOGGER);
}

public Time time() {
Expand Down Expand Up @@ -188,4 +353,24 @@ public Supplier<LogConfig> defaultTopicConfigs() {
public StorageBackend buildStorage() {
return config.storage(storageMetrics);
}

public ExecutorService fetchMetadataExecutor() {
return fetchMetadataExecutor;
}

public ExecutorService fetchDataExecutor() {
return fetchDataExecutor;
}

public ScheduledExecutorService produceCommitTickScheduler() {
return produceCommitTickScheduler;
}

public ExecutorService produceUploadExecutor() {
return produceUploadExecutor;
}

public ExecutorService produceCommitExecutor() {
return produceCommitExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public FetchHandler(final SharedState state) {
state.cache(),
state.controlPlane(),
state.buildStorage(),
state.brokerTopicStats(),
state.config().fetchMetadataThreadPoolSize(),
state.config().fetchDataThreadPoolSize(),
state.config().maxBatchesPerPartitionToFind()
state.config().maxBatchesPerPartitionToFind(),
state.fetchMetadataExecutor(),
state.fetchDataExecutor(),
state.brokerTopicStats()
)
);
}
Expand Down
Loading
Loading