diff --git a/src/realtime/src/realtime/_async/client.py b/src/realtime/src/realtime/_async/client.py index b5702a54..855a9aac 100644 --- a/src/realtime/src/realtime/_async/client.py +++ b/src/realtime/src/realtime/_async/client.py @@ -14,11 +14,15 @@ from ..exceptions import NotConnectedError from ..message import Message, ServerMessageAdapter from ..transformers import http_endpoint_url +from ..serializer import Serializer from ..types import ( DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_TIMEOUT, + DEFAULT_VSN, PHOENIX_CHANNEL, VSN, + VSN_1_0_0, + VSN_2_0_0, ChannelEvents, ) from ..utils import is_ws_url @@ -38,6 +42,7 @@ def wrapper(*args, **kwargs): class AsyncRealtimeClient: + serializer: Optional[Serializer] def __init__( self, url: str, @@ -48,6 +53,8 @@ def __init__( max_retries: int = 5, initial_backoff: float = 1.0, timeout: int = DEFAULT_TIMEOUT, + vsn: str = DEFAULT_VSN, + allowed_metadata_keys: Optional[List[str]] = None, ) -> None: """ Initialize a RealtimeClient instance for WebSocket communication. @@ -61,6 +68,9 @@ def __init__( :param max_retries: Maximum number of reconnection attempts. Defaults to 5. :param initial_backoff: Initial backoff time (in seconds) for reconnection attempts. Defaults to 1.0. :param timeout: Connection timeout in seconds. Defaults to DEFAULT_TIMEOUT. + :param vsn: Serializer version to use. Defaults to "1.0.0". Use "2.0.0" for binary support. + :param allowed_metadata_keys: List of metadata keys allowed in user broadcast push messages. + Only used with VSN 2.0.0. Defaults to None. """ if not is_ws_url(url): raise ValueError("url must be a valid WebSocket URL or HTTP URL string") @@ -80,8 +90,17 @@ def __init__( self.max_retries = max_retries self.initial_backoff = initial_backoff self.timeout = timeout + self.vsn = vsn self._listen_task: Optional[asyncio.Task] = None self._heartbeat_task: Optional[asyncio.Task] = None + + # Initialize serializer based on version + if vsn == VSN_2_0_0: + self.serializer = Serializer(allowed_metadata_keys=allowed_metadata_keys) + elif vsn == VSN_1_0_0: + self.serializer = None # V1 uses JSON directly + else: + raise ValueError(f"Unsupported serializer version: {vsn}") @property def is_connected(self) -> bool: @@ -101,7 +120,19 @@ async def _listen(self) -> None: logger.info(f"receive: {msg!r}") try: - message = ServerMessageAdapter.validate_json(msg) + # Handle binary messages for V2 + if isinstance(msg, bytes) and self.vsn == VSN_2_0_0 and self.serializer: + decoded = self.serializer.decode(msg) + # Convert decoded message to JSON string for validation + msg_json = json.dumps({ + "event": decoded.get("event"), + "topic": decoded.get("topic"), + "payload": decoded.get("payload"), + "ref": decoded.get("ref"), + }) + message = ServerMessageAdapter.validate_json(msg_json) + else: + message = ServerMessageAdapter.validate_json(msg) except ValidationError as e: logger.error(f"Unrecognized message format {msg!r}\n{e}") continue @@ -343,15 +374,36 @@ async def send(self, message: Union[Message, Dict[str, Any]]) -> None: "Warning: calling AsyncRealtimeClient.send with a dictionary is deprecated. Please call it with a Message object instead. This will be a hard error in the future." ) msg = Message(**message) - message_str = msg.model_dump_json() - logger.info(f"send: {message_str}") + + # Encode message based on serializer version + message_data: Union[str, bytes] + if self.vsn == VSN_2_0_0 and self.serializer: + # Convert Message to dict for serializer + msg_dict = { + "join_ref": msg.join_ref, + "ref": msg.ref, + "topic": msg.topic, + "event": msg.event, + "payload": msg.payload, + } + encoded = self.serializer.encode(msg_dict) + if isinstance(encoded, bytes): + message_data = encoded + logger.info(f"send (binary): {len(message_data)} bytes") + else: + message_data = encoded + logger.info(f"send: {message_data}") + else: + # V1: JSON encoding + message_data = msg.model_dump_json() + logger.info(f"send: {message_data}") async def send_message(): if not self._ws_connection: raise NotConnectedError("_send") try: - await self._ws_connection.send(message_str) + await self._ws_connection.send(message_data) except websockets.exceptions.ConnectionClosedError as e: await self._on_connect_error(e) except websockets.exceptions.ConnectionClosedOK: @@ -374,7 +426,7 @@ async def _leave_open_topic(self, topic: str): def endpoint_url(self) -> str: parsed_url = urlparse(self.url) - query = urlencode({**self.params, "vsn": VSN}, doseq=True) + query = urlencode({**self.params, "vsn": self.vsn}, doseq=True) return urlunparse( ( parsed_url.scheme, diff --git a/src/realtime/src/realtime/serializer.py b/src/realtime/src/realtime/serializer.py new file mode 100644 index 00000000..998fbeaa --- /dev/null +++ b/src/realtime/src/realtime/serializer.py @@ -0,0 +1,476 @@ +""" +Serializer for Realtime V2 protocol. + +This module implements binary encoding/decoding for Realtime V2 serializer, +supporting binary payloads and user broadcast messages with metadata. + +Based on supabase-js PRs #1829 and #1894. +""" +from __future__ import annotations + +import json +from typing import Any, Dict, List, Optional, Union + +from .types import ChannelEvents + + +class Serializer: + """ + Serializer for Realtime messages supporting both V1 (JSON) and V2 (binary) formats. + + V2 features: + - Binary payload support for user messages + - Two new message types: user broadcast and user broadcast push + - Optional metadata support for user broadcast push messages + - Reduced JSON encoding overhead on the server side + """ + + HEADER_LENGTH = 1 + META_LENGTH = 4 + USER_BROADCAST_PUSH_META_LENGTH = 6 + + # Message kinds + KINDS = { + "push": 0, + "reply": 1, + "broadcast": 2, + "userBroadcastPush": 3, + "userBroadcast": 4, + } + + # Encoding types + BINARY_ENCODING = 0 + JSON_ENCODING = 1 + + BROADCAST_EVENT = "broadcast" + + def __init__(self, allowed_metadata_keys: Optional[List[str]] = None) -> None: + """ + Initialize the serializer. + + Args: + allowed_metadata_keys: List of metadata keys allowed in user broadcast push messages. + If None or empty, no metadata will be included. + """ + self.allowed_metadata_keys = allowed_metadata_keys or [] + + def encode( + self, + msg: Dict[str, Any], + ) -> Union[bytes, str]: + """ + Encode a message to either JSON string (V1) or binary (V2). + + Args: + msg: Message dictionary with keys: join_ref, ref, topic, event, payload + + Returns: + JSON string for V1 or bytes for V2 binary encoding + """ + payload = msg.get("payload") + + # Check if payload is bytes/bytearray (binary) + if isinstance(payload, (bytes, bytearray)): + return self._binary_encode_push(msg) + + # Check if this is a user broadcast push message + if ( + msg.get("event") == self.BROADCAST_EVENT + and isinstance(payload, dict) + and "event" in payload + ): + return self._encode_user_broadcast_push(msg) + + # Default: JSON encoding (V1) + return json.dumps([ + msg.get("join_ref"), + msg.get("ref"), + msg.get("topic"), + msg.get("event"), + payload, + ], separators=(',', ':')) + + def _binary_encode_push(self, message: Dict[str, Any]) -> bytes: + """ + Encode a push message with binary payload. + + Format: + - 1 byte: kind (push = 0) + - 1 byte: join_ref length + - 1 byte: ref length (0 for push) + - 1 byte: topic length + - 1 byte: event length + - N bytes: join_ref (UTF-8) + - N bytes: topic (UTF-8) + - N bytes: event (UTF-8) + - N bytes: binary payload + """ + join_ref = message.get("join_ref", "") or "" + ref = message.get("ref", "") or "" + topic = message.get("topic", "") + event = message.get("event", "") + payload = message.get("payload", b"") + + if isinstance(payload, bytearray): + payload = bytes(payload) + elif not isinstance(payload, bytes): + payload = bytes(payload) + + meta_length = ( + self.META_LENGTH + + len(join_ref) + + len(ref) + + len(topic) + + len(event) + ) + + header = bytearray(self.HEADER_LENGTH + meta_length) + offset = 0 + + header[offset] = self.KINDS["push"] + offset += 1 + header[offset] = len(join_ref) + offset += 1 + header[offset] = len(ref) + offset += 1 + header[offset] = len(topic) + offset += 1 + header[offset] = len(event) + offset += 1 + + # Write strings as UTF-8 + header[offset:offset + len(join_ref)] = join_ref.encode("utf-8") + offset += len(join_ref) + header[offset:offset + len(ref)] = ref.encode("utf-8") + offset += len(ref) + header[offset:offset + len(topic)] = topic.encode("utf-8") + offset += len(topic) + header[offset:offset + len(event)] = event.encode("utf-8") + offset += len(event) + + # Combine header and payload + return bytes(header) + payload + + def _encode_user_broadcast_push(self, message: Dict[str, Any]) -> bytes: + """ + Encode a user broadcast push message. + + Supports both JSON and binary payloads, with optional metadata. + """ + payload = message.get("payload", {}) + user_payload = payload.get("payload") + + if isinstance(user_payload, (bytes, bytearray)): + return self._encode_binary_user_broadcast_push(message) + else: + return self._encode_json_user_broadcast_push(message) + + def _encode_binary_user_broadcast_push(self, message: Dict[str, Any]) -> bytes: + """Encode user broadcast push with binary payload.""" + user_payload = message.get("payload", {}).get("payload", b"") + if isinstance(user_payload, bytearray): + user_payload = bytes(user_payload) + elif not isinstance(user_payload, bytes): + user_payload = bytes(user_payload) + + return self._encode_user_broadcast_push_internal( + message, self.BINARY_ENCODING, user_payload + ) + + def _encode_json_user_broadcast_push(self, message: Dict[str, Any]) -> bytes: + """Encode user broadcast push with JSON payload.""" + user_payload = message.get("payload", {}).get("payload", {}) + encoded_payload = json.dumps(user_payload).encode("utf-8") + return self._encode_user_broadcast_push_internal( + message, self.JSON_ENCODING, encoded_payload + ) + + def _encode_user_broadcast_push_internal( + self, + message: Dict[str, Any], + encoding_type: int, + encoded_payload: bytes, + ) -> bytes: + """ + Internal method to encode user broadcast push messages. + + Format: + - 1 byte: kind (userBroadcastPush = 3) + - 1 byte: join_ref length + - 1 byte: ref length + - 1 byte: topic length + - 1 byte: user_event length + - 1 byte: metadata length + - 1 byte: encoding type (0 = binary, 1 = JSON) + - N bytes: join_ref (UTF-8) + - N bytes: ref (UTF-8) + - N bytes: topic (UTF-8) + - N bytes: user_event (UTF-8) + - N bytes: metadata (UTF-8 JSON, if present) + - N bytes: encoded payload + """ + topic = message.get("topic", "") + ref = message.get("ref", "") or "" + join_ref = message.get("join_ref", "") or "" + payload = message.get("payload", {}) + user_event = payload.get("event", "") + + # Filter metadata based on allowed keys (exclude special fields: type, event, payload) + rest = self._pick(payload, self.allowed_metadata_keys) + # Remove special fields that shouldn't be in metadata + rest = {k: v for k, v in rest.items() if k not in ("type", "event", "payload")} + metadata = json.dumps(rest) if rest else "" + + # Validate lengths don't exceed uint8 max value (255) + if len(join_ref) > 255: + raise ValueError(f"joinRef length {len(join_ref)} exceeds maximum of 255") + if len(ref) > 255: + raise ValueError(f"ref length {len(ref)} exceeds maximum of 255") + if len(topic) > 255: + raise ValueError(f"topic length {len(topic)} exceeds maximum of 255") + if len(user_event) > 255: + raise ValueError(f"userEvent length {len(user_event)} exceeds maximum of 255") + if len(metadata) > 255: + raise ValueError(f"metadata length {len(metadata)} exceeds maximum of 255") + + meta_length = ( + self.USER_BROADCAST_PUSH_META_LENGTH + + len(join_ref) + + len(ref) + + len(topic) + + len(user_event) + + len(metadata) + ) + + header = bytearray(self.HEADER_LENGTH + meta_length) + offset = 0 + + header[offset] = self.KINDS["userBroadcastPush"] + offset += 1 + header[offset] = len(join_ref) + offset += 1 + header[offset] = len(ref) + offset += 1 + header[offset] = len(topic) + offset += 1 + header[offset] = len(user_event) + offset += 1 + header[offset] = len(metadata) + offset += 1 + header[offset] = encoding_type + offset += 1 + + # Write strings as UTF-8 + header[offset:offset + len(join_ref)] = join_ref.encode("utf-8") + offset += len(join_ref) + header[offset:offset + len(ref)] = ref.encode("utf-8") + offset += len(ref) + header[offset:offset + len(topic)] = topic.encode("utf-8") + offset += len(topic) + header[offset:offset + len(user_event)] = user_event.encode("utf-8") + offset += len(user_event) + header[offset:offset + len(metadata)] = metadata.encode("utf-8") + offset += len(metadata) + + # Combine header and payload + return bytes(header) + encoded_payload + + def decode(self, raw_payload: Union[bytes, str]) -> Dict[str, Any]: + """ + Decode a message from either JSON string (V1) or binary (V2). + + Args: + raw_payload: JSON string or bytes to decode + + Returns: + Decoded message dictionary + """ + if isinstance(raw_payload, bytes): + return self._binary_decode(raw_payload) + + if isinstance(raw_payload, str): + json_payload = json.loads(raw_payload) + join_ref, ref, topic, event, payload = json_payload + return { + "join_ref": join_ref, + "ref": ref, + "topic": topic, + "event": event, + "payload": payload, + } + + return {} + + def _binary_decode(self, buffer: bytes) -> Dict[str, Any]: + """Decode a binary message based on its kind.""" + if len(buffer) == 0: + return {} + + kind = buffer[0] + + if kind == self.KINDS["push"]: + return self._decode_push(buffer) + elif kind == self.KINDS["reply"]: + return self._decode_reply(buffer) + elif kind == self.KINDS["broadcast"]: + return self._decode_broadcast(buffer) + elif kind == self.KINDS["userBroadcast"]: + return self._decode_user_broadcast(buffer) + else: + return {} + + def _decode_push(self, buffer: bytes) -> Dict[str, Any]: + """Decode a push message.""" + if len(buffer) < self.HEADER_LENGTH + self.META_LENGTH - 1: + return {} + + join_ref_size = buffer[1] + topic_size = buffer[2] + event_size = buffer[3] + + offset = self.HEADER_LENGTH + self.META_LENGTH - 1 # pushes have no ref + + join_ref = buffer[offset:offset + join_ref_size].decode("utf-8") + offset += join_ref_size + + topic = buffer[offset:offset + topic_size].decode("utf-8") + offset += topic_size + + event = buffer[offset:offset + event_size].decode("utf-8") + offset += event_size + + data = json.loads(buffer[offset:].decode("utf-8")) + + return { + "join_ref": join_ref, + "ref": None, + "topic": topic, + "event": event, + "payload": data, + } + + def _decode_reply(self, buffer: bytes) -> Dict[str, Any]: + """Decode a reply message.""" + if len(buffer) < self.HEADER_LENGTH + self.META_LENGTH: + return {} + + join_ref_size = buffer[1] + ref_size = buffer[2] + topic_size = buffer[3] + event_size = buffer[4] + + offset = self.HEADER_LENGTH + self.META_LENGTH + + join_ref = buffer[offset:offset + join_ref_size].decode("utf-8") + offset += join_ref_size + + ref = buffer[offset:offset + ref_size].decode("utf-8") + offset += ref_size + + topic = buffer[offset:offset + topic_size].decode("utf-8") + offset += topic_size + + event = buffer[offset:offset + event_size].decode("utf-8") + offset += event_size + + data = json.loads(buffer[offset:].decode("utf-8")) + payload = {"status": event, "response": data} + + return { + "join_ref": join_ref, + "ref": ref, + "topic": topic, + "event": ChannelEvents.reply, + "payload": payload, + } + + def _decode_broadcast(self, buffer: bytes) -> Dict[str, Any]: + """Decode a broadcast message.""" + if len(buffer) < self.HEADER_LENGTH + 2: + return {} + + topic_size = buffer[1] + event_size = buffer[2] + + offset = self.HEADER_LENGTH + 2 # kind(1) + topic_len(1) + event_len(1) = 3, but offset is after reading lengths + + topic = buffer[offset:offset + topic_size].decode("utf-8") + offset += topic_size + + event = buffer[offset:offset + event_size].decode("utf-8") + offset += event_size + + data = json.loads(buffer[offset:].decode("utf-8")) + + return { + "join_ref": None, + "ref": None, + "topic": topic, + "event": event, + "payload": data, + } + + def _decode_user_broadcast(self, buffer: bytes) -> Dict[str, Any]: + """ + Decode a user broadcast message. + + Supports both JSON and binary payloads, with optional metadata. + """ + if len(buffer) < self.HEADER_LENGTH + 4: + return {} + + topic_size = buffer[1] + user_event_size = buffer[2] + metadata_size = buffer[3] + payload_encoding = buffer[4] + + offset = self.HEADER_LENGTH + 4 + + topic = buffer[offset:offset + topic_size].decode("utf-8") + offset += topic_size + + user_event = buffer[offset:offset + user_event_size].decode("utf-8") + offset += user_event_size + + metadata = buffer[offset:offset + metadata_size].decode("utf-8") if metadata_size > 0 else "" + offset += metadata_size + + payload = buffer[offset:] + + if payload_encoding == self.JSON_ENCODING: + parsed_payload = json.loads(payload.decode("utf-8")) + else: + parsed_payload = payload + + data: Dict[str, Any] = { + "type": self.BROADCAST_EVENT, + "event": user_event, + "payload": parsed_payload, + } + + # Metadata is optional and always JSON encoded + if metadata_size > 0: + data["meta"] = json.loads(metadata) + + return { + "join_ref": None, + "ref": None, + "topic": topic, + "event": self.BROADCAST_EVENT, + "payload": data, + } + + def _pick(self, obj: Optional[Dict[str, Any]], keys: List[str]) -> Dict[str, Any]: + """ + Pick specific keys from a dictionary. + + Args: + obj: Dictionary to pick from + keys: List of keys to pick + + Returns: + Dictionary containing only the specified keys + """ + if not obj or not isinstance(obj, dict): + return {} + return {key: obj[key] for key in keys if key in obj} diff --git a/src/realtime/src/realtime/types.py b/src/realtime/src/realtime/types.py index 1223f06a..0eb7456f 100644 --- a/src/realtime/src/realtime/types.py +++ b/src/realtime/src/realtime/types.py @@ -17,7 +17,10 @@ # Constants DEFAULT_TIMEOUT = 10 PHOENIX_CHANNEL = "phoenix" -VSN = "1.0.0" +VSN_1_0_0 = "1.0.0" +VSN_2_0_0 = "2.0.0" +DEFAULT_VSN = VSN_1_0_0 +VSN = DEFAULT_VSN # Backward compatibility DEFAULT_HEARTBEAT_INTERVAL = 25 # Type variables and custom types diff --git a/src/realtime/tests/test_serializer.py b/src/realtime/tests/test_serializer.py new file mode 100644 index 00000000..1b9a5d82 --- /dev/null +++ b/src/realtime/tests/test_serializer.py @@ -0,0 +1,366 @@ +""" +Tests for Realtime V2 serializer. + +Based on supabase-js PRs #1829 and #1894. +""" +import json +import pytest + +from realtime.serializer import Serializer +from realtime.types import ChannelEvents + + +def test_json_encode(): + """Test JSON encoding (V1 format).""" + serializer = Serializer() + msg = { + "join_ref": "0", + "ref": "1", + "topic": "t", + "event": "e", + "payload": {"foo": 1}, + } + result = serializer.encode(msg) + assert result == '["0","1","t","e",{"foo":1}]' + + +def test_json_decode(): + """Test JSON decoding (V1 format).""" + serializer = Serializer() + result = serializer.decode('["0","1","t","e",{"foo":1}]') + assert result == { + "join_ref": "0", + "ref": "1", + "topic": "t", + "event": "e", + "payload": {"foo": 1}, + } + + +def test_binary_encode_push(): + """Test binary encoding of push message with binary payload.""" + serializer = Serializer() + buffer = bytes([1, 4]) + msg = { + "join_ref": "0", + "ref": "1", + "topic": "t", + "event": "e", + "payload": buffer, + } + result = serializer.encode(msg) + assert isinstance(result, bytes) + # Verify structure: kind (0), join_ref_len (1), ref_len (1), topic_len (1), event_len (1) + assert result[0] == 0 # push kind + assert result[1] == 1 # join_ref length + assert result[2] == 1 # ref length + assert result[3] == 1 # topic length + assert result[4] == 1 # event length + # Verify payload is at the end + assert result[-2:] == buffer + + +def test_binary_encode_push_variable_length(): + """Test binary encoding with variable length segments.""" + serializer = Serializer() + buffer = bytes([1, 4]) + msg = { + "join_ref": "10", + "ref": "1", + "topic": "top", + "event": "ev", + "payload": buffer, + } + result = serializer.encode(msg) + assert isinstance(result, bytes) + assert result[0] == 0 # push kind + assert result[1] == 2 # join_ref length + assert result[2] == 1 # ref length + assert result[3] == 3 # topic length + assert result[4] == 2 # event length + + +def test_encode_user_broadcast_push_json(): + """Test encoding user broadcast push with JSON payload.""" + serializer = Serializer() + msg = { + "join_ref": "10", + "ref": "1", + "topic": "top", + "event": "broadcast", + "payload": { + "type": "broadcast", + "event": "user-event", + "payload": {"a": "b"}, + }, + } + result = serializer.encode(msg) + assert isinstance(result, bytes) + # Verify structure: kind (3), join_ref_len, ref_len, topic_len, user_event_len, metadata_len, encoding + assert result[0] == 3 # userBroadcastPush kind + assert result[1] == 2 # join_ref length + assert result[2] == 1 # ref length + assert result[3] == 3 # topic length + assert result[4] == 10 # user_event length + assert result[5] == 0 # metadata length (no metadata) + assert result[6] == 1 # JSON encoding + + +def test_encode_user_broadcast_push_json_with_metadata(): + """Test encoding user broadcast push with JSON payload and metadata.""" + serializer = Serializer(allowed_metadata_keys=["extra"]) + msg = { + "join_ref": "10", + "ref": "1", + "topic": "top", + "event": "broadcast", + "payload": { + "type": "broadcast", + "event": "user-event", + "extra": "bit", + "store": True, # Not in allowed keys, should be filtered + "payload": {"a": "b"}, + }, + } + result = serializer.encode(msg) + assert isinstance(result, bytes) + assert result[0] == 3 # userBroadcastPush kind + assert result[5] > 0 # metadata length (should have metadata) + # Verify metadata contains "extra" but not "store" + # Format: kind(1) + join_ref_len(1) + ref_len(1) + topic_len(1) + user_event_len(1) + metadata_len(1) + encoding(1) = 7 bytes header + # Then: join_ref(2) + ref(1) + topic(3) + user_event(10) = 16 bytes + # So metadata starts at: 7 + 16 = 23 + header_len = 7 + join_ref_len = result[1] + ref_len = result[2] + topic_len = result[3] + user_event_len = result[4] + metadata_start = header_len + join_ref_len + ref_len + topic_len + user_event_len + metadata_len = result[5] + metadata_bytes = result[metadata_start:metadata_start + metadata_len] + metadata = json.loads(metadata_bytes.decode("utf-8")) + assert "extra" in metadata + assert metadata["extra"] == "bit" + assert "store" not in metadata + + +def test_encode_user_broadcast_push_binary(): + """Test encoding user broadcast push with binary payload.""" + serializer = Serializer() + buffer = bytes([1, 4]) + msg = { + "join_ref": "10", + "ref": "1", + "topic": "top", + "event": "broadcast", + "payload": { + "type": "broadcast", + "event": "user-event", + "payload": buffer, + }, + } + result = serializer.encode(msg) + assert isinstance(result, bytes) + assert result[0] == 3 # userBroadcastPush kind + assert result[6] == 0 # binary encoding + # Verify binary payload is at the end + assert result[-2:] == buffer + + +def test_decode_push(): + """Test decoding push message.""" + serializer = Serializer() + # Create binary message: kind=0, join_ref_len=3, topic_len=3, event_len=10, then strings, then JSON payload + bin_data = b'\x00\x03\x03\n123topsome-event{"a":"b"}' + result = serializer.decode(bin_data) + assert result["join_ref"] == "123" + assert result["ref"] is None + assert result["topic"] == "top" + assert result["event"] == "some-event" + assert result["payload"] == {"a": "b"} + + +def test_decode_reply(): + """Test decoding reply message.""" + serializer = Serializer() + # Create binary message: kind=1, join_ref_len=3, ref_len=2, topic_len=3, event_len=2, then strings, then JSON + bin_data = b'\x01\x03\x02\x03\x0210012topok{"a":"b"}' + result = serializer.decode(bin_data) + assert result["join_ref"] == "100" + assert result["ref"] == "12" + assert result["topic"] == "top" + assert result["event"] == ChannelEvents.reply + assert result["payload"]["status"] == "ok" + assert result["payload"]["response"] == {"a": "b"} + + +def test_decode_broadcast(): + """Test decoding broadcast message.""" + serializer = Serializer() + # Create binary message: kind=2, topic_len=3, event_len=10, then strings, then JSON + bin_data = b'\x02\x03\ntopsome-event{"a":"b"}' + result = serializer.decode(bin_data) + assert result["join_ref"] is None + assert result["ref"] is None + assert result["topic"] == "top" + assert result["event"] == "some-event" + assert result["payload"] == {"a": "b"} + + +def test_decode_user_broadcast_json_no_metadata(): + """Test decoding user broadcast with JSON payload and no metadata.""" + serializer = Serializer() + # kind=4, topic_len=3, user_event_len=10, metadata_len=0, encoding=1, then strings, then JSON payload + bin_data = b'\x04\x03\n\x00\x01topuser-event{"a":"b"}' + result = serializer.decode(bin_data) + assert result["join_ref"] is None + assert result["ref"] is None + assert result["topic"] == "top" + assert result["event"] == "broadcast" + assert result["payload"]["type"] == "broadcast" + assert result["payload"]["event"] == "user-event" + assert result["payload"]["payload"] == {"a": "b"} + assert "meta" not in result["payload"] + + +def test_decode_user_broadcast_json_with_metadata(): + """Test decoding user broadcast with JSON payload and metadata.""" + serializer = Serializer() + # kind=4, topic_len=3, user_event_len=10, metadata_len=17, encoding=1, then strings, metadata, JSON payload + bin_data = b'\x04\x03\n\x11\x01topuser-event{"replayed":true}{"a":"b"}' + result = serializer.decode(bin_data) + assert result["join_ref"] is None + assert result["ref"] is None + assert result["topic"] == "top" + assert result["event"] == "broadcast" + assert result["payload"]["type"] == "broadcast" + assert result["payload"]["event"] == "user-event" + assert result["payload"]["payload"] == {"a": "b"} + assert result["payload"]["meta"] == {"replayed": True} + + +def test_decode_user_broadcast_binary_no_metadata(): + """Test decoding user broadcast with binary payload and no metadata.""" + serializer = Serializer() + # kind=4, topic_len=3, user_event_len=10, metadata_len=0, encoding=0, then strings, then binary payload + bin_data = b'\x04\x03\n\x00\x00topuser-event\x01\x04' + result = serializer.decode(bin_data) + assert result["join_ref"] is None + assert result["ref"] is None + assert result["topic"] == "top" + assert result["event"] == "broadcast" + assert result["payload"]["type"] == "broadcast" + assert result["payload"]["event"] == "user-event" + assert isinstance(result["payload"]["payload"], bytes) + assert result["payload"]["payload"] == b'\x01\x04' + assert "meta" not in result["payload"] + + +def test_decode_user_broadcast_binary_with_metadata(): + """Test decoding user broadcast with binary payload and metadata.""" + serializer = Serializer() + # kind=4, topic_len=3, user_event_len=10, metadata_len=17, encoding=0, then strings, metadata, binary payload + bin_data = b'\x04\x03\n\x11\x00topuser-event{"replayed":true}\x01\x04' + result = serializer.decode(bin_data) + assert result["join_ref"] is None + assert result["ref"] is None + assert result["topic"] == "top" + assert result["event"] == "broadcast" + assert result["payload"]["type"] == "broadcast" + assert result["payload"]["event"] == "user-event" + assert isinstance(result["payload"]["payload"], bytes) + assert result["payload"]["payload"] == b'\x01\x04' + assert result["payload"]["meta"] == {"replayed": True} + + +def test_encode_validation_errors(): + """Test that encoding validates field lengths.""" + serializer = Serializer() + + # Test join_ref too long + with pytest.raises(ValueError, match="joinRef length"): + serializer._encode_user_broadcast_push_internal( + { + "join_ref": "a" * 256, + "ref": "1", + "topic": "top", + "payload": {"event": "user-event", "payload": {}}, + }, + 1, + b"", + ) + + # Test ref too long + with pytest.raises(ValueError, match="ref length"): + serializer._encode_user_broadcast_push_internal( + { + "join_ref": "10", + "ref": "a" * 256, + "topic": "top", + "payload": {"event": "user-event", "payload": {}}, + }, + 1, + b"", + ) + + # Test topic too long + with pytest.raises(ValueError, match="topic length"): + serializer._encode_user_broadcast_push_internal( + { + "join_ref": "10", + "ref": "1", + "topic": "a" * 256, + "payload": {"event": "user-event", "payload": {}}, + }, + 1, + b"", + ) + + # Test user_event too long + with pytest.raises(ValueError, match="userEvent length"): + serializer._encode_user_broadcast_push_internal( + { + "join_ref": "10", + "ref": "1", + "topic": "top", + "payload": {"event": "a" * 256, "payload": {}}, + }, + 1, + b"", + ) + + # Test metadata too long + serializer_with_meta = Serializer(allowed_metadata_keys=["extra"]) + # Create metadata that will exceed 255 chars when JSON stringified + # JSON format: {"extra":"aaa..."} = 2 + 9 + 1 + 1 + N + 1 = 14 + N + # So we need N > 241 to exceed 255 total + with pytest.raises(ValueError, match="metadata length"): + serializer_with_meta._encode_user_broadcast_push_internal( + { + "join_ref": "10", + "ref": "1", + "topic": "top", + "payload": { + "event": "user-event", + "payload": {}, + "extra": "a" * 260, # Will exceed 255 when JSON stringified + }, + }, + 1, + b"", + ) + + +def test_pick_helper(): + """Test the _pick helper method.""" + serializer = Serializer() + obj = {"a": 1, "b": 2, "c": 3} + result = serializer._pick(obj, ["a", "c"]) + assert result == {"a": 1, "c": 3} + assert "b" not in result + + # Test with None + assert serializer._pick(None, ["a"]) == {} + + # Test with empty dict + assert serializer._pick({}, ["a"]) == {}