From 56feba0ea0003cf17ee5241a1bed6956ba680ba1 Mon Sep 17 00:00:00 2001 From: timedout Date: Tue, 26 May 2026 01:00:12 +0100 Subject: [PATCH] fix: This is some bullshit I tell you --- src/admin/debug/commands.rs | 31 +++- src/admin/debug/mod.rs | 6 + src/api/server/send.rs | 1 - .../fetch_and_handle_outliers.rs | 72 +++----- src/service/rooms/event_handler/fetch_prev.rs | 173 ++++++++---------- .../rooms/event_handler/handle_outlier_pdu.rs | 11 +- .../rooms/event_handler/state_at_incoming.rs | 4 +- src/service/rooms/timeline/mod.rs | 4 + 8 files changed, 154 insertions(+), 148 deletions(-) diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index f9a678f19..8194c726b 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -22,7 +22,7 @@ use futures::{FutureExt, StreamExt, TryStreamExt}; use lettre::message::Mailbox; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, - OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, UInt, + OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, ServerName, UInt, api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw, }; use service::rooms::{ @@ -1138,3 +1138,32 @@ pub(super) async fn send_test_email(&self) -> Result { Ok(()) } + +#[admin_command] +pub(super) async fn walk_missing_events( + &self, + latest: OwnedEventId, + earliest: OwnedEventId, + via: String, +) -> Result { + let earliest_pdu = self.services.rooms.timeline.get_pdu(&earliest).await?; + let events = self + .services + .rooms + .event_handler + .backfill_missing_events( + earliest_pdu.room_id_or_hash(), + HashSet::from_iter(vec![latest]), + vec![earliest], + ServerName::parse(via)?, + ) + .await?; + self.write_str(&format!("Found {} events:\n\n```\n", events.len())) + .await?; + for (event_id, event) in events { + self.write_str(&format!("{event_id}: {:?}\n\n", serde_json::to_string(&event))) + .await?; + } + self.write_str("\n```").await?; + Ok(()) +} diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs index a9478ce82..608d8c5aa 100644 --- a/src/admin/debug/mod.rs +++ b/src/admin/debug/mod.rs @@ -32,6 +32,12 @@ pub enum DebugCommand { event_id: OwnedEventId, }, + WalkMissingEvents { + latest: OwnedEventId, + earliest: OwnedEventId, + via: String, + }, + /// Parse and print a PDU from a JSON /// /// The PDU event is only checked for validity and is not added to the diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 26fe7af56..c047448c5 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -47,7 +47,6 @@ use service::transactions::{ FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse, }; use tokio::sync::watch::{Receiver, Sender}; -use tracing::instrument; use crate::Ruma; diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index 6f1262ffd..1797ebce8 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -1,5 +1,4 @@ use std::{ - cmp::min, collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map}, time::Instant, }; @@ -9,7 +8,6 @@ use conduwuit::{ matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort, trace, utils::continue_exponential_backoff_secs, warn, }; -use futures::StreamExt; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt, @@ -98,37 +96,33 @@ impl super::Service { /// /// This function does not persist the events. The caller is responsible for /// passing them through handle_incoming_pdu. - pub(super) async fn backfill_missing_events( + pub async fn backfill_missing_events( &self, room_id: OwnedRoomId, - head: Vec, + head: HashSet, + tail: Vec, via: OwnedServerName, ) -> conduwuit::Result> { if head.is_empty() { return Ok(HashMap::new()); } - let tail = self - .services - .state - .get_forward_extremities(&room_id) - .collect::>() - .await; // TODO: min_depth is probably necessary to avoid fetching the entire room // history if there are very long gaps - let mut latest_events: HashSet = HashSet::from_iter(head.clone()); + let mut latest_events = head.clone(); let mut loop_count: u64 = 3; - // Start with a base number of 3 so that we fetch 10, 16, 25, 36, etc - // instead of 1, 2, 4, 9, so on. + // Start with 3 so that we fetch 9, 16, 25, 36, 49, 64, 81, 100 events. + // This gives steady growth to the server's typical limit of 100. It's unlikely + // we'll end up close to that. let mut backfilled_events = HashMap::with_capacity(10); while !latest_events.is_empty() { + // TODO: holy clone() + let frontier_before = latest_events.clone(); let todo: Vec = latest_events.clone().into_iter().collect(); let mut request = get_missing_events::v1::Request::new(room_id.clone(), tail.clone(), todo.clone()); - let limit = min(loop_count.saturating_pow(2), 100); - request.limit = limit - .try_into() - .expect("limit cannot be greater than 100, which fits into UInt"); + let limit = loop_count.saturating_pow(2).min(100); + request.limit = limit.try_into().expect("limit must fit into UInt"); debug_info!( backfilled=%backfilled_events.len(), @@ -148,6 +142,7 @@ impl super::Service { .send_federation_request(&via, request) .await?; loop_count = loop_count.saturating_add(1); + trace!(?response, "get_missing_events response"); // Some buggy servers (including old continuwuity) may return the same events // multiple times, which can cause this to be an infinite loop. @@ -160,6 +155,7 @@ impl super::Service { debug_info!("No more missing events found"); break; } + for event in response.events { trace!("Parsing incoming event from backfill"); let (incoming_room_id, event_id, pdu_json) = @@ -178,52 +174,41 @@ impl super::Service { debug!("Skipping known event {event_id}"); continue; } - if backfilled_events.contains_key(&event_id) { + let retransmitted = backfilled_events.contains_key(&event_id); + if retransmitted { debug_warn!(%via, %event_id, "Remote retransmitted event"); - continue; - } - // TODO: Should this be scoped to the GME session? We might end up incorrectly - // assuming we've caught up if we do this - if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await { - debug!(%via, %event_id, "Already seen event in database"); - backfilled_events.insert(event_id.clone(), pdu); } else { - unseen = unseen.saturating_add(1); + if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await { + debug!(%via, %event_id, "Already seen event in database"); + backfilled_events.insert(event_id.clone(), pdu); + } else { + unseen = unseen.saturating_add(1); + } } let parsed = PduEvent::from_id_val(&event_id, pdu_json) .map_err(|e| err!(BadServerResponse("Unable to parse {event_id}: {e}")))?; for prev_event_id in parsed.prev_events() { - // Verify that we have all of this event's prev_events. If we don't, add it to - // the work queue if !(backfilled_events.contains_key(prev_event_id) || self.services.timeline.pdu_exists(prev_event_id).await) { latest_events.insert(prev_event_id.to_owned()); - break; } - continue; } - backfilled_events.insert(event_id.clone(), parsed); - } - for event_id in todo { - latest_events.remove(&event_id); - if let hash_map::Entry::Vacant(e) = backfilled_events.entry(event_id.clone()) { - let evt = self.services.timeline.get_pdu(&event_id).await?; - e.insert(evt); + if !retransmitted { + backfilled_events.insert(event_id.clone(), parsed); } } + latest_events.retain(|event_id| !backfilled_events.contains_key(event_id)); debug!( count=%chunk_len, new=%unseen, remaining=%latest_events.len(), "Got missing events" ); - if unseen == 0 { - debug_warn!("Didn't see any new events, breaking cycle"); - break; - } else if chunk_len < usize::try_from(limit)? { - debug!( - "Got less than the limit number of events, assuming there's no more to fetch" + let frontier_changed = latest_events != frontier_before; + if unseen == 0 && !frontier_changed { + debug_warn!( + "Didn't see any new events and the frontier did not change, breaking cycle" ); break; } @@ -243,6 +228,7 @@ impl super::Service { /// b. Look at outlier pdu tree /// c. Ask origin server over federation /// d. TODO: Ask other servers over federation? + #[deprecated] pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>( &self, origin: &'a ServerName, diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 270b3d0d7..628b776a1 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -1,11 +1,18 @@ -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; -use conduwuit::{Event, PduEvent, debug, debug_info, error, trace}; -use ruma::{OwnedEventId, RoomId, ServerName}; +use conduwuit::{ + Event, PduEvent, debug, debug_info, + result::DebugInspect, + utils::{BoolExt, IterStream, stream::BroadbandExt}, +}; +use futures::StreamExt; +use ruma::{RoomId, ServerName}; use crate::rooms::event_handler::build_local_dag; impl super::Service { + /// Fetches any missing prev_events for this event and persists them before + /// returning. pub(super) async fn fetch_prevs( &self, room_id: &RoomId, @@ -13,106 +20,72 @@ impl super::Service { incoming_pdu: &PduEvent, origin: &ServerName, ) -> conduwuit::Result<()> { - let mut queue: VecDeque = VecDeque::new(); - queue.push_back(incoming_pdu.event_id().to_owned()); + let missing = incoming_pdu + .prev_events() + .stream() + .broad_filter_map(|event_id| async move { + self.services + .timeline + .get_non_outlier_pdu_json(event_id) + .await + .inspect(|_| debug!("Found prev_event {event_id} locally.")) + .inspect_err(|e| debug!(%e, "Could not find prev_event {event_id} locally.")) + .is_ok() + .or(|| event_id.to_owned()) + }) + .collect::>() + .await; + if missing.is_empty() { + debug!(event_id=%incoming_pdu.event_id(), "No missing prev events."); + return Ok(()); + } + debug!(%room_id, event_id=%incoming_pdu.event_id(), ?missing, "Fetching previous events"); + let tail = self + .services + .state + .get_forward_extremities(room_id) + .collect::>() + .await; - while let Some(event_id) = queue.pop_front() { - debug!(event_id=%incoming_pdu.event_id, "Fetching any missing prev_events"); - let mut missing_prev_events: HashSet = - incoming_pdu.prev_events().map(ToOwned::to_owned).collect(); - for pid in missing_prev_events.clone() { - if self.services.timeline.pdu_exists(&pid).await { - trace!("Found prev event {pid} for outlier event {event_id} locally"); - missing_prev_events.remove(&pid); - } else { - debug_info!( - "Could not find prev event {pid} for outlier event {event_id} locally, \ - will fetch over federation" - ); - } - } - if !missing_prev_events.is_empty() { - debug_info!( - "Fetching {} missing prev events for outlier event {event_id}", - missing_prev_events.len() - ); - let backfilled = self - .backfill_missing_events( - room_id.to_owned(), - vec![event_id.clone()], - origin.to_owned(), - ) - .await?; - debug_info!("Fetched {} missing events for {event_id}", backfilled.len()); - let mapped = backfilled - .iter() - .map(|(eid, evt)| { - let mut obj = evt.to_canonical_object(); - obj.remove("event_id"); // event_id is inserted by backfill_missing_events - (eid.clone(), obj) - }) - .collect::>(); - let local_dag = if mapped.len() == 1 { - mapped.keys().map(ToOwned::to_owned).collect() - } else { - build_local_dag(&mapped).await? - }; - debug_info!("Preparing to handle {} missing events", backfilled.len()); - for prev_event_id in local_dag { - let obj = mapped - .get(&prev_event_id) - .expect("We should have this event in memory"); - debug_info!("Handling prev event {prev_event_id}"); - match self - .handle_outlier_pdu( - origin, - create_event, - &prev_event_id, - room_id, - obj.clone(), - false, - ) - .await - { - | Ok(_) => { - debug!("Successfully handled {prev_event_id} as an outlier"); - missing_prev_events.remove(&prev_event_id); - }, - | Err(e) => - error!(error=?e, %prev_event_id, %event_id, "Failed to handle prev event"), - } - debug_info!("Finished handling prev"); - } - } - let outlier = self.services.timeline.get_pdu(&event_id).await; - if missing_prev_events.is_empty() - && let Ok(pdu) = outlier - { - // promote any prevs first - for prev_event_id in pdu.prev_events() { - debug_info!("Promoting prev event {prev_event_id} to timeline"); - let prev_pdu = self.services.timeline.get_pdu(&event_id).await?; - let val = prev_pdu.to_canonical_object(); - self.upgrade_outlier_to_timeline_pdu( - prev_pdu, - val, - create_event, - origin, - room_id, - ) - .await?; - debug_info!("Finished prev promoting {prev_event_id} to timeline"); - } - debug_info!("Promoting event {event_id} to timeline"); - let val = pdu.to_canonical_object(); - self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id) - .await?; - debug_info!("Finished promoting {event_id} to timeline"); - } else { - debug!(?missing_prev_events, ok=%outlier.is_ok(), "Not promoting {event_id}"); - } + let backfilled = self + .backfill_missing_events( + room_id.to_owned(), + HashSet::from_iter(vec![incoming_pdu.event_id().to_owned()]), + tail, + origin.to_owned(), + ) + .await?; + debug_info!("Fetched {} missing events", backfilled.len()); + + // Persist all fetched events + let mapped = backfilled + .iter() + .map(|(eid, evt)| { + let mut obj = evt.to_canonical_object(); + obj.remove("event_id"); // event_id is inserted by backfill_missing_events + (eid.clone(), obj) + }) + .collect::>(); + + let to_persist = if mapped.len() <= 1 { + mapped.keys().map(ToOwned::to_owned).collect() + } else { + build_local_dag(&mapped).await? + }; + + for event_id in to_persist { + debug_info!("Persisting fetched prev event {event_id}"); + let pdu_event = backfilled.get(&event_id).cloned().unwrap_or_else(|| { + panic!("Event {event_id} was in backfill response but not in map") + }); + let obj = pdu_event.to_canonical_object(); + self.upgrade_outlier_to_timeline_pdu(pdu_event, obj, create_event, origin, room_id) + .await + .debug_inspect(|_| debug_info!("Persisted fetched prev event {event_id}"))?; } + // NOTE because i keep forgetting: the caller persists incoming_pdu. + // we only care about its prev events Ok(()) } } diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index a8bba57fc..2d349a29d 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -4,7 +4,7 @@ use conduwuit::{ Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, trace, warn, }; -use futures::future::ready; +use futures::{StreamExt, future::ready}; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, events::StateEventType, @@ -118,10 +118,17 @@ where "Fetching {} missing auth events for outlier event {event_id}", missing_auth_events.len() ); + let tail = self + .services + .state + .get_forward_extremities(room_id) + .collect::>() + .await; let backfilled = self .backfill_missing_events( room_id.to_owned(), - vec![event_id.to_owned()], + HashSet::from_iter(vec![event_id.to_owned()]), + tail, origin.to_owned(), ) .await?; diff --git a/src/service/rooms/event_handler/state_at_incoming.rs b/src/service/rooms/event_handler/state_at_incoming.rs index 9f11ac4b8..4010c4462 100644 --- a/src/service/rooms/event_handler/state_at_incoming.rs +++ b/src/service/rooms/event_handler/state_at_incoming.rs @@ -5,7 +5,7 @@ use std::{ }; use conduwuit::{ - Result, debug, err, error, implement, + Result, debug, debug_error, err, error, implement, matrix::{Event, StateMap}, trace, utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, @@ -37,6 +37,7 @@ where .pdu_shortstatehash(prev_event) .await else { + trace!("No shortstatehash for {prev_event}, cannot calculate one-degree state."); return Ok(None); }; @@ -99,6 +100,7 @@ where .map_ok(move |sstatehash| (sstatehash, prev_event)) }) .try_collect::>() + .inspect_err(|e| debug_error!("failed to calculate N-degree short state hashes: {e}")) .await else { return Ok(None); diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 8b142463f..f4cae2ec7 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -173,6 +173,10 @@ impl Service { self.db.get_non_outlier_pdu_json(event_id).await } + pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> bool { + self.db.non_outlier_pdu_exists(event_id).await.is_ok() + } + /// Returns the pdu's id. #[inline] pub async fn get_pdu_id(&self, event_id: &EventId) -> Result {