From 4facaa44405bea053545ba814e8e2ff25c5ed5fa Mon Sep 17 00:00:00 2001 From: Ginger Date: Thu, 21 May 2026 10:53:15 -0400 Subject: [PATCH] fix: Calculate state at end of last sync correctly --- src/api/client/sync/v3/joined.rs | 74 +++++++++++++++----------------- src/api/client/sync/v3/state.rs | 11 +++-- 2 files changed, 41 insertions(+), 44 deletions(-) diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index d5464960a..b5f6ba21c 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -362,41 +362,41 @@ async fn fetch_shortstatehashes( .rooms .state .get_room_shortstatehash(room_id) - .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); + .map_err(|_| err!(Database(error!("Room {room_id} has no state")))) + .await?; - // the room state as of the end of the last sync. - // this will be None if we are doing an initial sync or if we just joined this + // The room state as of the end of the last sync. + // This will be None if we are doing an initial sync or if we just joined this // room. let last_sync_end_shortstatehash = OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| { pin! { - let pdus_rev = services + let pdus = services .rooms .timeline - // We add 2 to the count here because `pdu_shortstatehash` returns - // state _before_ the event ID (i.e. not including the event itself if it's a state event) - // and `pdus_rev` does exclusive iteration (i.e. not including an event that has _exactly_ - // the provided count). - .pdus_rev(room_id, Some(PduCount::Normal(last_sync_end_count).saturating_add(2))) + .pdus(room_id, Some(PduCount::Normal(last_sync_end_count))) .ignore_err(); } - let (_, pdu_at_last_sync_end) = pdus_rev.next().await?; + match pdus.next().await { + | Some((_, pdu_after_last_sync_end)) => { + trace!(?pdu_after_last_sync_end.event_id, "pdu at last sync end"); - Some( - services - .rooms - .state_accessor - .pdu_shortstatehash(&pdu_at_last_sync_end.event_id) - .await - .expect("pdu should have a shortstatehash"), - ) + services + .rooms + .state_accessor + .pdu_shortstatehash(&pdu_after_last_sync_end.event_id) + .await + .expect("pdu should have a shortstatehash") + }, + | None => { + // No events have been sent since the last sync, + // so the state then is the same as the state now + current_shortstatehash + }, + } })) - .map(Option::flatten) - .map(Ok); - - let (current_shortstatehash, last_sync_end_shortstatehash) = - try_join(current_shortstatehash, last_sync_end_shortstatehash).await?; + .await; Ok(ShortStateHashes { current_shortstatehash, @@ -464,28 +464,22 @@ async fn build_state_events( last_sync_end_shortstatehash, } = shortstatehashes; - let timeline_start_shortstatehash = async { - if let Some((_, pdu)) = timeline.pdus.front() { - if let Ok(shortstatehash) = services - .rooms - .state_accessor - .pdu_shortstatehash(&pdu.event_id) - .await - { - return shortstatehash; - } - } - - current_shortstatehash + let timeline_start_shortstatehash = if let Some((_, pdu)) = timeline.pdus.front() { + services + .rooms + .state_accessor + .pdu_shortstatehash(&pdu.event_id) + .await + .expect("event should have shortstatehash") + } else { + // if the timeline is empty there can't possibly be any changes to the state + return Ok(vec![]); }; // the user IDs of members whose membership needs to be sent to the client, if // lazy-loading is enabled. let lazily_loaded_members = - prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()); - - let (timeline_start_shortstatehash, lazily_loaded_members) = - join(timeline_start_shortstatehash, lazily_loaded_members).await; + prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await; // compute the state delta between the previous sync and this sync. match (last_sync_end_count, last_sync_end_shortstatehash) { diff --git a/src/api/client/sync/v3/state.rs b/src/api/client/sync/v3/state.rs index cedc6ca73..0da053d65 100644 --- a/src/api/client/sync/v3/state.rs +++ b/src/api/client/sync/v3/state.rs @@ -100,12 +100,15 @@ pub(super) async fn build_state_incremental<'a>( use_state_after: bool, lazily_loaded_members: Option<&'a MemberSet>, ) -> Result> { - // NB: a limited sync is one where `timeline.limited == true`. Synapse calls - // this a "gappy" sync internally. - let mut state_event_ids: HashSet = HashSet::new(); - trace!(%timeline.limited, "computing state for incremental sync"); + trace!( + %use_state_after, + %last_sync_end_shortstatehash, + %timeline_start_shortstatehash, + %timeline_end_shortstatehash, + "computing state for incremental sync" + ); // Fetch lazy-loaded membership events if lazy-loading is enabled if let Some(lazily_loaded_members) = lazily_loaded_members