diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 1077f41f6a..303d5ecda9 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1903,6 +1903,15 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
                 elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
                     return values.cast(target_type)
             raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
+        elif isinstance(field.field_type, (IntegerType, LongType)):
+            # Cast smaller integer types to target type for cross-platform compatibility
+            # Only allow widening conversions (smaller bit width to larger)
+            # Narrowing conversions fall through to promote() handling below
+            if pa.types.is_integer(values.type):
+                source_width = values.type.bit_width
+                target_width = target_type.bit_width
+                if source_width < target_width:
+                    return values.cast(target_type)
 
         if field.field_type != file_field.field_type:
             target_schema = schema_to_pyarrow(
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 869e60f4aa..4d4f2a377c 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2716,6 +2716,38 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception(
     assert "Unsupported schema projection from timestamp[ns] to timestamp[us]" in str(exc_info.value)
 
 
+@pytest.mark.parametrize(
+    "arrow_type,iceberg_type,expected_arrow_type",
+    [
+        (pa.uint8(), IntegerType(), pa.int32()),
+        (pa.int8(), IntegerType(), pa.int32()),
+        (pa.int16(), IntegerType(), pa.int32()),
+        (pa.uint16(), IntegerType(), pa.int32()),
+        (pa.uint32(), LongType(), pa.int64()),
+        (pa.int32(), LongType(), pa.int64()),
+    ],
+)
+def test__to_requested_schema_integer_promotion(
+    arrow_type: pa.DataType,
+    iceberg_type: PrimitiveType,
+    expected_arrow_type: pa.DataType,
+) -> None:
+    """Test that smaller integer types are cast to target Iceberg type during write."""
+    requested_schema = Schema(NestedField(1, "col", iceberg_type, required=False))
+    file_schema = requested_schema
+
+    arrow_schema = pa.schema([pa.field("col", arrow_type)])
+    data = pa.array([1, 2, 3, None], type=arrow_type)
+    batch = pa.RecordBatch.from_arrays([data], schema=arrow_schema)
+
+    result = _to_requested_schema(
+        requested_schema, file_schema, batch, downcast_ns_timestamp_to_us=False, include_field_ids=False
+    )
+
+    assert result.schema[0].type == expected_arrow_type
+    assert result.column(0).to_pylist() == [1, 2, 3, None]
+
+
 def test_pyarrow_file_io_fs_by_scheme_cache() -> None:
     # It's better to set up multi-region minio servers for an integration test once `endpoint_url` argument becomes available for `resolve_s3_region`
     # Refer to: https://github.com/apache/arrow/issues/43713
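
For context, here is a minimal standalone sketch (not part of the patch) of the widening rule the new branch applies, using only public pyarrow APIs; the helper name _widen_if_needed is purely illustrative:

    import pyarrow as pa

    def _widen_if_needed(values: pa.Array, target_type: pa.DataType) -> pa.Array:
        # Illustrative helper: cast only when both source and target are integer
        # types and the source has a strictly smaller bit width (a widening cast).
        if pa.types.is_integer(values.type) and pa.types.is_integer(target_type):
            if values.type.bit_width < target_type.bit_width:
                return values.cast(target_type)
        return values

    # uint16 -> int32 is widened; int64 -> int32 is left untouched and, in the
    # real code path, would fall through to the existing promote() handling.
    print(_widen_if_needed(pa.array([1, 2, None], type=pa.uint16()), pa.int32()).type)  # int32
    print(_widen_if_needed(pa.array([1, 2, None], type=pa.int64()), pa.int32()).type)   # int64

This mirrors the expectations in the parametrized test above: unsigned and signed sources up to 16 bits map to int32 for IntegerType, and sources up to 32 bits map to int64 for LongType, with null values preserved by the cast.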