perf: Improve gap filling, handle missing auth events better

This commit is contained in:
timedout
2026-05-26 20:00:55 +01:00
parent 56feba0ea0
commit b925936195
4 changed files with 166 additions and 198 deletions
+5 -5
View File
@@ -1146,16 +1146,16 @@ pub(super) async fn walk_missing_events(
earliest: OwnedEventId,
via: String,
) -> Result {
let earliest_pdu = self.services.rooms.timeline.get_pdu(&earliest).await?;
let latest_pdu = self.services.rooms.timeline.get_pdu(&latest).await?;
let events = self
.services
.rooms
.event_handler
.backfill_missing_events(
earliest_pdu.room_id_or_hash(),
HashSet::from_iter(vec![latest]),
.get_missing_events(
&latest_pdu.room_id_or_hash(),
&latest_pdu,
vec![earliest],
ServerName::parse(via)?,
&ServerName::parse(via)?,
)
.await?;
self.write_str(&format!("Found {} events:\n\n```\n", events.len()))
@@ -3,14 +3,19 @@ use std::{
time::Instant,
};
use assign::assign;
use conduwuit::{
Err, Event, PduEvent, debug, debug_info, debug_warn, err,
matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort,
trace, utils::continue_exponential_backoff_secs, warn,
Event, PduEvent, debug, debug_info, debug_warn, err, error,
matrix::event::gen_event_id_canonical_json,
state_res::lexicographical_topological_sort,
trace,
utils::{IterStream, continue_exponential_backoff_secs, stream::BroadbandExt},
warn,
};
use futures::StreamExt;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt,
RoomId, ServerName, UInt,
api::federation::event::{get_event, get_missing_events},
int,
};
@@ -94,129 +99,120 @@ impl super::Service {
/// exponentially incrementing limit (up to 100 per request) until it has
/// filled the gap, i.e. when the remote says there's no more events.
///
/// This function will iterate until the remote returns no more events,
/// increasing the limit by a factor of 10. If 20 iterations are reached or
/// 200 events are backfilled, the function will give up and return what it
/// has, to avoid pulling in too much data (for example, absurdly large
/// gaps).
///
/// This function does not persist the events. The caller is responsible for
/// passing them through handle_incoming_pdu.
pub async fn backfill_missing_events(
///
/// ## Parameters
///
/// - `room_id`: The room's ID.
/// - `head`: The event we are potentially missing prev_events for.
/// - `tail`: The most recently known events in the graph (typically forward
/// extremities).
/// - `via`: The server to ask for missing events.
pub async fn get_missing_events(
&self,
room_id: OwnedRoomId,
head: HashSet<OwnedEventId>,
room_id: &RoomId,
head: &PduEvent,
tail: Vec<OwnedEventId>,
via: OwnedServerName,
via: &ServerName,
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
if head.is_empty() {
return Ok(HashMap::new());
}
// TODO: min_depth is probably necessary to avoid fetching the entire room
// history if there are very long gaps
let mut latest_events = head.clone();
let mut loop_count: u64 = 3;
// Start with 3 so that we fetch 9, 16, 25, 36, 49, 64, 81, 100 events.
// This gives steady growth to the server's typical limit of 100. It's unlikely
// we'll end up close to that.
let mut backfilled_events = HashMap::with_capacity(10);
while !latest_events.is_empty() {
// TODO: holy clone()
let frontier_before = latest_events.clone();
let todo: Vec<OwnedEventId> = latest_events.clone().into_iter().collect();
let mut request =
get_missing_events::v1::Request::new(room_id.clone(), tail.clone(), todo.clone());
let limit = loop_count.saturating_pow(2).min(100);
request.limit = limit.try_into().expect("limit must fit into UInt");
debug_info!(
backfilled=%backfilled_events.len(),
%loop_count,
"Asking {via} for up to {limit} missing events",
);
trace!(
?latest_events,
?tail,
%via,
%limit,
"Requesting missing events"
#[cfg(debug_assertions)]
{
let missing_count = head
.prev_events()
.stream()
.broad_filter_map(|event_id| async move {
match self
.services
.timeline
.get_non_outlier_pdu_json(event_id)
.await
.inspect(|_| debug!("Found prev_event {event_id} locally."))
.inspect_err(
|e| debug!(%e, "Could not find prev_event {event_id} locally."),
) {
| Ok(_) => None,
| Err(_) => Some(event_id),
}
})
.count()
.await;
debug_assert_ne!(
missing_count, 0,
"event passed to get_missing_events is not missing any events (wasteful call)"
);
};
let mut discovered = HashMap::with_capacity(20);
let mut latest_events = vec![head.event_id().to_owned()];
let mut iterations = 0_u8;
loop {
iterations = iterations.saturating_add(1);
let limit = iterations.saturating_mul(10);
debug_info!(%limit, %via, %iterations, "Attempting to gap fill missing events");
let response: get_missing_events::v1::Response = self
.services
.sending
.send_federation_request(&via, request)
.send_federation_request(
via,
assign!(
get_missing_events::v1::Request::new(
room_id.to_owned(),
tail.clone(),
latest_events.clone()
),
{limit: limit.into()}
),
)
.await?;
loop_count = loop_count.saturating_add(1);
trace!(?response, "get_missing_events response");
// Some buggy servers (including old continuwuity) may return the same events
// multiple times, which can cause this to be an infinite loop.
// In order to break this loop, if we see no new events from this response (i.e.
// all events in the response are already in backfilled_events), we stop,
// with a warning.
let mut unseen: usize = 0;
let chunk_len = response.events.len();
if response.events.is_empty() {
debug_info!("No more missing events found");
debug_info!(%via, "Finished gap filling missing events (remote returned no more events).");
break;
}
debug_info!("Got {} events back from remote", response.events.len());
for event in response.events {
trace!("Parsing incoming event from backfill");
let (incoming_room_id, event_id, pdu_json) =
self.parse_incoming_pdu(&event).await.map_err(|e| {
err!(BadServerResponse("{via} returned an invalid event: {e:?}"))
})?;
trace!(%incoming_room_id, %event_id, "Parsed incoming event from backfill");
if incoming_room_id != room_id {
return Err!(BadServerResponse(
"{via} returned {event_id} in missing events which belongs to \
{incoming_room_id}, not {room_id}"
));
}
latest_events.remove(&event_id);
if head.contains(&event_id) || tail.contains(&event_id) {
debug!("Skipping known event {event_id}");
continue;
}
let retransmitted = backfilled_events.contains_key(&event_id);
if retransmitted {
debug_warn!(%via, %event_id, "Remote retransmitted event");
} else {
if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await {
debug!(%via, %event_id, "Already seen event in database");
backfilled_events.insert(event_id.clone(), pdu);
} else {
unseen = unseen.saturating_add(1);
latest_events.clear();
for raw_event in response.events {
let (_, event_id, pdu_json) = self.parse_incoming_pdu(&raw_event).await?;
let pdu = PduEvent::from_id_val(&event_id, pdu_json).map_err(|e| {
err!(Request(BadJson("Failed to parse backfilled event {event_id}: {e}")))
})?;
for prev_event_id in pdu.prev_events() {
if discovered.contains_key(prev_event_id) {
continue;
}
}
let parsed = PduEvent::from_id_val(&event_id, pdu_json)
.map_err(|e| err!(BadServerResponse("Unable to parse {event_id}: {e}")))?;
for prev_event_id in parsed.prev_events() {
if !(backfilled_events.contains_key(prev_event_id)
|| self.services.timeline.pdu_exists(prev_event_id).await)
if self
.services
.timeline
.non_outlier_pdu_exists(prev_event_id)
.await
{
latest_events.insert(prev_event_id.to_owned());
continue;
}
latest_events.push(prev_event_id.to_owned());
break;
}
if !retransmitted {
backfilled_events.insert(event_id.clone(), parsed);
}
discovered.insert(event_id.clone(), pdu);
}
latest_events.retain(|event_id| !backfilled_events.contains_key(event_id));
debug!(
count=%chunk_len,
new=%unseen,
remaining=%latest_events.len(),
"Got missing events"
);
let frontier_changed = latest_events != frontier_before;
if unseen == 0 && !frontier_changed {
debug_warn!(
"Didn't see any new events and the frontier did not change, breaking cycle"
);
if latest_events.is_empty() {
break;
} else if discovered.len() >= 200 || iterations >= 20 {
error!("Gap too large, giving up");
break;
}
}
debug_info!("Successfully fetched {} missing events from {via}", backfilled_events.len());
trace!("Missing_events: {backfilled_events:?}");
Ok(backfilled_events)
Ok(discovered)
}
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
+15 -16
View File
@@ -1,9 +1,9 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use conduwuit::{
Event, PduEvent, debug, debug_info,
result::DebugInspect,
utils::{BoolExt, IterStream, stream::BroadbandExt},
warn,
};
use futures::StreamExt;
use ruma::{RoomId, ServerName};
@@ -28,8 +28,6 @@ impl super::Service {
.timeline
.get_non_outlier_pdu_json(event_id)
.await
.inspect(|_| debug!("Found prev_event {event_id} locally."))
.inspect_err(|e| debug!(%e, "Could not find prev_event {event_id} locally."))
.is_ok()
.or(|| event_id.to_owned())
})
@@ -48,12 +46,7 @@ impl super::Service {
.await;
let backfilled = self
.backfill_missing_events(
room_id.to_owned(),
HashSet::from_iter(vec![incoming_pdu.event_id().to_owned()]),
tail,
origin.to_owned(),
)
.get_missing_events(room_id, incoming_pdu, tail, origin)
.await?;
debug_info!("Fetched {} missing events", backfilled.len());
@@ -75,13 +68,19 @@ impl super::Service {
for event_id in to_persist {
debug_info!("Persisting fetched prev event {event_id}");
let pdu_event = backfilled.get(&event_id).cloned().unwrap_or_else(|| {
panic!("Event {event_id} was in backfill response but not in map")
});
let obj = pdu_event.to_canonical_object();
self.upgrade_outlier_to_timeline_pdu(pdu_event, obj, create_event, origin, room_id)
let obj = mapped.get(&event_id).cloned().unwrap();
match self
.handle_outlier_pdu(origin, create_event, &event_id, room_id, obj, false)
.await
.debug_inspect(|_| debug_info!("Persisted fetched prev event {event_id}"))?;
{
| Ok((pdu, val)) =>
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
.await,
| Err(e) => {
warn!("Failed to persist prev_event {event_id}: {e}");
continue;
},
}?;
}
// NOTE because i keep forgetting: the caller persists incoming_pdu.
@@ -1,16 +1,16 @@
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, info, state_res,
trace, warn,
};
use futures::{StreamExt, future::ready};
use futures::future::ready;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
events::StateEventType,
api::federation::authorization::get_event_authorization, events::StateEventType,
};
use super::{build_local_dag, check_room_id, get_room_version_rules};
use super::{check_room_id, get_room_version_rules};
use crate::rooms::timeline::pdu_fits;
#[implement(super::Service)]
@@ -107,79 +107,52 @@ where
}
// Fetch any missing ones & reject invalid ones
let mut missing_auth_events: HashSet<OwnedEventId> = pdu_event
.auth_events()
.filter(|id| !auth_events.contains_key(*id))
.map(ToOwned::to_owned)
.collect();
if !missing_auth_events.is_empty() {
debug_info!(
"Fetching {} missing auth events for outlier event {event_id}",
missing_auth_events.len()
);
let tail = self
if auth_events.len() != pdu_event.auth_events().count() {
info!("Missing some auth events, asking remote for auth chain");
let response: get_event_authorization::v1::Response = self
.services
.state
.get_forward_extremities(room_id)
.collect::<Vec<_>>()
.await;
let backfilled = self
.backfill_missing_events(
room_id.to_owned(),
HashSet::from_iter(vec![event_id.to_owned()]),
tail,
origin.to_owned(),
)
.await?;
debug_info!("Fetched {} missing auth events for {event_id}", backfilled.len());
let mapped = backfilled
.iter()
.map(|(eid, evt)| {
let mut obj = evt.to_canonical_object();
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
(eid.clone(), obj)
})
.collect::<HashMap<_, _>>();
let local_dag = if mapped.len() == 1 {
mapped.keys().map(ToOwned::to_owned).collect()
} else {
build_local_dag(&mapped).await?
};
debug_info!("Preparing to handle {} missing auth events", backfilled.len());
for prev_event_id in local_dag {
let obj = mapped
.get(&prev_event_id)
.expect("We should have this event in memory");
debug_info!("Handling prev {prev_event_id}");
let (prev, _) = Box::pin(self.handle_outlier_pdu(
.sending
.send_federation_request(
origin,
create_event,
&prev_event_id,
room_id,
obj.clone(),
false,
))
.await?;
if missing_auth_events.contains(&*prev_event_id) {
missing_auth_events.remove(&prev_event_id);
auth_events.insert(prev_event_id, prev);
get_event_authorization::v1::Request::new(
room_id.to_owned(),
event_id.to_owned(),
),
)
.await
.map_err(|e| {
err!(Request(Forbidden(
"Remote server is not divulging incoming event's auth chain: {e}"
)))
})?;
let mut auth_chain_map = HashMap::with_capacity(response.auth_chain.len());
for auth_pdu_json in response.auth_chain {
let (auth_event_room_id, auth_event_id, auth_pdu_json) =
self.parse_incoming_pdu(&auth_pdu_json).await?;
if auth_event_room_id != room_id {
return Err!(Request(BadJson(
"Auth event {auth_event_id} is in {auth_event_room_id}, not {room_id}."
)));
}
debug_info!("Finished handling prev auth event");
let auth_pdu = PduEvent::from_id_val(&auth_event_id, auth_pdu_json)
.map_err(|e| err!(Request(BadJson("Invalid PDU {auth_event_id}: {e}"))))?;
auth_chain_map.insert(auth_event_id, auth_pdu);
}
} else {
debug!("No missing auth events for outlier event {event_id}");
}
// reject if we are still missing some auth events.
// If we're still missing prev events, we will fetch them individually later,
// but there's no reason for us to be missing auth events now we've gapfilled
// the DAG.
if !missing_auth_events.is_empty() {
// Don't reject: this could be a temporary condition
return Err!(Request(InvalidParam(
"Could not fetch all auth events for outlier event {event_id}, still missing: \
{missing_auth_events:?}"
)));
for aid in pdu_event.auth_events() {
if auth_events.contains_key(aid) {
continue;
}
if let Some(auth_event) = auth_chain_map.get(aid) {
auth_events.insert(aid.to_owned(), auth_event.clone());
} else {
return Err!(Request(Forbidden(
"Remote server is not divulging incoming event's auth events (missing: \
{aid})"
)));
}
}
// TODO: do events received from auth chain need persisting? that sounds
// awfully slow
}
// 6. Reject "due to auth events" if the event doesn't pass auth based on the