Adding Partition Execution Logging for DimensionalTimeSliceCrawler #6362
Conversation
Thanks for this! Just want to raise that the "Total partitions created in this crawl: 0.0" log line sometimes shows 0 even though partitions were actually created, as the metric confirms. Not sure why.
dfc1690 to 8badf25
    @Override
    public void executePartition(DimensionalTimeSliceWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
        log.info("Processing partition - DimensionType: {}, TimeRange: {} to {}, ProcessingTime: {}",
                state.getDimensionType(), state.getStartTime(), state.getEndTime(), Instant.now());
The log.info line already carries the current timestamp, so I don't think the last argument (ProcessingTime) is needed.
This will be a super noisy log. Do we need to log this? If yes, then maybe change it to log.debug.
Since the partitions are small, I do think this is too noisy, even if it's helpful. debug makes more sense.
log.debug makes sense. We also want to add this logging to another crawler, and I will make sure both use debug.
I do think log.info is necessary to help us debug. Otherwise it would not be as helpful.
1 log line per minute might not be too bad.
Are you OK if we make each partition duration longer, say 2 minutes, and change this to log.info?
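The trade-off in this thread (per-partition lines at debug, or fewer and longer partitions at info) comes down to which level the logger is configured to emit. A minimal sketch, assuming java.util.logging only to stay dependency-free (the PR's log.info and log.debug calls appear to be SLF4J-style), with FINE standing in for debug. PartitionLogLevelDemo, CapturingHandler, and emitted are hypothetical names, not code from this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical demo class; not part of the PR.
class PartitionLogLevelDemo {

    // Collects messages in memory so the effect of the configured level is visible.
    static final class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getLevel() + " " + record.getMessage());
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    // Logs one line per partition at FINE (the java.util.logging analogue of debug)
    // and one summary line at INFO, then returns what was actually emitted.
    static List<String> emitted(Level configuredLevel, int partitions) {
        Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false); // keep the demo off the console
        logger.setLevel(configuredLevel);
        CapturingHandler handler = new CapturingHandler();
        handler.setLevel(Level.ALL);
        logger.addHandler(handler);

        for (int i = 0; i < partitions; i++) {
            logger.fine("Processing partition " + i);
        }
        logger.info("Processed " + partitions + " partitions");
        return handler.messages;
    }

    public static void main(String[] args) {
        System.out.println(emitted(Level.INFO, 3));
        System.out.println(emitted(Level.FINE, 3));
    }
}
```

With the level at INFO only the summary line is emitted; lowering it to FINE also emits one line per partition, which is the noise the reviewers are weighing against debuggability.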
dcc52e6 to 2ede0d9
    @Override
    public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
        String firstToken = !state.getItemIds().isEmpty() ? state.getItemIds().get(0) : "";
        String lastToken = !state.getItemIds().isEmpty() ? state.getItemIds().get(state.getItemIds().size()-1) : "";
Please add necessary log information calling out this is for RetryPartition.
Will do!
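One way the request above could look, as a sketch only: pull the first and last item IDs out with an empty-list guard, and put the partition kind at the front of the message so a retry is distinguishable in the logs. TokenRangeLog, describe, and the message format are hypothetical and not taken from the PR; the list of strings stands in for state.getItemIds().

```java
import java.util.List;

// Hypothetical helper; names and message format are illustrative only.
class TokenRangeLog {

    // First item ID, or an empty string when the list has no items.
    static String firstToken(List<String> itemIds) {
        return itemIds.isEmpty() ? "" : itemIds.get(0);
    }

    // Last item ID, or an empty string when the list has no items.
    static String lastToken(List<String> itemIds) {
        return itemIds.isEmpty() ? "" : itemIds.get(itemIds.size() - 1);
    }

    // Builds a message that names the partition kind (for example "RetryPartition") up front.
    static String describe(String partitionKind, List<String> itemIds) {
        return String.format("Processing %s - items: %d, firstToken: '%s', lastToken: '%s'",
                partitionKind, itemIds.size(), firstToken(itemIds), lastToken(itemIds));
    }

    public static void main(String[] args) {
        System.out.println(describe("RetryPartition", List.of("a", "b", "c")));
    }
}
```

Keeping the guard in one place avoids repeating the isEmpty() check for each token at every call site.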
    private void processBatch(List<ItemInfo> batch,
                              LeaderPartition leaderPartition,
                              EnhancedSourceCoordinator coordinator) {
        String firstToken = !batch.isEmpty() ? batch.get(0).getItemId() : "(No First Token in this batch)";
Please take a look at the LeaderOnlyTokenCrawler design. The main processing happens in the leader thread's crawl function.
Please log the lastToken at https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawler.java#L82 so the system records the necessary information before processing starts.
We don't record it during processing, so that the system can catch errors when a failure occurs while retrieving the data.
2ede0d9 to d68311b
d68311b to 3135327
    Instant executionStart = Instant.now();
    partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));

    // Enforce 2 minute minimum execution duration
I don't recall us doing this anywhere. In addition, we don't want to add a 2-minute delay inside partition execution, because it blocks the thread and prevents it from doing other work.
There is already a 5-minute delay before the partition is created.
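To illustrate the blocking concern: if a minimum spacing were ever needed, one alternative to sleeping inside the worker is to compute the time still owed and hand the follow-up to a scheduler, so the calling thread returns immediately. This is a sketch under assumptions, not what the PR does and not how Data Prepper schedules partitions; MinimumSpacing, remaining, and scheduleNext are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the PR or of Data Prepper.
class MinimumSpacing {

    // Time still to wait so that at least minDuration separates start from now; never negative.
    static Duration remaining(Instant start, Instant now, Duration minDuration) {
        Duration left = minDuration.minus(Duration.between(start, now));
        return left.isNegative() ? Duration.ZERO : left;
    }

    // Schedules the follow-up after the remaining delay and returns right away,
    // so the calling thread is not parked in Thread.sleep.
    static ScheduledFuture<?> scheduleNext(ScheduledExecutorService scheduler, Instant start,
                                           Duration minDuration, Runnable next) {
        long delayMillis = remaining(start, Instant.now(), minDuration).toMillis();
        return scheduler.schedule(next, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

For example, 30 seconds into a 2-minute minimum, remaining returns 90 seconds; once the minimum has already elapsed it returns zero and the follow-up runs without delay.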
Based on offline discussion, I misunderstood the previous comment #6362 (comment)
3135327 to 5ac6802
5ac6802 to 2e606c4
    @Override
    public void executePartition(DimensionalTimeSliceWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
        log.info("Processing partition - DimensionType: {}, TimeRange: {} to {}",
While this log is useful, it is somewhat expensive, since it produces one log line per minute.
@chrisale000 Seems like the commit is not signed. Please fix it.
910037e to b150055
Signed-off-by: Alexander Christensen <alchrisk@amazon.com>
b150055 to eaa551d

Description
Implemented enhanced partition logging to make partition processing in DimensionalTimeSliceCrawler easier to debug and observe, and added a 2-minute limiter for partition batch processing.
Testing
M365 Pipeline Validation
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.