fix(sync/v3): Further cleanup + improve incremental sync consistency

This commit is contained in:
Ginger
2025-10-24 14:57:31 -04:00
parent 72bf8e5927
commit 0eff173c0b
8 changed files with 455 additions and 276 deletions
+5 -5
View File
@@ -16,7 +16,7 @@ use conduwuit_service::{
Services, Services,
rooms::{ rooms::{
lazy_loading, lazy_loading,
lazy_loading::{Options, Witness}, lazy_loading::{MemberSet, Options},
timeline::PdusIterItem, timeline::PdusIterItem,
}, },
}; };
@@ -162,7 +162,7 @@ pub(crate) async fn get_message_events_route(
let state = witness let state = witness
.map(Option::into_iter) .map(Option::into_iter)
.map(|option| option.flat_map(Witness::into_iter)) .map(|option| option.flat_map(MemberSet::into_iter))
.map(IterStream::stream) .map(IterStream::stream)
.into_stream() .into_stream()
.flatten() .flatten()
@@ -192,7 +192,7 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
services: &Services, services: &Services,
lazy_loading_context: &lazy_loading::Context<'_>, lazy_loading_context: &lazy_loading::Context<'_>,
events: I, events: I,
) -> Witness ) -> MemberSet
where where
I: Iterator<Item = &'a PdusIterItem> + Clone + Send, I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
{ {
@@ -216,7 +216,7 @@ where
.readreceipts_since(lazy_loading_context.room_id, Some(oldest.into_unsigned())); .readreceipts_since(lazy_loading_context.room_id, Some(oldest.into_unsigned()));
pin_mut!(receipts); pin_mut!(receipts);
let witness: Witness = events let witness: MemberSet = events
.stream() .stream()
.map(ref_at!(1)) .map(ref_at!(1))
.map(Event::sender) .map(Event::sender)
@@ -232,7 +232,7 @@ where
services services
.rooms .rooms
.lazy_loading .lazy_loading
.witness_retain(witness, lazy_loading_context) .retain_lazy_members(witness, lazy_loading_context)
.await .await
} }
+1 -5
View File
@@ -49,7 +49,6 @@ async fn load_timeline(
// no messages have been sent in this room since `starting_count` // no messages have been sent in this room since `starting_count`
return Ok(TimelinePdus::default()); return Ok(TimelinePdus::default());
} }
trace!(?last_timeline_count, ?starting_count, ?ending_count);
// for incremental sync, stream from the DB all PDUs which were sent after // for incremental sync, stream from the DB all PDUs which were sent after
// `starting_count` but before `ending_count`, including `ending_count` but // `starting_count` but before `ending_count`, including `ending_count` but
@@ -64,10 +63,7 @@ async fn load_timeline(
ending_count.map(|count| count.saturating_add(1)), ending_count.map(|count| count.saturating_add(1)),
) )
.ignore_err() .ignore_err()
.ready_take_while(move |&(pducount, ref pdu)| { .ready_take_while(move |&(pducount, _)| pducount > starting_count)
trace!(?pducount, ?pdu, "glubbins");
pducount > starting_count
})
.boxed() .boxed()
}, },
| None => { | None => {
+359 -205
View File
@@ -1,4 +1,7 @@
use std::collections::{BTreeMap, HashMap}; use std::{
collections::{BTreeMap, BTreeSet, HashMap},
ops::ControlFlow,
};
use conduwuit::{ use conduwuit::{
Result, at, err, extract_variant, is_equal_to, Result, at, err, extract_variant, is_equal_to,
@@ -10,23 +13,23 @@ use conduwuit::{
result::FlatOk, result::FlatOk,
utils::{ utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt, BoolExt, IterStream, ReadyExt, TryFutureExtExt,
future::OptionStream,
math::ruma_from_u64, math::ruma_from_u64,
stream::{BroadbandExt, Tools, WidebandExt}, stream::{BroadbandExt, Tools, TryIgnore, WidebandExt},
}, },
}; };
use conduwuit_service::{ use conduwuit_service::{
Services, Services,
rooms::{ rooms::{
lazy_loading, lazy_loading,
lazy_loading::{Options, Witness}, lazy_loading::{MemberSet, Options},
short::ShortStateHash, short::ShortStateHash,
}, },
}; };
use futures::{ use futures::{
FutureExt, StreamExt, TryFutureExt, FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join, join3, join4, try_join4}, future::{OptionFuture, join, join3, join4, try_join},
}; };
use itertools::Itertools;
use ruma::{ use ruma::{
OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
api::client::sync::sync_events::{ api::client::sync::sync_events::{
@@ -41,7 +44,7 @@ use ruma::{
serde::Raw, serde::Raw,
uint, uint,
}; };
use service::rooms::short::{ShortEventId, ShortStateKey}; use service::rooms::short::ShortEventId;
use tracing::trace; use tracing::trace;
use super::{load_timeline, share_encrypted_room}; use super::{load_timeline, share_encrypted_room};
@@ -50,6 +53,7 @@ use crate::client::{
sync::v3::{DeviceListUpdates, SyncContext}, sync::v3::{DeviceListUpdates, SyncContext},
}; };
/// Generate the sync response for a room the user is joined to.
#[tracing::instrument( #[tracing::instrument(
name = "joined", name = "joined",
level = "debug", level = "debug",
@@ -61,43 +65,61 @@ use crate::client::{
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub(super) async fn load_joined_room( pub(super) async fn load_joined_room(
services: &Services, services: &Services,
SyncContext { sync_context: SyncContext<'_>,
ref room_id: OwnedRoomId,
) -> Result<(JoinedRoom, DeviceListUpdates)> {
/*
this is a large function with a lot of logic. we try to parallelize as much as possible
by fetching data concurrently, so the code is roughly split into stages separated by calls to `join<n>`.
1. `current_shortstatehash` and `since_shortstatehash` are fetched from the DB. a shortstatehash is
a token which identifies the state of the room at a point in time.
2. `load_timeline` is called to fetch timeline events that happened since `since`.
3.
*/
let SyncContext {
sender_user, sender_user,
sender_device, sender_device,
since, since,
next_batch, next_batch,
full_state, full_state,
filter, filter,
}: SyncContext<'_>, } = sync_context;
ref room_id: OwnedRoomId,
) -> Result<(JoinedRoom, DeviceListUpdates)> {
let mut device_list_updates = DeviceListUpdates::new(); let mut device_list_updates = DeviceListUpdates::new();
let sincecount = since.map(PduCount::Normal);
let next_batchcount = PduCount::Normal(next_batch); let next_batchcount = PduCount::Normal(next_batch);
// the shortstatehash of the room's state right now // the room state right now
let current_shortstatehash = services let current_shortstatehash = services
.rooms .rooms
.state .state
.get_room_shortstatehash(room_id) .get_room_shortstatehash(room_id)
.map_err(|_| err!(Database(error!("Room {room_id} has no state")))); .map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
// the shortstatehash of what the room's state was when the `since` token was // the global count and room state as of the end of the last sync.
// issued // this will be None if we are doing an initial sync.
let since_shortstatehash = OptionFuture::from(since.map(|since| { let previous_sync_end = OptionFuture::from(since.map(|since| async move {
services let previous_sync_end_count = PduCount::Normal(since);
let previous_sync_end_shortstatehash = services
.rooms .rooms
.user .user
.get_token_shortstatehash(room_id, since) .get_token_shortstatehash(room_id, since)
.ok() .await?;
Ok((previous_sync_end_count, previous_sync_end_shortstatehash))
})) }))
.map(|v| Ok(v.flatten())); .map(Option::transpose);
let (current_shortstatehash, previous_sync_end) =
try_join(current_shortstatehash, previous_sync_end).await?;
let timeline = load_timeline( let timeline = load_timeline(
services, services,
sender_user, sender_user,
room_id, room_id,
sincecount, previous_sync_end.map(at!(0)),
Some(next_batchcount), Some(next_batchcount),
10_usize, 10_usize,
); );
@@ -116,16 +138,11 @@ pub(super) async fn load_joined_room(
.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>() .collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>()
.map(Ok); .map(Ok);
let (current_shortstatehash, since_shortstatehash, timeline, receipt_events) = let (timeline, receipt_events) = try_join(timeline, receipt_events).boxed().await?;
try_join4(current_shortstatehash, since_shortstatehash, timeline, receipt_events)
.boxed()
.await?;
let TimelinePdus { pdus: timeline_pdus, limited } = timeline;
let is_initial_sync = since_shortstatehash.is_none();
// the state at the beginning of the timeline
let timeline_start_shortstatehash = async { let timeline_start_shortstatehash = async {
if let Some((_, pdu)) = timeline_pdus.front() { if let Some((_, pdu)) = timeline.pdus.front() {
if let Ok(shortstatehash) = services if let Ok(shortstatehash) = services
.rooms .rooms
.state_accessor .state_accessor
@@ -139,7 +156,8 @@ pub(super) async fn load_joined_room(
current_shortstatehash current_shortstatehash
}; };
let last_notification_read: OptionFuture<_> = timeline_pdus let last_notification_read: OptionFuture<_> = timeline
.pdus
.is_empty() .is_empty()
.then(|| { .then(|| {
services services
@@ -149,12 +167,18 @@ pub(super) async fn load_joined_room(
}) })
.into(); .into();
let since_sender_member: OptionFuture<_> = since_shortstatehash // the syncing user's membership event during the last sync
.map(|short| { let membership_during_previous_sync: OptionFuture<_> = previous_sync_end
.map(at!(1))
.map(|shortstatehash| {
services services
.rooms .rooms
.state_accessor .state_accessor
.state_get_content(short, &StateEventType::RoomMember, sender_user.as_str()) .state_get_content(
shortstatehash,
&StateEventType::RoomMember,
sender_user.as_str(),
)
.ok() .ok()
}) })
.into(); .into();
@@ -167,24 +191,27 @@ pub(super) async fn load_joined_room(
let ( let (
last_notification_read, last_notification_read,
since_sender_member, membership_during_previous_sync,
timeline_start_shortstatehash, timeline_start_shortstatehash,
is_encrypted_room, is_encrypted_room,
) = join4( ) = join4(
last_notification_read, last_notification_read,
since_sender_member, membership_during_previous_sync,
timeline_start_shortstatehash, timeline_start_shortstatehash,
is_encrypted_room, is_encrypted_room,
) )
.await; .await;
let joined_since_last_sync = // TODO: If the requesting user got state-reset out of the room, this
since_sender_member // will be `true` when it shouldn't be. this function should never be called
.flatten() // in that situation, but it may be if the membership cache didn't get updated.
.is_none_or(|content: RoomMemberEventContent| { // the root cause of this needs to be addressed
content.membership != MembershipState::Join let joined_since_last_sync = membership_during_previous_sync.flatten().is_none_or(
}); |content: RoomMemberEventContent| content.membership != MembershipState::Join,
);
// lazy loading is only enabled if the filter allows for it and we aren't
// requesting the full state
let lazy_loading_enabled = (filter.room.state.lazy_load_options.is_enabled() let lazy_loading_enabled = (filter.room.state.lazy_load_options.is_enabled()
|| filter.room.timeline.lazy_load_options.is_enabled()) || filter.room.timeline.lazy_load_options.is_enabled())
&& !full_state; && !full_state;
@@ -197,8 +224,11 @@ pub(super) async fn load_joined_room(
options: Some(&filter.room.state.lazy_load_options), options: Some(&filter.room.state.lazy_load_options),
}; };
let lazy_loading_witness = OptionFuture::from(lazy_loading_enabled.then(|| { // the user IDs of members whose membership needs to be sent to the client, if
let witness: Witness = timeline_pdus // lazy-loading is enabled.
let lazily_loaded_members = OptionFuture::from(lazy_loading_enabled.then(|| {
let witness: MemberSet = timeline
.pdus
.iter() .iter()
.map(ref_at!(1)) .map(ref_at!(1))
.map(Event::sender) .map(Event::sender)
@@ -209,31 +239,12 @@ pub(super) async fn load_joined_room(
services services
.rooms .rooms
.lazy_loading .lazy_loading
.witness_retain(witness, lazy_loading_context) .retain_lazy_members(witness, lazy_loading_context)
})) }))
.await; .await;
/* replace with multiple steps
glossary for my own sanity:
- full state: every state event from the start of the room to the start of the timeline
- incremental state: state events from `since` to the start of the timeline
- state_events: the `state` key on the JSON object we return
if initial sync or full_state:
get full state
use full state as state_events
else if TL is limited:
get incremental state
use incremental state as state_events
if encryption is enabled:
use incremental state to extend device list
else:
state_events is empty
compute counts and heroes from state_events
*/
let mut state_events = if is_initial_sync || full_state {
// reset lazy loading state on initial sync // reset lazy loading state on initial sync
if is_initial_sync { if previous_sync_end.is_none() {
services services
.rooms .rooms
.lazy_loading .lazy_loading
@@ -241,85 +252,47 @@ pub(super) async fn load_joined_room(
.await; .await;
} }
calculate_state_initial( let mut state_events =
services, if let Some((previous_sync_end_count, previous_sync_end_shortstatehash)) =
sender_user, previous_sync_end
timeline_start_shortstatehash, && !full_state
lazy_loading_witness.as_ref(), {
)
.boxed()
.await?
} else if limited {
let state_incremental = calculate_state_incremental( let state_incremental = calculate_state_incremental(
services, services,
sender_user, sender_user,
since_shortstatehash, room_id,
previous_sync_end_count,
previous_sync_end_shortstatehash,
timeline_start_shortstatehash, timeline_start_shortstatehash,
lazy_loading_witness.as_ref(), current_shortstatehash,
&timeline,
lazily_loaded_members.as_ref(),
) )
.boxed() .boxed()
.await?; .await?;
// calculate device list updates for E2EE
if is_encrypted_room { if is_encrypted_room {
// add users with changed keys to the `changed` list calculate_device_list_updates(
services
.users
.room_keys_changed(room_id, since, Some(next_batch))
.map(at!(0))
.map(ToOwned::to_owned)
.ready_for_each(|user_id| {
device_list_updates.changed.insert(user_id);
})
.await;
// add users who now share encrypted rooms to `changed` and
// users who no longer share encrypted rooms to `left`
for state_event in &state_incremental {
if state_event.kind == RoomMember {
let Some(content): Option<RoomMemberEventContent> =
state_event.get_content().ok()
else {
continue;
};
let Some(user_id): Option<OwnedUserId> = state_event
.state_key
.as_ref()
.and_then(|key| key.parse().ok())
else {
continue;
};
{
use MembershipState::*;
if matches!(content.membership, Leave | Join) {
let shares_encrypted_room = share_encrypted_room(
services, services,
sender_user, sync_context,
&user_id, room_id,
Some(room_id), &mut device_list_updates,
&state_incremental,
joined_since_last_sync,
) )
.await; .await;
match content.membership {
| Leave if !shares_encrypted_room => {
device_list_updates.left.insert(user_id);
},
| Join if joined_since_last_sync || shares_encrypted_room => {
device_list_updates.changed.insert(user_id);
},
| _ => (),
}
}
}
}
}
} }
state_incremental state_incremental
} else { } else {
vec![] calculate_state_initial(
services,
sender_user,
timeline_start_shortstatehash,
lazily_loaded_members.as_ref(),
)
.boxed()
.await?
}; };
// only compute room counts and heroes (aka the summary) if the room's members // only compute room counts and heroes (aka the summary) if the room's members
@@ -339,7 +312,9 @@ pub(super) async fn load_joined_room(
.is_some_and(is_equal_to!(sender_user.as_str())) .is_some_and(is_equal_to!(sender_user.as_str()))
}; };
let joined_sender_member: Option<_> = (joined_since_last_sync && timeline_pdus.is_empty()) // the membership event of the syncing user, if they joined since the last sync
let sender_join_membership_event: Option<_> = (joined_since_last_sync
&& timeline.pdus.is_empty())
.then(|| { .then(|| {
state_events state_events
.iter() .iter()
@@ -348,17 +323,23 @@ pub(super) async fn load_joined_room(
}) })
.flatten(); .flatten();
let prev_batch = timeline_pdus // the prev_batch token for the response
.front() let prev_batch = timeline.pdus.front().map(at!(0)).or_else(|| {
.map(at!(0)) sender_join_membership_event
.or_else(|| joined_sender_member.is_some().and(since).map(Into::into)); .is_some()
.and(since)
.map(Into::into)
});
let timeline_pdus = timeline_pdus let timeline_pdus = timeline
.pdus
.into_iter() .into_iter()
.stream() .stream()
// filter out ignored events from the timeline
.wide_filter_map(|item| ignored_filter(services, item, sender_user)) .wide_filter_map(|item| ignored_filter(services, item, sender_user))
.map(at!(1)) .map(at!(1))
.chain(joined_sender_member.into_iter().stream()) // if the syncing user just joined, add their membership event to the timeline
.chain(sender_join_membership_event.into_iter().stream())
.map(Event::into_format) .map(Event::into_format)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@@ -368,8 +349,15 @@ pub(super) async fn load_joined_room(
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect(); .collect();
let send_notification_counts = /*
last_notification_read.is_none_or(|count| since.is_none_or(|since| count > since)); send notification counts if:
1. this is an initial sync
2. the user hasn't seen any notifications
3. the last notification the user saw has changed since the last sync
*/
let send_notification_counts = last_notification_read.is_none_or(|last_notification_read| {
since.is_none_or(|since| last_notification_read > since)
});
let notification_count: OptionFuture<_> = send_notification_counts let notification_count: OptionFuture<_> = send_notification_counts
.then(|| { .then(|| {
@@ -446,8 +434,7 @@ pub(super) async fn load_joined_room(
.chain(private_read_event.into_iter()) .chain(private_read_event.into_iter())
.collect(); .collect();
// Save the state after this sync so we can send the correct state diff next // save the room state at this sync to use during the next sync
// sync
services services
.rooms .rooms
.user .user
@@ -468,7 +455,7 @@ pub(super) async fn load_joined_room(
}, },
unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
timeline: Timeline { timeline: Timeline {
limited, limited: timeline.limited,
prev_batch: prev_batch.as_ref().map(ToString::to_string), prev_batch: prev_batch.as_ref().map(ToString::to_string),
events: room_events, events: room_events,
}, },
@@ -482,9 +469,11 @@ pub(super) async fn load_joined_room(
Ok((joined_room, device_list_updates)) Ok((joined_room, device_list_updates))
} }
/// Calculate the "initial state", or all events from the start of the room up /// Calculate the state events to include in an initial sync response.
/// to (but not including) the `current_shortstatehash`. If ///
/// `lazy_loading_witness` is `None`, lazy loading will be disabled. /// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned
/// Vec will include the membership events of exclusively the members in
/// `lazily_loaded_members`.
#[tracing::instrument( #[tracing::instrument(
name = "initial", name = "initial",
level = "trace", level = "trace",
@@ -495,33 +484,41 @@ pub(super) async fn load_joined_room(
async fn calculate_state_initial( async fn calculate_state_initial(
services: &Services, services: &Services,
sender_user: &UserId, sender_user: &UserId,
current_shortstatehash: ShortStateHash, timeline_start_shortstatehash: ShortStateHash,
lazy_loading_witness: Option<&Witness>, lazily_loaded_members: Option<&MemberSet>,
) -> Result<Vec<PduEvent>> { ) -> Result<Vec<PduEvent>> {
// load the keys and event IDs of the state events at the start of the timeline
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
.rooms .rooms
.state_accessor .state_accessor
.state_full_ids(current_shortstatehash) .state_full_ids(timeline_start_shortstatehash)
.unzip() .unzip()
.await; .await;
trace!("event ids for initial sync @ {:?}: {:?}", current_shortstatehash, event_ids); trace!("performing initial sync of {} state events", event_ids.len());
services services
.rooms .rooms
.short .short
// look up the full state keys
.multi_get_statekey_from_short(shortstatekeys.into_iter().stream()) .multi_get_statekey_from_short(shortstatekeys.into_iter().stream())
.zip(event_ids.into_iter().stream()) .zip(event_ids.into_iter().stream())
.ready_filter_map(|item| Some((item.0.ok()?, item.1))) .ready_filter_map(|item| Some((item.0.ok()?, item.1)))
.ready_filter_map(|((event_type, state_key), event_id)| { .ready_filter_map(|((event_type, state_key), event_id)| {
let lazy = lazy_loading_witness.is_some_and(|witness| { if let Some(lazily_loaded_members) = lazily_loaded_members {
event_type == StateEventType::RoomMember /*
if lazy loading is enabled, filter out membership events which aren't for a user
included in `lazily_loaded_members` or for the user requesting the sync.
*/
let event_is_redundant = event_type == StateEventType::RoomMember
&& state_key.as_str().try_into().is_ok_and(|user_id: &UserId| { && state_key.as_str().try_into().is_ok_and(|user_id: &UserId| {
sender_user != user_id && !witness.contains(user_id) sender_user != user_id && !lazily_loaded_members.contains(user_id)
})
}); });
lazy.or_some(event_id) event_is_redundant.or_some(event_id)
} else {
Some(event_id)
}
}) })
.broad_filter_map(|event_id: OwnedEventId| async move { .broad_filter_map(|event_id: OwnedEventId| async move {
services.rooms.timeline.get_pdu(&event_id).await.ok() services.rooms.timeline.get_pdu(&event_id).await.ok()
@@ -531,89 +528,246 @@ async fn calculate_state_initial(
.await .await
} }
/// Calculate the "incremental state", or all events from the /// Calculate the state events to include in an incremental sync response.
/// `since_shortstatehash` up to (but not including) ///
/// the `current_shortstatehash`. If `lazy_loading_witness` is `None`, lazy /// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned
/// loading will be disabled. /// Vec will include the membership events of all the members in
/// `lazily_loaded_members`.
#[tracing::instrument(name = "incremental", level = "trace", skip_all)] #[tracing::instrument(name = "incremental", level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn calculate_state_incremental<'a>( async fn calculate_state_incremental<'a>(
services: &Services, services: &Services,
sender_user: &'a UserId, sender_user: &'a UserId,
since_shortstatehash: Option<ShortStateHash>, room_id: &RoomId,
current_shortstatehash: ShortStateHash, previous_sync_end_count: PduCount,
lazy_loading_witness: Option<&'a Witness>, previous_sync_end_shortstatehash: ShortStateHash,
timeline_start_shortstatehash: ShortStateHash,
timeline_end_shortstatehash: ShortStateHash,
timeline: &TimelinePdus,
lazily_loaded_members: Option<&'a MemberSet>,
) -> Result<Vec<PduEvent>> { ) -> Result<Vec<PduEvent>> {
let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash); // NB: a limited sync is one where `timeline.limited == true`. Synapse calls
// this a "gappy" sync internally.
/*
the state events returned from an incremental sync which isn't limited are usually empty.
however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`),
the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state
at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline
merged a split in the DAG.
see: https://github.com/element-hq/synapse/issues/16941
*/
let timeline_is_linear = timeline.pdus.is_empty() || {
let last_pdu_of_last_sync = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(previous_sync_end_count.saturating_add(1)))
.boxed()
.next()
.await
.transpose()
.expect("last sync should have had some PDUs")
.map(at!(1));
// make sure the prev_events of each pdu in the timeline refer only to the
// previous pdu
timeline
.pdus
.iter()
.try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| {
if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() {
if prev_event_id
.as_ref()
.is_none_or(is_equal_to!(pdu_prev_event_id))
{
return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned()));
}
}
trace!(
"pdu {:?} has split prev_events (expected {:?}): {:?}",
pdu.event_id, prev_event_id, pdu.prev_events
);
ControlFlow::Break(())
})
.is_continue()
};
if timeline_is_linear && !timeline.limited {
// if there are no splits in the DAG and the timeline isn't limited, then
// `state` will always be empty unless lazy loading is enabled.
if let Some(lazily_loaded_members) = lazily_loaded_members
&& !timeline.pdus.is_empty()
{
// lazy loading is enabled, so we return the membership events which were
// requested by the caller.
let lazy_membership_events: Vec<_> = lazily_loaded_members
.iter()
.stream()
.broad_filter_map(|user_id| async move {
if user_id == sender_user {
return None;
}
let state_get_shorteventid = |user_id: &'a UserId| {
services services
.rooms .rooms
.state_accessor .state_accessor
.state_get_shortid( .state_get(
current_shortstatehash, timeline_start_shortstatehash,
&StateEventType::RoomMember, &StateEventType::RoomMember,
user_id.as_str(), user_id.as_str(),
) )
.ok() .ok()
}; .await
let lazy_state_ids: OptionFuture<_> = lazy_loading_witness
.map(|witness| {
StreamExt::into_future(
witness
.iter()
.stream()
.broad_filter_map(|user_id| state_get_shorteventid(user_id)),
)
}) })
.into(); .collect()
.await;
let state_diff_shortids = services if !lazy_membership_events.is_empty() {
.rooms trace!(
.state_accessor "syncing lazy membership events for members: {:?}",
.state_added((since_shortstatehash, current_shortstatehash)) lazy_membership_events
.boxed(); .iter()
.map(|pdu| pdu.state_key().unwrap())
state_diff_shortids );
.broad_filter_map(|(shortstatekey, shorteventid)| async move { }
if lazy_loading_witness.is_none() { return Ok(lazy_membership_events);
return Some(shorteventid);
} }
lazy_filter(services, sender_user, shortstatekey, shorteventid).await // lazy loading is disabled, `state` is empty.
}) return Ok(vec![]);
.chain(lazy_state_ids.stream()) }
.broad_filter_map(|shorteventid| {
/*
at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates
computing the incremental state (which may be empty).
NOTE: this code path does not apply lazy-load filtering to membership state events. the spec forbids lazy-load filtering
if the timeline is `limited`, and DAG splits which require sending extra membership state events are (probably) uncommon
enough that the performance penalty is acceptable.
*/
trace!(?timeline_is_linear, ?timeline.limited, "computing state for incremental sync");
// fetch the shorteventids of state events in the timeline
let state_events_in_timeline: BTreeSet<ShortEventId> = services
.rooms
.short
.multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| {
if pdu.state_key().is_some() {
Some(pdu.event_id.as_ref())
} else {
None
}
}))
.collect()
.await;
trace!("{} state events in timeline", state_events_in_timeline.len());
/*
fetch the state events which were added since the last sync.
specifically we fetch the difference between the state at the last sync and the state at the _end_
of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched.
this is necessary to account for splits in the DAG, as explained above.
*/
let state_diff = services
.rooms
.short
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
services services
.rooms .rooms
.short .state_accessor
.get_eventid_from_short(shorteventid) .state_added((previous_sync_end_shortstatehash, timeline_end_shortstatehash))
.await?
.stream()
.ready_filter_map(|(_, shorteventid)| {
if state_events_in_timeline.contains(&shorteventid) {
None
} else {
Some(shorteventid)
}
}),
)
.ignore_err();
// finally, fetch the PDU contents and collect them into a vec
let state_diff_pdus = state_diff
.broad_filter_map(|event_id| async move {
services
.rooms
.timeline
.get_non_outlier_pdu(&event_id)
.await
.ok() .ok()
}) })
.broad_filter_map(|event_id: OwnedEventId| async move {
services.rooms.timeline.get_pdu(&event_id).await.ok()
})
.collect::<Vec<_>>() .collect::<Vec<_>>()
.map(Ok) .await;
.await
trace!(?state_diff_pdus, "collected state PDUs for incremental sync");
Ok(state_diff_pdus)
} }
async fn lazy_filter( async fn calculate_device_list_updates(
services: &Services, services: &Services,
sender_user: &UserId, SyncContext { sender_user, since, next_batch, .. }: SyncContext<'_>,
shortstatekey: ShortStateKey, room_id: &RoomId,
shorteventid: ShortEventId, device_list_updates: &mut DeviceListUpdates,
) -> Option<ShortEventId> { state_events: &Vec<PduEvent>,
let (event_type, state_key) = services joined_since_last_sync: bool,
.rooms ) {
.short // add users with changed keys to the `changed` list
.get_statekey_from_short(shortstatekey) services
.await .users
.ok()?; .room_keys_changed(room_id, since, Some(next_batch))
.map(at!(0))
.map(ToOwned::to_owned)
.ready_for_each(|user_id| {
device_list_updates.changed.insert(user_id);
})
.await;
(event_type != StateEventType::RoomMember || state_key == sender_user.as_str()) // add users who now share encrypted rooms to `changed` and
.then_some(shorteventid) // users who no longer share encrypted rooms to `left`
for state_event in state_events {
if state_event.kind == RoomMember {
let Some(content): Option<RoomMemberEventContent> = state_event.get_content().ok()
else {
continue;
};
let Some(user_id): Option<OwnedUserId> = state_event
.state_key
.as_ref()
.and_then(|key| key.parse().ok())
else {
continue;
};
{
use MembershipState::*;
if matches!(content.membership, Leave | Join) {
let shares_encrypted_room =
share_encrypted_room(services, sender_user, &user_id, Some(room_id))
.await;
match content.membership {
| Leave if !shares_encrypted_room => {
device_list_updates.left.insert(user_id);
},
| Join if joined_since_last_sync || shares_encrypted_room => {
device_list_updates.changed.insert(user_id);
},
| _ => (),
}
}
}
}
}
} }
async fn calculate_counts( async fn calculate_counts(
+5 -3
View File
@@ -39,7 +39,7 @@ pub enum Status {
Seen(u64), Seen(u64),
} }
pub type Witness = HashSet<OwnedUserId>; pub type MemberSet = HashSet<OwnedUserId>;
type Key<'a> = (&'a UserId, Option<&'a DeviceId>, &'a RoomId, &'a UserId); type Key<'a> = (&'a UserId, Option<&'a DeviceId>, &'a RoomId, &'a UserId);
impl crate::Service for Service { impl crate::Service for Service {
@@ -67,9 +67,11 @@ pub async fn reset(&self, ctx: &Context<'_>) {
.await; .await;
} }
/// Returns only the subset of `senders` which should be sent to the client
/// according to the provided lazy loading context.
#[implement(Service)] #[implement(Service)]
#[tracing::instrument(name = "retain", level = "debug", skip_all)] #[tracing::instrument(name = "retain", level = "debug", skip_all)]
pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witness { pub async fn retain_lazy_members(&self, senders: MemberSet, ctx: &Context<'_>) -> MemberSet {
debug_assert!( debug_assert!(
ctx.options.is_none_or(Options::is_enabled), ctx.options.is_none_or(Options::is_enabled),
"lazy loading should be enabled by your options" "lazy loading should be enabled by your options"
@@ -84,7 +86,7 @@ pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witne
pin_mut!(witness); pin_mut!(witness);
let _cork = self.db.db.cork(); let _cork = self.db.db.cork();
let mut senders = Witness::with_capacity(senders.len()); let mut senders = MemberSet::with_capacity(senders.len());
while let Some((status, sender)) = witness.next().await { while let Some((status, sender)) = witness.next().await {
if include_redundant || status == Status::Unseen { if include_redundant || status == Status::Unseen {
senders.insert(sender.into()); senders.insert(sender.into());
+32 -3
View File
@@ -1,10 +1,18 @@
use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc}; use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc};
pub use conduwuit::matrix::pdu::{ShortEventId, ShortId, ShortRoomId, ShortStateKey}; pub use conduwuit::matrix::pdu::{ShortEventId, ShortId, ShortRoomId, ShortStateKey};
use conduwuit::{Result, err, implement, matrix::StateKey, utils, utils::IterStream}; use conduwuit::{
Result, err, implement,
matrix::StateKey,
pair_of,
utils::{self, IterStream, ReadyExt},
};
use database::{Deserialized, Get, Map, Qry}; use database::{Deserialized, Get, Map, Qry};
use futures::{Stream, StreamExt}; use futures::{
use ruma::{EventId, RoomId, events::StateEventType}; Stream, StreamExt,
stream::{self},
};
use ruma::{EventId, OwnedEventId, RoomId, events::StateEventType};
use serde::Deserialize; use serde::Deserialize;
use crate::{Dep, globals}; use crate::{Dep, globals};
@@ -258,3 +266,24 @@ pub async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> ShortRoomId {
short short
}) })
} }
#[implement(Service)]
pub async fn multi_get_state_from_short<'a, S>(
&'a self,
short_state: S,
) -> impl Stream<Item = Result<((StateEventType, StateKey), OwnedEventId)>> + Send + 'a
where
S: Stream<Item = (ShortStateKey, ShortEventId)> + Send + 'a,
{
let (short_state_keys, short_event_ids): pair_of!(Vec<_>) = short_state.unzip().await;
StreamExt::zip(
self.multi_get_statekey_from_short(stream::iter(short_state_keys.into_iter())),
self.multi_get_eventid_from_short(stream::iter(short_event_ids.into_iter())),
)
.ready_filter_map(|state_event| match state_event {
| (Ok(state_key), Ok(event_id)) => Some(Ok((state_key, event_id))),
| (Err(e), _) => Some(Err(e)),
| (_, Err(e)) => Some(Err(e)),
})
}
+13 -13
View File
@@ -10,7 +10,7 @@ use conduwuit::{
}, },
}; };
use database::Deserialized; use database::Deserialized;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::try_join, pin_mut}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, pin_mut};
use ruma::{ use ruma::{
EventId, OwnedEventId, UserId, EventId, OwnedEventId, UserId,
events::{ events::{
@@ -286,28 +286,28 @@ pub fn state_keys<'a>(
/// not in .1) /// not in .1)
#[implement(super::Service)] #[implement(super::Service)]
#[inline] #[inline]
pub fn state_removed( pub async fn state_removed(
&self, &self,
shortstatehash: pair_of!(ShortStateHash), shortstatehash: pair_of!(ShortStateHash),
) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + '_ { ) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
self.state_added((shortstatehash.1, shortstatehash.0)) self.state_added((shortstatehash.1, shortstatehash.0)).await
} }
/// Returns the state events added between the interval (present in .1 but /// Returns the state events added between the interval (present in .1 but
/// not in .0) /// not in .0)
#[implement(super::Service)] #[implement(super::Service)]
pub fn state_added( pub async fn state_added(
&self, &self,
shortstatehash: pair_of!(ShortStateHash), shortstatehash: pair_of!(ShortStateHash),
) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + '_ { ) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
let a = self.load_full_state(shortstatehash.0); let full_state_a = self.load_full_state(shortstatehash.0).await?;
let b = self.load_full_state(shortstatehash.1); let full_state_b = self.load_full_state(shortstatehash.1).await?;
try_join(a, b)
.map_ok(|(a, b)| b.difference(&a).copied().collect::<Vec<_>>()) Ok(full_state_b
.map_ok(IterStream::try_stream) .difference(&full_state_a)
.try_flatten_stream() .copied()
.ignore_err()
.map(parse_compressed_state_event) .map(parse_compressed_state_event)
.collect())
} }
#[implement(super::Service)] #[implement(super::Service)]
+1 -1
View File
@@ -526,7 +526,7 @@ pub(crate) fn compress_state_event(
#[inline] #[inline]
#[must_use] #[must_use]
pub(crate) fn parse_compressed_state_event( pub fn parse_compressed_state_event(
compressed_event: CompressedStateEvent, compressed_event: CompressedStateEvent,
) -> (ShortStateKey, ShortEventId) { ) -> (ShortStateKey, ShortEventId) {
use utils::u64_from_u8; use utils::u64_from_u8;
+1 -3
View File
@@ -186,10 +186,8 @@ impl Service {
} }
/// Returns the pdu. /// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[inline] #[inline]
pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<impl Event> { pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
self.db.get_non_outlier_pdu(event_id).await self.db.get_non_outlier_pdu(event_id).await
} }