From af8e28559e7fc56e4027100521436eec8a4ac5a9 Mon Sep 17 00:00:00 2001 From: Ginger Date: Wed, 20 May 2026 14:55:47 -0400 Subject: [PATCH] feat: Add support for `state_after` --- src/api/client/sync/v3/joined.rs | 27 +++-- src/api/client/sync/v3/left.rs | 11 +- src/api/client/sync/v3/mod.rs | 4 + src/api/client/sync/v3/state.rs | 198 +++++++++---------------------- src/core/matrix/versions.rs | 1 + 5 files changed, 89 insertions(+), 152 deletions(-) diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index 4de0ce3e3..d5464960a 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -97,12 +97,19 @@ pub(super) async fn load_joined_room( ); } + let state_events = + StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect()); + let joined_room = assign!(JoinedRoom::new(), { account_data, summary: summary.unwrap_or_default(), unread_notifications: notification_counts.unwrap_or_default(), timeline, - state: RoomState::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())), + state: if sync_context.use_state_after { + RoomState::After(state_events) + } else { + RoomState::Before(state_events) + }, ephemeral, unread_thread_notifications: BTreeMap::new(), }); @@ -366,7 +373,11 @@ async fn fetch_shortstatehashes( let pdus_rev = services .rooms .timeline - .pdus_rev(room_id, Some(PduCount::Normal(last_sync_end_count.saturating_sub(1)))) + // We add 2 to the count here because `pdu_shortstatehash` returns + // state _before_ the event ID (i.e. not including the event itself if it's a state event) + // and `pdus_rev` does exclusive iteration (i.e. not including an event that has _exactly_ + // the provided count). + .pdus_rev(room_id, Some(PduCount::Normal(last_sync_end_count).saturating_add(2))) .ignore_err(); } @@ -444,6 +455,7 @@ async fn build_state_events( syncing_user, last_sync_end_count, full_state, + use_state_after, .. } = sync_context; @@ -452,10 +464,6 @@ async fn build_state_events( last_sync_end_shortstatehash, } = shortstatehashes; - // the spec states that the `state` property only includes state events up to - // the beginning of the timeline, so we determine the state of the syncing room - // as of the first timeline event. NOTE: this explanation is not entirely - // accurate; see the implementation of `build_state_incremental`. let timeline_start_shortstatehash = async { if let Some((_, pdu)) = timeline.pdus.front() { if let Ok(shortstatehash) = services @@ -486,16 +494,15 @@ async fn build_state_events( is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false, then use `build_state_incremental`. */ - | (Some(last_sync_end_count), Some(last_sync_end_shortstatehash)) if !full_state => + | (Some(_), Some(last_sync_end_shortstatehash)) if !full_state => build_state_incremental( services, syncing_user, - room_id, - PduCount::Normal(last_sync_end_count), last_sync_end_shortstatehash, timeline_start_shortstatehash, current_shortstatehash, timeline, + use_state_after, lazily_loaded_members.as_ref(), ) .boxed() @@ -510,6 +517,8 @@ async fn build_state_events( services, syncing_user, timeline_start_shortstatehash, + current_shortstatehash, + use_state_after, lazily_loaded_members.as_ref(), ) .boxed() diff --git a/src/api/client/sync/v3/left.rs b/src/api/client/sync/v3/left.rs index d833bfdf9..63d14b9ce 100644 --- a/src/api/client/sync/v3/left.rs +++ b/src/api/client/sync/v3/left.rs @@ -181,6 +181,9 @@ pub(super) async fn load_left_room( .collect::>() .await; + let state_events = + StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect()); + Ok(Some(assign!(LeftRoom::new(), { account_data: RoomAccountData::new(), timeline: assign!(Timeline::new(), { @@ -188,7 +191,11 @@ pub(super) async fn load_left_room( prev_batch: Some(current_count.to_string()), events: raw_timeline_pdus, }), - state: State::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())), + state: if sync_context.use_state_after { + State::After(state_events) + } else { + State::Before(state_events) + }, }))) } @@ -264,6 +271,8 @@ async fn build_left_state_and_timeline( services, syncing_user, timeline_start_shortstatehash, + leave_shortstatehash, + sync_context.use_state_after, lazily_loaded_members.as_ref(), ) .await?; diff --git a/src/api/client/sync/v3/mod.rs b/src/api/client/sync/v3/mod.rs index c32ed22cd..7f707f239 100644 --- a/src/api/client/sync/v3/mod.rs +++ b/src/api/client/sync/v3/mod.rs @@ -110,6 +110,9 @@ struct SyncContext<'a> { /// The sync filter, which the client uses to specify what data should be /// included in the sync response. filter: &'a FilterDefinition, + /// Whether the state at the end of the timeline should be used when + /// calculating state diffs for sync. + use_state_after: bool, } impl<'a> SyncContext<'a> { @@ -263,6 +266,7 @@ pub(crate) async fn build_sync_events( current_count, full_state, filter: &filter, + use_state_after: body.use_state_after, }; let joined_rooms = services diff --git a/src/api/client/sync/v3/state.rs b/src/api/client/sync/v3/state.rs index b22074d89..cedc6ca73 100644 --- a/src/api/client/sync/v3/state.rs +++ b/src/api/client/sync/v3/state.rs @@ -1,11 +1,8 @@ -use std::{collections::BTreeSet, ops::ControlFlow}; +use std::collections::HashSet; use conduwuit::{ - Result, at, is_equal_to, - matrix::{ - Event, - pdu::{PduCount, PduEvent}, - }, + Result, at, + matrix::{Event, pdu::PduEvent}, utils::{ BoolExt, IterStream, ReadyExt, TryFutureExtExt, stream::{BroadbandExt, TryIgnore}, @@ -16,9 +13,7 @@ use conduwuit_service::{ rooms::{lazy_loading::MemberSet, short::ShortStateHash}, }; use futures::{FutureExt, StreamExt}; -use itertools::Itertools; -use ruma::{OwnedEventId, RoomId, UserId, events::StateEventType}; -use service::rooms::short::ShortEventId; +use ruma::{OwnedEventId, UserId, events::StateEventType}; use tracing::trace; use crate::client::TimelinePdus; @@ -39,13 +34,19 @@ pub(super) async fn build_state_initial( services: &Services, sender_user: &UserId, timeline_start_shortstatehash: ShortStateHash, + timeline_end_shortstatehash: ShortStateHash, + use_state_after: bool, lazily_loaded_members: Option<&MemberSet>, ) -> Result> { // load the keys and event IDs of the state events at the start of the timeline let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services .rooms .state_accessor - .state_full_ids(timeline_start_shortstatehash) + .state_full_ids(if use_state_after { + timeline_end_shortstatehash + } else { + timeline_start_shortstatehash + }) .unzip() .await; @@ -92,82 +93,31 @@ pub(super) async fn build_state_initial( pub(super) async fn build_state_incremental<'a>( services: &Services, sender_user: &'a UserId, - room_id: &RoomId, - last_sync_end_count: PduCount, last_sync_end_shortstatehash: ShortStateHash, timeline_start_shortstatehash: ShortStateHash, timeline_end_shortstatehash: ShortStateHash, timeline: &TimelinePdus, + use_state_after: bool, lazily_loaded_members: Option<&'a MemberSet>, ) -> Result> { - /* - NB: a limited sync is one where `timeline.limited == true`. Synapse calls this a "gappy" sync internally. + // NB: a limited sync is one where `timeline.limited == true`. Synapse calls + // this a "gappy" sync internally. - The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described - by the Matrix specification. This is because the specification's description of the `state` property does not accurately - reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include: - 1. We do not compute the delta using the naive approach of "every state event from the end of the last sync - up to the start of this sync's timeline". see below for details. - 2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined - elsewhere and supplied to this function in the `lazily_loaded_members` parameter. - */ + let mut state_event_ids: HashSet = HashSet::new(); - /* - the `state` property of an incremental sync which isn't limited are _usually_ empty. - (note: the specification says that the `state` property is _always_ empty for limited syncs, which is incorrect.) - however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`), - the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state - at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline - merged a split in the DAG. + trace!(%timeline.limited, "computing state for incremental sync"); - see: https://github.com/element-hq/synapse/issues/16941 - */ + // Fetch lazy-loaded membership events if lazy-loading is enabled + if let Some(lazily_loaded_members) = lazily_loaded_members + && !lazily_loaded_members.is_empty() + { + trace!("including lazy membership events for members: {:?}", lazily_loaded_members); - let timeline_is_linear = timeline.pdus.is_empty() || { - let last_pdu_of_last_sync = services + services .rooms - .timeline - .pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1))) - .boxed() - .next() - .await - .transpose() - .expect("last sync should have had some PDUs") - .map(at!(1)); - - // make sure the prev_events of each pdu in the timeline refer only to the - // previous pdu - timeline - .pdus - .iter() - .try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| { - if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() { - if prev_event_id - .as_ref() - .is_none_or(is_equal_to!(pdu_prev_event_id)) - { - return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned())); - } - } - - trace!( - "pdu {:?} has split prev_events (expected {:?}): {:?}", - pdu.event_id, prev_event_id, pdu.prev_events - ); - ControlFlow::Break(()) - }) - .is_continue() - }; - - if timeline_is_linear && !timeline.limited { - // if there are no splits in the DAG and the timeline isn't limited, then - // `state` will always be empty unless lazy loading is enabled. - - if let Some(lazily_loaded_members) = lazily_loaded_members { - if !timeline.pdus.is_empty() { - // lazy loading is enabled, so we return the membership events which were - // requested by the caller. - let lazy_membership_events: Vec<_> = lazily_loaded_members + .short + .multi_get_eventid_from_short::<'_, OwnedEventId, _>( + lazily_loaded_members .iter() .stream() .broad_filter_map(|user_id| async move { @@ -178,71 +128,24 @@ pub(super) async fn build_state_incremental<'a>( services .rooms .state_accessor - .state_get( + .state_get_shortid( timeline_start_shortstatehash, &StateEventType::RoomMember, user_id.as_str(), ) .ok() .await - }) - .collect() - .await; - - if !lazy_membership_events.is_empty() { - trace!( - "syncing lazy membership events for members: {:?}", - lazy_membership_events - .iter() - .map(|pdu| pdu.state_key().unwrap()) - .collect::>() - ); - } - return Ok(lazy_membership_events); - } - } - - // lazy loading is disabled, `state` is empty. - return Ok(vec![]); + }), + ) + .ignore_err() + .ready_for_each(|event_id| { + state_event_ids.insert(event_id); + }) + .await; } - /* - at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates - computing the incremental state (which may be empty). - - NOTE: this code path does not use the `lazy_membership_events` parameter. any changes to membership will be included - in the incremental state. therefore, the incremental state may include "redundant" membership events, - which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`, - and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that - the performance penalty is acceptable. - */ - - trace!(%timeline_is_linear, %timeline.limited, "computing state for incremental sync"); - - // fetch the shorteventids of state events in the timeline - let state_events_in_timeline: BTreeSet = services - .rooms - .short - .multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| { - if pdu.state_key().is_some() { - Some(pdu.event_id.as_ref()) - } else { - None - } - })) - .collect() - .await; - - trace!("{} state events in timeline", state_events_in_timeline.len()); - - /* - fetch the state events which were added since the last sync. - - specifically we fetch the difference between the state at the last sync and the state at the _end_ - of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched. - this is necessary to account for splits in the DAG, as explained above. - */ - let state_diff = services + // Fetch the state events added since the last sync. + services .rooms .short .multi_get_eventid_from_short::<'_, OwnedEventId, _>( @@ -252,18 +155,29 @@ pub(super) async fn build_state_incremental<'a>( .state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash)) .await? .stream() - .ready_filter_map(|(_, shorteventid)| { - if state_events_in_timeline.contains(&shorteventid) { - None - } else { - Some(shorteventid) - } - }), + .map(at!(1)), ) - .ignore_err(); + .ignore_err() + .ready_for_each(|event_id| { + state_event_ids.insert(event_id); + }) + .await; - // finally, fetch the PDU contents and collect them into a vec - let state_diff_pdus = state_diff + if !use_state_after { + // If state_after isn't enabled, filter out state events which also exist + // in the timeline. If splits exist in the DAG, this may not be exactly the same + // thing as the state diff ending at the start of the timeline, but Synapse + // also does this and it's technically more useful behavior anyway. + // See: https://github.com/element-hq/synapse/issues/16941 + + for (_, pdu) in &timeline.pdus { + state_event_ids.remove(pdu.event_id()); + } + } + + // Finally, fetch the PDU contents and collect them into a vec + let state_diff_pdus = state_event_ids + .stream() .broad_filter_map(|event_id| async move { services .rooms diff --git a/src/core/matrix/versions.rs b/src/core/matrix/versions.rs index fa2284e7d..ed4183685 100644 --- a/src/core/matrix/versions.rs +++ b/src/core/matrix/versions.rs @@ -21,6 +21,7 @@ pub fn versions() -> Vec { "v1.12".to_owned(), "v1.13".to_owned(), "v1.14".to_owned(), + "v1.16".to_owned(), ] }