-
Notifications
You must be signed in to change notification settings - Fork 27
[AIT-96] feat: RealtimeChannel publish over WebSocket implementation #643
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds per-message ACK/NACK tracking and queuing (msgSerial, PendingMessage/PendingMessageQueue) in ConnectionManager; a new async RealtimeChannel.publish with client_id validation, encryption, encoding and size validation; transport-level ACK/NACK routing in WebSocketTransport; helper validate_message_size; and extensive publish tests. Changes
Sequence Diagram(s)sequenceDiagram
participant App as Application
participant Channel as RealtimeChannel
participant ConnMgr as ConnectionManager
participant Transport as WebSocketTransport
participant Server as Server
rect rgba(100,150,200,0.12)
Note over App,Channel: Publish -> await per-message ACK/NACK
App->>Channel: publish(message)
Channel->>Channel: validate client_id, encrypt, encode, size check
Channel->>ConnMgr: send_protocol_message(MESSAGE)
ConnMgr->>ConnMgr: create PendingMessage + Future
ConnMgr->>ConnMgr: assign msgSerial (if ack-required) and enqueue/send
ConnMgr->>Transport: send MESSAGE (or enqueue if disconnected)
Transport->>Server: transmit
Server-->>Transport: ACK {msgSerial, count}
Transport->>ConnMgr: on_ack(msgSerial, count)
ConnMgr->>ConnMgr: complete PendingMessage future(s)
Channel->>App: publish resolves
end
rect rgba(220,120,120,0.12)
Note over Server,ConnMgr: NACK path
Server-->>Transport: NACK {msgSerial, count, error}
Transport->>ConnMgr: on_nack(msgSerial, count, error)
ConnMgr->>ConnMgr: fail PendingMessage future(s) with error
Channel->>App: publish raises AblyException
end
rect rgba(150,200,120,0.12)
Note over ConnMgr,Transport: Reconnect / Requeue
alt connectionId changed
ConnMgr->>ConnMgr: on_connected -> reset msgSerial
end
ConnMgr->>ConnMgr: requeue_pending_messages()
ConnMgr->>Transport: resend pending MESSAGEs preserving futures/msgSerial
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (1)
test/ably/realtime/realtimechannel_publish_test.py (1)
381-434: Consider simplifying test setup.The test manually manipulates internal state (
on_ack,pending_message_queue.push,queued_messages.get) which makes it fragile to implementation changes. However, it does effectively validate the requeue mechanics.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
ably/realtime/connectionmanager.py(7 hunks)ably/realtime/realtime_channel.py(2 hunks)ably/transport/websockettransport.py(3 hunks)ably/util/helper.py(2 hunks)test/ably/realtime/realtimechannel_publish_test.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
AblyException(9-84)ably/types/options.py (2)
use_binary_protocol(161-162)use_binary_protocol(165-166)
test/ably/realtime/realtimechannel_publish_test.py (5)
ably/realtime/connectionmanager.py (10)
ably(741-742)count(54-56)notify_state(562-608)state(745-746)on_connected(322-353)connection_details(749-750)on_ack(431-439)send_protocol_message(171-236)requeue_pending_messages(244-271)push(50-52)ably/transport/websockettransport.py (6)
ProtocolMessageAction(34-56)close(228-229)send(231-236)on_protocol_message(124-185)dispose(214-226)connect(76-82)ably/util/exceptions.py (1)
AblyException(9-84)ably/util/helper.py (1)
cancel(75-76)ably/types/connectiondetails.py (1)
ConnectionDetails(5-20)
ably/realtime/connectionmanager.py (3)
ably/transport/websockettransport.py (1)
ProtocolMessageAction(34-56)ably/util/exceptions.py (1)
AblyException(9-84)ably/types/connectionstate.py (1)
ConnectionState(8-16)
ably/realtime/realtime_channel.py (6)
ably/realtime/connectionmanager.py (3)
ably(741-742)state(745-746)send_protocol_message(171-236)ably/util/exceptions.py (2)
AblyException(9-84)IncompatibleClientIdException(103-104)ably/util/helper.py (1)
validate_message_size(78-100)ably/types/message.py (6)
Message(24-231)name(65-66)data(69-70)client_id(73-74)encrypt(100-116)as_dict(132-178)ably/types/connectionstate.py (1)
ConnectionState(8-16)ably/types/channelstate.py (1)
ChannelState(8-15)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (3)
count(54-56)on_ack(431-439)on_nack(441-453)ably/util/exceptions.py (2)
AblyException(9-84)from_dict(83-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: check (3.9)
- GitHub Check: check (3.11)
- GitHub Check: check (3.12)
- GitHub Check: check (3.13)
- GitHub Check: check (3.8)
- GitHub Check: check (3.10)
- GitHub Check: check (3.7)
🔇 Additional comments (19)
ably/transport/websockettransport.py (2)
36-56: LGTM!The
ProtocolMessageActionenum expansion correctly adds the new action types with sequential integer values matching the Ably protocol specification.
168-179: LGTM!The ACK/NACK handling correctly extracts
msgSerialandcountwith sensible defaults, converts error dicts toAblyExceptionfor NACKs, and delegates to the appropriateConnectionManagermethods.ably/realtime/realtime_channel.py (5)
16-17: LGTM!New imports correctly added for client ID validation and message size enforcement.
387-440: LGTM!The argument parsing correctly handles all specified forms: single
Message, dict, list of messages/dicts, and the(name, data)pair form per RTL6i specification.
442-454: LGTM!The clientId validation correctly implements RTL6g:
- Rejects wildcard
'*'clientId (RTL6g3)- Validates message clientId against configured client using
can_assume_client_id
456-488: LGTM!The encoding flow correctly handles encryption, serialization, size validation, and sending via the connection manager with proper acknowledgment awaiting.
490-517: LGTM!The state validation correctly prevents publishing when connection or channel states would make it impossible, implementing RTL6c4 requirements.
test/ably/realtime/realtimechannel_publish_test.py (6)
1-11: LGTM!Imports and test class setup are correctly structured for async testing.
19-104: LGTM!Good coverage of basic ACK/NACK functionality and msgSerial sequencing per RTN7a/RTN7b. The transport interception pattern for simulating server responses is well-implemented.
106-190: LGTM!RTN7e tests correctly verify that pending messages fail when connection enters SUSPENDED or FAILED states.
192-281: LGTM!RTN7d tests correctly validate the
queueMessagesoption behavior on DISCONNECTED state - failing messages when false, preserving when true.
283-379: LGTM!RTN19a2 tests correctly verify msgSerial reset behavior based on connectionId changes, and concurrent publish handling.
587-676: LGTM!Comprehensive integration test for RTL3d + RTL6c2 verifying that queued messages are sent immediately on reconnection without waiting for channel reattachment.
ably/realtime/connectionmanager.py (6)
27-42: LGTM!
PendingMessagecorrectly identifies which protocol actions require acknowledgment and maintains the future for async/await support.
44-104: LGTM!
PendingMessageQueuecorrectly implements serial-based message completion with proper handling of ACK count ranges and error propagation for NACKs.
244-271: LGTM!
requeue_pending_messagescorrectly implements RTN19a by preserving thePendingMessageobjects (including their futures and msgSerials) when requeueing for resend after reconnection.
326-334: LGTM!The msgSerial reset logic correctly implements RTN19a2 - only resetting when the connectionId changes (indicating a failed resume/new connection).
431-453: LGTM!
on_ackandon_nackcorrectly delegate toPendingMessageQueue.complete_messages, withon_nackproviding a default error when none is supplied by the server.
599-608: LGTM!State transition handling correctly implements RTN7e (fail on SUSPENDED/CLOSED/FAILED) and RTN7d (fail on DISCONNECTED when
queueMessagesis false).
8f52b9b to
c34b665
Compare
c34b665 to
df15d8a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (2)
ably/util/helper.py (1)
78-100: AblyException arguments are in wrong order.The
AblyExceptionconstructor signature is(message, status_code, code, cause=None). Here40009is passed asstatus_codeand400ascode, but 40009 is the Ably error code and 400 is the HTTP status code - these should be swapped.if size > max_message_size: raise AblyException( f"Maximum size of messages that can be published at once exceeded " f"(was {size} bytes; limit is {max_message_size} bytes)", - 40009, - 400 + 400, + 40009 )ably/realtime/connectionmanager.py (1)
190-201: Duplicate push to pending_message_queue when queueing messages.When a message is queued (DISCONNECTED/CONNECTING state), it's added to both
queued_messages(line 194) andpending_message_queue(lines 196-197). Later, whensend_queued_messagesretrieves thePendingMessageand callssend_protocol_message_on_connected_state, lines 214-215 push it topending_message_queueagain, causing duplicates.if self.state in ( ConnectionState.DISCONNECTED, ConnectionState.CONNECTING, ): self.queued_messages.put(pending_message) - # For queued messages requiring ack, add to pending queue - if pending_message.ack_required: - self.pending_message_queue.push(pending_message) if pending_message.ack_required: await pending_message.future return NoneThe message should only be added to
pending_message_queuewhen it's actually sent (insend_protocol_message_on_connected_state), not when it's queued.
🧹 Nitpick comments (1)
ably/realtime/connectionmanager.py (1)
75-77: Redundant conditional check.The check
if first:on line 76 is redundant since line 71 already returns early ifself.messagesis empty, guaranteeing thatfirstis truthy at this point.first = self.messages[0] - if first: - start_serial = first.message.get('msgSerial') + start_serial = first.message.get('msgSerial')
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
ably/realtime/connectionmanager.py(7 hunks)ably/realtime/realtime_channel.py(2 hunks)ably/transport/websockettransport.py(3 hunks)ably/util/helper.py(2 hunks)test/ably/realtime/realtimechannel_publish_test.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
ably/realtime/realtime_channel.py (3)
ably/util/exceptions.py (2)
AblyException(9-84)IncompatibleClientIdException(103-104)ably/util/helper.py (1)
validate_message_size(78-100)ably/types/message.py (6)
Message(24-231)name(65-66)data(69-70)client_id(73-74)encrypt(100-116)as_dict(132-178)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
AblyException(9-84)ably/types/options.py (2)
use_binary_protocol(161-162)use_binary_protocol(165-166)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (3)
count(57-59)on_ack(423-431)on_nack(433-445)ably/util/exceptions.py (2)
AblyException(9-84)from_dict(83-84)
ably/realtime/connectionmanager.py (2)
ably/transport/websockettransport.py (1)
ProtocolMessageAction(34-56)ably/util/exceptions.py (1)
AblyException(9-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: check (3.13)
- GitHub Check: check (3.11)
- GitHub Check: check (3.10)
- GitHub Check: check (3.7)
- GitHub Check: check (3.12)
- GitHub Check: check (3.9)
- GitHub Check: check (3.8)
🔇 Additional comments (7)
ably/transport/websockettransport.py (1)
168-179: LGTM!The ACK and NACK protocol message handling is correctly implemented. The code properly extracts
msgSerialandcountwith sensible defaults, converts error dicts toAblyExceptionusing the existing factory method, and routes to the appropriate connection manager callbacks.ably/realtime/realtime_channel.py (1)
387-488: Solid publish implementation with proper validation.The publish method correctly implements RTL6 spec requirements:
- Flexible argument parsing supporting Message objects, dicts, lists, and (name, data) pairs
- Proper clientId validation for identified clients
- Message encryption support when cipher is configured
- Size validation before sending
- Awaits server acknowledgment per RTL6b
The implementation is well-structured and handles edge cases appropriately.
test/ably/realtime/realtimechannel_publish_test.py (1)
13-677: Comprehensive test coverage for RTN7 spec.The test suite provides excellent coverage of the realtime publish behavior including:
- Basic ACK/NACK handling
- Sequential msgSerial incrementing
- Pending message failure on state transitions (SUSPENDED, FAILED, DISCONNECTED)
- queueMessages option behavior
- msgSerial reset/preservation on connection changes
- Concurrent publish handling
- Message requeueing and resend on reconnect
- Multi-count ACK handling
- Channel reattachment edge cases
The test patterns using transport interception and state manipulation are appropriate for testing these protocol-level behaviors.
ably/realtime/connectionmanager.py (4)
27-106: Well-designed pending message tracking primitives.The
PendingMessageandPendingMessageQueueclasses provide clean abstractions for tracking messages awaiting acknowledgment. The design correctly:
- Identifies ack-required actions (MESSAGE, PRESENCE, ANNOTATION, OBJECT)
- Associates a Future with each pending message for async waiting
- Handles batch ACK/NACK with serial and count parameters
- Provides methods to complete or fail messages
314-345: Correct RTN19a2 implementation for msgSerial reset.The
on_connectedmethod correctly resetsmsg_serialto 0 when theconnectionIdchanges (indicating a new connection rather than a resume). This aligns with the RTN19a2 spec requirement.
236-263: Solid requeue implementation for RTN19a.The
requeue_pending_messagesmethod correctly preservesPendingMessageobjects (with their Futures and msgSerials) when requeueing for resend after transport disconnection. Usingreversed()maintains proper message ordering.
591-600: Correct RTN7d/RTN7e state transition handling.The
notify_statemethod properly fails queued and pending messages on:
- SUSPENDED, CLOSED, FAILED states (RTN7e)
- DISCONNECTED when
queueMessages=false(RTN7d)This ensures messages don't hang indefinitely when the connection cannot recover.
df15d8a to
a22b7d5
Compare
owenpearson
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! My initial thoughts:
- I think the main thing I found that's broken is that websockettransport is hardcoded to use JSON serialisation regardless of the
use_binary_protocolsetting. I think this was fine when it was just sending control protocol mesasges, but now that it's sending real data it's quite important to fix this. One particular problem with this is that encryption won't work over json transport because encrypted messages can't always be json serialised. It also would be nice to have all tests running on JSON and binary transports so we can make sure there aren't any other subtle bugs here. - We're not implementing bundling messages here, but could be nice to implement that while we're working on this SDK. I think that could be in a separate PR anyway.
- Test coverage is much lower than ably-js. I tried prompting claude to try and copy all the tests from ably-js and it found quite a few tests that we don't have yet on this branch:
✅ Message Size Validation Tests (2 tests)
- test_publish_message_exceeding_size_limit - Verifies 70KB message is rejected (RSL1i)
- test_publish_message_within_size_limit - Verifies 10KB message succeeds
✅ Client ID Validation Tests (4 tests)
- test_publish_with_matching_client_id - Explicit matching clientId succeeds (RTL6g2)
- test_publish_with_null_client_id_when_identified - Null clientId gets populated by server (RTL6g1)
- test_publish_with_mismatched_client_id_fails - Mismatched clientId is rejected (RTL6g3)
- test_publish_with_wildcard_client_id_fails - Wildcard clientId is rejected (RTL6g3)
✅ Data Type Variation Tests (6 tests)
- test_publish_with_string_data - String data
- test_publish_with_json_object_data - JSON object data
- test_publish_with_json_array_data - JSON array data
- test_publish_with_null_data - Null data (RTL6i3)
- test_publish_with_null_name - Null name (RTL6i3)
- test_publish_message_array - Array of messages (RTL6i2)
✅ Channel State Validation Tests (2 tests)
- test_publish_fails_on_suspended_channel - Publishing on SUSPENDED channel fails (RTL6c4)
- test_publish_fails_on_failed_channel - Publishing on FAILED channel fails (RTL6c4)
✅ Idempotent Publishing Test (1 test)
- test_idempotent_realtime_publishing - Messages with same ID (RSL1k2, RSL1k5)
a22b7d5 to
4e5f0d8
Compare
4e5f0d8 to
a818d7f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
test/ably/realtime/realtimechannel_publish_test.py (1)
181-185: AblyException arguments are in wrong order.The
AblyExceptionconstructor signature is(message, status_code, code). Here80000is passed asstatus_codeand500ascode, but80000is an Ably error code and500is the HTTP status code.# Force FAILED state connection_manager.notify_state( ConnectionState.FAILED, - AblyException('Test failure', 80000, 500) + AblyException('Test failure', 500, 80000) )
🧹 Nitpick comments (2)
ably/realtime/realtime_channel.py (1)
457-470: In-place message encryption may cause unexpected side effects.The
m.encrypt(self.cipher)call modifies the Message object in place. If the caller reuses the same Message object after publishing, it will be in an encrypted state, which could lead to double-encryption on retry or unexpected behavior.Consider working on a copy of the message to avoid mutating the caller's object:
for m in messages: # Encode the message with encryption if needed + msg_to_encode = m if self.cipher: - m.encrypt(self.cipher) + # Work on a copy to avoid mutating caller's message + msg_to_encode = Message(name=m.name, data=m.data, client_id=m.client_id, id=m.id, + connection_id=m.connection_id, extras=m.extras) + msg_to_encode.encrypt(self.cipher) # Convert to dict representation - msg_dict = m.as_dict(binary=self.ably.options.use_binary_protocol) + msg_dict = msg_to_encode.as_dict(binary=self.ably.options.use_binary_protocol) encoded_messages.append(msg_dict)ably/realtime/connectionmanager.py (1)
61-95: Redundant truthiness check on line 76.The check
if first:on line 76 is alwaysTruebecause the function returns early on line 73 ifself.messagesis empty. This is harmless but slightly redundant.first = self.messages[0] - if first: - start_serial = first.message.get('msgSerial') - if start_serial is None: - log.warning('MessageQueue.complete_messages(): first message has no msgSerial') - return + start_serial = first.message.get('msgSerial') + if start_serial is None: + log.warning('MessageQueue.complete_messages(): first message has no msgSerial') + return
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
ably/realtime/connectionmanager.py(7 hunks)ably/realtime/realtime_channel.py(2 hunks)ably/transport/websockettransport.py(3 hunks)ably/util/helper.py(2 hunks)test/ably/realtime/realtimechannel_publish_test.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
AblyException(9-84)ably/types/options.py (2)
use_binary_protocol(161-162)use_binary_protocol(165-166)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (2)
count(57-59)on_ack(413-421)ably/util/exceptions.py (2)
AblyException(9-84)from_dict(83-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: check (3.12)
- GitHub Check: check (3.11)
- GitHub Check: check (3.8)
- GitHub Check: check (3.9)
- GitHub Check: check (3.13)
- GitHub Check: check (3.10)
- GitHub Check: check (3.7)
🔇 Additional comments (13)
ably/util/helper.py (1)
78-100: LGTM! Size validation implementation is correct.The function properly handles both binary (msgpack) and JSON encoding paths, uses compact JSON separators for accurate size calculation, and the
AblyExceptionarguments are now in the correct order (status_code=400,code=40009).ably/transport/websockettransport.py (2)
34-56: Protocol message action enum expansion looks correct.The new enum values (ACK, NACK, CONNECT, DISCONNECT, PRESENCE, SYNC, ACTIVATE, OBJECT, OBJECT_SYNC, ANNOTATION) align with the Ably protocol message action definitions.
168-179: ACK/NACK handling implementation is correct.The handlers properly extract
msgSerialandcountwith appropriate defaults:msgSerial=0follows Ably protocol initialization (used by SDKs), andcount=1aligns with the protocol specification which treats missing or zero counts as 1. Error dictionaries are correctly converted toAblyExceptionwhen present, and messages are routed to the appropriateConnectionManagermethods.test/ably/realtime/realtimechannel_publish_test.py (2)
14-32: Good test structure with proper setup and cleanup.The test class properly sets up test variables and each test ensures cleanup with
await ably.close(). The test coverage for RTN7 (ACK/NACK), RTN19 (resend/msgSerial reset), RTL6 (publish behavior), and RSL1i (size limits) is comprehensive.
927-975: Idempotent publishing test validates deduplication correctly.The test verifies that messages with the same explicit ID are deduplicated by the server (RSL1k5), confirming only 2 unique messages are received when 3 are published (2 with same ID, 1 different).
ably/realtime/realtime_channel.py (3)
387-440: Publish method argument parsing is well-structured.The implementation correctly handles all RTL6i input forms: single Message, dict, list of messages, and (name, data) pairs. The RTL6i3 compliance for allowing null name/data is also properly handled.
442-455: Client ID validation correctly implements RTL6g.The validation properly rejects wildcard (
*) client IDs (RTL6g3) and validates client ID compatibility usingcan_assume_client_id. This aligns with the spec requirements.
490-517: State validation helper correctly implements RTL6c4.The
_throw_if_unpublishable_statemethod properly checks both connection state (allowing CONNECTED, CONNECTING, DISCONNECTED) and channel state (rejecting SUSPENDED, FAILED). TheAblyExceptionarguments are now in the correct order.ably/realtime/connectionmanager.py (5)
27-45: PendingMessage class correctly models message acknowledgment tracking.The class properly determines which actions require acknowledgment (MESSAGE, PRESENCE, ANNOTATION, OBJECT) and creates a Future only when needed.
202-218: Duplicate push prevention correctly addresses the past review concern.Line 205 properly checks
if pending_message not in self.pending_message_queue.messagesbefore pushing, preventing the duplicate push issue noted in previous reviews.
304-316: msgSerial reset on new connectionId correctly implements RTN19a2.The logic properly detects when
connectionIdchanges (indicating a new connection rather than resume) and resetsmsg_serialto 0.
413-435: ACK/NACK handlers correctly route to queue completion.The
on_ackandon_nackmethods properly delegate topending_message_queue.complete_messageswith appropriate error handling for NACK cases.
581-590: RTN7d and RTN7e state-based message failure is correctly implemented.The logic properly fails pending messages on SUSPENDED/CLOSED/FAILED (RTN7e) and on DISCONNECTED when
queue_messages=False(RTN7d).
a818d7f to
8f1e406
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (3)
ably/realtime/connectionmanager.py (2)
245-249: Comment is misleading and requeued messages lack priority.The comment says "Add back to front of queue" but
append()adds to the end. Per RTN19a, messages awaiting ACK should be resent on reconnect, implying they should have priority over messages queued during disconnect.To give requeued messages priority over new messages:
# Add back to front of queue (FIFO but priority over new messages) # Store the entire PendingMessage object to preserve Future - for pending_msg in reversed(pending_messages): + for pending_msg in pending_messages: # PendingMessage object retains its Future, msgSerial - self.queued_messages.append(pending_msg) + self.queued_messages.appendleft(pending_msg)This adds messages to the front in original order, ensuring they're sent before any newly queued messages.
220-224: Usepopleft()for FIFO order consistency.If the intent is FIFO (first-in-first-out), use
popleft()to match the comment inrequeue_pending_messages()about maintaining order.def send_queued_messages(self) -> None: log.info(f'ConnectionManager.send_queued_messages(): sending {len(self.queued_messages)} message(s)') while len(self.queued_messages) > 0: - pending_message = self.queued_messages.pop() + pending_message = self.queued_messages.popleft() asyncio.create_task(self._send_protocol_message_on_connected_state(pending_message))test/ably/realtime/realtimechannel_publish_test.py (1)
182-185: AblyException arguments are in wrong order.The
AblyExceptionconstructor signature is(message, status_code, code). Here80000(Ably error code) is passed asstatus_codeand500(HTTP status) ascode. These should be swapped.# Force FAILED state connection_manager.notify_state( ConnectionState.FAILED, - AblyException('Test failure', 80000, 500) + AblyException('Test failure', 500, 80000) )
🧹 Nitpick comments (2)
ably/realtime/connectionmanager.py (2)
75-76: Redundant truthiness check.After confirming
self.messagesis non-empty at line 71,first = self.messages[0]will always be aPendingMessageobject (truthy). Theif first:check is redundant.first = self.messages[0] - if first: - start_serial = first.message.get('msgSerial') + start_serial = first.message.get('msgSerial')Then dedent the remaining logic accordingly.
263-266: Uselog.errorinstead oflog.exceptionwhen not handling an exception.
log.exception()is intended for use within exception handlers as it includes stack trace information. Here you're just logging a failure message, not handling an exception.- log.exception( + log.error( f"ConnectionManager.fail_queued_messages(): Failed to send protocol message: " f"{pending_msg.message}" )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
ably/realtime/connectionmanager.py(8 hunks)ably/realtime/realtime_channel.py(2 hunks)ably/transport/websockettransport.py(3 hunks)ably/util/helper.py(2 hunks)test/ably/realtime/realtimechannel_publish_test.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
test/ably/realtime/realtimechannel_publish_test.py (8)
ably/realtime/connectionmanager.py (10)
ably(723-724)count(57-59)notify_state(544-590)state(727-728)on_connected(304-335)connection_details(731-732)on_ack(413-421)send_protocol_message(174-200)requeue_pending_messages(226-253)push(53-55)ably/types/connectionstate.py (1)
ConnectionState(8-16)ably/types/channelstate.py (1)
ChannelState(8-15)ably/transport/websockettransport.py (3)
ProtocolMessageAction(34-56)send(231-236)connect(76-82)ably/types/message.py (2)
Message(24-231)connection_key(89-90)ably/util/exceptions.py (2)
AblyException(9-84)IncompatibleClientIdException(103-104)test/ably/utils.py (4)
BaseAsyncTestCase(44-59)WaitableEvent(239-253)assert_waiter(187-199)wait(249-250)ably/realtime/realtime_channel.py (5)
attach(160-198)publish(388-488)state(690-692)state(695-696)name(684-686)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
AblyException(9-84)ably/types/options.py (2)
use_binary_protocol(161-162)use_binary_protocol(165-166)
ably/transport/websockettransport.py (1)
ably/util/exceptions.py (2)
AblyException(9-84)from_dict(83-84)
ably/realtime/connectionmanager.py (2)
ably/util/exceptions.py (1)
AblyException(9-84)ably/types/connectionstate.py (1)
ConnectionState(8-16)
🪛 GitHub Actions: Linting check
ably/realtime/connectionmanager.py
[error] 126-126: UP006 Use collections.deque instead of Deque for type annotation.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: check (3.11)
- GitHub Check: check (3.7)
- GitHub Check: check (3.13)
- GitHub Check: check (3.12)
- GitHub Check: check (3.9)
- GitHub Check: check (3.10)
- GitHub Check: check (3.8)
🔇 Additional comments (14)
ably/realtime/connectionmanager.py (5)
27-45: LGTM!The
PendingMessageclass correctly identifies actions requiring acknowledgment (MESSAGE, PRESENCE, ANNOTATION, OBJECT) per RTN7a and creates a Future for awaiting the ACK/NACK.
308-316: LGTM!The
msgSerialreset logic correctly implements RTN19a2: reset to 0 only whenconnectionIdchanges (failed resume/new connection), preserving serial on successful resume.
413-435: LGTM!The ACK/NACK handlers correctly delegate to
PendingMessageQueue.complete_messages()with appropriate error handling for NACK.
575-590: LGTM!State transition handling correctly implements RTN7e (fail on SUSPENDED/CLOSED/FAILED) and RTN7d (fail on DISCONNECTED when
queueMessages=false).
437-443: LGTM!Correctly implements RTN19a by requeuing pending messages before transport disposal.
ably/util/helper.py (1)
78-100: LGTM!The
validate_message_sizefunction correctly computes encoded payload size based on the protocol (msgpack vs JSON) and raises an appropriately structuredAblyExceptionwhen the limit is exceeded.test/ably/realtime/realtimechannel_publish_test.py (2)
14-16: Well-structured test suite with comprehensive spec coverage.The test class covers key spec points (RTN7a/b/d/e, RTN19a/a2, RSL1i/k, RTL6c4/g/i) with clear docstrings mapping to requirements.
928-975: LGTM!The idempotent publishing test correctly verifies that messages with explicit IDs are sent and server-side deduplication occurs for duplicate IDs (RSL1k2, RSL1k5).
ably/realtime/realtime_channel.py (4)
387-441: LGTM!The
publishmethod correctly handles multiple argument forms (RTL6i), with clear validation and parsing logic.
442-455: LGTM!Client ID validation correctly implements RTL6g: rejecting wildcard
*and mismatched client IDs with appropriate error codes.
468-470: LGTM!Message size validation correctly uses the helper function with appropriate default (64KB per RSL1i).
490-517: LGTM!State validation correctly implements RTL6c4, preventing publish on invalid connection/channel states with properly structured exceptions.
ably/transport/websockettransport.py (2)
34-56: LGTM!The
ProtocolMessageActionenum is correctly extended with the necessary protocol actions for message acknowledgment (ACK, NACK) and other protocol messages.
168-179: LGTM!ACK/NACK handling correctly extracts protocol fields with sensible defaults and delegates to
ConnectionManagerfor processing. Error conversion properly usesAblyException.from_dict().
Implemented Spec points: ## Message Publishing Specifications (RTL6) ### RTL6c - Messages published on channels in specific states - Messages published when channel is not **ATTACHED** should be published immediately ### RTL6c2 - Message queuing behavior - Messages can be queued when connection/channel is not ready - Relates to processing queued messages when connection becomes ready ### RTL6c3 - Publishing without implicit attach ### RTL6c4 - Behavior when queueMessages client option is false ### RTL6d - Message bundling restrictions #### RTL6d1: Maximum message size limits for bundling - **RTL6d2**: All messages in bundle must have same clientId #### RTL6d3: Can only bundle messages for same channel - **RTL6d4**: Can only bundle messages with same action (MESSAGE or PRESENCE) #### RTL6d7: Cannot bundle idempotent messages with non-idempotent messages --- ## Message Acknowledgment (RTN7) ### RTN7a All **PRESENCE**, **MESSAGE**, **ANNOTATION**, and **OBJECT** ProtocolMessages sent to Ably expect either an **ACK** or **NACK** to confirm successful receipt or failure ### RTN7b Every ProtocolMessage requiring acknowledgment must contain a unique serially incrementing `msgSerial` integer starting at zero ### RTN7c If connection enters **SUSPENDED**, **CLOSED**, or **FAILED** state and ACK/NACK has not been received, client should fail those messages and remove them from retry queues ### RTN7d If `queueMessages` is false, messages entering **DISCONNECTED** state without acknowledgment should be treated as failed immediately ### RTN7e When connection state changes to **SUSPENDED**/**CLOSED**/**FAILED**, pending messages (submitted via RTL6c1 or RTL6c2) awaiting ACK/NACK should be considered failed --- ## Message Resending and Serial Handling (RTN19) ### RTN19a Upon reconnection after disconnection, client library must resend all pending messages awaiting acknowledgment, allowing the realtime system to respond with ACK/NACK ### RTN19a2 In the event of a new `connectionId` (connection not resumed), previous `msgSerials` are meaningless and must be reset. The `msgSerial` counter resets to 0 for the new connection --- ## Channel State and Reattachment (RTL3, RTL4, RTL5) ### RTL3c Channel state implications when connection goes into **SUSPENDED** ### RTL3d When connection enters **CONNECTED** state, channels in **ATTACHING**, **ATTACHED**, or **SUSPENDED** states should transition to **ATTACHING** and initiate attach sequence. Connection should process queued messages immediately without waiting for attach operations to finish ### RTL4c - Attach sequence - **RTL4c1**: ATTACH message includes channel serial to resume from previous message or attachment ### RTL5i If channel is **DETACHING**, re-send **DETACH** and remain in 'detaching' state
8f1e406 to
b9ab475
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (4)
test/ably/realtime/realtimechannel_publish_test.py (1)
182-185: AblyException arguments are in wrong order.The
AblyExceptionconstructor signature is(message, status_code, code). Here80000is passed asstatus_codeand500ascode, but these should be swapped—500is the HTTP status and80000is the Ably error code.# Force FAILED state connection_manager.notify_state( ConnectionState.FAILED, - AblyException('Test failure', 80000, 500) + AblyException('Test failure', 500, 80000) )ably/realtime/connectionmanager.py (3)
220-224: Usepopleft()for proper FIFO ordering.Combined with changing
appendleft()toappend()above, usepopleft()here for consistent FIFO semantics (first-in, first-out):def send_queued_messages(self) -> None: log.info(f'ConnectionManager.send_queued_messages(): sending {len(self.queued_messages)} message(s)') while len(self.queued_messages) > 0: - pending_message = self.queued_messages.pop() + pending_message = self.queued_messages.popleft() asyncio.create_task(self._send_protocol_message_on_connected_state(pending_message))This ensures messages are sent in the order they were queued.
260-262: Usepopleft()infail_queued_messagesfor consistency.For consistent FIFO semantics across the codebase:
error = err or AblyException("Connection failed", 80000, 500) while len(self.queued_messages) > 0: - pending_msg = self.queued_messages.pop() + pending_msg = self.queued_messages.popleft() log.exception(
194-198: LIFO ordering issue: new messages queued withappendleft()but sent withpop()creates reverse order.New messages are added to the front with
appendleft()(line 195), butsend_queued_messages()usespop()(line 223) which removes from the end. This means newly queued messages will be sent after older messages in the queue—effectively LIFO order for new messages.For FIFO ordering, use
append()here instead ofappendleft():if self.state in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING): - self.queued_messages.appendleft(pending_message) + self.queued_messages.append(pending_message) if pending_message.ack_required: await pending_message.future return NoneThis also aligns with
requeue_pending_messages()at line 249 which correctly usesappend().
🧹 Nitpick comments (2)
ably/realtime/connectionmanager.py (2)
245-249: Update misleading comment:append()adds to end, not front.The comment says "Add back to front of queue" but
append()adds to the end. Update the comment to match the actual behavior:- # Add back to front of queue (FIFO but priority over new messages) + # Add to end of queue (these will be sent first via popleft in send_queued_messages) # Store the entire PendingMessage object to preserve Future for pending_msg in reversed(pending_messages): # PendingMessage object retains its Future, msgSerial self.queued_messages.append(pending_msg)Note: With
popleft()semantics, usingreversed()here ensures requeued messages maintain their original order and are sent before any new messages added viaappend().
75-77: Redundant truthiness check.After the empty check at lines 71-73,
firstat line 75 will always be truthy (sinceself.messages[0]returns the first element of a non-empty list). Theif first:check is redundant.first = self.messages[0] - if first: - start_serial = first.message.get('msgSerial') + start_serial = first.message.get('msgSerial')
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
ably/realtime/connectionmanager.py(8 hunks)ably/realtime/realtime_channel.py(2 hunks)ably/transport/websockettransport.py(3 hunks)ably/util/helper.py(2 hunks)test/ably/realtime/realtimechannel_publish_test.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
ably/realtime/connectionmanager.py (3)
ably/transport/websockettransport.py (1)
ProtocolMessageAction(34-56)ably/util/exceptions.py (1)
AblyException(9-84)ably/types/connectionstate.py (1)
ConnectionState(8-16)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
AblyException(9-84)ably/types/options.py (2)
use_binary_protocol(161-162)use_binary_protocol(165-166)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (3)
count(57-59)on_ack(413-421)on_nack(423-435)ably/util/exceptions.py (2)
AblyException(9-84)from_dict(83-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: check (3.7)
- GitHub Check: check (3.12)
- GitHub Check: check (3.11)
- GitHub Check: check (3.10)
- GitHub Check: check (3.13)
- GitHub Check: check (3.8)
- GitHub Check: check (3.9)
🔇 Additional comments (11)
ably/util/helper.py (1)
78-100: LGTM!The
validate_message_sizefunction correctly validates message size limits using the appropriate encoding (msgpack for binary protocol, compact JSON otherwise). TheAblyExceptionarguments are in the correct order (status_code=400, code=40009).ably/realtime/connectionmanager.py (4)
27-45: LGTM!The
PendingMessageclass correctly encapsulates message tracking with appropriate ACK requirement detection for MESSAGE, PRESENCE, ANNOTATION, and OBJECT actions.
47-106: LGTM!The
PendingMessageQueueclass provides clean abstractions for managing pending messages with proper serial-based completion logic.
304-316: LGTM!The RTN19a2 implementation correctly resets
msgSerialto 0 when theconnectionIdchanges, indicating a new connection rather than a resume.
413-435: LGTM!The
on_ackandon_nackhandlers correctly delegate toPendingMessageQueue.complete_messages, with appropriate error creation for NACK without explicit error.ably/transport/websockettransport.py (2)
34-56: LGTM!The
ProtocolMessageActionenum expansion correctly adds all required protocol actions per the Ably specification.
168-179: LGTM!The ACK/NACK handling correctly extracts
msgSerialandcountwith sensible defaults, and properly routes to the connection manager's handlers.ably/realtime/realtime_channel.py (2)
387-488: LGTM!The
publishmethod implementation correctly handles:
- Multiple input forms (Message, dict, list, name+data)
- RTL6g client ID validation for identified clients
- Message encryption when cipher is configured
- Size validation via
validate_message_size- State validation before sending
- Protocol message construction and awaiting acknowledgment
490-517: LGTM!The
_throw_if_unpublishable_statemethod correctly validates connection and channel states per RTL6c4, with properly orderedAblyExceptionarguments.test/ably/realtime/realtimechannel_publish_test.py (2)
14-16: LGTM!The test class provides comprehensive coverage for RTN7 message acknowledgment spec points including ACK/NACK handling, msgSerial sequencing, pending message failures, and queueing behavior.
927-975: LGTM!The idempotent publishing test correctly verifies RSL1k2/RSL1k5 behavior by checking that the server deduplicates messages with the same ID.
owenpearson
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm! ![]()
Implemented Spec points:
Message Publishing Specifications (RTL6)
RTL6c - Messages published on channels in specific states
RTL6c2 - Message queuing behavior
RTL6c3 - Publishing without implicit attach
RTL6c4 - Behavior when queueMessages client option is false
RTL6d - Message bundling restrictions
RTL6d1: Maximum message size limits for bundling
RTL6d3: Can only bundle messages for same channel
RTL6d7: Cannot bundle idempotent messages with non-idempotent messages
Message Acknowledgment (RTN7)
RTN7a
All PRESENCE, MESSAGE, ANNOTATION, and OBJECT ProtocolMessages sent to Ably expect either an ACK or NACK to confirm successful receipt or failure
RTN7b
Every ProtocolMessage requiring acknowledgment must contain a unique serially incrementing
msgSerialinteger starting at zeroRTN7c
If connection enters SUSPENDED, CLOSED, or FAILED state and ACK/NACK has not been received, client should fail those messages and remove them from retry queues
RTN7d
If
queueMessagesis false, messages entering DISCONNECTED state without acknowledgment should be treated as failed immediatelyRTN7e
When connection state changes to SUSPENDED/CLOSED/FAILED, pending messages (submitted via RTL6c1 or RTL6c2) awaiting ACK/NACK should be considered failed
Message Resending and Serial Handling (RTN19)
RTN19a
Upon reconnection after disconnection, client library must resend all pending messages awaiting acknowledgment, allowing the realtime system to respond with ACK/NACK
RTN19a2
In the event of a new
connectionId(connection not resumed), previousmsgSerialsare meaningless and must be reset. ThemsgSerialcounter resets to 0 for the new connectionChannel State and Reattachment (RTL3, RTL4, RTL5)
RTL3c
Channel state implications when connection goes into SUSPENDED
RTL3d
When connection enters CONNECTED state, channels in ATTACHING, ATTACHED, or SUSPENDED states should transition to ATTACHING and initiate attach sequence. Connection should process queued messages immediately without waiting for attach operations to finish
RTL4c - Attach sequence
RTL5i
If channel is DETACHING, re-send DETACH and remain in 'detaching' state
Summary by CodeRabbit
New Features
Bug Fixes / Reliability
Tests
✏️ Tip: You can customize this high-level summary in your review settings.