From 9a4b91de21a8f02dd7d5fb93854f53f17a7d76e9 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 8 Dec 2025 13:50:24 -0800 Subject: [PATCH 1/3] deletion vector write --- pyiceberg/table/puffin.py | 126 ++++++++++++++++++++++++++++++++++++- tests/table/test_puffin.py | 77 ++++++++++++++++++++++- 2 files changed, 201 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 917d387f45..da0074a954 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -14,8 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import io import math -from typing import TYPE_CHECKING, Literal +import zlib +from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional from pydantic import Field from pyroaring import BitMap, FrozenBitMap @@ -27,6 +29,7 @@ # Short for: Puffin Fratercula arctica, version 1 MAGIC_BYTES = b"PFA1" +DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64" EMPTY_BITMAP = FrozenBitMap() MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1 PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file" @@ -62,6 +65,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: return bitmaps +def _serialize_bitmaps(bitmaps: Dict[int, BitMap]) -> bytes: + """ + Serializes a dictionary of bitmaps into a byte array. + + The format is: + - 8 bytes: number of bitmaps (little-endian) + - For each bitmap: + - 4 bytes: key (little-endian) + - n bytes: serialized bitmap + """ + with io.BytesIO() as out: + sorted_keys = sorted(bitmaps.keys()) + + # number of bitmaps + out.write(len(sorted_keys).to_bytes(8, "little")) + + for key in sorted_keys: + if key < 0: + raise ValueError(f"Invalid unsigned key: {key}") + if key > MAX_JAVA_SIGNED: + raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl") + + # key + out.write(key.to_bytes(4, "little")) + # bitmap + out.write(bitmaps[key].serialize()) + return out.getvalue() + + class PuffinBlobMetadata(IcebergBaseModel): type: Literal["deletion-vector-v1"] = Field() fields: list[int] = Field() @@ -114,3 +146,95 @@ def __init__(self, puffin: bytes) -> None: def to_vector(self) -> dict[str, "pa.ChunkedArray"]: return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()} + + +class PuffinWriter: + _blobs: List[PuffinBlobMetadata] + _blob_payloads: List[bytes] + + def __init__(self) -> None: + self._blobs = [] + self._blob_payloads = [] + + def add( + self, + positions: Iterable[int], + referenced_data_file: str, + ) -> None: + # 1. Create bitmaps from positions + bitmaps: Dict[int, BitMap] = {} + cardinality = 0 + for pos in positions: + cardinality += 1 + key = pos >> 32 + low_bits = pos & 0xFFFFFFFF + if key not in bitmaps: + bitmaps[key] = BitMap() + bitmaps[key].add(low_bits) + + # 2. Serialize bitmaps for the vector payload + vector_payload = _serialize_bitmaps(bitmaps) + + # 3. Construct the full blob payload for deletion-vector-v1 + with io.BytesIO() as blob_payload_buffer: + # Magic bytes for DV + blob_payload_buffer.write(DELETION_VECTOR_MAGIC) + # The vector itself + blob_payload_buffer.write(vector_payload) + + # The content for CRC calculation + crc_content = blob_payload_buffer.getvalue() + crc32 = zlib.crc32(crc_content) + + # The full blob to be stored in the Puffin file + with io.BytesIO() as full_blob_buffer: + # Combined length of the vector and magic bytes stored as 4 bytes, big-endian + full_blob_buffer.write(len(crc_content).to_bytes(4, "big")) + # The content (magic + vector) + full_blob_buffer.write(crc_content) + # A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian + full_blob_buffer.write(crc32.to_bytes(4, "big")) + + self._blob_payloads.append(full_blob_buffer.getvalue()) + + # 4. Create blob metadata + properties = {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)} + + self._blobs.append( + PuffinBlobMetadata( + type="deletion-vector-v1", + fields=[], + snapshot_id=-1, + sequence_number=-1, + offset=0, # Will be set later + length=0, # Will be set later + properties=properties, + compression_codec=None, # Explicitly None + ) + ) + + def finish(self) -> bytes: + with io.BytesIO() as out: + payload_buffer = io.BytesIO() + for blob_payload in self._blob_payloads: + payload_buffer.write(blob_payload) + + # Set offsets and lengths in metadata + current_offset = 4 # Start after file magic + for i, blob_payload in enumerate(self._blob_payloads): + self._blobs[i].offset = current_offset + self._blobs[i].length = len(blob_payload) + current_offset += len(blob_payload) + + footer = Footer(blobs=self._blobs) + footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") + + # Final assembly + out.write(MAGIC_BYTES) + out.write(payload_buffer.getvalue()) + out.write(footer_payload_bytes) + out.write(len(footer_payload_bytes).to_bytes(4, "little")) + out.write((0).to_bytes(4, "little")) # flags + out.write(MAGIC_BYTES) + + return out.getvalue() diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index bf8c82014c..0e9c881860 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -19,7 +19,7 @@ import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap +from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter def _open_file(file: str) -> bytes: @@ -71,3 +71,78 @@ def test_map_high_vals() -> None: with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"): _ = _deserialize_bitmap(puffin) + + +def test_puffin_round_trip(): + # Define some deletion positions for multiple files + deletions1 = [10, 20, 30] + deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position + + file1_path = "path/to/data1.parquet" + file2_path = "path/to/data2.parquet" + + # Write the Puffin file + writer = PuffinWriter() + writer.add(positions=deletions1, referenced_data_file=file1_path) + writer.add(positions=deletions2, referenced_data_file=file2_path) + puffin_bytes = writer.finish() + + # Read the Puffin file back + reader = PuffinFile(puffin_bytes) + + # Assert footer metadata + assert len(reader.footer.blobs) == 2 + + blob1_meta = reader.footer.blobs[0] + assert blob1_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file1_path + assert blob1_meta.properties["cardinality"] == str(len(deletions1)) + + blob2_meta = reader.footer.blobs[1] + assert blob2_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file2_path + assert blob2_meta.properties["cardinality"] == str(len(deletions2)) + + # Assert the content of deletion vectors + read_vectors = reader.to_vector() + + assert file1_path in read_vectors + assert file2_path in read_vectors + + assert read_vectors[file1_path].to_pylist() == sorted(deletions1) + assert read_vectors[file2_path].to_pylist() == sorted(deletions2) + + +def test_write_and_read_puffin_file(): + writer = PuffinWriter() + writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet") + writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet") + puffin_bytes = writer.finish() + + reader = PuffinFile(puffin_bytes) + + assert len(reader.footer.blobs) == 2 + blob1 = reader.footer.blobs[0] + blob2 = reader.footer.blobs[1] + + assert blob1.properties["referenced-data-file"] == "file1.parquet" + assert blob1.properties["cardinality"] == "3" + assert blob1.type == "deletion-vector-v1" + assert blob1.snapshot_id == -1 + assert blob1.sequence_number == -1 + assert blob1.compression_codec is None + + assert blob2.properties["referenced-data-file"] == "file2.parquet" + assert blob2.properties["cardinality"] == "3" + + vectors = reader.to_vector() + assert len(vectors) == 2 + assert vectors["file1.parquet"].to_pylist() == [1, 2, 3] + assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] + + +def test_puffin_file_with_no_blobs(): + writer = PuffinWriter() + puffin_bytes = writer.finish() + + reader = PuffinFile(puffin_bytes) + assert len(reader.footer.blobs) == 0 + assert len(reader.to_vector()) == 0 From 4db1734cd4ee7d3c7fcfaa4e61e26cb8f0002c0c Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 8 Dec 2025 14:10:57 -0800 Subject: [PATCH 2/3] test fix --- pyiceberg/table/puffin.py | 12 +++++++----- tests/table/test_puffin.py | 6 +++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index da0074a954..8a7d4c2215 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -219,14 +219,16 @@ def finish(self) -> bytes: for blob_payload in self._blob_payloads: payload_buffer.write(blob_payload) - # Set offsets and lengths in metadata - current_offset = 4 # Start after file magic + updated_blobs_metadata: List[PuffinBlobMetadata] = [] + current_offset = 4 # Start after file magic (4 bytes) for i, blob_payload in enumerate(self._blob_payloads): - self._blobs[i].offset = current_offset - self._blobs[i].length = len(blob_payload) + original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True) + original_metadata_dict["offset"] = current_offset + original_metadata_dict["length"] = len(blob_payload) + updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict)) current_offset += len(blob_payload) - footer = Footer(blobs=self._blobs) + footer = Footer(blobs=updated_blobs_metadata) footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") # Final assembly diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index 0e9c881860..c71afd24af 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -19,7 +19,7 @@ import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter +from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter, PROPERTY_REFERENCED_DATA_FILE def _open_file(file: str) -> bytes: @@ -94,11 +94,11 @@ def test_puffin_round_trip(): assert len(reader.footer.blobs) == 2 blob1_meta = reader.footer.blobs[0] - assert blob1_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file1_path + assert blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path assert blob1_meta.properties["cardinality"] == str(len(deletions1)) blob2_meta = reader.footer.blobs[1] - assert blob2_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file2_path + assert blob2_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file2_path assert blob2_meta.properties["cardinality"] == str(len(deletions2)) # Assert the content of deletion vectors From 71dd92510dee5785a1fe6a9e1eec804806ee6b29 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 8 Dec 2025 14:17:09 -0800 Subject: [PATCH 3/3] lint fixes --- pyiceberg/table/puffin.py | 15 ++++++++------- tests/table/test_puffin.py | 14 +++++++------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 8a7d4c2215..c54173e01a 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -17,7 +17,8 @@ import io import math import zlib -from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional +from collections.abc import Iterable +from typing import TYPE_CHECKING, Literal from pydantic import Field from pyroaring import BitMap, FrozenBitMap @@ -65,9 +66,9 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: return bitmaps -def _serialize_bitmaps(bitmaps: Dict[int, BitMap]) -> bytes: +def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes: """ - Serializes a dictionary of bitmaps into a byte array. + Serialize a dictionary of bitmaps into a byte array. The format is: - 8 bytes: number of bitmaps (little-endian) @@ -149,8 +150,8 @@ def to_vector(self) -> dict[str, "pa.ChunkedArray"]: class PuffinWriter: - _blobs: List[PuffinBlobMetadata] - _blob_payloads: List[bytes] + _blobs: list[PuffinBlobMetadata] + _blob_payloads: list[bytes] def __init__(self) -> None: self._blobs = [] @@ -162,7 +163,7 @@ def add( referenced_data_file: str, ) -> None: # 1. Create bitmaps from positions - bitmaps: Dict[int, BitMap] = {} + bitmaps: dict[int, BitMap] = {} cardinality = 0 for pos in positions: cardinality += 1 @@ -219,7 +220,7 @@ def finish(self) -> bytes: for blob_payload in self._blob_payloads: payload_buffer.write(blob_payload) - updated_blobs_metadata: List[PuffinBlobMetadata] = [] + updated_blobs_metadata: list[PuffinBlobMetadata] = [] current_offset = 4 # Start after file magic (4 bytes) for i, blob_payload in enumerate(self._blob_payloads): original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True) diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index c71afd24af..403b2e038f 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -19,7 +19,7 @@ import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter, PROPERTY_REFERENCED_DATA_FILE +from pyiceberg.table.puffin import PROPERTY_REFERENCED_DATA_FILE, PuffinFile, PuffinWriter, _deserialize_bitmap def _open_file(file: str) -> bytes: @@ -73,10 +73,10 @@ def test_map_high_vals() -> None: _ = _deserialize_bitmap(puffin) -def test_puffin_round_trip(): +def test_puffin_round_trip() -> None: # Define some deletion positions for multiple files deletions1 = [10, 20, 30] - deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position + deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position file1_path = "path/to/data1.parquet" file2_path = "path/to/data2.parquet" @@ -92,7 +92,7 @@ def test_puffin_round_trip(): # Assert footer metadata assert len(reader.footer.blobs) == 2 - + blob1_meta = reader.footer.blobs[0] assert blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path assert blob1_meta.properties["cardinality"] == str(len(deletions1)) @@ -103,7 +103,7 @@ def test_puffin_round_trip(): # Assert the content of deletion vectors read_vectors = reader.to_vector() - + assert file1_path in read_vectors assert file2_path in read_vectors @@ -111,7 +111,7 @@ def test_puffin_round_trip(): assert read_vectors[file2_path].to_pylist() == sorted(deletions2) -def test_write_and_read_puffin_file(): +def test_write_and_read_puffin_file() -> None: writer = PuffinWriter() writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet") writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet") @@ -139,7 +139,7 @@ def test_write_and_read_puffin_file(): assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] -def test_puffin_file_with_no_blobs(): +def test_puffin_file_with_no_blobs() -> None: writer = PuffinWriter() puffin_bytes = writer.finish()