From 471eb54c660924f8a29bef6919d5a0d3c290f28d Mon Sep 17 00:00:00 2001 From: Ginger Date: Thu, 9 Apr 2026 11:39:53 -0400 Subject: [PATCH] refactor: Consolidate hierarchy and summary logic in a new service --- src/api/client/room/mod.rs | 7 +- src/api/client/room/summary.rs | 333 +---------- src/api/client/space.rs | 204 +------ src/api/router.rs | 6 +- src/api/server/hierarchy.rs | 67 +-- src/service/rooms/mod.rs | 4 +- src/service/rooms/spaces/mod.rs | 489 --------------- src/service/rooms/spaces/pagination_token.rs | 76 --- src/service/rooms/spaces/tests.rs | 147 ----- src/service/rooms/state/mod.rs | 10 - src/service/rooms/summary/mod.rs | 590 +++++++++++++++++++ src/service/rooms/timeline/append.rs | 15 +- src/service/rooms/timeline/mod.rs | 2 - src/service/services.rs | 2 +- 14 files changed, 652 insertions(+), 1300 deletions(-) delete mode 100644 src/service/rooms/spaces/mod.rs delete mode 100644 src/service/rooms/spaces/pagination_token.rs delete mode 100644 src/service/rooms/spaces/tests.rs create mode 100644 src/service/rooms/summary/mod.rs diff --git a/src/api/client/room/mod.rs b/src/api/client/room/mod.rs index 86d68f7e0..e249e18b6 100644 --- a/src/api/client/room/mod.rs +++ b/src/api/client/room/mod.rs @@ -6,10 +6,7 @@ mod summary; mod upgrade; pub(crate) use self::{ - aliases::get_room_aliases_route, - create::create_room_route, - event::get_room_event_route, - initial_sync::room_initial_sync_route, - summary::{get_room_summary, get_room_summary_legacy}, + aliases::get_room_aliases_route, create::create_room_route, event::get_room_event_route, + initial_sync::room_initial_sync_route, summary::get_room_summary, upgrade::upgrade_room_route, }; diff --git a/src/api/client/room/summary.rs b/src/api/client/room/summary.rs index 4333d1afa..b6e217b43 100644 --- a/src/api/client/room/summary.rs +++ b/src/api/client/room/summary.rs @@ -1,48 +1,11 @@ use axum::extract::State; use axum_client_ip::ClientIp; -use conduwuit::{ - Err, Result, debug, debug_warn, info, trace, - utils::{IterStream, future::TryExtExt}, -}; -use futures::{ - FutureExt, StreamExt, TryFutureExt, - future::{OptionFuture, join3}, - stream::FuturesUnordered, -}; -use ruma::{ - OwnedServerName, RoomId, UserId, - api::{ - client::room::get_summary, - federation::space::{SpaceHierarchyParentSummary, get_hierarchy}, - }, - events::room::member::MembershipState, - space::SpaceRoomJoinRule::{self, *}, -}; -use service::Services; +use conduwuit::{Err, Result}; +use ruma::api::client::room::get_summary; +use service::rooms::summary::Accessibility; -use crate::{Ruma, RumaResponse}; +use crate::Ruma; -/// # `GET /_matrix/client/unstable/im.nheko.summary/rooms/{roomIdOrAlias}/summary` -/// -/// Returns a short description of the state of a room. -/// -/// This is the "wrong" endpoint that some implementations/clients may use -/// according to the MSC. Request and response bodies are the same as -/// `get_room_summary`. -/// -/// An implementation of [MSC3266](https://github.com/matrix-org/matrix-spec-proposals/pull/3266) -pub(crate) async fn get_room_summary_legacy( - State(services): State, - ClientIp(client): ClientIp, - body: Ruma, -) -> Result> { - get_room_summary(State(services), ClientIp(client), body) - .boxed() - .await - .map(RumaResponse) -} - -/// # `GET /_matrix/client/unstable/im.nheko.summary/summary/{roomIdOrAlias}` /// # `GET /_matrix/client/v1/room_summary/{roomIdOrAlias}` /// /// Returns a short description of the state of a room. @@ -50,8 +13,8 @@ pub(crate) async fn get_room_summary_legacy( pub(crate) async fn get_room_summary( State(services): State, ClientIp(client): ClientIp, - body: Ruma, -) -> Result { + body: Ruma, +) -> Result { let (room_id, servers) = services .rooms .alias @@ -62,285 +25,19 @@ pub(crate) async fn get_room_summary( return Err!(Request(Forbidden("This room is banned on this homeserver."))); } - room_summary_response(&services, &room_id, &servers, body.sender_user.as_deref()) - .boxed() - .await -} - -async fn room_summary_response( - services: &Services, - room_id: &RoomId, - servers: &[OwnedServerName], - sender_user: Option<&UserId>, -) -> Result { - if services + let summary = services .rooms - .state_cache - .server_in_room(services.globals.server_name(), room_id) - .await - { - match local_room_summary_response(services, room_id, sender_user) - .boxed() - .await - { - | Ok(response) => return Ok(response), - | Err(e) => { - debug_warn!("Failed to get local room summary: {e:?}, falling back to remote"); - }, - } - } - - let room = - remote_room_summary_hierarchy_response(services, room_id, servers, sender_user).await?; - - Ok(get_summary::msc3266::Response { - room_id: room_id.to_owned(), - canonical_alias: room.canonical_alias, - avatar_url: room.avatar_url, - guest_can_join: room.guest_can_join, - name: room.name, - num_joined_members: room.num_joined_members, - topic: room.topic, - world_readable: room.world_readable, - join_rule: room.join_rule, - room_type: room.room_type, - room_version: room.room_version, - encryption: room.encryption, - allowed_room_ids: room.allowed_room_ids, - membership: sender_user.is_some().then_some(MembershipState::Leave), - }) -} - -async fn local_room_summary_response( - services: &Services, - room_id: &RoomId, - sender_user: Option<&UserId>, -) -> Result { - trace!( - sender_user = sender_user.map(tracing::field::display), - "Sending local room summary response for {room_id:?}" - ); - let (join_rule, world_readable, guest_can_join) = join3( - services.rooms.state_accessor.get_join_rules(room_id), - services.rooms.state_accessor.is_world_readable(room_id), - services.rooms.state_accessor.guest_can_join(room_id), - ) - .await; - - // Synapse allows server admins to bypass visibility checks. - // That seems neat so we'll copy that behaviour. - if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await { - user_can_see_summary( - services, - room_id, - &join_rule.clone().into(), - guest_can_join, - world_readable, - join_rule.allowed_rooms(), - sender_user, - ) + .summary + .get_room_summary_for_user(body.sender_user.as_deref(), &room_id, &servers) .await?; - } - let canonical_alias = services - .rooms - .state_accessor - .get_canonical_alias(room_id) - .ok(); - - let name = services.rooms.state_accessor.get_name(room_id).ok(); - - let topic = services.rooms.state_accessor.get_room_topic(room_id).ok(); - - let room_type = services.rooms.state_accessor.get_room_type(room_id).ok(); - - let avatar_url = services - .rooms - .state_accessor - .get_avatar(room_id) - .map(|res| res.into_option().unwrap_or_default().url); - - let room_version = services.rooms.state.get_room_version(room_id).ok(); - - let encryption = services - .rooms - .state_accessor - .get_room_encryption(room_id) - .ok(); - - let num_joined_members = services - .rooms - .state_cache - .room_joined_count(room_id) - .unwrap_or(0); - - let membership: OptionFuture<_> = sender_user - .map(|sender_user| { - services - .rooms - .state_accessor - .get_member(room_id, sender_user) - .map_ok_or(MembershipState::Leave, |content| content.membership) - }) - .into(); - - let ( - canonical_alias, - name, - num_joined_members, - topic, - avatar_url, - room_type, - room_version, - encryption, - membership, - ) = futures::join!( - canonical_alias, - name, - num_joined_members, - topic, - avatar_url, - room_type, - room_version, - encryption, - membership, - ); - - Ok(get_summary::msc3266::Response { - room_id: room_id.to_owned(), - canonical_alias, - avatar_url, - guest_can_join, - name, - num_joined_members: num_joined_members.try_into().unwrap_or_default(), - topic, - world_readable, - room_type, - room_version, - encryption, - membership, - allowed_room_ids: join_rule.allowed_rooms().map(Into::into).collect(), - join_rule: join_rule.into(), - }) -} - -/// used by MSC3266 to fetch a room's info if we do not know about it -async fn remote_room_summary_hierarchy_response( - services: &Services, - room_id: &RoomId, - servers: &[OwnedServerName], - sender_user: Option<&UserId>, -) -> Result { - trace!(sender_user = ?sender_user.map(tracing::field::display), ?servers, "Sending remote room summary response for {room_id:?}"); - if !services.config.allow_federation { - return Err!(Request(Forbidden("Federation is disabled."))); - } - - if services.rooms.metadata.is_disabled(room_id).await { - return Err!(Request(Forbidden( - "Federaton of room {room_id} is currently disabled on this server." - ))); - } - if servers.is_empty() { - return Err!(Request(MissingParam( - "No servers were provided to fetch the room over federation" - ))); - } - - let request = get_hierarchy::v1::Request::new(room_id.to_owned()); - - let mut requests: FuturesUnordered<_> = servers - .iter() - .map(|server| { - info!("Fetching room summary for {room_id} from server {server}"); - services - .sending - .send_federation_request(server, request.clone()) - .inspect_ok(move |v| { - debug!("Fetched room summary for {room_id} from server {server}: {v:?}"); - }) - .inspect_err(move |e| { - info!("Failed to fetch room summary for {room_id} from server {server}: {e}"); - }) - }) - .collect(); - - while let Some(Ok(response)) = requests.next().await { - trace!("{response:?}"); - let room = response.room.clone(); - if room.room_id != room_id { - debug_warn!( - "Room ID {} returned does not belong to the requested room ID {}", - room.room_id, - room_id - ); - continue; - } - - if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await { - return user_can_see_summary( - services, - room_id, - &room.join_rule, - room.guest_can_join, - room.world_readable, - room.allowed_room_ids.iter().map(AsRef::as_ref), - sender_user, - ) - .await - .map(|()| room); - } - return Ok(room); - } - - Err!(Request(NotFound("Room not found or is not accessible"))) -} - -async fn user_can_see_summary<'a, I>( - services: &Services, - room_id: &RoomId, - join_rule: &SpaceRoomJoinRule, - guest_can_join: bool, - world_readable: bool, - allowed_room_ids: I, - sender_user: Option<&UserId>, -) -> Result -where - I: Iterator + Send, -{ - let is_public_room = matches!(join_rule, Public | Knock | KnockRestricted); - match sender_user { - | Some(sender_user) => { - let user_can_see_state_events = services - .rooms - .state_accessor - .user_can_see_state_events(sender_user, room_id); - let is_guest = services.users.is_deactivated(sender_user).unwrap_or(false); - let user_in_allowed_restricted_room = allowed_room_ids - .stream() - .any(|room| services.rooms.state_cache.is_joined(sender_user, room)); - - let (user_can_see_state_events, is_guest, user_in_allowed_restricted_room) = - join3(user_can_see_state_events, is_guest, user_in_allowed_restricted_room) - .boxed() - .await; - - if user_can_see_state_events - || (is_guest && guest_can_join) - || is_public_room - || user_in_allowed_restricted_room - { - return Ok(()); - } - - Err!(Request(Forbidden("Room is not accessible"))) + match summary { + | Accessibility::Accessible(summary) => Ok(get_summary::v1::Response::new(summary)), + | Accessibility::Inaccessible => { + Err!(Request(Forbidden("You may not preview this room."), FORBIDDEN)) }, - | None => { - if is_public_room || world_readable { - return Ok(()); - } - - Err!(Request(Forbidden("Room is not accessible"))) + | Accessibility::NotFound => { + Err!(Request(Forbidden("This room does not exist."), FORBIDDEN)) }, } } diff --git a/src/api/client/space.rs b/src/api/client/space.rs index 20dc2ec7d..53af69b0a 100644 --- a/src/api/client/space.rs +++ b/src/api/client/space.rs @@ -1,26 +1,12 @@ -use std::{ - collections::{BTreeSet, VecDeque}, - str::FromStr, -}; - use axum::extract::State; -use conduwuit::{ - Err, Result, - utils::{future::TryExtExt, stream::IterStream}, -}; -use conduwuit_service::{ - Services, - rooms::spaces::{ - PaginationToken, SummaryAccessibility, get_parent_children_via, summary_to_chunk, - }, -}; -use futures::{StreamExt, TryFutureExt, future::OptionFuture}; -use ruma::{ - OwnedRoomId, OwnedServerName, RoomId, UInt, UserId, api::client::space::get_hierarchy, -}; +use conduwuit::{Err, Result}; +use ruma::{UInt, api::client::space::get_hierarchy, assign}; +use service::rooms::summary::Accessibility; use crate::Ruma; +const MAX_MAX_DEPTH: u32 = 10; + /// # `GET /_matrix/client/v1/rooms/{room_id}/hierarchy` /// /// Paginates over the space tree in a depth-first manner to locate child rooms @@ -29,167 +15,33 @@ pub(crate) async fn get_hierarchy_route( State(services): State, body: Ruma, ) -> Result { - let limit = body - .limit - .unwrap_or_else(|| UInt::from(10_u32)) - .min(UInt::from(100_u32)); + // We don't do pagination for this route (and therefore ignore `limit`), since + // there's no reasonable way to handle a space hierarchy changing during + // pagination. let max_depth = body .max_depth - .unwrap_or_else(|| UInt::from(3_u32)) - .min(UInt::from(10_u32)); + .map(|max_depth| max_depth.min(UInt::from(MAX_MAX_DEPTH))); - let key = body - .from - .as_ref() - .and_then(|s| PaginationToken::from_str(s).ok()); + let hierarchy = services + .rooms + .summary + .get_room_hierarchy_for_user( + body.sender_user(), + body.room_id.clone(), + max_depth, + body.suggested_only, + ) + .await?; - // Should prevent unexpected behaviour in (bad) clients - if let Some(ref token) = key { - if token.suggested_only != body.suggested_only || token.max_depth != max_depth { - return Err!(Request(InvalidParam( - "suggested_only and max_depth cannot change on paginated requests" - ))); - } + match hierarchy { + | Accessibility::Accessible(rooms) => + Ok(assign!(get_hierarchy::v1::Response::new(), { rooms: rooms })), + | Accessibility::Inaccessible => { + Err!(Request(Forbidden("You may not preview this room."), FORBIDDEN)) + }, + | Accessibility::NotFound => { + Err!(Request(Forbidden("This room does not exist."), FORBIDDEN)) + }, } - - get_client_hierarchy( - &services, - body.sender_user(), - &body.room_id, - limit.try_into().unwrap_or(10), - max_depth.try_into().unwrap_or(usize::MAX), - body.suggested_only, - key.as_ref() - .into_iter() - .flat_map(|t| t.short_room_ids.iter()), - ) - .await -} - -async fn get_client_hierarchy<'a, ShortRoomIds>( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - limit: usize, - max_depth: usize, - suggested_only: bool, - short_room_ids: ShortRoomIds, -) -> Result -where - ShortRoomIds: Iterator + Clone + Send + Sync + 'a, -{ - type Via = Vec; - type Entry = (OwnedRoomId, Via); - type Rooms = VecDeque; - - let mut queue: Rooms = [( - room_id.to_owned(), - room_id - .server_name() - .map(ToOwned::to_owned) - .into_iter() - .collect(), - )] - .into(); - - let mut rooms = Vec::with_capacity(limit); - let mut parents = BTreeSet::new(); - while let Some((current_room, via)) = queue.pop_front() { - let summary = services - .rooms - .spaces - .get_summary_and_children_client(¤t_room, suggested_only, sender_user, &via) - .await?; - - match (summary, current_room == room_id) { - | (None | Some(SummaryAccessibility::Inaccessible), false) => { - // Just ignore other unavailable rooms - }, - | (None, true) => { - return Err!(Request(Forbidden("The requested room was not found"))); - }, - | (Some(SummaryAccessibility::Inaccessible), true) => { - return Err!(Request(Forbidden("The requested room is inaccessible"))); - }, - | (Some(SummaryAccessibility::Accessible(summary)), _) => { - let populate = parents.len() >= short_room_ids.clone().count(); - - let mut children: Vec = get_parent_children_via(&summary, suggested_only) - .filter(|(room, _)| !parents.contains(room)) - .rev() - .map(|(key, val)| (key, val.collect())) - .collect(); - - if populate { - rooms.push(summary_to_chunk(summary.clone())); - } else { - children = children - .iter() - .rev() - .stream() - .skip_while(|(room, _)| { - services - .rooms - .short - .get_shortroomid(room) - .map_ok(|short| { - Some(&short) != short_room_ids.clone().nth(parents.len()) - }) - .unwrap_or_else(|_| false) - }) - .map(Clone::clone) - .collect::>() - .await - .into_iter() - .rev() - .collect(); - } - - if !populate && queue.is_empty() && children.is_empty() { - break; - } - - parents.insert(current_room.clone()); - if rooms.len() >= limit { - break; - } - - if parents.len() > max_depth { - continue; - } - - queue.extend(children); - }, - } - } - - let next_batch: OptionFuture<_> = queue - .pop_front() - .map(|(room, _)| async move { - parents.insert(room); - - let next_short_room_ids: Vec<_> = parents - .iter() - .stream() - .filter_map(|room_id| services.rooms.short.get_shortroomid(room_id).ok()) - .collect() - .await; - - (next_short_room_ids.iter().ne(short_room_ids) && !next_short_room_ids.is_empty()) - .then_some(PaginationToken { - short_room_ids: next_short_room_ids, - limit: limit.try_into().ok()?, - max_depth: max_depth.try_into().ok()?, - suggested_only, - }) - .as_ref() - .map(PaginationToken::to_string) - }) - .into(); - - Ok(get_hierarchy::v1::Response { - next_batch: next_batch.await.flatten(), - rooms, - }) } diff --git a/src/api/router.rs b/src/api/router.rs index c8d5c8029..a5161a150 100644 --- a/src/api/router.rs +++ b/src/api/router.rs @@ -184,18 +184,14 @@ pub fn build(router: Router, server: &Server) -> Router { .ruma_route(&client::get_hierarchy_route) .ruma_route(&client::get_mutual_rooms_route) .ruma_route(&client::get_room_summary) - .route( - "/_matrix/client/unstable/im.nheko.summary/rooms/{room_id_or_alias}/summary", - get(client::get_room_summary_legacy) - ) .ruma_route(&client::get_suspended_status) .ruma_route(&client::put_suspended_status) .ruma_route(&client::well_known_support) .ruma_route(&client::well_known_client) .ruma_route(&client::get_rtc_transports) + .ruma_route(&client::room_initial_sync_route) .route("/_conduwuit/server_version", get(client::conduwuit_server_version)) .route("/_continuwuity/server_version", get(client::conduwuit_server_version)) - .ruma_route(&client::room_initial_sync_route) .route("/client/server.json", get(client::syncv3_client_server_json)) .ruma_route(&admin::rooms::ban::ban_room) .ruma_route(&admin::rooms::list::list_rooms); diff --git a/src/api/server/hierarchy.rs b/src/api/server/hierarchy.rs index f67e1b768..d83c0979d 100644 --- a/src/api/server/hierarchy.rs +++ b/src/api/server/hierarchy.rs @@ -1,13 +1,7 @@ use axum::extract::State; -use conduwuit::{ - Err, Result, info, - utils::stream::{BroadbandExt, IterStream}, -}; -use conduwuit_service::rooms::spaces::{ - Identifier, SummaryAccessibility, get_parent_children_via, -}; -use futures::{FutureExt, StreamExt}; +use conduwuit::{Err, Result, info}; use ruma::api::federation::space::get_hierarchy; +use service::rooms::summary::Accessibility; use crate::Ruma; @@ -19,10 +13,6 @@ pub(crate) async fn get_hierarchy_route( State(services): State, body: Ruma, ) -> Result { - if !services.rooms.metadata.exists(&body.room_id).await { - return Err!(Request(NotFound("Room does not exist."))); - } - if !services .rooms .state_cache @@ -36,52 +26,15 @@ pub(crate) async fn get_hierarchy_route( return Err!(Request(NotFound("This server is not participating in that room."))); } - let room_id = &body.room_id; - let suggested_only = body.suggested_only; - let ref identifier = Identifier::ServerName(body.origin()); - match services + let response = services .rooms - .spaces - .get_summary_and_children_local(room_id, identifier) - .await? - { - | None => Err!(Request(NotFound("The requested room was not found"))), + .summary + .get_local_room_summary_for_server(body.origin(), &body.room_id, body.suggested_only) + .await; - | Some(SummaryAccessibility::Inaccessible) => { - Err!(Request(NotFound("The requested room is inaccessible"))) - }, - - | Some(SummaryAccessibility::Accessible(room)) => { - let (children, inaccessible_children) = - get_parent_children_via(&room, suggested_only) - .stream() - .broad_filter_map(|(child, _via)| async move { - match services - .rooms - .spaces - .get_summary_and_children_local(&child, identifier) - .await - .ok()? - { - | None => None, - - | Some(SummaryAccessibility::Inaccessible) => - Some((None, Some(child))), - - | Some(SummaryAccessibility::Accessible(summary)) => - Some((Some(summary), None)), - } - }) - .unzip() - .map(|(children, inaccessible_children): (Vec<_>, Vec<_>)| { - ( - children.into_iter().flatten().map(Into::into).collect(), - inaccessible_children.into_iter().flatten().collect(), - ) - }) - .await; - - Ok(get_hierarchy::v1::Response { room, children, inaccessible_children }) - }, + if let Accessibility::Accessible(response) = response { + Ok(response) + } else { + Err!(Request(NotFound("This room is not accessible."))) } } diff --git a/src/service/rooms/mod.rs b/src/service/rooms/mod.rs index 44a83582d..48dd73983 100644 --- a/src/service/rooms/mod.rs +++ b/src/service/rooms/mod.rs @@ -9,11 +9,11 @@ pub mod pdu_metadata; pub mod read_receipt; pub mod search; pub mod short; -pub mod spaces; pub mod state; pub mod state_accessor; pub mod state_cache; pub mod state_compressor; +pub mod summary; pub mod threads; pub mod timeline; pub mod typing; @@ -33,11 +33,11 @@ pub struct Service { pub read_receipt: Arc, pub search: Arc, pub short: Arc, - pub spaces: Arc, pub state: Arc, pub state_accessor: Arc, pub state_cache: Arc, pub state_compressor: Arc, + pub summary: Arc, pub threads: Arc, pub timeline: Arc, pub typing: Arc, diff --git a/src/service/rooms/spaces/mod.rs b/src/service/rooms/spaces/mod.rs deleted file mode 100644 index 7bcd6f455..000000000 --- a/src/service/rooms/spaces/mod.rs +++ /dev/null @@ -1,489 +0,0 @@ -mod pagination_token; -#[cfg(test)] -mod tests; - -use std::{fmt::Write, sync::Arc}; - -use async_trait::async_trait; -use conduwuit_core::{ - Err, Error, Event, PduEvent, Result, implement, - utils::{ - IterStream, - future::{BoolExt, TryExtExt}, - math::usize_from_f64, - stream::{BroadbandExt, ReadyExt}, - }, -}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, pin_mut, stream::FuturesUnordered}; -use lru_cache::LruCache; -use ruma::{ - OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, ServerName, UserId, - api::{ - client::space::SpaceHierarchyRoomsChunk, - federation::{self, space::SpaceHierarchyParentSummary}, - }, - events::{ - StateEventType, - space::child::{HierarchySpaceChildEvent, SpaceChildEventContent}, - }, - room::{JoinRuleSummary, RoomSummary}, - serde::Raw, -}; -use tokio::sync::{Mutex, MutexGuard}; - -pub use self::pagination_token::PaginationToken; -use crate::{Dep, rooms, sending}; - -pub struct Service { - services: Services, - pub roomid_spacehierarchy_cache: Mutex, -} - -struct Services { - state_accessor: Dep, - state_cache: Dep, - state: Dep, - event_handler: Dep, - timeline: Dep, - sending: Dep, -} - -pub struct CachedSpaceHierarchySummary { - summary: SpaceHierarchyParentSummary, -} - -#[allow(clippy::large_enum_variant)] -pub enum SummaryAccessibility { - Accessible(SpaceHierarchyParentSummary), - Inaccessible, -} - -/// Identifier used to check if rooms are accessible. None is used if you want -/// to return the room, no matter if accessible or not -pub enum Identifier<'a> { - UserId(&'a UserId), - ServerName(&'a ServerName), -} - -type Cache = LruCache>; - -#[async_trait] -impl crate::Service for Service { - fn build(args: crate::Args<'_>) -> Result> { - let config = &args.server.config; - let cache_size = f64::from(config.roomid_spacehierarchy_cache_capacity); - let cache_size = cache_size * config.cache_capacity_modifier; - Ok(Arc::new(Self { - services: Services { - state_accessor: args - .depend::("rooms::state_accessor"), - state_cache: args.depend::("rooms::state_cache"), - state: args.depend::("rooms::state"), - event_handler: args - .depend::("rooms::event_handler"), - timeline: args.depend::("rooms::timeline"), - sending: args.depend::("sending"), - }, - roomid_spacehierarchy_cache: Mutex::new(LruCache::new(usize_from_f64(cache_size)?)), - })) - } - - async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { - let roomid_spacehierarchy_cache = self.roomid_spacehierarchy_cache.lock().await.len(); - - writeln!(out, "roomid_spacehierarchy_cache: {roomid_spacehierarchy_cache}")?; - - Ok(()) - } - - async fn clear_cache(&self) { self.roomid_spacehierarchy_cache.lock().await.clear(); } - - fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } -} - -/// Gets the summary of a space using solely local information -#[implement(Service)] -pub async fn get_summary_and_children_local( - &self, - current_room: &RoomId, - identifier: &Identifier<'_>, -) -> Result> { - match self - .roomid_spacehierarchy_cache - .lock() - .await - .get_mut(current_room) - .as_ref() - { - | None => (), // cache miss - | Some(None) => return Ok(None), - | Some(Some(cached)) => { - let is_accessible_child = self.is_accessible_child( - current_room, - &cached.summary.summary.join_rule, - identifier, - ); - - let accessibility = if is_accessible_child.await { - SummaryAccessibility::Accessible(cached.summary.clone()) - } else { - SummaryAccessibility::Inaccessible - }; - - return Ok(Some(accessibility)); - }, - } - - let children_pdus: Vec<_> = self - .get_space_child_events(current_room) - .map(Event::into_format) - .collect() - .await; - - let Ok(summary) = self - .get_room_summary(current_room, children_pdus, identifier) - .boxed() - .await - else { - return Ok(None); - }; - - self.roomid_spacehierarchy_cache.lock().await.insert( - current_room.to_owned(), - Some(CachedSpaceHierarchySummary { summary: summary.clone() }), - ); - - Ok(Some(SummaryAccessibility::Accessible(summary))) -} - -/// Gets the summary of a space using solely federation -#[implement(Service)] -#[tracing::instrument(level = "debug", skip(self))] -async fn get_summary_and_children_federation( - &self, - current_room: &RoomId, - suggested_only: bool, - user_id: &UserId, - via: &[OwnedServerName], -) -> Result> { - let mut request = federation::space::get_hierarchy::v1::Request::new(current_room.to_owned()); - request.suggested_only = suggested_only; - - let mut requests: FuturesUnordered<_> = via - .iter() - .map(|server| { - self.services - .sending - .send_federation_request(server, request.clone()) - }) - .collect(); - - let Some(Ok(response)) = requests.next().await else { - self.roomid_spacehierarchy_cache - .lock() - .await - .insert(current_room.to_owned(), None); - - return Ok(None); - }; - - let summary = response.room; - self.roomid_spacehierarchy_cache.lock().await.insert( - current_room.to_owned(), - Some(CachedSpaceHierarchySummary { summary: summary.clone() }), - ); - - response - .children - .into_iter() - .stream() - .then(|child| { - self.roomid_spacehierarchy_cache - .lock() - .map(|lock| (child, lock)) - }) - .ready_filter_map(|(child, mut cache)| { - (!cache.contains_key(current_room)).then_some((child, cache)) - }) - .for_each(|(summary, cache)| self.cache_insert(cache, current_room, summary)) - .await; - - let identifier = Identifier::UserId(user_id); - - let is_accessible_child = self - .is_accessible_child(current_room, &summary.summary.join_rule, &identifier) - .await; - - let accessibility = if is_accessible_child { - SummaryAccessibility::Accessible(summary) - } else { - SummaryAccessibility::Inaccessible - }; - - Ok(Some(accessibility)) -} - -/// Simply returns the stripped m.space.child events of a room -#[implement(Service)] -fn get_space_child_events<'a>( - &'a self, - room_id: &'a RoomId, -) -> impl Stream + Send + 'a { - self.services - .state - .get_room_shortstatehash(room_id) - .map_ok(|current_shortstatehash| { - self.services - .state_accessor - .state_keys_with_ids(current_shortstatehash, &StateEventType::SpaceChild) - .boxed() - }) - .map(Result::into_iter) - .map(IterStream::stream) - .map(StreamExt::flatten) - .flatten_stream() - .broad_filter_map(move |(state_key, event_id): (_, OwnedEventId)| async move { - self.services - .timeline - .get_pdu(&event_id) - .map_ok(move |pdu| (state_key, pdu)) - .ok() - .await - }) - .ready_filter_map(move |(state_key, pdu)| { - if let Ok(content) = pdu.get_content::() { - if content.via.is_empty() { - return None; - } - } else { - return None; - } - - if RoomId::parse(&state_key).is_err() { - return None; - } - - Some(pdu) - }) -} - -/// Gets the summary of a space using either local or remote (federation) -/// sources -#[implement(Service)] -pub async fn get_summary_and_children_client( - &self, - current_room: &OwnedRoomId, - suggested_only: bool, - user_id: &UserId, - via: &[OwnedServerName], -) -> Result> { - let identifier = Identifier::UserId(user_id); - - if let Ok(Some(response)) = self - .get_summary_and_children_local(current_room, &identifier) - .await - { - return Ok(Some(response)); - } - - self.get_summary_and_children_federation(current_room, suggested_only, user_id, via) - .await -} - -#[implement(Service)] -async fn get_room_summary( - &self, - room_id: &RoomId, - children_state: Vec>, - identifier: &Identifier<'_>, -) -> Result { - let join_rule = self.services.state_accessor.get_join_rules(room_id).await; - - let is_accessible_child = self - .is_accessible_child(room_id, &join_rule.clone().into(), identifier) - .await; - - if !is_accessible_child { - return Err!(Request(Forbidden("User is not allowed to see the room"))); - } - - let name = self.services.state_accessor.get_name(room_id).ok(); - - let topic = self.services.state_accessor.get_room_topic(room_id).ok(); - - let room_type = self.services.state_accessor.get_room_type(room_id).ok(); - - let world_readable = self.services.state_accessor.is_world_readable(room_id); - - let guest_can_join = self.services.state_accessor.guest_can_join(room_id); - - let num_joined_members = self - .services - .state_cache - .room_joined_count(room_id) - .unwrap_or(0); - - let canonical_alias = self - .services - .state_accessor - .get_canonical_alias(room_id) - .ok(); - - let avatar_url = self - .services - .state_accessor - .get_avatar(room_id) - .map(|res| res.into_option().unwrap_or_default().url); - - let room_version = self.services.state.get_room_version(room_id).ok(); - - let encryption = self - .services - .state_accessor - .get_room_encryption(room_id) - .ok(); - - let ( - canonical_alias, - name, - num_joined_members, - topic, - world_readable, - guest_can_join, - avatar_url, - room_type, - room_version, - encryption, - ) = futures::join!( - canonical_alias, - name, - num_joined_members, - topic, - world_readable, - guest_can_join, - avatar_url, - room_type, - room_version, - encryption, - ); - - let mut summary = RoomSummary::new( - room_id.to_owned(), - join_rule.clone().into(), - guest_can_join, - num_joined_members.try_into().unwrap_or_default(), - world_readable, - ); - summary.canonical_alias = canonical_alias; - summary.name = name; - summary.topic = topic; - summary.avatar_url = avatar_url; - summary.encryption = encryption; - summary.room_type = room_type; - summary.room_version = room_version; - - let summary = SpaceHierarchyParentSummary::new(summary, children_state); - - Ok(summary) -} - -/// With the given identifier, checks if a room is accessible -#[implement(Service)] -async fn is_accessible_child( - &self, - current_room: &RoomId, - join_rule: &JoinRuleSummary, - identifier: &Identifier<'_>, -) -> bool { - if let Identifier::ServerName(server_name) = identifier { - // Checks if ACLs allow for the server to participate - if self - .services - .event_handler - .acl_check(server_name, current_room) - .await - .is_err() - { - return false; - } - } - - if let Identifier::UserId(user_id) = identifier { - let is_joined = self.services.state_cache.is_joined(user_id, current_room); - - let is_invited = self.services.state_cache.is_invited(user_id, current_room); - - pin_mut!(is_joined, is_invited); - if is_joined.or(is_invited).await { - return true; - } - } - - match join_rule { - | JoinRuleSummary::Public - | JoinRuleSummary::Knock - | JoinRuleSummary::KnockRestricted(_) => true, - | JoinRuleSummary::Restricted(restricted_summary) => - (&restricted_summary.allowed_room_ids) - .stream() - .any(async |room| match identifier { - | Identifier::UserId(user) => - self.services.state_cache.is_joined(user, room).await, - | Identifier::ServerName(server) => - self.services.state_cache.server_in_room(server, room).await, - }) - .await, - | _ => false, - } -} - -/// Returns the children of a SpaceHierarchyParentSummary, making use of the -/// children_state field -pub fn get_parent_children_via( - parent: &SpaceHierarchyParentSummary, - suggested_only: bool, -) -> impl DoubleEndedIterator + use<>)> -+ Send -+ '_ { - parent - .children_state - .iter() - .map(Raw::deserialize) - .filter_map(Result::ok) - .filter_map(move |ce| { - (!suggested_only || ce.content.suggested) - .then_some((ce.state_key, ce.content.via.into_iter())) - }) -} - -#[implement(Service)] -async fn cache_insert( - &self, - mut cache: MutexGuard<'_, Cache>, - current_room: &RoomId, - summary: RoomSummary, -) { - let children_state = self - .get_space_child_events(&summary.room_id) - .map(Event::into_format) - .collect() - .await; - let summary = SpaceHierarchyParentSummary::new(summary, children_state); - - cache.insert(current_room.to_owned(), Some(CachedSpaceHierarchySummary { summary })); -} - -// Here because cannot implement `From` across ruma-federation-api and -// ruma-client-api types -impl From for SpaceHierarchyRoomsChunk { - fn from(value: CachedSpaceHierarchySummary) -> Self { - Self::new(value.summary.summary, value.summary.children_state) - } -} - -/// Here because cannot implement `From` across ruma-federation-api and -/// ruma-client-api types -#[must_use] -pub fn summary_to_chunk(summary: SpaceHierarchyParentSummary) -> SpaceHierarchyRoomsChunk { - SpaceHierarchyRoomsChunk::new(summary.summary, summary.children_state) -} diff --git a/src/service/rooms/spaces/pagination_token.rs b/src/service/rooms/spaces/pagination_token.rs deleted file mode 100644 index 00475109a..000000000 --- a/src/service/rooms/spaces/pagination_token.rs +++ /dev/null @@ -1,76 +0,0 @@ -use std::{ - fmt::{Display, Formatter}, - str::FromStr, -}; - -use conduwuit::{Error, Result}; -use ruma::{UInt, api::error::ErrorKind}; - -use crate::rooms::short::ShortRoomId; - -// TODO: perhaps use some better form of token rather than just room count -#[derive(Debug, Eq, PartialEq)] -pub struct PaginationToken { - /// Path down the hierarchy of the room to start the response at, - /// excluding the root space. - pub short_room_ids: Vec, - pub limit: UInt, - pub max_depth: UInt, - pub suggested_only: bool, -} - -impl FromStr for PaginationToken { - type Err = Error; - - fn from_str(value: &str) -> Result { - let mut values = value.split('_'); - let mut pag_tok = || { - let short_room_ids = values - .next()? - .split(',') - .filter_map(|room_s| u64::from_str(room_s).ok()) - .collect(); - - let limit = UInt::from_str(values.next()?).ok()?; - let max_depth = UInt::from_str(values.next()?).ok()?; - let slice = values.next()?; - let suggested_only = if values.next().is_none() { - if slice == "true" { - true - } else if slice == "false" { - false - } else { - None? - } - } else { - None? - }; - - Some(Self { - short_room_ids, - limit, - max_depth, - suggested_only, - }) - }; - - if let Some(token) = pag_tok() { - Ok(token) - } else { - Err(Error::BadRequest(ErrorKind::InvalidParam, "invalid token")) - } - } -} - -impl Display for PaginationToken { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let short_room_ids = self - .short_room_ids - .iter() - .map(ToString::to_string) - .collect::>() - .join(","); - - write!(f, "{short_room_ids}_{}_{}_{}", self.limit, self.max_depth, self.suggested_only) - } -} diff --git a/src/service/rooms/spaces/tests.rs b/src/service/rooms/spaces/tests.rs deleted file mode 100644 index 42cc7aab5..000000000 --- a/src/service/rooms/spaces/tests.rs +++ /dev/null @@ -1,147 +0,0 @@ -use std::str::FromStr; - -use ruma::{ - UInt, - api::federation::space::SpaceHierarchyParentSummary, - owned_room_id, owned_server_name, - room::{JoinRuleSummary, RoomSummary}, -}; - -use crate::rooms::spaces::{PaginationToken, get_parent_children_via}; - -#[test] -fn get_summary_children() { - let summary = SpaceHierarchyParentSummary::new( - RoomSummary::new( - owned_room_id!("!root:example.org"), - JoinRuleSummary::Public, - true, - UInt::from(1_u32), - true, - ), - vec![ - serde_json::from_str( - r#"{ - "content": { - "via": [ - "example.org" - ], - "suggested": false - }, - "origin_server_ts": 1629413349153, - "sender": "@alice:example.org", - "state_key": "!foo:example.org", - "type": "m.space.child" - }"#, - ) - .unwrap(), - serde_json::from_str( - r#"{ - "content": { - "via": [ - "example.org" - ], - "suggested": true - }, - "origin_server_ts": 1629413349157, - "sender": "@alice:example.org", - "state_key": "!bar:example.org", - "type": "m.space.child" - }"#, - ) - .unwrap(), - serde_json::from_str( - r#"{ - "content": { - "via": [ - "example.org" - ] - }, - "origin_server_ts": 1629413349160, - "sender": "@alice:example.org", - "state_key": "!baz:example.org", - "type": "m.space.child" - }"#, - ) - .unwrap(), - ], - ); - - assert_eq!( - get_parent_children_via(&summary, false) - .map(|(k, v)| (k, v.collect::>())) - .collect::>(), - vec![ - (owned_room_id!("!foo:example.org"), vec![owned_server_name!("example.org")]), - (owned_room_id!("!bar:example.org"), vec![owned_server_name!("example.org")]), - (owned_room_id!("!baz:example.org"), vec![owned_server_name!("example.org")]) - ] - ); - assert_eq!( - get_parent_children_via(&summary, true) - .map(|(k, v)| (k, v.collect::>())) - .collect::>(), - vec![(owned_room_id!("!bar:example.org"), vec![owned_server_name!("example.org")])] - ); -} - -#[test] -fn invalid_pagination_tokens() { - fn token_is_err(token: &str) { PaginationToken::from_str(token).unwrap_err(); } - - token_is_err("231_2_noabool"); - token_is_err(""); - token_is_err("111_3_"); - token_is_err("foo_not_int"); - token_is_err("11_4_true_"); - token_is_err("___"); - token_is_err("__false"); -} - -#[test] -fn valid_pagination_tokens() { - assert_eq!( - PaginationToken { - short_room_ids: vec![5383, 42934, 283, 423], - limit: UInt::from(20_u32), - max_depth: UInt::from(1_u32), - suggested_only: true - }, - PaginationToken::from_str("5383,42934,283,423_20_1_true").unwrap() - ); - - assert_eq!( - PaginationToken { - short_room_ids: vec![740], - limit: UInt::from(97_u32), - max_depth: UInt::from(10539_u32), - suggested_only: false - }, - PaginationToken::from_str("740_97_10539_false").unwrap() - ); -} - -#[test] -fn pagination_token_to_string() { - assert_eq!( - PaginationToken { - short_room_ids: vec![740], - limit: UInt::from(97_u32), - max_depth: UInt::from(10539_u32), - suggested_only: false - } - .to_string(), - "740_97_10539_false" - ); - - assert_eq!( - PaginationToken { - short_room_ids: vec![9, 34], - limit: UInt::from(3_u32), - max_depth: UInt::from(1_u32), - suggested_only: true - } - .to_string(), - "9,34_3_1_true" - ); -} diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index e4ca06688..d89ca5604 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -43,7 +43,6 @@ pub struct Service { struct Services { globals: Dep, short: Dep, - spaces: Dep, state_cache: Dep, state_accessor: Dep, state_compressor: Dep, @@ -67,7 +66,6 @@ impl crate::Service for Service { services: Services { globals: args.depend::("globals"), short: args.depend::("rooms::short"), - spaces: args.depend::("rooms::spaces"), state_cache: args.depend::("rooms::state_cache"), state_accessor: args .depend::("rooms::state_accessor"), @@ -132,14 +130,6 @@ impl Service { .update_membership(room_id, &user_id, &pdu, false) .await?; }, - | TimelineEventType::SpaceChild => { - self.services - .spaces - .roomid_spacehierarchy_cache - .lock() - .await - .remove(room_id); - }, | _ => continue, } } diff --git a/src/service/rooms/summary/mod.rs b/src/service/rooms/summary/mod.rs new file mode 100644 index 000000000..0df680336 --- /dev/null +++ b/src/service/rooms/summary/mod.rs @@ -0,0 +1,590 @@ +use std::{collections::HashSet, sync::Arc}; + +use conduwuit::{ + Err, Event, Result, info, + utils::{IterStream, ReadyExt, TryFutureExtExt, stream::BroadbandExt}, + warn, +}; +use futures::{FutureExt, StreamExt, TryFutureExt}; +use ruma::{ + OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt, UserId, + api::{ + client::space::SpaceHierarchyRoomsChunk, + federation::space::{SpaceHierarchyParentSummary, get_hierarchy}, + }, + assign, + events::{ + StateEventType, + space::child::{HierarchySpaceChildEvent, SpaceChildEventContent}, + }, + room::{JoinRuleSummary, RestrictedSummary, RoomSummary}, + serde::Raw, +}; + +use crate::{Dep, rooms, sending}; + +pub struct Service { + services: Services, +} + +struct Services { + event_handler: Dep, + metadata: Dep, + sending: Dep, + state: Dep, + state_accessor: Dep, + state_cache: Dep, + timeline: Dep, +} + +pub enum Accessibility { + Accessible(T), + Inaccessible, + NotFound, +} + +struct SpaceSummaryAndChildren { + /// The summary of the space. + summary: SpaceHierarchyRoomsChunk, + /// All child rooms of the space. + children: Vec, + /// Child rooms of the space which are not accessible to the local server. + inaccessible_children: Vec, +} + +struct SpaceChild { + room_id: OwnedRoomId, + via: Vec, +} + +impl crate::Service for Service { + fn build(args: crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { + services: Services { + event_handler: args + .depend::("rooms::event_handler"), + metadata: args.depend::("rooms::metadata"), + sending: args.depend::("sending"), + state: args.depend::("rooms::state"), + state_accessor: args + .depend::("rooms::state_accessor"), + state_cache: args.depend::("rooms::state_cache"), + timeline: args.depend::("rooms::timeline"), + }, + })) + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} + +impl Service { + /// Summarize a room for a local user, possibly by querying over federation + /// if we don't have the room locally. + pub async fn get_room_summary_for_user( + &self, + querying_user: Option<&UserId>, + room_id: &RoomId, + via: &[OwnedServerName], + ) -> Result> { + let summary = { + if let Some(summary) = self.build_local_room_summary(room_id).await { + // We have this room locally. + + summary + } else if let Some((SpaceHierarchyParentSummary { summary, .. }, _)) = + self.fetch_remote_summary(room_id, via, false).await? + { + // A via has this room. + + summary + } else { + // We don't have this room and none of the vias have it either. + + return Ok(Accessibility::NotFound); + } + }; + + // Check if the room is visible to the querying user. + if !self.user_may_see_summary(querying_user, &summary).await { + return Ok(Accessibility::Inaccessible); + } + + Ok(Accessibility::Accessible(summary)) + } + + /// Fetch information about a room and its children, possibly by querying + /// over federation if we don't have the room locally. + /// + /// This is similar to [`Self::get_room_summary_for_user`] but includes + /// additional data which is needed to traverse the room hierarchy. + async fn get_room_summary_and_children_for_user( + &self, + querying_user: Option<&UserId>, + room_id: &RoomId, + via: Option<&[OwnedServerName]>, + suggested_only: bool, + ) -> Result> { + let (summary, inaccessible_children) = { + if let Some(summary) = self.build_local_room_summary(room_id).await { + // We have this room locally. + let children_state = self.get_space_child_events(room_id).await; + + // All of the room's children are accessible to this server (because we have the + // full room and its state), although some of them may not be accessible to + // the querying user. + (SpaceHierarchyRoomsChunk::new(summary, children_state), vec![]) + } else if let Some(via) = via + && let Some(( + SpaceHierarchyParentSummary { summary, children_state, .. }, + inaccessible_children, + )) = self + .fetch_remote_summary(room_id, via, suggested_only) + .await? + { + // A via has this room. + + (SpaceHierarchyRoomsChunk::new(summary, children_state), inaccessible_children) + } else { + // We don't have this room and none of the vias have it either. + + return Ok(Accessibility::NotFound); + } + }; + + // Check if the room is visible to the querying user. + if !self + .user_may_see_summary(querying_user, &summary.summary) + .await + { + return Ok(Accessibility::Inaccessible); + } + + let children = summary + .children_state + .iter() + // Ignore deserialization failures + .flat_map(Raw::deserialize) + // Filter out non-suggested children if suggested_only is set + .filter(|child| !suggested_only || child.content.suggested) + .map(|child| SpaceChild { room_id: child.state_key, via: child.content.via }) + .collect(); + + Ok(Accessibility::Accessible(SpaceSummaryAndChildren { + summary, + children, + inaccessible_children, + })) + } + + /// Summarize a room and its children for a local user, possibly by querying + /// over federation if we don't have the space locally. + pub async fn get_room_hierarchy_for_user( + &self, + querying_user: &UserId, + room_id: OwnedRoomId, + max_depth: Option, + suggested_only: bool, + ) -> Result>> { + // This function traverses the space hierarchy tree depth-first as required by + // the specification. + + // Check accessibility of the root room first, because we need to error out + // if it isn't accessible. + // TODO refactor this once the Try trait is stable + let root_summary = match self + .get_room_summary_and_children_for_user( + Some(querying_user), + &room_id, + // Clients can't specify vias for the root room + None, + suggested_only, + ) + .await? + { + | Accessibility::Accessible(root_summary) => root_summary, + | Accessibility::Inaccessible => return Ok(Accessibility::Inaccessible), + | Accessibility::NotFound => return Ok(Accessibility::NotFound), + }; + + let mut queue = vec![root_summary.children]; + let mut summaries = vec![root_summary.summary]; + let mut inaccessible_children: HashSet<_> = + root_summary.inaccessible_children.into_iter().collect(); + + // TODO refactor this with Vec::peek_mut once it's stabilized + loop { + let Some(layer) = queue.last_mut() else { + break; + }; + + let Some(SpaceChild { room_id, via }) = layer.pop() else { + // If this layer is empty, discard it from the queue and continue + queue.pop(); + continue; + }; + + // Do not request rooms which have been determined to be inaccessible + if inaccessible_children.contains(&room_id) { + continue; + } + + let summary = match self + .get_room_summary_and_children_for_user( + Some(querying_user), + &room_id, + Some(&via), + suggested_only, + ) + .await + { + | Ok(Accessibility::Accessible(summary)) => summary, + | Ok(Accessibility::Inaccessible) => { + // Mark this room as inaccessible and skip it + inaccessible_children.insert(room_id); + continue; + }, + | Ok(Accessibility::NotFound) => { + // Skip children which we can't find + continue; + }, + | Err(_) => { + // Skip children which we failed to fetch over federation + continue; + }, + }; + + summaries.push(summary.summary); + inaccessible_children.extend(summary.inaccessible_children.into_iter()); + + // Don't traverse the tree deeper than max_depth + #[allow( + clippy::as_conversions, + clippy::arithmetic_side_effects, + reason = "queue.len() should never be large enough to cause strange behavior \ + here" + )] + if max_depth.is_some_and(|max_depth| (queue.len() as u64 + 1) > max_depth.into()) { + continue; + } + + // Add accessible children as a new layer + if !summary.children.is_empty() { + queue.push(summary.children); + } + } + + Ok(Accessibility::Accessible(summaries)) + } + + /// Summarize a _local_ room and its children for a remote server. + pub async fn get_local_room_summary_for_server( + &self, + querying_server: &ServerName, + room_id: &RoomId, + suggested_only: bool, + ) -> Accessibility { + let Some(summary) = self.build_local_room_summary(room_id).await else { + return Accessibility::NotFound; + }; + + // Check if the server can see the root room's summary + if !self.server_may_see_summary(querying_server, &summary).await { + return Accessibility::Inaccessible; + } + + let children_state = self.get_space_child_events(room_id).await; + + let (accessible_children, inaccessible_children) = children_state + .iter() + // Ignore deserialization failures + .flat_map(Raw::deserialize) + // Filter out non-suggested children if suggested_only is set + .filter(|child| !suggested_only || child.content.suggested) + // Fetch summaries for the children in parallel + .stream() + .broad_then(async |child| { + let summary = { + if let Some(summary) = self.build_local_room_summary(&child.state_key).await { + if self.server_may_see_summary(querying_server, &summary).await { + Accessibility::Accessible(summary) + } else { + Accessibility::Inaccessible + } + } else { + Accessibility::NotFound + } + }; + + (child.state_key, summary) + }) + // Sort the children into two Vecs by accessibility + .ready_fold_default(|(mut accessible_children, mut inaccessible_children): (Vec<_>, Vec<_>), (room_id, summary)| { + match summary { + Accessibility::Accessible(summary) => { + accessible_children.push(summary); + }, + Accessibility::Inaccessible => { + inaccessible_children.push(room_id); + }, + Accessibility::NotFound => { + // Skip inaccessible children + } + } + + (accessible_children, inaccessible_children) + }) + .await; + + Accessibility::Accessible(assign!( + get_hierarchy::v1::Response::new(SpaceHierarchyParentSummary::new(summary, children_state)), + { children: accessible_children, inaccessible_children: inaccessible_children } + )) + } + + /// Prepare a summary of a room known to this server. + async fn build_local_room_summary(&self, room_id: &RoomId) -> Option { + // If we can't find a version for this room, it doesn't exist. + let room_version = self.services.state.get_room_version(room_id).await.ok()?; + + let ( + join_rule, + guest_can_join, + num_joined_members, + world_readable, + canonical_alias, + name, + topic, + avatar_url, + room_type, + encryption, + ) = async { + futures::join!( + self.services.state_accessor.get_join_rules(room_id), + self.services.state_accessor.guest_can_join(room_id), + self.services + .state_cache + .room_joined_count(room_id) + .unwrap_or(0), + self.services.state_accessor.is_world_readable(room_id), + self.services + .state_accessor + .get_canonical_alias(room_id) + .ok(), + self.services.state_accessor.get_name(room_id).ok(), + self.services.state_accessor.get_room_topic(room_id).ok(), + self.services + .state_accessor + .get_avatar(room_id) + .map(|res| res.into_option().unwrap_or_default().url), + self.services.state_accessor.get_room_type(room_id).ok(), + self.services + .state_accessor + .get_room_encryption(room_id) + .ok(), + ) + } + .boxed() + .await; + + let summary = assign!( + RoomSummary::new( + room_id.to_owned(), + join_rule.into(), + guest_can_join, + num_joined_members.try_into().expect("number of joined members should fit into a UInt"), + world_readable, + ), + { + canonical_alias: canonical_alias, + name: name, + topic: topic, + avatar_url: avatar_url, + room_type: room_type, + encryption: encryption, + room_version: Some(room_version), + } + ); + + Some(summary) + } + + /// Query remote servers for the summary of a room. + async fn fetch_remote_summary( + &self, + room_id: &RoomId, + via: &[OwnedServerName], + suggested_only: bool, + ) -> Result)>> { + if self.services.metadata.is_disabled(room_id).await { + return Err!(Request(Forbidden("This room is blocked by this server."))); + } + + if via.is_empty() { + return Err!(Request(MissingParam( + "No servers were provided with which to query this room's summary." + ))); + } + + let request = assign!(get_hierarchy::v1::Request::new(room_id.to_owned()), { suggested_only: suggested_only }); + + for server in via { + info!(%room_id, %server, "Asking for room summary over federation"); + + match self + .services + .sending + .send_federation_request(server, request.clone()) + .await + { + | Ok(get_hierarchy::v1::Response { room, inaccessible_children, .. }) => { + if room.summary.room_id != room_id { + warn!( + %server, + expected_room = %room_id, + returned_room = %room.summary.room_id, + "Server didn't return the room we asked for" + ); + continue; + } + + return Ok(Some((room, inaccessible_children))); + }, + | Err(err) => { + info!(%room_id, %server, %err, "Server could not provide a summary for this room"); + }, + } + } + + info!(%room_id, "No servers queried could provide a summary for this room"); + Ok(None) + } + + /// Get the stripped m.space.child events of a room. + async fn get_space_child_events( + &self, + room_id: &RoomId, + ) -> Vec> { + let current_shortstatehash = self + .services + .state + .get_room_shortstatehash(room_id) + .await + .expect("room should have a current state"); + + self.services + .state_accessor + .state_keys_with_ids(current_shortstatehash, &StateEventType::SpaceChild) + .broad_filter_map(move |(state_key, event_id): (_, OwnedEventId)| async move { + self.services + .timeline + .get_pdu(&event_id) + .map_ok(move |pdu| (state_key, pdu)) + .ok() + .await + }) + .ready_filter_map(move |(state_key, pdu)| { + let Ok(content) = pdu.get_content::() else { + return None; + }; + + if content.via.is_empty() { + return None; + } + + if RoomId::parse(&state_key).is_err() { + return None; + } + + Some(pdu.into_format()) + }) + .collect() + .await + } + + /// Determine if a user (possibly anonymous) may view a room summary. + async fn user_may_see_summary( + &self, + querying_user: Option<&UserId>, + summary: &RoomSummary, + ) -> bool { + // Anyone can view the summary of world-readable rooms. + if summary.world_readable { + return true; + } + + // If the user is joined or invited they may always view the summary. + if let Some(querying_user) = querying_user + && (self + .services + .state_cache + .is_joined(querying_user, &summary.room_id) + .await || self + .services + .state_cache + .is_invited(querying_user, &summary.room_id) + .await) + { + return true; + } + + // Otherwise, visibility depends on the join rule. + match (&summary.join_rule, querying_user) { + // Anyone can view summaries for `public`, `knock`, and `knock_restricted` rooms. + | ( + JoinRuleSummary::Public + | JoinRuleSummary::Knock + | JoinRuleSummary::KnockRestricted(_), + _, + ) => true, + + // The user may be able to view the summary for a `restricted` room, even if they + // aren't invited, provided they're in one of the allowed rooms. + | ( + JoinRuleSummary::Restricted(RestrictedSummary { allowed_room_ids, .. }), + Some(querying_user), + ) => + self.services + .state_cache + .rooms_joined(querying_user) + .ready_any(|room_id| allowed_room_ids.contains(&room_id)) + .await, + + // In all other cases, the user may not view the summary. + | _ => false, + } + } + + // Determine if a remote server may view a room summary. + async fn server_may_see_summary( + &self, + querying_server: &ServerName, + summary: &RoomSummary, + ) -> bool { + // Servers may not see summaries of rooms they're ACLed from. + if self + .services + .event_handler + .acl_check(querying_server, &summary.room_id) + .await + .is_err() + { + return false; + } + + // Servers may always see summaries if any of their users are participating in + // the room. It's the requesting server's job to restrict visibility on a + // per-user basis. + if self + .services + .state_cache + .server_in_room(querying_server, &summary.room_id) + .await + { + return true; + } + + // If the server isn't in the room, the same visibility rules apply as for + // anonymous summary requests. + self.user_may_see_summary(None, summary).await + } +} diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index fd93b4d12..e4f439464 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -83,7 +83,7 @@ where body, Some(pdu.event_id().into()), source, - pdu.sender.clone().into(), + pdu.sender.clone(), )?; } } @@ -224,7 +224,7 @@ where let target_user_id = UserId::parse(state_key)?; if self.services.users.is_active_local(&target_user_id).await { - push_target.insert(target_user_id.to_owned()); + push_target.insert(target_user_id.clone()); } } } @@ -320,15 +320,6 @@ where }, } }, - | TimelineEventType::SpaceChild => - if let Some(_state_key) = pdu.state_key() { - self.services - .spaces - .roomid_spacehierarchy_cache - .lock() - .await - .remove(room_id); - }, | TimelineEventType::RoomMember => { if let Some(state_key) = pdu.state_key() { // if the state_key fails @@ -410,7 +401,7 @@ where .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) { let appservice_uid = appservice.registration.sender_localpart.as_str(); - if state_key_uid == &appservice_uid { + if state_key_uid == appservice_uid { self.services .sending .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index a35b502c4..8b142463f 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -79,7 +79,6 @@ struct Services { pusher: Dep, threads: Dep, search: Dep, - spaces: Dep, event_handler: Dep, } @@ -111,7 +110,6 @@ impl crate::Service for Service { pusher: args.depend::("pusher"), threads: args.depend::("rooms::threads"), search: args.depend::("rooms::search"), - spaces: args.depend::("rooms::spaces"), event_handler: args .depend::("rooms::event_handler"), }, diff --git a/src/service/services.rs b/src/service/services.rs index 4b81c4612..726a6842d 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -101,11 +101,11 @@ impl Services { read_receipt: build!(rooms::read_receipt::Service), search: build!(rooms::search::Service), short: build!(rooms::short::Service), - spaces: build!(rooms::spaces::Service), state: build!(rooms::state::Service), state_accessor: build!(rooms::state_accessor::Service), state_cache: build!(rooms::state_cache::Service), state_compressor: build!(rooms::state_compressor::Service), + summary: build!(rooms::summary::Service), threads: build!(rooms::threads::Service), timeline: build!(rooms::timeline::Service), typing: build!(rooms::typing::Service),