-
Notifications
You must be signed in to change notification settings - Fork 74
[WIP] Fix cursors for resubscription #2707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: graphite-base/2707
Are you sure you want to change the base?
[WIP] Fix cursors for resubscription #2707
Conversation
|
Claude finished @neekolas's task —— View job PR Review: Fix cursors for resubscriptionReview Date: November 3, 2025 SummaryThis PR adds cursor-based group message subscription to improve message streaming reliability. While functional, there are several important issues to address, particularly around the D14n cursor calculation logic and error handling. Critical Issues1. Potential cursor calculation bug in D14n implementation (xmtp_api_d14n/src/queries/d14n/streams.rs:66-76) The D14n client computes a "lowest common cursor" (LCC) by taking the minimum sequence ID across all groups. This could cause message loss: for (_, cursor) in groups_with_cursors {
for (&node_id, &seq_id) in cursor.iter() {
min_clock
.entry(node_id)
.and_modify(|existing| *existing = (*existing).min(seq_id))
.or_insert(seq_id);
}
}Problem: If Group A has cursor Suggestion: Consider per-group cursors instead of a global LCC, or document why this approach is safe for your architecture. 2. Missing error handling for cursor extraction (xmtp_proto/src/api_client/impls.rs:269-276) async fn subscribe_group_messages_with_cursors(
&self,
groups_with_cursors: &[(&GroupId, crate::types::GlobalCursor)],
) -> Result<Self::GroupMessageStream, Self::Error> {
(**self)
.subscribe_group_messages_with_cursors(groups_with_cursors)
.await
}The delegation pattern is correct, but there's no validation that cursors are valid or that group IDs exist. Consider adding validation at the API boundary. Design Concerns3. Tight coupling between resubscription and cursor positions (xmtp_mls/src/subscriptions/stream_messages.rs:231-261) The let groups_with_cursors: Vec<(&GroupId, GlobalCursor)> = groups_with_positions
.iter()
.map(|(group_id, position)| (group_id, GlobalCursor::new(position.last_streamed())))
.collect();Issue: When adding a new group, all existing groups are resubscribed with their current positions. If there's a temporary network issue during resubscription, you might miss messages that arrived between the last cursor update and the resubscription. Suggestion: Consider whether cursors should be refreshed from the database before resubscribing, or document that this is acceptable given your architecture. 4. Inconsistent cursor initialization for new groups (xmtp_mls/src/subscriptions/stream_messages.rs:254-260) Ok((
stream,
new_group,
Some(Cursor {
sequence_id: 1,
originator_id: 0,
}),
))Hardcoding cursor to Suggestion: Either document this assumption clearly or derive the starting cursor from the group's actual state. Code Quality Issues5. Inconsistent logging between implementations V3 implementation has detailed logging: tracing::debug!(
"subscribing to group {} @ cursor {}",
hex::encode(group_id),
id_cursor
);D14n implementation is less detailed: tracing::debug!(
"subscribing to messages with provided cursors @cursor={}",
lcc
);Suggestion: Add per-group cursor logging in D14n implementation for consistency and debuggability. 6. Test improvements needed (xmtp_mls/src/subscriptions/stream_all/tests.rs:651-884) The
Suggestion: Add a dedicated test for
Minor Issues7. Incomplete coverage (per Codecov report) Missing coverage in:
Suggestion: Add test cases for the new cursor-based subscription methods. Positive Aspects✅ Good propagation of the new method through all wrapper layers Recommendations
|
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
How to use the Graphite Merge QueueAdd the label mergequeue to this PR to add it to the merge queue. You must have a Graphite account in order to use the merge queue. Sign up using this link. An organization admin has enabled the Graphite Merge Queue in this repository. Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue. This stack of pull requests is managed by Graphite. Learn more about stacking. |
Add cursor-based group message subscription by implementing
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## graphite-base/2707 #2707 +/- ##
=====================================================
Coverage ? 77.38%
=====================================================
Files ? 360
Lines ? 53458
Branches ? 0
=====================================================
Hits ? 41368
Misses ? 12090
Partials ? 0 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
2146a1c to
b67db31
Compare
b67db31 to
24de843
Compare

TL;DR
What changed?
subscribe_group_messages_with_cursorsmethod in theXmtpMlsStreamstrait and its implementationsHow to test?
test_stream_all_concurrent_writeswhich has been updated to better validate message deliveryWhy make this change?
This change improves message streaming reliability by allowing clients to specify exact cursor positions when subscribing to group messages. This is particularly important for resuming streams after disconnections or when adding new groups to an existing subscription. The implementation helps prevent message loss by ensuring streams can be properly resumed from the last known position.