chore(sync/v3): Use more descriptive names for SyncContext properties

This commit is contained in:
Ginger
2025-11-05 10:01:12 -05:00
parent 5513bb4dff
commit 9cc0cc69f7
4 changed files with 151 additions and 112 deletions
+61 -34
View File
@@ -52,6 +52,7 @@ use crate::{
},
};
/// A collection of updates to users' device lists, used for E2EE.
struct DeviceListUpdates {
changed: HashSet<OwnedUserId>,
left: HashSet<OwnedUserId>,
@@ -80,23 +81,39 @@ impl From<DeviceListUpdates> for DeviceLists {
}
}
/// References to common data needed to calculate the sync response.
#[derive(Clone, Copy)]
struct SyncContext<'a> {
sender_user: &'a UserId,
sender_device: &'a DeviceId,
since: Option<u64>,
next_batch: u64,
/// The ID of the user requesting this sync.
syncing_user: &'a UserId,
/// The ID of the device requesting this sync, which will belong to
/// `syncing_user`.
syncing_device: &'a DeviceId,
/// The global count at the end of the previous sync response.
/// The previous sync's `current_count` will become the next sync's
/// `last_sync_end_count`. This will be None if no `since` query parameter
/// was specified, indicating an initial sync.
last_sync_end_count: Option<u64>,
/// The global count as of when we started building the sync response.
/// This is used as an upper bound when querying the database to ensure the
/// response represents a snapshot in time and doesn't include data which
/// appeared while the response was being built.
current_count: u64,
/// The `full_state` query parameter, used when syncing state for joined and
/// left rooms.
full_state: bool,
/// The sync filter, which the client uses to specify what data should be
/// included in the sync response.
filter: &'a FilterDefinition,
}
impl<'a> SyncContext<'a> {
fn lazy_loading_context(&self, room_id: &'a RoomId) -> lazy_loading::Context<'a> {
lazy_loading::Context {
user_id: self.sender_user,
device_id: Some(self.sender_device),
user_id: self.syncing_user,
device_id: Some(self.syncing_device),
room_id,
token: self.since,
token: self.last_sync_end_count,
options: Some(&self.filter.room.state.lazy_load_options),
}
}
@@ -196,10 +213,12 @@ pub(crate) async fn build_sync_events(
services: &Services,
body: &Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let (sender_user, sender_device) = body.sender();
let (syncing_user, syncing_device) = body.sender();
let next_batch = services.globals.current_count()?;
let since = body
let current_count = services.globals.current_count()?;
// the `since` token is the last sync end count stringified
let last_sync_end_count = body
.body
.since
.as_ref()
@@ -209,20 +228,23 @@ pub(crate) async fn build_sync_events(
// FilterDefinition is very large (0x1000 bytes), let's put it on the heap
let filter = Box::new(match body.body.filter.as_ref() {
// use the default filter if none was specified
| None => FilterDefinition::default(),
// use inline filters directly
| Some(Filter::FilterDefinition(filter)) => filter.clone(),
// look up filter IDs from the database
| Some(Filter::FilterId(filter_id)) => services
.users
.get_filter(sender_user, filter_id)
.get_filter(syncing_user, filter_id)
.await
.unwrap_or_default(),
});
let context = SyncContext {
sender_user,
sender_device,
since,
next_batch,
syncing_user,
syncing_device,
last_sync_end_count,
current_count,
full_state,
filter: &filter,
};
@@ -230,7 +252,7 @@ pub(crate) async fn build_sync_events(
let joined_rooms = services
.rooms
.state_cache
.rooms_joined(sender_user)
.rooms_joined(syncing_user)
.map(ToOwned::to_owned)
.broad_filter_map(|room_id| async {
let joined_room = load_joined_room(services, context, room_id.clone()).await;
@@ -259,7 +281,7 @@ pub(crate) async fn build_sync_events(
let left_rooms = services
.rooms
.state_cache
.rooms_left(sender_user)
.rooms_left(syncing_user)
.broad_filter_map(|(room_id, leave_pdu)| {
load_left_room(services, context, room_id.clone(), leave_pdu)
.map_ok(move |left_room| (room_id, left_room))
@@ -271,9 +293,9 @@ pub(crate) async fn build_sync_events(
let invited_rooms = services
.rooms
.state_cache
.rooms_invited(sender_user)
.rooms_invited(syncing_user)
.wide_filter_map(async |(room_id, invite_state)| {
if is_ignored_invite(services, sender_user, &room_id).await {
if is_ignored_invite(services, syncing_user, &room_id).await {
None
} else {
Some((room_id, invite_state))
@@ -283,12 +305,12 @@ pub(crate) async fn build_sync_events(
let invite_count = services
.rooms
.state_cache
.get_invite_count(&room_id, sender_user)
.get_invite_count(&room_id, syncing_user)
.await
.ok();
// only sync this invite if it was sent after the last /sync call
if since < invite_count {
if last_sync_end_count < invite_count {
let invited_room = InvitedRoom {
invite_state: InviteState { events: invite_state },
};
@@ -301,17 +323,17 @@ pub(crate) async fn build_sync_events(
let knocked_rooms = services
.rooms
.state_cache
.rooms_knocked(sender_user)
.rooms_knocked(syncing_user)
.fold_default(|mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| async move {
let knock_count = services
.rooms
.state_cache
.get_knock_count(&room_id, sender_user)
.get_knock_count(&room_id, syncing_user)
.await
.ok();
// only sync this knock if it was sent after the last /sync call
if since < knock_count {
if last_sync_end_count < knock_count {
let knocked_room = KnockedRoom {
knock_state: KnockState { events: knock_state },
};
@@ -324,36 +346,41 @@ pub(crate) async fn build_sync_events(
let presence_updates: OptionFuture<_> = services
.config
.allow_local_presence
.then(|| process_presence_updates(services, since, sender_user))
.then(|| process_presence_updates(services, last_sync_end_count, syncing_user))
.into();
let account_data = services
.account_data
.changes_since(None, sender_user, since, Some(next_batch))
.changes_since(None, syncing_user, last_sync_end_count, Some(current_count))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect();
// Look for device list updates of this account
let keys_changed = services
.users
.keys_changed(sender_user, since, Some(next_batch))
.keys_changed(syncing_user, last_sync_end_count, Some(current_count))
.map(ToOwned::to_owned)
.collect::<HashSet<_>>();
let to_device_events = services
.users
.get_to_device_events(sender_user, sender_device, since, Some(next_batch))
.get_to_device_events(
syncing_user,
syncing_device,
last_sync_end_count,
Some(current_count),
)
.collect::<Vec<_>>();
let device_one_time_keys_count = services
.users
.count_one_time_keys(sender_user, sender_device);
.count_one_time_keys(syncing_user, syncing_device);
// Remove all to-device events the device received *last time*
let remove_to_device_events =
services
.users
.remove_to_device_events(sender_user, sender_device, since);
.remove_to_device_events(syncing_user, syncing_device, last_sync_end_count);
let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms);
let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates);
@@ -373,7 +400,7 @@ pub(crate) async fn build_sync_events(
device_one_time_keys_count,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
next_batch: next_batch.to_string(),
next_batch: current_count.to_string(),
presence: Presence {
events: presence_updates
.into_iter()
@@ -398,12 +425,12 @@ pub(crate) async fn build_sync_events(
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
async fn process_presence_updates(
services: &Services,
since: Option<u64>,
last_sync_end_count: Option<u64>,
syncing_user: &UserId,
) -> PresenceUpdates {
services
.presence
.presence_since(since.unwrap_or(0)) // send all presences on initial sync
.presence_since(last_sync_end_count.unwrap_or(0)) // send all presences on initial sync
.filter(|(user_id, ..)| {
services
.rooms
@@ -433,7 +460,7 @@ async fn prepare_lazily_loaded_members(
// reset lazy loading state on initial sync.
// do this even if lazy loading is disabled so future lazy loads
// will have the correct members.
if sync_context.since.is_none() {
if sync_context.last_sync_end_count.is_none() {
services
.rooms
.lazy_loading