[AURON #1693] join operation should flush in time on duplicated keys #1701
base: master
Conversation
Pull request overview
This PR addresses issue #1693 by refactoring the sort-merge join (SMJ) implementation to check batch size and flush more frequently when processing joins with duplicate keys. The goal is to prevent excessive memory usage by ensuring timely flushing of output batches during cartesian product generation.
Key Changes
- Simplified the duplicate key handling algorithm by collecting all duplicate indices from both sides before generating the cartesian product
- Added flush checks inside the cartesian product generation loop (instead of only after collecting all duplicates)
- Removed the complex streaming logic that separately handled left and right side streaming
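The flush-inside-the-loop idea can be sketched as a minimal standalone model (hypothetical names and signature, not the PR's actual code, which operates on cursors and Arrow batches):

```rust
// Hypothetical sketch of the new approach: buffer the cartesian product of
// duplicate-key indices and flush whenever the output buffers reach
// `batch_size`, instead of only after the whole product is materialized.
fn cartesian_join_flushed(
    equal_lindices: &[u32],
    equal_rindices: &[u32],
    batch_size: usize,
    mut flush: impl FnMut(&[u32], &[u32]),
) {
    let mut lbuf = Vec::new();
    let mut rbuf = Vec::new();
    for &lidx in equal_lindices {
        for &ridx in equal_rindices {
            lbuf.push(lidx);
            rbuf.push(ridx);
            // Flushing inside the loop keeps memory bounded even when a
            // single key has many duplicates on both sides.
            if lbuf.len() >= batch_size {
                flush(&lbuf, &rbuf);
                lbuf.clear();
                rbuf.clear();
            }
        }
    }
    if !lbuf.is_empty() {
        flush(&lbuf, &rbuf);
    }
}
```

With 3 left duplicates, 4 right duplicates, and a batch size of 5, the 12 output pairs are emitted as batches of 5, 5, and 2 rather than one batch of 12.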
```rust
if self.should_flush() {
    self.as_mut().flush(cur1, cur2).await?;
```
Copilot AI · Dec 5, 2025
The flush condition is incomplete and missing cleanup calls that are critical for memory management during joins with many duplicate keys:
- Missing check for `cur1.num_buffered_batches() > 1 || cur2.num_buffered_batches() > 1` (present at lines 99-102)
- Missing calls to `clean_out_dated_batches()` after flushing (used consistently elsewhere at lines 104-105, 174-175, 186, 196)

Without these, memory will accumulate unbounded during cartesian product generation when there are many duplicate keys, defeating the purpose of this PR.
Should be:

```rust
if self.should_flush() || cur1.num_buffered_batches() > 1 || cur2.num_buffered_batches() > 1 {
    self.as_mut().flush(cur1, cur2).await?;
    cur1.clean_out_dated_batches();
    cur2.clean_out_dated_batches();
}
```

Suggested change:

```diff
-if self.should_flush() {
-    self.as_mut().flush(cur1, cur2).await?;
+if self.should_flush()
+    || cur1.num_buffered_batches() > 1
+    || cur2.num_buffered_batches() > 1
+{
+    self.as_mut().flush(cur1, cur2).await?;
+    cur1.clean_out_dated_batches();
+    cur2.clean_out_dated_batches();
```
We should not call `cur1.clean_out_dated_batches()`, because doing so will break the index correlation between `cur1` and `equal_lindices`.
The change seems too much for me. I was thinking just splitting the loop here into small batches would be enough?

auron/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs Lines 163 to 166 in 4f015cf
done
@XorSum I think a test that could reproduce your issue should be added to confirm it is fixed with the new code.
```diff
-for (&lidx, &ridx) in equal_lindices.iter().cartesian_product(&equal_rindices) {
-    self.lindices.push(lidx);
-    self.rindices.push(ridx);
+for &lidx in &equal_lindices {
```
How about:

```rust
// Choose a better name.
fn has_enough_room(&self, new_size: usize) -> bool {
    self.lindices.len() + new_size < self.join_params.batch_size
}
```

```rust
let new_size = equal_lindices.len() * equal_rindices.len();
if self.has_enough_room(new_size) {
    // old cartesian_product way
} else {
    // do more aggressive flush
}
```
And don't forget to clear the outdated batches.
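The pre-check above reduces to a simple size predicate over the pending output. A minimal standalone model (the `JoinOutput` struct and its fields are stand-ins chosen for illustration, not the PR's actual types):

```rust
// Hypothetical model of the suggested pre-check: if the full cartesian
// product of the duplicate runs still fits in the current output batch,
// take the simple one-shot path; otherwise take a path that flushes
// aggressively while generating.
struct JoinOutput {
    lindices: Vec<u32>,
    batch_size: usize,
}

impl JoinOutput {
    fn has_enough_room(&self, new_size: usize) -> bool {
        self.lindices.len() + new_size < self.batch_size
    }
}
```

For example, with 3 indices already buffered and a batch size of 10, a product of 6 new pairs fits (3 + 6 = 9 < 10) while 7 does not.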
I have added this logic and a unit test. But we should not clear the outdated batches here.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs:201
- Inconsistent flushing strategy: The aggressive flushing logic added for the cartesian product (lines 177-186) is not applied here in the "stream right side" path. When `equal_lindices` is large, this loop can add many elements before checking `should_flush()` at line 197, potentially creating batches much larger than `batch_size`.

Consider adding a flush check inside the inner loop (after line 194), similar to the approach used in lines 177-186, to prevent extreme batch sizes when streaming with duplicated keys.
```rust
if r_equal {
    // stream right side
    while !cur2.finished() && cur2.cur_key() == cur1.key(l_key_idx) {
        for &lidx in &equal_lindices {
            self.lindices.push(lidx);
            self.rindices.push(cur2.cur_idx());
        }
        cur_forward!(cur2);
        if self.should_flush() || cur2.num_buffered_batches() > 1 {
            self.as_mut().flush(cur1, cur2).await?;
            cur2.clean_out_dated_batches();
        }
    }
```
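A simplified standalone model of that streaming path (hypothetical names; the real code advances a cursor over buffered batches, modeled here as a slice of right-row indices):

```rust
// Hypothetical simulation of the "stream right side" path with a flush
// check after each streamed row: the output can overshoot batch_size by
// at most one row's worth of pairs (equal_lindices.len() - 1) instead of
// growing without bound.
fn stream_right_side(
    equal_lindices: &[u32],
    right_rows: &[u32],
    batch_size: usize,
    mut flush: impl FnMut(&[(u32, u32)]),
) {
    let mut out: Vec<(u32, u32)> = Vec::new();
    for &ridx in right_rows {
        for &lidx in equal_lindices {
            out.push((lidx, ridx));
        }
        // Check once per streamed row, mirroring the should_flush() check.
        if out.len() >= batch_size {
            flush(&out);
            out.clear();
        }
    }
    if !out.is_empty() {
        flush(&out);
    }
}
```

With 3 left duplicates, 4 streamed right rows, and a batch size of 5, the 12 pairs are flushed as two batches of 6: slightly above the target size, but bounded, unlike the unchecked loop.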
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Which issue does this PR close?
Closes #1693.
Rationale for this change
As discussed previously in #1693 and #1694, the join operation should check the batch size and trigger flushing in a timely manner, to prevent extremely large batches.
What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?