From 0d4bbe612d8e41168fc42c9441205c0a780b966a Mon Sep 17 00:00:00 2001 From: timedout Date: Tue, 26 May 2026 21:04:52 +0100 Subject: [PATCH] feat: Keep track of a min_depth value Should prevent weird situations where we accidentally gapfill into backfill territory --- src/admin/debug/commands.rs | 31 +------------------ src/admin/debug/mod.rs | 6 ---- src/database/maps.rs | 4 +++ .../fetch_and_handle_outliers.rs | 17 ++++++++-- src/service/rooms/event_handler/fetch_prev.rs | 8 ++++- .../event_handler/upgrade_outlier_pdu.rs | 6 ++++ src/service/rooms/membership/mod.rs | 4 +++ src/service/rooms/metadata/mod.rs | 30 ++++++++++++++++-- 8 files changed, 65 insertions(+), 41 deletions(-) diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index dd6201c74..f9a678f19 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -22,7 +22,7 @@ use futures::{FutureExt, StreamExt, TryStreamExt}; use lettre::message::Mailbox; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, - OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, ServerName, UInt, + OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, UInt, api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw, }; use service::rooms::{ @@ -1138,32 +1138,3 @@ pub(super) async fn send_test_email(&self) -> Result { Ok(()) } - -#[admin_command] -pub(super) async fn walk_missing_events( - &self, - latest: OwnedEventId, - earliest: OwnedEventId, - via: String, -) -> Result { - let latest_pdu = self.services.rooms.timeline.get_pdu(&latest).await?; - let events = self - .services - .rooms - .event_handler - .get_missing_events( - &latest_pdu.room_id_or_hash(), - &latest_pdu, - vec![earliest], - &ServerName::parse(via)?, - ) - .await?; - self.write_str(&format!("Found {} events:\n\n```\n", events.len())) - .await?; - for (event_id, event) in events { - self.write_str(&format!("{event_id}: {:?}\n\n", serde_json::to_string(&event))) - .await?; - } - self.write_str("\n```").await?; - Ok(()) -} diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs index 608d8c5aa..a9478ce82 100644 --- a/src/admin/debug/mod.rs +++ b/src/admin/debug/mod.rs @@ -32,12 +32,6 @@ pub enum DebugCommand { event_id: OwnedEventId, }, - WalkMissingEvents { - latest: OwnedEventId, - earliest: OwnedEventId, - via: String, - }, - /// Parse and print a PDU from a JSON /// /// The PDU event is only checked for validity and is not added to the diff --git a/src/database/maps.rs b/src/database/maps.rs index e8eb02331..6c41237fc 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -187,6 +187,10 @@ pub(super) static MAPS: &[Descriptor] = &[ val_size_hint: Some(8), ..descriptor::RANDOM_SMALL }, + Descriptor { + name: "roomid_mindepth", + ..descriptor::RANDOM_SMALL + }, Descriptor { name: "roomserverids", ..descriptor::RANDOM_SMALL diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index 21f148921..1f5fc7e78 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -115,12 +115,16 @@ impl super::Service { /// - `tail`: The most recently known events in the graph (typically forward /// extremities). /// - `via`: The server to ask for missing events. + /// - `min_depth`: Don't process events with a `depth` lower than this + /// value. Not massively useful, but can help short-circuit infinite loops + /// and weird edge paths. pub async fn get_missing_events( &self, room_id: &RoomId, head: &PduEvent, tail: Vec, via: &ServerName, + min_depth: UInt, ) -> conduwuit::Result> { #[cfg(debug_assertions)] { @@ -155,7 +159,7 @@ impl super::Service { loop { iterations = iterations.saturating_add(1); let limit = iterations.saturating_mul(10).min(100); - debug_info!(%limit, %via, %iterations, "Attempting to gap fill missing events"); + debug_info!(%limit, %via, %iterations, discovered=discovered.len(), %min_depth, "Attempting to gap fill missing events"); let response: get_missing_events::v1::Response = self .services .sending @@ -167,7 +171,7 @@ impl super::Service { tail.clone(), latest_events.clone() ), - {limit: limit.into()} + {limit: limit.into(), min_depth} ), ) .await?; @@ -185,6 +189,15 @@ impl super::Service { err!(Request(BadJson("Failed to parse backfilled event {event_id}: {e}"))) })?; + if pdu.depth < min_depth { + debug_warn!( + "Received PDU with depth {} below min_depth {}, ignoring", + pdu.depth, + min_depth + ); + continue; + } + for prev_event_id in pdu.prev_events() { if discovered.contains_key(prev_event_id) { continue; diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index bd67e4f28..9be7b2e96 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -46,7 +46,13 @@ impl super::Service { .await; let backfilled = self - .get_missing_events(room_id, incoming_pdu, tail, origin) + .get_missing_events( + room_id, + incoming_pdu, + tail, + origin, + self.services.metadata.get_mindepth(room_id).await, + ) .await?; debug_info!("Fetched {} missing events", backfilled.len()); diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index db947a803..ad81c0af1 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -64,6 +64,7 @@ where "Upgrading PDU from outlier to timeline" ); let timer = Instant::now(); + let min_depth = self.services.metadata.get_mindepth(room_id).await; let room_version_rules = get_room_version_rules(create_event)?; // 10. Fetch missing state and auth chain events by calling /state_ids at @@ -384,6 +385,11 @@ where // Event has passed all auth/stateres checks drop(state_lock); + if incoming_pdu.depth > min_depth { + self.services + .metadata + .set_mindepth(room_id, incoming_pdu.depth.into()); + } Ok(pdu_id) } diff --git a/src/service/rooms/membership/mod.rs b/src/service/rooms/membership/mod.rs index ff69e095e..539d63ea4 100644 --- a/src/service/rooms/membership/mod.rs +++ b/src/service/rooms/membership/mod.rs @@ -626,6 +626,10 @@ impl Service { room_id, ) .await?; + self.services + .metadata + .maybe_set_mindepth(room_id, parsed_join_pdu.depth.into()) + .await; info!("Setting final room state for new room"); // We set the room state after inserting the pdu, so that we never have a moment diff --git a/src/service/rooms/metadata/mod.rs b/src/service/rooms/metadata/mod.rs index 310ab2c76..da2112515 100644 --- a/src/service/rooms/metadata/mod.rs +++ b/src/service/rooms/metadata/mod.rs @@ -1,9 +1,9 @@ use std::sync::Arc; use conduwuit::{Result, implement, utils::stream::TryIgnore}; -use database::Map; +use database::{Deserialized, Map}; use futures::{Stream, StreamExt}; -use ruma::{OwnedRoomId, RoomId}; +use ruma::{OwnedRoomId, RoomId, UInt, uint}; use crate::{Dep, rooms}; @@ -17,6 +17,7 @@ struct Data { bannedroomids: Arc, roomid_shortroomid: Arc, pduid_pdu: Arc, + roomid_mindepth: Arc, } struct Services { @@ -31,6 +32,7 @@ impl crate::Service for Service { bannedroomids: args.db["bannedroomids"].clone(), roomid_shortroomid: args.db["roomid_shortroomid"].clone(), pduid_pdu: args.db["pduid_pdu"].clone(), + roomid_mindepth: args.db["roomid_mindepth"].clone(), }, services: Services { short: args.depend::("rooms::short"), @@ -98,3 +100,27 @@ pub async fn is_disabled(&self, room_id: &RoomId) -> bool { pub async fn is_banned(&self, room_id: &RoomId) -> bool { self.db.bannedroomids.get(room_id).await.is_ok() } + +#[implement(Service)] +pub async fn get_mindepth(&self, room_id: &RoomId) -> UInt { + self.db + .roomid_mindepth + .get(room_id) + .await + .deserialized::() + .unwrap_or_else(|_| uint!(0)) +} + +#[implement(Service)] +pub fn set_mindepth(&self, room_id: &RoomId, min_depth: u64) { + self.db + .roomid_mindepth + .put_raw(room_id.as_bytes(), min_depth.to_be_bytes()); +} + +#[implement(Service)] +pub async fn maybe_set_mindepth(&self, room_id: &RoomId, min_depth: u64) { + if min_depth > self.get_mindepth(room_id).await.into() { + self.set_mindepth(room_id, min_depth); + } +}