From c20b6b3609ce9aabf6f3e078a7c8adb0ef1ad565 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 19 Aug 2025 16:38:26 +0200 Subject: [PATCH 01/10] chore(logs): simplify logs around taking a read/write sqlite connection --- crates/matrix-sdk-sqlite/src/event_cache_store.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index 65ba9a73fe0..574b4629d94 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -181,8 +181,7 @@ impl SqliteEventCacheStore { // Acquire a connection for executing read operations. #[instrument(skip_all)] async fn read(&self) -> Result { - trace!("Taking a `read` connection"); - let _timer = timer!("connection"); + let _timer = timer!("Taking a `read` connection"); let connection = self.pool.get().await?; @@ -198,8 +197,7 @@ impl SqliteEventCacheStore { // Acquire a connection for executing write operations. #[instrument(skip_all)] async fn write(&self) -> Result> { - trace!("Taking a `write` connection"); - let _timer = timer!("connection"); + let _timer = timer!("Taking a `write` connection"); let connection = self.write_connection.clone().lock_owned().await; From 085f872cc9a61b5d6c7790c840dbefde0a5fecab Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 19 Aug 2025 17:14:28 +0200 Subject: [PATCH 02/10] feat(event cache): use the event cache store lock as little as we must --- .../src/event_cache/store/mod.rs | 33 +++ crates/matrix-sdk/src/event_cache/mod.rs | 57 ++-- .../matrix-sdk/src/event_cache/pagination.rs | 3 + crates/matrix-sdk/src/event_cache/room/mod.rs | 265 ++++++++++++------ 4 files changed, 253 insertions(+), 105 deletions(-) diff --git a/crates/matrix-sdk-base/src/event_cache/store/mod.rs b/crates/matrix-sdk-base/src/event_cache/store/mod.rs index 85a582b4d1a..e6f6e749726 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/mod.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/mod.rs @@ -92,6 +92,39 @@ impl EventCacheStoreLock { Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() }) } + + /// Acquire a spin lock and return an owned guard that can be used to access + /// the cache store. + pub async fn lock_owned(&self) -> Result { + let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?; + + Ok(OwnedEventCacheStoreLockGuard { + cross_process_lock_guard: Arc::new(cross_process_lock_guard), + store: self.store.clone(), + }) + } +} + +/// An RAII implementation of a owned “scoped lock” of an +/// [`EventCacheStoreLock`]. +/// +/// When this structure is dropped (falls out of scope), the lock will be +/// unlocked. +#[derive(Clone)] +pub struct OwnedEventCacheStoreLockGuard { + /// The cross process lock guard. + #[allow(unused)] + cross_process_lock_guard: Arc, + + /// A reference to the store. + pub store: Arc, +} + +#[cfg(not(tarpaulin_include))] +impl fmt::Debug for OwnedEventCacheStoreLockGuard { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.debug_struct("OwnedEventCacheStoreLockGuard").finish_non_exhaustive() + } } /// An RAII implementation of a “scoped lock” of an [`EventCacheStoreLock`]. 
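The hunk above introduces `EventCacheStoreLock::lock_owned()` and the clonable `OwnedEventCacheStoreLockGuard`: the cross-process guard is wrapped in an `Arc` alongside an `Arc` of the store, so the lock is acquired once and the guard can then be cloned and handed to every code path that needs store access. Below is a minimal, self-contained sketch of that shape; `Store`, `StoreLock` and `OwnedGuard` are simplified stand-ins for the SDK types, with a plain `tokio::sync::Mutex` standing in for the cross-process lock.

```rust
use std::sync::Arc;

use tokio::sync::{Mutex, OwnedMutexGuard};

/// Stand-in for the event cache store.
struct Store;

impl Store {
    async fn clear_all_linked_chunks(&self) {
        println!("clearing all linked chunks…");
    }
}

/// Owned guard: cheap to clone; the lock is released when the last clone drops.
#[derive(Clone)]
struct OwnedGuard {
    _lock: Arc<OwnedMutexGuard<()>>,
    store: Arc<Store>,
}

/// Stand-in for `EventCacheStoreLock`, with a local mutex instead of the
/// cross-process lock.
struct StoreLock {
    mutex: Arc<Mutex<()>>,
    store: Arc<Store>,
}

impl StoreLock {
    async fn lock_owned(&self) -> OwnedGuard {
        OwnedGuard {
            _lock: Arc::new(self.mutex.clone().lock_owned().await),
            store: self.store.clone(),
        }
    }
}

#[tokio::main]
async fn main() {
    let lock = StoreLock { mutex: Arc::new(Mutex::new(())), store: Arc::new(Store) };

    // Acquire once…
    let guard = lock.lock_owned().await;

    // …then clone freely and pass the clones around; no per-operation locking.
    let for_reset = guard.clone();
    for_reset.store.clear_all_linked_chunks().await;

    // The lock is only released once `guard` and all of its clones are dropped.
    drop(for_reset);
    drop(guard);
}
```

Because the guard is `Clone` and owns its data, it can also be captured by spawned tasks and per-room futures, which is what the following hunks rely on.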
diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 45225d62c16..185a9535157 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -804,25 +804,31 @@ impl EventCacheInner { .await; // Clear the storage for all the rooms, using the storage facility. - self.store.lock().await?.clear_all_linked_chunks().await?; + let locked_store = self.store.lock_owned().await?; + + locked_store.store.clear_all_linked_chunks().await?; // At this point, all the in-memory linked chunks are desynchronized from the // storage. Resynchronize them manually by calling reset(), and // propagate updates to observers. - try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move { - let updates_as_vector_diffs = state_guard.reset().await?; + try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| { + let locked_store = locked_store.clone(); - let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { - diffs: updates_as_vector_diffs, - origin: EventsOrigin::Cache, - }); + async move { + let updates_as_vector_diffs = state_guard.reset(locked_store).await?; - let _ = room - .inner - .generic_update_sender - .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() }); + let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { + diffs: updates_as_vector_diffs, + origin: EventsOrigin::Cache, + }); - Ok::<_, EventCacheError>(()) + let _ = room + .inner + .generic_update_sender + .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() }); + + Ok::<_, EventCacheError>(()) + } })) .await?; @@ -839,6 +845,8 @@ impl EventCacheInner { self.multiple_room_updates_lock.lock().await }; + let locked_store = self.store.lock_owned().await?; + // Note: bnjbvr tried to make this concurrent at some point, but it turned out // to be a performance regression, even for large sync updates. Lacking // time to investigate, this code remains sequential for now. See also @@ -848,7 +856,9 @@ impl EventCacheInner { for (room_id, left_room_update) in updates.left { let room = self.for_room(&room_id).await?; - if let Err(err) = room.inner.handle_left_room_update(left_room_update).await { + if let Err(err) = + room.inner.handle_left_room_update(locked_store.clone(), left_room_update).await + { // Non-fatal error, try to continue to the next room. error!("handling left room update: {err}"); } @@ -860,7 +870,9 @@ impl EventCacheInner { let room = self.for_room(&room_id).await?; - if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await { + if let Err(err) = + room.inner.handle_joined_room_update(locked_store.clone(), joined_room_update).await + { // Non-fatal error, try to continue to the next room. 
error!(%room_id, "handling joined room update: {err}"); } @@ -925,6 +937,7 @@ impl EventCacheInner { let room_event_cache = RoomEventCache::new( self.client.clone(), + self.store.clone(), room_state, pagination_status, room_id.to_owned(), @@ -1125,11 +1138,17 @@ mod tests { .unwrap(); let account_data = vec![read_marker_event; 100]; - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { account_data, ..Default::default() }, + ) + .await + .unwrap(); + } // … there's only one read marker update. assert_matches!( diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 368959390aa..f1ef1421045 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -276,12 +276,15 @@ impl RoomPagination { .await .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?; + let locked_store = self.inner.store.lock_owned().await?; + if let Some((outcome, timeline_event_diffs)) = self .inner .state .write() .await .handle_backpagination( + locked_store, response.chunk, response.end, prev_token, diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 4997e1eb825..35b1c6ffeb9 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -29,7 +29,10 @@ use eyeball::SharedObservable; use eyeball_im::VectorDiff; use matrix_sdk_base::{ deserialized_responses::AmbiguityChange, - event_cache::Event, + event_cache::{ + store::{EventCacheStoreLock, OwnedEventCacheStoreLockGuard}, + Event, + }, linked_chunk::Position, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; @@ -163,6 +166,7 @@ impl RoomEventCache { /// Create a new [`RoomEventCache`] using the given room and store. pub(super) fn new( client: WeakClient, + store: EventCacheStoreLock, state: RoomEventCacheState, pagination_status: SharedObservable, room_id: OwnedRoomId, @@ -172,6 +176,7 @@ impl RoomEventCache { Self { inner: Arc::new(RoomEventCacheInner::new( client, + store, state, pagination_status, room_id, @@ -375,7 +380,9 @@ impl RoomEventCache { /// storage. pub async fn clear(&self) -> Result<()> { // Clear the linked chunk and persisted storage. - let updates_as_vector_diffs = self.inner.state.write().await.reset().await?; + let locked_store = self.inner.store.lock_owned().await?; + + let updates_as_vector_diffs = self.inner.state.write().await.reset(locked_store).await?; // Notify observers about the update. let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { @@ -414,6 +421,8 @@ pub(super) struct RoomEventCacheInner { pub weak_room: WeakRoom, + pub store: EventCacheStoreLock, + /// Sender part for subscribers to this room. pub sender: Sender, @@ -444,6 +453,7 @@ impl RoomEventCacheInner { /// to handle new timeline events. 
fn new( client: WeakClient, + store: EventCacheStoreLock, state: RoomEventCacheState, pagination_status: SharedObservable, room_id: OwnedRoomId, @@ -457,6 +467,7 @@ impl RoomEventCacheInner { weak_room, state: RwLock::new(state), sender, + store, pagination_batch_token_notifier: Default::default(), auto_shrink_sender, pagination_status, @@ -504,21 +515,32 @@ impl RoomEventCacheInner { } #[instrument(skip_all, fields(room_id = %self.room_id))] - pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { + pub(super) async fn handle_joined_room_update( + &self, + locked_store: OwnedEventCacheStoreLockGuard, + updates: JoinedRoomUpdate, + ) -> Result<()> { self.handle_timeline( + locked_store, updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes, ) .await?; + self.handle_account_data(updates.account_data); Ok(()) } #[instrument(skip_all, fields(room_id = %self.room_id))] - pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { - self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?; + pub(super) async fn handle_left_room_update( + &self, + locked_store: OwnedEventCacheStoreLockGuard, + updates: LeftRoomUpdate, + ) -> Result<()> { + self.handle_timeline(locked_store, updates.timeline, Vec::new(), updates.ambiguity_changes) + .await?; Ok(()) } @@ -527,6 +549,7 @@ impl RoomEventCacheInner { /// room. async fn handle_timeline( &self, + locked_store: OwnedEventCacheStoreLockGuard, timeline: Timeline, ephemeral_events: Vec>, ambiguity_changes: BTreeMap, @@ -553,6 +576,7 @@ impl RoomEventCacheInner { .write() .await .handle_sync( + locked_store, timeline, #[cfg(feature = "experimental-search")] &room, @@ -628,7 +652,7 @@ mod private { apply_redaction, deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind}, event_cache::{ - store::{DynEventCacheStore, EventCacheStoreLock}, + store::{DynEventCacheStore, EventCacheStoreLock, OwnedEventCacheStoreLockGuard}, Event, Gap, }, linked_chunk::{ @@ -1042,7 +1066,10 @@ mod private { /// pending diff updates with the result of this function. /// /// Otherwise, returns `None`. - pub(super) async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> { + pub(super) async fn shrink_to_last_chunk( + &mut self, + locked_store: OwnedEventCacheStoreLockGuard, + ) -> Result<(), EventCacheError> { let store_lock = self.store.lock().await?; // Attempt to load the last chunk. @@ -1073,7 +1100,7 @@ mod private { self.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator) { error!("error when replacing the linked chunk: {err}"); - return self.reset_internal().await; + return self.reset_internal(locked_store).await; } // Let pagination observers know that we may have not reached the start of the @@ -1101,7 +1128,8 @@ mod private { if subscriber_count == 0 { // If we are the last strong reference to the auto-shrinker, we can shrink the // events data structure to its last chunk. 
- self.shrink_to_last_chunk().await?; + let locked_store = self.store.lock_owned().await?; + self.shrink_to_last_chunk(locked_store).await?; Ok(Some(self.room_linked_chunk.updates_as_vector_diffs())) } else { Ok(None) @@ -1112,7 +1140,8 @@ mod private { pub(crate) async fn force_shrink_to_last_chunk( &mut self, ) -> Result>, EventCacheError> { - self.shrink_to_last_chunk().await?; + let locked_store = self.store.lock_owned().await?; + self.shrink_to_last_chunk(locked_store).await?; Ok(self.room_linked_chunk.updates_as_vector_diffs()) } @@ -1172,6 +1201,7 @@ mod private { #[instrument(skip_all)] async fn remove_events( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, in_memory_events: Vec<(OwnedEventId, Position)>, in_store_events: Vec<(OwnedEventId, Position)>, ) -> Result<(), EventCacheError> { @@ -1189,7 +1219,7 @@ mod private { .map(|pos| Update::RemoveItem { at: pos }) .collect::>(); - self.apply_store_only_updates(updates).await?; + self.apply_store_only_updates(locked_store.clone(), updates).await?; } // In-memory events. @@ -1205,13 +1235,16 @@ mod private { ) .expect("failed to remove an event"); - self.propagate_changes().await + self.propagate_changes(locked_store).await } /// Propagate changes to the underlying storage. - async fn propagate_changes(&mut self) -> Result<(), EventCacheError> { + async fn propagate_changes( + &mut self, + locked_store: OwnedEventCacheStoreLockGuard, + ) -> Result<(), EventCacheError> { let updates = self.room_linked_chunk.store_updates().take(); - self.send_updates_to_store(updates).await + self.send_updates_to_store(locked_store, updates).await } /// Apply some updates that are effective only on the store itself. @@ -1222,14 +1255,16 @@ mod private { /// storage. async fn apply_store_only_updates( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, updates: Vec>, ) -> Result<(), EventCacheError> { self.room_linked_chunk.order_tracker.map_updates(&updates); - self.send_updates_to_store(updates).await + self.send_updates_to_store(locked_store, updates).await } async fn send_updates_to_store( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, mut updates: Vec>, ) -> Result<(), EventCacheError> { if updates.is_empty() { @@ -1259,16 +1294,16 @@ mod private { // The store cross-process locking involves an actual mutex, which ensures that // storing updates happens in the expected order. - let store = self.store.clone(); let room_id = self.room.clone(); let cloned_updates = updates.clone(); spawn(async move { - let store = store.lock().await?; - trace!(updates = ?cloned_updates, "sending linked chunk updates to the store"); let linked_chunk_id = LinkedChunkId::Room(&room_id); - store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?; + locked_store + .store + .handle_linked_chunk_updates(linked_chunk_id, cloned_updates) + .await?; trace!("linked chunk updates applied"); super::Result::Ok(()) @@ -1291,8 +1326,11 @@ mod private { /// result, the caller may override any pending diff updates /// with the result of this function. 
#[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] - pub async fn reset(&mut self) -> Result>, EventCacheError> { - self.reset_internal().await?; + pub async fn reset( + &mut self, + locked_store: OwnedEventCacheStoreLockGuard, + ) -> Result>, EventCacheError> { + self.reset_internal(locked_store).await?; let diff_updates = self.room_linked_chunk.updates_as_vector_diffs(); @@ -1303,7 +1341,10 @@ mod private { Ok(diff_updates) } - async fn reset_internal(&mut self) -> Result<(), EventCacheError> { + async fn reset_internal( + &mut self, + locked_store: OwnedEventCacheStoreLockGuard, + ) -> Result<(), EventCacheError> { self.room_linked_chunk.reset(); // No need to update the thread summaries: the room events are @@ -1315,7 +1356,7 @@ mod private { thread.clear(); } - self.propagate_changes().await?; + self.propagate_changes(locked_store).await?; // Reset the pagination state too: pretend we never waited for the initial // prev-batch token, and indicate that we're not at the start of the @@ -1501,17 +1542,18 @@ mod private { /// Flushes updates to disk first. async fn post_process_new_events( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, events: Vec, is_sync: bool, #[cfg(feature = "experimental-search")] room: &Room, ) -> Result<(), EventCacheError> { // Update the store before doing the post-processing. - self.propagate_changes().await?; + self.propagate_changes(locked_store.clone()).await?; let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new(); for event in events { - self.maybe_apply_new_redaction(&event).await?; // TODO: Handle redaction for search index + self.maybe_apply_new_redaction(locked_store.clone(), &event).await?; // TODO: Handle redaction for search index // We can also add the event to the index. #[cfg(feature = "experimental-search")] @@ -1534,7 +1576,7 @@ mod private { } } - self.update_threads(new_events_by_thread, is_sync).await?; + self.update_threads(locked_store, new_events_by_thread, is_sync).await?; Ok(()) } @@ -1554,6 +1596,7 @@ mod private { #[instrument(skip_all)] async fn update_threads( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, new_events_by_thread: BTreeMap>, is_sync: bool, ) -> Result<(), EventCacheError> { @@ -1616,7 +1659,7 @@ mod private { // Trigger an update to observers. target_event.thread_summary = ThreadSummaryStatus::Some(new_summary); - self.replace_event_at(location, target_event).await?; + self.replace_event_at(locked_store.clone(), location, target_event).await?; } Ok(()) @@ -1630,6 +1673,7 @@ mod private { /// unlikely to observe the store updates directly. async fn replace_event_at( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, location: EventLocation, event: Event, ) -> Result<(), EventCacheError> { @@ -1640,7 +1684,7 @@ mod private { .expect("should have been a valid position of an item"); // We just changed the in-memory representation; synchronize this with // the store. - self.propagate_changes().await?; + self.propagate_changes(locked_store).await?; } EventLocation::Store => { self.save_event([event]).await?; @@ -1656,6 +1700,7 @@ mod private { #[instrument(skip_all)] async fn maybe_apply_new_redaction( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, event: &Event, ) -> Result<(), EventCacheError> { let raw_event = event.raw(); @@ -1715,7 +1760,7 @@ mod private { // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case. 
target_event.replace_raw(redacted_event.cast_unchecked()); - self.replace_event_at(location, target_event).await?; + self.replace_event_at(locked_store, location, target_event).await?; } Ok(()) @@ -1759,6 +1804,7 @@ mod private { #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] pub async fn handle_sync( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, mut timeline: Timeline, #[cfg(feature = "experimental-search")] room: &Room, ) -> Result<(bool, Vec>), EventCacheError> { @@ -1824,7 +1870,7 @@ mod private { target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary); - self.replace_event_at(location, target_event).await?; + self.replace_event_at(locked_store.clone(), location, target_event).await?; } } } @@ -1847,13 +1893,18 @@ mod private { // // We don't have to worry the removals can change the position of the existing // events, because we are pushing all _new_ `events` at the back. - self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids) - .await?; + self.remove_events( + locked_store.clone(), + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + ) + .await?; self.room_linked_chunk .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events); self.post_process_new_events( + locked_store.clone(), events, true, #[cfg(feature = "experimental-search")] @@ -1868,7 +1919,7 @@ mod private { // // We must do this *after* persisting these events to storage (in // `post_process_new_events`). - self.shrink_to_last_chunk().await?; + self.shrink_to_last_chunk(locked_store).await?; } let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs(); @@ -1886,6 +1937,7 @@ mod private { #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] pub async fn handle_backpagination( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, events: Vec, mut new_token: Option, prev_token: Option, @@ -1942,8 +1994,12 @@ mod private { if !all_duplicates { // Let's forget all the previous events. - self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids) - .await?; + self.remove_events( + locked_store.clone(), + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + ) + .await?; } else { // All new events are duplicated, they can all be ignored. events.clear(); @@ -1965,6 +2021,7 @@ mod private { // Note: this flushes updates to the store. self.post_process_new_events( + locked_store, topo_ordered_events, false, #[cfg(feature = "experimental-search")] @@ -2347,11 +2404,17 @@ mod timed_tests { events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()], }; - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { timeline, ..Default::default() }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. 
assert_matches!( @@ -2426,11 +2489,17 @@ mod timed_tests { let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] }; - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { timeline, ..Default::default() }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. assert_matches!( @@ -2754,11 +2823,17 @@ mod timed_tests { // A new update with one of these events leads to deduplication. let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] }; - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { timeline, ..Default::default() }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. There is a duplicate event, so // no generic changes whatsoever! @@ -2861,18 +2936,24 @@ mod timed_tests { // Propagate an update including a limited timeline with one message and a // prev-batch token. - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { - timeline: Timeline { - limited: true, - prev_batch: Some("raclette".to_owned()), - events: vec![f.text_msg("hey yo").into_event()], - }, - ..Default::default() - }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { + timeline: Timeline { + limited: true, + prev_batch: Some("raclette".to_owned()), + events: vec![f.text_msg("hey yo").into_event()], + }, + ..Default::default() + }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. assert_matches!( @@ -2922,18 +3003,24 @@ mod timed_tests { // Now, propagate an update for another message, but the timeline isn't limited // this time. - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { - timeline: Timeline { - limited: false, - prev_batch: Some("fondue".to_owned()), - events: vec![f.text_msg("sup").into_event()], - }, - ..Default::default() - }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { + timeline: Timeline { + limited: false, + prev_batch: Some("fondue".to_owned()), + events: vec![f.text_msg("sup").into_event()], + }, + ..Default::default() + }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. assert_matches!( @@ -3180,18 +3267,24 @@ mod timed_tests { // last chunk, and that the linked chunk only contains the last two // events. 
let evid4 = event_id!("$4"); - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { - timeline: Timeline { - limited: true, - prev_batch: Some("fondue".to_owned()), - events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()], - }, - ..Default::default() - }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { + timeline: Timeline { + limited: true, + prev_batch: Some("fondue".to_owned()), + events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()], + }, + ..Default::default() + }, + ) + .await + .unwrap(); + } { let state = room_event_cache.inner.state.read().await; From 56c64fb16eaa6935de61ab087d9e20440582ece1 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 19 Aug 2025 17:19:08 +0200 Subject: [PATCH 03/10] fixup! feat(event cache): use the event cache store lock as little as we must --- .../src/event_cache/deduplicator.rs | 20 +++++++++++------- crates/matrix-sdk/src/event_cache/room/mod.rs | 21 ++++++++++++------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/deduplicator.rs b/crates/matrix-sdk/src/event_cache/deduplicator.rs index 779cfb5468e..b3791714d1c 100644 --- a/crates/matrix-sdk/src/event_cache/deduplicator.rs +++ b/crates/matrix-sdk/src/event_cache/deduplicator.rs @@ -18,7 +18,7 @@ use std::collections::BTreeSet; use matrix_sdk_base::{ - event_cache::store::EventCacheStoreLock, + event_cache::store::OwnedEventCacheStoreLockGuard, linked_chunk::{LinkedChunkId, Position}, }; use ruma::OwnedEventId; @@ -32,7 +32,7 @@ use super::{ /// information about the duplicates found in the new events, including the /// events that are not loaded in memory. 
pub async fn filter_duplicate_events( - store: &EventCacheStoreLock, + store: OwnedEventCacheStoreLockGuard, linked_chunk_id: LinkedChunkId<'_>, linked_chunk: &EventLinkedChunk, mut new_events: Vec, @@ -50,10 +50,9 @@ pub async fn filter_duplicate_events( }); } - let store = store.lock().await?; - // Let the store do its magic ✨ let duplicated_event_ids = store + .store .filter_duplicated_events( linked_chunk_id, new_events.iter().filter_map(|event| event.event_id()).collect(), @@ -148,7 +147,10 @@ pub(super) struct DeduplicationOutcome { mod tests { use std::ops::Not as _; - use matrix_sdk_base::{deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier}; + use matrix_sdk_base::{ + deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock, + linked_chunk::ChunkIdentifier, + }; use matrix_sdk_test::{async_test, event_factory::EventFactory}; use ruma::{owned_event_id, serde::Raw, user_id, EventId}; @@ -222,6 +224,7 @@ mod tests { .unwrap(); let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned()); + let locked_store = event_cache_store.lock_owned().await.unwrap(); { // When presenting with only duplicate events, some of them in the in-memory @@ -232,7 +235,7 @@ mod tests { linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]); let outcome = filter_duplicate_events( - &event_cache_store, + locked_store.clone(), LinkedChunkId::Room(room_id), &linked_chunk, vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()], @@ -247,7 +250,7 @@ mod tests { linked_chunk.push_events([event_2.clone(), event_3.clone()]); let outcome = filter_duplicate_events( - &event_cache_store, + locked_store, LinkedChunkId::Room(room_id), &linked_chunk, vec![event_0, event_1, event_2, event_3, event_4], @@ -351,6 +354,7 @@ mod tests { // Wrap the store into its lock. let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned()); + let locked_store = event_cache_store.lock_owned().await.unwrap(); let linked_chunk = EventLinkedChunk::new(); @@ -360,7 +364,7 @@ mod tests { in_store_duplicated_event_ids, non_empty_all_duplicates, } = filter_duplicate_events( - &event_cache_store, + locked_store, LinkedChunkId::Room(room_id), &linked_chunk, vec![ev1, ev2, ev3, ev4], diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 35b1c6ffeb9..12fd8cc565e 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -402,7 +402,13 @@ impl RoomEventCache { /// Save some events in the event cache, for further retrieval with /// [`Self::event`]. pub(crate) async fn save_events(&self, events: impl IntoIterator) { - if let Err(err) = self.inner.state.write().await.save_event(events).await { + let Ok(locked_store) = self.inner.store.lock_owned().await.inspect_err(|err| { + warn!("couldn't lock the event cache store: {err}"); + }) else { + return; + }; + + if let Err(err) = self.inner.state.write().await.save_event(locked_store, events).await { warn!("couldn't save event in the event cache: {err}"); } } @@ -1572,7 +1578,7 @@ mod private { // Save a bundled thread event, if there was one. 
if let Some(bundled_thread) = event.bundled_latest_thread_event { - self.save_event([*bundled_thread]).await?; + self.save_event(locked_store.clone(), [*bundled_thread]).await?; } } @@ -1687,7 +1693,7 @@ mod private { self.propagate_changes(locked_store).await?; } EventLocation::Store => { - self.save_event([event]).await?; + self.save_event(locked_store, [event]).await?; } } @@ -1774,17 +1780,16 @@ mod private { /// the event. Instead, an update to the linked chunk must be used. pub async fn save_event( &self, + locked_store: OwnedEventCacheStoreLockGuard, events: impl IntoIterator, ) -> Result<(), EventCacheError> { - let store = self.store.clone(); let room_id = self.room.clone(); let events = events.into_iter().collect::>(); // Spawn a task so the save is uninterrupted by task cancellation. spawn(async move { - let store = store.lock().await?; for event in events { - store.save_event(&room_id, event).await?; + locked_store.store.save_event(&room_id, event).await?; } super::Result::Ok(()) }) @@ -1816,7 +1821,7 @@ mod private { in_store_duplicated_event_ids, non_empty_all_duplicates: all_duplicates, } = filter_duplicate_events( - &self.store, + locked_store.clone(), LinkedChunkId::Room(self.room.as_ref()), &self.room_linked_chunk, timeline.events, @@ -1972,7 +1977,7 @@ mod private { in_store_duplicated_event_ids, non_empty_all_duplicates: all_duplicates, } = filter_duplicate_events( - &self.store, + locked_store.clone(), LinkedChunkId::Room(self.room.as_ref()), &self.room_linked_chunk, events, From 9c605e6551d8089847d938f28d3379578924a52c Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 19 Aug 2025 17:22:19 +0200 Subject: [PATCH 04/10] fixup! feat(event cache): use the event cache store lock as little as we must --- crates/matrix-sdk/src/event_cache/mod.rs | 2 +- crates/matrix-sdk/src/event_cache/room/mod.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 185a9535157..5ecb0992af8 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -369,7 +369,7 @@ impl EventCache { trace!("waiting for state lock…"); let mut state = room.inner.state.write().await; - match state.auto_shrink_if_no_subscribers().await { + match state.auto_shrink_if_no_subscribers(&room.inner.store).await { Ok(diffs) => { if let Some(diffs) = diffs { // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 12fd8cc565e..7ca188870a3 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1076,12 +1076,10 @@ mod private { &mut self, locked_store: OwnedEventCacheStoreLockGuard, ) -> Result<(), EventCacheError> { - let store_lock = self.store.lock().await?; - // Attempt to load the last chunk. let linked_chunk_id = LinkedChunkId::Room(&self.room); let (last_chunk, chunk_identifier_generator) = - match store_lock.load_last_chunk(linked_chunk_id).await { + match locked_store.store.load_last_chunk(linked_chunk_id).await { Ok(pair) => pair, Err(err) => { @@ -1089,7 +1087,8 @@ mod private { error!("error when reloading a linked chunk from memory: {err}"); // Clear storage for this room. 
- store_lock + locked_store + .store .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) .await?; @@ -1126,6 +1125,7 @@ mod private { #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] pub(crate) async fn auto_shrink_if_no_subscribers( &mut self, + store_lock: &EventCacheStoreLock, ) -> Result>>, EventCacheError> { let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst); @@ -1134,7 +1134,7 @@ mod private { if subscriber_count == 0 { // If we are the last strong reference to the auto-shrinker, we can shrink the // events data structure to its last chunk. - let locked_store = self.store.lock_owned().await?; + let locked_store = store_lock.lock_owned().await?; self.shrink_to_last_chunk(locked_store).await?; Ok(Some(self.room_linked_chunk.updates_as_vector_diffs())) } else { @@ -1641,8 +1641,8 @@ mod private { // worry about filtering out aggregation events (like // reactions/edits/etc.). Pretty neat, huh? let num_replies = { - let store_guard = &*self.store.lock().await?; - let related_thread_events = store_guard + let related_thread_events = locked_store + .store .find_event_relations( &self.room, &thread_root, From 5fe1c3e65d11eb36234db11f2c8a7947f033b9ed Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 19 Aug 2025 17:24:48 +0200 Subject: [PATCH 05/10] fixup! feat(event cache): use the event cache store lock as little as we must --- crates/matrix-sdk/src/event_cache/pagination.rs | 2 +- crates/matrix-sdk/src/event_cache/room/mod.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index f1ef1421045..e928f4b6585 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -180,7 +180,7 @@ impl RoomPagination { loop { let mut state_guard = self.inner.state.write().await; - match state_guard.load_more_events_backwards().await? { + match state_guard.load_more_events_backwards(&self.inner.store).await? { LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => { const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3); diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 7ca188870a3..02cf0277e6e 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -976,6 +976,7 @@ mod private { /// Load more events backwards if the last chunk is **not** a gap. pub(in super::super) async fn load_more_events_backwards( &mut self, + store_lock: &EventCacheStoreLock, ) -> Result { // If any in-memory chunk is a gap, don't load more events, and let the caller // resolve the gap. @@ -992,7 +993,7 @@ mod private { .expect("a linked chunk is never empty") .identifier(); - let store = self.store.lock().await?; + let store = store_lock.lock().await?; // The first chunk is not a gap, we can load its previous chunk. let linked_chunk_id = LinkedChunkId::Room(&self.room); @@ -2988,7 +2989,7 @@ mod timed_tests { // But if I manually reload more of the chunk, the gap will be present. assert_matches!( - state.load_more_events_backwards().await.unwrap(), + state.load_more_events_backwards(&room_event_cache.inner.store).await.unwrap(), LoadMoreEventsBackwardsOutcome::Gap { .. } ); From c9d76c2eb76653a1b9e820de40e74baee0e5a20c Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 19 Aug 2025 17:30:16 +0200 Subject: [PATCH 06/10] fixup! 
feat(event cache): use the event cache store lock as little as we must --- crates/matrix-sdk/src/event_cache/room/mod.rs | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 02cf0277e6e..6d2abd2632a 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -336,11 +336,13 @@ impl RoomEventCache { /// It starts by looking into loaded events before looking inside the /// storage. pub async fn find_event(&self, event_id: &EventId) -> Option { + let locked_store = self.inner.store.lock_owned().await.ok()?; + self.inner .state .read() .await - .find_event(event_id) + .find_event(&locked_store, event_id) .await .ok() .flatten() @@ -368,7 +370,7 @@ impl RoomEventCache { .state .read() .await - .find_event_with_relations(event_id, filter.clone()) + .find_event_with_relations(&self.inner.store, event_id, filter.clone()) .await .ok() .flatten() @@ -1146,8 +1148,9 @@ mod private { #[cfg(test)] pub(crate) async fn force_shrink_to_last_chunk( &mut self, + store_lock: &EventCacheStoreLock, ) -> Result>, EventCacheError> { - let locked_store = self.store.lock_owned().await?; + let locked_store = store_lock.lock_owned().await?; self.shrink_to_last_chunk(locked_store).await?; Ok(self.room_linked_chunk.updates_as_vector_diffs()) } @@ -1398,6 +1401,7 @@ mod private { /// looking inside the storage. pub async fn find_event( &self, + locked_store: &OwnedEventCacheStoreLockGuard, event_id: &EventId, ) -> Result, EventCacheError> { // There are supposedly fewer events loaded in memory than in the store. Let's @@ -1408,9 +1412,8 @@ mod private { } } - let store = self.store.lock().await?; - - Ok(store + Ok(locked_store + .store .find_event(&self.room, event_id) .await? .map(|event| (EventLocation::Store, event))) @@ -1431,10 +1434,11 @@ mod private { /// chunk. pub async fn find_event_with_relations( &self, + store_lock: &EventCacheStoreLock, event_id: &EventId, filters: Option>, ) -> Result)>, EventCacheError> { - let store = self.store.lock().await?; + let store = store_lock.lock().await?; // First, hit storage to get the target event and its related events. let found = store.find_event(&self.room, event_id).await?; @@ -1623,7 +1627,8 @@ mod private { let last_event_id = thread_cache.latest_event_id(); - let Some((location, mut target_event)) = self.find_event(&thread_root).await? + let Some((location, mut target_event)) = + self.find_event(&locked_store, &thread_root).await? else { trace!(%thread_root, "thread root event is missing from the linked chunk"); continue; @@ -1735,7 +1740,9 @@ mod private { }; // Replace the redacted event by a redacted form, if we knew about it. - let Some((location, mut target_event)) = self.find_event(event_id).await? else { + let Some((location, mut target_event)) = + self.find_event(&locked_store, event_id).await? + else { trace!("redacted event is missing from the linked chunk"); return Ok(()); }; @@ -1865,7 +1872,8 @@ mod private { // thread event is. The thread count can remain as is, as it might still be // valid, and there's no good value to reset it to, anyways. for thread_root in summaries_to_update { - let Some((location, mut target_event)) = self.find_event(&thread_root).await? + let Some((location, mut target_event)) = + self.find_event(&locked_store, &thread_root).await? 
else { trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync"); continue; @@ -3148,7 +3156,7 @@ mod timed_tests { .state .write() .await - .force_shrink_to_last_chunk() + .force_shrink_to_last_chunk(&room_event_cache.inner.store) .await .expect("shrinking should succeed"); From 5bf1cbd12dd4cd92e75c706d9b01dea099ce2fb7 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 19 Aug 2025 17:31:36 +0200 Subject: [PATCH 07/10] refactor(event cache): get rid of `RoomEventCacheState::store` \o/ --- crates/matrix-sdk/src/event_cache/room/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 6d2abd2632a..302307ef8db 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -708,9 +708,6 @@ mod private { /// The rules for the version of this room. room_version_rules: RoomVersionRules, - /// Reference to the underlying backing store. - store: EventCacheStoreLock, - /// The loaded events for the current room, that is, the in-memory /// linked chunk for this room. room_linked_chunk: EventLinkedChunk, @@ -817,7 +814,6 @@ mod private { Ok(Self { room: room_id, room_version_rules, - store, room_linked_chunk, threads, waited_for_initial_prev_token: false, From 5eee20f34d4714ef5a184306a26208170cb4a77a Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 19 Aug 2025 17:37:38 +0200 Subject: [PATCH 08/10] refactor(event cache): pass the full store object even less --- crates/matrix-sdk/src/event_cache/mod.rs | 36 +++++++++++++------ crates/matrix-sdk/src/event_cache/room/mod.rs | 14 +++----- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 5ecb0992af8..d1621cb8212 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -39,7 +39,7 @@ use futures_util::future::{join_all, try_join_all}; use matrix_sdk_base::{ deserialized_responses::{AmbiguityChange, TimelineEvent}, event_cache::{ - store::{EventCacheStoreError, EventCacheStoreLock}, + store::{DynEventCacheStore, EventCacheStoreError, EventCacheStoreLock}, Gap, }, executor::AbortOnDrop, @@ -358,11 +358,22 @@ impl EventCache { while let Some(room_id) = rx.recv().await { trace!(for_room = %room_id, "received notification to shrink"); - let room = match inner.for_room(&room_id).await { - Ok(room) => room, - Err(err) => { - warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}"); + let room = { + let Ok(store) = inner + .store + .lock() + .await + .inspect_err(|err| warn!("Failed to lock the store: {err}")) + else { continue; + }; + + match inner.for_room(&*store, &room_id).await { + Ok(room) => room, + Err(err) => { + warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}"); + continue; + } } }; @@ -408,7 +419,8 @@ impl EventCache { return Err(EventCacheError::NotSubscribedYet); }; - let room = self.inner.for_room(room_id).await?; + let store = self.inner.store.lock().await?; + let room = self.inner.for_room(&*store, room_id).await?; Ok((room, drop_handles)) } @@ -854,7 +866,7 @@ impl EventCacheInner { // Left rooms. 
for (room_id, left_room_update) in updates.left { - let room = self.for_room(&room_id).await?; + let room = self.for_room(&*locked_store.store, &room_id).await?; if let Err(err) = room.inner.handle_left_room_update(locked_store.clone(), left_room_update).await @@ -868,7 +880,7 @@ impl EventCacheInner { for (room_id, joined_room_update) in updates.joined { trace!(?room_id, "Handling a `JoinedRoomUpdate`"); - let room = self.for_room(&room_id).await?; + let room = self.for_room(&*locked_store.store, &room_id).await?; if let Err(err) = room.inner.handle_joined_room_update(locked_store.clone(), joined_room_update).await @@ -885,7 +897,11 @@ impl EventCacheInner { } /// Return a room-specific view over the [`EventCache`]. - async fn for_room(&self, room_id: &RoomId) -> Result { + async fn for_room( + &self, + store: &DynEventCacheStore, + room_id: &RoomId, + ) -> Result { // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a // write lock. let by_room_guard = self.by_room.read().await; @@ -920,7 +936,7 @@ impl EventCacheInner { room_id.to_owned(), room_version_rules, self.linked_chunk_update_sender.clone(), - self.store.clone(), + store, pagination_status.clone(), ) .await?; diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 302307ef8db..c4f6e92dfab 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -748,11 +748,9 @@ mod private { room_id: OwnedRoomId, room_version_rules: RoomVersionRules, linked_chunk_update_sender: Sender, - store: EventCacheStoreLock, + store: &DynEventCacheStore, pagination_status: SharedObservable, ) -> Result { - let store_lock = store.lock().await?; - let linked_chunk_id = LinkedChunkId::Room(&room_id); // Load the full linked chunk's metadata, so as to feed the order tracker. @@ -760,7 +758,7 @@ mod private { // If loading the full linked chunk failed, we'll clear the event cache, as it // indicates that at some point, there's some malformed data. let full_linked_chunk_metadata = - match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await { + match Self::load_linked_chunk_metadata(store, linked_chunk_id).await { Ok(metas) => metas, Err(err) => { error!( @@ -768,7 +766,7 @@ mod private { ); // Try to clear storage for this room. - store_lock + store .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) .await?; @@ -777,7 +775,7 @@ mod private { } }; - let linked_chunk = match store_lock + let linked_chunk = match store .load_last_chunk(linked_chunk_id) .await .map_err(EventCacheError::from) @@ -792,9 +790,7 @@ mod private { ); // Try to clear storage for this room. - store_lock - .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) - .await?; + store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?; None } From 0a50883f911b03bb72ac5b554b90304ed9326684 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 21 Jul 2025 12:52:26 +0200 Subject: [PATCH 09/10] feat(event cache): process room updates concurrently This should improve latency of event cache updates in general, especially for massive initial sync responses (which may include many many rooms). 
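For illustration, here is a minimal sketch of the concurrency pattern this commit applies (and which the next commit reverts): build one future per room update, each holding a clone of the owned store guard, and await them together with `join_all`. `Guard` and `handle_room_update` are hypothetical stand-ins, not the SDK API.

```rust
use std::sync::Arc;

use futures_util::future::join_all;

/// Stand-in for the cloned owned store guard.
#[derive(Clone)]
struct Guard(Arc<()>);

/// Stand-in for a per-room update handler; errors stay non-fatal per room.
async fn handle_room_update(_guard: Guard, room_id: &str) -> Result<(), String> {
    println!("handled update for {room_id}");
    Ok(())
}

#[tokio::main]
async fn main() {
    let guard = Guard(Arc::new(()));
    let room_ids = ["!left:example.org", "!joined:example.org"];

    // One future per room, each with a cheap clone of the guard.
    let futures = room_ids.into_iter().map(|room_id| {
        let guard = guard.clone();
        async move {
            if let Err(err) = handle_room_update(guard, room_id).await {
                // Non-fatal: log and keep processing the other rooms.
                eprintln!("handling update for {room_id}: {err}");
            }
        }
    });

    // Run all room futures concurrently on the current task.
    join_all(futures).await;
}
```

The diff below replaces the sequential per-room loop with exactly this kind of fan-out; the trade-off is revisited in the final commit of the series.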
--- crates/matrix-sdk/src/event_cache/mod.rs | 41 ++++++++++++++++-------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index d1621cb8212..72426dcf2d1 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -58,7 +58,7 @@ use ruma::{ RoomId, }; use tokio::{ - select, + join, select, sync::{ broadcast::{channel, error::RecvError, Receiver, Sender}, mpsc, Mutex, RwLock, @@ -859,6 +859,9 @@ impl EventCacheInner { let locked_store = self.store.lock_owned().await?; + let mut left_futures = Vec::with_capacity(updates.left.len()); + let mut joined_futures = Vec::with_capacity(updates.joined.len()); + // Note: bnjbvr tried to make this concurrent at some point, but it turned out // to be a performance regression, even for large sync updates. Lacking // time to investigate, this code remains sequential for now. See also @@ -868,12 +871,17 @@ impl EventCacheInner { for (room_id, left_room_update) in updates.left { let room = self.for_room(&*locked_store.store, &room_id).await?; - if let Err(err) = - room.inner.handle_left_room_update(locked_store.clone(), left_room_update).await - { - // Non-fatal error, try to continue to the next room. - error!("handling left room update: {err}"); - } + let locked_store = locked_store.clone(); + left_futures.push(async move { + trace!(?room_id, "Handling a `LeftRoomUpdate`"); + + if let Err(err) = + room.inner.handle_left_room_update(locked_store, left_room_update).await + { + // Non-fatal error, try to continue to the next room. + error!("handling left room update: {err}"); + } + }); } // Joined rooms. @@ -882,14 +890,21 @@ impl EventCacheInner { let room = self.for_room(&*locked_store.store, &room_id).await?; - if let Err(err) = - room.inner.handle_joined_room_update(locked_store.clone(), joined_room_update).await - { - // Non-fatal error, try to continue to the next room. - error!(%room_id, "handling joined room update: {err}"); - } + let locked_store = locked_store.clone(); + joined_futures.push(async move { + trace!(?room_id, "Handling a `JoinedRoomUpdate`"); + + if let Err(err) = + room.inner.handle_joined_room_update(locked_store, joined_room_update).await + { + // Non-fatal error, try to continue to the next room. + error!(%room_id, "handling joined room update: {err}"); + } + }); } + join!(join_all(left_futures), join_all(joined_futures)); + // Invited rooms. // TODO: we don't anything with `updates.invite` at this point. From 7113973ef528dfd8ed3a9647ee159bf26a76d02d Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 19 Aug 2025 17:53:56 +0200 Subject: [PATCH 10/10] Revert "feat(event cache): process room updates concurrently" This reverts commit 0a50883f911b03bb72ac5b554b90304ed9326684. 
--- crates/matrix-sdk/src/event_cache/mod.rs | 41 ++++++++---------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 72426dcf2d1..d1621cb8212 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -58,7 +58,7 @@ use ruma::{ RoomId, }; use tokio::{ - join, select, + select, sync::{ broadcast::{channel, error::RecvError, Receiver, Sender}, mpsc, Mutex, RwLock, @@ -859,9 +859,6 @@ impl EventCacheInner { let locked_store = self.store.lock_owned().await?; - let mut left_futures = Vec::with_capacity(updates.left.len()); - let mut joined_futures = Vec::with_capacity(updates.joined.len()); - // Note: bnjbvr tried to make this concurrent at some point, but it turned out // to be a performance regression, even for large sync updates. Lacking // time to investigate, this code remains sequential for now. See also @@ -871,17 +868,12 @@ impl EventCacheInner { for (room_id, left_room_update) in updates.left { let room = self.for_room(&*locked_store.store, &room_id).await?; - let locked_store = locked_store.clone(); - left_futures.push(async move { - trace!(?room_id, "Handling a `LeftRoomUpdate`"); - - if let Err(err) = - room.inner.handle_left_room_update(locked_store, left_room_update).await - { - // Non-fatal error, try to continue to the next room. - error!("handling left room update: {err}"); - } - }); + if let Err(err) = + room.inner.handle_left_room_update(locked_store.clone(), left_room_update).await + { + // Non-fatal error, try to continue to the next room. + error!("handling left room update: {err}"); + } } // Joined rooms. @@ -890,21 +882,14 @@ impl EventCacheInner { let room = self.for_room(&*locked_store.store, &room_id).await?; - let locked_store = locked_store.clone(); - joined_futures.push(async move { - trace!(?room_id, "Handling a `JoinedRoomUpdate`"); - - if let Err(err) = - room.inner.handle_joined_room_update(locked_store, joined_room_update).await - { - // Non-fatal error, try to continue to the next room. - error!(%room_id, "handling joined room update: {err}"); - } - }); + if let Err(err) = + room.inner.handle_joined_room_update(locked_store.clone(), joined_room_update).await + { + // Non-fatal error, try to continue to the next room. + error!(%room_id, "handling joined room update: {err}"); + } } - join!(join_all(left_futures), join_all(joined_futures)); - // Invited rooms. // TODO: we don't anything with `updates.invite` at this point.
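One pattern worth calling out across the series: persistence paths such as `send_updates_to_store` and `save_event` move the owned guard into a `spawn`ed task, so the store writes cannot be interrupted if the calling task is cancelled. The sketch below models that shape only; `Store` and `OwnedGuard` are stand-ins rather than the SDK types.

```rust
use std::sync::Arc;

use tokio::{
    sync::{Mutex, OwnedMutexGuard},
    task::JoinHandle,
};

/// Stand-in for the event cache store.
struct Store;

impl Store {
    async fn save_event(&self, event_id: &str) {
        println!("saved {event_id}");
    }
}

/// Stand-in for the clonable owned store guard.
#[derive(Clone)]
struct OwnedGuard {
    _lock: Arc<OwnedMutexGuard<()>>,
    store: Arc<Store>,
}

/// Moving the guard into a spawned task keeps the lock held for the whole
/// write, even if the original caller is cancelled mid-await.
fn save_uninterrupted(guard: OwnedGuard, event_ids: Vec<String>) -> JoinHandle<()> {
    tokio::spawn(async move {
        for event_id in event_ids {
            guard.store.save_event(&event_id).await;
        }
    })
}

#[tokio::main]
async fn main() {
    let mutex = Arc::new(Mutex::new(()));
    let guard = OwnedGuard {
        _lock: Arc::new(mutex.lock_owned().await),
        store: Arc::new(Store),
    };

    save_uninterrupted(guard, vec!["$ev0".to_owned(), "$ev1".to_owned()])
        .await
        .expect("the save task shouldn't panic");
}
```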