-
Notifications
You must be signed in to change notification settings - Fork 6
refactor(inkless): consolidate executor management in SharedState #468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
2691b9d to
1764bf3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR refactors executor management by consolidating all ExecutorService creation and lifecycle management into SharedState, eliminating the Single Responsibility Principle violations where Reader, Writer, and FileCommitter each managed their own thread pools. The refactoring also removes the unnecessary CacheStoreJob and moves cache updates inline within FileCommitter's commit completion callback.
Key Changes:
- Centralized executor creation in SharedState for fetch operations (metadata/data), file operations (upload/commit), and commit ticking
- Removed CacheStoreJob class and its dedicated executor; cache updates now happen inline after successful commits
- Updated Reader, Writer, and FileCommitter constructors to accept externally-managed executors instead of creating their own
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| SharedState.java | Creates and manages all broker-scoped executors; added shutdown logic and thread pool monitoring |
| FileCommitter.java | Removed cache store executor parameter; added inline cache update logic and single-thread validation |
| Writer.java | Removed internal scheduler creation; now accepts externally-managed ScheduledExecutorService |
| Reader.java | Removed internal executor creation; now accepts externally-managed metadata and data executors |
| CacheStoreJob.java | Deleted; functionality moved inline to FileCommitter's commit completion handler |
| AppendHandler.java | Updated to wire FileCommitter with executors from SharedState |
| FetchHandler.java | Updated to pass executors from SharedState to Reader constructor |
| WriterIntegrationTest.java | Manually creates and shuts down executors since SharedState is not used |
| WriterMockedTest.java | Updated assertions to verify executors are NOT shutdown by Writer.close() |
| FileCommitterTest.java | Added tests for single-threaded executor validation; removed cache store executor |
| ReaderTest.java | Updated assertions to verify executors are NOT shutdown by Reader.close() |
| WriterPropertyTest.java | Removed CacheStoreHandler since cache updates are now inline |
| AppendHandlerTest.java | Added helper method to create SharedState via initialize() |
| DeleteRecordsInterceptorTest.java | Added helper method to create SharedState via initialize() |
| FileMergerMockedTest.java | Updated to use SharedState.initialize() with required config parameters |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java
Show resolved
Hide resolved
d20fe7c to
95608a8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
cd7fb3e to
190a9e1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
f96aa70 to
f9a30b2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java
Outdated
Show resolved
Hide resolved
52a1699 to
bed54fa
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java:193
- The scheduledTick future should be cancelled in the close() method before closing resources. If a tick is scheduled but hasn't executed yet, it may fire after the Writer is closed, potentially causing issues. Consider cancelling the scheduledTick before setting closed to true.
public void close() throws IOException {
lock.lock();
try {
if (closed) {
return;
}
closed = true;
// Rotate file before closing the uploader so the file gets into the queue first.
rotateFile(true);
fileCommitter.close();
writerMetrics.close();
} finally {
lock.unlock();
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
a5d07a1 to
9dc496e
Compare
Moves all ExecutorService creation and lifecycle management from Reader, Writer, and FileCommitter to SharedState, establishing a centralized pattern for managing broker-scoped thread pools. This refactoring also eliminates an unnecessary executor (cache store) and fixes a resource leak bug. Previously, Reader, Writer, and FileCommitter each created and managed their own thread pools, violating the Single Responsibility Principle: - **Reader**: Created 2 fixed thread pools for fetch operations - **Writer**: Created 1 scheduled thread pool for commit ticking - **FileCommitter**: Created 3 thread pools (upload, commit, cache store) Instead of having the CacheStoreThreadPool, reuse the existing upload thread pool for caching.
9dc496e to
a729102
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
| disklessTopics.foreach(t => when(inklessMetadata.isDisklessTopic(t)).thenReturn(true)) | ||
| when(sharedState.metadata()).thenReturn(inklessMetadata) | ||
| when(sharedState.produceCommitExecutor()).thenReturn(Executors.newFixedThreadPool(1)) |
Copilot
AI
Dec 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mock returns a new executor that is never shut down, creating a resource leak in tests. This executor should either be tracked and cleaned up in test teardown, or the mock should return a mock ExecutorService instead of a real one.
| sharedState = SharedState.initialize( | ||
| time, | ||
| BROKER_ID, | ||
| inklessConfig, | ||
| mock(MetadataView.class), | ||
| controlPlane, | ||
| mock(BrokerTopicStats.class), | ||
| () -> LogConfig.fromProps(Map.of(), new Properties()) | ||
| ); | ||
| } |
Copilot
AI
Dec 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SharedState instance created in the setup is not closed in tearDown, causing a resource leak. The SharedState owns executors and other resources that need to be properly shut down. Add sharedState.close() to the tearDown method.
| private SharedState getSharedState() { | ||
| return SharedState.initialize( | ||
| time, | ||
| BROKER_ID, | ||
| new InklessConfig(Map.of()), | ||
| metadataView, | ||
| controlPlane, | ||
| brokerTopicStats, | ||
| DEFAULT_TOPIC_CONFIGS | ||
| ); | ||
| } |
Copilot
AI
Dec 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each call to getSharedState() creates a new SharedState with executors and other resources that are never closed. Since SharedState is passed to AppendHandler which is closed via try-with-resources, the SharedState resources leak. Consider creating SharedState once in a setup method and closing it in tearDown, or make AppendHandler responsible for closing SharedState when it's closed.
| private SharedState getSharedState() { | ||
| return SharedState.initialize( | ||
| time, | ||
| BROKER_ID, | ||
| disklessConfig, | ||
| new InklessConfig(Map.of()), | ||
| metadataView, | ||
| controlPlane, | ||
| OBJECT_KEY_CREATOR, | ||
| KEY_ALIGNMENT_STRATEGY, | ||
| OBJECT_CACHE, | ||
| BATCH_COORDINATE_CACHE, | ||
| brokerTopicStats, | ||
| DEFAULT_TOPIC_CONFIGS | ||
| ); | ||
| final DeleteRecordsInterceptor interceptor = new DeleteRecordsInterceptor(state); | ||
| } |
Copilot
AI
Dec 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each call to getSharedState() creates a new SharedState with executors and other resources that are never closed, causing resource leaks. The DeleteRecordsInterceptor does not own the SharedState and doesn't close it. Consider creating SharedState once in a setup method and closing it in tearDown, or ensure the interceptor or test properly cleans up these resources.
| mock(BrokerTopicStats.class), mock(Supplier.class)); | ||
| when(inklessConfig.fetchMetadataThreadPoolSize()).thenReturn(1); | ||
| when(inklessConfig.fetchDataThreadPoolSize()).thenReturn(1); | ||
| when(inklessConfig.produceUploadThreadPoolSize()).thenReturn(1); |
Copilot
AI
Dec 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mock InklessConfig is missing required method stubs. SharedState.initialize() calls methods like cacheExpirationLifespanSec(), cacheExpirationMaxIdleSec(), isBatchCoordinateCacheEnabled(), batchCoordinateCacheTtl(), fetchCacheBlockBytes(), and objectKeyLogPrefixMasked() which are not mocked. This will cause NullPointerExceptions or unexpected behavior when SharedState.initialize() is called. Add the missing when() stubs for these config methods.
| when(inklessConfig.produceUploadThreadPoolSize()).thenReturn(1); | |
| when(inklessConfig.produceUploadThreadPoolSize()).thenReturn(1); | |
| when(inklessConfig.cacheExpirationLifespanSec()).thenReturn(3600L); | |
| when(inklessConfig.cacheExpirationMaxIdleSec()).thenReturn(600L); | |
| when(inklessConfig.isBatchCoordinateCacheEnabled()).thenReturn(true); | |
| when(inklessConfig.batchCoordinateCacheTtl()).thenReturn(Duration.ofMinutes(10)); | |
| when(inklessConfig.fetchCacheBlockBytes()).thenReturn(1024 * 1024); | |
| when(inklessConfig.objectKeyLogPrefixMasked()).thenReturn("prefix"); |
| // (multithreaded executor causing ordering violations) vs theoretical misconfiguration | ||
| // that would be immediately obvious. |
Copilot
AI
Dec 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment states "validation prevents the real bug (multithreaded executor causing ordering violations) vs theoretical misconfiguration that would be immediately obvious." The phrase "vs theoretical misconfiguration" is unclear. Consider rephrasing to "over theoretical misconfiguration" or "compared to theoretical misconfiguration" for better clarity.
| // (multithreaded executor causing ordering violations) vs theoretical misconfiguration | |
| // that would be immediately obvious. | |
| // (multithreaded executor causing ordering violations) compared to a theoretical | |
| // misconfiguration that would be immediately obvious. |
Moves all ExecutorService creation and lifecycle management from Reader, Writer, and FileCommitter to SharedState, establishing a centralized pattern for managing broker-scoped thread pools.
This refactoring also eliminates an unnecessary executor (cache store) and fixes a resource leak bug.
Previously, Reader, Writer, and FileCommitter each created and managed their own thread pools, violating the Single Responsibility Principle:
Instead of having the CacheStoreThreadPool, reuse the existing upload thread pool for caching.