From a7291023dd6be0683e1e0d7ee2be1451e0d2c012 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 24 Dec 2025 09:36:16 +0200 Subject: [PATCH] refactor(inkless): consolidate executor management in SharedState Moves all ExecutorService creation and lifecycle management from Reader, Writer, and FileCommitter to SharedState, establishing a centralized pattern for managing broker-scoped thread pools. This refactoring also eliminates an unnecessary executor (cache store) and fixes a resource leak bug. Previously, Reader, Writer, and FileCommitter each created and managed their own thread pools, violating the Single Responsibility Principle: - **Reader**: Created 2 fixed thread pools for fetch operations - **Writer**: Created 1 scheduled thread pool for commit ticking - **FileCommitter**: Created 3 thread pools (upload, commit, cache store) Instead of keeping a dedicated cache-store thread pool, reuse the existing upload thread pool for caching. --- .../kafka/server/ReplicaManagerTest.scala | 3 +- .../io/aiven/inkless/common/SharedState.java | 231 ++++++++++++++++-- .../aiven/inkless/consume/FetchHandler.java | 8 +- .../java/io/aiven/inkless/consume/Reader.java | 52 +--- .../aiven/inkless/produce/AppendHandler.java | 27 +- .../aiven/inkless/produce/FileCommitter.java | 77 +++--- .../java/io/aiven/inkless/produce/Writer.java | 62 ++--- .../io/aiven/inkless/consume/ReaderTest.java | 26 +- .../delete/DeleteRecordsInterceptorTest.java | 55 ++--- .../inkless/merge/FileMergerMockedTest.java | 20 +- .../inkless/produce/AppendHandlerTest.java | 47 ++-- .../inkless/produce/FileCommitterTest.java | 120 ++++++--- .../produce/WriterIntegrationTest.java | 36 ++- .../inkless/produce/WriterMockedTest.java | 63 ++--- .../inkless/produce/WriterPropertyTest.java | 86 +++++-- 15 files changed, 559 insertions(+), 354 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 3e6dcc78cc..5cf87ca9ad 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -98,7 +98,7 @@ import java.net.InetAddress import java.nio.file.{Files, Paths} import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} -import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, TimeUnit} +import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, Executors, TimeUnit} import java.util.function.BiConsumer import java.util.stream.IntStream import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties} @@ -7148,6 +7148,7 @@ class ReplicaManagerTest { } disklessTopics.foreach(t => when(inklessMetadata.isDisklessTopic(t)).thenReturn(true)) when(sharedState.metadata()).thenReturn(inklessMetadata) + when(sharedState.produceCommitExecutor()).thenReturn(Executors.newFixedThreadPool(1)) val logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java index aa817b2bd4..5d641c3898 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java @@ -22,14 +22,23 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import 
org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import io.aiven.inkless.cache.BatchCoordinateCache; @@ -39,12 +48,15 @@ import io.aiven.inkless.cache.KeyAlignmentStrategy; import io.aiven.inkless.cache.NullBatchCoordinateCache; import io.aiven.inkless.cache.ObjectCache; +import io.aiven.inkless.common.metrics.ThreadPoolMonitor; import io.aiven.inkless.config.InklessConfig; import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.control_plane.MetadataView; import io.aiven.inkless.storage_backend.common.StorageBackend; public final class SharedState implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(SharedState.class); + public static final String STORAGE_METRIC_CONTEXT = "io.aiven.inkless.storage"; private final Time time; @@ -59,8 +71,17 @@ public final class SharedState implements Closeable { private final BrokerTopicStats brokerTopicStats; private final Supplier defaultTopicConfigs; private final Metrics storageMetrics; + private final ExecutorService fetchMetadataExecutor; + private final ExecutorService fetchDataExecutor; + private final ScheduledExecutorService produceCommitTickScheduler; + private final ExecutorService produceUploadExecutor; + private final ExecutorService produceCommitExecutor; + + private final ThreadPoolMonitor fetchMetadataThreadPoolMonitor; + private final ThreadPoolMonitor fetchDataThreadPoolMonitor; + private final ThreadPoolMonitor produceUploaderThreadPoolMonitor; - public SharedState( + SharedState( final Time time, final int brokerId, final InklessConfig config, @@ -71,7 +92,15 @@ public SharedState( final ObjectCache cache, final BatchCoordinateCache batchCoordinateCache, final BrokerTopicStats brokerTopicStats, - final Supplier defaultTopicConfigs + final Supplier defaultTopicConfigs, + final ExecutorService fetchMetadataExecutor, + final ExecutorService fetchDataExecutor, + final ScheduledExecutorService produceCommitTickScheduler, + final ExecutorService produceUploadExecutor, + final ExecutorService produceCommitExecutor, + final ThreadPoolMonitor fetchMetadataThreadPoolMonitor, + final ThreadPoolMonitor fetchDataThreadPoolMonitor, + final ThreadPoolMonitor produceUploaderThreadPoolMonitor ) { this.time = time; this.brokerId = brokerId; @@ -84,6 +113,14 @@ public SharedState( this.batchCoordinateCache = batchCoordinateCache; this.brokerTopicStats = brokerTopicStats; this.defaultTopicConfigs = defaultTopicConfigs; + this.fetchMetadataExecutor = fetchMetadataExecutor; + this.fetchDataExecutor = fetchDataExecutor; + this.produceCommitTickScheduler = produceCommitTickScheduler; + this.produceUploadExecutor = produceUploadExecutor; + this.produceCommitExecutor = produceCommitExecutor; + this.fetchMetadataThreadPoolMonitor = fetchMetadataThreadPoolMonitor; + this.fetchDataThreadPoolMonitor = fetchDataThreadPoolMonitor; + this.produceUploaderThreadPoolMonitor = 
produceUploaderThreadPoolMonitor; final MetricsReporter reporter = new JmxReporter(); this.storageMetrics = new Metrics( @@ -107,34 +144,162 @@ public static SharedState initialize( "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2" ); } - return new SharedState( - time, - brokerId, - config, - metadata, - controlPlane, - ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()), - new FixedBlockAlignment(config.fetchCacheBlockBytes()), - new CaffeineCache( + + // Track created executors to clean up on failure + ExecutorService fetchMetadataExecutor = null; + ExecutorService fetchDataExecutor = null; + ScheduledExecutorService commitTickScheduler = null; + ExecutorService produceUploadExecutor = null; + ExecutorService produceCommitExecutor = null; + ThreadPoolMonitor fetchMetadataThreadPoolMonitor = null; + ThreadPoolMonitor fetchDataThreadPoolMonitor = null; + ThreadPoolMonitor produceUploaderThreadPoolMonitor = null; + + CaffeineCache objectCache = null; + BatchCoordinateCache batchMetadataCache = null; + + try { + // Create dedicated thread pools for fetch operations (metadata and data) + fetchMetadataExecutor = Executors.newFixedThreadPool( + config.fetchMetadataThreadPoolSize(), + new InklessThreadFactory("inkless-fetch-metadata-", false) + ); + + fetchDataExecutor = Executors.newFixedThreadPool( + config.fetchDataThreadPoolSize(), + new InklessThreadFactory("inkless-fetch-data-", false) + ); + + // Create scheduled executor for commit interval timer (triggers commits in Writer) + commitTickScheduler = Executors.newScheduledThreadPool( + 1, + new InklessThreadFactory("inkless-produce-commit-ticker-", true) + ); + + // Create thread pools for file operations (upload and commit to control plane) + produceUploadExecutor = Executors.newFixedThreadPool( + config.produceUploadThreadPoolSize(), + new InklessThreadFactory("inkless-produce-uploader-", false) + ); + + // Single thread executor for sequential commits (preserves ordering) + // Note: We use newFixedThreadPool(1) instead of newSingleThreadExecutor() to allow + // validation at construction time in FileCommitter constructor. Both create single-threaded executors, + // but newSingleThreadExecutor() returns a DelegatedExecutorService wrapper that would + // require fragile reflection-based introspection to validate. newFixedThreadPool(1) returns + // a ThreadPoolExecutor directly, enabling simple instanceof checks. + // Tradeoff: We lose the immutability wrapper that prevents pool size reconfiguration, but + // this is acceptable because: (1) we control executor creation and lifecycle, (2) the + // executor is not exposed to external code, and (3) validation prevents the real bug + // (multithreaded executor causing ordering violations) vs theoretical misconfiguration + // that would be immediately obvious. 
+ produceCommitExecutor = Executors.newFixedThreadPool( + 1, + new InklessThreadFactory("inkless-produce-committer-", false) + ); + + // Create thread pool monitors - must be created before SharedState constructor + // so they're in scope for exception handling if construction fails + fetchMetadataThreadPoolMonitor = new ThreadPoolMonitor("inkless-fetch-metadata", fetchMetadataExecutor); + fetchDataThreadPoolMonitor = new ThreadPoolMonitor("inkless-fetch-data", fetchDataExecutor); + produceUploaderThreadPoolMonitor = new ThreadPoolMonitor("inkless-produce-uploader", produceUploadExecutor); + + objectCache = new CaffeineCache( config.cacheMaxCount(), config.cacheExpirationLifespanSec(), config.cacheExpirationMaxIdleSec() - ), - config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(), - brokerTopicStats, - defaultTopicConfigs - ); + ); + batchMetadataCache = config.isBatchCoordinateCacheEnabled() + ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) + : new NullBatchCoordinateCache(); + + return new SharedState( + time, + brokerId, + config, + metadata, + controlPlane, + ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()), + new FixedBlockAlignment(config.fetchCacheBlockBytes()), + objectCache, + batchMetadataCache, + brokerTopicStats, + defaultTopicConfigs, + fetchMetadataExecutor, + fetchDataExecutor, + commitTickScheduler, + produceUploadExecutor, + produceCommitExecutor, + fetchMetadataThreadPoolMonitor, + fetchDataThreadPoolMonitor, + produceUploaderThreadPoolMonitor + ); + } catch (Exception e) { + // Clean up any executors and monitors that were created before the failure + LOGGER.error("Failed to initialize SharedState, shutting down any created executors", e); + cleanupExecutorsAndMonitors( + fetchMetadataExecutor, + fetchDataExecutor, + commitTickScheduler, + produceUploadExecutor, + produceCommitExecutor, + fetchMetadataThreadPoolMonitor, + fetchDataThreadPoolMonitor, + produceUploaderThreadPoolMonitor + ); + // Also close caches that may have been created before the failure + Utils.closeQuietly(objectCache, "objectCache", LOGGER); + Utils.closeQuietly(batchMetadataCache, "batchMetadataCache", LOGGER); + throw new RuntimeException("Failed to initialize SharedState", e); + } + } + + /** + * Cleanup helper for shutting down executors and closing monitors. + * Safe to call with null references - all utility methods handle null gracefully. 
+ */ + private static void cleanupExecutorsAndMonitors( + ExecutorService fetchMetadataExecutor, + ExecutorService fetchDataExecutor, + ScheduledExecutorService commitTickScheduler, + ExecutorService produceUploadExecutor, + ExecutorService produceCommitExecutor, + ThreadPoolMonitor fetchMetadataThreadPoolMonitor, + ThreadPoolMonitor fetchDataThreadPoolMonitor, + ThreadPoolMonitor produceUploaderThreadPoolMonitor + ) { + // Shutdown executors - write path first, then read path + ThreadUtils.shutdownExecutorServiceQuietly(commitTickScheduler, 5, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutorServiceQuietly(produceUploadExecutor, 5, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutorServiceQuietly(produceCommitExecutor, 5, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutorServiceQuietly(fetchMetadataExecutor, 5, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutorServiceQuietly(fetchDataExecutor, 5, TimeUnit.SECONDS); + + // Close monitors + Utils.closeQuietly(fetchMetadataThreadPoolMonitor, "fetchMetadataThreadPoolMonitor", LOGGER); + Utils.closeQuietly(fetchDataThreadPoolMonitor, "fetchDataThreadPoolMonitor", LOGGER); + Utils.closeQuietly(produceUploaderThreadPoolMonitor, "produceUploaderThreadPoolMonitor", LOGGER); } @Override public void close() throws IOException { - try { - cache.close(); - controlPlane.close(); - storageMetrics.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } + // Shutdown executors and close monitors + cleanupExecutorsAndMonitors( + fetchMetadataExecutor, + fetchDataExecutor, + produceCommitTickScheduler, + produceUploadExecutor, + produceCommitExecutor, + fetchMetadataThreadPoolMonitor, + fetchDataThreadPoolMonitor, + produceUploaderThreadPoolMonitor + ); + + // Close remaining resources + Utils.closeQuietly(cache, "cache", LOGGER); + Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache", LOGGER); + Utils.closeQuietly(controlPlane, "controlPlane", LOGGER); + Utils.closeQuietly(storageMetrics, "storageMetrics", LOGGER); } public Time time() { @@ -188,4 +353,24 @@ public Supplier defaultTopicConfigs() { public StorageBackend buildStorage() { return config.storage(storageMetrics); } + + public ExecutorService fetchMetadataExecutor() { + return fetchMetadataExecutor; + } + + public ExecutorService fetchDataExecutor() { + return fetchDataExecutor; + } + + public ScheduledExecutorService produceCommitTickScheduler() { + return produceCommitTickScheduler; + } + + public ExecutorService produceUploadExecutor() { + return produceUploadExecutor; + } + + public ExecutorService produceCommitExecutor() { + return produceCommitExecutor; + } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java index fb10490dd5..39caa9c9e7 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java @@ -52,10 +52,10 @@ public FetchHandler(final SharedState state) { state.cache(), state.controlPlane(), state.buildStorage(), - state.brokerTopicStats(), - state.config().fetchMetadataThreadPoolSize(), - state.config().fetchDataThreadPoolSize(), - state.config().maxBatchesPerPartitionToFind() + state.config().maxBatchesPerPartitionToFind(), + state.fetchMetadataExecutor(), + state.fetchDataExecutor(), + state.brokerTopicStats() ) ); } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java 
b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 1b04618951..836af99650 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; @@ -35,21 +34,17 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import io.aiven.inkless.TimeUtils; import io.aiven.inkless.cache.KeyAlignmentStrategy; import io.aiven.inkless.cache.ObjectCache; -import io.aiven.inkless.common.InklessThreadFactory; import io.aiven.inkless.common.ObjectKeyCreator; -import io.aiven.inkless.common.metrics.ThreadPoolMonitor; import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.storage_backend.common.ObjectFetcher; public class Reader implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(Reader.class); - private static final long EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 5; + private final Time time; private final ObjectKeyCreator objectKeyCreator; private final KeyAlignmentStrategy keyAlignmentStrategy; @@ -61,35 +56,13 @@ public class Reader implements AutoCloseable { private final ExecutorService dataExecutor; private final InklessFetchMetrics fetchMetrics; private final BrokerTopicStats brokerTopicStats; - private ThreadPoolMonitor metadataThreadPoolMonitor; - private ThreadPoolMonitor dataThreadPoolMonitor; - - public Reader( - Time time, - ObjectKeyCreator objectKeyCreator, - KeyAlignmentStrategy keyAlignmentStrategy, - ObjectCache cache, - ControlPlane controlPlane, - ObjectFetcher objectFetcher, - BrokerTopicStats brokerTopicStats, - int fetchMetadataThreadPoolSize, - int fetchDataThreadPoolSize, - int maxBatchesPerPartitionToFind - ) { - this( - time, - objectKeyCreator, - keyAlignmentStrategy, - cache, - controlPlane, - objectFetcher, - maxBatchesPerPartitionToFind, - Executors.newFixedThreadPool(fetchMetadataThreadPoolSize, new InklessThreadFactory("inkless-fetch-metadata-", false)), - Executors.newFixedThreadPool(fetchDataThreadPoolSize, new InklessThreadFactory("inkless-fetch-data-", false)), - brokerTopicStats - ); - } + /** + * Creates a Reader with injected executor services. + * + *
<p>
The executor services are managed by the caller (typically SharedState) and will be + * shut down externally. This Reader does not own the lifecycle of the executors. + */ public Reader( Time time, ObjectKeyCreator objectKeyCreator, @@ -113,13 +86,6 @@ public Reader( this.dataExecutor = dataExecutor; this.fetchMetrics = new InklessFetchMetrics(time, cache); this.brokerTopicStats = brokerTopicStats; - try { - this.metadataThreadPoolMonitor = new ThreadPoolMonitor("inkless-fetch-metadata", metadataExecutor); - this.dataThreadPoolMonitor = new ThreadPoolMonitor("inkless-fetch-data", dataExecutor); - } catch (final Exception e) { - // only expected to happen on tests passing other types of pools - LOGGER.warn("Failed to create thread pool monitors", e); - } } public CompletableFuture> fetch( @@ -205,10 +171,6 @@ public CompletableFuture> fetch( @Override public void close() throws IOException { - ThreadUtils.shutdownExecutorServiceQuietly(metadataExecutor, EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); - ThreadUtils.shutdownExecutorServiceQuietly(dataExecutor, EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (metadataThreadPoolMonitor != null) metadataThreadPoolMonitor.close(); - if (dataThreadPoolMonitor != null) dataThreadPoolMonitor.close(); objectFetcher.close(); fetchMetrics.close(); } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java index c54670709e..57e1451fee 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java @@ -54,18 +54,25 @@ public AppendHandler(final SharedState state) { state, new Writer( state.time(), - state.brokerId(), - state.objectKeyCreator(), - state.buildStorage(), - state.keyAlignmentStrategy(), - state.cache(), - state.batchCoordinateCache(), - state.controlPlane(), state.config().commitInterval(), state.config().produceBufferMaxBytes(), - state.config().produceMaxUploadAttempts(), - state.config().produceUploadBackoff(), - state.config().produceUploadThreadPoolSize(), + state.produceCommitTickScheduler(), + new FileCommitter( + state.brokerId(), + state.controlPlane(), + state.objectKeyCreator(), + state.buildStorage(), + state.keyAlignmentStrategy(), + state.cache(), + state.batchCoordinateCache(), + state.time(), + state.config().produceMaxUploadAttempts(), + state.config().produceUploadBackoff(), + state.produceUploadExecutor(), + state.produceCommitExecutor(), + new FileCommitterMetrics(state.time()) + ), + new WriterMetrics(state.time()), state.brokerTopicStats() ) ); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java index 0f54613448..0a40268a0f 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java @@ -19,8 +19,6 @@ import org.apache.kafka.common.utils.Time; -import com.groupcdg.pitest.annotations.DoNotMutate; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +31,8 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -43,10 +41,8 @@ import io.aiven.inkless.cache.BatchCoordinateCache; import io.aiven.inkless.cache.KeyAlignmentStrategy; import io.aiven.inkless.cache.ObjectCache; -import io.aiven.inkless.common.InklessThreadFactory; import io.aiven.inkless.common.ObjectKey; import io.aiven.inkless.common.ObjectKeyCreator; -import io.aiven.inkless.common.metrics.ThreadPoolMonitor; import io.aiven.inkless.control_plane.CommitBatchResponse; import io.aiven.inkless.control_plane.ControlPlane; import io.aiven.inkless.storage_backend.common.StorageBackend; @@ -75,35 +71,18 @@ class FileCommitter implements Closeable { private final Duration fileUploadRetryBackoff; private final ExecutorService executorServiceUpload; private final ExecutorService executorServiceCommit; - private final ExecutorService executorServiceCacheStore; - private ThreadPoolMonitor threadPoolMonitor; private final AtomicInteger totalFilesInProgress = new AtomicInteger(0); private final AtomicInteger totalBytesInProgress = new AtomicInteger(0); - @DoNotMutate - FileCommitter(final int brokerId, - final ControlPlane controlPlane, - final ObjectKeyCreator objectKeyCreator, - final StorageBackend storage, - final KeyAlignmentStrategy keyAlignmentStrategy, - final ObjectCache objectCache, - final BatchCoordinateCache batchCoordinateCache, - final Time time, - final int maxFileUploadAttempts, - final Duration fileUploadRetryBackoff, - final int fileUploaderThreadPoolSize) { - this(brokerId, controlPlane, objectKeyCreator, storage, keyAlignmentStrategy, objectCache, batchCoordinateCache, time, maxFileUploadAttempts, fileUploadRetryBackoff, - Executors.newFixedThreadPool(fileUploaderThreadPoolSize, new InklessThreadFactory("inkless-file-uploader-", false)), - // It must be single-thread to preserve the commit order. - Executors.newSingleThreadExecutor(new InklessThreadFactory("inkless-file-committer-", false)), - // Reuse the same thread pool size as uploads, as there are no more concurrency expected to handle - Executors.newFixedThreadPool(fileUploaderThreadPoolSize, new InklessThreadFactory("inkless-file-cache-store-", false)), - new FileCommitterMetrics(time) - ); - } - - // Visible for testing + /** + * Creates a FileCommitter with injected executor services. + * + *
<p>
The ExecutorService instances are managed by the caller (typically SharedState) and will be + * shut down externally. This FileCommitter does not own the lifecycle of the executors. + * + *
<p>
IMPORTANT: executorServiceCommit MUST be single-threaded to preserve commit ordering. + */ FileCommitter(final int brokerId, final ControlPlane controlPlane, final ObjectKeyCreator objectKeyCreator, @@ -116,7 +95,6 @@ class FileCommitter implements Closeable { final Duration fileUploadRetryBackoff, final ExecutorService executorServiceUpload, final ExecutorService executorServiceCommit, - final ExecutorService executorServiceCacheStore, final FileCommitterMetrics metrics) { this.brokerId = brokerId; this.controlPlane = Objects.requireNonNull(controlPlane, "controlPlane cannot be null"); @@ -136,19 +114,14 @@ class FileCommitter implements Closeable { "executorServiceUpload cannot be null"); this.executorServiceCommit = Objects.requireNonNull(executorServiceCommit, "executorServiceCommit cannot be null"); - this.executorServiceCacheStore = Objects.requireNonNull(executorServiceCacheStore, - "executorServiceCacheStore cannot be null"); + if (!isSingleThreadedExecutor(executorServiceCommit)) { + throw new IllegalArgumentException("executorServiceCommit must be a single-threaded ThreadPoolExecutor"); + } this.metrics = Objects.requireNonNull(metrics, "metrics cannot be null"); // Can't do this in the FileCommitterMetrics constructor, so initializing this way. this.metrics.initTotalFilesInProgressMetric(totalFilesInProgress::get); this.metrics.initTotalBytesInProgressMetric(totalBytesInProgress::get); - try { - this.threadPoolMonitor = new ThreadPoolMonitor("inkless-produce-uploader", executorServiceUpload); - } catch (final Exception e) { - // only expected to happen on tests passing other types of pools - LOGGER.warn("Failed to create ThreadPoolMonitor for inkless-produce-uploader", e); - } } void commit(final ClosedFile file) throws InterruptedException { @@ -209,15 +182,21 @@ void commit(final ClosedFile file) throws InterruptedException { } }); - final CacheStoreJob cacheStoreJob = new CacheStoreJob( + // Caching is not strictly IO-bound, but may be slowed by memory pressure. + commitFuture.thenRunAsync( + new CacheStoreJob( time, objectCache, keyAlignmentStrategy, file.data(), uploadFuture, metrics::cacheStoreFinished + ), + // Reuse the upload executor for caching as well. + // Caching is substantially faster than uploading, + // so allocating some of the upload executor's capacity to caching should be fine. + executorServiceUpload ); - executorServiceCacheStore.submit(cacheStoreJob); } commitFuture.whenComplete((commitBatchResponses, throwable) -> { final AppendCompleter completerJob = new AppendCompleter(file, batchCoordinateCache); @@ -242,12 +221,20 @@ int totalBytesInProgress() { return totalBytesInProgress.get(); } + /** + * Checks if the given ExecutorService is a single-threaded ThreadPoolExecutor. + * Only supports ThreadPoolExecutor, as created by SharedState with Executors.newFixedThreadPool(1). + */ + static boolean isSingleThreadedExecutor(ExecutorService executor) { + if (!(executor instanceof ThreadPoolExecutor)) return false; + final int maximumPoolSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize(); + final int corePoolSize = ((ThreadPoolExecutor) executor).getCorePoolSize(); + return maximumPoolSize == 1 && corePoolSize == 1; + } + @Override public void close() throws IOException { - // Don't wait here, they should try to finish their work. 
- executorServiceUpload.shutdown(); - executorServiceCommit.shutdown(); metrics.close(); - if (threadPoolMonitor != null) threadPoolMonitor.close(); + storage.close(); } } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java index 7096393d4d..bb04a0051d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java @@ -26,8 +26,6 @@ import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; -import com.groupcdg.pitest.annotations.DoNotMutate; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +36,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -46,13 +43,6 @@ import java.util.concurrent.locks.ReentrantLock; import io.aiven.inkless.TimeUtils; -import io.aiven.inkless.cache.BatchCoordinateCache; -import io.aiven.inkless.cache.KeyAlignmentStrategy; -import io.aiven.inkless.cache.ObjectCache; -import io.aiven.inkless.common.InklessThreadFactory; -import io.aiven.inkless.common.ObjectKeyCreator; -import io.aiven.inkless.control_plane.ControlPlane; -import io.aiven.inkless.storage_backend.common.StorageBackend; /** * The entry point for diskless writing. @@ -70,7 +60,6 @@ class Writer implements Closeable { private final Lock lock = new ReentrantLock(); private ActiveFile activeFile; - private final StorageBackend storage; private final FileCommitter fileCommitter; private final Time time; private final Duration commitInterval; @@ -82,44 +71,16 @@ class Writer implements Closeable { private Instant openedAt; private ScheduledFuture scheduledTick; - @DoNotMutate - Writer(final Time time, - final int brokerId, - final ObjectKeyCreator objectKeyCreator, - final StorageBackend storage, - final KeyAlignmentStrategy keyAlignmentStrategy, - final ObjectCache objectCache, - final BatchCoordinateCache batchCoordinateCache, - final ControlPlane controlPlane, - final Duration commitInterval, - final int maxBufferSize, - final int maxFileUploadAttempts, - final Duration fileUploadRetryBackoff, - final int fileUploaderThreadPoolSize, - final BrokerTopicStats brokerTopicStats - ) { - this( - time, - commitInterval, - maxBufferSize, - Executors.newScheduledThreadPool(1, new InklessThreadFactory("inkless-file-commit-ticker-", true)), - storage, - new FileCommitter( - brokerId, controlPlane, objectKeyCreator, storage, - keyAlignmentStrategy, objectCache, batchCoordinateCache, time, - maxFileUploadAttempts, fileUploadRetryBackoff, - fileUploaderThreadPoolSize), - new WriterMetrics(time), - brokerTopicStats - ); - } - - // Visible for testing + /** + * Creates a Writer with injected scheduled executor service. + * + *
<p>
The ScheduledExecutorService is managed by the caller (typically SharedState) and will be + * shut down externally. This Writer does not own the lifecycle of the scheduler. + */ Writer(final Time time, final Duration commitInterval, final int maxBufferSize, final ScheduledExecutorService commitTickScheduler, - final StorageBackend storage, final FileCommitter fileCommitter, final WriterMetrics writerMetrics, final BrokerTopicStats brokerTopicStats) { @@ -130,7 +91,6 @@ class Writer implements Closeable { } this.maxBufferSize = maxBufferSize; this.commitTickScheduler = Objects.requireNonNull(commitTickScheduler, "commitTickScheduler cannot be null"); - this.storage = Objects.requireNonNull(storage, "storage cannot be null"); this.fileCommitter = Objects.requireNonNull(fileCommitter, "fileCommitter cannot be null"); this.writerMetrics = Objects.requireNonNull(writerMetrics, "writerMetrics cannot be null"); this.brokerTopicStats = brokerTopicStats; @@ -222,12 +182,18 @@ public void close() throws IOException { if (closed) { return; } + + // Cancel any scheduled tick before marking as closed to prevent race condition + // where a tick could fire after close() starts but before resources are cleaned up + if (this.scheduledTick != null) { + this.scheduledTick.cancel(false); + this.scheduledTick = null; + } + closed = true; - commitTickScheduler.shutdownNow(); // Rotate file before closing the uploader so the file gets into the queue first. rotateFile(true); fileCommitter.close(); - storage.close(); writerMetrics.close(); } finally { lock.unlock(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index 3f6345ea3c..15d52b11e2 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.jspecify.annotations.NonNull; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -51,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -77,7 +79,7 @@ public class ReaderTest { @Test public void testReaderEmptyRequests() throws IOException { - try(final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, controlPlane, objectFetcher, 0, metadataExecutor, dataExecutor, new BrokerTopicStats())) { + try (final var reader = getReader()) { final CompletableFuture> fetch = reader.fetch(fetchParams, Collections.emptyMap()); verify(metadataExecutor, atLeastOnce()).execute(any()); verifyNoInteractions(dataExecutor); @@ -87,9 +89,25 @@ public void testReaderEmptyRequests() throws IOException { @Test public void testClose() throws Exception { - final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, controlPlane, objectFetcher, 0, metadataExecutor, dataExecutor, new BrokerTopicStats()); + final var reader = getReader(); reader.close(); - verify(metadataExecutor, atLeastOnce()).shutdown(); - verify(dataExecutor, atLeastOnce()).shutdown(); + // Executors created outside of Reader should 
not be shut down by Reader.close() + verify(metadataExecutor, times(0)).shutdown(); + verify(dataExecutor, times(0)).shutdown(); + } + + private @NonNull Reader getReader() { + return new Reader( + time, + OBJECT_KEY_CREATOR, + KEY_ALIGNMENT_STRATEGY, + OBJECT_CACHE, + controlPlane, + objectFetcher, + 0, + metadataExecutor, + dataExecutor, + new BrokerTopicStats() + ); } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java index 43e197dbd2..f78edbed69 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java @@ -36,20 +36,11 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; -import io.aiven.inkless.cache.BatchCoordinateCache; -import io.aiven.inkless.cache.CaffeineBatchCoordinateCache; -import io.aiven.inkless.cache.FixedBlockAlignment; -import io.aiven.inkless.cache.KeyAlignmentStrategy; -import io.aiven.inkless.cache.NullCache; -import io.aiven.inkless.cache.ObjectCache; -import io.aiven.inkless.common.ObjectKey; -import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.SharedState; import io.aiven.inkless.config.InklessConfig; import io.aiven.inkless.control_plane.ControlPlane; @@ -72,17 +63,11 @@ @MockitoSettings(strictness = Strictness.STRICT_STUBS) class DeleteRecordsInterceptorTest { static final int BROKER_ID = 11; - static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("", false); - private static final KeyAlignmentStrategy KEY_ALIGNMENT_STRATEGY = new FixedBlockAlignment(Integer.MAX_VALUE); - private static final ObjectCache OBJECT_CACHE = new NullCache(); - private static final BatchCoordinateCache BATCH_COORDINATE_CACHE = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30)); static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of()); Time time = new MockTime(); @Mock - InklessConfig disklessConfig; - @Mock MetadataView metadataView; @Mock ControlPlane controlPlane; @@ -96,24 +81,23 @@ class DeleteRecordsInterceptorTest { @Captor ArgumentCaptor> deleteRecordsCaptor; - @Test - public void mixingDisklessAndClassicTopicsIsNotAllowed() { - when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); - when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final SharedState state = new SharedState( + private SharedState getSharedState() { + return SharedState.initialize( time, BROKER_ID, - disklessConfig, + new InklessConfig(Map.of()), metadataView, controlPlane, - OBJECT_KEY_CREATOR, - KEY_ALIGNMENT_STRATEGY, - OBJECT_CACHE, - BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS ); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state); + } + + @Test + public void mixingDisklessAndClassicTopicsIsNotAllowed() { + when(metadataView.isDisklessTopic(eq("diskless"))).thenReturn(true); + when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(getSharedState()); final Map entriesPerPartition = Map.of( new TopicPartition("diskless", 0), @@ -144,9 +128,7 @@ public void mixingDisklessAndClassicTopicsIsNotAllowed() { @Test 
public void notInterceptDeletingRecordsFromClassicTopics() { when(metadataView.isDisklessTopic(eq("non_diskless"))).thenReturn(false); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS)); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(getSharedState()); final Map entriesPerPartition = Map.of( new TopicPartition("non_diskless", 0), 4567L @@ -181,10 +163,7 @@ public void interceptDeletingRecordsFromDisklessTopics() { } }); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(getSharedState(), new SynchronousExecutor()); final TopicPartition tp0 = new TopicPartition("diskless", 0); final TopicPartition tp1 = new TopicPartition("diskless", 1); @@ -224,10 +203,7 @@ public void controlPlaneException() { when(controlPlane.deleteRecords(anyList())).thenThrow(new RuntimeException("test")); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(getSharedState(), new SynchronousExecutor()); final TopicPartition topicPartition = new TopicPartition("diskless", 1); final Map entriesPerPartition = Map.of( @@ -265,10 +241,7 @@ public void topicIdNotFound() { } }); - final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor( - new SharedState(time, BROKER_ID, disklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), - new SynchronousExecutor()); + final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(getSharedState(), new SynchronousExecutor()); final TopicPartition topicPartition1 = new TopicPartition("diskless1", 1); final TopicPartition topicPartition2 = new TopicPartition("diskless2", 2); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java index 436a522b48..079b728f1d 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.AfterEach; @@ -46,7 +47,8 @@ import java.time.Duration; import java.time.Instant; import java.util.List; -import java.util.function.Supplier; +import java.util.Map; +import java.util.Properties; import io.aiven.inkless.common.ObjectFormat; import io.aiven.inkless.common.ObjectKey; @@ -117,9 +119,19 
@@ void setup() { when(inklessConfig.objectKeyPrefix()).thenReturn("prefix"); when(inklessConfig.fileMergeWorkDir()).thenReturn(WORK_DIR); when(inklessConfig.cacheMaxCount()).thenReturn(10000L); - - sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, mock(MetadataView.class), controlPlane, - mock(BrokerTopicStats.class), mock(Supplier.class)); + when(inklessConfig.fetchMetadataThreadPoolSize()).thenReturn(1); + when(inklessConfig.fetchDataThreadPoolSize()).thenReturn(1); + when(inklessConfig.produceUploadThreadPoolSize()).thenReturn(1); + + sharedState = SharedState.initialize( + time, + BROKER_ID, + inklessConfig, + mock(MetadataView.class), + controlPlane, + mock(BrokerTopicStats.class), + () -> LogConfig.fromProps(Map.of(), new Properties()) + ); } @AfterEach diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java index 6fda9ad1e8..bf7b73fe20 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java @@ -40,20 +40,11 @@ import org.mockito.quality.Strictness; import java.io.IOException; -import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -import io.aiven.inkless.cache.BatchCoordinateCache; -import io.aiven.inkless.cache.CaffeineBatchCoordinateCache; -import io.aiven.inkless.cache.FixedBlockAlignment; -import io.aiven.inkless.cache.KeyAlignmentStrategy; -import io.aiven.inkless.cache.NullCache; -import io.aiven.inkless.cache.ObjectCache; -import io.aiven.inkless.common.ObjectKey; -import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.SharedState; import io.aiven.inkless.config.InklessConfig; import io.aiven.inkless.control_plane.ControlPlane; @@ -71,18 +62,12 @@ @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class AppendHandlerTest { static final int BROKER_ID = 11; - static final ObjectKeyCreator OBJECT_KEY_CREATOR = ObjectKey.creator("", false); - private static final KeyAlignmentStrategy KEY_ALIGNMENT_STRATEGY = new FixedBlockAlignment(Integer.MAX_VALUE); - private static final ObjectCache OBJECT_CACHE = new NullCache(); - private static final BatchCoordinateCache BATCH_COORDINATE_CACHE = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30)); static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of()); Time time = new MockTime(); RequestLocal requestLocal = RequestLocal.noCaching(); @Mock - InklessConfig inklessConfig; - @Mock MetadataView metadataView; @Mock ControlPlane controlPlane; @@ -111,11 +96,21 @@ public class AppendHandlerTest { new SimpleRecord(0, "hello".getBytes()) ); + private SharedState getSharedState() { + return SharedState.initialize( + time, + BROKER_ID, + new InklessConfig(Map.of()), + metadataView, + controlPlane, + brokerTopicStats, + DEFAULT_TOPIC_CONFIGS + ); + } + @Test public void rejectTransactionalProduce() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(getSharedState(), writer)) { final TopicIdPartition topicIdPartition1 = new 
TopicIdPartition(Uuid.randomUuid(), 0, "inkless1"); final TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless2"); @@ -137,9 +132,7 @@ topicIdPartition2, new PartitionResponse(Errors.INVALID_REQUEST) @Test public void emptyRequests() throws Exception { - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(getSharedState(), writer)) { final Map entriesPerPartition = Map.of(); @@ -165,9 +158,7 @@ topicIdPartition, new PartitionResponse(Errors.NONE) ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(getSharedState(), writer)) { final var result = interceptor.handle(entriesPerPartition, requestLocal).get(); assertThat(result).isEqualTo(writeResult); @@ -188,18 +179,14 @@ public void writeFutureFailed() throws Exception { ); when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try (final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer)) { + try (final AppendHandler interceptor = new AppendHandler(getSharedState(), writer)) { assertThatThrownBy(() -> interceptor.handle(entriesPerPartition, requestLocal).get()).hasCause(exception); } } @Test public void close() throws IOException { - final AppendHandler interceptor = new AppendHandler( - new SharedState(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, brokerTopicStats, DEFAULT_TOPIC_CONFIGS), writer); + final AppendHandler interceptor = new AppendHandler(getSharedState(), writer); interceptor.close(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java index e89b18c3c0..d7131d77e9 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; @@ -41,6 +42,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import io.aiven.inkless.cache.BatchCoordinateCache; import io.aiven.inkless.cache.CaffeineBatchCoordinateCache; @@ -69,7 +72,8 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.STRICT_STUBS) +// Needed to allow lenient stubbing 
for thread pool executor size checks on beforeEach +@MockitoSettings(strictness = Strictness.LENIENT) class FileCommitterTest { static final int BROKER_ID = 11; @@ -106,9 +110,7 @@ public ObjectKey create(String value) { @Mock ExecutorService executorServiceUpload; @Mock - ExecutorService executorServiceCommit; - @Mock - ExecutorService executorServiceCacheStore; + ThreadPoolExecutor executorServiceCommit; @Mock FileCommitterMetrics metrics; @@ -117,6 +119,13 @@ public ObjectKey create(String value) { @Captor ArgumentCaptor commitRunnableCaptor; + @BeforeEach + void setUp() { + // Ensure executorServiceCommit is single-threaded + when(executorServiceCommit.getCorePoolSize()).thenReturn(1); + when(executorServiceCommit.getMaximumPoolSize()).thenReturn(1); + } + @Test @SuppressWarnings("unchecked") void success() throws Exception { @@ -133,7 +142,7 @@ void success() throws Exception { BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, + executorServiceUpload, executorServiceCommit, metrics); verify(metrics).initTotalFilesInProgressMetric(any()); @@ -185,7 +194,7 @@ void commitFailed() throws Exception { BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 1, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, + executorServiceUpload, executorServiceCommit, metrics); assertThat(committer.totalFilesInProgress()).isZero(); @@ -233,7 +242,7 @@ void uploadFailed() throws Exception { BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, + executorServiceUpload, executorServiceCommit, metrics); assertThat(committer.totalFilesInProgress()).isZero(); @@ -271,13 +280,14 @@ void close() throws IOException { BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics); + executorServiceUpload, executorServiceCommit, metrics); committer.close(); - verify(executorServiceUpload).shutdown(); - verify(executorServiceCommit).shutdown(); verify(metrics).close(); + // Verify that executors are not shut down by the committer as management is external + verify(executorServiceUpload, times(0)).shutdown(); + verify(executorServiceCommit, times(0)).shutdown(); } @Test @@ -286,63 +296,63 @@ void constructorInvalidArguments() { new FileCommitter( BROKER_ID, null, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, - 100, Duration.ofMillis(1), 8)) + 100, Duration.ofMillis(1), executorServiceUpload, executorServiceCommit, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("controlPlane cannot be null"); assertThatThrownBy(() -> new FileCommitter( BROKER_ID, controlPlane, null, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, - 100, Duration.ofMillis(1), 8)) + 100, Duration.ofMillis(1), executorServiceUpload, executorServiceCommit, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("objectKeyCreator cannot be null"); assertThatThrownBy(() -> new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, 
null, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, - 100, Duration.ofMillis(1), 8)) + 100, Duration.ofMillis(1), executorServiceUpload, executorServiceCommit, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("storage cannot be null"); assertThatThrownBy(() -> new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, null, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, - 100, Duration.ofMillis(1), 8)) + 100, Duration.ofMillis(1), executorServiceUpload, executorServiceCommit, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("keyAlignmentStrategy cannot be null"); assertThatThrownBy(() -> new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, null, BATCH_COORDINATE_CACHE, time, - 100, Duration.ofMillis(1), 8)) + 100, Duration.ofMillis(1), executorServiceUpload, executorServiceCommit, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("objectCache cannot be null"); assertThatThrownBy(() -> new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, null, time, - 100, Duration.ofMillis(1), 8)) + 100, Duration.ofMillis(1), executorServiceUpload, executorServiceCommit, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("batchCoordinateCache cannot be null"); assertThatThrownBy(() -> new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, null, - 100, Duration.ofMillis(1), 8)) + 100, Duration.ofMillis(1), executorServiceUpload, executorServiceCommit, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("time cannot be null"); assertThatThrownBy(() -> new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, - 0, Duration.ofMillis(1), 8)) + 0, Duration.ofMillis(1), executorServiceUpload, executorServiceCommit, metrics)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("maxFileUploadAttempts must be positive"); assertThatThrownBy(() -> new FileCommitter( BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, - 100, null, 8)) + 100, null, executorServiceUpload, executorServiceCommit, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("fileUploadRetryBackoff cannot be null"); assertThatThrownBy(() -> @@ -350,7 +360,7 @@ void constructorInvalidArguments() { BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - null, executorServiceCommit, executorServiceCacheStore, metrics)) + null, executorServiceCommit, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("executorServiceUpload cannot be null"); assertThatThrownBy(() -> @@ -358,7 +368,7 @@ void constructorInvalidArguments() { BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - executorServiceUpload, null, executorServiceCacheStore, metrics)) + executorServiceUpload, null, metrics)) .isInstanceOf(NullPointerException.class) .hasMessage("executorServiceCommit cannot be null"); assertThatThrownBy(() -> @@ -366,23 +376,9 @@ void constructorInvalidArguments() { BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - 
executorServiceUpload, executorServiceCommit, null, metrics)) - .isInstanceOf(NullPointerException.class) - .hasMessage("executorServiceCacheStore cannot be null"); - assertThatThrownBy(() -> - new FileCommitter( - BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, - KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, - 3, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, null)) + executorServiceUpload, executorServiceCommit, null)) .isInstanceOf(NullPointerException.class) .hasMessage("metrics cannot be null"); - assertThatThrownBy(() -> - new FileCommitter( - BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, - KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, - 3, Duration.ofMillis(1), 0)) // pool size has to be positive - .isInstanceOf(IllegalArgumentException.class); } @Test @@ -391,9 +387,59 @@ void commitNull() { BROKER_ID, controlPlane, OBJECT_KEY_CREATOR, storage, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE, time, 3, Duration.ofMillis(100), - executorServiceUpload, executorServiceCommit, executorServiceCacheStore, metrics); + executorServiceUpload, executorServiceCommit, metrics); assertThatThrownBy(() -> committer.commit(null)) .isInstanceOf(NullPointerException.class) .hasMessage("file cannot be null"); } + + @Test + void throwsIfExecutorServiceCommitIsNotSingleThreaded() { + ExecutorService multiThreadedExecutor = Executors.newFixedThreadPool(2); + try { + assertThatThrownBy(() -> new FileCommitter( + BROKER_ID, + controlPlane, + OBJECT_KEY_CREATOR, + storage, + KEY_ALIGNMENT_STRATEGY, + OBJECT_CACHE, + BATCH_COORDINATE_CACHE, + time, + 1, + Duration.ofMillis(100), + executorServiceUpload, + multiThreadedExecutor, + metrics + )).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("executorServiceCommit must be a single-threaded ThreadPoolExecutor"); + } finally { + multiThreadedExecutor.shutdown(); + } + } + + @Test + void allowsSingleThreadedExecutorServiceCommit() { + ExecutorService singleThreadedExecutor = Executors.newFixedThreadPool(1); + try { + FileCommitter committer = new FileCommitter( + BROKER_ID, + controlPlane, + OBJECT_KEY_CREATOR, + storage, + KEY_ALIGNMENT_STRATEGY, + OBJECT_CACHE, + BATCH_COORDINATE_CACHE, + time, + 1, + Duration.ofMillis(100), + executorServiceUpload, + singleThreadedExecutor, + metrics + ); + assertThat(committer).isNotNull(); + } finally { + singleThreadedExecutor.shutdown(); + } + } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java index 92d90ada30..ddaec5ef68 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.RequestLocal; import org.apache.kafka.storage.internals.log.LogConfig; @@ -43,6 +44,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -131,15 +133,33 @@ void test() 
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java
index 92d90ada30..ddaec5ef68 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.common.RequestLocal;
 import org.apache.kafka.storage.internals.log.LogConfig;
@@ -43,6 +44,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -131,15 +133,33 @@ void test() throws ExecutionException, InterruptedException, TimeoutException, I
         );
         controlPlane.createTopicAndPartitions(createTopicAndPartitionsRequests);

+        final var commitTickScheduler = Executors.newScheduledThreadPool(1);
+        final var fileUploadExecutor = Executors.newFixedThreadPool(8);
+        final var fileCommitExecutor = Executors.newFixedThreadPool(1);
+
         try (
-            final Writer writer = new Writer(
-                time, 11, ObjectKey.creator("", false), storage,
-                KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, BATCH_COORDINATE_CACHE,
-                controlPlane, Duration.ofMillis(10),
-                10 * 1024,
+            final var fileCommitter = new FileCommitter(
+                11,
+                controlPlane,
+                ObjectKey.creator("", false),
+                storage,
+                KEY_ALIGNMENT_STRATEGY,
+                OBJECT_CACHE,
+                BATCH_COORDINATE_CACHE,
+                time,
                 1,
                 Duration.ofMillis(10),
-                8,
+                fileUploadExecutor,
+                fileCommitExecutor,
+                new FileCommitterMetrics(time)
+            );
+            final Writer writer = new Writer(
+                time,
+                Duration.ofMillis(10),
+                10 * 1024,
+                commitTickScheduler,
+                fileCommitter,
+                new WriterMetrics(time),
                 new BrokerTopicStats()
             )
         ) {
@@ -182,6 +202,10 @@ T1P0, new PartitionResponse(Errors.NONE, 103, ts1, 0)
         assertThat(result3).isEqualTo(Map.of(
             T1P0, new PartitionResponse(Errors.NONE, 103 + 13, ts2, 0)
         ));
+        } finally {
+            ThreadUtils.shutdownExecutorServiceQuietly(commitTickScheduler, 5, TimeUnit.SECONDS);
+            ThreadUtils.shutdownExecutorServiceQuietly(fileUploadExecutor, 5, TimeUnit.SECONDS);
+            ThreadUtils.shutdownExecutorServiceQuietly(fileCommitExecutor, 5, TimeUnit.SECONDS);
         }
     }
 }
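Note: with the executors now created by the test rather than by Writer/FileCommitter, the test is also responsible for tearing them down; the finally block above mirrors the lifecycle that SharedState provides in production. Stripped of the Kafka specifics, the ownership pattern is just the following sketch (the class and pool names are stand-ins):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.common.utils.ThreadUtils;

    class ExecutorOwnershipSketch {
        public static void main(final String[] args) {
            // The creator of the pool owns it; components only borrow it.
            final ExecutorService uploadPool = Executors.newFixedThreadPool(8);
            try {
                // ... construct the component under test with uploadPool and exercise it ...
            } finally {
                // Bounded, exception-swallowing shutdown, as used in the test above.
                ThreadUtils.shutdownExecutorServiceQuietly(uploadPool, 5, TimeUnit.SECONDS);
            }
        }
    }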
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java
index 1a22decfea..9e5a5075e1 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java
@@ -47,8 +47,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;

-import io.aiven.inkless.storage_backend.common.StorageBackend;
-
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
@@ -88,8 +86,6 @@ static LogConfig logConfig(Map config) {
     @Mock
     ScheduledExecutorService commitTickScheduler;
     @Mock
-    StorageBackend storage;
-    @Mock
     FileCommitter fileCommitter;
     @Mock
     WriterMetrics writerMetrics;
@@ -110,7 +106,7 @@ void setup() {
     @Test
     void tickWithEmptyFile() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         writer.tick();

@@ -121,7 +117,7 @@ void tickWithEmptyFile() throws InterruptedException {
     @Test
     void tickIsScheduledWhenFileIsWrittenTo() {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         final Map<TopicIdPartition, MemoryRecords> writeRequest = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 100)
@@ -136,7 +132,7 @@ void committingDueToOverfillWithFirstRequest() throws InterruptedException {
         when(time.nanoseconds()).thenReturn(10_000_000L);

         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 15908, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 15908, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         final Map<TopicIdPartition, MemoryRecords> writeRequest = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 100),
@@ -156,7 +152,7 @@ void committingDueToOverfillWithFirstRequest() throws InterruptedException {
     @Test
     void committingDueToOverfillBeforeLastRequest() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         final Map<TopicIdPartition, MemoryRecords> writeRequest0 = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 1),
@@ -189,7 +185,7 @@ void committingDueToOverfillBeforeLastRequest() throws InterruptedException {
     @Test
     void committingDueToOverfillBeforeAfterLastRequest() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         final Map<TopicIdPartition, MemoryRecords> writeRequest0 = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 1),
@@ -220,7 +216,7 @@ void committingDueToOverfillBeforeAfterLastRequest()
     @Test
     void committingOnTick() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         final Map<TopicIdPartition, MemoryRecords> writeRequest = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 1),
@@ -241,7 +237,7 @@ void committingOnTick() throws InterruptedException {
     @Test
     void committingDueToClose() throws InterruptedException, IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         final Map<TopicIdPartition, MemoryRecords> writeRequest = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 1),
@@ -262,7 +258,7 @@ void committingDueToClose() throws InterruptedException, IOException {
     @Test
     void writeAfterRotation() throws InterruptedException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         final Map<TopicIdPartition, MemoryRecords> writeRequest0 = Map.of(
             T0P0, recordCreator.create(T0P0.topicPartition(), 100),
@@ -290,57 +286,53 @@ void writeAfterRotation() throws InterruptedException {
     @Test
     void close() throws IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         reset(commitTickScheduler);

         writer.close();

-        verify(commitTickScheduler).shutdownNow();
         verify(fileCommitter).close();
+        // Verify the scheduler is not shut down, as it is managed externally.
+        verify(commitTickScheduler, times(0)).shutdownNow();
     }

     @Test
     void closeAfterClose() throws IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         writer.close();

         reset(commitTickScheduler);
         reset(fileCommitter);
-        reset(storage);

         writer.close();

         verifyNoInteractions(commitTickScheduler);
         verifyNoInteractions(fileCommitter);
-        verifyNoInteractions(storage);
     }

     @Test
     void tickAfterClose() throws IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         writer.close();

         reset(commitTickScheduler);
         reset(fileCommitter);
-        reset(storage);

         writer.tick();

         verifyNoInteractions(commitTickScheduler);
         verifyNoInteractions(fileCommitter);
-        verifyNoInteractions(storage);
     }

     @Test
     void writeAfterClose() throws IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         writer.close();

         reset(commitTickScheduler);
         reset(fileCommitter);
-        reset(storage);

         final var writeResult = writer.write(Map.of(T0P0, recordCreator.create(T0P0.topicPartition(), 10)), TOPIC_CONFIGS, REQUEST_LOCAL);
@@ -352,13 +344,12 @@ void writeAfterClose() throws IOException {

         verifyNoInteractions(commitTickScheduler);
         verifyNoInteractions(fileCommitter);
-        verifyNoInteractions(storage);
     }

     @Test
     void commitInterrupted() throws InterruptedException, IOException {
         final Writer writer = new Writer(
-            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+            time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         final InterruptedException interruptedException = new InterruptedException();
         doThrow(interruptedException).when(fileCommitter).commit(any());
@@ -374,42 +365,40 @@ void commitInterrupted() throws InterruptedException, IOException {
             .hasRootCause(interruptedException);

         // Shutdown happens.
-        verify(commitTickScheduler).shutdownNow();
         verify(fileCommitter).close();
+        // Verify the scheduler is not shut down, as it is managed externally.
+        verify(commitTickScheduler, times(0)).shutdownNow();
     }

     @Test
     void constructorInvalidArguments() {
         assertThatThrownBy(() -> new Writer(
-            null, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats))
+            null, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("time cannot be null");
         assertThatThrownBy(() -> new Writer(
-            time, null, 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats))
+            time, null, 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("commitInterval cannot be null");
         assertThatThrownBy(() -> new Writer(
-            time, Duration.ofMillis(1), 0, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats))
+            time, Duration.ofMillis(1), 0, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats))
             .isInstanceOf(IllegalArgumentException.class)
             .hasMessage("maxBufferSize must be positive");
         assertThatThrownBy(() ->
-            new Writer(time, Duration.ofMillis(1), 8 * 1024, null, storage, fileCommitter, writerMetrics, brokerTopicStats))
+            new Writer(time, Duration.ofMillis(1), 8 * 1024, null, fileCommitter, writerMetrics, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("commitTickScheduler cannot be null");
-        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, null, brokerTopicStats))
+        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, null, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("writerMetrics cannot be null");
-        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, null, writerMetrics, brokerTopicStats))
+        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, null, writerMetrics, brokerTopicStats))
             .isInstanceOf(NullPointerException.class)
             .hasMessage("fileCommitter cannot be null");
-        assertThatThrownBy(() -> new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, null, fileCommitter, writerMetrics, brokerTopicStats))
-            .isInstanceOf(NullPointerException.class)
-            .hasMessage("storage cannot be null");
     }

     @Test
     void writeNull() {
-        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         assertThatThrownBy(() -> writer.write(null, TOPIC_CONFIGS, REQUEST_LOCAL))
             .isInstanceOf(NullPointerException.class)
@@ -424,7 +413,7 @@ void writeNull() {

     @Test
     void writeEmptyRequests() {
-        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         assertThatThrownBy(() -> writer.write(Map.of(), TOPIC_CONFIGS, REQUEST_LOCAL))
             .isInstanceOf(IllegalArgumentException.class)
@@ -433,7 +422,7 @@ void writeEmptyRequests() {

     @Test
     void entriesTopicConfigMismatch() {
-        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, storage, fileCommitter, writerMetrics, brokerTopicStats);
+        final Writer writer = new Writer(time, Duration.ofMillis(1), 8 * 1024, commitTickScheduler, fileCommitter, writerMetrics, brokerTopicStats);

         assertThatThrownBy(() -> writer.write(Map.of(T0P0, MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(new byte[10]))),
             Map.of(TOPIC_1, new LogConfig(Map.of())), REQUEST_LOCAL))
             .isInstanceOf(IllegalArgumentException.class)
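Note: the updated close and commitInterrupted expectations pin down the new ownership contract: Writer closes the FileCommitter it owns, must not shut down the injected scheduler, and close stays idempotent (a second close touches no collaborators). A hypothetical reduction of that contract, for illustration only and not the actual Writer source:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.atomic.AtomicBoolean;

    final class OwnershipAwareWriter implements Closeable {
        private final ScheduledExecutorService commitTickScheduler;  // injected, broker-scoped
        private final Closeable fileCommitter;                       // owned by this writer
        private final AtomicBoolean closed = new AtomicBoolean(false);

        OwnershipAwareWriter(final ScheduledExecutorService scheduler, final Closeable committer) {
            this.commitTickScheduler = scheduler;
            this.fileCommitter = committer;
        }

        @Override
        public void close() throws IOException {
            if (closed.getAndSet(true)) {
                return;  // idempotent: a second close must not touch collaborators
            }
            fileCommitter.close();
            // Deliberately no commitTickScheduler.shutdownNow(): the scheduler
            // is shut down by its owner (SharedState) for the whole broker.
        }
    }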
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java
index e273fbbad8..b9707862d1 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java
@@ -134,11 +134,10 @@ void testInMemoryControlPlane(@ForAll @IntRange(max = 100) int requestCount,
                                   @ForAll @IntRange(min = 10, max = 30) int uploadDurationAvg,
                                   @ForAll @IntRange(min = 5, max = 10) int commitDurationAvg,
                                   @ForAll @IntRange(min = 5, max = 10) int completeDurationAvg,
-                                  @ForAll @IntRange(min = 5, max = 10) int cacheStoreAvg,
                                   @ForAll @IntRange(min = 1, max = 1 * 1024) int maxBufferSize) throws Exception {
         try (final ControlPlane controlPlane = new InMemoryControlPlane(new MockTime(0, 0, 0))) {
             controlPlane.configure(Map.of());
-            test(requestCount, requestIntervalMsAvg, commitIntervalMsAvg, uploadDurationAvg, commitDurationAvg, completeDurationAvg, cacheStoreAvg, maxBufferSize, controlPlane);
+            test(requestCount, requestIntervalMsAvg, commitIntervalMsAvg, uploadDurationAvg, commitDurationAvg, completeDurationAvg, maxBufferSize, controlPlane);
         }
     }
@@ -150,7 +149,6 @@ void testPostgresControlPlane(@ForAll @IntRange(max = 100) int requestCount,
                                   @ForAll @IntRange(min = 10, max = 30) int uploadDurationAvg,
                                   @ForAll @IntRange(min = 5, max = 10) int commitDurationAvg,
                                   @ForAll @IntRange(min = 5, max = 10) int completeDurationAvg,
-                                  @ForAll @IntRange(min = 5, max = 10) int cacheStoreAvg,
                                   @ForAll @IntRange(min = 1, max = 1 * 1024) int maxBufferSize) throws Exception {
         String dbName = "test-" + requestCount
             + "-" + requestIntervalMsAvg
@@ -170,7 +168,7 @@ void testPostgresControlPlane(@ForAll @IntRange(max = 100) int requestCount,
                 "password", pgContainer.getPassword()
             ));

-            test(requestCount, requestIntervalMsAvg, commitIntervalMsAvg, uploadDurationAvg, commitDurationAvg, completeDurationAvg, cacheStoreAvg, maxBufferSize, controlPlane);
+            test(requestCount, requestIntervalMsAvg, commitIntervalMsAvg, uploadDurationAvg, commitDurationAvg, completeDurationAvg, maxBufferSize, controlPlane);
         }
     }
@@ -180,7 +178,6 @@ void test(final int requestCount,
               final int uploadDurationAvg,
               final int commitDurationAvg,
               final int completeDurationAvg,
-              final int cacheStoreDurationAvg,
               final int maxBufferSize,
               final ControlPlane controlPlane)
         throws Exception
@@ -204,7 +201,7 @@ void test(final int requestCount,
         );
         final CommitterHandler committerHandler = new CommitterHandler(
             uploaderHandler,
-            new MockExecutorServiceWithFutureSupport(),
+            new MockSingleThreadedExecutorService(),
             new Timer("commit",
                 time,
                 Instant.ofEpochMilli(time.milliseconds()),
@@ -218,14 +215,6 @@ void test(final int requestCount,
                 Instant.ofEpochMilli(time.milliseconds()),
                 Arbitraries.longs().between(completeDurationAvg - 2, completeDurationAvg + 2))
         );
-        final CacheStoreHandler cacheStoreHandler = new CacheStoreHandler(
-            uploaderHandler,
-            new MockExecutorServiceWithFutureSupport(),
-            new Timer("cacheStore",
-                time,
-                Instant.ofEpochMilli(time.milliseconds()),
-                Arbitraries.longs().between(cacheStoreDurationAvg - 2, cacheStoreDurationAvg + 2))
-        );

         try(final FileCommitter fileCommitter = new FileCommitter(
             11,
             controlPlane,
@@ -239,7 +228,6 @@ void test(final int requestCount,
             Duration.ZERO,
             uploaderHandler.executorService,
             committerHandler.executorService,
-            cacheStoreHandler.executorService,
             mock(FileCommitterMetrics.class)
         )) {
@@ -248,7 +236,6 @@ void test(final int requestCount,
                 Duration.ofMillis(commitIntervalMsAvg),  // it doesn't matter as the scheduling doesn't happen
                 maxBufferSize,
                 mock(ScheduledExecutorService.class),
-                storage,
                 fileCommitter,
                 mock(WriterMetrics.class),
                 new BrokerTopicStats()
@@ -283,7 +270,6 @@ void test(final int requestCount,
                 uploaderHandler.maybeRunNext();
                 committerHandler.maybeRunNext();
                 completerHandler.maybeRunNext();
-                cacheStoreHandler.maybeRunNext();
                 time.sleep(1);
             }
             assertThat(finished).withFailMessage(String.format("Not finished in %d virtual ms", maxTime)).isTrue();
@@ -583,6 +569,68 @@ boolean runNextIfExists() throws InterruptedException {
         }
     }

+    /**
+     * A single-threaded ThreadPoolExecutor that can be controlled for testing.
+     * This meets FileCommitter's requirement for a ThreadPoolExecutor with corePoolSize=1 and maximumPoolSize=1.
+     */
+    private static class MockSingleThreadedExecutorService extends java.util.concurrent.ThreadPoolExecutor {
+        final LinkedBlockingQueue<Future<?>> returnedFutures = new LinkedBlockingQueue<>();
+        private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
+
+        MockSingleThreadedExecutorService() {
+            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+        }
+
+        @Override
+        public void execute(final Runnable command) {
+            this.submit(command);
+        }
+
+        @Override
+        public Future<?> submit(final Runnable task) {
+            return this.submit(() -> {
+                task.run();
+                return null;
+            });
+        }
+
+        @Override
+        public <T> Future<T> submit(final Callable<T> task) {
+            final var result = new CompletableFuture<T>();
+            returnedFutures.offer(result);
+            taskQueue.offer(() -> {
+                try {
+                    result.complete(task.call());
+                } catch (final Exception e) {
+                    result.completeExceptionally(e);
+                }
+            });
+            return result;
+        }
+
+        boolean runNextIfExists() throws InterruptedException {
+            assertThat(returnedFutures.size()).isEqualTo(taskQueue.size());
+            final Runnable nextRunnable = taskQueue.poll();
+            if (nextRunnable != null) {
+                nextRunnable.run();
+                assert returnedFutures.take().isDone();
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public void shutdown() {
+            // No-op for testing
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return List.of();
+        }
+    }
+
     private static class UploaderHandler {
         private final MockExecutorServiceWithFutureSupport executorService;
         private final Timer timer;
@@ -608,11 +656,11 @@ boolean oldestFutureIsDone() {
     private static class CommitterHandler {
         private final UploaderHandler uploaderHandler;
-        private final MockExecutorServiceWithFutureSupport executorService;
+        private final MockSingleThreadedExecutorService executorService;
         private final Timer timer;

         private CommitterHandler(final UploaderHandler uploaderHandler,
-                                 final MockExecutorServiceWithFutureSupport executorService,
+                                 final MockSingleThreadedExecutorService executorService,
                                  final Timer timer) {
             this.uploaderHandler = uploaderHandler;
             this.executorService = executorService;