From 4e456249acc131da9472c9f3c07865c1b9a886dd Mon Sep 17 00:00:00 2001 From: Ginger Date: Fri, 10 Apr 2026 13:48:35 -0400 Subject: [PATCH] refactor: Fix errors in `api/client/sync` --- src/api/admin/rooms/ban.rs | 4 +- src/api/client/state/tests.rs | 2 +- src/api/client/sync/mod.rs | 3 +- src/api/client/sync/v3/joined.rs | 47 +++---- src/api/client/sync/v3/left.rs | 21 +-- src/api/client/sync/v3/mod.rs | 109 ++++++++------- src/api/client/sync/v5.rs | 220 ++++++++++++++++--------------- 7 files changed, 208 insertions(+), 198 deletions(-) diff --git a/src/api/admin/rooms/ban.rs b/src/api/admin/rooms/ban.rs index a3a34d7dc..9a96aad96 100644 --- a/src/api/admin/rooms/ban.rs +++ b/src/api/admin/rooms/ban.rs @@ -60,9 +60,9 @@ pub(crate) async fn ban_room( .rooms .alias .local_aliases_for_room(&body.room_id) - .map(ToOwned::to_owned) - .collect::>() + .collect() .await; + for alias in &aliases { info!("Removing alias {} for banned room {}", alias, body.room_id); services diff --git a/src/api/client/state/tests.rs b/src/api/client/state/tests.rs index c425b834d..41b1cd833 100644 --- a/src/api/client/state/tests.rs +++ b/src/api/client/state/tests.rs @@ -14,7 +14,7 @@ fn test_strip_room_member() -> Result<()> { let json: &mut Raw = &mut Raw::::from_json_string(body.to_owned())?; let mut membership_content: RoomMemberEventContent = - json.deserialize_as::()?; + json.deserialize_as_unchecked::()?; //Begin Test membership_content.join_authorized_via_users_server = None; diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 218d5ae08..2b6d3d291 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -152,8 +152,7 @@ async fn share_encrypted_room( .rooms .state_cache .get_shared_rooms(sender_user, user_id) - .ready_filter(|&room_id| Some(room_id) != ignore_room) - .map(ToOwned::to_owned) + .ready_filter(|room_id| Some(room_id.as_ref()) != ignore_room) .broad_any(|other_room_id| async move { services .rooms diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index d5ffd0b29..6b4af1dcc 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -23,17 +23,21 @@ use ruma::{ OwnedRoomId, OwnedUserId, RoomId, UserId, api::client::sync::sync_events::{ UnreadNotificationsCount, - v3::{Ephemeral, JoinedRoom, RoomAccountData, RoomSummary, State as RoomState, Timeline}, + v3::{ + Ephemeral, JoinedRoom, RoomAccountData, RoomSummary, State as RoomState, StateEvents, + Timeline, + }, }, + assign, events::{ - AnyRawAccountDataEvent, StateEventType, + StateEventType, TimelineEventType::*, room::member::{MembershipState, RoomMemberEventContent}, }, serde::Raw, uint, }; -use service::rooms::short::ShortStateHash; +use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash}; use super::{load_timeline, share_encrypted_room}; use crate::client::{ @@ -92,17 +96,15 @@ pub(super) async fn load_joined_room( ); } - let joined_room = JoinedRoom { + let joined_room = assign!(JoinedRoom::new(), { account_data, summary: summary.unwrap_or_default(), unread_notifications: notification_counts.unwrap_or_default(), timeline, - state: RoomState { - events: state_events.into_iter().map(Event::into_format).collect(), - }, + state: RoomState::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())), ephemeral, unread_thread_notifications: BTreeMap::new(), - }; + }); Ok((joined_room, device_list_updates)) } @@ -126,7 +128,7 @@ async fn build_account_data( .collect() .await; - Ok(RoomAccountData { events: account_data_changes }) + Ok(assign!(RoomAccountData::new(), { events: account_data_changes })) } /// Collect new ephemeral events. @@ -233,7 +235,7 @@ async fn build_ephemeral( edus.extend(typing_event); edus.extend(private_read_event); - Ok(Ephemeral { events: edus }) + Ok(assign!(Ephemeral::new(), { events: edus })) } /// A struct to hold the state events, timeline, and other data which is @@ -318,11 +320,11 @@ async fn build_state_and_timeline( Ok(StateAndTimeline { state_events, - timeline: Timeline { + timeline: assign!(Timeline::new(), { limited, prev_batch: prev_batch.as_ref().map(ToString::to_string), events: filtered_timeline, - }, + }), summary, notification_counts, device_list_updates, @@ -580,10 +582,10 @@ async fn build_notification_counts( trace!(%notification_count, %highlight_count, "syncing new notification counts"); - Ok(Some(UnreadNotificationsCount { + Ok(Some(assign!(UnreadNotificationsCount::new(), { notification_count: Some(notification_count), highlight_count: Some(highlight_count), - })) + }))) } else { Ok(None) } @@ -698,13 +700,13 @@ async fn build_room_summary( "syncing updated summary" ); - Ok(Some(RoomSummary { + Ok(Some(assign!(RoomSummary::new(), { heroes: heroes .map(|heroes| heroes.into_iter().collect()) .unwrap_or_default(), joined_member_count: Some(ruma_from_u64(joined_member_count)), invited_member_count: Some(ruma_from_u64(invited_member_count)), - })) + }))) } /// Fetch the user IDs to include in the `m.heroes` property of the room @@ -718,18 +720,10 @@ async fn build_heroes( const MAX_HERO_COUNT: usize = 5; // fetch joined members from the state cache first - let joined_members_stream = services - .rooms - .state_cache - .room_members(room_id) - .map(ToOwned::to_owned); + let joined_members_stream = services.rooms.state_cache.room_members(room_id); // then fetch invited members - let invited_members_stream = services - .rooms - .state_cache - .room_members_invited(room_id) - .map(ToOwned::to_owned); + let invited_members_stream = services.rooms.state_cache.room_members_invited(room_id); // then as a last resort fetch every membership event let all_members_stream = services @@ -796,7 +790,6 @@ async fn build_device_list_updates( .users .room_keys_changed(room_id, last_sync_end_count, Some(current_count)) .map(at!(0)) - .map(ToOwned::to_owned) .ready_for_each(|user_id| { device_list_updates.changed.insert(user_id); }) diff --git a/src/api/client/sync/v3/left.rs b/src/api/client/sync/v3/left.rs index 69a8aee60..d833bfdf9 100644 --- a/src/api/client/sync/v3/left.rs +++ b/src/api/client/sync/v3/left.rs @@ -7,7 +7,10 @@ use conduwuit::{ use futures::{StreamExt, future::join}; use ruma::{ EventId, OwnedRoomId, RoomId, - api::client::sync::sync_events::v3::{LeftRoom, RoomAccountData, State, Timeline}, + api::client::sync::sync_events::v3::{ + LeftRoom, RoomAccountData, State, StateEvents, Timeline, + }, + assign, events::{StateEventType, TimelineEventType}, uint, }; @@ -178,17 +181,15 @@ pub(super) async fn load_left_room( .collect::>() .await; - Ok(Some(LeftRoom { - account_data: RoomAccountData { events: Vec::new() }, - timeline: Timeline { + Ok(Some(assign!(LeftRoom::new(), { + account_data: RoomAccountData::new(), + timeline: assign!(Timeline::new(), { limited: timeline.limited, prev_batch: Some(current_count.to_string()), events: raw_timeline_pdus, - }, - state: State { - events: state_events.into_iter().map(Event::into_format).collect(), - }, - })) + }), + state: State::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())), + }))) } async fn build_left_state_and_timeline( @@ -317,7 +318,7 @@ fn create_dummy_leave_event( // clients. perhaps a database table could be created to hold these dummy // events, or they could be stored as outliers? PduEvent { - event_id: EventId::new(services.globals.server_name()), + event_id: EventId::new_v1(services.globals.server_name()), sender: syncing_user.to_owned(), origin: None, origin_server_ts: utils::millis_since_unix_epoch() diff --git a/src/api/client/sync/v3/mod.rs b/src/api/client/sync/v3/mod.rs index f9489053c..fb9b064f2 100644 --- a/src/api/client/sync/v3/mod.rs +++ b/src/api/client/sync/v3/mod.rs @@ -11,7 +11,7 @@ use std::{ use axum::extract::State; use axum_client_ip::ClientIp; use conduwuit::{ - Result, at, extract_variant, + Err, Result, at, extract_variant, utils::{ ReadyExt, TryFutureExtExt, stream::{BroadbandExt, Tools, WidebandExt}, @@ -19,10 +19,7 @@ use conduwuit::{ warn, }; use conduwuit_service::Services; -use futures::{ - FutureExt, StreamExt, TryFutureExt, - future::{OptionFuture, join3, join4, join5}, -}; +use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture}; use ruma::{ DeviceId, OwnedUserId, RoomId, UserId, api::client::{ @@ -36,13 +33,14 @@ use ruma::{ }, uiaa::UiaaResponse, }, - events::{ - AnyRawAccountDataEvent, - presence::{PresenceEvent, PresenceEventContent}, - }, + assign, + events::presence::{PresenceEvent, PresenceEventContent}, serde::Raw, }; -use service::rooms::lazy_loading::{self, MemberSet, Options as _}; +use service::{ + account_data::AnyRawAccountDataEvent, + rooms::lazy_loading::{self, MemberSet, Options as _}, +}; use super::{load_timeline, share_encrypted_room}; use crate::{ @@ -82,10 +80,10 @@ impl DeviceListUpdates { impl From for DeviceLists { fn from(val: DeviceListUpdates) -> Self { - Self { + assign!(Self::new(), { changed: val.changed.into_iter().collect(), left: val.left.into_iter().collect(), - } + }) } } @@ -253,6 +251,8 @@ pub(crate) async fn build_sync_events( .get_filter(syncing_user, filter_id) .await .unwrap_or_default(), + // error out for unknown filter types + | _ => return Err!(Request(InvalidParam("Unknown filter type"))), }); let context = SyncContext { @@ -268,7 +268,6 @@ pub(crate) async fn build_sync_events( .rooms .state_cache .rooms_joined(syncing_user) - .map(ToOwned::to_owned) .broad_filter_map(|room_id| async { let joined_room = load_joined_room(services, context, room_id.clone()).await; @@ -332,9 +331,9 @@ pub(crate) async fn build_sync_events( // only sync this invite if it was sent after the last /sync call if last_sync_end_count < invite_count { - let invited_room = InvitedRoom { - invite_state: InviteState { events: invite_state }, - }; + let invited_room = assign!(InvitedRoom::new(), { + invite_state: InviteState::from(invite_state), + }); invited_rooms.insert(room_id, invited_room); } @@ -355,9 +354,9 @@ pub(crate) async fn build_sync_events( // only sync this knock if it was sent after the last /sync call if last_sync_end_count < knock_count { - let knocked_room = KnockedRoom { - knock_state: KnockState { events: knock_state }, - }; + let knocked_room = assign!(KnockedRoom::new(), { + knock_state: assign!(KnockState::new(), { events: knock_state }), + }); knocked_rooms.insert(room_id, knocked_room); } @@ -376,13 +375,6 @@ pub(crate) async fn build_sync_events( .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .collect(); - // Look for device list updates of this account - let keys_changed = services - .users - .keys_changed(syncing_user, last_sync_end_count, Some(current_count)) - .map(ToOwned::to_owned) - .collect::>(); - let to_device_events = services .users .get_to_device_events( @@ -394,36 +386,57 @@ pub(crate) async fn build_sync_events( .map(at!(1)) .collect::>(); + // Look for device list updates of this account + let keys_changed = services + .users + .keys_changed(syncing_user, last_sync_end_count, Some(current_count)) + .collect::>(); + let device_one_time_keys_count = services .users .count_one_time_keys(syncing_user, syncing_device); - // Remove all to-device events the device received *last time* - let remove_to_device_events = - services - .users - .remove_to_device_events(syncing_user, syncing_device, last_sync_end_count); + let ( + (joined_rooms, mut device_list_updates), + left_rooms, + invited_rooms, + knocked_rooms, + presence_updates, + account_data, + to_device_events, + keys_changed, + device_one_time_keys_count, + ) = async { + futures::join!( + joined_rooms, + left_rooms, + invited_rooms, + knocked_rooms, + presence_updates, + account_data, + to_device_events, + keys_changed, + device_one_time_keys_count + ) + } + .boxed() + .await; - let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms); - let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates); - let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) - .boxed() + // Remove all to-device events the device received *last time* + services + .users + .remove_to_device_events(syncing_user, syncing_device, last_sync_end_count) .await; - let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top; - let ((), to_device_events, presence_updates) = ephemeral; - let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms; - let (joined_rooms, mut device_list_updates) = joined_rooms; device_list_updates.changed.extend(keys_changed); - let response = sync_events::v3::Response { - account_data: GlobalAccountData { events: account_data }, + let response = assign!(sync_events::v3::Response::new(current_count.to_string()), { + account_data: assign!(GlobalAccountData::new(), { events: account_data }), device_lists: device_list_updates.into(), device_one_time_keys_count, // Fallback keys are not yet supported device_unused_fallback_key_types: None, - next_batch: current_count.to_string(), - presence: Presence { + presence: assign!(Presence::new(), { events: presence_updates .into_iter() .flat_map(IntoIterator::into_iter) @@ -431,15 +444,15 @@ pub(crate) async fn build_sync_events( .map(|ref event| Raw::new(event)) .filter_map(Result::ok) .collect(), - }, - rooms: Rooms { + }), + rooms: assign!(Rooms::new(), { leave: left_rooms, join: joined_rooms, invite: invited_rooms, knock: knocked_rooms, - }, - to_device: ToDevice { events: to_device_events }, - }; + }), + to_device: assign!(ToDevice::new(), { events: to_device_events }), + }); Ok(response) } diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 183f3aaac..0e1343083 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -27,17 +27,20 @@ use futures::{ }; use ruma::{ DeviceId, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId, - api::client::sync::sync_events::{self, DeviceLists, UnreadNotificationsCount}, + api::client::sync::sync_events::{ + self, DeviceLists, UnreadNotificationsCount, v5::request::ExtensionRoomConfig, + }, + assign, directory::RoomTypeFilter, events::{ - AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, AnySyncStateEvent, StateEventType, - TimelineEventType, + AnySyncEphemeralRoomEvent, AnySyncStateEvent, StateEventType, TimelineEventType, room::member::{MembershipState, RoomMemberEventContent}, - typing::TypingEventContent, + typing::{SyncTypingEvent, TypingEventContent}, }, serde::Raw, uint, }; +use service::account_data::AnyRawAccountDataEvent; use super::share_encrypted_room; use crate::{ @@ -67,8 +70,8 @@ pub(crate) async fn sync_events_v5_route( body: Ruma, ) -> Result { debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted"); - let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let sender_device = body.sender_device.as_ref().expect("user is authenticated"); + let ref sender_user = body.sender_user().to_owned(); + let ref sender_device = body.sender_device().to_owned(); services .users @@ -90,7 +93,7 @@ pub(crate) async fn sync_events_v5_route( .and_then(|string| string.parse().ok()) .unwrap_or(0); - let snake_key = into_snake_key(sender_user, sender_device, conn_id); + let snake_key = into_snake_key(sender_user.as_ref(), sender_device.as_str(), conn_id); if globalsince != 0 && !services.sync.snake_connection_cached(&snake_key) { return Err!(Request(UnknownPos( @@ -112,7 +115,6 @@ pub(crate) async fn sync_events_v5_route( .rooms .state_cache .rooms_joined(sender_user) - .map(ToOwned::to_owned) .collect::>(); let all_invited_rooms = services @@ -164,21 +166,20 @@ pub(crate) async fn sync_events_v5_route( let (account_data, e2ee, to_device, receipts) = try_join4(account_data, e2ee, to_device, receipts).await?; - let extensions = sync_events::v5::response::Extensions { + let extensions = assign!(sync_events::v5::response::Extensions::default(), { account_data, e2ee, to_device, receipts, typing: sync_events::v5::response::Typing::default(), - }; + }); - let mut response = sync_events::v5::Response { + let mut response = assign!(sync_events::v5::Response::new(pos), { txn_id: body.txn_id.clone(), - pos, lists: BTreeMap::new(), rooms: BTreeMap::new(), extensions, - }; + }); handle_lists( services, @@ -379,11 +380,12 @@ where ); } } - response - .lists - .insert(list_id.clone(), sync_events::v5::response::List { + response.lists.insert( + list_id.clone(), + assign!(sync_events::v5::response::List::default(), { count: ruma_from_usize(active_rooms.len()), - }); + }), + ); if let Some(conn_id) = body.conn_id.clone() { let snake_key = into_snake_key(sender_user, sender_device, conn_id); @@ -563,17 +565,19 @@ where .state_cache .room_members(room_id) .ready_filter(|member| *member != sender_user) - .filter_map(|user_id| { + .filter_map(async |user_id| { services .rooms .state_accessor - .get_member(room_id, user_id) - .map_ok(|memberevent| sync_events::v5::response::Hero { - user_id: user_id.into(), - name: memberevent.displayname, - avatar: memberevent.avatar_url, + .get_member(room_id, &user_id) + .map_ok(|member_event| { + assign!(sync_events::v5::response::Hero::new(user_id.clone()), { + name: member_event.displayname, + avatar: member_event.avatar_url, + }) }) .ok() + .await }) .take(5) .collect() @@ -609,73 +613,76 @@ where None }; - rooms.insert(room_id.clone(), sync_events::v5::response::Room { - name: services - .rooms - .state_accessor - .get_name(room_id) - .await - .ok() - .or(name), - avatar: match heroes_avatar { - | Some(heroes_avatar) => ruma::JsOption::Some(heroes_avatar), - | _ => match services.rooms.state_accessor.get_avatar(room_id).await { - | ruma::JsOption::Some(avatar) => ruma::JsOption::from_option(avatar.url), - | ruma::JsOption::Null => ruma::JsOption::Null, - | ruma::JsOption::Undefined => ruma::JsOption::Undefined, + rooms.insert( + room_id.clone(), + assign!(sync_events::v5::response::Room::new(), { + name: services + .rooms + .state_accessor + .get_name(room_id) + .await + .ok() + .or(name), + avatar: match heroes_avatar { + | Some(heroes_avatar) => ruma::JsOption::Some(heroes_avatar), + | _ => match services.rooms.state_accessor.get_avatar(room_id).await { + | ruma::JsOption::Some(avatar) => ruma::JsOption::from_option(avatar.url), + | ruma::JsOption::Null => ruma::JsOption::Null, + | ruma::JsOption::Undefined => ruma::JsOption::Undefined, + }, }, - }, - initial: Some(roomsince == &0), - is_dm: None, - invite_state, - unread_notifications: UnreadNotificationsCount { - highlight_count: Some( + initial: Some(roomsince == &0), + is_dm: None, + invite_state, + unread_notifications: assign!(UnreadNotificationsCount::new(), { + highlight_count: Some( + services + .rooms + .user + .highlight_count(sender_user, room_id) + .await + .try_into() + .expect("notification count can't go that high"), + ), + notification_count: Some( + services + .rooms + .user + .notification_count(sender_user, room_id) + .await + .try_into() + .expect("notification count can't go that high"), + ), + }), + timeline: room_events, + required_state, + prev_batch, + limited, + joined_count: Some( services .rooms - .user - .highlight_count(sender_user, room_id) + .state_cache + .room_joined_count(room_id) .await + .unwrap_or(0) .try_into() - .expect("notification count can't go that high"), + .unwrap_or_else(|_| uint!(0)), ), - notification_count: Some( + invited_count: Some( services .rooms - .user - .notification_count(sender_user, room_id) + .state_cache + .room_invited_count(room_id) .await + .unwrap_or(0) .try_into() - .expect("notification count can't go that high"), + .unwrap_or_else(|_| uint!(0)), ), - }, - timeline: room_events, - required_state, - prev_batch, - limited, - joined_count: Some( - services - .rooms - .state_cache - .room_joined_count(room_id) - .await - .unwrap_or(0) - .try_into() - .unwrap_or_else(|_| uint!(0)), - ), - invited_count: Some( - services - .rooms - .state_cache - .room_invited_count(room_id) - .await - .unwrap_or(0) - .try_into() - .unwrap_or_else(|_| uint!(0)), - ), - num_live: None, // Count events in timeline greater than global sync counter - bump_stamp: timestamp, - heroes: Some(heroes), - }); + num_live: None, // Count events in timeline greater than global sync counter + bump_stamp: timestamp, + heroes: Some(heroes), + }), + ); } Ok(rooms) } @@ -737,7 +744,8 @@ async fn collect_typing_events( let rooms: Vec<_> = body.extensions.typing.rooms.clone().unwrap_or_else(|| { body.room_subscriptions .keys() - .map(ToOwned::to_owned) + .cloned() + .map(ExtensionRoomConfig::Room) .collect() }); let lists: Vec<_> = body @@ -766,9 +774,7 @@ async fn collect_typing_events( | Ok(typing_users) => { typing_response.rooms.insert( room_id.to_owned(), // Already OwnedRoomId - Raw::new(&sync_events::v5::response::SyncTypingEvent { - content: TypingEventContent::new(typing_users), - })?, + Raw::new(&SyncTypingEvent::new(TypingEventContent::new(typing_users)))?, ); }, | Err(e) => { @@ -784,10 +790,7 @@ async fn collect_account_data( services: &Services, (sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request), ) -> sync_events::v5::response::AccountData { - let mut account_data = sync_events::v5::response::AccountData { - global: Vec::new(), - rooms: BTreeMap::new(), - }; + let mut account_data = sync_events::v5::response::AccountData::default(); if !body.extensions.account_data.enabled.unwrap_or(false) { return sync_events::v5::response::AccountData::default(); @@ -802,15 +805,17 @@ async fn collect_account_data( if let Some(rooms) = &body.extensions.account_data.rooms { for room in rooms { - account_data.rooms.insert( - room.clone(), - services - .account_data - .changes_since(Some(room), sender_user, Some(globalsince), None) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect() - .await, - ); + if let ExtensionRoomConfig::Room(room) = room { + account_data.rooms.insert( + room.clone(), + services + .account_data + .changes_since(Some(room.as_ref()), sender_user, Some(globalsince), None) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await, + ); + } } } @@ -841,7 +846,6 @@ where services .users .keys_changed(sender_user, Some(globalsince), None) - .map(ToOwned::to_owned) .collect::>() .await, ); @@ -932,7 +936,7 @@ where if !share_encrypted_room( services, sender_user, - user_id, + &user_id, Some(room_id), ) .await @@ -962,9 +966,10 @@ where .ready_filter(|user_id| sender_user != *user_id) // Only send keys if the sender doesn't share an encrypted room with the target // already - .filter_map(|user_id| { - share_encrypted_room(services, sender_user, user_id, Some(room_id)) + .filter_map(async |user_id| { + share_encrypted_room(services, sender_user, &user_id, Some(room_id)) .map(|res| res.or_some(user_id.to_owned())) + .await }) .collect::>() .await, @@ -978,7 +983,6 @@ where .users .room_keys_changed(room_id, Some(globalsince), None) .map(|(user_id, _)| user_id) - .map(ToOwned::to_owned) .collect::>() .await, ); @@ -995,7 +999,7 @@ where } } - Ok(sync_events::v5::response::E2EE { + Ok(assign!(sync_events::v5::response::E2EE::default(), { device_unused_fallback_key_types: None, device_one_time_keys_count: services @@ -1003,11 +1007,11 @@ where .count_one_time_keys(sender_user, sender_device) .await, - device_lists: DeviceLists { + device_lists: assign!(DeviceLists::new(), { changed: device_list_changes.into_iter().collect(), left: device_list_left.into_iter().collect(), - }, - }) + }), + })) } async fn collect_to_device( @@ -1024,7 +1028,7 @@ async fn collect_to_device( .remove_to_device_events(sender_user, sender_device, globalsince) .await; - Some(sync_events::v5::response::ToDevice { + Some(assign!(sync_events::v5::response::ToDevice::default(), { next_batch: next_batch.to_string(), events: services .users @@ -1032,12 +1036,12 @@ async fn collect_to_device( .map(at!(1)) .collect() .await, - }) + })) } async fn collect_receipts(_services: &Services) -> sync_events::v5::response::Receipts { - sync_events::v5::response::Receipts { rooms: BTreeMap::new() } // TODO: get explicitly requested read receipts + sync_events::v5::response::Receipts::default() } fn filter_rooms<'a, Rooms>(