refactor: Consolidate hierarchy and summary logic in a new service

This commit is contained in:
Ginger
2026-04-09 11:39:53 -04:00
parent 755006c66d
commit 471eb54c66
14 changed files with 652 additions and 1300 deletions
+2 -5
View File
@@ -6,10 +6,7 @@ mod summary;
mod upgrade;
pub(crate) use self::{
aliases::get_room_aliases_route,
create::create_room_route,
event::get_room_event_route,
initial_sync::room_initial_sync_route,
summary::{get_room_summary, get_room_summary_legacy},
aliases::get_room_aliases_route, create::create_room_route, event::get_room_event_route,
initial_sync::room_initial_sync_route, summary::get_room_summary,
upgrade::upgrade_room_route,
};
+15 -318
View File
@@ -1,48 +1,11 @@
use axum::extract::State;
use axum_client_ip::ClientIp;
use conduwuit::{
Err, Result, debug, debug_warn, info, trace,
utils::{IterStream, future::TryExtExt},
};
use futures::{
FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join3},
stream::FuturesUnordered,
};
use ruma::{
OwnedServerName, RoomId, UserId,
api::{
client::room::get_summary,
federation::space::{SpaceHierarchyParentSummary, get_hierarchy},
},
events::room::member::MembershipState,
space::SpaceRoomJoinRule::{self, *},
};
use service::Services;
use conduwuit::{Err, Result};
use ruma::api::client::room::get_summary;
use service::rooms::summary::Accessibility;
use crate::{Ruma, RumaResponse};
use crate::Ruma;
/// # `GET /_matrix/client/unstable/im.nheko.summary/rooms/{roomIdOrAlias}/summary`
///
/// Returns a short description of the state of a room.
///
/// This is the "wrong" endpoint that some implementations/clients may use
/// according to the MSC. Request and response bodies are the same as
/// `get_room_summary`.
///
/// An implementation of [MSC3266](https://github.com/matrix-org/matrix-spec-proposals/pull/3266)
pub(crate) async fn get_room_summary_legacy(
State(services): State<crate::State>,
ClientIp(client): ClientIp,
body: Ruma<get_summary::msc3266::Request>,
) -> Result<RumaResponse<get_summary::msc3266::Response>> {
get_room_summary(State(services), ClientIp(client), body)
.boxed()
.await
.map(RumaResponse)
}
/// # `GET /_matrix/client/unstable/im.nheko.summary/summary/{roomIdOrAlias}`
/// # `GET /_matrix/client/v1/room_summary/{roomIdOrAlias}`
///
/// Returns a short description of the state of a room.
@@ -50,8 +13,8 @@ pub(crate) async fn get_room_summary_legacy(
pub(crate) async fn get_room_summary(
State(services): State<crate::State>,
ClientIp(client): ClientIp,
body: Ruma<get_summary::msc3266::Request>,
) -> Result<get_summary::msc3266::Response> {
body: Ruma<get_summary::v1::Request>,
) -> Result<get_summary::v1::Response> {
let (room_id, servers) = services
.rooms
.alias
@@ -62,285 +25,19 @@ pub(crate) async fn get_room_summary(
return Err!(Request(Forbidden("This room is banned on this homeserver.")));
}
room_summary_response(&services, &room_id, &servers, body.sender_user.as_deref())
.boxed()
.await
}
async fn room_summary_response(
services: &Services,
room_id: &RoomId,
servers: &[OwnedServerName],
sender_user: Option<&UserId>,
) -> Result<get_summary::msc3266::Response> {
if services
let summary = services
.rooms
.state_cache
.server_in_room(services.globals.server_name(), room_id)
.await
{
match local_room_summary_response(services, room_id, sender_user)
.boxed()
.await
{
| Ok(response) => return Ok(response),
| Err(e) => {
debug_warn!("Failed to get local room summary: {e:?}, falling back to remote");
},
}
}
let room =
remote_room_summary_hierarchy_response(services, room_id, servers, sender_user).await?;
Ok(get_summary::msc3266::Response {
room_id: room_id.to_owned(),
canonical_alias: room.canonical_alias,
avatar_url: room.avatar_url,
guest_can_join: room.guest_can_join,
name: room.name,
num_joined_members: room.num_joined_members,
topic: room.topic,
world_readable: room.world_readable,
join_rule: room.join_rule,
room_type: room.room_type,
room_version: room.room_version,
encryption: room.encryption,
allowed_room_ids: room.allowed_room_ids,
membership: sender_user.is_some().then_some(MembershipState::Leave),
})
}
async fn local_room_summary_response(
services: &Services,
room_id: &RoomId,
sender_user: Option<&UserId>,
) -> Result<get_summary::msc3266::Response> {
trace!(
sender_user = sender_user.map(tracing::field::display),
"Sending local room summary response for {room_id:?}"
);
let (join_rule, world_readable, guest_can_join) = join3(
services.rooms.state_accessor.get_join_rules(room_id),
services.rooms.state_accessor.is_world_readable(room_id),
services.rooms.state_accessor.guest_can_join(room_id),
)
.await;
// Synapse allows server admins to bypass visibility checks.
// That seems neat so we'll copy that behaviour.
if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await {
user_can_see_summary(
services,
room_id,
&join_rule.clone().into(),
guest_can_join,
world_readable,
join_rule.allowed_rooms(),
sender_user,
)
.summary
.get_room_summary_for_user(body.sender_user.as_deref(), &room_id, &servers)
.await?;
}
let canonical_alias = services
.rooms
.state_accessor
.get_canonical_alias(room_id)
.ok();
let name = services.rooms.state_accessor.get_name(room_id).ok();
let topic = services.rooms.state_accessor.get_room_topic(room_id).ok();
let room_type = services.rooms.state_accessor.get_room_type(room_id).ok();
let avatar_url = services
.rooms
.state_accessor
.get_avatar(room_id)
.map(|res| res.into_option().unwrap_or_default().url);
let room_version = services.rooms.state.get_room_version(room_id).ok();
let encryption = services
.rooms
.state_accessor
.get_room_encryption(room_id)
.ok();
let num_joined_members = services
.rooms
.state_cache
.room_joined_count(room_id)
.unwrap_or(0);
let membership: OptionFuture<_> = sender_user
.map(|sender_user| {
services
.rooms
.state_accessor
.get_member(room_id, sender_user)
.map_ok_or(MembershipState::Leave, |content| content.membership)
})
.into();
let (
canonical_alias,
name,
num_joined_members,
topic,
avatar_url,
room_type,
room_version,
encryption,
membership,
) = futures::join!(
canonical_alias,
name,
num_joined_members,
topic,
avatar_url,
room_type,
room_version,
encryption,
membership,
);
Ok(get_summary::msc3266::Response {
room_id: room_id.to_owned(),
canonical_alias,
avatar_url,
guest_can_join,
name,
num_joined_members: num_joined_members.try_into().unwrap_or_default(),
topic,
world_readable,
room_type,
room_version,
encryption,
membership,
allowed_room_ids: join_rule.allowed_rooms().map(Into::into).collect(),
join_rule: join_rule.into(),
})
}
/// used by MSC3266 to fetch a room's info if we do not know about it
async fn remote_room_summary_hierarchy_response(
services: &Services,
room_id: &RoomId,
servers: &[OwnedServerName],
sender_user: Option<&UserId>,
) -> Result<SpaceHierarchyParentSummary> {
trace!(sender_user = ?sender_user.map(tracing::field::display), ?servers, "Sending remote room summary response for {room_id:?}");
if !services.config.allow_federation {
return Err!(Request(Forbidden("Federation is disabled.")));
}
if services.rooms.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(
"Federaton of room {room_id} is currently disabled on this server."
)));
}
if servers.is_empty() {
return Err!(Request(MissingParam(
"No servers were provided to fetch the room over federation"
)));
}
let request = get_hierarchy::v1::Request::new(room_id.to_owned());
let mut requests: FuturesUnordered<_> = servers
.iter()
.map(|server| {
info!("Fetching room summary for {room_id} from server {server}");
services
.sending
.send_federation_request(server, request.clone())
.inspect_ok(move |v| {
debug!("Fetched room summary for {room_id} from server {server}: {v:?}");
})
.inspect_err(move |e| {
info!("Failed to fetch room summary for {room_id} from server {server}: {e}");
})
})
.collect();
while let Some(Ok(response)) = requests.next().await {
trace!("{response:?}");
let room = response.room.clone();
if room.room_id != room_id {
debug_warn!(
"Room ID {} returned does not belong to the requested room ID {}",
room.room_id,
room_id
);
continue;
}
if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await {
return user_can_see_summary(
services,
room_id,
&room.join_rule,
room.guest_can_join,
room.world_readable,
room.allowed_room_ids.iter().map(AsRef::as_ref),
sender_user,
)
.await
.map(|()| room);
}
return Ok(room);
}
Err!(Request(NotFound("Room not found or is not accessible")))
}
async fn user_can_see_summary<'a, I>(
services: &Services,
room_id: &RoomId,
join_rule: &SpaceRoomJoinRule,
guest_can_join: bool,
world_readable: bool,
allowed_room_ids: I,
sender_user: Option<&UserId>,
) -> Result
where
I: Iterator<Item = &'a RoomId> + Send,
{
let is_public_room = matches!(join_rule, Public | Knock | KnockRestricted);
match sender_user {
| Some(sender_user) => {
let user_can_see_state_events = services
.rooms
.state_accessor
.user_can_see_state_events(sender_user, room_id);
let is_guest = services.users.is_deactivated(sender_user).unwrap_or(false);
let user_in_allowed_restricted_room = allowed_room_ids
.stream()
.any(|room| services.rooms.state_cache.is_joined(sender_user, room));
let (user_can_see_state_events, is_guest, user_in_allowed_restricted_room) =
join3(user_can_see_state_events, is_guest, user_in_allowed_restricted_room)
.boxed()
.await;
if user_can_see_state_events
|| (is_guest && guest_can_join)
|| is_public_room
|| user_in_allowed_restricted_room
{
return Ok(());
}
Err!(Request(Forbidden("Room is not accessible")))
match summary {
| Accessibility::Accessible(summary) => Ok(get_summary::v1::Response::new(summary)),
| Accessibility::Inaccessible => {
Err!(Request(Forbidden("You may not preview this room."), FORBIDDEN))
},
| None => {
if is_public_room || world_readable {
return Ok(());
}
Err!(Request(Forbidden("Room is not accessible")))
| Accessibility::NotFound => {
Err!(Request(Forbidden("This room does not exist."), FORBIDDEN))
},
}
}
+28 -176
View File
@@ -1,26 +1,12 @@
use std::{
collections::{BTreeSet, VecDeque},
str::FromStr,
};
use axum::extract::State;
use conduwuit::{
Err, Result,
utils::{future::TryExtExt, stream::IterStream},
};
use conduwuit_service::{
Services,
rooms::spaces::{
PaginationToken, SummaryAccessibility, get_parent_children_via, summary_to_chunk,
},
};
use futures::{StreamExt, TryFutureExt, future::OptionFuture};
use ruma::{
OwnedRoomId, OwnedServerName, RoomId, UInt, UserId, api::client::space::get_hierarchy,
};
use conduwuit::{Err, Result};
use ruma::{UInt, api::client::space::get_hierarchy, assign};
use service::rooms::summary::Accessibility;
use crate::Ruma;
const MAX_MAX_DEPTH: u32 = 10;
/// # `GET /_matrix/client/v1/rooms/{room_id}/hierarchy`
///
/// Paginates over the space tree in a depth-first manner to locate child rooms
@@ -29,167 +15,33 @@ pub(crate) async fn get_hierarchy_route(
State(services): State<crate::State>,
body: Ruma<get_hierarchy::v1::Request>,
) -> Result<get_hierarchy::v1::Response> {
let limit = body
.limit
.unwrap_or_else(|| UInt::from(10_u32))
.min(UInt::from(100_u32));
// We don't do pagination for this route (and therefore ignore `limit`), since
// there's no reasonable way to handle a space hierarchy changing during
// pagination.
let max_depth = body
.max_depth
.unwrap_or_else(|| UInt::from(3_u32))
.min(UInt::from(10_u32));
.map(|max_depth| max_depth.min(UInt::from(MAX_MAX_DEPTH)));
let key = body
.from
.as_ref()
.and_then(|s| PaginationToken::from_str(s).ok());
let hierarchy = services
.rooms
.summary
.get_room_hierarchy_for_user(
body.sender_user(),
body.room_id.clone(),
max_depth,
body.suggested_only,
)
.await?;
// Should prevent unexpected behaviour in (bad) clients
if let Some(ref token) = key {
if token.suggested_only != body.suggested_only || token.max_depth != max_depth {
return Err!(Request(InvalidParam(
"suggested_only and max_depth cannot change on paginated requests"
)));
}
match hierarchy {
| Accessibility::Accessible(rooms) =>
Ok(assign!(get_hierarchy::v1::Response::new(), { rooms: rooms })),
| Accessibility::Inaccessible => {
Err!(Request(Forbidden("You may not preview this room."), FORBIDDEN))
},
| Accessibility::NotFound => {
Err!(Request(Forbidden("This room does not exist."), FORBIDDEN))
},
}
get_client_hierarchy(
&services,
body.sender_user(),
&body.room_id,
limit.try_into().unwrap_or(10),
max_depth.try_into().unwrap_or(usize::MAX),
body.suggested_only,
key.as_ref()
.into_iter()
.flat_map(|t| t.short_room_ids.iter()),
)
.await
}
async fn get_client_hierarchy<'a, ShortRoomIds>(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
limit: usize,
max_depth: usize,
suggested_only: bool,
short_room_ids: ShortRoomIds,
) -> Result<get_hierarchy::v1::Response>
where
ShortRoomIds: Iterator<Item = &'a u64> + Clone + Send + Sync + 'a,
{
type Via = Vec<OwnedServerName>;
type Entry = (OwnedRoomId, Via);
type Rooms = VecDeque<Entry>;
let mut queue: Rooms = [(
room_id.to_owned(),
room_id
.server_name()
.map(ToOwned::to_owned)
.into_iter()
.collect(),
)]
.into();
let mut rooms = Vec::with_capacity(limit);
let mut parents = BTreeSet::new();
while let Some((current_room, via)) = queue.pop_front() {
let summary = services
.rooms
.spaces
.get_summary_and_children_client(&current_room, suggested_only, sender_user, &via)
.await?;
match (summary, current_room == room_id) {
| (None | Some(SummaryAccessibility::Inaccessible), false) => {
// Just ignore other unavailable rooms
},
| (None, true) => {
return Err!(Request(Forbidden("The requested room was not found")));
},
| (Some(SummaryAccessibility::Inaccessible), true) => {
return Err!(Request(Forbidden("The requested room is inaccessible")));
},
| (Some(SummaryAccessibility::Accessible(summary)), _) => {
let populate = parents.len() >= short_room_ids.clone().count();
let mut children: Vec<Entry> = get_parent_children_via(&summary, suggested_only)
.filter(|(room, _)| !parents.contains(room))
.rev()
.map(|(key, val)| (key, val.collect()))
.collect();
if populate {
rooms.push(summary_to_chunk(summary.clone()));
} else {
children = children
.iter()
.rev()
.stream()
.skip_while(|(room, _)| {
services
.rooms
.short
.get_shortroomid(room)
.map_ok(|short| {
Some(&short) != short_room_ids.clone().nth(parents.len())
})
.unwrap_or_else(|_| false)
})
.map(Clone::clone)
.collect::<Vec<Entry>>()
.await
.into_iter()
.rev()
.collect();
}
if !populate && queue.is_empty() && children.is_empty() {
break;
}
parents.insert(current_room.clone());
if rooms.len() >= limit {
break;
}
if parents.len() > max_depth {
continue;
}
queue.extend(children);
},
}
}
let next_batch: OptionFuture<_> = queue
.pop_front()
.map(|(room, _)| async move {
parents.insert(room);
let next_short_room_ids: Vec<_> = parents
.iter()
.stream()
.filter_map(|room_id| services.rooms.short.get_shortroomid(room_id).ok())
.collect()
.await;
(next_short_room_ids.iter().ne(short_room_ids) && !next_short_room_ids.is_empty())
.then_some(PaginationToken {
short_room_ids: next_short_room_ids,
limit: limit.try_into().ok()?,
max_depth: max_depth.try_into().ok()?,
suggested_only,
})
.as_ref()
.map(PaginationToken::to_string)
})
.into();
Ok(get_hierarchy::v1::Response {
next_batch: next_batch.await.flatten(),
rooms,
})
}
+1 -5
View File
@@ -184,18 +184,14 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.ruma_route(&client::get_hierarchy_route)
.ruma_route(&client::get_mutual_rooms_route)
.ruma_route(&client::get_room_summary)
.route(
"/_matrix/client/unstable/im.nheko.summary/rooms/{room_id_or_alias}/summary",
get(client::get_room_summary_legacy)
)
.ruma_route(&client::get_suspended_status)
.ruma_route(&client::put_suspended_status)
.ruma_route(&client::well_known_support)
.ruma_route(&client::well_known_client)
.ruma_route(&client::get_rtc_transports)
.ruma_route(&client::room_initial_sync_route)
.route("/_conduwuit/server_version", get(client::conduwuit_server_version))
.route("/_continuwuity/server_version", get(client::conduwuit_server_version))
.ruma_route(&client::room_initial_sync_route)
.route("/client/server.json", get(client::syncv3_client_server_json))
.ruma_route(&admin::rooms::ban::ban_room)
.ruma_route(&admin::rooms::list::list_rooms);
+10 -57
View File
@@ -1,13 +1,7 @@
use axum::extract::State;
use conduwuit::{
Err, Result, info,
utils::stream::{BroadbandExt, IterStream},
};
use conduwuit_service::rooms::spaces::{
Identifier, SummaryAccessibility, get_parent_children_via,
};
use futures::{FutureExt, StreamExt};
use conduwuit::{Err, Result, info};
use ruma::api::federation::space::get_hierarchy;
use service::rooms::summary::Accessibility;
use crate::Ruma;
@@ -19,10 +13,6 @@ pub(crate) async fn get_hierarchy_route(
State(services): State<crate::State>,
body: Ruma<get_hierarchy::v1::Request>,
) -> Result<get_hierarchy::v1::Response> {
if !services.rooms.metadata.exists(&body.room_id).await {
return Err!(Request(NotFound("Room does not exist.")));
}
if !services
.rooms
.state_cache
@@ -36,52 +26,15 @@ pub(crate) async fn get_hierarchy_route(
return Err!(Request(NotFound("This server is not participating in that room.")));
}
let room_id = &body.room_id;
let suggested_only = body.suggested_only;
let ref identifier = Identifier::ServerName(body.origin());
match services
let response = services
.rooms
.spaces
.get_summary_and_children_local(room_id, identifier)
.await?
{
| None => Err!(Request(NotFound("The requested room was not found"))),
.summary
.get_local_room_summary_for_server(body.origin(), &body.room_id, body.suggested_only)
.await;
| Some(SummaryAccessibility::Inaccessible) => {
Err!(Request(NotFound("The requested room is inaccessible")))
},
| Some(SummaryAccessibility::Accessible(room)) => {
let (children, inaccessible_children) =
get_parent_children_via(&room, suggested_only)
.stream()
.broad_filter_map(|(child, _via)| async move {
match services
.rooms
.spaces
.get_summary_and_children_local(&child, identifier)
.await
.ok()?
{
| None => None,
| Some(SummaryAccessibility::Inaccessible) =>
Some((None, Some(child))),
| Some(SummaryAccessibility::Accessible(summary)) =>
Some((Some(summary), None)),
}
})
.unzip()
.map(|(children, inaccessible_children): (Vec<_>, Vec<_>)| {
(
children.into_iter().flatten().map(Into::into).collect(),
inaccessible_children.into_iter().flatten().collect(),
)
})
.await;
Ok(get_hierarchy::v1::Response { room, children, inaccessible_children })
},
if let Accessibility::Accessible(response) = response {
Ok(response)
} else {
Err!(Request(NotFound("This room is not accessible.")))
}
}
+2 -2
View File
@@ -9,11 +9,11 @@ pub mod pdu_metadata;
pub mod read_receipt;
pub mod search;
pub mod short;
pub mod spaces;
pub mod state;
pub mod state_accessor;
pub mod state_cache;
pub mod state_compressor;
pub mod summary;
pub mod threads;
pub mod timeline;
pub mod typing;
@@ -33,11 +33,11 @@ pub struct Service {
pub read_receipt: Arc<read_receipt::Service>,
pub search: Arc<search::Service>,
pub short: Arc<short::Service>,
pub spaces: Arc<spaces::Service>,
pub state: Arc<state::Service>,
pub state_accessor: Arc<state_accessor::Service>,
pub state_cache: Arc<state_cache::Service>,
pub state_compressor: Arc<state_compressor::Service>,
pub summary: Arc<summary::Service>,
pub threads: Arc<threads::Service>,
pub timeline: Arc<timeline::Service>,
pub typing: Arc<typing::Service>,
-489
View File
@@ -1,489 +0,0 @@
mod pagination_token;
#[cfg(test)]
mod tests;
use std::{fmt::Write, sync::Arc};
use async_trait::async_trait;
use conduwuit_core::{
Err, Error, Event, PduEvent, Result, implement,
utils::{
IterStream,
future::{BoolExt, TryExtExt},
math::usize_from_f64,
stream::{BroadbandExt, ReadyExt},
},
};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, pin_mut, stream::FuturesUnordered};
use lru_cache::LruCache;
use ruma::{
OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, ServerName, UserId,
api::{
client::space::SpaceHierarchyRoomsChunk,
federation::{self, space::SpaceHierarchyParentSummary},
},
events::{
StateEventType,
space::child::{HierarchySpaceChildEvent, SpaceChildEventContent},
},
room::{JoinRuleSummary, RoomSummary},
serde::Raw,
};
use tokio::sync::{Mutex, MutexGuard};
pub use self::pagination_token::PaginationToken;
use crate::{Dep, rooms, sending};
pub struct Service {
services: Services,
pub roomid_spacehierarchy_cache: Mutex<Cache>,
}
struct Services {
state_accessor: Dep<rooms::state_accessor::Service>,
state_cache: Dep<rooms::state_cache::Service>,
state: Dep<rooms::state::Service>,
event_handler: Dep<rooms::event_handler::Service>,
timeline: Dep<rooms::timeline::Service>,
sending: Dep<sending::Service>,
}
pub struct CachedSpaceHierarchySummary {
summary: SpaceHierarchyParentSummary,
}
#[allow(clippy::large_enum_variant)]
pub enum SummaryAccessibility {
Accessible(SpaceHierarchyParentSummary),
Inaccessible,
}
/// Identifier used to check if rooms are accessible. None is used if you want
/// to return the room, no matter if accessible or not
pub enum Identifier<'a> {
UserId(&'a UserId),
ServerName(&'a ServerName),
}
type Cache = LruCache<OwnedRoomId, Option<CachedSpaceHierarchySummary>>;
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let config = &args.server.config;
let cache_size = f64::from(config.roomid_spacehierarchy_cache_capacity);
let cache_size = cache_size * config.cache_capacity_modifier;
Ok(Arc::new(Self {
services: Services {
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
state: args.depend::<rooms::state::Service>("rooms::state"),
event_handler: args
.depend::<rooms::event_handler::Service>("rooms::event_handler"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
sending: args.depend::<sending::Service>("sending"),
},
roomid_spacehierarchy_cache: Mutex::new(LruCache::new(usize_from_f64(cache_size)?)),
}))
}
async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
let roomid_spacehierarchy_cache = self.roomid_spacehierarchy_cache.lock().await.len();
writeln!(out, "roomid_spacehierarchy_cache: {roomid_spacehierarchy_cache}")?;
Ok(())
}
async fn clear_cache(&self) { self.roomid_spacehierarchy_cache.lock().await.clear(); }
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
/// Gets the summary of a space using solely local information
#[implement(Service)]
pub async fn get_summary_and_children_local(
&self,
current_room: &RoomId,
identifier: &Identifier<'_>,
) -> Result<Option<SummaryAccessibility>> {
match self
.roomid_spacehierarchy_cache
.lock()
.await
.get_mut(current_room)
.as_ref()
{
| None => (), // cache miss
| Some(None) => return Ok(None),
| Some(Some(cached)) => {
let is_accessible_child = self.is_accessible_child(
current_room,
&cached.summary.summary.join_rule,
identifier,
);
let accessibility = if is_accessible_child.await {
SummaryAccessibility::Accessible(cached.summary.clone())
} else {
SummaryAccessibility::Inaccessible
};
return Ok(Some(accessibility));
},
}
let children_pdus: Vec<_> = self
.get_space_child_events(current_room)
.map(Event::into_format)
.collect()
.await;
let Ok(summary) = self
.get_room_summary(current_room, children_pdus, identifier)
.boxed()
.await
else {
return Ok(None);
};
self.roomid_spacehierarchy_cache.lock().await.insert(
current_room.to_owned(),
Some(CachedSpaceHierarchySummary { summary: summary.clone() }),
);
Ok(Some(SummaryAccessibility::Accessible(summary)))
}
/// Gets the summary of a space using solely federation
#[implement(Service)]
#[tracing::instrument(level = "debug", skip(self))]
async fn get_summary_and_children_federation(
&self,
current_room: &RoomId,
suggested_only: bool,
user_id: &UserId,
via: &[OwnedServerName],
) -> Result<Option<SummaryAccessibility>> {
let mut request = federation::space::get_hierarchy::v1::Request::new(current_room.to_owned());
request.suggested_only = suggested_only;
let mut requests: FuturesUnordered<_> = via
.iter()
.map(|server| {
self.services
.sending
.send_federation_request(server, request.clone())
})
.collect();
let Some(Ok(response)) = requests.next().await else {
self.roomid_spacehierarchy_cache
.lock()
.await
.insert(current_room.to_owned(), None);
return Ok(None);
};
let summary = response.room;
self.roomid_spacehierarchy_cache.lock().await.insert(
current_room.to_owned(),
Some(CachedSpaceHierarchySummary { summary: summary.clone() }),
);
response
.children
.into_iter()
.stream()
.then(|child| {
self.roomid_spacehierarchy_cache
.lock()
.map(|lock| (child, lock))
})
.ready_filter_map(|(child, mut cache)| {
(!cache.contains_key(current_room)).then_some((child, cache))
})
.for_each(|(summary, cache)| self.cache_insert(cache, current_room, summary))
.await;
let identifier = Identifier::UserId(user_id);
let is_accessible_child = self
.is_accessible_child(current_room, &summary.summary.join_rule, &identifier)
.await;
let accessibility = if is_accessible_child {
SummaryAccessibility::Accessible(summary)
} else {
SummaryAccessibility::Inaccessible
};
Ok(Some(accessibility))
}
/// Simply returns the stripped m.space.child events of a room
#[implement(Service)]
fn get_space_child_events<'a>(
&'a self,
room_id: &'a RoomId,
) -> impl Stream<Item = PduEvent> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)
.map_ok(|current_shortstatehash| {
self.services
.state_accessor
.state_keys_with_ids(current_shortstatehash, &StateEventType::SpaceChild)
.boxed()
})
.map(Result::into_iter)
.map(IterStream::stream)
.map(StreamExt::flatten)
.flatten_stream()
.broad_filter_map(move |(state_key, event_id): (_, OwnedEventId)| async move {
self.services
.timeline
.get_pdu(&event_id)
.map_ok(move |pdu| (state_key, pdu))
.ok()
.await
})
.ready_filter_map(move |(state_key, pdu)| {
if let Ok(content) = pdu.get_content::<SpaceChildEventContent>() {
if content.via.is_empty() {
return None;
}
} else {
return None;
}
if RoomId::parse(&state_key).is_err() {
return None;
}
Some(pdu)
})
}
/// Gets the summary of a space using either local or remote (federation)
/// sources
#[implement(Service)]
pub async fn get_summary_and_children_client(
&self,
current_room: &OwnedRoomId,
suggested_only: bool,
user_id: &UserId,
via: &[OwnedServerName],
) -> Result<Option<SummaryAccessibility>> {
let identifier = Identifier::UserId(user_id);
if let Ok(Some(response)) = self
.get_summary_and_children_local(current_room, &identifier)
.await
{
return Ok(Some(response));
}
self.get_summary_and_children_federation(current_room, suggested_only, user_id, via)
.await
}
#[implement(Service)]
async fn get_room_summary(
&self,
room_id: &RoomId,
children_state: Vec<Raw<HierarchySpaceChildEvent>>,
identifier: &Identifier<'_>,
) -> Result<SpaceHierarchyParentSummary, Error> {
let join_rule = self.services.state_accessor.get_join_rules(room_id).await;
let is_accessible_child = self
.is_accessible_child(room_id, &join_rule.clone().into(), identifier)
.await;
if !is_accessible_child {
return Err!(Request(Forbidden("User is not allowed to see the room")));
}
let name = self.services.state_accessor.get_name(room_id).ok();
let topic = self.services.state_accessor.get_room_topic(room_id).ok();
let room_type = self.services.state_accessor.get_room_type(room_id).ok();
let world_readable = self.services.state_accessor.is_world_readable(room_id);
let guest_can_join = self.services.state_accessor.guest_can_join(room_id);
let num_joined_members = self
.services
.state_cache
.room_joined_count(room_id)
.unwrap_or(0);
let canonical_alias = self
.services
.state_accessor
.get_canonical_alias(room_id)
.ok();
let avatar_url = self
.services
.state_accessor
.get_avatar(room_id)
.map(|res| res.into_option().unwrap_or_default().url);
let room_version = self.services.state.get_room_version(room_id).ok();
let encryption = self
.services
.state_accessor
.get_room_encryption(room_id)
.ok();
let (
canonical_alias,
name,
num_joined_members,
topic,
world_readable,
guest_can_join,
avatar_url,
room_type,
room_version,
encryption,
) = futures::join!(
canonical_alias,
name,
num_joined_members,
topic,
world_readable,
guest_can_join,
avatar_url,
room_type,
room_version,
encryption,
);
let mut summary = RoomSummary::new(
room_id.to_owned(),
join_rule.clone().into(),
guest_can_join,
num_joined_members.try_into().unwrap_or_default(),
world_readable,
);
summary.canonical_alias = canonical_alias;
summary.name = name;
summary.topic = topic;
summary.avatar_url = avatar_url;
summary.encryption = encryption;
summary.room_type = room_type;
summary.room_version = room_version;
let summary = SpaceHierarchyParentSummary::new(summary, children_state);
Ok(summary)
}
/// With the given identifier, checks if a room is accessible
#[implement(Service)]
async fn is_accessible_child(
&self,
current_room: &RoomId,
join_rule: &JoinRuleSummary,
identifier: &Identifier<'_>,
) -> bool {
if let Identifier::ServerName(server_name) = identifier {
// Checks if ACLs allow for the server to participate
if self
.services
.event_handler
.acl_check(server_name, current_room)
.await
.is_err()
{
return false;
}
}
if let Identifier::UserId(user_id) = identifier {
let is_joined = self.services.state_cache.is_joined(user_id, current_room);
let is_invited = self.services.state_cache.is_invited(user_id, current_room);
pin_mut!(is_joined, is_invited);
if is_joined.or(is_invited).await {
return true;
}
}
match join_rule {
| JoinRuleSummary::Public
| JoinRuleSummary::Knock
| JoinRuleSummary::KnockRestricted(_) => true,
| JoinRuleSummary::Restricted(restricted_summary) =>
(&restricted_summary.allowed_room_ids)
.stream()
.any(async |room| match identifier {
| Identifier::UserId(user) =>
self.services.state_cache.is_joined(user, room).await,
| Identifier::ServerName(server) =>
self.services.state_cache.server_in_room(server, room).await,
})
.await,
| _ => false,
}
}
/// Returns the children of a SpaceHierarchyParentSummary, making use of the
/// children_state field
pub fn get_parent_children_via(
parent: &SpaceHierarchyParentSummary,
suggested_only: bool,
) -> impl DoubleEndedIterator<Item = (OwnedRoomId, impl Iterator<Item = OwnedServerName> + use<>)>
+ Send
+ '_ {
parent
.children_state
.iter()
.map(Raw::deserialize)
.filter_map(Result::ok)
.filter_map(move |ce| {
(!suggested_only || ce.content.suggested)
.then_some((ce.state_key, ce.content.via.into_iter()))
})
}
#[implement(Service)]
async fn cache_insert(
&self,
mut cache: MutexGuard<'_, Cache>,
current_room: &RoomId,
summary: RoomSummary,
) {
let children_state = self
.get_space_child_events(&summary.room_id)
.map(Event::into_format)
.collect()
.await;
let summary = SpaceHierarchyParentSummary::new(summary, children_state);
cache.insert(current_room.to_owned(), Some(CachedSpaceHierarchySummary { summary }));
}
// Here because cannot implement `From` across ruma-federation-api and
// ruma-client-api types
impl From<CachedSpaceHierarchySummary> for SpaceHierarchyRoomsChunk {
fn from(value: CachedSpaceHierarchySummary) -> Self {
Self::new(value.summary.summary, value.summary.children_state)
}
}
/// Here because cannot implement `From` across ruma-federation-api and
/// ruma-client-api types
#[must_use]
pub fn summary_to_chunk(summary: SpaceHierarchyParentSummary) -> SpaceHierarchyRoomsChunk {
SpaceHierarchyRoomsChunk::new(summary.summary, summary.children_state)
}
@@ -1,76 +0,0 @@
use std::{
fmt::{Display, Formatter},
str::FromStr,
};
use conduwuit::{Error, Result};
use ruma::{UInt, api::error::ErrorKind};
use crate::rooms::short::ShortRoomId;
// TODO: perhaps use some better form of token rather than just room count
#[derive(Debug, Eq, PartialEq)]
pub struct PaginationToken {
/// Path down the hierarchy of the room to start the response at,
/// excluding the root space.
pub short_room_ids: Vec<ShortRoomId>,
pub limit: UInt,
pub max_depth: UInt,
pub suggested_only: bool,
}
impl FromStr for PaginationToken {
type Err = Error;
fn from_str(value: &str) -> Result<Self> {
let mut values = value.split('_');
let mut pag_tok = || {
let short_room_ids = values
.next()?
.split(',')
.filter_map(|room_s| u64::from_str(room_s).ok())
.collect();
let limit = UInt::from_str(values.next()?).ok()?;
let max_depth = UInt::from_str(values.next()?).ok()?;
let slice = values.next()?;
let suggested_only = if values.next().is_none() {
if slice == "true" {
true
} else if slice == "false" {
false
} else {
None?
}
} else {
None?
};
Some(Self {
short_room_ids,
limit,
max_depth,
suggested_only,
})
};
if let Some(token) = pag_tok() {
Ok(token)
} else {
Err(Error::BadRequest(ErrorKind::InvalidParam, "invalid token"))
}
}
}
impl Display for PaginationToken {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let short_room_ids = self
.short_room_ids
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(",");
write!(f, "{short_room_ids}_{}_{}_{}", self.limit, self.max_depth, self.suggested_only)
}
}
-147
View File
@@ -1,147 +0,0 @@
use std::str::FromStr;
use ruma::{
UInt,
api::federation::space::SpaceHierarchyParentSummary,
owned_room_id, owned_server_name,
room::{JoinRuleSummary, RoomSummary},
};
use crate::rooms::spaces::{PaginationToken, get_parent_children_via};
#[test]
fn get_summary_children() {
let summary = SpaceHierarchyParentSummary::new(
RoomSummary::new(
owned_room_id!("!root:example.org"),
JoinRuleSummary::Public,
true,
UInt::from(1_u32),
true,
),
vec![
serde_json::from_str(
r#"{
"content": {
"via": [
"example.org"
],
"suggested": false
},
"origin_server_ts": 1629413349153,
"sender": "@alice:example.org",
"state_key": "!foo:example.org",
"type": "m.space.child"
}"#,
)
.unwrap(),
serde_json::from_str(
r#"{
"content": {
"via": [
"example.org"
],
"suggested": true
},
"origin_server_ts": 1629413349157,
"sender": "@alice:example.org",
"state_key": "!bar:example.org",
"type": "m.space.child"
}"#,
)
.unwrap(),
serde_json::from_str(
r#"{
"content": {
"via": [
"example.org"
]
},
"origin_server_ts": 1629413349160,
"sender": "@alice:example.org",
"state_key": "!baz:example.org",
"type": "m.space.child"
}"#,
)
.unwrap(),
],
);
assert_eq!(
get_parent_children_via(&summary, false)
.map(|(k, v)| (k, v.collect::<Vec<_>>()))
.collect::<Vec<_>>(),
vec![
(owned_room_id!("!foo:example.org"), vec![owned_server_name!("example.org")]),
(owned_room_id!("!bar:example.org"), vec![owned_server_name!("example.org")]),
(owned_room_id!("!baz:example.org"), vec![owned_server_name!("example.org")])
]
);
assert_eq!(
get_parent_children_via(&summary, true)
.map(|(k, v)| (k, v.collect::<Vec<_>>()))
.collect::<Vec<_>>(),
vec![(owned_room_id!("!bar:example.org"), vec![owned_server_name!("example.org")])]
);
}
#[test]
fn invalid_pagination_tokens() {
fn token_is_err(token: &str) { PaginationToken::from_str(token).unwrap_err(); }
token_is_err("231_2_noabool");
token_is_err("");
token_is_err("111_3_");
token_is_err("foo_not_int");
token_is_err("11_4_true_");
token_is_err("___");
token_is_err("__false");
}
#[test]
fn valid_pagination_tokens() {
assert_eq!(
PaginationToken {
short_room_ids: vec![5383, 42934, 283, 423],
limit: UInt::from(20_u32),
max_depth: UInt::from(1_u32),
suggested_only: true
},
PaginationToken::from_str("5383,42934,283,423_20_1_true").unwrap()
);
assert_eq!(
PaginationToken {
short_room_ids: vec![740],
limit: UInt::from(97_u32),
max_depth: UInt::from(10539_u32),
suggested_only: false
},
PaginationToken::from_str("740_97_10539_false").unwrap()
);
}
#[test]
fn pagination_token_to_string() {
assert_eq!(
PaginationToken {
short_room_ids: vec![740],
limit: UInt::from(97_u32),
max_depth: UInt::from(10539_u32),
suggested_only: false
}
.to_string(),
"740_97_10539_false"
);
assert_eq!(
PaginationToken {
short_room_ids: vec![9, 34],
limit: UInt::from(3_u32),
max_depth: UInt::from(1_u32),
suggested_only: true
}
.to_string(),
"9,34_3_1_true"
);
}
-10
View File
@@ -43,7 +43,6 @@ pub struct Service {
struct Services {
globals: Dep<globals::Service>,
short: Dep<rooms::short::Service>,
spaces: Dep<rooms::spaces::Service>,
state_cache: Dep<rooms::state_cache::Service>,
state_accessor: Dep<rooms::state_accessor::Service>,
state_compressor: Dep<rooms::state_compressor::Service>,
@@ -67,7 +66,6 @@ impl crate::Service for Service {
services: Services {
globals: args.depend::<globals::Service>("globals"),
short: args.depend::<rooms::short::Service>("rooms::short"),
spaces: args.depend::<rooms::spaces::Service>("rooms::spaces"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
@@ -132,14 +130,6 @@ impl Service {
.update_membership(room_id, &user_id, &pdu, false)
.await?;
},
| TimelineEventType::SpaceChild => {
self.services
.spaces
.roomid_spacehierarchy_cache
.lock()
.await
.remove(room_id);
},
| _ => continue,
}
}
+590
View File
@@ -0,0 +1,590 @@
use std::{collections::HashSet, sync::Arc};
use conduwuit::{
Err, Event, Result, info,
utils::{IterStream, ReadyExt, TryFutureExtExt, stream::BroadbandExt},
warn,
};
use futures::{FutureExt, StreamExt, TryFutureExt};
use ruma::{
OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt, UserId,
api::{
client::space::SpaceHierarchyRoomsChunk,
federation::space::{SpaceHierarchyParentSummary, get_hierarchy},
},
assign,
events::{
StateEventType,
space::child::{HierarchySpaceChildEvent, SpaceChildEventContent},
},
room::{JoinRuleSummary, RestrictedSummary, RoomSummary},
serde::Raw,
};
use crate::{Dep, rooms, sending};
pub struct Service {
services: Services,
}
struct Services {
event_handler: Dep<rooms::event_handler::Service>,
metadata: Dep<rooms::metadata::Service>,
sending: Dep<sending::Service>,
state: Dep<rooms::state::Service>,
state_accessor: Dep<rooms::state_accessor::Service>,
state_cache: Dep<rooms::state_cache::Service>,
timeline: Dep<rooms::timeline::Service>,
}
pub enum Accessibility<T> {
Accessible(T),
Inaccessible,
NotFound,
}
struct SpaceSummaryAndChildren {
/// The summary of the space.
summary: SpaceHierarchyRoomsChunk,
/// All child rooms of the space.
children: Vec<SpaceChild>,
/// Child rooms of the space which are not accessible to the local server.
inaccessible_children: Vec<OwnedRoomId>,
}
struct SpaceChild {
room_id: OwnedRoomId,
via: Vec<OwnedServerName>,
}
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
services: Services {
event_handler: args
.depend::<rooms::event_handler::Service>("rooms::event_handler"),
metadata: args.depend::<rooms::metadata::Service>("rooms::metadata"),
sending: args.depend::<sending::Service>("sending"),
state: args.depend::<rooms::state::Service>("rooms::state"),
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
},
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Summarize a room for a local user, possibly by querying over federation
/// if we don't have the room locally.
pub async fn get_room_summary_for_user(
&self,
querying_user: Option<&UserId>,
room_id: &RoomId,
via: &[OwnedServerName],
) -> Result<Accessibility<RoomSummary>> {
let summary = {
if let Some(summary) = self.build_local_room_summary(room_id).await {
// We have this room locally.
summary
} else if let Some((SpaceHierarchyParentSummary { summary, .. }, _)) =
self.fetch_remote_summary(room_id, via, false).await?
{
// A via has this room.
summary
} else {
// We don't have this room and none of the vias have it either.
return Ok(Accessibility::NotFound);
}
};
// Check if the room is visible to the querying user.
if !self.user_may_see_summary(querying_user, &summary).await {
return Ok(Accessibility::Inaccessible);
}
Ok(Accessibility::Accessible(summary))
}
/// Fetch information about a room and its children, possibly by querying
/// over federation if we don't have the room locally.
///
/// This is similar to [`Self::get_room_summary_for_user`] but includes
/// additional data which is needed to traverse the room hierarchy.
async fn get_room_summary_and_children_for_user(
&self,
querying_user: Option<&UserId>,
room_id: &RoomId,
via: Option<&[OwnedServerName]>,
suggested_only: bool,
) -> Result<Accessibility<SpaceSummaryAndChildren>> {
let (summary, inaccessible_children) = {
if let Some(summary) = self.build_local_room_summary(room_id).await {
// We have this room locally.
let children_state = self.get_space_child_events(room_id).await;
// All of the room's children are accessible to this server (because we have the
// full room and its state), although some of them may not be accessible to
// the querying user.
(SpaceHierarchyRoomsChunk::new(summary, children_state), vec![])
} else if let Some(via) = via
&& let Some((
SpaceHierarchyParentSummary { summary, children_state, .. },
inaccessible_children,
)) = self
.fetch_remote_summary(room_id, via, suggested_only)
.await?
{
// A via has this room.
(SpaceHierarchyRoomsChunk::new(summary, children_state), inaccessible_children)
} else {
// We don't have this room and none of the vias have it either.
return Ok(Accessibility::NotFound);
}
};
// Check if the room is visible to the querying user.
if !self
.user_may_see_summary(querying_user, &summary.summary)
.await
{
return Ok(Accessibility::Inaccessible);
}
let children = summary
.children_state
.iter()
// Ignore deserialization failures
.flat_map(Raw::deserialize)
// Filter out non-suggested children if suggested_only is set
.filter(|child| !suggested_only || child.content.suggested)
.map(|child| SpaceChild { room_id: child.state_key, via: child.content.via })
.collect();
Ok(Accessibility::Accessible(SpaceSummaryAndChildren {
summary,
children,
inaccessible_children,
}))
}
/// Summarize a room and its children for a local user, possibly by querying
/// over federation if we don't have the space locally.
pub async fn get_room_hierarchy_for_user(
&self,
querying_user: &UserId,
room_id: OwnedRoomId,
max_depth: Option<UInt>,
suggested_only: bool,
) -> Result<Accessibility<Vec<SpaceHierarchyRoomsChunk>>> {
// This function traverses the space hierarchy tree depth-first as required by
// the specification.
// Check accessibility of the root room first, because we need to error out
// if it isn't accessible.
// TODO refactor this once the Try trait is stable
let root_summary = match self
.get_room_summary_and_children_for_user(
Some(querying_user),
&room_id,
// Clients can't specify vias for the root room
None,
suggested_only,
)
.await?
{
| Accessibility::Accessible(root_summary) => root_summary,
| Accessibility::Inaccessible => return Ok(Accessibility::Inaccessible),
| Accessibility::NotFound => return Ok(Accessibility::NotFound),
};
let mut queue = vec![root_summary.children];
let mut summaries = vec![root_summary.summary];
let mut inaccessible_children: HashSet<_> =
root_summary.inaccessible_children.into_iter().collect();
// TODO refactor this with Vec::peek_mut once it's stabilized
loop {
let Some(layer) = queue.last_mut() else {
break;
};
let Some(SpaceChild { room_id, via }) = layer.pop() else {
// If this layer is empty, discard it from the queue and continue
queue.pop();
continue;
};
// Do not request rooms which have been determined to be inaccessible
if inaccessible_children.contains(&room_id) {
continue;
}
let summary = match self
.get_room_summary_and_children_for_user(
Some(querying_user),
&room_id,
Some(&via),
suggested_only,
)
.await
{
| Ok(Accessibility::Accessible(summary)) => summary,
| Ok(Accessibility::Inaccessible) => {
// Mark this room as inaccessible and skip it
inaccessible_children.insert(room_id);
continue;
},
| Ok(Accessibility::NotFound) => {
// Skip children which we can't find
continue;
},
| Err(_) => {
// Skip children which we failed to fetch over federation
continue;
},
};
summaries.push(summary.summary);
inaccessible_children.extend(summary.inaccessible_children.into_iter());
// Don't traverse the tree deeper than max_depth
#[allow(
clippy::as_conversions,
clippy::arithmetic_side_effects,
reason = "queue.len() should never be large enough to cause strange behavior \
here"
)]
if max_depth.is_some_and(|max_depth| (queue.len() as u64 + 1) > max_depth.into()) {
continue;
}
// Add accessible children as a new layer
if !summary.children.is_empty() {
queue.push(summary.children);
}
}
Ok(Accessibility::Accessible(summaries))
}
/// Summarize a _local_ room and its children for a remote server.
pub async fn get_local_room_summary_for_server(
&self,
querying_server: &ServerName,
room_id: &RoomId,
suggested_only: bool,
) -> Accessibility<get_hierarchy::v1::Response> {
let Some(summary) = self.build_local_room_summary(room_id).await else {
return Accessibility::NotFound;
};
// Check if the server can see the root room's summary
if !self.server_may_see_summary(querying_server, &summary).await {
return Accessibility::Inaccessible;
}
let children_state = self.get_space_child_events(room_id).await;
let (accessible_children, inaccessible_children) = children_state
.iter()
// Ignore deserialization failures
.flat_map(Raw::deserialize)
// Filter out non-suggested children if suggested_only is set
.filter(|child| !suggested_only || child.content.suggested)
// Fetch summaries for the children in parallel
.stream()
.broad_then(async |child| {
let summary = {
if let Some(summary) = self.build_local_room_summary(&child.state_key).await {
if self.server_may_see_summary(querying_server, &summary).await {
Accessibility::Accessible(summary)
} else {
Accessibility::Inaccessible
}
} else {
Accessibility::NotFound
}
};
(child.state_key, summary)
})
// Sort the children into two Vecs by accessibility
.ready_fold_default(|(mut accessible_children, mut inaccessible_children): (Vec<_>, Vec<_>), (room_id, summary)| {
match summary {
Accessibility::Accessible(summary) => {
accessible_children.push(summary);
},
Accessibility::Inaccessible => {
inaccessible_children.push(room_id);
},
Accessibility::NotFound => {
// Skip inaccessible children
}
}
(accessible_children, inaccessible_children)
})
.await;
Accessibility::Accessible(assign!(
get_hierarchy::v1::Response::new(SpaceHierarchyParentSummary::new(summary, children_state)),
{ children: accessible_children, inaccessible_children: inaccessible_children }
))
}
/// Prepare a summary of a room known to this server.
async fn build_local_room_summary(&self, room_id: &RoomId) -> Option<RoomSummary> {
// If we can't find a version for this room, it doesn't exist.
let room_version = self.services.state.get_room_version(room_id).await.ok()?;
let (
join_rule,
guest_can_join,
num_joined_members,
world_readable,
canonical_alias,
name,
topic,
avatar_url,
room_type,
encryption,
) = async {
futures::join!(
self.services.state_accessor.get_join_rules(room_id),
self.services.state_accessor.guest_can_join(room_id),
self.services
.state_cache
.room_joined_count(room_id)
.unwrap_or(0),
self.services.state_accessor.is_world_readable(room_id),
self.services
.state_accessor
.get_canonical_alias(room_id)
.ok(),
self.services.state_accessor.get_name(room_id).ok(),
self.services.state_accessor.get_room_topic(room_id).ok(),
self.services
.state_accessor
.get_avatar(room_id)
.map(|res| res.into_option().unwrap_or_default().url),
self.services.state_accessor.get_room_type(room_id).ok(),
self.services
.state_accessor
.get_room_encryption(room_id)
.ok(),
)
}
.boxed()
.await;
let summary = assign!(
RoomSummary::new(
room_id.to_owned(),
join_rule.into(),
guest_can_join,
num_joined_members.try_into().expect("number of joined members should fit into a UInt"),
world_readable,
),
{
canonical_alias: canonical_alias,
name: name,
topic: topic,
avatar_url: avatar_url,
room_type: room_type,
encryption: encryption,
room_version: Some(room_version),
}
);
Some(summary)
}
/// Query remote servers for the summary of a room.
async fn fetch_remote_summary(
&self,
room_id: &RoomId,
via: &[OwnedServerName],
suggested_only: bool,
) -> Result<Option<(SpaceHierarchyParentSummary, Vec<OwnedRoomId>)>> {
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden("This room is blocked by this server.")));
}
if via.is_empty() {
return Err!(Request(MissingParam(
"No servers were provided with which to query this room's summary."
)));
}
let request = assign!(get_hierarchy::v1::Request::new(room_id.to_owned()), { suggested_only: suggested_only });
for server in via {
info!(%room_id, %server, "Asking for room summary over federation");
match self
.services
.sending
.send_federation_request(server, request.clone())
.await
{
| Ok(get_hierarchy::v1::Response { room, inaccessible_children, .. }) => {
if room.summary.room_id != room_id {
warn!(
%server,
expected_room = %room_id,
returned_room = %room.summary.room_id,
"Server didn't return the room we asked for"
);
continue;
}
return Ok(Some((room, inaccessible_children)));
},
| Err(err) => {
info!(%room_id, %server, %err, "Server could not provide a summary for this room");
},
}
}
info!(%room_id, "No servers queried could provide a summary for this room");
Ok(None)
}
/// Get the stripped m.space.child events of a room.
async fn get_space_child_events(
&self,
room_id: &RoomId,
) -> Vec<Raw<HierarchySpaceChildEvent>> {
let current_shortstatehash = self
.services
.state
.get_room_shortstatehash(room_id)
.await
.expect("room should have a current state");
self.services
.state_accessor
.state_keys_with_ids(current_shortstatehash, &StateEventType::SpaceChild)
.broad_filter_map(move |(state_key, event_id): (_, OwnedEventId)| async move {
self.services
.timeline
.get_pdu(&event_id)
.map_ok(move |pdu| (state_key, pdu))
.ok()
.await
})
.ready_filter_map(move |(state_key, pdu)| {
let Ok(content) = pdu.get_content::<SpaceChildEventContent>() else {
return None;
};
if content.via.is_empty() {
return None;
}
if RoomId::parse(&state_key).is_err() {
return None;
}
Some(pdu.into_format())
})
.collect()
.await
}
/// Determine if a user (possibly anonymous) may view a room summary.
async fn user_may_see_summary(
&self,
querying_user: Option<&UserId>,
summary: &RoomSummary,
) -> bool {
// Anyone can view the summary of world-readable rooms.
if summary.world_readable {
return true;
}
// If the user is joined or invited they may always view the summary.
if let Some(querying_user) = querying_user
&& (self
.services
.state_cache
.is_joined(querying_user, &summary.room_id)
.await || self
.services
.state_cache
.is_invited(querying_user, &summary.room_id)
.await)
{
return true;
}
// Otherwise, visibility depends on the join rule.
match (&summary.join_rule, querying_user) {
// Anyone can view summaries for `public`, `knock`, and `knock_restricted` rooms.
| (
JoinRuleSummary::Public
| JoinRuleSummary::Knock
| JoinRuleSummary::KnockRestricted(_),
_,
) => true,
// The user may be able to view the summary for a `restricted` room, even if they
// aren't invited, provided they're in one of the allowed rooms.
| (
JoinRuleSummary::Restricted(RestrictedSummary { allowed_room_ids, .. }),
Some(querying_user),
) =>
self.services
.state_cache
.rooms_joined(querying_user)
.ready_any(|room_id| allowed_room_ids.contains(&room_id))
.await,
// In all other cases, the user may not view the summary.
| _ => false,
}
}
// Determine if a remote server may view a room summary.
async fn server_may_see_summary(
&self,
querying_server: &ServerName,
summary: &RoomSummary,
) -> bool {
// Servers may not see summaries of rooms they're ACLed from.
if self
.services
.event_handler
.acl_check(querying_server, &summary.room_id)
.await
.is_err()
{
return false;
}
// Servers may always see summaries if any of their users are participating in
// the room. It's the requesting server's job to restrict visibility on a
// per-user basis.
if self
.services
.state_cache
.server_in_room(querying_server, &summary.room_id)
.await
{
return true;
}
// If the server isn't in the room, the same visibility rules apply as for
// anonymous summary requests.
self.user_may_see_summary(None, summary).await
}
}
+3 -12
View File
@@ -83,7 +83,7 @@ where
body,
Some(pdu.event_id().into()),
source,
pdu.sender.clone().into(),
pdu.sender.clone(),
)?;
}
}
@@ -224,7 +224,7 @@ where
let target_user_id = UserId::parse(state_key)?;
if self.services.users.is_active_local(&target_user_id).await {
push_target.insert(target_user_id.to_owned());
push_target.insert(target_user_id.clone());
}
}
}
@@ -320,15 +320,6 @@ where
},
}
},
| TimelineEventType::SpaceChild =>
if let Some(_state_key) = pdu.state_key() {
self.services
.spaces
.roomid_spacehierarchy_cache
.lock()
.await
.remove(room_id);
},
| TimelineEventType::RoomMember => {
if let Some(state_key) = pdu.state_key() {
// if the state_key fails
@@ -410,7 +401,7 @@ where
.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
{
let appservice_uid = appservice.registration.sender_localpart.as_str();
if state_key_uid == &appservice_uid {
if state_key_uid == appservice_uid {
self.services
.sending
.send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?;
-2
View File
@@ -79,7 +79,6 @@ struct Services {
pusher: Dep<pusher::Service>,
threads: Dep<rooms::threads::Service>,
search: Dep<rooms::search::Service>,
spaces: Dep<rooms::spaces::Service>,
event_handler: Dep<rooms::event_handler::Service>,
}
@@ -111,7 +110,6 @@ impl crate::Service for Service {
pusher: args.depend::<pusher::Service>("pusher"),
threads: args.depend::<rooms::threads::Service>("rooms::threads"),
search: args.depend::<rooms::search::Service>("rooms::search"),
spaces: args.depend::<rooms::spaces::Service>("rooms::spaces"),
event_handler: args
.depend::<rooms::event_handler::Service>("rooms::event_handler"),
},
+1 -1
View File
@@ -101,11 +101,11 @@ impl Services {
read_receipt: build!(rooms::read_receipt::Service),
search: build!(rooms::search::Service),
short: build!(rooms::short::Service),
spaces: build!(rooms::spaces::Service),
state: build!(rooms::state::Service),
state_accessor: build!(rooms::state_accessor::Service),
state_cache: build!(rooms::state_cache::Service),
state_compressor: build!(rooms::state_compressor::Service),
summary: build!(rooms::summary::Service),
threads: build!(rooms::threads::Service),
timeline: build!(rooms::timeline::Service),
typing: build!(rooms::typing::Service),