pyiceberg/table/puffin.py (127 additions, 0 deletions)
@@ -14,7 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
import math
import zlib
from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal

from pydantic import Field
@@ -27,6 +30,7 @@

# Short for: Puffin Fratercula arctica, version 1
MAGIC_BYTES = b"PFA1"
DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"
EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
@@ -62,6 +66,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
    return bitmaps


def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
    """
    Serialize a dictionary of bitmaps into a byte array.

    The format is:
    - 8 bytes: number of bitmaps (little-endian)
    - For each bitmap:
      - 4 bytes: key (little-endian)
      - n bytes: serialized bitmap
    """
    with io.BytesIO() as out:
        sorted_keys = sorted(bitmaps.keys())

        # number of bitmaps
        out.write(len(sorted_keys).to_bytes(8, "little"))

        for key in sorted_keys:
            if key < 0:
                raise ValueError(f"Invalid unsigned key: {key}")
            if key > MAX_JAVA_SIGNED:
                raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")

            # key
            out.write(key.to_bytes(4, "little"))
            # bitmap
            out.write(bitmaps[key].serialize())
        return out.getvalue()
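
For illustration, a minimal sketch of the layout this produces (not part of the change; assumes pyroaring's standard roaring serialization):

    payload = _serialize_bitmaps({0: BitMap([1, 2, 3])})
    assert payload[:8] == (1).to_bytes(8, "little")    # one bitmap
    assert payload[8:12] == (0).to_bytes(4, "little")  # key 0
    # the remainder is BitMap([1, 2, 3]).serialize()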


class PuffinBlobMetadata(IcebergBaseModel):
    type: Literal["deletion-vector-v1"] = Field()
    fields: list[int] = Field()
@@ -114,3 +147,97 @@ def __init__(self, puffin: bytes) -> None:

    def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
        return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}


class PuffinWriter:
Contributor: The Iceberg spec recommends writing the created-by property in Puffin files: https://iceberg.apache.org/puffin-spec/#deletion-vector-v1-blob-type. Could you please write the property?
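
A possible sketch for such a follow-up (hypothetical; it assumes the Footer model accepts an optional properties field, which may not exist in the current codebase):

    # hypothetical: attach the spec-recommended file-level property when
    # building the footer in finish(); assumes Footer(properties=...) is valid
    from pyiceberg import __version__

    footer = Footer(
        blobs=updated_blobs_metadata,
        properties={"created-by": f"pyiceberg {__version__}"},
    )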

    _blobs: list[PuffinBlobMetadata]
    _blob_payloads: list[bytes]

    def __init__(self) -> None:
        self._blobs = []
        self._blob_payloads = []

    def add(
        self,
        positions: Iterable[int],
        referenced_data_file: str,
    ) -> None:
        # 1. Create bitmaps from positions
        bitmaps: dict[int, BitMap] = {}
        cardinality = 0
        for pos in positions:
            cardinality += 1
            key = pos >> 32
            low_bits = pos & 0xFFFFFFFF
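            # e.g. pos = (1 << 32) + 1 splits into key = 1 (upper 32 bits) and low_bits = 1 (lower 32 bits)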
            if key not in bitmaps:
                bitmaps[key] = BitMap()
            bitmaps[key].add(low_bits)

        # 2. Serialize bitmaps for the vector payload
        vector_payload = _serialize_bitmaps(bitmaps)

        # 3. Construct the full blob payload for deletion-vector-v1
        with io.BytesIO() as blob_payload_buffer:
            # Magic bytes for DV
            blob_payload_buffer.write(DELETION_VECTOR_MAGIC)
            # The vector itself
            blob_payload_buffer.write(vector_payload)

            # The content for CRC calculation
            crc_content = blob_payload_buffer.getvalue()
            crc32 = zlib.crc32(crc_content)

        # The full blob to be stored in the Puffin file
        with io.BytesIO() as full_blob_buffer:
            # Combined length of the vector and magic bytes stored as 4 bytes, big-endian
            full_blob_buffer.write(len(crc_content).to_bytes(4, "big"))
            # The content (magic + vector)
            full_blob_buffer.write(crc_content)
            # A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
            full_blob_buffer.write(crc32.to_bytes(4, "big"))

            self._blob_payloads.append(full_blob_buffer.getvalue())

        # 4. Create blob metadata
        properties = {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)}

        self._blobs.append(
            PuffinBlobMetadata(
                type="deletion-vector-v1",
                fields=[],
                snapshot_id=-1,
                sequence_number=-1,
                offset=0,  # Will be set later
                length=0,  # Will be set later
                properties=properties,
                compression_codec=None,  # Explicitly None
            )
        )

    def finish(self) -> bytes:
        with io.BytesIO() as out:
            payload_buffer = io.BytesIO()
            for blob_payload in self._blob_payloads:
                payload_buffer.write(blob_payload)

            updated_blobs_metadata: list[PuffinBlobMetadata] = []
            current_offset = 4  # Start after file magic (4 bytes)
            for i, blob_payload in enumerate(self._blob_payloads):
                original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True)
                original_metadata_dict["offset"] = current_offset
                original_metadata_dict["length"] = len(blob_payload)
                updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict))
                current_offset += len(blob_payload)

            footer = Footer(blobs=updated_blobs_metadata)
            footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

            # Final assembly
            out.write(MAGIC_BYTES)
            out.write(payload_buffer.getvalue())
            # Per the Puffin spec the footer itself starts with the magic bytes
            # again: Magic FooterPayload FooterPayloadSize Flags Magic
            out.write(MAGIC_BYTES)
            out.write(footer_payload_bytes)
            out.write(len(footer_payload_bytes).to_bytes(4, "little"))
            out.write((0).to_bytes(4, "little"))  # flags
            out.write(MAGIC_BYTES)

            return out.getvalue()
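
For reference, the byte layout finish() produces is then: header magic, concatenated blob payloads, footer magic, footer JSON payload, footer payload size (4 bytes, little-endian), flags (4 bytes), and the trailing magic.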
tests/table/test_puffin.py (76 additions, 1 deletion)
@@ -19,7 +19,7 @@
import pytest
from pyroaring import BitMap

from pyiceberg.table.puffin import _deserialize_bitmap
from pyiceberg.table.puffin import PROPERTY_REFERENCED_DATA_FILE, PuffinFile, PuffinWriter, _deserialize_bitmap


def _open_file(file: str) -> bytes:
@@ -71,3 +71,78 @@ def test_map_high_vals() -> None:

    with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
        _ = _deserialize_bitmap(puffin)


def test_puffin_round_trip() -> None:
    # Define some deletion positions for multiple files
    deletions1 = [10, 20, 30]
Contributor: Iceberg deletion vectors expect a 1:1 mapping between DV and data-file path, and the current PuffinWriter may violate that expectation. For instance, the following code writes two blobs for the same path without merging them. Do you plan to fix this behavior as a follow-up, or should the caller provide the merged DV?

    deletions1a = [10, 20]
    deletions1b = [30]
...
    writer.add(positions=deletions1a, referenced_data_file=file1_path)
    writer.add(positions=deletions1b, referenced_data_file=file1_path)
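
One way such a follow-up could enforce the expectation (a sketch only, reusing the names above): merge positions per path before writing, so each path gets exactly one blob.

    # hypothetical: accumulate positions per referenced file, then write one blob per path
    from collections import defaultdict

    pending: dict[str, list[int]] = defaultdict(list)
    pending[file1_path].extend(deletions1a)
    pending[file1_path].extend(deletions1b)

    writer = PuffinWriter()
    for path, positions in pending.items():
        writer.add(positions=positions, referenced_data_file=path)  # one DV blob per path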

    deletions2 = [5, (1 << 32) + 1]  # Test with a high-bit position

    file1_path = "path/to/data1.parquet"
    file2_path = "path/to/data2.parquet"

    # Write the Puffin file
    writer = PuffinWriter()
    writer.add(positions=deletions1, referenced_data_file=file1_path)
    writer.add(positions=deletions2, referenced_data_file=file2_path)
    puffin_bytes = writer.finish()

    # Read the Puffin file back
    reader = PuffinFile(puffin_bytes)

    # Assert footer metadata
    assert len(reader.footer.blobs) == 2

    blob1_meta = reader.footer.blobs[0]
    assert blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path
    assert blob1_meta.properties["cardinality"] == str(len(deletions1))

    blob2_meta = reader.footer.blobs[1]
    assert blob2_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file2_path
    assert blob2_meta.properties["cardinality"] == str(len(deletions2))

    # Assert the content of deletion vectors
    read_vectors = reader.to_vector()

    assert file1_path in read_vectors
    assert file2_path in read_vectors

    assert read_vectors[file1_path].to_pylist() == sorted(deletions1)
    assert read_vectors[file2_path].to_pylist() == sorted(deletions2)


def test_write_and_read_puffin_file() -> None:
    writer = PuffinWriter()
    writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet")
    writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet")
    puffin_bytes = writer.finish()

    reader = PuffinFile(puffin_bytes)

    assert len(reader.footer.blobs) == 2
    blob1 = reader.footer.blobs[0]
    blob2 = reader.footer.blobs[1]

    assert blob1.properties["referenced-data-file"] == "file1.parquet"
    assert blob1.properties["cardinality"] == "3"
    assert blob1.type == "deletion-vector-v1"
    assert blob1.snapshot_id == -1
    assert blob1.sequence_number == -1
    assert blob1.compression_codec is None

    assert blob2.properties["referenced-data-file"] == "file2.parquet"
    assert blob2.properties["cardinality"] == "3"

    vectors = reader.to_vector()
    assert len(vectors) == 2
    assert vectors["file1.parquet"].to_pylist() == [1, 2, 3]
    assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]


def test_puffin_file_with_no_blobs() -> None:
    writer = PuffinWriter()
    puffin_bytes = writer.finish()

    reader = PuffinFile(puffin_bytes)
    assert len(reader.footer.blobs) == 0
    assert len(reader.to_vector()) == 0