@XorSum XorSum commented Dec 5, 2025

Which issue does this PR close?

Closes #1693.

Rationale for this change

As discussed previously in #1693 and #1694, the join operation should check the batch size and trigger flushing in a timely manner, to prevent extremely large batch sizes.

What changes are included in this PR?

Are there any user-facing changes?

How was this patch tested?


Copilot AI left a comment


Pull request overview

This PR addresses issue #1693 by refactoring the sort-merge join (SMJ) implementation to check batch size and flush more frequently when processing joins with duplicate keys. The goal is to prevent excessive memory usage by ensuring timely flushing of output batches during cartesian product generation.

Key Changes

  • Simplified the duplicate key handling algorithm by collecting all duplicate indices from both sides before generating the cartesian product
  • Added flush checks inside the cartesian product generation loop (instead of only after collecting all duplicates)
  • Removed the complex streaming logic that separately handled left and right side streaming


Comment on lines 163 to 164
if self.should_flush() {
self.as_mut().flush(cur1, cur2).await?;

Copilot AI Dec 5, 2025


The flush condition is incomplete and missing cleanup calls that are critical for memory management during joins with many duplicate keys:

  1. Missing check for cur1.num_buffered_batches() > 1 || cur2.num_buffered_batches() > 1 (present at lines 99-102)
  2. Missing calls to clean_out_dated_batches() after flushing (used consistently elsewhere at lines 104-105, 174-175, 186, 196)

Without these, memory will accumulate unbounded during cartesian product generation when there are many duplicate keys, defeating the purpose of this PR.

Should be:

if self.should_flush() || cur1.num_buffered_batches() > 1 || cur2.num_buffered_batches() > 1 {
    self.as_mut().flush(cur1, cur2).await?;
    cur1.clean_out_dated_batches();
    cur2.clean_out_dated_batches();
}
@XorSum (author) replied:

We should not call cur1.clean_out_dated_batches(), because doing so would break the index correlation between cur1 and equal_lindices.

@xumingming

This change seems like too much to me. I was thinking just splitting the loop here into small batches would be enough?

for (&lidx, &ridx) in equal_lindices.iter().cartesian_product(&equal_rindices) {
    self.lindices.push(lidx);
    self.rindices.push(ridx);
}
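The splitting suggested above can be sketched in isolation. The following is a hypothetical, self-contained model rather than the PR's actual code: `equal_lindices`/`equal_rindices` stand for the duplicate-key index runs, the returned `Vec<Vec<...>>` stands in for the flushed output batches, and the key point is that the size check runs inside the product loop instead of only after it.

```rust
// Hypothetical sketch: emit the cartesian product of two duplicate-key index
// runs in flush-sized chunks, so no single output batch exceeds `batch_size`.
// Each inner Vec stands in for one flushed output batch of (lidx, ridx) pairs.
fn joined_in_chunks(
    equal_lindices: &[usize],
    equal_rindices: &[usize],
    batch_size: usize,
) -> Vec<Vec<(usize, usize)>> {
    let mut batches = Vec::new();
    let mut buf: Vec<(usize, usize)> = Vec::new();
    for &lidx in equal_lindices {
        for &ridx in equal_rindices {
            buf.push((lidx, ridx));
            // Check the batch size inside the product loop, not after it,
            // so a large duplicate-key run cannot accumulate unbounded.
            if buf.len() >= batch_size {
                batches.push(std::mem::take(&mut buf)); // stands in for flush()
            }
        }
    }
    if !buf.is_empty() {
        batches.push(buf); // final partial batch
    }
    batches
}

fn main() {
    // 3 x 4 = 12 pairs with batch_size 5 -> batches of sizes 5, 5, 2.
    let batches = joined_in_chunks(&[0, 1, 2], &[0, 1, 2, 3], 5);
    assert_eq!(batches.len(), 3);
    assert!(batches.iter().all(|b| b.len() <= 5));
    assert_eq!(batches.iter().map(|b| b.len()).sum::<usize>(), 12);
}
```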


XorSum commented Dec 6, 2025

The change seems too much for me. I was thinking just split the loop here into small batches is enough?

done

@xumingming

@XorSum I think a test that can reproduce your issue should be added, to confirm it is fixed by the new code.

- for (&lidx, &ridx) in equal_lindices.iter().cartesian_product(&equal_rindices) {
-     self.lindices.push(lidx);
-     self.rindices.push(ridx);
+ for &lidx in &equal_lindices {
@xumingming xumingming Dec 6, 2025


How about:

// Choose a better name.
fn has_enough_room(&self, new_size: usize) -> bool {
    self.lindices.len() + new_size < self.join_params.batch_size
}

let new_size = equal_lindices.len() * equal_rindices.len();
if self.has_enough_room(new_size) {
    // old cartesian_product way
} else {
    // do a more aggressive flush
}

And don't forget to clear the outdated batches.
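The capacity check proposed above can be modeled with free functions. This is an illustrative sketch under assumed names (`pending_len`, `use_fast_path` are not the PR's identifiers): take the plain cartesian-product path only when the pending output plus the new pairs still fits in one batch, and otherwise fall back to the aggressive-flush path.

```rust
// Hypothetical model of the has_enough_room() check: the new pairs produced by
// a duplicate-key run are the full cartesian product of the two runs, and the
// fast path is taken only when they fit alongside the already-buffered output.
fn has_enough_room(pending_len: usize, new_size: usize, batch_size: usize) -> bool {
    pending_len + new_size < batch_size
}

fn use_fast_path(
    pending_len: usize,
    n_equal_left: usize,
    n_equal_right: usize,
    batch_size: usize,
) -> bool {
    // new_size is the full cartesian product of the duplicate-key runs
    let new_size = n_equal_left * n_equal_right;
    has_enough_room(pending_len, new_size, batch_size)
}

fn main() {
    // 2 x 3 = 6 new pairs on top of 10 pending fits in a batch of 100.
    assert!(use_fast_path(10, 2, 3, 100));
    // 50 x 50 = 2500 new pairs does not; take the aggressive-flush path.
    assert!(!use_fast_path(10, 50, 50, 100));
}
```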

@XorSum (author) replied:

I have added this logic and a unit test. But we should not clear the outdated batches here.

Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs:201

  • Inconsistent flushing strategy: The aggressive flushing logic added for the cartesian product (lines 177-186) is not applied here in the "stream right side" path. When equal_lindices is large, this loop can add many elements before checking should_flush() at line 197, potentially creating batches much larger than batch_size.

Consider adding a flush check inside the inner loop (after line 194), similar to the approach used in lines 177-186, to prevent extreme batch sizes when streaming with duplicated keys.

if r_equal {
    // stream right side
    while !cur2.finished() && cur2.cur_key() == cur1.key(l_key_idx) {
        for &lidx in &equal_lindices {
            self.lindices.push(lidx);
            self.rindices.push(cur2.cur_idx());
        }
        cur_forward!(cur2);
        if self.should_flush() || cur2.num_buffered_batches() > 1 {
            self.as_mut().flush(cur1, cur2).await?;
            cur2.clean_out_dated_batches();
        }
    }
}
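The effect of the per-row flush check in the streaming loop above can be sketched with a simplified model. This is not the SMJ implementation itself: `stream_right` uses plain integer counters in place of the cursors and index buffers, and only demonstrates why checking after every right row keeps the pending output near `batch_size`.

```rust
// Hypothetical model of the "stream right side" path: for each right row with
// the matching key, one pair per buffered left index is appended, and a flush
// check runs after every right row. Returns (flush_count, max_pending_pairs).
fn stream_right(
    equal_lindices: &[usize],
    right_rows: &[usize],
    batch_size: usize,
) -> (usize, usize) {
    let mut pending = 0usize; // pairs currently in lindices/rindices
    let mut flushes = 0usize;
    let mut max_pending = 0usize;
    for _ridx in right_rows {
        pending += equal_lindices.len(); // one pair per buffered left index
        max_pending = max_pending.max(pending);
        if pending >= batch_size {
            // stands in for flush(cur1, cur2) + clean_out_dated_batches()
            pending = 0;
            flushes += 1;
        }
    }
    (flushes, max_pending)
}

fn main() {
    // 4 left duplicates x 100 right rows, batch_size 10: the pending buffer
    // never overshoots the limit by more than one right-row step (4 pairs).
    let rows: Vec<usize> = (0..100).collect();
    let (flushes, max_pending) = stream_right(&[0, 1, 2, 3], &rows, 10);
    assert!(max_pending < 10 + 4);
    assert!(flushes > 0);
}
```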


XorSum and others added 2 commits December 8, 2025 11:20
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@cxzl25 cxzl25 requested a review from richox December 10, 2025 04:00

Successfully merging this pull request may close these issues.

Arrow Vector OversizedAllocationException when processing large batches
