diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 81fb24cf2..26fe7af56 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -381,6 +381,7 @@ async fn handle_room( .rooms .event_handler .handle_incoming_pdu(origin, room_id, &event_id, value, true) + .boxed() .await .map(|_| ()); results.push((event_id, result)); diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index 0576d994a..6f1262ffd 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -6,17 +6,87 @@ use std::{ use conduwuit::{ Err, Event, PduEvent, debug, debug_info, debug_warn, err, - matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, - warn, + matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort, + trace, utils::continue_exponential_backoff_secs, warn, }; use futures::StreamExt; use ruma::{ - CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, ServerName, + CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, + OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt, api::federation::event::{get_event, get_missing_events}, + int, }; use super::get_room_version_rules; +/// Attempts to build a localised directed acyclic graph out of the given PDUs, +/// returning them in a topologically sorted order. +/// +/// This is used to attempt to process PDUs in an order that respects their +/// dependencies, however it is ultimately the sender's responsibility to send +/// them in a processable order, so this is just a best effort attempt. It does +/// not account for power levels or other tie breaks. +pub async fn build_local_dag( + pdu_map: &HashMap, +) -> conduwuit::Result> { + debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs"); + let mut dag: HashMap> = + HashMap::with_capacity(pdu_map.len()); + let mut id_origin_ts: HashMap = HashMap::with_capacity(pdu_map.len()); + + for (event_id, value) in pdu_map { + // We already checked that these properties are correct in parse_incoming_pdu, + // so it's safe to unwrap here. + // We also filter to remove any prev_events that are not in this pdu_map, as we + // need to have at least one event with zero out degrees for the lexico-topo + // sort below. If there are multiple events with omitted prevs, they will be + // ordered by timestamp, then event ID. At that point though, it's unlikely to + // matter. + let prev_events = value + .get("prev_events") + .unwrap() + .as_array() + .unwrap() + .iter() + .map(|v| EventId::parse(v.as_str().unwrap()).unwrap()) + .filter(|id| pdu_map.contains_key(id)) + .collect(); + + dag.insert(event_id.clone(), prev_events); + let origin_server_ts = value + .get("origin_server_ts") + .and_then(CanonicalJsonValue::as_integer) + .unwrap_or_default(); + id_origin_ts.insert(event_id.clone(), origin_server_ts); + } + + debug!(count = dag.len(), "Sorting incoming events with partial graph"); + lexicographical_topological_sort(&dag, &async |node_id| { + // Note: we don't bother fetching power levels because that would massively slow + // this function down. This is a best-effort attempt to order events correctly + // for processing, however ultimately that should be the sender's job. + let ts = id_origin_ts + .get(&node_id) + .copied() + .unwrap_or_else(|| int!(0)) + .to_string() + .parse::() + .ok() + .and_then(UInt::new) + .unwrap_or_default(); + Ok((int!(0), MilliSecondsSinceUnixEpoch(ts))) + }) + .await + .inspect(|sorted| { + debug_assert_eq!( + sorted.len(), + pdu_map.len(), + "Sorted graph was not the same size as the input graph" + ); + }) + .map_err(|e| err!("failed to resolve local graph: {e}")) +} + impl super::Service { /// Uses `/_matrix/federation/v1/get_missing_events` to fill gaps in the /// DAG. @@ -31,13 +101,13 @@ impl super::Service { pub(super) async fn backfill_missing_events( &self, room_id: OwnedRoomId, - latest_events: Vec, + head: Vec, via: OwnedServerName, ) -> conduwuit::Result> { - if latest_events.is_empty() { + if head.is_empty() { return Ok(HashMap::new()); } - let earliest_events = self + let tail = self .services .state .get_forward_extremities(&room_id) @@ -45,40 +115,31 @@ impl super::Service { .await; // TODO: min_depth is probably necessary to avoid fetching the entire room // history if there are very long gaps - let mut latest_events = latest_events; + let mut latest_events: HashSet = HashSet::from_iter(head.clone()); let mut loop_count: u64 = 3; // Start with a base number of 3 so that we fetch 10, 16, 25, 36, etc // instead of 1, 2, 4, 9, so on. let mut backfilled_events = HashMap::with_capacity(10); while !latest_events.is_empty() { - let mut request = get_missing_events::v1::Request::new( - room_id.clone(), - earliest_events.clone(), - latest_events.clone(), - ); - request.limit = min(loop_count.saturating_pow(2), 100) + let todo: Vec = latest_events.clone().into_iter().collect(); + let mut request = + get_missing_events::v1::Request::new(room_id.clone(), tail.clone(), todo.clone()); + let limit = min(loop_count.saturating_pow(2), 100); + request.limit = limit .try_into() .expect("limit cannot be greater than 100, which fits into UInt"); - if backfilled_events.len() > 1000 { - warn!( - "Received {} missing events, refusing to fetch more (infinite loop?)", - backfilled_events.len() - ); - break; - } debug_info!( backfilled=%backfilled_events.len(), %loop_count, - "Asking {via} for up to {} missing events", - request.limit + "Asking {via} for up to {limit} missing events", ); trace!( ?latest_events, - ?earliest_events, + ?tail, %via, - limit=%request.limit, + %limit, "Requesting missing events" ); let response: get_missing_events::v1::Response = self @@ -100,47 +161,76 @@ impl super::Service { break; } for event in response.events { + trace!("Parsing incoming event from backfill"); let (incoming_room_id, event_id, pdu_json) = self.parse_incoming_pdu(&event).await.map_err(|e| { err!(BadServerResponse("{via} returned an invalid event: {e:?}")) })?; + trace!(%incoming_room_id, %event_id, "Parsed incoming event from backfill"); if incoming_room_id != room_id { return Err!(BadServerResponse( "{via} returned {event_id} in missing events which belongs to \ {incoming_room_id}, not {room_id}" )); } + latest_events.remove(&event_id); + if head.contains(&event_id) || tail.contains(&event_id) { + debug!("Skipping known event {event_id}"); + continue; + } if backfilled_events.contains_key(&event_id) { debug_warn!(%via, %event_id, "Remote retransmitted event"); continue; } // TODO: Should this be scoped to the GME session? We might end up incorrectly // assuming we've caught up if we do this - if self.services.timeline.pdu_exists(&event_id).await { + if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await { debug!(%via, %event_id, "Already seen event in database"); - continue; + backfilled_events.insert(event_id.clone(), pdu); + } else { + unseen = unseen.saturating_add(1); } - unseen = unseen.saturating_add(1); let parsed = PduEvent::from_id_val(&event_id, pdu_json) .map_err(|e| err!(BadServerResponse("Unable to parse {event_id}: {e}")))?; - backfilled_events.insert(event_id, parsed.clone()); for prev_event_id in parsed.prev_events() { - if backfilled_events.contains_key(prev_event_id) - || self.services.timeline.pdu_exists(prev_event_id).await + // Verify that we have all of this event's prev_events. If we don't, add it to + // the work queue + if !(backfilled_events.contains_key(prev_event_id) + || self.services.timeline.pdu_exists(prev_event_id).await) { - continue; + latest_events.insert(prev_event_id.to_owned()); + break; } - latest_events.push(prev_event_id.to_owned()); + continue; + } + backfilled_events.insert(event_id.clone(), parsed); + } + for event_id in todo { + latest_events.remove(&event_id); + if let hash_map::Entry::Vacant(e) = backfilled_events.entry(event_id.clone()) { + let evt = self.services.timeline.get_pdu(&event_id).await?; + e.insert(evt); } } - latest_events.retain(|e| !backfilled_events.contains_key(e)); debug!( - chunk=%chunk_len, + count=%chunk_len, + new=%unseen, remaining=%latest_events.len(), "Got missing events" ); + if unseen == 0 { + debug_warn!("Didn't see any new events, breaking cycle"); + break; + } else if chunk_len < usize::try_from(limit)? { + debug!( + "Got less than the limit number of events, assuming there's no more to fetch" + ); + break; + } } + debug_info!("Successfully fetched {} missing events from {via}", backfilled_events.len()); + trace!("Missing_events: {backfilled_events:?}"); Ok(backfilled_events) } diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index efc7a434d..270b3d0d7 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -1,128 +1,118 @@ -use std::{ - collections::{BTreeMap, HashMap, HashSet, VecDeque}, - iter::once, -}; +use std::collections::{HashMap, HashSet, VecDeque}; -use conduwuit::{ - Event, PduEvent, Result, debug_warn, err, implement, - state_res::{self}, -}; -use futures::{FutureExt, future}; -use ruma::{ - CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, - int, uint, -}; +use conduwuit::{Event, PduEvent, debug, debug_info, error, trace}; +use ruma::{OwnedEventId, RoomId, ServerName}; -use super::check_room_id; +use crate::rooms::event_handler::build_local_dag; -#[implement(super::Service)] -#[tracing::instrument( - level = "debug", - skip_all, - fields(%origin), -)] -#[allow(clippy::type_complexity)] -pub(super) async fn fetch_prev<'a, Pdu, Events>( - &self, - origin: &ServerName, - create_event: &Pdu, - room_id: &RoomId, - first_ts_in_room: MilliSecondsSinceUnixEpoch, - initial_set: Events, -) -> Result<( - Vec, - HashMap)>, -)> -where - Pdu: Event + Send + Sync, - Events: Iterator + Clone + Send, -{ - let num_ids = initial_set.clone().count(); - let mut eventid_info = HashMap::new(); - let mut graph: HashMap = HashMap::with_capacity(num_ids); - let mut todo_outlier_stack: VecDeque = - initial_set.map(ToOwned::to_owned).collect(); +impl super::Service { + pub(super) async fn fetch_prevs( + &self, + room_id: &RoomId, + create_event: &PduEvent, + incoming_pdu: &PduEvent, + origin: &ServerName, + ) -> conduwuit::Result<()> { + let mut queue: VecDeque = VecDeque::new(); + queue.push_back(incoming_pdu.event_id().to_owned()); - let mut amount = 0; - - while let Some(prev_event_id) = todo_outlier_stack.pop_front() { - self.services.server.check_running()?; - - match self - .fetch_and_handle_outliers( - origin, - once(prev_event_id.as_ref()), - create_event, - room_id, - ) - .boxed() - .await - .pop() - { - | Some((pdu, mut json_opt)) => { - check_room_id(room_id, &pdu)?; - - let limit = self.services.server.config.max_fetch_prev_events; - if amount > limit { - debug_warn!("Max prev event limit reached! Limit: {limit}"); - graph.insert(prev_event_id.clone(), HashSet::new()); - continue; - } - - if json_opt.is_none() { - json_opt = self - .services - .outlier - .get_outlier_pdu_json(&prev_event_id) - .await - .ok(); - } - - if let Some(json) = json_opt { - if pdu.origin_server_ts() > first_ts_in_room { - amount = amount.saturating_add(1); - for prev_prev in pdu.prev_events() { - if !graph.contains_key(prev_prev) { - todo_outlier_stack.push_back(prev_prev.to_owned()); - } - } - - graph.insert( - prev_event_id.clone(), - pdu.prev_events().map(ToOwned::to_owned).collect(), - ); - } else { - // Time based check failed - graph.insert(prev_event_id.clone(), HashSet::new()); - } - - eventid_info.insert(prev_event_id.clone(), (pdu, json)); + while let Some(event_id) = queue.pop_front() { + debug!(event_id=%incoming_pdu.event_id, "Fetching any missing prev_events"); + let mut missing_prev_events: HashSet = + incoming_pdu.prev_events().map(ToOwned::to_owned).collect(); + for pid in missing_prev_events.clone() { + if self.services.timeline.pdu_exists(&pid).await { + trace!("Found prev event {pid} for outlier event {event_id} locally"); + missing_prev_events.remove(&pid); } else { - // Get json failed, so this was not fetched over federation - graph.insert(prev_event_id.clone(), HashSet::new()); + debug_info!( + "Could not find prev event {pid} for outlier event {event_id} locally, \ + will fetch over federation" + ); } - }, - | _ => { - // Fetch and handle failed - graph.insert(prev_event_id.clone(), HashSet::new()); - }, + } + if !missing_prev_events.is_empty() { + debug_info!( + "Fetching {} missing prev events for outlier event {event_id}", + missing_prev_events.len() + ); + let backfilled = self + .backfill_missing_events( + room_id.to_owned(), + vec![event_id.clone()], + origin.to_owned(), + ) + .await?; + debug_info!("Fetched {} missing events for {event_id}", backfilled.len()); + let mapped = backfilled + .iter() + .map(|(eid, evt)| { + let mut obj = evt.to_canonical_object(); + obj.remove("event_id"); // event_id is inserted by backfill_missing_events + (eid.clone(), obj) + }) + .collect::>(); + let local_dag = if mapped.len() == 1 { + mapped.keys().map(ToOwned::to_owned).collect() + } else { + build_local_dag(&mapped).await? + }; + debug_info!("Preparing to handle {} missing events", backfilled.len()); + for prev_event_id in local_dag { + let obj = mapped + .get(&prev_event_id) + .expect("We should have this event in memory"); + debug_info!("Handling prev event {prev_event_id}"); + match self + .handle_outlier_pdu( + origin, + create_event, + &prev_event_id, + room_id, + obj.clone(), + false, + ) + .await + { + | Ok(_) => { + debug!("Successfully handled {prev_event_id} as an outlier"); + missing_prev_events.remove(&prev_event_id); + }, + | Err(e) => + error!(error=?e, %prev_event_id, %event_id, "Failed to handle prev event"), + } + debug_info!("Finished handling prev"); + } + } + let outlier = self.services.timeline.get_pdu(&event_id).await; + if missing_prev_events.is_empty() + && let Ok(pdu) = outlier + { + // promote any prevs first + for prev_event_id in pdu.prev_events() { + debug_info!("Promoting prev event {prev_event_id} to timeline"); + let prev_pdu = self.services.timeline.get_pdu(&event_id).await?; + let val = prev_pdu.to_canonical_object(); + self.upgrade_outlier_to_timeline_pdu( + prev_pdu, + val, + create_event, + origin, + room_id, + ) + .await?; + debug_info!("Finished prev promoting {prev_event_id} to timeline"); + } + debug_info!("Promoting event {event_id} to timeline"); + let val = pdu.to_canonical_object(); + self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id) + .await?; + debug_info!("Finished promoting {event_id} to timeline"); + } else { + debug!(?missing_prev_events, ok=%outlier.is_ok(), "Not promoting {event_id}"); + } } + + Ok(()) } - - let event_fetch = |event_id| { - let origin_server_ts = eventid_info - .get(&event_id) - .map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get()); - - // This return value is the key used for sorting events, - // events are then sorted by power level, time, - // and lexically by event_id. - future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts))) - }; - - let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch) - .await - .map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?; - - Ok((sorted, eventid_info)) } diff --git a/src/service/rooms/event_handler/fetch_state.rs b/src/service/rooms/event_handler/fetch_state.rs index 790b4ec9e..d6c20518c 100644 --- a/src/service/rooms/event_handler/fetch_state.rs +++ b/src/service/rooms/event_handler/fetch_state.rs @@ -1,7 +1,6 @@ use std::collections::{HashMap, hash_map}; use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement}; -use futures::FutureExt; use ruma::{ EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids, events::StateEventType, @@ -42,7 +41,6 @@ where let state_ids = res.pdu_ids.iter().map(AsRef::as_ref); let state_vec = self .fetch_and_handle_outliers(origin, state_ids, create_event, room_id) - .boxed() .await; let mut state: HashMap = HashMap::with_capacity(state_vec.len()); diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 204dfa715..5033ab625 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -1,14 +1,11 @@ -use std::{ - collections::{BTreeMap, hash_map}, - time::Instant, -}; +use std::{collections::BTreeMap, time::Instant}; use conduwuit::{ Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err, - implement, info, trace, utils::stream::IterStream, warn, + implement, info, trace, warn, }; use futures::{ - FutureExt, TryFutureExt, TryStreamExt, + FutureExt, future::{OptionFuture, try_join4}, }; use ruma::{ @@ -236,63 +233,21 @@ pub async fn handle_incoming_pdu<'a>( } // Skip old events - let first_ts_in_room = self - .services - .timeline - .first_pdu_in_room(room_id) - .await? - .origin_server_ts(); + // let first_ts_in_room = self + // .services + // .timeline + // .first_pdu_in_room(room_id) + // .await? + // .origin_server_ts(); // 9. Fetch any missing prev events doing all checks listed here starting at 1. // These are timeline events - let (sorted_prev_events, mut eventid_info) = self - .fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events()) - .await?; + debug!("Handling previous events"); - debug!( - events = ?sorted_prev_events, - "Handling previous events" - ); - - sorted_prev_events - .iter() - .try_stream() - .map_ok(AsRef::as_ref) - .try_for_each(|prev_id| { - self.handle_prev_pdu( - origin, - event_id, - room_id, - eventid_info.remove(prev_id), - create_event, - first_ts_in_room, - prev_id, - ) - .inspect_err(move |e| { - warn!("Prev {prev_id} failed: {e}"); - match self - .services - .globals - .bad_event_ratelimiter - .write() - .entry(prev_id.into()) - { - | hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - }, - | hash_map::Entry::Occupied(mut e) => { - let tries = e.get().1.saturating_add(1); - *e.get_mut() = (Instant::now(), tries); - }, - } - }) - .map(|_| self.services.server.check_running()) - }) - .boxed() + self.fetch_prevs(room_id, create_event, &incoming_pdu, origin) .await?; // Done with prev events, now handling the incoming event self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id) - .boxed() .await } diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index dc08ded5e..a8bba57fc 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap, hash_map}; +use std::collections::{BTreeMap, HashMap, HashSet, hash_map}; use conduwuit::{ Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, @@ -10,7 +10,7 @@ use ruma::{ events::StateEventType, }; -use super::{check_room_id, get_room_version_rules}; +use super::{build_local_dag, check_room_id, get_room_version_rules}; use crate::rooms::timeline::pdu_fits; #[implement(super::Service)] @@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>( event_id: &'a EventId, room_id: &'a RoomId, mut value: CanonicalJsonObject, - auth_events_known: bool, + _auth_events_known: bool, ) -> Result<(PduEvent, BTreeMap)> where Pdu: Event + Send + Sync, @@ -107,44 +107,71 @@ where } // Fetch any missing ones & reject invalid ones - let missing_auth_events = if auth_events_known { - pdu_event - .auth_events() - .filter(|id| !auth_events.contains_key(*id)) - .collect::>() - } else { - pdu_event.auth_events().collect::>() - }; - if !missing_auth_events.is_empty() || !auth_events_known { + let mut missing_auth_events: HashSet = pdu_event + .auth_events() + .filter(|id| !auth_events.contains_key(*id)) + .map(ToOwned::to_owned) + .collect(); + + if !missing_auth_events.is_empty() { debug_info!( "Fetching {} missing auth events for outlier event {event_id}", missing_auth_events.len() ); - for (pdu, _) in self - .fetch_and_handle_outliers( - origin, - missing_auth_events.iter().copied(), - create_event, - room_id, + let backfilled = self + .backfill_missing_events( + room_id.to_owned(), + vec![event_id.to_owned()], + origin.to_owned(), ) - .await - { - auth_events.insert(pdu.event_id().to_owned(), pdu); + .await?; + debug_info!("Fetched {} missing auth events for {event_id}", backfilled.len()); + let mapped = backfilled + .iter() + .map(|(eid, evt)| { + let mut obj = evt.to_canonical_object(); + obj.remove("event_id"); // event_id is inserted by backfill_missing_events + (eid.clone(), obj) + }) + .collect::>(); + let local_dag = if mapped.len() == 1 { + mapped.keys().map(ToOwned::to_owned).collect() + } else { + build_local_dag(&mapped).await? + }; + debug_info!("Preparing to handle {} missing auth events", backfilled.len()); + for prev_event_id in local_dag { + let obj = mapped + .get(&prev_event_id) + .expect("We should have this event in memory"); + debug_info!("Handling prev {prev_event_id}"); + let (prev, _) = Box::pin(self.handle_outlier_pdu( + origin, + create_event, + &prev_event_id, + room_id, + obj.clone(), + false, + )) + .await?; + if missing_auth_events.contains(&*prev_event_id) { + missing_auth_events.remove(&prev_event_id); + auth_events.insert(prev_event_id, prev); + } + debug_info!("Finished handling prev auth event"); } } else { debug!("No missing auth events for outlier event {event_id}"); } - // reject if we are still missing some - let still_missing = pdu_event - .auth_events() - .filter(|id| !auth_events.contains_key(*id)) - .collect::>(); - if !still_missing.is_empty() { + // reject if we are still missing some auth events. + // If we're still missing prev events, we will fetch them individually later, + // but there's no reason for us to be missing auth events now we've gapfilled + // the DAG. + if !missing_auth_events.is_empty() { // Don't reject: this could be a temporary condition - // TODO: use get_missing_events? return Err!(Request(InvalidParam( "Could not fetch all auth events for outlier event {event_id}, still missing: \ - {still_missing:?}" + {missing_auth_events:?}" ))); } diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs deleted file mode 100644 index cb4978d94..000000000 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::{collections::BTreeMap, time::Instant}; - -use conduwuit::{ - Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement, - utils::continue_exponential_backoff_secs, -}; -use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName}; -use tracing::debug; - -#[implement(super::Service)] -#[allow(clippy::type_complexity)] -#[allow(clippy::too_many_arguments)] -#[tracing::instrument( - name = "prev", - level = INFO_SPAN_LEVEL, - skip_all, - fields(%prev_id), -)] -pub(super) async fn handle_prev_pdu<'a, Pdu>( - &self, - origin: &'a ServerName, - event_id: &'a EventId, - room_id: &'a RoomId, - eventid_info: Option<(PduEvent, BTreeMap)>, - create_event: &'a Pdu, - first_ts_in_room: MilliSecondsSinceUnixEpoch, - prev_id: &'a EventId, -) -> Result -where - Pdu: Event + Send + Sync, -{ - // Check for disabled again because it might have changed - if self.services.metadata.is_disabled(room_id).await { - return Err!(Request(Forbidden(debug_warn!( - "Federaton of room {room_id} is currently disabled on this server. Request by \ - origin {origin} and event ID {event_id}" - )))); - } - - if let Some((time, tries)) = self - .services - .globals - .bad_event_ratelimiter - .read() - .get(prev_id) - { - // Exponential backoff - const MIN_DURATION: u64 = 5 * 60; - const MAX_DURATION: u64 = 60 * 60 * 24; - if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) { - debug!( - ?tries, - duration = ?time.elapsed(), - "Backing off from prev_event" - ); - return Ok(()); - } - } - - let Some((pdu, json)) = eventid_info else { - return Ok(()); - }; - - // Skip old events - if pdu.origin_server_ts() < first_ts_in_room { - return Ok(()); - } - - let start_time = Instant::now(); - self.federation_handletime - .write() - .insert(room_id.into(), ((*prev_id).to_owned(), start_time)); - - defer! {{ - self.federation_handletime - .write() - .remove(room_id); - }}; - - self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id) - .await?; - - debug!( - elapsed = ?start_time.elapsed(), - "Handled prev_event", - ); - - Ok(()) -} diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 6197bc87e..f51d701f0 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -4,7 +4,6 @@ mod fetch_prev; mod fetch_state; mod handle_incoming_pdu; mod handle_outlier_pdu; -mod handle_prev_pdu; mod parse_incoming_pdu; mod policy_server; mod resolve_state; @@ -15,6 +14,7 @@ use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant}; use async_trait::async_trait; use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap}; +pub use fetch_and_handle_outliers::build_local_dag; use ruma::{ OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent, room_version_rules::RoomVersionRules, @@ -22,7 +22,6 @@ use ruma::{ use tokio::sync::Notify; use crate::{Dep, globals, rooms, sending, server_keys}; - pub struct Service { pub mutex_federation: RoomMutexMap, pub federation_handletime: SyncRwLock, diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 0a711b468..db947a803 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -41,6 +41,7 @@ where .get_pdu_id(incoming_pdu.event_id()) .await { + trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU"); return Ok(Some(pduid)); } @@ -81,6 +82,7 @@ where }; if state_at_incoming_event.is_none() { + trace!("Could not calculate incoming state, asking remote {origin} for it"); state_at_incoming_event = self .fetch_state(origin, create_event, room_id, incoming_pdu.event_id()) .await?; diff --git a/src/service/server_keys/request.rs b/src/service/server_keys/request.rs index c8823fdc7..422bdc599 100644 --- a/src/service/server_keys/request.rs +++ b/src/service/server_keys/request.rs @@ -34,8 +34,9 @@ where batch }); - - debug_assert!(!server_keys.is_empty(), "empty batch request to notary"); + if server_keys.is_empty() { + return Ok(vec![]); + } let mut results = Vec::new(); while let Some(batch) = server_keys