feat: Keep track of a min_depth value

Should prevent weird situations where we accidentally gapfill into backfill territory
This commit is contained in:
timedout
2026-05-26 21:04:52 +01:00
parent c4d297ae3b
commit 0d4bbe612d
8 changed files with 65 additions and 41 deletions
+1 -30
View File
@@ -22,7 +22,7 @@ use futures::{FutureExt, StreamExt, TryStreamExt};
use lettre::message::Mailbox;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, ServerName, UInt,
OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, UInt,
api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw,
};
use service::rooms::{
@@ -1138,32 +1138,3 @@ pub(super) async fn send_test_email(&self) -> Result {
Ok(())
}
#[admin_command]
pub(super) async fn walk_missing_events(
&self,
latest: OwnedEventId,
earliest: OwnedEventId,
via: String,
) -> Result {
let latest_pdu = self.services.rooms.timeline.get_pdu(&latest).await?;
let events = self
.services
.rooms
.event_handler
.get_missing_events(
&latest_pdu.room_id_or_hash(),
&latest_pdu,
vec![earliest],
&ServerName::parse(via)?,
)
.await?;
self.write_str(&format!("Found {} events:\n\n```\n", events.len()))
.await?;
for (event_id, event) in events {
self.write_str(&format!("{event_id}: {:?}\n\n", serde_json::to_string(&event)))
.await?;
}
self.write_str("\n```").await?;
Ok(())
}
-6
View File
@@ -32,12 +32,6 @@ pub enum DebugCommand {
event_id: OwnedEventId,
},
WalkMissingEvents {
latest: OwnedEventId,
earliest: OwnedEventId,
via: String,
},
/// Parse and print a PDU from a JSON
///
/// The PDU event is only checked for validity and is not added to the
+4
View File
@@ -187,6 +187,10 @@ pub(super) static MAPS: &[Descriptor] = &[
val_size_hint: Some(8),
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "roomid_mindepth",
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "roomserverids",
..descriptor::RANDOM_SMALL
@@ -115,12 +115,16 @@ impl super::Service {
/// - `tail`: The most recently known events in the graph (typically forward
/// extremities).
/// - `via`: The server to ask for missing events.
/// - `min_depth`: Don't process events with a `depth` lower than this
/// value. Not massively useful, but can help short-circuit infinite loops
/// and weird edge paths.
pub async fn get_missing_events(
&self,
room_id: &RoomId,
head: &PduEvent,
tail: Vec<OwnedEventId>,
via: &ServerName,
min_depth: UInt,
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
#[cfg(debug_assertions)]
{
@@ -155,7 +159,7 @@ impl super::Service {
loop {
iterations = iterations.saturating_add(1);
let limit = iterations.saturating_mul(10).min(100);
debug_info!(%limit, %via, %iterations, "Attempting to gap fill missing events");
debug_info!(%limit, %via, %iterations, discovered=discovered.len(), %min_depth, "Attempting to gap fill missing events");
let response: get_missing_events::v1::Response = self
.services
.sending
@@ -167,7 +171,7 @@ impl super::Service {
tail.clone(),
latest_events.clone()
),
{limit: limit.into()}
{limit: limit.into(), min_depth}
),
)
.await?;
@@ -185,6 +189,15 @@ impl super::Service {
err!(Request(BadJson("Failed to parse backfilled event {event_id}: {e}")))
})?;
if pdu.depth < min_depth {
debug_warn!(
"Received PDU with depth {} below min_depth {}, ignoring",
pdu.depth,
min_depth
);
continue;
}
for prev_event_id in pdu.prev_events() {
if discovered.contains_key(prev_event_id) {
continue;
@@ -46,7 +46,13 @@ impl super::Service {
.await;
let backfilled = self
.get_missing_events(room_id, incoming_pdu, tail, origin)
.get_missing_events(
room_id,
incoming_pdu,
tail,
origin,
self.services.metadata.get_mindepth(room_id).await,
)
.await?;
debug_info!("Fetched {} missing events", backfilled.len());
@@ -64,6 +64,7 @@ where
"Upgrading PDU from outlier to timeline"
);
let timer = Instant::now();
let min_depth = self.services.metadata.get_mindepth(room_id).await;
let room_version_rules = get_room_version_rules(create_event)?;
// 10. Fetch missing state and auth chain events by calling /state_ids at
@@ -384,6 +385,11 @@ where
// Event has passed all auth/stateres checks
drop(state_lock);
if incoming_pdu.depth > min_depth {
self.services
.metadata
.set_mindepth(room_id, incoming_pdu.depth.into());
}
Ok(pdu_id)
}
+4
View File
@@ -626,6 +626,10 @@ impl Service {
room_id,
)
.await?;
self.services
.metadata
.maybe_set_mindepth(room_id, parsed_join_pdu.depth.into())
.await;
info!("Setting final room state for new room");
// We set the room state after inserting the pdu, so that we never have a moment
+28 -2
View File
@@ -1,9 +1,9 @@
use std::sync::Arc;
use conduwuit::{Result, implement, utils::stream::TryIgnore};
use database::Map;
use database::{Deserialized, Map};
use futures::{Stream, StreamExt};
use ruma::{OwnedRoomId, RoomId};
use ruma::{OwnedRoomId, RoomId, UInt, uint};
use crate::{Dep, rooms};
@@ -17,6 +17,7 @@ struct Data {
bannedroomids: Arc<Map>,
roomid_shortroomid: Arc<Map>,
pduid_pdu: Arc<Map>,
roomid_mindepth: Arc<Map>,
}
struct Services {
@@ -31,6 +32,7 @@ impl crate::Service for Service {
bannedroomids: args.db["bannedroomids"].clone(),
roomid_shortroomid: args.db["roomid_shortroomid"].clone(),
pduid_pdu: args.db["pduid_pdu"].clone(),
roomid_mindepth: args.db["roomid_mindepth"].clone(),
},
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
@@ -98,3 +100,27 @@ pub async fn is_disabled(&self, room_id: &RoomId) -> bool {
pub async fn is_banned(&self, room_id: &RoomId) -> bool {
self.db.bannedroomids.get(room_id).await.is_ok()
}
#[implement(Service)]
pub async fn get_mindepth(&self, room_id: &RoomId) -> UInt {
self.db
.roomid_mindepth
.get(room_id)
.await
.deserialized::<UInt>()
.unwrap_or_else(|_| uint!(0))
}
#[implement(Service)]
pub fn set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
self.db
.roomid_mindepth
.put_raw(room_id.as_bytes(), min_depth.to_be_bytes());
}
#[implement(Service)]
pub async fn maybe_set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
if min_depth > self.get_mindepth(room_id).await.into() {
self.set_mindepth(room_id, min_depth);
}
}