-
Notifications
You must be signed in to change notification settings - Fork 6
refactor(inkless): consolidate executor management in SharedState #468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -22,14 +22,23 @@ | |||||||||
| import org.apache.kafka.common.metrics.MetricConfig; | ||||||||||
| import org.apache.kafka.common.metrics.Metrics; | ||||||||||
| import org.apache.kafka.common.metrics.MetricsReporter; | ||||||||||
| import org.apache.kafka.common.utils.ThreadUtils; | ||||||||||
| import org.apache.kafka.common.utils.Time; | ||||||||||
| import org.apache.kafka.common.utils.Utils; | ||||||||||
| import org.apache.kafka.storage.internals.log.LogConfig; | ||||||||||
| import org.apache.kafka.storage.log.metrics.BrokerTopicStats; | ||||||||||
|
|
||||||||||
| import org.slf4j.Logger; | ||||||||||
| import org.slf4j.LoggerFactory; | ||||||||||
|
|
||||||||||
| import java.io.Closeable; | ||||||||||
| import java.io.IOException; | ||||||||||
| import java.time.Duration; | ||||||||||
| import java.util.List; | ||||||||||
| import java.util.concurrent.ExecutorService; | ||||||||||
| import java.util.concurrent.Executors; | ||||||||||
| import java.util.concurrent.ScheduledExecutorService; | ||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||
| import java.util.function.Supplier; | ||||||||||
|
|
||||||||||
| import io.aiven.inkless.cache.BatchCoordinateCache; | ||||||||||
|
|
@@ -39,12 +48,15 @@ | |||||||||
| import io.aiven.inkless.cache.KeyAlignmentStrategy; | ||||||||||
| import io.aiven.inkless.cache.NullBatchCoordinateCache; | ||||||||||
| import io.aiven.inkless.cache.ObjectCache; | ||||||||||
| import io.aiven.inkless.common.metrics.ThreadPoolMonitor; | ||||||||||
| import io.aiven.inkless.config.InklessConfig; | ||||||||||
| import io.aiven.inkless.control_plane.ControlPlane; | ||||||||||
| import io.aiven.inkless.control_plane.MetadataView; | ||||||||||
| import io.aiven.inkless.storage_backend.common.StorageBackend; | ||||||||||
|
|
||||||||||
| public final class SharedState implements Closeable { | ||||||||||
| private static final Logger LOGGER = LoggerFactory.getLogger(SharedState.class); | ||||||||||
|
|
||||||||||
| public static final String STORAGE_METRIC_CONTEXT = "io.aiven.inkless.storage"; | ||||||||||
|
|
||||||||||
| private final Time time; | ||||||||||
|
|
@@ -59,8 +71,17 @@ public final class SharedState implements Closeable { | |||||||||
| private final BrokerTopicStats brokerTopicStats; | ||||||||||
| private final Supplier<LogConfig> defaultTopicConfigs; | ||||||||||
| private final Metrics storageMetrics; | ||||||||||
| private final ExecutorService fetchMetadataExecutor; | ||||||||||
| private final ExecutorService fetchDataExecutor; | ||||||||||
| private final ScheduledExecutorService produceCommitTickScheduler; | ||||||||||
| private final ExecutorService produceUploadExecutor; | ||||||||||
| private final ExecutorService produceCommitExecutor; | ||||||||||
|
|
||||||||||
| private final ThreadPoolMonitor fetchMetadataThreadPoolMonitor; | ||||||||||
| private final ThreadPoolMonitor fetchDataThreadPoolMonitor; | ||||||||||
| private final ThreadPoolMonitor produceUploaderThreadPoolMonitor; | ||||||||||
|
|
||||||||||
| public SharedState( | ||||||||||
| SharedState( | ||||||||||
| final Time time, | ||||||||||
| final int brokerId, | ||||||||||
| final InklessConfig config, | ||||||||||
|
|
@@ -71,7 +92,15 @@ public SharedState( | |||||||||
| final ObjectCache cache, | ||||||||||
| final BatchCoordinateCache batchCoordinateCache, | ||||||||||
| final BrokerTopicStats brokerTopicStats, | ||||||||||
| final Supplier<LogConfig> defaultTopicConfigs | ||||||||||
| final Supplier<LogConfig> defaultTopicConfigs, | ||||||||||
| final ExecutorService fetchMetadataExecutor, | ||||||||||
| final ExecutorService fetchDataExecutor, | ||||||||||
| final ScheduledExecutorService produceCommitTickScheduler, | ||||||||||
| final ExecutorService produceUploadExecutor, | ||||||||||
| final ExecutorService produceCommitExecutor, | ||||||||||
| final ThreadPoolMonitor fetchMetadataThreadPoolMonitor, | ||||||||||
| final ThreadPoolMonitor fetchDataThreadPoolMonitor, | ||||||||||
| final ThreadPoolMonitor produceUploaderThreadPoolMonitor | ||||||||||
| ) { | ||||||||||
| this.time = time; | ||||||||||
| this.brokerId = brokerId; | ||||||||||
|
|
@@ -84,6 +113,14 @@ public SharedState( | |||||||||
| this.batchCoordinateCache = batchCoordinateCache; | ||||||||||
| this.brokerTopicStats = brokerTopicStats; | ||||||||||
| this.defaultTopicConfigs = defaultTopicConfigs; | ||||||||||
| this.fetchMetadataExecutor = fetchMetadataExecutor; | ||||||||||
| this.fetchDataExecutor = fetchDataExecutor; | ||||||||||
| this.produceCommitTickScheduler = produceCommitTickScheduler; | ||||||||||
| this.produceUploadExecutor = produceUploadExecutor; | ||||||||||
| this.produceCommitExecutor = produceCommitExecutor; | ||||||||||
| this.fetchMetadataThreadPoolMonitor = fetchMetadataThreadPoolMonitor; | ||||||||||
| this.fetchDataThreadPoolMonitor = fetchDataThreadPoolMonitor; | ||||||||||
| this.produceUploaderThreadPoolMonitor = produceUploaderThreadPoolMonitor; | ||||||||||
|
|
||||||||||
| final MetricsReporter reporter = new JmxReporter(); | ||||||||||
| this.storageMetrics = new Metrics( | ||||||||||
|
|
@@ -107,34 +144,162 @@ public static SharedState initialize( | |||||||||
| "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2" | ||||||||||
| ); | ||||||||||
| } | ||||||||||
| return new SharedState( | ||||||||||
| time, | ||||||||||
| brokerId, | ||||||||||
| config, | ||||||||||
| metadata, | ||||||||||
| controlPlane, | ||||||||||
| ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()), | ||||||||||
| new FixedBlockAlignment(config.fetchCacheBlockBytes()), | ||||||||||
| new CaffeineCache( | ||||||||||
|
|
||||||||||
| // Track created executors to clean up on failure | ||||||||||
| ExecutorService fetchMetadataExecutor = null; | ||||||||||
| ExecutorService fetchDataExecutor = null; | ||||||||||
| ScheduledExecutorService commitTickScheduler = null; | ||||||||||
| ExecutorService produceUploadExecutor = null; | ||||||||||
| ExecutorService produceCommitExecutor = null; | ||||||||||
| ThreadPoolMonitor fetchMetadataThreadPoolMonitor = null; | ||||||||||
| ThreadPoolMonitor fetchDataThreadPoolMonitor = null; | ||||||||||
| ThreadPoolMonitor produceUploaderThreadPoolMonitor = null; | ||||||||||
|
|
||||||||||
| CaffeineCache objectCache = null; | ||||||||||
| BatchCoordinateCache batchMetadataCache = null; | ||||||||||
|
|
||||||||||
| try { | ||||||||||
| // Create dedicated thread pools for fetch operations (metadata and data) | ||||||||||
| fetchMetadataExecutor = Executors.newFixedThreadPool( | ||||||||||
| config.fetchMetadataThreadPoolSize(), | ||||||||||
| new InklessThreadFactory("inkless-fetch-metadata-", false) | ||||||||||
| ); | ||||||||||
|
|
||||||||||
| fetchDataExecutor = Executors.newFixedThreadPool( | ||||||||||
| config.fetchDataThreadPoolSize(), | ||||||||||
| new InklessThreadFactory("inkless-fetch-data-", false) | ||||||||||
| ); | ||||||||||
|
|
||||||||||
| // Create scheduled executor for commit interval timer (triggers commits in Writer) | ||||||||||
| commitTickScheduler = Executors.newScheduledThreadPool( | ||||||||||
| 1, | ||||||||||
| new InklessThreadFactory("inkless-produce-commit-ticker-", true) | ||||||||||
| ); | ||||||||||
|
|
||||||||||
| // Create thread pools for file operations (upload and commit to control plane) | ||||||||||
| produceUploadExecutor = Executors.newFixedThreadPool( | ||||||||||
| config.produceUploadThreadPoolSize(), | ||||||||||
| new InklessThreadFactory("inkless-produce-uploader-", false) | ||||||||||
| ); | ||||||||||
|
|
||||||||||
| // Single thread executor for sequential commits (preserves ordering) | ||||||||||
| // Note: We use newFixedThreadPool(1) instead of newSingleThreadExecutor() to allow | ||||||||||
| // validation at construction time in FileCommitter constructor. Both create single-threaded executors, | ||||||||||
| // but newSingleThreadExecutor() returns a DelegatedExecutorService wrapper that would | ||||||||||
| // require fragile reflection-based introspection to validate. newFixedThreadPool(1) returns | ||||||||||
| // a ThreadPoolExecutor directly, enabling simple instanceof checks. | ||||||||||
| // Tradeoff: We lose the immutability wrapper that prevents pool size reconfiguration, but | ||||||||||
| // this is acceptable because: (1) we control executor creation and lifecycle, (2) the | ||||||||||
| // executor is not exposed to external code, and (3) validation prevents the real bug | ||||||||||
| // (multithreaded executor causing ordering violations) vs theoretical misconfiguration | ||||||||||
| // that would be immediately obvious. | ||||||||||
|
Comment on lines
+194
to
+195
|
||||||||||
| // (multithreaded executor causing ordering violations) vs theoretical misconfiguration | |
| // that would be immediately obvious. | |
| // (multithreaded executor causing ordering violations) compared to a theoretical | |
| // misconfiguration that would be immediately obvious. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mock returns a new executor that is never shut down, creating a resource leak in tests. This executor should either be tracked and cleaned up in test teardown, or the mock should return a mock ExecutorService instead of a real one.