Fix Iceberg data corruption when meets null in the condition in merge process #14031
Signed-off-by: Chong Gao <res_life@163.com>
liurenjie1024
left a comment
Thanks @res-life for this fix!
Greptile Overview
Greptile Summary
Overview
This PR fixes a critical data corruption bug in Iceberg MERGE operations when NULL values appear in merge conditions. The issue occurs when cond.and(mask) produces NULL values, which then propagate through the logic and cause rows to be incorrectly skipped or lost.
Changes
GpuMergeRowsExec.scala
- Added Scalar import: Required for creating boolean scalars
- Fixed NULL handling in instruction condition processing:
  - When evaluating merge instruction conditions, if the condition evaluates to NULL, the result of `cond.and(mask)` can be NULL
  - The fix explicitly replaces these NULLs with FALSE before computing the next mask
  - This ensures NULL conditions are correctly treated as "condition not satisfied" (FALSE)
  - The nextMask calculation then correctly includes these rows for processing by subsequent instructions
integration_tests/src/main/python/iceberg/__init__.py
- Enabled NULL values in test data: Changed `_c2` and `_c3` columns from non-nullable to nullable
  - This allows iceberg tests to cover NULL values in conditions, which is essential for verifying the NULL handling fix
  - Uses default `int_gen` and `long_gen` (with `nullable=True`) instead of explicitly non-nullable versions
Technical Details
The Bug
In the original code:
- When a merge condition is NULL: `cond.and(mask)` → NULL
- Computing NOT(NULL) → NULL
- So `nextMask = NULL AND mask` → NULL
- Rows with NULL conditions were lost from processing
The Fix
- Replace NULLs in `condMask` with FALSE explicitly
- This makes NULL conditions semantically equivalent to FALSE (both mean "condition not satisfied")
- The subsequent NOT operation correctly produces TRUE for these rows
- They're then included in nextMask for processing by the next instruction
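The mask arithmetic above can be sketched in plain Python. This is only a model, not the cuDF or plugin API: `None` stands in for NULL, and the helpers `and_`, `not_`, `replace_nulls`, `step_buggy` and `step_fixed` are hypothetical names that assume AND and NOT propagate NULL as the summary describes.

```python
from typing import List, Optional, Tuple

Tri = Optional[bool]  # None models a SQL NULL entry in a boolean column


def and_(a: List[Tri], b: List[Tri]) -> List[Tri]:
    # Assumed null-propagating AND: NULL if either input is NULL.
    return [None if x is None or y is None else (x and y) for x, y in zip(a, b)]


def not_(a: List[Tri]) -> List[Tri]:
    # NOT(NULL) stays NULL.
    return [None if x is None else (not x) for x in a]


def replace_nulls(a: List[Tri], value: bool) -> List[Tri]:
    return [value if x is None else x for x in a]


def step_buggy(cond: List[Tri], mask: List[Tri]) -> Tuple[List[Tri], List[Tri]]:
    cond_mask = and_(cond, mask)
    next_mask = and_(mask, not_(cond_mask))  # NULL leaks into next_mask
    return cond_mask, next_mask


def step_fixed(cond: List[Tri], mask: List[Tri]) -> Tuple[List[Tri], List[Tri]]:
    cond_mask = replace_nulls(and_(cond, mask), False)  # NULL -> FALSE first
    next_mask = and_(mask, not_(cond_mask))
    return cond_mask, next_mask


cond = [True, False, None]
mask = [True, True, True]
buggy = step_buggy(cond, mask)
fixed = step_fixed(cond, mask)
print(buggy)  # third row is NULL in both masks, so neither instruction sees it
print(fixed)  # third row is FALSE in cond_mask and TRUE in next_mask
```

In the buggy variant the third row is selected by neither mask, which matches the "lost from processing" behaviour described above; in the fixed variant it is carried forward in the next mask.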
Quality Assessment
- Fix is logically correct and handles all three-valued logic cases properly
- Resource management is correct (proper use of withResource, closeOnExcept patterns)
- Code follows existing patterns in the codebase
- Comments clearly explain the NULL handling logic
Confidence Score: 5/5
- This PR is safe to merge with high confidence - it fixes a critical NULL handling bug without introducing new risks.
- Score of 5 reflects: (1) The fix correctly addresses a critical data corruption issue by properly handling NULL values in merge conditions; (2) All three SQL three-valued logic cases (TRUE, FALSE, NULL) are correctly handled; (3) Resource management follows established patterns in the codebase with proper withResource and closeOnExcept usage; (4) The import statement addition is correct and minimal; (5) Test coverage is improved by enabling NULL values in test data; (6) No issues found during comprehensive code review including edge cases and resource cleanup.
- No files require special attention - all changes are correct and well-implemented.
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/GpuMergeRowsExec.scala | 5/5 | Fixed critical NULL handling bug in merge instruction processing. Added proper NULL-to-FALSE replacement when evaluating conditions, ensuring NULL conditions are correctly treated as unsatisfied. Import statement updated to include Scalar. Resource management is correct with proper withResource and closeOnExcept patterns. |
| integration_tests/src/main/python/iceberg/__init__.py | 5/5 | Updated iceberg test data generators to allow NULL values in _c2 (int) and _c3 (long) columns by using default int_gen and long_gen (nullable=True) instead of explicitly non-nullable versions. This improves test coverage for the NULL handling fix in merge operations. |
Sequence Diagram
sequenceDiagram
participant Instruction as Merge<br/>Instruction
participant Condition as Condition<br/>Eval
participant Logic as AND/NOT<br/>Operations
participant Filter as Row<br/>Filter
rect rgb(200, 220, 255)
Note over Condition,Logic: FIX: Handle NULL in Conditions
end
Instruction->>Condition: evaluateCondition(batch)
Condition-->>Instruction: cond (may contain NULL)
rect rgb(200, 255, 200)
Note over Logic: NEW: Replace NULLs with FALSE
end
Instruction->>Logic: cond.getBase.and(mask)
Logic-->>Instruction: condMask (with NULL values)
Instruction->>Logic: condMask.replaceNulls(FALSE)
Logic-->>Instruction: condMask (NULLs → FALSE)
Instruction->>Logic: condMask.not()
Logic-->>Instruction: notCondMask
Instruction->>Logic: mask.and(notCondMask)
Logic-->>Instruction: nextMask
rect rgb(255, 220, 200)
Note over Filter: Use condMask for current instruction<br/>Use nextMask for next instruction
end
Instruction->>Filter: GpuColumnVector.filter(batch, condMask)
Filter-->>Instruction: filtered rows
Note over Instruction: Pass nextMask to<br/>next instruction
Now that the null handling fix is in place, remove:
- iceberg_table_gen_for_fix and related variables from __init__.py
- xfail marker from test_iceberg_merge_additional_patterns_bug
- _for_fix variable usage in test

Fixes NVIDIA#14030

Signed-off-by: Chong Gao <res_life@163.com>
Greptile Overview
Greptile Summary
This PR fixes a critical data corruption bug in Iceberg MERGE operations when NULL values are present in conditional expressions.
Key Changes:
- Scala fix: Properly handles NULL values in MERGE conditions by replacing them with FALSE using `replaceNulls()`. This ensures NULL conditions are treated as "not satisfied" per SQL semantics.
- Test enablement: Removes the workaround schema and xfail marker, enabling nullable columns (`int_gen`, `long_gen`) to properly test the null handling code path.
- Code cleanup: Consolidates duplicate schema definitions and removes obsolete imports.
Technical Details:
The bug occurred when evaluating conditional clauses like WHEN MATCHED AND t._c2 < 0. If _c2 contained NULL, the condition would evaluate to NULL, causing cond.and(mask) to produce NULL values in the mask. These NULLs propagated through the logic, leading to incorrect row processing and data corruption.
The fix replaces NULLs with FALSE in condMask before computing nextMask, ensuring rows with NULL conditions correctly proceed to subsequent clauses rather than being incorrectly processed by the current clause.
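To make the clause example concrete, here is a small pure-Python sketch of how rows would be routed across the two clauses `t._c2 < 0` and `t._c2 >= 0` with the fixed mask logic. It is an illustration under assumptions, not the plugin code: `assign_clauses`, `lt_zero` and `ge_zero` are hypothetical helpers, `None` models NULL, and what happens to rows still left in the mask after the last clause is outside the scope of this sketch.

```python
from typing import Callable, List, Optional, Tuple

Cond = Callable[[Optional[int]], Optional[bool]]


def lt_zero(v: Optional[int]) -> Optional[bool]:
    # Comparison with NULL yields NULL.
    return None if v is None else v < 0


def ge_zero(v: Optional[int]) -> Optional[bool]:
    return None if v is None else v >= 0


def assign_clauses(
    column: List[Optional[int]], conditions: List[Cond]
) -> Tuple[List[Optional[int]], List[bool]]:
    """Return the index of the clause that takes each row, plus the leftover mask."""
    mask = [True] * len(column)
    assigned: List[Optional[int]] = [None] * len(column)
    for idx, condition in enumerate(conditions):
        cond = [condition(v) for v in column]
        # cond AND mask, with a NULL result replaced by FALSE.
        cond_mask = [(c is True) and m for c, m in zip(cond, mask)]
        for row, selected in enumerate(cond_mask):
            if selected:
                assigned[row] = idx
        # Rows not taken by this clause stay available for the next one.
        mask = [m and not cm for m, cm in zip(mask, cond_mask)]
    return assigned, mask


c2 = [-5, 3, None, 0]
assigned, remaining = assign_clauses(c2, [lt_zero, ge_zero])
print(assigned)   # clause index per row; None means no clause matched
print(remaining)  # True only for the row whose _c2 is NULL
```

The row with a NULL `_c2` satisfies neither clause, yet it is still present in the leftover mask rather than silently dropped, which is the property the fix restores.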
Confidence Score: 5/5
- This PR is safe to merge with high confidence
- The fix correctly addresses a critical data corruption bug with proper null handling. The code follows established patterns in the codebase (similar `replaceNulls` usage found in Delta Lake code), includes comprehensive test coverage with nullable columns, and has proper resource management. The logic has been thoroughly verified through trace analysis.
- No files require special attention
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| integration_tests/src/main/python/iceberg/__init__.py | 5/5 | Removes workaround schema, consolidates to use nullable int_gen and long_gen to properly test null handling |
| integration_tests/src/main/python/iceberg/iceberg_merge_test.py | 5/5 | Removes xfail marker and enables null-handling test, cleans up duplicate imports |
| sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/GpuMergeRowsExec.scala | 5/5 | Correctly fixes null handling in MERGE conditions by replacing NULLs with FALSE, preventing data corruption |
Sequence Diagram
sequenceDiagram
participant Test as Test (iceberg_merge_test.py)
participant Schema as Schema (__init__.py)
participant GPU as GpuMergeRowsExec
participant Condition as Condition Evaluator
participant Filter as Row Filter
Note over Test,Schema: Setup Phase
Test->>Schema: Request table schema
Schema-->>Test: Return nullable int_gen/long_gen
Note over Schema: Now includes NULL values<br/>in _c2 and _c3 columns
Note over Test,GPU: MERGE Execution
Test->>GPU: Execute MERGE with conditions
Note over Test: WHEN MATCHED AND t._c2 < 0<br/>WHEN MATCHED AND t._c2 >= 0
GPU->>Condition: Evaluate condition (t._c2 < 0)
Note over Condition: Returns: [TRUE, FALSE, NULL]<br/>for different rows
Condition->>GPU: condition.and(mask)
Note over GPU: Result: [TRUE, FALSE, NULL]
GPU->>GPU: replaceNulls(FALSE)
Note over GPU: Fixed Result: [TRUE, FALSE, FALSE]<br/>NULL treated as "not satisfied"
GPU->>GPU: condMask.not()
Note over GPU: [FALSE, TRUE, TRUE]
GPU->>GPU: mask.and(notCondMask)
Note over GPU: nextMask for next instruction
GPU->>Filter: Apply condMask filter
Filter->>GPU: Filtered rows for current action
GPU->>Condition: Process next instruction with nextMask
Note over GPU: Rows where condition was<br/>FALSE or NULL proceed here
GPU-->>Test: MERGE completed correctly
Note over Test: No data corruption!<br/>NULL conditions handled properly
Signed-off-by: Chong Gao <res_life@163.com>
Greptile Overview
Greptile Summary
Summary
This PR fixes a critical data corruption bug in Iceberg MERGE operations when NULL values appear in the merge condition. The fix properly handles NULL semantics in the GPU merge processing pipeline.
Changes
GpuMergeRowsExec.scala
- Root Cause: When evaluating merge conditions that contain NULL values, the bitwise AND operation (`cond.and(mask)`) preserves NULLs in the result. This caused rows with NULL conditions to disappear from output entirely, resulting in data loss.
- Fix: Replaces NULL values in the condition mask with FALSE (correct semantics since NULL condition means "condition not satisfied"). The implementation:
  - Computes `cond.and(mask)` to get initial mask (may contain NULLs)
  - Replaces any NULLs with FALSE using `replaceNulls(Scalar.fromBool(false))`
  - Uses the cleaned mask to compute `nextMask = NOT(condMask).and(mask)`
  - This ensures rows with NULL conditions are passed to the next instruction instead of disappearing
- Resource Management: Properly manages GPU memory with nested `withResource` blocks ensuring temporary objects are cleaned up
- Import: Added `Scalar` to the import list
Integration Tests
- iceberg/__init__.py: Consolidated data generators by removing the `_for_fix` workaround versions. The main `iceberg_table_gen` now includes nullable int/long columns to properly test NULL handling.
- iceberg_merge_test.py:
  - Cleaned up imports (removed duplicates and references to deleted `_for_fix` generators)
  - Removed `@pytest.mark.xfail` decorator since the bug is now fixed
  - Updated test parameters to use consolidated data generators
  - Updated copyright years to 2025-2026
Logic Verification
The fix correctly handles all cases:
- TRUE conditions: Rows pass to current instruction (unchanged behavior)
- FALSE conditions: Rows pass to next instruction (unchanged behavior)
- NULL conditions: Now treated as FALSE (fixed behavior - previously caused data loss)
This ensures all rows are processed by at least one instruction with proper data preservation.
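The three cases can also be checked exhaustively as a truth table. The sketch below is a scalar model with hypothetical helper names (`and_null`, `not_null`, `fixed_step`), again using `None` for NULL and assuming null-propagating AND and NOT. It verifies that every row still active in the mask ends up in exactly one of the current instruction's mask or the next mask.

```python
from itertools import product
from typing import Optional, Tuple

Tri = Optional[bool]


def and_null(a: Tri, b: Tri) -> Tri:
    # Assumed null-propagating AND.
    return None if a is None or b is None else (a and b)


def not_null(a: Tri) -> Tri:
    return None if a is None else (not a)


def fixed_step(cond: Tri, mask: bool) -> Tuple[Tri, Tri]:
    cond_mask = and_null(cond, mask)
    cond_mask = False if cond_mask is None else cond_mask  # NULL -> FALSE
    next_mask = and_null(mask, not_null(cond_mask))
    return cond_mask, next_mask


# Every combination of condition value (TRUE, FALSE, NULL) and mask value.
table = {
    (cond, mask): fixed_step(cond, mask)
    for cond, mask in product((True, False, None), (True, False))
}

# Active rows land in exactly one of the two masks; inactive rows in neither.
active_ok = all(
    sorted(result) == [False, True] for (_, mask), result in table.items() if mask
)
inactive_ok = all(
    result == (False, False) for (_, mask), result in table.items() if not mask
)
print(active_ok, inactive_ok)
```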
Confidence Score: 5/5
- This PR is safe to merge - it fixes a critical data corruption bug with well-implemented logic and proper resource management.
- Score of 5 reflects: (1) Logically correct fix addressing the root cause of data corruption from NULL handling in merge conditions; (2) Proper resource management with nested withResource blocks; (3) Comprehensive test consolidation removing workarounds; (4) Clear comments explaining the fix; (5) No edge cases or resource leaks identified; (6) Correct semantics treating NULL as FALSE in conditions; (7) All changes are necessary and well-integrated.
- No files require special attention. All changes are correct and complete.
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/GpuMergeRowsExec.scala | 5/5 | Fixed critical bug where NULL values in merge conditions caused data corruption. Added proper NULL handling by replacing NULLs with FALSE in condition masks, with correct resource management and clear comments explaining the fix. |
| integration_tests/src/main/python/iceberg/__init__.py | 5/5 | Consolidated data generators by removing the '_for_fix' workaround version. The main iceberg_table_gen now includes nullable int/long columns to properly test NULL handling in merge conditions. |
| integration_tests/src/main/python/iceberg/iceberg_merge_test.py | 5/5 | Cleaned up test imports (removed duplicates and references to deleted '_for_fix' generators), removed pytest.mark.xfail decorator since bug is fixed, and updated test parameters to use consolidated data generators. |
Sequence Diagram
sequenceDiagram
participant Condition as Condition<br/>(may have NULLs)
participant OriginalBug as Original Code<br/>(BUG)
participant FixedCode as Fixed Code<br/>(CORRECTED)
participant Output as Output Rows
rect rgb(255, 200, 200)
Note over Condition,Output: SCENARIO: Condition = NULL
Condition->>OriginalBug: NULL
OriginalBug->>OriginalBug: condMask = NULL AND mask = NULL
OriginalBug->>OriginalBug: nextMask = NOT(NULL) AND mask = NULL
OriginalBug->>Output: ❌ Row disappears (data loss!)
end
rect rgb(200, 255, 200)
Note over Condition,Output: SCENARIO: Condition = NULL (FIXED)
Condition->>FixedCode: NULL
FixedCode->>FixedCode: condMask = (NULL AND mask)<br/>.replaceNulls(FALSE) = FALSE
FixedCode->>FixedCode: nextMask = NOT(FALSE) AND mask = TRUE
FixedCode->>Output: ✅ Row passes to next instruction
end
rect rgb(200, 200, 255)
Note over Condition,Output: SCENARIO: Condition = TRUE
Condition->>FixedCode: TRUE
FixedCode->>FixedCode: condMask = (TRUE AND mask)<br/>.replaceNulls(FALSE) = mask
FixedCode->>FixedCode: nextMask = NOT(mask) AND mask = FALSE
FixedCode->>Output: ✅ Row processed by current instruction
end
build
closes #14030
Fix Iceberg data corruption that occurs when the merge process encounters nulls in the query condition.
Bug analysis
The merge process did not handle null values in the condition.
`cond.and(mask)` may contain NULLs if `cond` has NULLs.
NULLs should be replaced with FALSE, since a NULL condition means "condition not satisfied".
Fix
Handle null values.
For more details, refer to the comment in the code.
Depends on
Signed-off-by: Chong Gao <res_life@163.com>