refactor: Fix errors in api/client/sync

This commit is contained in:
Ginger
2026-04-10 13:48:35 -04:00
parent 01e403f05f
commit 4e456249ac
7 changed files with 208 additions and 198 deletions
+61 -48
View File
@@ -11,7 +11,7 @@ use std::{
use axum::extract::State;
use axum_client_ip::ClientIp;
use conduwuit::{
Result, at, extract_variant,
Err, Result, at, extract_variant,
utils::{
ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, Tools, WidebandExt},
@@ -19,10 +19,7 @@ use conduwuit::{
warn,
};
use conduwuit_service::Services;
use futures::{
FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join3, join4, join5},
};
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture};
use ruma::{
DeviceId, OwnedUserId, RoomId, UserId,
api::client::{
@@ -36,13 +33,14 @@ use ruma::{
},
uiaa::UiaaResponse,
},
events::{
AnyRawAccountDataEvent,
presence::{PresenceEvent, PresenceEventContent},
},
assign,
events::presence::{PresenceEvent, PresenceEventContent},
serde::Raw,
};
use service::rooms::lazy_loading::{self, MemberSet, Options as _};
use service::{
account_data::AnyRawAccountDataEvent,
rooms::lazy_loading::{self, MemberSet, Options as _},
};
use super::{load_timeline, share_encrypted_room};
use crate::{
@@ -82,10 +80,10 @@ impl DeviceListUpdates {
impl From<DeviceListUpdates> for DeviceLists {
fn from(val: DeviceListUpdates) -> Self {
Self {
assign!(Self::new(), {
changed: val.changed.into_iter().collect(),
left: val.left.into_iter().collect(),
}
})
}
}
@@ -253,6 +251,8 @@ pub(crate) async fn build_sync_events(
.get_filter(syncing_user, filter_id)
.await
.unwrap_or_default(),
// error out for unknown filter types
| _ => return Err!(Request(InvalidParam("Unknown filter type"))),
});
let context = SyncContext {
@@ -268,7 +268,6 @@ pub(crate) async fn build_sync_events(
.rooms
.state_cache
.rooms_joined(syncing_user)
.map(ToOwned::to_owned)
.broad_filter_map(|room_id| async {
let joined_room = load_joined_room(services, context, room_id.clone()).await;
@@ -332,9 +331,9 @@ pub(crate) async fn build_sync_events(
// only sync this invite if it was sent after the last /sync call
if last_sync_end_count < invite_count {
let invited_room = InvitedRoom {
invite_state: InviteState { events: invite_state },
};
let invited_room = assign!(InvitedRoom::new(), {
invite_state: InviteState::from(invite_state),
});
invited_rooms.insert(room_id, invited_room);
}
@@ -355,9 +354,9 @@ pub(crate) async fn build_sync_events(
// only sync this knock if it was sent after the last /sync call
if last_sync_end_count < knock_count {
let knocked_room = KnockedRoom {
knock_state: KnockState { events: knock_state },
};
let knocked_room = assign!(KnockedRoom::new(), {
knock_state: assign!(KnockState::new(), { events: knock_state }),
});
knocked_rooms.insert(room_id, knocked_room);
}
@@ -376,13 +375,6 @@ pub(crate) async fn build_sync_events(
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect();
// Look for device list updates of this account
let keys_changed = services
.users
.keys_changed(syncing_user, last_sync_end_count, Some(current_count))
.map(ToOwned::to_owned)
.collect::<HashSet<_>>();
let to_device_events = services
.users
.get_to_device_events(
@@ -394,36 +386,57 @@ pub(crate) async fn build_sync_events(
.map(at!(1))
.collect::<Vec<_>>();
// Look for device list updates of this account
let keys_changed = services
.users
.keys_changed(syncing_user, last_sync_end_count, Some(current_count))
.collect::<HashSet<_>>();
let device_one_time_keys_count = services
.users
.count_one_time_keys(syncing_user, syncing_device);
// Remove all to-device events the device received *last time*
let remove_to_device_events =
services
.users
.remove_to_device_events(syncing_user, syncing_device, last_sync_end_count);
let (
(joined_rooms, mut device_list_updates),
left_rooms,
invited_rooms,
knocked_rooms,
presence_updates,
account_data,
to_device_events,
keys_changed,
device_one_time_keys_count,
) = async {
futures::join!(
joined_rooms,
left_rooms,
invited_rooms,
knocked_rooms,
presence_updates,
account_data,
to_device_events,
keys_changed,
device_one_time_keys_count
)
}
.boxed()
.await;
let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms);
let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates);
let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms)
.boxed()
// Remove all to-device events the device received *last time*
services
.users
.remove_to_device_events(syncing_user, syncing_device, last_sync_end_count)
.await;
let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top;
let ((), to_device_events, presence_updates) = ephemeral;
let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms;
let (joined_rooms, mut device_list_updates) = joined_rooms;
device_list_updates.changed.extend(keys_changed);
let response = sync_events::v3::Response {
account_data: GlobalAccountData { events: account_data },
let response = assign!(sync_events::v3::Response::new(current_count.to_string()), {
account_data: assign!(GlobalAccountData::new(), { events: account_data }),
device_lists: device_list_updates.into(),
device_one_time_keys_count,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
next_batch: current_count.to_string(),
presence: Presence {
presence: assign!(Presence::new(), {
events: presence_updates
.into_iter()
.flat_map(IntoIterator::into_iter)
@@ -431,15 +444,15 @@ pub(crate) async fn build_sync_events(
.map(|ref event| Raw::new(event))
.filter_map(Result::ok)
.collect(),
},
rooms: Rooms {
}),
rooms: assign!(Rooms::new(), {
leave: left_rooms,
join: joined_rooms,
invite: invited_rooms,
knock: knocked_rooms,
},
to_device: ToDevice { events: to_device_events },
};
}),
to_device: assign!(ToDevice::new(), { events: to_device_events }),
});
Ok(response)
}