Skip to content

Conversation

@jeqo
Copy link
Contributor

@jeqo jeqo commented Dec 23, 2025

Moves all ExecutorService creation and lifecycle management from Reader, Writer, and FileCommitter to SharedState, establishing a centralized pattern for managing broker-scoped thread pools.
This refactoring also eliminates an unnecessary executor (cache store) and fixes a resource leak bug.

Previously, Reader, Writer, and FileCommitter each created and managed their own thread pools, violating the Single Responsibility Principle:

  • Reader: Created 2 fixed thread pools for fetch operations
  • Writer: Created 1 scheduled thread pool for commit ticking
  • FileCommitter: Created 3 thread pools (upload, commit, cache store)

Instead of having the CacheStoreThreadPool, reuse the existing upload thread pool for caching.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors executor management by consolidating all ExecutorService creation and lifecycle management into SharedState, eliminating the Single Responsibility Principle violations where Reader, Writer, and FileCommitter each managed their own thread pools. The refactoring also removes the unnecessary CacheStoreJob and moves cache updates inline within FileCommitter's commit completion callback.

Key Changes:

  • Centralized executor creation in SharedState for fetch operations (metadata/data), file operations (upload/commit), and commit ticking
  • Removed CacheStoreJob class and its dedicated executor; cache updates now happen inline after successful commits
  • Updated Reader, Writer, and FileCommitter constructors to accept externally-managed executors instead of creating their own

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
SharedState.java Creates and manages all broker-scoped executors; added shutdown logic and thread pool monitoring
FileCommitter.java Removed cache store executor parameter; added inline cache update logic and single-thread validation
Writer.java Removed internal scheduler creation; now accepts externally-managed ScheduledExecutorService
Reader.java Removed internal executor creation; now accepts externally-managed metadata and data executors
CacheStoreJob.java Deleted; functionality moved inline to FileCommitter's commit completion handler
AppendHandler.java Updated to wire FileCommitter with executors from SharedState
FetchHandler.java Updated to pass executors from SharedState to Reader constructor
WriterIntegrationTest.java Manually creates and shuts down executors since SharedState is not used
WriterMockedTest.java Updated assertions to verify executors are NOT shutdown by Writer.close()
FileCommitterTest.java Added tests for single-threaded executor validation; removed cache store executor
ReaderTest.java Updated assertions to verify executors are NOT shutdown by Reader.close()
WriterPropertyTest.java Removed CacheStoreHandler since cache updates are now inline
AppendHandlerTest.java Added helper method to create SharedState via initialize()
DeleteRecordsInterceptorTest.java Added helper method to create SharedState via initialize()
FileMergerMockedTest.java Updated to use SharedState.initialize() with required config parameters

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jeqo jeqo force-pushed the jeqo/thread-pool-refactor branch 4 times, most recently from d20fe7c to 95608a8 Compare December 24, 2025 00:10
@jeqo jeqo requested a review from Copilot December 24, 2025 00:11
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jeqo jeqo force-pushed the jeqo/thread-pool-refactor branch 3 times, most recently from cd7fb3e to 190a9e1 Compare December 24, 2025 00:46
@jeqo jeqo requested a review from Copilot December 24, 2025 00:49
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jeqo jeqo force-pushed the jeqo/thread-pool-refactor branch 2 times, most recently from 52a1699 to bed54fa Compare December 24, 2025 07:48
@jeqo jeqo requested a review from Copilot December 24, 2025 07:50
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java:193

  • The scheduledTick future should be cancelled in the close() method before closing resources. If a tick is scheduled but hasn't executed yet, it may fire after the Writer is closed, potentially causing issues. Consider cancelling the scheduledTick before setting closed to true.
    public void close() throws IOException {
        lock.lock();
        try {
            if (closed) {
                return;
            }
            closed = true;
            // Rotate file before closing the uploader so the file gets into the queue first.
            rotateFile(true);
            fileCommitter.close();
            writerMetrics.close();
        } finally {
            lock.unlock();
        }
    }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jeqo jeqo force-pushed the jeqo/thread-pool-refactor branch 2 times, most recently from a5d07a1 to 9dc496e Compare December 24, 2025 08:40
Moves all ExecutorService creation and lifecycle management from Reader,
Writer, and FileCommitter to SharedState, establishing a centralized
pattern for managing broker-scoped thread pools.
This refactoring also eliminates an unnecessary executor (cache store)
and fixes a resource leak bug.

Previously, Reader, Writer, and FileCommitter each created and managed
their own thread pools, violating the Single Responsibility Principle:
- **Reader**: Created 2 fixed thread pools for fetch operations
- **Writer**: Created 1 scheduled thread pool for commit ticking
- **FileCommitter**: Created 3 thread pools (upload, commit, cache
store)

Instead of having the CacheStoreThreadPool, reuse the existing upload
thread pool for caching.
@jeqo jeqo force-pushed the jeqo/thread-pool-refactor branch from 9dc496e to a729102 Compare December 24, 2025 09:16
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

}
disklessTopics.foreach(t => when(inklessMetadata.isDisklessTopic(t)).thenReturn(true))
when(sharedState.metadata()).thenReturn(inklessMetadata)
when(sharedState.produceCommitExecutor()).thenReturn(Executors.newFixedThreadPool(1))
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mock returns a new executor that is never shut down, creating a resource leak in tests. This executor should either be tracked and cleaned up in test teardown, or the mock should return a mock ExecutorService instead of a real one.

Copilot uses AI. Check for mistakes.
Comment on lines +126 to 135
sharedState = SharedState.initialize(
time,
BROKER_ID,
inklessConfig,
mock(MetadataView.class),
controlPlane,
mock(BrokerTopicStats.class),
() -> LogConfig.fromProps(Map.of(), new Properties())
);
}
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SharedState instance created in the setup is not closed in tearDown, causing a resource leak. The SharedState owns executors and other resources that need to be properly shut down. Add sharedState.close() to the tearDown method.

Copilot uses AI. Check for mistakes.
Comment on lines +99 to +109
private SharedState getSharedState() {
return SharedState.initialize(
time,
BROKER_ID,
new InklessConfig(Map.of()),
metadataView,
controlPlane,
brokerTopicStats,
DEFAULT_TOPIC_CONFIGS
);
}
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each call to getSharedState() creates a new SharedState with executors and other resources that are never closed. Since SharedState is passed to AppendHandler which is closed via try-with-resources, the SharedState resources leak. Consider creating SharedState once in a setup method and closing it in tearDown, or make AppendHandler responsible for closing SharedState when it's closed.

Copilot uses AI. Check for mistakes.
Comment on lines +84 to +94
private SharedState getSharedState() {
return SharedState.initialize(
time,
BROKER_ID,
disklessConfig,
new InklessConfig(Map.of()),
metadataView,
controlPlane,
OBJECT_KEY_CREATOR,
KEY_ALIGNMENT_STRATEGY,
OBJECT_CACHE,
BATCH_COORDINATE_CACHE,
brokerTopicStats,
DEFAULT_TOPIC_CONFIGS
);
final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state);
}
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each call to getSharedState() creates a new SharedState with executors and other resources that are never closed, causing resource leaks. The DeleteRecordsInterceptor does not own the SharedState and doesn't close it. Consider creating SharedState once in a setup method and closing it in tearDown, or ensure the interceptor or test properly cleans up these resources.

Copilot uses AI. Check for mistakes.
mock(BrokerTopicStats.class), mock(Supplier.class));
when(inklessConfig.fetchMetadataThreadPoolSize()).thenReturn(1);
when(inklessConfig.fetchDataThreadPoolSize()).thenReturn(1);
when(inklessConfig.produceUploadThreadPoolSize()).thenReturn(1);
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mock InklessConfig is missing required method stubs. SharedState.initialize() calls methods like cacheExpirationLifespanSec(), cacheExpirationMaxIdleSec(), isBatchCoordinateCacheEnabled(), batchCoordinateCacheTtl(), fetchCacheBlockBytes(), and objectKeyLogPrefixMasked() which are not mocked. This will cause NullPointerExceptions or unexpected behavior when SharedState.initialize() is called. Add the missing when() stubs for these config methods.

Suggested change
when(inklessConfig.produceUploadThreadPoolSize()).thenReturn(1);
when(inklessConfig.produceUploadThreadPoolSize()).thenReturn(1);
when(inklessConfig.cacheExpirationLifespanSec()).thenReturn(3600L);
when(inklessConfig.cacheExpirationMaxIdleSec()).thenReturn(600L);
when(inklessConfig.isBatchCoordinateCacheEnabled()).thenReturn(true);
when(inklessConfig.batchCoordinateCacheTtl()).thenReturn(Duration.ofMinutes(10));
when(inklessConfig.fetchCacheBlockBytes()).thenReturn(1024 * 1024);
when(inklessConfig.objectKeyLogPrefixMasked()).thenReturn("prefix");

Copilot uses AI. Check for mistakes.
Comment on lines +194 to +195
// (multithreaded executor causing ordering violations) vs theoretical misconfiguration
// that would be immediately obvious.
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment states "validation prevents the real bug (multithreaded executor causing ordering violations) vs theoretical misconfiguration that would be immediately obvious." The phrase "vs theoretical misconfiguration" is unclear. Consider rephrasing to "over theoretical misconfiguration" or "compared to theoretical misconfiguration" for better clarity.

Suggested change
// (multithreaded executor causing ordering violations) vs theoretical misconfiguration
// that would be immediately obvious.
// (multithreaded executor causing ordering violations) compared to a theoretical
// misconfiguration that would be immediately obvious.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants