From 0d2eeed56761225867fda3b0db9a00e076b7eda7 Mon Sep 17 00:00:00 2001 From: Ginger Date: Tue, 5 May 2026 14:18:32 -0400 Subject: [PATCH] refactor: Move room joining logic into a new service --- src/admin/user/commands.rs | 62 +- src/api/client/account/mod.rs | 2 +- src/api/client/account/register.rs | 22 +- src/api/client/membership/join.rs | 688 +------------------- src/api/client/membership/knock.rs | 8 +- src/api/client/membership/leave.rs | 3 +- src/api/client/membership/mod.rs | 91 +-- src/api/client/mod.rs | 2 +- src/service/rooms/membership/mod.rs | 933 ++++++++++++++++++++++++++++ src/service/rooms/mod.rs | 2 + src/service/services.rs | 1 + 11 files changed, 1009 insertions(+), 805 deletions(-) create mode 100644 src/service/rooms/membership/mod.rs diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index ab11877a8..5e56c6451 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -4,8 +4,7 @@ use std::{ }; use api::client::{ - full_user_deactivate, join_room_by_id_helper, leave_room, recreate_push_rules_and_return, - remote_leave_room, + full_user_deactivate, leave_room, recreate_push_rules_and_return, remote_leave_room, }; use conduwuit::{ Err, Result, debug_warn, error, info, @@ -135,17 +134,20 @@ pub(super) async fn create_user(&self, username: String, password: Option { info!("Automatically joined room {room} for user {user_id}"); @@ -628,14 +630,12 @@ pub(super) async fn force_join_list_of_local_users( let mut successful_joins: usize = 0; for user_id in user_ids { - match join_room_by_id_helper( - self.services, - &user_id, - &room_id, - Some(String::from(BULK_JOIN_REASON)), - &servers, - ) - .await + match self + .services + .rooms + .membership + .join_room(&user_id, &room_id, Some(String::from(BULK_JOIN_REASON)), &servers) + .await { | Ok(_res) => { successful_joins = successful_joins.saturating_add(1); @@ -711,14 +711,12 @@ pub(super) async fn force_join_all_local_users( .collect::>() .await { - match join_room_by_id_helper( - self.services, - user_id, - &room_id, - Some(String::from(BULK_JOIN_REASON)), - &servers, - ) - .await + match self + .services + .rooms + .membership + .join_room(user_id, &room_id, Some(String::from(BULK_JOIN_REASON)), &servers) + .await { | Ok(_res) => { successful_joins = successful_joins.saturating_add(1); @@ -755,7 +753,11 @@ pub(super) async fn force_join_room( self.services.globals.user_is_local(&user_id), "Parsed user_id must be a local user" ); - join_room_by_id_helper(self.services, &user_id, &room_id, None, &servers).await?; + self.services + .rooms + .membership + .join_room(&user_id, &room_id, None, &servers) + .await?; self.write_str(&format!("{user_id} has been joined to {room_id}.")) .await diff --git a/src/api/client/account/mod.rs b/src/api/client/account/mod.rs index b1692952a..49df8af37 100644 --- a/src/api/client/account/mod.rs +++ b/src/api/client/account/mod.rs @@ -26,7 +26,7 @@ use ruma::{ }; use service::{mailer::messages, uiaa::Identity, users::HashedPassword}; -use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH, join_room_by_id_helper}; +use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH}; use crate::Ruma; pub(crate) mod register; diff --git a/src/api/client/account/register.rs b/src/api/client/account/register.rs index a4424425f..6556fbd43 100644 --- a/src/api/client/account/register.rs +++ b/src/api/client/account/register.rs @@ -29,7 +29,7 @@ use ruma::{ use serde_json::value::RawValue; use service::{mailer::messages, users::HashedPassword}; -use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH, join_room_by_id_helper}; +use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH}; use crate::Ruma; const RANDOM_USER_ID_LENGTH: usize = 10; @@ -278,15 +278,17 @@ pub(crate) async fn register_route( } if let Some(room_server_name) = room.server_name() { - match join_room_by_id_helper( - &services, - &user_id, - &room_id, - Some("Automatically joining this room upon registration".to_owned()), - &[services.globals.server_name().to_owned(), room_server_name.to_owned()], - ) - .boxed() - .await + match services + .rooms + .membership + .join_room( + &user_id, + &room_id, + Some("Automatically joining this room upon registration".to_owned()), + &[services.globals.server_name().to_owned(), room_server_name.to_owned()], + ) + .boxed() + .await { | Err(e) => { // don't return this error so we don't fail registrations diff --git a/src/api/client/membership/join.rs b/src/api/client/membership/join.rs index b708a145a..69183c144 100644 --- a/src/api/client/membership/join.rs +++ b/src/api/client/membership/join.rs @@ -1,57 +1,18 @@ -use std::{borrow::Borrow, collections::HashMap, iter::once, sync::Arc}; - use axum::extract::State; use axum_client_ip::ClientIp; use conduwuit::{ - Err, Result, debug, debug_info, debug_warn, err, error, info, is_true, - matrix::{ - StateKey, - event::{gen_event_id, gen_event_id_canonical_json}, - pdu::{PartialPdu, PduEvent}, - state_res, - }, + Err, Result, debug, result::FlatOk, - trace, - utils::{ - self, shuffle, - stream::{IterStream, ReadyExt}, - to_canonical_object, - }, - warn, + utils::{shuffle, stream::IterStream}, }; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures::{FutureExt, StreamExt}; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, - RoomVersionId, UserId, - api::{ - client::membership::{join_room_by_id, join_room_by_id_or_alias}, - error::{ErrorKind, IncompatibleRoomVersionErrorData}, - federation::{self}, - }, - canonical_json::to_canonical_value, - events::{ - StateEventType, - room::{ - join_rules::JoinRule, - member::{MembershipState, RoomMemberEventContent}, - }, - }, + OwnedRoomId, OwnedServerName, OwnedUserId, UserId, + api::client::membership::{join_room_by_id, join_room_by_id_or_alias}, }; -use service::{ - Services, - rooms::{ - state::RoomMutexGuard, - state_compressor::{CompressedState, HashSetCompressStateEvent}, - timeline::pdu_fits, - }, -}; -use tokio::join; -use super::{banned_room_check, validate_remote_member_event_stub}; -use crate::{ - Ruma, - server::{select_authorising_user, user_can_perform_restricted_join}, -}; +use super::banned_room_check; +use crate::Ruma; /// # `POST /_matrix/client/r0/rooms/{roomId}/join` /// @@ -111,9 +72,14 @@ pub(crate) async fn join_room_by_id_route( shuffle(&mut servers); let servers = deprioritize(servers, &services.config.deprioritize_joins_through_servers); - join_room_by_id_helper(&services, sender_user, &body.room_id, body.reason.clone(), &servers) + let room_id = services + .rooms + .membership + .join_room(sender_user, &body.room_id, body.reason.clone(), &servers) .boxed() - .await + .await?; + + Ok(join_room_by_id::v3::Response::new(room_id)) } /// # `POST /_matrix/client/r0/join/{roomIdOrAlias}` @@ -226,632 +192,14 @@ pub(crate) async fn join_room_by_id_or_alias_route( }; let servers = deprioritize(servers, &services.config.deprioritize_joins_through_servers); - let join_room_response = - join_room_by_id_helper(&services, sender_user, &room_id, body.reason.clone(), &servers) - .boxed() - .await?; - - Ok(join_room_by_id_or_alias::v3::Response::new(join_room_response.room_id)) -} - -pub async fn join_room_by_id_helper( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - reason: Option, - servers: &[OwnedServerName], -) -> Result { - let state_lock = services.rooms.state.mutex.lock(room_id).await; - - if services + let room_id = services .rooms - .state_cache - .is_joined(sender_user, room_id) - .await - { - debug_warn!("{sender_user} is already joined in {room_id}"); - return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())); - } - - if let Err(e) = services - .antispam - .user_may_join_room( - sender_user.to_owned(), - room_id.to_owned(), - services - .rooms - .state_cache - .is_invited(sender_user, room_id) - .await, - ) - .await - { - warn!("Antispam prevented user {} from joining room {}: {}", sender_user, room_id, e); - return Err!(Request(Forbidden("You are not allowed to join this room."))); - } - - let server_in_room = services - .rooms - .state_cache - .server_in_room(services.globals.server_name(), room_id) - .await; - - // Only check our known membership if we're already in the room. - // See: https://forgejo.ellis.link/continuwuation/continuwuity/issues/855 - let membership = if server_in_room { - services - .rooms - .state_accessor - .get_member(room_id, sender_user) - .await - } else { - debug!("Ignoring local state for join {room_id}, we aren't in the room yet."); - Ok(RoomMemberEventContent::new(MembershipState::Leave)) - }; - if let Ok(m) = membership { - if m.membership == MembershipState::Ban { - debug_warn!("{sender_user} is banned from {room_id} but attempted to join"); - // TODO: return reason - return Err!(Request(Forbidden("You are banned from the room."))); - } - } - - if !server_in_room && servers.is_empty() { - return Err!(Request(NotFound( - "No servers were provided to assist in joining the room remotely, and we are not \ - already participating in the room." - ))); - } - - if services.antispam.check_all_joins() { - if let Err(e) = services - .antispam - .meowlnir_accept_make_join(room_id.to_owned(), sender_user.to_owned()) - .await - { - warn!("Antispam prevented user {} from joining room {}: {}", sender_user, room_id, e); - return Err!(Request(Forbidden("Antispam rejected join request."))); - } - } - - if server_in_room { - join_room_by_id_helper_local(services, sender_user, room_id, reason, servers, state_lock) - .boxed() - .await?; - } else { - // Ask a remote server if we are not participating in this room - join_room_by_id_helper_remote( - services, - sender_user, - room_id, - reason, - servers, - state_lock, - ) + .membership + .join_room(sender_user, &room_id, body.reason.clone(), &servers) .boxed() .await?; - } - Ok(join_room_by_id::v3::Response::new(room_id.to_owned())) -} -#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote", level = "info")] -async fn join_room_by_id_helper_remote( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - reason: Option, - servers: &[OwnedServerName], - state_lock: RoomMutexGuard, -) -> Result { - info!("Joining {room_id} over federation."); - - let (make_join_response, remote_server) = - make_join_request(services, sender_user, room_id, servers).await?; - - info!("make_join finished"); - - let room_version = make_join_response.room_version.unwrap_or(RoomVersionId::V1); - let room_version_rules = room_version - .rules() - .expect("room version should have defined rules"); - - if !services.server.supported_room_version(&room_version) { - // How did we get here? - return Err!(BadServerResponse("Remote room version {room_version} is not supported")); - } - - let mut join_event_stub: CanonicalJsonObject = - serde_json::from_str(make_join_response.event.get()).map_err(|e| { - err!(BadServerResponse(warn!( - "Invalid make_join event json received from server: {e:?}" - ))) - })?; - - let join_authorized_via_users_server = { - use RoomVersionId::*; - if !matches!(room_version, V1 | V2 | V3 | V4 | V5 | V6 | V7) { - join_event_stub - .get("content") - .map(|s| { - s.as_object()? - .get("join_authorised_via_users_server")? - .as_str() - }) - .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok()) - } else { - None - } - }; - - join_event_stub.insert( - "origin_server_ts".to_owned(), - CanonicalJsonValue::Integer( - utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - ), - ); - - let mut join_content = RoomMemberEventContent::new(MembershipState::Join); - join_content.displayname = services.users.displayname(sender_user).await.ok(); - join_content.avatar_url = services.users.avatar_url(sender_user).await.ok(); - join_content.blurhash = services.users.blurhash(sender_user).await.ok(); - join_content.reason = reason; - join_content - .join_authorized_via_users_server - .clone_from(&join_authorized_via_users_server); - - join_event_stub.insert( - "content".to_owned(), - to_canonical_value(join_content).expect("event is valid, we just created it"), - ); - - // Remove event id if it exists - join_event_stub.remove("event_id"); - - // In order to create a compatible ref hash (EventID) the `hashes` field needs - // to be present - services - .server_keys - .hash_and_sign_event(&mut join_event_stub, &room_version_rules)?; - - // Generate event id - let event_id = gen_event_id(&join_event_stub, &room_version_rules)?; - - // Add event_id back - join_event_stub - .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); - - // It has enough fields to be called a proper event now - let mut join_event = join_event_stub; - - info!("Asking {remote_server} for send_join in room {room_id}"); - let send_join_request = federation::membership::create_join_event::v2::Request::new( - room_id.to_owned(), - event_id.clone(), - services - .sending - .convert_to_outgoing_federation_event(join_event.clone()) - .await, - ); - - let send_join_response = match services - .sending - .send_synapse_request(&remote_server, send_join_request) - .await - { - | Ok(response) => response, - | Err(e) => { - error!("send_join failed: {e}"); - return Err(e); - }, - }; - - info!("send_join finished"); - - if join_authorized_via_users_server.is_some() { - if let Some(signed_raw) = &send_join_response.room_state.event { - debug_info!( - "There is a signed event with join_authorized_via_users_server. This room is \ - probably using restricted joins. Adding signature to our event" - ); - - let (signed_event_id, signed_value) = - gen_event_id_canonical_json(signed_raw, &room_version_rules).map_err(|e| { - err!(Request(BadJson(warn!( - "Could not convert event to canonical JSON: {e}" - )))) - })?; - - if signed_event_id != event_id { - return Err!(Request(BadJson(warn!( - %signed_event_id, %event_id, - "Server {remote_server} sent event with wrong event ID" - )))); - } - - match signed_value["signatures"] - .as_object() - .ok_or_else(|| { - err!(BadServerResponse(warn!( - "Server {remote_server} sent invalid signatures type" - ))) - }) - .and_then(|e| { - e.get(remote_server.as_str()).ok_or_else(|| { - err!(BadServerResponse(warn!( - "Server {remote_server} did not send its signature for a restricted \ - room" - ))) - }) - }) { - | Ok(signature) => { - join_event - .get_mut("signatures") - .expect("we created a valid pdu") - .as_object_mut() - .expect("we created a valid pdu") - .insert(remote_server.to_string(), signature.clone()); - }, - | Err(e) => { - warn!( - "Server {remote_server} sent invalid signature in send_join signatures \ - for event {signed_value:?}: {e:?}", - ); - }, - } - } - } - - services - .rooms - .short - .get_or_create_shortroomid(room_id) - .await; - - info!("Parsing join event"); - let parsed_join_pdu = PduEvent::from_id_val(&event_id, join_event.clone()) - .map_err(|e| err!(BadServerResponse("Invalid join event PDU: {e:?}")))?; - - info!("Acquiring server signing keys for response events"); - let resp_events = &send_join_response.room_state; - let resp_state = &resp_events.state; - let resp_auth = &resp_events.auth_chain; - services - .server_keys - .acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter())) - .await; - - info!("Going through send_join response room_state"); - let cork = services.db.cork_and_flush(); - let state = send_join_response - .room_state - .state - .iter() - .stream() - .then(|pdu| { - services - .server_keys - .validate_and_add_event_id_no_fetch(pdu, &room_version_rules) - .inspect_err(|e| { - debug_warn!("Could not validate send_join response room_state event: {e:?}"); - }) - .inspect(|_| debug!("Completed validating send_join response room_state event")) - }) - .ready_filter_map(Result::ok) - .fold(HashMap::new(), |mut state, (event_id, value)| async move { - let pdu = match PduEvent::from_id_val(&event_id, value.clone()) { - | Ok(pdu) => pdu, - | Err(e) => { - debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}"); - return state; - }, - }; - if !pdu_fits(&mut value.clone()) { - warn!( - "dropping incoming PDU {event_id} in room {room_id} from room join because \ - it exceeds 65535 bytes or is otherwise too large." - ); - return state; - } - services.rooms.outlier.add_pdu_outlier(&event_id, &value); - if let Some(state_key) = &pdu.state_key { - let shortstatekey = services - .rooms - .short - .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key) - .await; - - state.insert(shortstatekey, pdu.event_id.clone()); - } - state - }) - .await; - - drop(cork); - - info!("Going through send_join response auth_chain"); - let cork = services.db.cork_and_flush(); - send_join_response - .room_state - .auth_chain - .iter() - .stream() - .then(|pdu| { - services - .server_keys - .validate_and_add_event_id_no_fetch(pdu, &room_version_rules) - }) - .ready_filter_map(Result::ok) - .ready_for_each(|(event_id, value)| { - trace!(%event_id, "Adding PDU as an outlier from send_join auth_chain"); - services.rooms.outlier.add_pdu_outlier(&event_id, &value); - }) - .await; - - drop(cork); - - debug!("Running send_join auth check"); - let fetch_state = &state; - let state_fetch = |k: StateEventType, s: StateKey| async move { - let shortstatekey = services.rooms.short.get_shortstatekey(&k, &s).await.ok()?; - - let event_id = fetch_state.get(&shortstatekey)?; - services.rooms.timeline.get_pdu(event_id).await.ok() - }; - - let auth_check = state_res::event_auth::auth_check( - &room_version.rules().unwrap(), - &parsed_join_pdu, - None, // TODO: third party invite - |k, s| state_fetch(k.clone(), s.into()), - &state_fetch(StateEventType::RoomCreate, "".into()) - .await - .expect("create event is missing from send_join auth"), - ) - .await - .map_err(|e| err!(Request(Forbidden(warn!("Auth check failed: {e:?}")))))?; - - if !auth_check { - return Err!(Request(Forbidden("Auth check failed"))); - } - - info!("Compressing state from send_join"); - let compressed: CompressedState = services - .rooms - .state_compressor - .compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow()))) - .collect() - .await; - - debug!("Saving compressed state"); - let HashSetCompressStateEvent { - shortstatehash: statehash_before_join, - added, - removed, - } = services - .rooms - .state_compressor - .save_state(room_id, Arc::new(compressed)) - .await?; - - debug!("Forcing state for new room"); - services - .rooms - .state - .force_state(room_id, statehash_before_join, added, removed, &state_lock) - .await?; - - debug!("Updating joined counts for new room"); - services - .rooms - .state_cache - .update_joined_count(room_id) - .await; - - // We append to state before appending the pdu, so we don't have a moment in - // time with the pdu without it's state. This is okay because append_pdu can't - // fail. - let statehash_after_join = services - .rooms - .state - .append_to_state(&parsed_join_pdu, room_id) - .await?; - - info!("Appending new room join event"); - services - .rooms - .timeline - .append_pdu( - &parsed_join_pdu, - join_event, - once(parsed_join_pdu.event_id.borrow()), - &state_lock, - room_id, - ) - .await?; - - info!("Setting final room state for new room"); - // We set the room state after inserting the pdu, so that we never have a moment - // in time where events in the current room state do not exist - services - .rooms - .state - .set_room_state(room_id, statehash_after_join, &state_lock); - - Ok(()) -} - -#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local", level = "info")] -async fn join_room_by_id_helper_local( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - reason: Option, - servers: &[OwnedServerName], - state_lock: RoomMutexGuard, -) -> Result { - info!("Joining room locally"); - - let (room_version, join_rules, is_invited) = join!( - services.rooms.state.get_room_version(room_id), - services.rooms.state_accessor.get_join_rules(room_id), - services.rooms.state_cache.is_invited(sender_user, room_id) - ); - - let room_version = room_version?; - let mut auth_user: Option = None; - if !is_invited && matches!(join_rules, JoinRule::Restricted(_) | JoinRule::KnockRestricted(_)) - { - use RoomVersionId::*; - if !matches!(room_version, V1 | V2 | V3 | V4 | V5 | V6 | V7) { - // This is a restricted room, check if we can complete the join requirements - // locally. - let needs_auth_user = - user_can_perform_restricted_join(services, sender_user, room_id).await; - if needs_auth_user.is_ok_and(is_true!()) { - // If there was an error or the value is false, we'll try joining over - // federation. Since it's Ok(true), we can authorise this locally. - // If we can't select a local user, this will remain None, the join will fail, - // and we'll fall back to federation. - auth_user = select_authorising_user(services, room_id, sender_user, &state_lock) - .await - .ok(); - } - } - } - - let mut content = RoomMemberEventContent::new(MembershipState::Join); - content.displayname = services.users.displayname(sender_user).await.ok(); - content.avatar_url = services.users.avatar_url(sender_user).await.ok(); - content.blurhash = services.users.blurhash(sender_user).await.ok(); - content.reason.clone_from(&reason); - content.join_authorized_via_users_server = auth_user; - - // Try normal join first - let Err(error) = services - .rooms - .timeline - .build_and_append_pdu( - PartialPdu::state(sender_user.to_string(), &content), - sender_user, - Some(room_id), - &state_lock, - ) - .await - else { - info!("Joined room locally"); - return Ok(()); - }; - - if servers.is_empty() || servers.len() == 1 && services.globals.server_is_ours(&servers[0]) { - if !services.rooms.metadata.exists(room_id).await { - return Err!(Request( - Unknown( - "Room was not found locally and no servers were found to help us discover it" - ), - NOT_FOUND - )); - } - - return Err(error); - } - - info!( - ?error, - remote_servers = %servers.len(), - "Could not join room locally, attempting remote join", - ); - join_room_by_id_helper_remote(services, sender_user, room_id, reason, servers, state_lock) - .await -} - -async fn make_join_request( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - servers: &[OwnedServerName], -) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> { - let mut make_join_counter: usize = 1; - - for remote_server in servers { - if services.globals.server_is_ours(remote_server) { - continue; - } - info!( - "Asking {remote_server} for make_join (attempt {make_join_counter}/{})", - servers.len() - ); - - let mut request = federation::membership::prepare_join_event::v1::Request::new( - room_id.to_owned(), - sender_user.to_owned(), - ); - request.ver = services.server.supported_room_versions().collect(); - - let make_join_response = services - .sending - .send_federation_request(remote_server, request) - .await; - - trace!("make_join response: {:?}", make_join_response); - make_join_counter = make_join_counter.saturating_add(1); - - match make_join_response { - | Ok(response) => { - info!("Received make_join response from {remote_server}"); - if let Err(e) = validate_remote_member_event_stub( - &MembershipState::Join, - sender_user, - room_id, - &to_canonical_object(&response.event)?, - ) { - warn!("make_join response from {remote_server} failed validation: {e}"); - continue; - } - return Ok((response, remote_server.clone())); - }, - | Err(e) => match e.kind() { - | ErrorKind::UnableToAuthorizeJoin => { - info!( - "{remote_server} was unable to verify the joining user satisfied \ - restricted join requirements: {e}. Will continue trying." - ); - }, - | ErrorKind::UnableToGrantJoin => { - info!( - "{remote_server} believes the joining user satisfies restricted join \ - rules, but is unable to authorise a join for us. Will continue trying." - ); - }, - | ErrorKind::IncompatibleRoomVersion(IncompatibleRoomVersionErrorData { - room_version, - .. - }) => { - warn!( - "{remote_server} reports the room we are trying to join is \ - v{room_version}, which we do not support." - ); - return Err(e); - }, - | ErrorKind::Forbidden => { - warn!("{remote_server} refuses to let us join: {e}."); - return Err(e); - }, - | ErrorKind::NotFound => { - info!( - "{remote_server} does not know about {room_id}: {e}. Will continue \ - trying." - ); - }, - | _ => { - info!("{remote_server} failed to make_join: {e}. Will continue trying."); - }, - }, - } - } - info!("All {} servers were unable to assist in joining {room_id} :(", servers.len()); - Err!(BadServerResponse("No server available to assist in joining.")) + Ok(join_room_by_id_or_alias::v3::Response::new(room_id)) } /// Moves deprioritized servers (if any) to the back of the list. diff --git a/src/api/client/membership/knock.rs b/src/api/client/membership/knock.rs index b19cc232c..c0d9d9b3d 100644 --- a/src/api/client/membership/knock.rs +++ b/src/api/client/membership/knock.rs @@ -33,12 +33,13 @@ use ruma::{ use service::{ Services, rooms::{ + membership::validate_remote_member_event_stub, state::RoomMutexGuard, state_compressor::{CompressedState, HashSetCompressStateEvent}, }, }; -use super::{banned_room_check, join::join_room_by_id_helper, validate_remote_member_event_stub}; +use super::banned_room_check; use crate::Ruma; /// # `POST /_matrix/client/*/knock/{roomIdOrAlias}` @@ -238,7 +239,10 @@ async fn knock_room_by_id_helper( // join_room_by_id_helper We need to release the lock here and let // join_room_by_id_helper acquire it again drop(state_lock); - match join_room_by_id_helper(services, sender_user, room_id, reason.clone(), servers) + match services + .rooms + .membership + .join_room(sender_user, room_id, reason.clone(), servers) .await { | Ok(_) => return Ok(knock_room::v3::Response::new(room_id.to_owned())), diff --git a/src/api/client/membership/leave.rs b/src/api/client/membership/leave.rs index af8c042dd..dbfb6c5a1 100644 --- a/src/api/client/membership/leave.rs +++ b/src/api/client/membership/leave.rs @@ -19,9 +19,8 @@ use ruma::{ room::member::{MembershipState, RoomMemberEventContent}, }, }; -use service::Services; +use service::{Services, rooms::membership::validate_remote_member_event_stub}; -use super::validate_remote_member_event_stub; use crate::Ruma; /// # `POST /_matrix/client/v3/rooms/{roomId}/leave` diff --git a/src/api/client/membership/mod.rs b/src/api/client/membership/mod.rs index 0b6d0a906..8398e107a 100644 --- a/src/api/client/membership/mod.rs +++ b/src/api/client/membership/mod.rs @@ -13,16 +13,10 @@ use std::net::IpAddr; use axum::extract::State; use conduwuit::{Err, Result, warn}; use futures::{FutureExt, StreamExt}; -use ruma::{ - CanonicalJsonObject, OwnedRoomId, RoomId, ServerName, UserId, - api::client::membership::joined_rooms, - events::{ - StaticEventContent, - room::member::{MembershipState, RoomMemberEventContent}, - }, -}; +use ruma::{OwnedRoomId, RoomId, ServerName, UserId, api::client::membership::joined_rooms}; use service::Services; +pub use self::leave::{leave_all_rooms, leave_room, remote_leave_room}; pub(crate) use self::{ ban::ban_user_route, forget::forget_room_route, @@ -34,10 +28,6 @@ pub(crate) use self::{ members::{get_member_events_route, joined_members_route}, unban::unban_user_route, }; -pub use self::{ - join::join_room_by_id_helper, - leave::{leave_all_rooms, leave_room, remote_leave_room}, -}; use crate::{Ruma, client::full_user_deactivate}; /// # `POST /_matrix/client/r0/joined_rooms` @@ -159,80 +149,3 @@ pub(crate) async fn banned_room_check( Ok(()) } - -/// Validates that an event returned from a remote server by `/make_*` -/// actually is a membership event with the expected fields. -/// -/// Without checking this, the remote server could use the remote membership -/// mechanism to trick our server into signing arbitrary malicious events. -pub(crate) fn validate_remote_member_event_stub( - membership: &MembershipState, - user_id: &UserId, - room_id: &RoomId, - event_stub: &CanonicalJsonObject, -) -> Result<()> { - let Some(event_type) = event_stub.get("type") else { - return Err!(BadServerResponse( - "Remote server returned member event with missing type field" - )); - }; - if event_type != &RoomMemberEventContent::TYPE { - return Err!(BadServerResponse( - "Remote server returned member event with invalid event type" - )); - } - - let Some(sender) = event_stub.get("sender") else { - return Err!(BadServerResponse( - "Remote server returned member event with missing sender field" - )); - }; - if sender != &user_id.as_str() { - return Err!(BadServerResponse( - "Remote server returned member event with incorrect sender" - )); - } - - let Some(state_key) = event_stub.get("state_key") else { - return Err!(BadServerResponse( - "Remote server returned member event with missing state_key field" - )); - }; - if state_key != &user_id.as_str() { - return Err!(BadServerResponse( - "Remote server returned member event with incorrect state_key" - )); - } - - let Some(event_room_id) = event_stub.get("room_id") else { - return Err!(BadServerResponse( - "Remote server returned member event with missing room_id field" - )); - }; - if event_room_id != &room_id.as_str() { - return Err!(BadServerResponse( - "Remote server returned member event with incorrect room_id" - )); - } - - let Some(content) = event_stub - .get("content") - .and_then(|content| content.as_object()) - else { - return Err!(BadServerResponse( - "Remote server returned member event with missing content field" - )); - }; - let Some(event_membership) = content.get("membership") else { - return Err!(BadServerResponse( - "Remote server returned member event with missing membership field" - )); - }; - if event_membership != &membership.as_str() { - return Err!(BadServerResponse( - "Remote server returned member event with incorrect membership type" - )); - } - - Ok(()) -} diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index 11deeadca..bc4c0413a 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -58,7 +58,7 @@ pub(super) use keys::*; pub(super) use media::*; pub(super) use media_legacy::*; pub(super) use membership::*; -pub use membership::{join_room_by_id_helper, leave_all_rooms, leave_room, remote_leave_room}; +pub use membership::{leave_all_rooms, leave_room, remote_leave_room}; pub(super) use message::*; pub(super) use mutual_rooms::*; pub(super) use openid::*; diff --git a/src/service/rooms/membership/mod.rs b/src/service/rooms/membership/mod.rs new file mode 100644 index 000000000..cbf4a0f98 --- /dev/null +++ b/src/service/rooms/membership/mod.rs @@ -0,0 +1,933 @@ +use std::{collections::HashMap, sync::Arc}; + +use conduwuit::{ + Err, Pdu, Result, Server, debug, debug_info, debug_warn, err, error, info, is_true, + matrix::{ + StateKey, + event::{gen_event_id, gen_event_id_canonical_json}, + }, + pdu::PartialPdu, + state_res, trace, + utils::{self, IterStream, ReadyExt, to_canonical_object}, + warn, +}; +use database::Database; +use futures::{FutureExt, StreamExt, TryFutureExt, join}; +use ruma::{ + CanonicalJsonObject, CanonicalJsonValue, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, + RoomVersionId, UserId, + api::{ + error::{ErrorKind, IncompatibleRoomVersionErrorData}, + federation, + }, + canonical_json::to_canonical_value, + events::{ + StateEventType, StaticEventContent, + room::{ + join_rules::RoomJoinRulesEventContent, + member::{MembershipState, RoomMemberEventContent}, + }, + }, + room::{AllowRule, JoinRule}, +}; + +use crate::{ + Dep, antispam, globals, + rooms::{ + metadata, outlier, short, + state::{self, RoomMutexGuard}, + state_accessor, state_cache, + state_compressor::{self, CompressedState, HashSetCompressStateEvent}, + timeline::{self, pdu_fits}, + }, + sending, server_keys, users, +}; + +pub struct Service { + services: Services, +} + +struct Services { + server: Arc, + db: Arc, + antispam: Dep, + globals: Dep, + metadata: Dep, + outlier: Dep, + sending: Dep, + server_keys: Dep, + short: Dep, + state: Dep, + state_accessor: Dep, + state_cache: Dep, + state_compressor: Dep, + timeline: Dep, + users: Dep, +} + +impl crate::Service for Service { + fn build(args: crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { + services: Services { + server: args.server.clone(), + db: args.db.clone(), + antispam: args.depend::("antispam"), + globals: args.depend::("globals"), + metadata: args.depend::("metadata"), + outlier: args.depend::("rooms::outlier"), + sending: args.depend::("sending"), + server_keys: args.depend::("server_keys"), + short: args.depend::("rooms::short"), + state: args.depend::("rooms::state"), + state_accessor: args.depend::("rooms::state_accessor"), + state_cache: args.depend::("rooms::state_cache"), + state_compressor: args.depend::("state_compressor"), + timeline: args.depend::("rooms::timeline"), + users: args.depend::("users"), + }, + })) + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} + +impl Service { + /// Join a local user to a room. + pub async fn join_room( + &self, + sender_user: &UserId, + room_id: &RoomId, + reason: Option, + servers: &[OwnedServerName], + ) -> Result { + assert!(self.services.globals.user_is_local(sender_user), "user should be local"); + + let state_lock = self.services.state.mutex.lock(room_id).await; + + if self + .services + .state_cache + .is_joined(sender_user, room_id) + .await + { + debug_warn!("{sender_user} is already joined in {room_id}"); + return Ok(room_id.to_owned()); + } + + if let Err(e) = self + .services + .antispam + .user_may_join_room( + sender_user.to_owned(), + room_id.to_owned(), + self.services + .state_cache + .is_invited(sender_user, room_id) + .await, + ) + .await + { + warn!("Antispam prevented user {} from joining room {}: {}", sender_user, room_id, e); + return Err!(Request(Forbidden("You are not allowed to join this room."))); + } + + let server_in_room = self + .services + .state_cache + .server_in_room(self.services.globals.server_name(), room_id) + .await; + + // Only check our known membership if we're already in the room. + // See: https://forgejo.ellis.link/continuwuation/continuwuity/issues/855 + let membership = if server_in_room { + self.services + .state_accessor + .get_member(room_id, sender_user) + .await + } else { + debug!("Ignoring local state for join {room_id}, we aren't in the room yet."); + Ok(RoomMemberEventContent::new(MembershipState::Leave)) + }; + + if let Ok(m) = membership { + if m.membership == MembershipState::Ban { + debug_warn!("{sender_user} is banned from {room_id} but attempted to join"); + // TODO: return reason + return Err!(Request(Forbidden("You are banned from the room."))); + } + } + + if !server_in_room && servers.is_empty() { + return Err!(Request(NotFound( + "No servers were provided to assist in joining the room remotely, and we are \ + not already participating in the room." + ))); + } + + if self.services.antispam.check_all_joins() { + if let Err(e) = self + .services + .antispam + .meowlnir_accept_make_join(room_id.to_owned(), sender_user.to_owned()) + .await + { + warn!( + "Antispam prevented user {} from joining room {}: {}", + sender_user, room_id, e + ); + return Err!(Request(Forbidden("Antispam rejected join request."))); + } + } + + if server_in_room { + self.join_local_room(sender_user, room_id, reason, servers, state_lock) + .boxed() + .await?; + } else { + // Ask a remote server if we are not participating in this room + self.join_remote_room(sender_user, room_id, reason, servers, state_lock) + .boxed() + .await?; + } + + Ok(room_id.to_owned()) + } + + #[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local", level = "info")] + async fn join_local_room( + &self, + sender_user: &UserId, + room_id: &RoomId, + reason: Option, + servers: &[OwnedServerName], + state_lock: RoomMutexGuard, + ) -> Result { + info!("Joining room locally"); + + let (room_version, join_rules, is_invited) = join!( + self.services.state.get_room_version(room_id), + self.services.state_accessor.get_join_rules(room_id), + self.services.state_cache.is_invited(sender_user, room_id) + ); + + let room_version = room_version?; + let room_version_rules = room_version.rules().unwrap(); + + let mut auth_user: Option = None; + if !is_invited + && matches!(join_rules, JoinRule::Restricted(_) | JoinRule::KnockRestricted(_)) + { + if room_version_rules.authorization.restricted_join_rule { + // This is a restricted room, check if we can complete the join requirements + // locally. + let needs_auth_user = self + .user_can_perform_restricted_join(sender_user, room_id) + .await; + if needs_auth_user.is_ok_and(is_true!()) { + // If there was an error or the value is false, we'll try joining over + // federation. Since it's Ok(true), we can authorise this locally. + // If we can't select a local user, this will remain None, the join will fail, + // and we'll fall back to federation. + auth_user = self + .select_authorising_user(room_id, sender_user, &state_lock) + .await + .ok(); + } + } + } + + let mut content = RoomMemberEventContent::new(MembershipState::Join); + content.displayname = self.services.users.displayname(sender_user).await.ok(); + content.avatar_url = self.services.users.avatar_url(sender_user).await.ok(); + content.blurhash = self.services.users.blurhash(sender_user).await.ok(); + content.reason.clone_from(&reason); + content.join_authorized_via_users_server = auth_user; + + // Try normal join first + let Err(error) = self + .services + .timeline + .build_and_append_pdu( + PartialPdu::state(sender_user.to_string(), &content), + sender_user, + Some(room_id), + &state_lock, + ) + .await + else { + info!("Joined room locally"); + return Ok(()); + }; + + if servers.is_empty() + || servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]) + { + if !self.services.metadata.exists(room_id).await { + return Err!(Request( + Unknown( + "Room was not found locally and no servers were found to help us \ + discover it" + ), + NOT_FOUND + )); + } + + return Err(error); + } + + info!( + ?error, + remote_servers = %servers.len(), + "Could not join room locally, attempting remote join", + ); + self.join_remote_room(sender_user, room_id, reason, servers, state_lock) + .await + } + + #[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote_room", level = "info")] + async fn join_remote_room( + &self, + sender_user: &UserId, + room_id: &RoomId, + reason: Option, + servers: &[OwnedServerName], + state_lock: RoomMutexGuard, + ) -> Result { + info!("Joining {room_id} over federation."); + + let (make_join_response, remote_server) = self + .make_join_request(sender_user, room_id, servers) + .await?; + + info!("make_join finished"); + + let room_version = make_join_response.room_version.unwrap_or(RoomVersionId::V1); + let room_version_rules = room_version + .rules() + .expect("room version should have defined rules"); + + if !self.services.server.supported_room_version(&room_version) { + // How did we get here? + return Err!(BadServerResponse( + "Remote room version {room_version} is not supported" + )); + } + + let mut join_event_stub: CanonicalJsonObject = + serde_json::from_str(make_join_response.event.get()).map_err(|e| { + err!(BadServerResponse(warn!( + "Invalid make_join event json received from server: {e:?}" + ))) + })?; + + let join_authorized_via_users_server = { + if room_version_rules + .signatures + .check_join_authorised_via_users_server + { + join_event_stub + .get("content") + .map(|s| { + s.as_object()? + .get("join_authorised_via_users_server")? + .as_str() + }) + .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok()) + } else { + None + } + }; + + join_event_stub.insert( + "origin_server_ts".to_owned(), + CanonicalJsonValue::Integer( + utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + ), + ); + + let mut join_content = RoomMemberEventContent::new(MembershipState::Join); + join_content.displayname = self.services.users.displayname(sender_user).await.ok(); + join_content.avatar_url = self.services.users.avatar_url(sender_user).await.ok(); + join_content.blurhash = self.services.users.blurhash(sender_user).await.ok(); + join_content.reason = reason; + join_content + .join_authorized_via_users_server + .clone_from(&join_authorized_via_users_server); + + join_event_stub.insert( + "content".to_owned(), + to_canonical_value(join_content).expect("event is valid, we just created it"), + ); + + // Remove event id if it exists + join_event_stub.remove("event_id"); + + // In order to create a compatible ref hash (EventID) the `hashes` field needs + // to be present + self.services + .server_keys + .hash_and_sign_event(&mut join_event_stub, &room_version_rules)?; + + // Generate event id + let event_id = gen_event_id(&join_event_stub, &room_version_rules)?; + + // Add event_id back + join_event_stub + .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); + + // It has enough fields to be called a proper event now + let mut join_event = join_event_stub; + + info!("Asking {remote_server} for send_join in room {room_id}"); + let send_join_request = federation::membership::create_join_event::v2::Request::new( + room_id.to_owned(), + event_id.clone(), + self.services + .sending + .convert_to_outgoing_federation_event(join_event.clone()) + .await, + ); + + let send_join_response = match self + .services + .sending + .send_synapse_request(&remote_server, send_join_request) + .await + { + | Ok(response) => response, + | Err(e) => { + error!("send_join failed: {e}"); + return Err(e); + }, + }; + + info!("send_join finished"); + + if join_authorized_via_users_server.is_some() { + if let Some(signed_raw) = &send_join_response.room_state.event { + debug_info!( + "There is a signed event with join_authorized_via_users_server. This room \ + is probably using restricted joins. Adding signature to our event" + ); + + let (signed_event_id, signed_value) = + gen_event_id_canonical_json(signed_raw, &room_version_rules).map_err( + |e| { + err!(Request(BadJson(warn!( + "Could not convert event to canonical JSON: {e}" + )))) + }, + )?; + + if signed_event_id != event_id { + return Err!(Request(BadJson(warn!( + %signed_event_id, %event_id, + "Server {remote_server} sent event with wrong event ID" + )))); + } + + match signed_value["signatures"] + .as_object() + .ok_or_else(|| { + err!(BadServerResponse(warn!( + "Server {remote_server} sent invalid signatures type" + ))) + }) + .and_then(|e| { + e.get(remote_server.as_str()).ok_or_else(|| { + err!(BadServerResponse(warn!( + "Server {remote_server} did not send its signature for a \ + restricted room" + ))) + }) + }) { + | Ok(signature) => { + join_event + .get_mut("signatures") + .expect("we created a valid pdu") + .as_object_mut() + .expect("we created a valid pdu") + .insert(remote_server.to_string(), signature.clone()); + }, + | Err(e) => { + warn!( + "Server {remote_server} sent invalid signature in send_join \ + signatures for event {signed_value:?}: {e:?}", + ); + }, + } + } + } + + self.services.short.get_or_create_shortroomid(room_id).await; + + info!("Parsing join event"); + let parsed_join_pdu = Pdu::from_id_val(&event_id, join_event.clone()) + .map_err(|e| err!(BadServerResponse("Invalid join event PDU: {e:?}")))?; + + info!("Acquiring server signing keys for response events"); + let resp_events = &send_join_response.room_state; + let resp_state = &resp_events.state; + let resp_auth = &resp_events.auth_chain; + self.services + .server_keys + .acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter())) + .await; + + info!("Going through send_join response room_state"); + let cork = self.services.db.cork_and_flush(); + let state = send_join_response + .room_state + .state + .iter() + .stream() + .then(|pdu| { + self.services + .server_keys + .validate_and_add_event_id_no_fetch(pdu, &room_version_rules) + .inspect_err(|e| { + debug_warn!( + "Could not validate send_join response room_state event: {e:?}" + ); + }) + .inspect(|_| { + debug!("Completed validating send_join response room_state event"); + }) + }) + .ready_filter_map(Result::ok) + .fold(HashMap::new(), |mut state, (event_id, value)| async move { + let pdu = match Pdu::from_id_val(&event_id, value.clone()) { + | Ok(pdu) => pdu, + | Err(e) => { + debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}"); + return state; + }, + }; + if !pdu_fits(&mut value.clone()) { + warn!( + "dropping incoming PDU {event_id} in room {room_id} from room join \ + because it exceeds 65535 bytes or is otherwise too large." + ); + return state; + } + self.services.outlier.add_pdu_outlier(&event_id, &value); + if let Some(state_key) = &pdu.state_key { + let shortstatekey = self + .services + .short + .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key) + .await; + + state.insert(shortstatekey, pdu.event_id.clone()); + } + state + }) + .await; + + drop(cork); + + info!("Going through send_join response auth_chain"); + let cork = self.services.db.cork_and_flush(); + send_join_response + .room_state + .auth_chain + .iter() + .stream() + .then(|pdu| { + self.services + .server_keys + .validate_and_add_event_id_no_fetch(pdu, &room_version_rules) + }) + .ready_filter_map(Result::ok) + .ready_for_each(|(event_id, value)| { + trace!(%event_id, "Adding PDU as an outlier from send_join auth_chain"); + self.services.outlier.add_pdu_outlier(&event_id, &value); + }) + .await; + + drop(cork); + + debug!("Running send_join auth check"); + let fetch_state = &state; + let state_fetch = |k: StateEventType, s: StateKey| async move { + let shortstatekey = self.services.short.get_shortstatekey(&k, &s).await.ok()?; + + let event_id = fetch_state.get(&shortstatekey)?; + self.services.timeline.get_pdu(event_id).await.ok() + }; + + let auth_check = state_res::event_auth::auth_check( + &room_version.rules().unwrap(), + &parsed_join_pdu, + None, // TODO: third party invite + |k, s| state_fetch(k.clone(), s.into()), + &state_fetch(StateEventType::RoomCreate, "".into()) + .await + .expect("create event is missing from send_join auth"), + ) + .await + .map_err(|e| err!(Request(Forbidden(warn!("Auth check failed: {e:?}")))))?; + + if !auth_check { + return Err!(Request(Forbidden("Auth check failed"))); + } + + info!("Compressing state from send_join"); + let compressed: CompressedState = self + .services + .state_compressor + .compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.as_ref()))) + .collect() + .await; + + debug!("Saving compressed state"); + let HashSetCompressStateEvent { + shortstatehash: statehash_before_join, + added, + removed, + } = self + .services + .state_compressor + .save_state(room_id, Arc::new(compressed)) + .await?; + + debug!("Forcing state for new room"); + self.services + .state + .force_state(room_id, statehash_before_join, added, removed, &state_lock) + .await?; + + debug!("Updating joined counts for new room"); + self.services.state_cache.update_joined_count(room_id).await; + + // We append to state before appending the pdu, so we don't have a moment in + // time with the pdu without it's state. This is okay because append_pdu can't + // fail. + let statehash_after_join = self + .services + .state + .append_to_state(&parsed_join_pdu, room_id) + .await?; + + info!("Appending new room join event"); + self.services + .timeline + .append_pdu( + &parsed_join_pdu, + join_event, + std::iter::once(parsed_join_pdu.event_id.as_ref()), + &state_lock, + room_id, + ) + .await?; + + info!("Setting final room state for new room"); + // We set the room state after inserting the pdu, so that we never have a moment + // in time where events in the current room state do not exist + self.services + .state + .set_room_state(room_id, statehash_after_join, &state_lock); + + Ok(()) + } + + async fn make_join_request( + &self, + sender_user: &UserId, + room_id: &RoomId, + servers: &[OwnedServerName], + ) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> { + let mut make_join_counter: usize = 1; + + for remote_server in servers { + if self.services.globals.server_is_ours(remote_server) { + continue; + } + info!( + "Asking {remote_server} for make_join (attempt {make_join_counter}/{})", + servers.len() + ); + + let mut request = federation::membership::prepare_join_event::v1::Request::new( + room_id.to_owned(), + sender_user.to_owned(), + ); + request.ver = self.services.server.supported_room_versions().collect(); + + let make_join_response = self + .services + .sending + .send_federation_request(remote_server, request) + .await; + + trace!("make_join response: {:?}", make_join_response); + make_join_counter = make_join_counter.saturating_add(1); + + match make_join_response { + | Ok(response) => { + info!("Received make_join response from {remote_server}"); + if let Err(e) = validate_remote_member_event_stub( + &MembershipState::Join, + sender_user, + room_id, + &to_canonical_object(&response.event)?, + ) { + warn!("make_join response from {remote_server} failed validation: {e}"); + continue; + } + return Ok((response, remote_server.clone())); + }, + | Err(e) => match e.kind() { + | ErrorKind::UnableToAuthorizeJoin => { + info!( + "{remote_server} was unable to verify the joining user satisfied \ + restricted join requirements: {e}. Will continue trying." + ); + }, + | ErrorKind::UnableToGrantJoin => { + info!( + "{remote_server} believes the joining user satisfies restricted \ + join rules, but is unable to authorise a join for us. Will \ + continue trying." + ); + }, + | ErrorKind::IncompatibleRoomVersion(IncompatibleRoomVersionErrorData { + room_version, + .. + }) => { + warn!( + "{remote_server} reports the room we are trying to join is \ + v{room_version}, which we do not support." + ); + return Err(e); + }, + | ErrorKind::Forbidden => { + warn!("{remote_server} refuses to let us join: {e}."); + return Err(e); + }, + | ErrorKind::NotFound => { + info!( + "{remote_server} does not know about {room_id}: {e}. Will continue \ + trying." + ); + }, + | _ => { + info!("{remote_server} failed to make_join: {e}. Will continue trying."); + }, + }, + } + } + info!("All {} servers were unable to assist in joining {room_id} :(", servers.len()); + Err!(BadServerResponse("No server available to assist in joining.")) + } + + /// Attempts to find a user who is able to issue an invite in the target + /// room. + pub async fn select_authorising_user<'a>( + &self, + room_id: &'a RoomId, + user_id: &'a UserId, + state_lock: &'a RoomMutexGuard, + ) -> Result { + let candidates = self.services.state_cache.local_users_in_room(room_id); + + let mut candidates = std::pin::pin!(candidates); + + while let Some(candidate) = candidates.next().await { + if self + .services + .state_accessor + .user_can_invite(room_id, &candidate, user_id, state_lock) + .await + { + return Ok(candidate); + } + } + + Err!(Request(UnableToGrantJoin( + "No user on this server is able to assist in joining." + ))) + } + + /// Checks whether the given user can join the given room via a restricted + /// join. + pub(crate) async fn user_can_perform_restricted_join( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result { + let Ok(join_rules_event_content) = self + .services + .state_accessor + .room_state_get_content::( + room_id, + &StateEventType::RoomJoinRules, + "", + ) + .await + else { + // No join rules means there's nothing to authorise (defaults to invite) + return Ok(false); + }; + + let (JoinRule::Restricted(r) | JoinRule::KnockRestricted(r)) = + join_rules_event_content.join_rule + else { + // This is not a restricted room + return Ok(false); + }; + + if r.allow.is_empty() { + // This will never be authorisable, return forbidden. + return Err!(Request(Forbidden("You are not invited to this room."))); + } + + let mut could_satisfy = true; + for allow_rule in &r.allow { + match allow_rule { + | AllowRule::RoomMembership(membership) => { + if !self + .services + .state_cache + .server_in_room(self.services.globals.server_name(), &membership.room_id) + .await + { + // Since we can't check this room, mark could_satisfy as false + // so that we can return M_UNABLE_TO_AUTHORIZE_JOIN later. + could_satisfy = false; + continue; + } + + if self + .services + .state_cache + .is_joined(user_id, &membership.room_id) + .await + { + debug!( + "User {} is allowed to join room {} via membership in room {}", + user_id, room_id, membership.room_id + ); + return Ok(true); + } + }, + | other if other.rule_type() == "fi.mau.spam_checker" => + return match self + .services + .antispam + .meowlnir_accept_make_join(room_id.to_owned(), user_id.to_owned()) + .await + { + | Ok(()) => Ok(true), + | Err(_) => Err!(Request(Forbidden("Antispam rejected join request."))), + }, + | _ => { + // We don't recognise this join rule, so we cannot satisfy the request. + could_satisfy = false; + debug_info!( + "Unsupported allow rule in restricted join for room {}: {:?}", + room_id, + allow_rule + ); + }, + } + } + + if could_satisfy { + // We were able to check all the restrictions and can be certain that the + // prospective member is not permitted to join. + Err!(Request(Forbidden( + "You do not belong to any of the rooms or spaces required to join this room." + ))) + } else { + // We were unable to check all the restrictions. This usually means we aren't in + // one of the rooms this one is restricted to, ergo can't check its state for + // the user's membership, and consequently the user *might* be able to join if + // they ask another server. + Err!(Request(UnableToAuthorizeJoin( + "You do not belong to any of the recognised rooms or spaces required to join \ + this room, but this server is unable to verify every requirement. You may be \ + able to join via another server." + ))) + } + } +} + +/// Validates that an event returned from a remote server by `/make_*` +/// actually is a membership event with the expected fields. +/// +/// Without checking this, the remote server could use the remote membership +/// mechanism to trick our server into signing arbitrary malicious events. +pub fn validate_remote_member_event_stub( + membership: &MembershipState, + user_id: &UserId, + room_id: &RoomId, + event_stub: &CanonicalJsonObject, +) -> Result<()> { + let Some(event_type) = event_stub.get("type") else { + return Err!(BadServerResponse( + "Remote server returned member event with missing type field" + )); + }; + if event_type != &RoomMemberEventContent::TYPE { + return Err!(BadServerResponse( + "Remote server returned member event with invalid event type" + )); + } + + let Some(sender) = event_stub.get("sender") else { + return Err!(BadServerResponse( + "Remote server returned member event with missing sender field" + )); + }; + if sender != &user_id.as_str() { + return Err!(BadServerResponse( + "Remote server returned member event with incorrect sender" + )); + } + + let Some(state_key) = event_stub.get("state_key") else { + return Err!(BadServerResponse( + "Remote server returned member event with missing state_key field" + )); + }; + if state_key != &user_id.as_str() { + return Err!(BadServerResponse( + "Remote server returned member event with incorrect state_key" + )); + } + + let Some(event_room_id) = event_stub.get("room_id") else { + return Err!(BadServerResponse( + "Remote server returned member event with missing room_id field" + )); + }; + if event_room_id != &room_id.as_str() { + return Err!(BadServerResponse( + "Remote server returned member event with incorrect room_id" + )); + } + + let Some(content) = event_stub + .get("content") + .and_then(|content| content.as_object()) + else { + return Err!(BadServerResponse( + "Remote server returned member event with missing content field" + )); + }; + let Some(event_membership) = content.get("membership") else { + return Err!(BadServerResponse( + "Remote server returned member event with missing membership field" + )); + }; + if event_membership != &membership.as_str() { + return Err!(BadServerResponse( + "Remote server returned member event with incorrect membership type" + )); + } + + Ok(()) +} diff --git a/src/service/rooms/mod.rs b/src/service/rooms/mod.rs index 48dd73983..1aeae877c 100644 --- a/src/service/rooms/mod.rs +++ b/src/service/rooms/mod.rs @@ -3,6 +3,7 @@ pub mod auth_chain; pub mod directory; pub mod event_handler; pub mod lazy_loading; +pub mod membership; pub mod metadata; pub mod outlier; pub mod pdu_metadata; @@ -27,6 +28,7 @@ pub struct Service { pub directory: Arc, pub event_handler: Arc, pub lazy_loading: Arc, + pub membership: Arc, pub metadata: Arc, pub outlier: Arc, pub pdu_metadata: Arc, diff --git a/src/service/services.rs b/src/service/services.rs index 726a6842d..7e9470cf0 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -95,6 +95,7 @@ impl Services { directory: build!(rooms::directory::Service), event_handler: build!(rooms::event_handler::Service), lazy_loading: build!(rooms::lazy_loading::Service), + membership: build!(rooms::membership::Service), metadata: build!(rooms::metadata::Service), outlier: build!(rooms::outlier::Service), pdu_metadata: build!(rooms::pdu_metadata::Service),