From 9f8b7b7a44c0b0e9625c53c37948e5f368938887 Mon Sep 17 00:00:00 2001
From: Somasundaram Sekar
Date: Mon, 8 Dec 2025 18:32:09 +0100
Subject: [PATCH] fix: Cast smaller integer types to int32/int64 on write for
 Spark compatibility

When writing PyArrow tables with smaller integer types (uint8, int8, int16,
uint16) to Iceberg tables with IntegerType columns, PyIceberg preserves the
original Arrow type in the Parquet file. This causes Spark to fail with:

    java.lang.UnsupportedOperationException: Unsupported logical type: UINT_8

The fix casts smaller integer types to their canonical Iceberg representation
(int32 for IntegerType, int64 for LongType) during write, ensuring
cross-platform compatibility. Only widening conversions are allowed;
narrowing conversions (e.g. int64 to int32) continue to be rejected via the
existing promote() function.

Closes #2791
---
 pyiceberg/io/pyarrow.py  |  9 +++++++++
 tests/io/test_pyarrow.py | 32 ++++++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 1077f41f6a..303d5ecda9 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1903,6 +1903,15 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
             elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
                 return values.cast(target_type)
             raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
+        elif isinstance(field.field_type, (IntegerType, LongType)):
+            # Cast smaller integer types to the target type for cross-platform compatibility.
+            # Only allow widening conversions (smaller bit width to larger).
+            # Narrowing conversions fall through to the promote() handling below.
+            if pa.types.is_integer(values.type):
+                source_width = values.type.bit_width
+                target_width = target_type.bit_width
+                if source_width < target_width:
+                    return values.cast(target_type)
 
         if field.field_type != file_field.field_type:
             target_schema = schema_to_pyarrow(
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 869e60f4aa..4d4f2a377c 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2716,6 +2716,38 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception(
     assert "Unsupported schema projection from timestamp[ns] to timestamp[us]" in str(exc_info.value)
 
 
+@pytest.mark.parametrize(
+    "arrow_type,iceberg_type,expected_arrow_type",
+    [
+        (pa.uint8(), IntegerType(), pa.int32()),
+        (pa.int8(), IntegerType(), pa.int32()),
+        (pa.int16(), IntegerType(), pa.int32()),
+        (pa.uint16(), IntegerType(), pa.int32()),
+        (pa.uint32(), LongType(), pa.int64()),
+        (pa.int32(), LongType(), pa.int64()),
+    ],
+)
+def test__to_requested_schema_integer_promotion(
+    arrow_type: pa.DataType,
+    iceberg_type: PrimitiveType,
+    expected_arrow_type: pa.DataType,
+) -> None:
+    """Test that smaller integer types are cast to the target Iceberg type during write."""
+    requested_schema = Schema(NestedField(1, "col", iceberg_type, required=False))
+    file_schema = requested_schema
+
+    arrow_schema = pa.schema([pa.field("col", arrow_type)])
+    data = pa.array([1, 2, 3, None], type=arrow_type)
+    batch = pa.RecordBatch.from_arrays([data], schema=arrow_schema)
+
+    result = _to_requested_schema(
+        requested_schema, file_schema, batch, downcast_ns_timestamp_to_us=False, include_field_ids=False
+    )
+
+    assert result.schema[0].type == expected_arrow_type
+    assert result.column(0).to_pylist() == [1, 2, 3, None]
+
+
 def test_pyarrow_file_io_fs_by_scheme_cache() -> None:
     # It's better to set up multi-region minio servers for an integration test once `endpoint_url` argument becomes available for `resolve_s3_region`
     # Refer to: https://github.com/apache/arrow/issues/43713
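
A minimal standalone sketch of the widening behaviour described above, using
plain PyArrow only; the column values below are hypothetical and not taken
from the diff:

    import pyarrow as pa

    # A uint8 column of the kind that previously leaked into the Parquet file.
    small = pa.array([1, 2, 3, None], type=pa.uint8())

    # Widening cast to int32, the canonical Arrow representation of Iceberg's
    # IntegerType; values and nulls are preserved.
    widened = small.cast(pa.int32())
    assert widened.type == pa.int32()
    assert widened.to_pylist() == [1, 2, 3, None]

    # A narrowing cast such as int64 -> int32 is not attempted by this code
    # path; it remains subject to the existing promote() validation.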