diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py
index 917d387f45..8acf21f974 100644
--- a/pyiceberg/table/puffin.py
+++ b/pyiceberg/table/puffin.py
@@ -14,7 +14,10 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import io
 import math
+import zlib
+from collections.abc import Iterable
 from typing import TYPE_CHECKING, Literal
 
 from pydantic import Field
@@ -27,6 +30,7 @@
 
 # Short for: Puffin Fratercula arctica, version 1
 MAGIC_BYTES = b"PFA1"
+DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"
 EMPTY_BITMAP = FrozenBitMap()
 MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
 PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
@@ -62,6 +66,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
     return bitmaps
 
 
+def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
+    """
+    Serialize a dictionary of bitmaps into a byte array.
+
+    The format is:
+    - 8 bytes: number of bitmaps (little-endian)
+    - For each bitmap:
+      - 4 bytes: key (little-endian)
+      - n bytes: serialized bitmap
+    """
+    with io.BytesIO() as out:
+        sorted_keys = sorted(bitmaps.keys())
+
+        # number of bitmaps
+        out.write(len(sorted_keys).to_bytes(8, "little"))
+
+        for key in sorted_keys:
+            if key < 0:
+                raise ValueError(f"Invalid unsigned key: {key}")
+            if key > MAX_JAVA_SIGNED:
+                raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")
+
+            # key
+            out.write(key.to_bytes(4, "little"))
+            # bitmap
+            out.write(bitmaps[key].serialize())
+        return out.getvalue()
+
+
 class PuffinBlobMetadata(IcebergBaseModel):
     type: Literal["deletion-vector-v1"] = Field()
     fields: list[int] = Field()
@@ -114,3 +147,105 @@ def __init__(self, puffin: bytes) -> None:
 
     def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
         return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}
+
+
+class PuffinWriter:
+    _blobs: list[PuffinBlobMetadata]
+    _blob_payloads: list[bytes]
+    _created_by: str | None
+
+    def __init__(self, created_by: str | None = None) -> None:
+        self._blobs = []
+        self._blob_payloads = []
+        self._created_by = created_by
+
+    def set_blob(
+        self,
+        positions: Iterable[int],
+        referenced_data_file: str,
+    ) -> None:
+        # We only support one blob at the moment
+        self._blobs = []
+        self._blob_payloads = []
+
+        # 1. Create bitmaps from positions
+        bitmaps: dict[int, BitMap] = {}
+        for pos in positions:
+            key = pos >> 32
+            low_bits = pos & 0xFFFFFFFF
+            if key not in bitmaps:
+                bitmaps[key] = BitMap()
+            bitmaps[key].add(low_bits)
+
+        # Calculate the cardinality from the bitmaps
+        cardinality = sum(len(bm) for bm in bitmaps.values())
+
+        # 2. Serialize bitmaps for the vector payload
+        vector_payload = _serialize_bitmaps(bitmaps)
+
+        # 3. Construct the full blob payload for deletion-vector-v1
+        with io.BytesIO() as blob_payload_buffer:
+            # Magic bytes for DV
+            blob_payload_buffer.write(DELETION_VECTOR_MAGIC)
+            # The vector itself
+            blob_payload_buffer.write(vector_payload)
+
+            # The content for CRC calculation
+            crc_content = blob_payload_buffer.getvalue()
+            crc32 = zlib.crc32(crc_content)
+
+        # The full blob to be stored in the Puffin file
+        with io.BytesIO() as full_blob_buffer:
+            # Combined length of the vector and magic bytes stored as 4 bytes, big-endian
+            full_blob_buffer.write(len(crc_content).to_bytes(4, "big"))
+            # The content (magic + vector)
+            full_blob_buffer.write(crc_content)
+            # A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
+            full_blob_buffer.write(crc32.to_bytes(4, "big"))
+
+            self._blob_payloads.append(full_blob_buffer.getvalue())
+
+        # 4. Create blob metadata
+        properties = {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)}
+
+        self._blobs.append(
+            PuffinBlobMetadata(
+                type="deletion-vector-v1",
+                fields=[2147483645],  # Java INT_MAX - 2, reserved field id for deletion vectors
+                snapshot_id=-1,
+                sequence_number=-1,
+                offset=0,  # TODO: Use DeleteFileIndex data
+                length=0,  # TODO: Use DeleteFileIndex data
+                properties=properties,
+                compression_codec=None,
+            )
+        )
+
+    def finish(self) -> bytes:
+        with io.BytesIO() as out:
+            payload_buffer = io.BytesIO()
+            for blob_payload in self._blob_payloads:
+                payload_buffer.write(blob_payload)
+
+            updated_blobs_metadata: list[PuffinBlobMetadata] = []
+            current_offset = 4  # Start after file magic (4 bytes)
+            for i, blob_payload in enumerate(self._blob_payloads):
+                original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True)
+                original_metadata_dict["offset"] = current_offset
+                original_metadata_dict["length"] = len(blob_payload)
+                updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict))
+                current_offset += len(blob_payload)
+
+            footer = Footer(blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {})
+            footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")
+
+            # Final assembly
+            out.write(MAGIC_BYTES)
+            out.write(payload_buffer.getvalue())
+            out.write(MAGIC_BYTES)
+            out.write(footer_payload_bytes)
+            out.write(len(footer_payload_bytes).to_bytes(4, "little"))
+            out.write((0).to_bytes(4, "little"))  # flags
+            out.write(MAGIC_BYTES)
+
+            return out.getvalue()
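Side note (illustration only, not part of the patch): the deletion-vector-v1 blob that set_blob assembles is laid out as a 4-byte big-endian length, the DV magic bytes, the serialized 64-bit roaring bitmap, and a trailing 4-byte big-endian CRC-32 over magic + bitmap. The sketch below decodes that layout and checks the CRC; split_dv_blob is a hypothetical helper named here for illustration.

    import struct
    import zlib

    DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"

    def split_dv_blob(blob: bytes) -> bytes:
        """Return the serialized bitmap from a deletion-vector-v1 blob, verifying magic and CRC."""
        (length,) = struct.unpack(">i", blob[:4])     # length of magic + vector (excludes itself and the CRC)
        content = blob[4 : 4 + length]                # DV magic followed by the serialized bitmap
        (crc,) = struct.unpack(">I", blob[4 + length : 8 + length])
        assert content[:4] == DELETION_VECTOR_MAGIC   # magic comes first
        assert zlib.crc32(content) == crc             # checksum covers magic + bitmap only
        return content[4:]                            # the portable 64-bit roaring bitmap bytes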
diff --git a/tests/integration/test_puffin_spark_interop.py b/tests/integration/test_puffin_spark_interop.py
new file mode 100644
index 0000000000..d4c6735fca
--- /dev/null
+++ b/tests/integration/test_puffin_spark_interop.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog.rest import RestCatalog
+from pyiceberg.manifest import ManifestContent
+from pyiceberg.table.puffin import PuffinFile
+
+
+def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None:
+    for sql in sqls:
+        spark.sql(sql)
+
+
+@pytest.mark.integration
+def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    """Verify pyiceberg can read Puffin DVs written by Spark."""
+    identifier = "default.spark_puffin_format_test"
+
+    run_spark_commands(spark, [f"DROP TABLE IF EXISTS {identifier}"])
+    run_spark_commands(
+        spark,
+        [
+            f"""
+            CREATE TABLE {identifier} (id BIGINT)
+            USING iceberg
+            TBLPROPERTIES (
+                'format-version' = '3',
+                'write.delete.mode' = 'merge-on-read'
+            )
+        """,
+        ],
+    )
+
+    df = spark.range(1, 51)
+    df.coalesce(1).writeTo(identifier).append()
+
+    files_before = spark.sql(f"SELECT * FROM {identifier}.files").collect()
+    assert len(files_before) == 1, f"Expected 1 file, got {len(files_before)}"
+
+    run_spark_commands(spark, [f"DELETE FROM {identifier} WHERE id IN (10, 20, 30, 40)"])
+
+    table = session_catalog.load_table(identifier)
+    current_snapshot = table.current_snapshot()
+    assert current_snapshot is not None
+
+    manifests = current_snapshot.manifests(table.io)
+    delete_manifests = [m for m in manifests if m.content == ManifestContent.DELETES]
+    assert len(delete_manifests) > 0, "Expected delete manifest with DVs"
+
+    delete_manifest = delete_manifests[0]
+    entries = list(delete_manifest.fetch_manifest_entry(table.io))
+    assert len(entries) > 0, "Expected at least one delete file entry"
+
+    delete_entry = entries[0]
+    puffin_path = delete_entry.data_file.file_path
+    assert puffin_path.endswith(".puffin"), f"Expected Puffin file, got: {puffin_path}"
+
+    input_file = table.io.new_input(puffin_path)
+    with input_file.open() as f:
+        puffin_bytes = f.read()
+
+    puffin = PuffinFile(puffin_bytes)
+
+    assert len(puffin.footer.blobs) == 1, "Expected exactly one blob"
+
+    blob = puffin.footer.blobs[0]
+    assert blob.type == "deletion-vector-v1"
+    assert "referenced-data-file" in blob.properties
+    assert blob.properties["cardinality"] == "4"
+
+    dv_dict = puffin.to_vector()
+    assert len(dv_dict) == 1, "Expected one data file's deletions"
+
+    for _data_file_path, chunked_array in dv_dict.items():
+        positions = chunked_array.to_pylist()
+        assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}"
+        assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}"
diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py
index bf8c82014c..1ea0913e29 100644
--- a/tests/table/test_puffin.py
+++ b/tests/table/test_puffin.py
@@ -19,7 +19,7 @@
 import pytest
 from pyroaring import BitMap
 
-from pyiceberg.table.puffin import _deserialize_bitmap
+from pyiceberg.table.puffin import PROPERTY_REFERENCED_DATA_FILE, PuffinFile, PuffinWriter, _deserialize_bitmap
 
 
 def _open_file(file: str) -> bytes:
@@ -71,3 +71,66 @@ def test_map_high_vals() -> None:
 
     with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
         _ = _deserialize_bitmap(puffin)
+
+
+def test_puffin_round_trip() -> None:
+    # Define some deletion positions for a file
+    deletions = [5, (1 << 32) + 1, 5]  # Test with a high-bit position and duplicate
+
+    file_path = "path/to/data.parquet"
+
+    # Write the Puffin file
+    writer = PuffinWriter(created_by="my-test-app")
+    writer.set_blob(positions=deletions, referenced_data_file=file_path)
+    puffin_bytes = writer.finish()
+
+    # Read the Puffin file back
+    reader = PuffinFile(puffin_bytes)
+
+    # Assert footer metadata
+    assert reader.footer.properties["created-by"] == "my-test-app"
+    assert len(reader.footer.blobs) == 1
+
+    blob_meta = reader.footer.blobs[0]
+    assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path
+    assert blob_meta.properties["cardinality"] == str(len(set(deletions)))
+
+    # Assert the content of deletion vectors
+    read_vectors = reader.to_vector()
+
+    assert file_path in read_vectors
+    assert read_vectors[file_path].to_pylist() == sorted(set(deletions))
+
+
+def test_write_and_read_puffin_file() -> None:
+    writer = PuffinWriter()
+    writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet")
+    writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet")
+    puffin_bytes = writer.finish()
+
+    reader = PuffinFile(puffin_bytes)
+
+    assert len(reader.footer.blobs) == 1
+    blob = reader.footer.blobs[0]
+
+    assert blob.properties["referenced-data-file"] == "file2.parquet"
+    assert blob.properties["cardinality"] == "3"
+    assert blob.type == "deletion-vector-v1"
+    assert blob.snapshot_id == -1
+    assert blob.sequence_number == -1
+    assert blob.compression_codec is None
+
+    vectors = reader.to_vector()
+    assert len(vectors) == 1
+    assert "file1.parquet" not in vectors
+    assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]
+
+
+def test_puffin_file_with_no_blobs() -> None:
+    writer = PuffinWriter()
+    puffin_bytes = writer.finish()
+
+    reader = PuffinFile(puffin_bytes)
+    assert len(reader.footer.blobs) == 0
+    assert len(reader.to_vector()) == 0
+    assert "created-by" not in reader.footer.properties
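Hypothetical usage of the writer/reader pair added in this patch, assuming the API lands exactly as defined above; the file path and application name are illustrative only.

    from pyiceberg.table.puffin import PuffinFile, PuffinWriter

    writer = PuffinWriter(created_by="example-app")
    # positions are 64-bit row positions; the high 32 bits select the bitmap key, the low 32 bits go into that bitmap
    writer.set_blob(positions=[3, 7, (1 << 32) + 42], referenced_data_file="s3://bucket/data/00000.parquet")
    puffin_bytes = writer.finish()  # complete Puffin file: magic, blob payload, footer

    reader = PuffinFile(puffin_bytes)
    assert reader.footer.blobs[0].properties["cardinality"] == "3"
    assert reader.to_vector()["s3://bucket/data/00000.parquet"].to_pylist() == [3, 7, (1 << 32) + 42]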