diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index ea467754b..fc9f03fb7 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -133,6 +133,8 @@ where .filter(|id| !auth_events.contains_key(*id)) .collect::>(); if !still_missing.is_empty() { + // Don't reject: this could be a temporary condition + // TODO: use get_missing_events? return Err!(Request(InvalidParam( "Could not fetch all auth events for outlier event {event_id}, still missing: \ {still_missing:?}" @@ -163,6 +165,7 @@ where v.insert(auth_event); }, | hash_map::Entry::Occupied(_) => { + self.services.pdu_metadata.mark_event_rejected(event_id); return Err!(Request(InvalidParam( "Auth event's type and state_key combination exists multiple times: {}, {}", auth_event.kind, @@ -177,6 +180,7 @@ where auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())), Some(_) | None ) { + self.services.pdu_metadata.mark_event_rejected(event_id); return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); } @@ -185,6 +189,7 @@ where ready(auth_events_by_key.get(&key).map(ToOwned::to_owned)) }; + // PDU check: 3 let auth_check = state_res::event_auth::auth_check( &room_version_rules, &pdu_event, @@ -196,7 +201,10 @@ where .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; if !auth_check { - return Err!(Request(Forbidden("Auth check failed"))); + self.services.pdu_metadata.mark_event_rejected(event_id); + return Err!(Request(Forbidden( + "Event authorisation fails based on event's claimed auth events" + ))); } trace!("Validation successful."); diff --git a/src/service/rooms/event_handler/state_at_incoming.rs b/src/service/rooms/event_handler/state_at_incoming.rs index 1d86e5891..9f11ac4b8 100644 --- a/src/service/rooms/event_handler/state_at_incoming.rs +++ b/src/service/rooms/event_handler/state_at_incoming.rs @@ -5,7 +5,7 @@ use std::{ }; use conduwuit::{ - Result, debug, err, implement, + Result, debug, err, error, implement, matrix::{Event, StateMap}, trace, utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, @@ -121,6 +121,7 @@ where .state_resolution(room_version_rules, fork_states.iter(), &auth_chain_sets) .boxed() .await + .inspect_err(|e| error!("State resolution failed: {e:?}")) else { return Ok(None); }; diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 87e048a34..bc570e764 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,14 +1,18 @@ -use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant}; +use std::{borrow::Borrow, collections::BTreeMap, sync::Arc, time::Instant}; use conduwuit::{ - Err, Result, debug, debug_info, err, implement, info, is_equal_to, + Err, Result, debug, debug_info, err, implement, is_equal_to, matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, trace, - utils::stream::{BroadbandExt, ReadyExt}, + utils::{ + IterStream, + stream::{BroadbandExt, ReadyExt}, + }, warn, }; use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; +use tokio::join; use super::get_room_version_rules; use crate::rooms::{ @@ -38,18 +42,24 @@ where return Ok(Some(pduid)); } - if !self - .services - .pdu_metadata - .is_event_accepted(incoming_pdu.event_id()) - .await - { + let (rejected, soft_failed) = join!( + self.services + .pdu_metadata + .is_event_rejected(incoming_pdu.event_id()), + self.services + .pdu_metadata + .is_event_soft_failed(incoming_pdu.event_id()) + ); + if rejected || soft_failed { + // TODO: debug_warn instead of warn + warn!(%rejected, %soft_failed, "Event is not accepted"); return Err!(Request(InvalidParam("Event has been rejected or soft-failed"))); } // If any of the auth events are rejected, this event is also rejected. for aid in incoming_pdu.auth_events() { if self.services.pdu_metadata.is_event_rejected(aid).await { + // TODO: debug_warn instead of warn warn!( "Rejecting incoming event {} which depends on rejected auth event {aid}", incoming_pdu.event_id() @@ -57,7 +67,7 @@ where self.services .pdu_metadata .mark_event_rejected(incoming_pdu.event_id()); - return Err!(Request(InvalidParam("Event has rejected auth events"))); + return Err!(Request(InvalidParam("Event has rejected auth event: {aid}"))); } } @@ -109,6 +119,7 @@ where event_id = %incoming_pdu.event_id, "Running initial auth check" ); + // PDU check: 5 let auth_check = state_res::event_auth::auth_check( &room_version_rules, &incoming_pdu, @@ -123,7 +134,9 @@ where self.services .pdu_metadata .mark_event_rejected(incoming_pdu.event_id()); - return Err!(Request(Forbidden("Event has failed auth check with state at the event."))); + return Err!(Request(Forbidden( + "Event authorisation fails based on the state before the event" + ))); } debug!( @@ -152,6 +165,7 @@ where event_id = %incoming_pdu.event_id, "Running auth check with claimed state auth" ); + // PDU check: 6 let auth_check = state_res::event_auth::auth_check( &room_version_rules, &incoming_pdu, @@ -161,6 +175,12 @@ where ) .await .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; + if !auth_check { + warn!( + event_id = %incoming_pdu.event_id, + "Event authentication fails based on the current state of the room" + ); + } // Soft fail check before doing state res debug!( @@ -170,16 +190,22 @@ where let mut soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_rules)) { | (false, _) => true, | (true, None) => false, - | (true, Some(redact_id)) => - !self + | (true, Some(redact_id)) => { + if !self .services .state_accessor .user_can_redact(&redact_id, incoming_pdu.sender(), room_id, true) - .await?, + .await? + { + warn!(redacts = %redact_id, "User is not allowed to redact event"); + true + } else { + false + } + }, }; // 13. Use state resolution to find new room state - // We start looking at current room state now, so lets lock the room trace!( room_id = %room_id, @@ -187,36 +213,6 @@ where ); let state_lock = self.services.state.mutex.lock(room_id).await; - // Now we calculate the set of extremities this room has after the incoming - // event has been applied. We start with the previous extremities (aka leaves) - trace!("Calculating extremities"); - let mut extremities: Vec<_> = self - .services - .state - .get_forward_extremities(room_id) - .ready_filter(|event_id| { - // Remove any that are referenced by this incoming event's prev_events - !incoming_pdu.prev_events().any(is_equal_to!(event_id)) - }) - .broad_filter_map(|event_id| async move { - // Only keep those extremities were not referenced yet - self.services - .pdu_metadata - .is_event_referenced(room_id, &event_id) - .await - .eq(&false) - .then_some(event_id) - }) - .collect() - .await; - extremities.push(incoming_pdu.event_id().to_owned()); - - debug!( - "Retained {} extremities checked against {} prev_events", - extremities.len(), - incoming_pdu.prev_events().count() - ); - let state_ids_compressed: Arc = self .services .state_compressor @@ -311,60 +307,57 @@ where .is_event_soft_failed(&redact_id) .await { + // TODO: This should avoid pushing the event to the timeline instead of using + // soft-fails as a hack warn!( redact_id = %redact_id, - "Redaction is for a soft-failed event, soft failing the redaction" + "Redaction is for a soft-failed event" ); soft_fail = true; } } } - // 14. Check if the event passes auth based on the "current state" of the room, - // if not soft fail it - if soft_fail { - info!( - event_id = %incoming_pdu.event_id, - "Soft failing event" - ); - // assert!(extremities.is_empty(), "soft_fail extremities empty"); - let extremities = extremities.iter().map(Borrow::borrow); - debug_assert!(extremities.clone().count() > 0, "extremities not empty"); - - self.services - .timeline - .append_incoming_pdu( - &incoming_pdu, - val, - extremities, - state_ids_compressed, - soft_fail, - &state_lock, - room_id, - ) - .await?; - - // Soft fail, we keep the event as an outlier but don't add it to the timeline - self.services - .pdu_metadata - .mark_event_soft_failed(incoming_pdu.event_id()); - - warn!( - event_id = %incoming_pdu.event_id, - "Event was soft failed" - ); - return Err!(Request(InvalidParam("Event has been soft failed"))); - } - - // Now that the event has passed all auth it is added into the timeline. - // We use the `state_at_event` instead of `state_after` so we accurately - // represent the state for this event. trace!("Appending pdu to timeline"); - let extremities = extremities - .iter() - .map(Borrow::borrow) - .chain(once(incoming_pdu.event_id())); - debug_assert!(extremities.clone().count() > 0, "extremities not empty"); + let mut extremities: Vec<_> = self + .services + .state + .get_forward_extremities(room_id) + .collect() + .await; + if !soft_fail { + // Per https://spec.matrix.org/unstable/server-server-api/#soft-failure, soft-failed events + // are not added as forward extremities. + + // Now we calculate the set of extremities this room has after the incoming + // event has been applied. We start with the previous extremities (aka leaves) + trace!("Calculating extremities"); + extremities = extremities + .into_iter() + .stream() + .ready_filter(|event_id| { + // Remove any that are referenced by this incoming event's prev_events + !incoming_pdu.prev_events().any(is_equal_to!(event_id)) + }) + .broad_filter_map(|event_id| async move { + // Only keep those extremities were not referenced yet + self.services + .pdu_metadata + .is_event_referenced(room_id, &event_id) + .await + .eq(&false) + .then_some(event_id) + }) + .collect::>() + .await; + extremities.push(incoming_pdu.event_id().to_owned()); + debug!( + "Retained {} extremities checked against {} prev_events", + extremities.len(), + incoming_pdu.prev_events().count() + ); + assert!(!extremities.is_empty(), "extremities must not empty"); + } let pdu_id = self .services @@ -372,13 +365,19 @@ where .append_incoming_pdu( &incoming_pdu, val, - extremities, + extremities.iter().map(Borrow::borrow), state_ids_compressed, soft_fail, &state_lock, room_id, ) .await?; + if soft_fail { + self.services + .pdu_metadata + .mark_event_soft_failed(incoming_pdu.event_id()); + return Err!(Request(InvalidParam("Event has been soft failed"))); + } // Event has passed all auth/stateres checks drop(state_lock); diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index ab9081990..8f999a737 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -53,15 +53,7 @@ where .await?; if soft_fail { - self.services - .pdu_metadata - .mark_as_referenced(room_id, pdu.prev_events.iter().map(AsRef::as_ref)); - - // self.services - // .state - // .set_forward_extremities(room_id, new_room_leaves, state_lock) - // .await; - + // Nothing else to do with a soft-failed event. return Ok(None); }