diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 8194c726b..dd6201c74 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -1146,16 +1146,16 @@ pub(super) async fn walk_missing_events( earliest: OwnedEventId, via: String, ) -> Result { - let earliest_pdu = self.services.rooms.timeline.get_pdu(&earliest).await?; + let latest_pdu = self.services.rooms.timeline.get_pdu(&latest).await?; let events = self .services .rooms .event_handler - .backfill_missing_events( - earliest_pdu.room_id_or_hash(), - HashSet::from_iter(vec![latest]), + .get_missing_events( + &latest_pdu.room_id_or_hash(), + &latest_pdu, vec![earliest], - ServerName::parse(via)?, + &ServerName::parse(via)?, ) .await?; self.write_str(&format!("Found {} events:\n\n```\n", events.len())) diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index 1797ebce8..dae9968f2 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -3,14 +3,19 @@ use std::{ time::Instant, }; +use assign::assign; use conduwuit::{ - Err, Event, PduEvent, debug, debug_info, debug_warn, err, - matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort, - trace, utils::continue_exponential_backoff_secs, warn, + Event, PduEvent, debug, debug_info, debug_warn, err, error, + matrix::event::gen_event_id_canonical_json, + state_res::lexicographical_topological_sort, + trace, + utils::{IterStream, continue_exponential_backoff_secs, stream::BroadbandExt}, + warn, }; +use futures::StreamExt; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, - OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt, + RoomId, ServerName, UInt, api::federation::event::{get_event, get_missing_events}, int, }; @@ -94,129 +99,120 @@ impl super::Service { /// exponentially incrementing limit (up to 100 per request) until it has /// filled the gap, i.e. when the remote says there's no more events. /// + /// This function will iterate until the remote returns no more events, + /// increasing the limit by a factor of 10. If 20 iterations are reached or + /// 200 events are backfilled, the function will give up and return what it + /// has, to avoid pulling in too much data (for example, absurdly large + /// gaps). + /// /// This function does not persist the events. The caller is responsible for /// passing them through handle_incoming_pdu. - pub async fn backfill_missing_events( + /// + /// ## Parameters + /// + /// - `room_id`: The room's ID. + /// - `head`: The event we are potentially missing prev_events for. + /// - `tail`: The most recently known events in the graph (typically forward + /// extremities). + /// - `via`: The server to ask for missing events. + pub async fn get_missing_events( &self, - room_id: OwnedRoomId, - head: HashSet, + room_id: &RoomId, + head: &PduEvent, tail: Vec, - via: OwnedServerName, + via: &ServerName, ) -> conduwuit::Result> { - if head.is_empty() { - return Ok(HashMap::new()); - } - // TODO: min_depth is probably necessary to avoid fetching the entire room - // history if there are very long gaps - let mut latest_events = head.clone(); - let mut loop_count: u64 = 3; - // Start with 3 so that we fetch 9, 16, 25, 36, 49, 64, 81, 100 events. - // This gives steady growth to the server's typical limit of 100. It's unlikely - // we'll end up close to that. - let mut backfilled_events = HashMap::with_capacity(10); - - while !latest_events.is_empty() { - // TODO: holy clone() - let frontier_before = latest_events.clone(); - let todo: Vec = latest_events.clone().into_iter().collect(); - let mut request = - get_missing_events::v1::Request::new(room_id.clone(), tail.clone(), todo.clone()); - let limit = loop_count.saturating_pow(2).min(100); - request.limit = limit.try_into().expect("limit must fit into UInt"); - - debug_info!( - backfilled=%backfilled_events.len(), - %loop_count, - "Asking {via} for up to {limit} missing events", - ); - trace!( - ?latest_events, - ?tail, - %via, - %limit, - "Requesting missing events" + #[cfg(debug_assertions)] + { + let missing_count = head + .prev_events() + .stream() + .broad_filter_map(|event_id| async move { + match self + .services + .timeline + .get_non_outlier_pdu_json(event_id) + .await + .inspect(|_| debug!("Found prev_event {event_id} locally.")) + .inspect_err( + |e| debug!(%e, "Could not find prev_event {event_id} locally."), + ) { + | Ok(_) => None, + | Err(_) => Some(event_id), + } + }) + .count() + .await; + debug_assert_ne!( + missing_count, 0, + "event passed to get_missing_events is not missing any events (wasteful call)" ); + }; + + let mut discovered = HashMap::with_capacity(20); + let mut latest_events = vec![head.event_id().to_owned()]; + let mut iterations = 0_u8; + loop { + iterations = iterations.saturating_add(1); + let limit = iterations.saturating_mul(10); + debug_info!(%limit, %via, %iterations, "Attempting to gap fill missing events"); let response: get_missing_events::v1::Response = self .services .sending - .send_federation_request(&via, request) + .send_federation_request( + via, + assign!( + get_missing_events::v1::Request::new( + room_id.to_owned(), + tail.clone(), + latest_events.clone() + ), + {limit: limit.into()} + ), + ) .await?; - loop_count = loop_count.saturating_add(1); - trace!(?response, "get_missing_events response"); - // Some buggy servers (including old continuwuity) may return the same events - // multiple times, which can cause this to be an infinite loop. - // In order to break this loop, if we see no new events from this response (i.e. - // all events in the response are already in backfilled_events), we stop, - // with a warning. - let mut unseen: usize = 0; - let chunk_len = response.events.len(); if response.events.is_empty() { - debug_info!("No more missing events found"); + debug_info!(%via, "Finished gap filling missing events (remote returned no more events)."); break; } + debug_info!("Got {} events back from remote", response.events.len()); - for event in response.events { - trace!("Parsing incoming event from backfill"); - let (incoming_room_id, event_id, pdu_json) = - self.parse_incoming_pdu(&event).await.map_err(|e| { - err!(BadServerResponse("{via} returned an invalid event: {e:?}")) - })?; - trace!(%incoming_room_id, %event_id, "Parsed incoming event from backfill"); - if incoming_room_id != room_id { - return Err!(BadServerResponse( - "{via} returned {event_id} in missing events which belongs to \ - {incoming_room_id}, not {room_id}" - )); - } - latest_events.remove(&event_id); - if head.contains(&event_id) || tail.contains(&event_id) { - debug!("Skipping known event {event_id}"); - continue; - } - let retransmitted = backfilled_events.contains_key(&event_id); - if retransmitted { - debug_warn!(%via, %event_id, "Remote retransmitted event"); - } else { - if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await { - debug!(%via, %event_id, "Already seen event in database"); - backfilled_events.insert(event_id.clone(), pdu); - } else { - unseen = unseen.saturating_add(1); + latest_events.clear(); + for raw_event in response.events { + let (_, event_id, pdu_json) = self.parse_incoming_pdu(&raw_event).await?; + let pdu = PduEvent::from_id_val(&event_id, pdu_json).map_err(|e| { + err!(Request(BadJson("Failed to parse backfilled event {event_id}: {e}"))) + })?; + + for prev_event_id in pdu.prev_events() { + if discovered.contains_key(prev_event_id) { + continue; } - } - let parsed = PduEvent::from_id_val(&event_id, pdu_json) - .map_err(|e| err!(BadServerResponse("Unable to parse {event_id}: {e}")))?; - for prev_event_id in parsed.prev_events() { - if !(backfilled_events.contains_key(prev_event_id) - || self.services.timeline.pdu_exists(prev_event_id).await) + if self + .services + .timeline + .non_outlier_pdu_exists(prev_event_id) + .await { - latest_events.insert(prev_event_id.to_owned()); + continue; } + latest_events.push(prev_event_id.to_owned()); + break; } - if !retransmitted { - backfilled_events.insert(event_id.clone(), parsed); - } + + discovered.insert(event_id.clone(), pdu); } - latest_events.retain(|event_id| !backfilled_events.contains_key(event_id)); - debug!( - count=%chunk_len, - new=%unseen, - remaining=%latest_events.len(), - "Got missing events" - ); - let frontier_changed = latest_events != frontier_before; - if unseen == 0 && !frontier_changed { - debug_warn!( - "Didn't see any new events and the frontier did not change, breaking cycle" - ); + + if latest_events.is_empty() { + break; + } else if discovered.len() >= 200 || iterations >= 20 { + error!("Gap too large, giving up"); break; } } - debug_info!("Successfully fetched {} missing events from {via}", backfilled_events.len()); - trace!("Missing_events: {backfilled_events:?}"); - Ok(backfilled_events) + Ok(discovered) } /// Find the event and auth it. Once the event is validated (steps 1 - 8) diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 628b776a1..bd67e4f28 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -1,9 +1,9 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use conduwuit::{ Event, PduEvent, debug, debug_info, - result::DebugInspect, utils::{BoolExt, IterStream, stream::BroadbandExt}, + warn, }; use futures::StreamExt; use ruma::{RoomId, ServerName}; @@ -28,8 +28,6 @@ impl super::Service { .timeline .get_non_outlier_pdu_json(event_id) .await - .inspect(|_| debug!("Found prev_event {event_id} locally.")) - .inspect_err(|e| debug!(%e, "Could not find prev_event {event_id} locally.")) .is_ok() .or(|| event_id.to_owned()) }) @@ -48,12 +46,7 @@ impl super::Service { .await; let backfilled = self - .backfill_missing_events( - room_id.to_owned(), - HashSet::from_iter(vec![incoming_pdu.event_id().to_owned()]), - tail, - origin.to_owned(), - ) + .get_missing_events(room_id, incoming_pdu, tail, origin) .await?; debug_info!("Fetched {} missing events", backfilled.len()); @@ -75,13 +68,19 @@ impl super::Service { for event_id in to_persist { debug_info!("Persisting fetched prev event {event_id}"); - let pdu_event = backfilled.get(&event_id).cloned().unwrap_or_else(|| { - panic!("Event {event_id} was in backfill response but not in map") - }); - let obj = pdu_event.to_canonical_object(); - self.upgrade_outlier_to_timeline_pdu(pdu_event, obj, create_event, origin, room_id) + let obj = mapped.get(&event_id).cloned().unwrap(); + match self + .handle_outlier_pdu(origin, create_event, &event_id, room_id, obj, false) .await - .debug_inspect(|_| debug_info!("Persisted fetched prev event {event_id}"))?; + { + | Ok((pdu, val)) => + self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id) + .await, + | Err(e) => { + warn!("Failed to persist prev_event {event_id}: {e}"); + continue; + }, + }?; } // NOTE because i keep forgetting: the caller persists incoming_pdu. diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 2d349a29d..208f4aa08 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -1,16 +1,16 @@ -use std::collections::{BTreeMap, HashMap, HashSet, hash_map}; +use std::collections::{BTreeMap, HashMap, hash_map}; use conduwuit::{ - Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, + Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, info, state_res, trace, warn, }; -use futures::{StreamExt, future::ready}; +use futures::future::ready; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, - events::StateEventType, + api::federation::authorization::get_event_authorization, events::StateEventType, }; -use super::{build_local_dag, check_room_id, get_room_version_rules}; +use super::{check_room_id, get_room_version_rules}; use crate::rooms::timeline::pdu_fits; #[implement(super::Service)] @@ -107,79 +107,52 @@ where } // Fetch any missing ones & reject invalid ones - let mut missing_auth_events: HashSet = pdu_event - .auth_events() - .filter(|id| !auth_events.contains_key(*id)) - .map(ToOwned::to_owned) - .collect(); - - if !missing_auth_events.is_empty() { - debug_info!( - "Fetching {} missing auth events for outlier event {event_id}", - missing_auth_events.len() - ); - let tail = self + if auth_events.len() != pdu_event.auth_events().count() { + info!("Missing some auth events, asking remote for auth chain"); + let response: get_event_authorization::v1::Response = self .services - .state - .get_forward_extremities(room_id) - .collect::>() - .await; - let backfilled = self - .backfill_missing_events( - room_id.to_owned(), - HashSet::from_iter(vec![event_id.to_owned()]), - tail, - origin.to_owned(), - ) - .await?; - debug_info!("Fetched {} missing auth events for {event_id}", backfilled.len()); - let mapped = backfilled - .iter() - .map(|(eid, evt)| { - let mut obj = evt.to_canonical_object(); - obj.remove("event_id"); // event_id is inserted by backfill_missing_events - (eid.clone(), obj) - }) - .collect::>(); - let local_dag = if mapped.len() == 1 { - mapped.keys().map(ToOwned::to_owned).collect() - } else { - build_local_dag(&mapped).await? - }; - debug_info!("Preparing to handle {} missing auth events", backfilled.len()); - for prev_event_id in local_dag { - let obj = mapped - .get(&prev_event_id) - .expect("We should have this event in memory"); - debug_info!("Handling prev {prev_event_id}"); - let (prev, _) = Box::pin(self.handle_outlier_pdu( + .sending + .send_federation_request( origin, - create_event, - &prev_event_id, - room_id, - obj.clone(), - false, - )) - .await?; - if missing_auth_events.contains(&*prev_event_id) { - missing_auth_events.remove(&prev_event_id); - auth_events.insert(prev_event_id, prev); + get_event_authorization::v1::Request::new( + room_id.to_owned(), + event_id.to_owned(), + ), + ) + .await + .map_err(|e| { + err!(Request(Forbidden( + "Remote server is not divulging incoming event's auth chain: {e}" + ))) + })?; + let mut auth_chain_map = HashMap::with_capacity(response.auth_chain.len()); + for auth_pdu_json in response.auth_chain { + let (auth_event_room_id, auth_event_id, auth_pdu_json) = + self.parse_incoming_pdu(&auth_pdu_json).await?; + if auth_event_room_id != room_id { + return Err!(Request(BadJson( + "Auth event {auth_event_id} is in {auth_event_room_id}, not {room_id}." + ))); } - debug_info!("Finished handling prev auth event"); + let auth_pdu = PduEvent::from_id_val(&auth_event_id, auth_pdu_json) + .map_err(|e| err!(Request(BadJson("Invalid PDU {auth_event_id}: {e}"))))?; + auth_chain_map.insert(auth_event_id, auth_pdu); } - } else { - debug!("No missing auth events for outlier event {event_id}"); - } - // reject if we are still missing some auth events. - // If we're still missing prev events, we will fetch them individually later, - // but there's no reason for us to be missing auth events now we've gapfilled - // the DAG. - if !missing_auth_events.is_empty() { - // Don't reject: this could be a temporary condition - return Err!(Request(InvalidParam( - "Could not fetch all auth events for outlier event {event_id}, still missing: \ - {missing_auth_events:?}" - ))); + for aid in pdu_event.auth_events() { + if auth_events.contains_key(aid) { + continue; + } + if let Some(auth_event) = auth_chain_map.get(aid) { + auth_events.insert(aid.to_owned(), auth_event.clone()); + } else { + return Err!(Request(Forbidden( + "Remote server is not divulging incoming event's auth events (missing: \ + {aid})" + ))); + } + } + // TODO: do events received from auth chain need persisting? that sounds + // awfully slow } // 6. Reject "due to auth events" if the event doesn't pass auth based on the