Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions xmtp_api_d14n/src/protocol/sort/causal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use xmtp_proto::api::VectorClock;
use xmtp_proto::types::TopicCursor;
use xmtp_proto::types::{Topic, TopicCursor};
use xmtp_proto::{api::VectorClock, types::GlobalCursor};

use crate::protocol::{ApplyCursor, Envelope, EnvelopeError, Sort};

Expand All @@ -8,24 +8,64 @@ pub struct CausalSort<'a, E> {
topic_cursor: &'a mut TopicCursor,
}

// store temporary info about the envelope
// so that we do not need to re-deserialize
struct Missed<E> {
envelope: E,
depends_on: GlobalCursor,
topic: Topic,
}

impl<E> Missed<E> {
pub fn new(envelope: E, depends_on: GlobalCursor, topic: Topic) -> Self {
Self {
envelope,
depends_on,
topic,
}
}

pub fn into_envelope(self) -> E {
self.envelope
}
}

impl<'b, 'a: 'b, E: Envelope<'a>> CausalSort<'b, E> {
// check if any of the dependencies of envelopes in `other` are
// satisfied by any envelopes in `self.envelopes`
// this lets us resolve dependencies internally
// for deeply-nested sets of dependencies.
fn recover_newly_valid(&mut self, missed: &mut Vec<Missed<E>>) -> Vec<E> {
missed
.extract_if(.., |m| {
let clock = self.topic_cursor.get_or_default(&m.topic);
clock.dominates(&m.depends_on)
})
.map(move |m| m.envelope)
.collect()
}
}

impl<'b, 'a: 'b, E: Envelope<'a>> Sort<Vec<E>> for CausalSort<'b, E> {
fn sort(self) -> Result<Option<Vec<E>>, EnvelopeError> {
fn sort(mut self) -> Result<Option<Vec<E>>, EnvelopeError> {
let mut i = 0;
// cant use `Vec::extract_if` b/c we are returning results
let mut missing = Vec::new();
let mut missed = Vec::new();
while i < self.envelopes.len() {
let env = &mut self.envelopes[i];
let topic = env.topic()?;
let last_seen = env.depends_on()?.unwrap_or(Default::default());
let vector_clock = self.topic_cursor.get_or_default(&topic);
if vector_clock.dominates(&last_seen) {
self.topic_cursor.apply(env)?;
let newly_valid = self.recover_newly_valid(&mut missed);
i += 1;
self.envelopes.splice(i..i, newly_valid.into_iter());
} else {
missing.push(self.envelopes.remove(i));
let missed_envelope = self.envelopes.remove(i);
missed.push(Missed::new(missed_envelope, last_seen, topic));
}
}
Ok((!missing.is_empty()).then_some(missing))
Ok((!missed.is_empty()).then_some(missed.into_iter().map(Missed::into_envelope).collect()))
}
}

Expand Down Expand Up @@ -119,11 +159,8 @@ mod tests {
assert_sorted(&envelopes, &missing, &removed)
}

/// this sort does not handle dependencies that are already available _within_ the given
/// dependency array.
#[xmtp_common::test]
#[should_panic]
fn does_not_reapply_within_array(mut envelopes in sorted_dependencies(10, vec![10, 20, 30, 40]).prop_shuffle()) {
fn reapplies_within_array(mut envelopes in sorted_dependencies(10, vec![10, 20, 30, 40]).prop_shuffle()) {
let mut topic_cursor = TopicCursor::default();
let mut missing = vec![];
if let Some(m) = sort::causal(&mut envelopes, &mut topic_cursor).sort()? {
Expand Down
Loading