@neekolas neekolas commented Oct 31, 2025

TL;DR

  • Hacky fix for updating cursors in group subscriptions.
  • Not a great implementation and I think kinda ugly.

What changed?

  • Implemented subscribe_group_messages_with_cursors method in the XmtpMlsStreams trait and its implementations
  • Added support for this method across various client wrappers (debug, d14n, boxed streams)
  • Enhanced the stream subscription mechanism to use cursor information when resubscribing
  • Improved test coverage for concurrent message streaming with better tracking of sent vs received messages

How to test?

  • Run the existing test suite, particularly test_stream_all_concurrent_writes which has been updated to better validate message delivery
  • Create a client that subscribes to multiple groups with different cursor positions and verify messages are received correctly from those positions

Why make this change?

This change improves message streaming reliability by allowing clients to specify exact cursor positions when subscribing to group messages. This is particularly important for resuming streams after disconnections or when adding new groups to an existing subscription. The implementation helps prevent message loss by ensuring streams can be properly resumed from the last known position.

@neekolas neekolas changed the title from "Add subscribe_group_messages_with_cursors" to "[WIP]" on Oct 31, 2025
@claude

claude bot commented Oct 31, 2025

Claude finished @neekolas's task


PR Review: Fix cursors for resubscription

Review Date: November 3, 2025

Summary

This PR adds cursor-based group message subscription to improve message streaming reliability. While functional, there are several important issues to address, particularly around the D14n cursor calculation logic and error handling.


Critical Issues

1. Potential cursor calculation bug in D14n implementation (xmtp_api_d14n/src/queries/d14n/streams.rs:66-76)

The D14n client computes a "lowest common cursor" (LCC) by taking the minimum sequence ID across all groups. This could cause message loss:

for (_, cursor) in groups_with_cursors {
    for (&node_id, &seq_id) in cursor.iter() {
        min_clock
            .entry(node_id)
            .and_modify(|existing| *existing = (*existing).min(seq_id))
            .or_insert(seq_id);
    }
}

Problem: If Group A has cursor {node1: 100} and Group B has cursor {node1: 50, node2: 30}, the resulting LCC is {node1: 50, node2: 30}. Subscribing Group A from that cursor replays node1 messages 51-100, which Group A has already seen. More critically, Group A's cursor has no entry for node2, so Group A should start from 0 on that node. Under the LCC it starts after 30 instead and can miss any node2 messages with sequence IDs up to 30.

Suggestion: Consider per-group cursors instead of a global LCC, or document why this approach is safe for your architecture.
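
The scenario above can be reproduced with a minimal sketch. It assumes a cursor is a plain map from node ID to sequence ID; the `Clock` alias and both function names are hypothetical stand-ins, not the real `GlobalCursor` API. The second function shows one possible mitigation (treating a node missing from any group's cursor as position 0), offered as an illustration rather than a recommended fix.

```rust
use std::collections::HashMap;

/// Stand-in for a per-group cursor: node (originator) ID -> last seen sequence ID.
/// Assumption for illustration only; this is not the real `GlobalCursor` type.
type Clock = HashMap<u32, u64>;

/// Merge per-group cursors by taking the minimum sequence ID per node,
/// mirroring the loop quoted above.
fn lowest_common_cursor(cursors: &[Clock]) -> Clock {
    let mut min_clock = Clock::new();
    for cursor in cursors {
        for (&node_id, &seq_id) in cursor.iter() {
            min_clock
                .entry(node_id)
                .and_modify(|existing| *existing = (*existing).min(seq_id))
                .or_insert(seq_id);
        }
    }
    min_clock
}

/// Hypothetical variant: if any group has no entry for a node, start that
/// node at 0 so the group that never saw the node does not skip messages.
fn lowest_common_cursor_missing_as_zero(cursors: &[Clock]) -> Clock {
    let mut min_clock = lowest_common_cursor(cursors);
    for (node_id, seq_id) in min_clock.iter_mut() {
        if cursors.iter().any(|c| !c.contains_key(node_id)) {
            *seq_id = 0;
        }
    }
    min_clock
}
```

With Group A at {1: 100} and Group B at {1: 50, 2: 30}, the first function yields {1: 50, 2: 30}, and the variant yields {1: 50, 2: 0}. The variant trades extra replays for Group B against skipped messages for Group A; per-group cursors avoid both.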


2. Missing error handling for cursor extraction (xmtp_proto/src/api_client/impls.rs:269-276)

async fn subscribe_group_messages_with_cursors(
    &self,
    groups_with_cursors: &[(&GroupId, crate::types::GlobalCursor)],
) -> Result<Self::GroupMessageStream, Self::Error> {
    (**self)
        .subscribe_group_messages_with_cursors(groups_with_cursors)
        .await
}

The delegation pattern is correct, but there's no validation that cursors are valid or that group IDs exist. Consider adding validation at the API boundary.
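
As a rough sketch of what boundary validation could look like, the function below rejects empty group IDs and duplicate group entries before a subscription request is built. The error enum, the function name, and the use of `&[u8]` and `HashMap<u32, u64>` in place of `GroupId` and `GlobalCursor` are all assumptions for illustration. Checking that a group actually exists would need access to a store and is not shown.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical error type for rejected subscription input.
#[derive(Debug, PartialEq)]
enum CursorValidationError {
    EmptyGroupId,
    DuplicateGroup(Vec<u8>),
}

/// Validate the (group ID, cursor) pairs before subscribing.
/// `&[u8]` and `HashMap<u32, u64>` stand in for `GroupId` and `GlobalCursor`.
fn validate_groups_with_cursors(
    groups_with_cursors: &[(&[u8], HashMap<u32, u64>)],
) -> Result<(), CursorValidationError> {
    let mut seen: HashSet<&[u8]> = HashSet::new();
    for (group_id, _cursor) in groups_with_cursors {
        if group_id.is_empty() {
            return Err(CursorValidationError::EmptyGroupId);
        }
        // `insert` returns false when the group ID was already present.
        if !seen.insert(*group_id) {
            return Err(CursorValidationError::DuplicateGroup(group_id.to_vec()));
        }
    }
    Ok(())
}
```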


Design Concerns

3. Tight coupling between resubscription and cursor positions (xmtp_mls/src/subscriptions/stream_messages.rs:231-261)

The subscribe function always uses cursors from MessagePosition when resubscribing:

let groups_with_cursors: Vec<(&GroupId, GlobalCursor)> = groups_with_positions
    .iter()
    .map(|(group_id, position)| (group_id, GlobalCursor::new(position.last_streamed())))
    .collect();

Issue: When adding a new group, all existing groups are resubscribed with their current positions. If there's a temporary network issue during resubscription, you might miss messages that arrived between the last cursor update and the resubscription.

Suggestion: Consider whether cursors should be refreshed from the database before resubscribing, or document that this is acceptable given your architecture.


4. Inconsistent cursor initialization for new groups (xmtp_mls/src/subscriptions/stream_messages.rs:254-260)

Ok((
    stream,
    new_group,
    Some(Cursor {
        sequence_id: 1,
        originator_id: 0,
    }),
))

Hardcoding cursor to {sequence_id: 1, originator_id: 0} assumes messages start at ID 1. This might skip the first message if the backend starts from 0, or cause issues in decentralized scenarios.

Suggestion: Either document this assumption clearly or derive the starting cursor from the group's actual state.
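
One way to derive the starting cursor from known state, sketched under assumptions: the `Cursor` struct below only mirrors the two field names shown above (the field types are guesses), and `starting_cursor` is a hypothetical helper. It returns the highest stored cursor for an originator, or `None` when nothing is stored, so the caller has to decide explicitly where a brand-new group starts instead of relying on a hardcoded value.

```rust
/// Illustrative stand-in using the field names from the snippet above.
/// The field types are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Cursor {
    sequence_id: u64,
    originator_id: u32,
}

/// Pick the highest known cursor for `originator_id`, or `None` if the
/// group has no stored messages from that originator yet.
fn starting_cursor(stored: &[Cursor], originator_id: u32) -> Option<Cursor> {
    stored
        .iter()
        .filter(|c| c.originator_id == originator_id)
        .max_by_key(|c| c.sequence_id)
        .copied()
}
```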


Code Quality Issues

5. Inconsistent logging between implementations

V3 implementation has detailed logging:

tracing::debug!(
    "subscribing to group {} @ cursor {}",
    hex::encode(group_id),
    id_cursor
);

D14n implementation is less detailed:

tracing::debug!(
    "subscribing to messages with provided cursors @cursor={}",
    lcc
);

Suggestion: Add per-group cursor logging in D14n implementation for consistency and debuggability.


6. Test improvements needed (xmtp_mls/src/subscriptions/stream_all/tests.rs:651-884)

The test_stream_all_concurrent_writes test has good coverage but could be improved:

  • The test tracks sent messages but doesn't verify ordering within each group
  • No explicit test for the resubscription cursor behavior
  • The 100 message count assertion is hardcoded without clear derivation

Suggestion: Add a dedicated test for subscribe_group_messages_with_cursors that validates:

  1. Correct cursor positions after resubscription
  2. No duplicate messages
  3. No missed messages
  4. Behavior when cursors are out of sync
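
Checks 2 and 3, plus the per-group ordering gap noted above, could be expressed with a small helper like the following. It is a sketch only: messages are modeled as hypothetical (group name, sequence ID) pairs, and `DeliveryReport` and `check_delivery` are illustrative names that do not exist in the codebase.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical summary of what a stream delivered versus what was sent.
#[derive(Debug, Default, PartialEq)]
struct DeliveryReport {
    missed: Vec<(String, u64)>,
    duplicated: Vec<(String, u64)>,
    out_of_order_groups: Vec<String>,
}

/// Compare sent and received (group, sequence ID) pairs.
fn check_delivery(sent: &[(&str, u64)], received: &[(&str, u64)]) -> DeliveryReport {
    let mut report = DeliveryReport::default();
    let mut seen: HashSet<(&str, u64)> = HashSet::new();
    let mut last_seq: HashMap<&str, u64> = HashMap::new();

    for &(group, seq) in received {
        // A pair seen before is a duplicate delivery.
        if !seen.insert((group, seq)) {
            report.duplicated.push((group.to_string(), seq));
        }
        // A sequence ID lower than the previous one in the same group
        // means the group was delivered out of order.
        if let Some(&prev) = last_seq.get(group) {
            if seq < prev && !report.out_of_order_groups.iter().any(|g| g == group) {
                report.out_of_order_groups.push(group.to_string());
            }
        }
        last_seq.insert(group, seq);
    }

    // Anything sent but never received is a missed message.
    for &(group, seq) in sent {
        if !seen.contains(&(group, seq)) {
            report.missed.push((group.to_string(), seq));
        }
    }
    report
}
```

A test could collect the sent and received pairs during the run and assert that the returned report equals `DeliveryReport::default()`.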

Minor Issues

7. Incomplete coverage (per Codecov report)

Missing coverage in:

  • xmtp_proto/src/api_client/impls.rs (2 lines missing, 50.00% patch coverage)
  • xmtp_api/src/debug_wrapper.rs (1 line missing, 85.71% patch coverage)
  • xmtp_api/src/mls.rs (1 line missing, 92.85% patch coverage)

Suggestion: Add test cases for the new cursor-based subscription methods.


Positive Aspects

✅ Good propagation of the new method through all wrapper layers
✅ Comprehensive test for concurrent message streaming
✅ Proper use of GlobalCursor type for abstraction
✅ Debug logging at key points


Recommendations

  1. Priority 1: Review and fix the D14n LCC calculation logic (Issue #1)
  2. Priority 2: Add validation for cursor values and group IDs (Issue #2)
  3. Priority 3: Document cursor refresh strategy during resubscription (Issue #3)
  4. Priority 4: Add dedicated tests for cursor-based subscription behavior (Issue #6)
  5. Consider whether the "hacky" implementation mentioned in the PR description could be improved before merging

Contributor Author

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@neekolas neekolas changed the title from "[WIP]" to "[WIP] Fix cursors for resubscription" on Oct 31, 2025
@macroscopeapp
Contributor

macroscopeapp bot commented Oct 31, 2025

Add cursor-based group message subscription by implementing XmtpMlsStreams::subscribe_group_messages_with_cursors across clients and wrappers [WIP]

Introduce subscribe_group_messages_with_cursors on XmtpMlsStreams and propagate it through client facades, debug/stats/boxed wrappers, v3 and D14n stream implementations, and tests. The D14n client computes a lowest common GlobalCursor across groups; the v3 client derives per-group id cursors from provided GlobalCursors. Update message streaming to pass explicit cursors derived from MessagePosition.

📍Where to Start

Start with the trait change adding XmtpMlsStreams::subscribe_group_messages_with_cursors in api_client.rs, then follow its use in the v3 implementation in streams.rs and the D14n implementation in streams.rs.


📊 Macroscope summarized 24de843. 11 files reviewed, 9 issues evaluated, 9 issues filtered, 0 comments posted.

@codecov

codecov bot commented Oct 31, 2025

Codecov Report

❌ Patch coverage is 90.90909% with 4 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (graphite-base/2707@8c08d7b). Learn more about missing BASE report.

Files with missing lines | Patch % | Lines
xmtp_proto/src/api_client/impls.rs | 50.00% | 2 Missing ⚠️
xmtp_api/src/debug_wrapper.rs | 85.71% | 1 Missing ⚠️
xmtp_api/src/mls.rs | 92.85% | 1 Missing ⚠️
Additional details and impacted files
@@                  Coverage Diff                  @@
##             graphite-base/2707    #2707   +/-   ##
=====================================================
  Coverage                      ?   77.38%           
=====================================================
  Files                         ?      360           
  Lines                         ?    53458           
  Branches                      ?        0           
=====================================================
  Hits                          ?    41368           
  Misses                        ?    12090           
  Partials                      ?        0           


@neekolas neekolas force-pushed the 10-31-add_subscribe_group_messages_with_cursors branch 2 times, most recently from 2146a1c to b67db31 on November 1, 2025 00:04
@neekolas neekolas force-pushed the 10-31-add_subscribe_group_messages_with_cursors branch from b67db31 to 24de843 on November 3, 2025 16:15
@neekolas neekolas changed the base branch from 10-30-add_test_for_concurrent_stream_writes to graphite-base/2707 November 3, 2025 16:15