Compare commits

...

8 Commits

Author SHA1 Message Date
timedout c7f8eec282 chore: Add newsfrag 2026-05-26 21:26:57 +01:00
timedout 0d4bbe612d feat: Keep track of a min_depth value
Should prevent weird situations where we accidentally gapfill into backfill territory
2026-05-26 21:22:32 +01:00
timedout c4d297ae3b perf: Increase default max_fetch_prev_events to 256 2026-05-26 20:25:37 +01:00
timedout d15064871e perf: Make max gap depth fetch configurable 2026-05-26 20:22:45 +01:00
timedout b925936195 perf: Improve gap filling, handle missing auth events better 2026-05-26 20:22:45 +01:00
timedout 56feba0ea0 fix: This is some bullshit I tell you 2026-05-26 20:22:40 +01:00
timedout 8d89ba94d5 feat: Better prev event fetching
fix: Don't panic in debug mode when making an empty notary query
2026-05-26 20:22:10 +01:00
timedout 0b135c7717 feat: Add backfill_missing_events helper 2026-05-26 20:22:10 +01:00
19 changed files with 623 additions and 507 deletions
+2
View File
@@ -0,0 +1,2 @@
Improved the performance and reliability of fetching missing events, improving network partition recovery. Contributed
by @nex.
+1 -1
View File
@@ -297,7 +297,7 @@
# This item is undocumented. Please contribute documentation for it. # This item is undocumented. Please contribute documentation for it.
# #
#max_fetch_prev_events = 192 #max_fetch_prev_events = 256
# How many incoming federation transactions the server is willing to be # How many incoming federation transactions the server is willing to be
# processing at any given time before it becomes overloaded and starts # processing at any given time before it becomes overloaded and starts
+1
View File
@@ -381,6 +381,7 @@ async fn handle_room(
.rooms .rooms
.event_handler .event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true) .handle_incoming_pdu(origin, room_id, &event_id, value, true)
.boxed()
.await .await
.map(|_| ()); .map(|_| ());
results.push((event_id, result)); results.push((event_id, result));
+2 -2
View File
@@ -375,7 +375,7 @@ pub struct Config {
#[serde(default = "default_max_request_size")] #[serde(default = "default_max_request_size")]
pub max_request_size: usize, pub max_request_size: usize,
/// default: 192 /// default: 256
#[serde(default = "default_max_fetch_prev_events")] #[serde(default = "default_max_fetch_prev_events")]
pub max_fetch_prev_events: u16, pub max_fetch_prev_events: u16,
@@ -2549,7 +2549,7 @@ fn default_pusher_timeout() -> u64 { 60 }
fn default_pusher_idle_timeout() -> u64 { 15 } fn default_pusher_idle_timeout() -> u64 { 15 }
fn default_max_fetch_prev_events() -> u16 { 192_u16 } fn default_max_fetch_prev_events() -> u16 { 256_u16 }
fn default_max_concurrent_inbound_transactions() -> usize { 150 } fn default_max_concurrent_inbound_transactions() -> usize { 150 }
+4
View File
@@ -187,6 +187,10 @@ pub(super) static MAPS: &[Descriptor] = &[
val_size_hint: Some(8), val_size_hint: Some(8),
..descriptor::RANDOM_SMALL ..descriptor::RANDOM_SMALL
}, },
Descriptor {
name: "roomid_mindepth",
..descriptor::RANDOM_SMALL
},
Descriptor { Descriptor {
name: "roomserverids", name: "roomserverids",
..descriptor::RANDOM_SMALL ..descriptor::RANDOM_SMALL
@@ -1,40 +1,261 @@
use std::{ use std::{
collections::{BTreeMap, HashSet, VecDeque, hash_map}, collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
time::Instant, time::Instant,
}; };
use assign::assign;
use conduwuit::{ use conduwuit::{
Event, PduEvent, debug, debug_warn, implement, matrix::event::gen_event_id_canonical_json, Event, PduEvent, debug, debug_info, debug_warn, err, error,
trace, utils::continue_exponential_backoff_secs, warn, matrix::event::gen_event_id_canonical_json,
state_res::lexicographical_topological_sort,
trace,
utils::{IterStream, continue_exponential_backoff_secs, stream::BroadbandExt},
warn,
}; };
use futures::StreamExt;
use ruma::{ use ruma::{
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
api::federation::event::get_event, RoomId, ServerName, UInt,
api::federation::event::{get_event, get_missing_events},
int,
}; };
use super::get_room_version_rules; use super::get_room_version_rules;
/// Find the event and auth it. Once the event is validated (steps 1 - 8) /// Attempts to build a localised directed acyclic graph out of the given PDUs,
/// it is appended to the outliers Tree. /// returning them in a topologically sorted order.
/// ///
/// Returns pdu and if we fetched it over federation the raw json. /// This is used to attempt to process PDUs in an order that respects their
/// /// dependencies, however it is ultimately the sender's responsibility to send
/// a. Look in the main timeline (pduid_pdu tree) /// them in a processable order, so this is just a best effort attempt. It does
/// b. Look at outlier pdu tree /// not account for power levels or other tie breaks.
/// c. Ask origin server over federation pub async fn build_local_dag<S: std::hash::BuildHasher>(
/// d. TODO: Ask other servers over federation? pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject, S>,
#[implement(super::Service)] ) -> conduwuit::Result<Vec<OwnedEventId>> {
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>( debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
HashMap::with_capacity(pdu_map.len());
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
for (event_id, value) in pdu_map {
// We already checked that these properties are correct in parse_incoming_pdu,
// so it's safe to unwrap here.
// We also filter to remove any prev_events that are not in this pdu_map, as we
// need to have at least one event with zero out degrees for the lexico-topo
// sort below. If there are multiple events with omitted prevs, they will be
// ordered by timestamp, then event ID. At that point though, it's unlikely to
// matter.
let prev_events = value
.get("prev_events")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
.filter(|id| pdu_map.contains_key(id))
.collect();
dag.insert(event_id.clone(), prev_events);
let origin_server_ts = value
.get("origin_server_ts")
.and_then(CanonicalJsonValue::as_integer)
.unwrap_or_default();
id_origin_ts.insert(event_id.clone(), origin_server_ts);
}
debug!(count = dag.len(), "Sorting incoming events with partial graph");
lexicographical_topological_sort(&dag, &async |node_id| {
// Note: we don't bother fetching power levels because that would massively slow
// this function down. This is a best-effort attempt to order events correctly
// for processing, however ultimately that should be the sender's job.
let ts = id_origin_ts
.get(&node_id)
.copied()
.unwrap_or_else(|| int!(0))
.to_string()
.parse::<u64>()
.ok()
.and_then(UInt::new)
.unwrap_or_default();
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
})
.await
.inspect(|sorted| {
debug_assert_eq!(
sorted.len(),
pdu_map.len(),
"Sorted graph was not the same size as the input graph"
);
})
.map_err(|e| err!("failed to resolve local graph: {e}"))
}
impl super::Service {
/// Uses `/_matrix/federation/v1/get_missing_events` to fill gaps in the
/// DAG.
///
/// When this function is called, the "earliest events" (current forward
/// extremities) will be collected, and the function will loop with an
/// exponentially incrementing limit (up to 100 per request) until it has
/// filled the gap, i.e. when the remote says there's no more events.
///
/// This function will iterate until the remote returns no more events,
/// increasing the limit by a factor of 10. If 100 iterations are reached or
/// max_fetch_prev_events events are backfilled, the function will give up
/// and return what it has, to avoid pulling in too much data (for example,
/// absurdly large gaps).
///
/// This function does not persist the events. The caller is responsible for
/// passing them through handle_incoming_pdu.
///
/// ## Parameters
///
/// - `room_id`: The room's ID.
/// - `head`: The event we are potentially missing prev_events for.
/// - `tail`: The most recently known events in the graph (typically forward
/// extremities).
/// - `via`: The server to ask for missing events.
/// - `min_depth`: Don't process events with a `depth` lower than this
/// value. Not massively useful, but can help short-circuit infinite loops
/// and weird edge paths.
pub async fn get_missing_events(
&self,
room_id: &RoomId,
head: &PduEvent,
tail: Vec<OwnedEventId>,
via: &ServerName,
min_depth: UInt,
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
#[cfg(debug_assertions)]
{
let missing_count = head
.prev_events()
.stream()
.broad_filter_map(|event_id| async move {
match self
.services
.timeline
.get_non_outlier_pdu_json(event_id)
.await
.inspect(|_| debug!("Found prev_event {event_id} locally."))
.inspect_err(
|e| debug!(%e, "Could not find prev_event {event_id} locally."),
) {
| Ok(_) => None,
| Err(_) => Some(event_id),
}
})
.count()
.await;
debug_assert_ne!(
missing_count, 0,
"event passed to get_missing_events is not missing any events (wasteful call)"
);
};
let mut discovered = HashMap::with_capacity(20);
let mut latest_events = vec![head.event_id().to_owned()];
let mut iterations = 0_u8;
loop {
iterations = iterations.saturating_add(1);
let limit = iterations.saturating_mul(10).min(100);
debug_info!(%limit, %via, %iterations, discovered=discovered.len(), %min_depth, "Attempting to gap fill missing events");
let response: get_missing_events::v1::Response = self
.services
.sending
.send_federation_request(
via,
assign!(
get_missing_events::v1::Request::new(
room_id.to_owned(),
tail.clone(),
latest_events.clone()
),
{limit: limit.into(), min_depth}
),
)
.await?;
if response.events.is_empty() {
debug_info!(%via, "Finished gap filling missing events (remote returned no more events).");
break;
}
debug_info!("Got {} events back from remote", response.events.len());
latest_events.clear();
for raw_event in response.events {
let (_, event_id, pdu_json) = self.parse_incoming_pdu(&raw_event).await?;
let pdu = PduEvent::from_id_val(&event_id, pdu_json).map_err(|e| {
err!(Request(BadJson("Failed to parse backfilled event {event_id}: {e}")))
})?;
if pdu.depth < min_depth {
debug_warn!(
"Received PDU with depth {} below min_depth {}, ignoring",
pdu.depth,
min_depth
);
continue;
}
for prev_event_id in pdu.prev_events() {
if discovered.contains_key(prev_event_id) {
continue;
}
if self
.services
.timeline
.non_outlier_pdu_exists(prev_event_id)
.await
{
continue;
}
latest_events.push(prev_event_id.to_owned());
break;
}
discovered.insert(event_id.clone(), pdu);
}
if latest_events.is_empty() {
break;
} else if discovered.len() > self.services.server.config.max_fetch_prev_events.into()
|| iterations >= 20
{
error!(
filled=discovered.len(),
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
%iterations,
"Gap too large, giving up"
);
break;
}
}
Ok(discovered)
}
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
///
/// Returns pdu and if we fetched it over federation the raw json.
///
/// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
#[deprecated]
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
&self, &self,
origin: &'a ServerName, origin: &'a ServerName,
events: Events, events: Events,
create_event: &'a Pdu, create_event: &'a Pdu,
room_id: &'a RoomId, room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)> ) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
where where
Pdu: Event + Send + Sync, Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send, Events: Iterator<Item = &'a EventId> + Clone + Send,
{ {
let back_off = |id| match self let back_off = |id| match self
.services .services
.globals .globals
@@ -131,8 +352,8 @@ where
if calculated_event_id != *next_id { if calculated_event_id != *next_id {
warn!( warn!(
"Server didn't return event id we requested: requested: {next_id}, \ "Server didn't return event id we requested: requested: \
we got {calculated_event_id}. Event: {:?}", {next_id}, we got {calculated_event_id}. Event: {:?}",
&res.pdu &res.pdu
); );
} }
@@ -147,7 +368,8 @@ where
) { ) {
| Ok(auth_event) => { | Ok(auth_event) => {
trace!( trace!(
"Found auth event id {auth_event} for event {next_id}" "Found auth event id {auth_event} for event \
{next_id}"
); );
todo_auth_events.push_back(auth_event); todo_auth_events.push_back(auth_event);
}, },
@@ -230,4 +452,5 @@ where
} }
trace!("Fetched and handled {} outlier pdus", pdus.len()); trace!("Fetched and handled {} outlier pdus", pdus.len());
pdus pdus
}
} }
+78 -110
View File
@@ -1,128 +1,96 @@
use std::{ use std::collections::HashMap;
collections::{BTreeMap, HashMap, HashSet, VecDeque},
iter::once,
};
use conduwuit::{ use conduwuit::{
Event, PduEvent, Result, debug_warn, err, implement, Event, PduEvent, debug, debug_info,
state_res::{self}, utils::{BoolExt, IterStream, stream::BroadbandExt},
}; warn,
use futures::{FutureExt, future};
use ruma::{
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
int, uint,
}; };
use futures::StreamExt;
use ruma::{RoomId, ServerName};
use super::check_room_id; use crate::rooms::event_handler::build_local_dag;
#[implement(super::Service)] impl super::Service {
#[tracing::instrument( /// Fetches any missing prev_events for this event and persists them before
level = "debug", /// returning.
skip_all, pub(super) async fn fetch_prevs(
fields(%origin),
)]
#[allow(clippy::type_complexity)]
pub(super) async fn fetch_prev<'a, Pdu, Events>(
&self, &self,
origin: &ServerName,
create_event: &Pdu,
room_id: &RoomId, room_id: &RoomId,
first_ts_in_room: MilliSecondsSinceUnixEpoch, create_event: &PduEvent,
initial_set: Events, incoming_pdu: &PduEvent,
) -> Result<( origin: &ServerName,
Vec<OwnedEventId>, ) -> conduwuit::Result<()> {
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>, let missing = incoming_pdu
)> .prev_events()
where .stream()
Pdu: Event + Send + Sync, .broad_filter_map(|event_id| async move {
Events: Iterator<Item = &'a EventId> + Clone + Send, self.services
{ .timeline
let num_ids = initial_set.clone().count(); .get_non_outlier_pdu_json(event_id)
let mut eventid_info = HashMap::new();
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
initial_set.map(ToOwned::to_owned).collect();
let mut amount = 0;
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
self.services.server.check_running()?;
match self
.fetch_and_handle_outliers(
origin,
once(prev_event_id.as_ref()),
create_event,
room_id,
)
.boxed()
.await .await
.pop() .is_ok()
{ .or(|| event_id.to_owned())
| Some((pdu, mut json_opt)) => { })
check_room_id(room_id, &pdu)?; .collect::<Vec<_>>()
.await;
let limit = self.services.server.config.max_fetch_prev_events; if missing.is_empty() {
if amount > limit { debug!(event_id=%incoming_pdu.event_id(), "No missing prev events.");
debug_warn!("Max prev event limit reached! Limit: {limit}"); return Ok(());
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
} }
debug!(%room_id, event_id=%incoming_pdu.event_id(), ?missing, "Fetching previous events");
if json_opt.is_none() { let tail = self
json_opt = self
.services .services
.outlier .state
.get_outlier_pdu_json(&prev_event_id) .get_forward_extremities(room_id)
.await .collect::<Vec<_>>()
.ok(); .await;
}
if let Some(json) = json_opt { let backfilled = self
if pdu.origin_server_ts() > first_ts_in_room { .get_missing_events(
amount = amount.saturating_add(1); room_id,
for prev_prev in pdu.prev_events() { incoming_pdu,
if !graph.contains_key(prev_prev) { tail,
todo_outlier_stack.push_back(prev_prev.to_owned()); origin,
} self.services.metadata.get_mindepth(room_id).await,
} )
.await?;
debug_info!("Fetched {} missing events", backfilled.len());
graph.insert( // Persist all fetched events
prev_event_id.clone(), let mapped = backfilled
pdu.prev_events().map(ToOwned::to_owned).collect(), .iter()
); .map(|(eid, evt)| {
let mut obj = evt.to_canonical_object();
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
(eid.clone(), obj)
})
.collect::<HashMap<_, _>>();
let to_persist = if mapped.len() <= 1 {
mapped.keys().map(ToOwned::to_owned).collect()
} else { } else {
// Time based check failed build_local_dag(&mapped).await?
graph.insert(prev_event_id.clone(), HashSet::new());
}
eventid_info.insert(prev_event_id.clone(), (pdu, json));
} else {
// Get json failed, so this was not fetched over federation
graph.insert(prev_event_id.clone(), HashSet::new());
}
},
| _ => {
// Fetch and handle failed
graph.insert(prev_event_id.clone(), HashSet::new());
},
}
}
let event_fetch = |event_id| {
let origin_server_ts = eventid_info
.get(&event_id)
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
// This return value is the key used for sorting events,
// events are then sorted by power level, time,
// and lexically by event_id.
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
}; };
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch) for event_id in to_persist {
debug_info!("Persisting fetched prev event {event_id}");
let obj = mapped.get(&event_id).cloned().unwrap();
match self
.handle_outlier_pdu(origin, create_event, &event_id, room_id, obj, false)
.await .await
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?; {
| Ok((pdu, val)) =>
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
.await,
| Err(e) => {
warn!("Failed to persist prev_event {event_id}: {e}");
continue;
},
}?;
}
Ok((sorted, eventid_info)) // NOTE because i keep forgetting: the caller persists incoming_pdu.
// we only care about its prev events
Ok(())
}
} }
@@ -1,7 +1,6 @@
use std::collections::{HashMap, hash_map}; use std::collections::{HashMap, hash_map};
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement}; use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
use futures::FutureExt;
use ruma::{ use ruma::{
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids, EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
events::StateEventType, events::StateEventType,
@@ -42,7 +41,6 @@ where
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref); let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
let state_vec = self let state_vec = self
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id) .fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
.boxed()
.await; .await;
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len()); let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
@@ -1,14 +1,11 @@
use std::{ use std::{collections::BTreeMap, time::Instant};
collections::{BTreeMap, hash_map},
time::Instant,
};
use conduwuit::{ use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err, Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
implement, info, trace, utils::stream::IterStream, warn, implement, info, trace, warn,
}; };
use futures::{ use futures::{
FutureExt, TryFutureExt, TryStreamExt, FutureExt,
future::{OptionFuture, try_join4}, future::{OptionFuture, try_join4},
}; };
use ruma::{ use ruma::{
@@ -236,63 +233,21 @@ pub async fn handle_incoming_pdu<'a>(
} }
// Skip old events // Skip old events
let first_ts_in_room = self // let first_ts_in_room = self
.services // .services
.timeline // .timeline
.first_pdu_in_room(room_id) // .first_pdu_in_room(room_id)
.await? // .await?
.origin_server_ts(); // .origin_server_ts();
// 9. Fetch any missing prev events doing all checks listed here starting at 1. // 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events // These are timeline events
let (sorted_prev_events, mut eventid_info) = self debug!("Handling previous events");
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
.await?;
debug!( self.fetch_prevs(room_id, create_event, &incoming_pdu, origin)
events = ?sorted_prev_events,
"Handling previous events"
);
sorted_prev_events
.iter()
.try_stream()
.map_ok(AsRef::as_ref)
.try_for_each(|prev_id| {
self.handle_prev_pdu(
origin,
event_id,
room_id,
eventid_info.remove(prev_id),
create_event,
first_ts_in_room,
prev_id,
)
.inspect_err(move |e| {
warn!("Prev {prev_id} failed: {e}");
match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(prev_id.into())
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
let tries = e.get().1.saturating_add(1);
*e.get_mut() = (Instant::now(), tries);
},
}
})
.map(|_| self.services.server.check_running())
})
.boxed()
.await?; .await?;
// Done with prev events, now handling the incoming event // Done with prev events, now handling the incoming event
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id) self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
.boxed()
.await .await
} }
@@ -1,13 +1,13 @@
use std::collections::{BTreeMap, HashMap, hash_map}; use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{ use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, info, state_res,
trace, warn, trace, warn,
}; };
use futures::future::ready; use futures::future::ready;
use ruma::{ use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
events::StateEventType, api::federation::authorization::get_event_authorization, events::StateEventType,
}; };
use super::{check_room_id, get_room_version_rules}; use super::{check_room_id, get_room_version_rules};
@@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
event_id: &'a EventId, event_id: &'a EventId,
room_id: &'a RoomId, room_id: &'a RoomId,
mut value: CanonicalJsonObject, mut value: CanonicalJsonObject,
auth_events_known: bool, _auth_events_known: bool,
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)> ) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
where where
Pdu: Event + Send + Sync, Pdu: Event + Send + Sync,
@@ -107,46 +107,53 @@ where
} }
// Fetch any missing ones & reject invalid ones // Fetch any missing ones & reject invalid ones
let missing_auth_events = if auth_events_known { if auth_events.len() != pdu_event.auth_events().count() {
pdu_event info!("Missing some auth events, asking remote for auth chain");
.auth_events() let response: get_event_authorization::v1::Response = self
.filter(|id| !auth_events.contains_key(*id)) .services
.collect::<Vec<_>>() .sending
} else { .send_federation_request(
pdu_event.auth_events().collect::<Vec<_>>()
};
if !missing_auth_events.is_empty() || !auth_events_known {
debug_info!(
"Fetching {} missing auth events for outlier event {event_id}",
missing_auth_events.len()
);
for (pdu, _) in self
.fetch_and_handle_outliers(
origin, origin,
missing_auth_events.iter().copied(), get_event_authorization::v1::Request::new(
create_event, room_id.to_owned(),
room_id, event_id.to_owned(),
),
) )
.await .await
{ .map_err(|e| {
auth_events.insert(pdu.event_id().to_owned(), pdu); err!(Request(Forbidden(
} "Remote server is not divulging incoming event's auth chain: {e}"
} else { )))
debug!("No missing auth events for outlier event {event_id}"); })?;
} let mut auth_chain_map = HashMap::with_capacity(response.auth_chain.len());
// reject if we are still missing some for auth_pdu_json in response.auth_chain {
let still_missing = pdu_event let (auth_event_room_id, auth_event_id, auth_pdu_json) =
.auth_events() self.parse_incoming_pdu(&auth_pdu_json).await?;
.filter(|id| !auth_events.contains_key(*id)) if auth_event_room_id != room_id {
.collect::<Vec<_>>(); return Err!(Request(BadJson(
if !still_missing.is_empty() { "Auth event {auth_event_id} is in {auth_event_room_id}, not {room_id}."
// Don't reject: this could be a temporary condition
// TODO: use get_missing_events?
return Err!(Request(InvalidParam(
"Could not fetch all auth events for outlier event {event_id}, still missing: \
{still_missing:?}"
))); )));
} }
let auth_pdu = PduEvent::from_id_val(&auth_event_id, auth_pdu_json)
.map_err(|e| err!(Request(BadJson("Invalid PDU {auth_event_id}: {e}"))))?;
auth_chain_map.insert(auth_event_id, auth_pdu);
}
for aid in pdu_event.auth_events() {
if auth_events.contains_key(aid) {
continue;
}
if let Some(auth_event) = auth_chain_map.get(aid) {
auth_events.insert(aid.to_owned(), auth_event.clone());
} else {
return Err!(Request(Forbidden(
"Remote server is not divulging incoming event's auth events (missing: \
{aid})"
)));
}
}
// TODO: do events received from auth chain need persisting? that sounds
// awfully slow
}
// 6. Reject "due to auth events" if the event doesn't pass auth based on the // 6. Reject "due to auth events" if the event doesn't pass auth based on the
// auth events // auth events
@@ -1,89 +0,0 @@
use std::{collections::BTreeMap, time::Instant};
use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
utils::continue_exponential_backoff_secs,
};
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
use tracing::debug;
#[implement(super::Service)]
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "prev",
level = INFO_SPAN_LEVEL,
skip_all,
fields(%prev_id),
)]
pub(super) async fn handle_prev_pdu<'a, Pdu>(
&self,
origin: &'a ServerName,
event_id: &'a EventId,
room_id: &'a RoomId,
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
create_event: &'a Pdu,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
prev_id: &'a EventId,
) -> Result
where
Pdu: Event + Send + Sync,
{
// Check for disabled again because it might have changed
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(debug_warn!(
"Federaton of room {room_id} is currently disabled on this server. Request by \
origin {origin} and event ID {event_id}"
))));
}
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(prev_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
debug!(
?tries,
duration = ?time.elapsed(),
"Backing off from prev_event"
);
return Ok(());
}
}
let Some((pdu, json)) = eventid_info else {
return Ok(());
};
// Skip old events
if pdu.origin_server_ts() < first_ts_in_room {
return Ok(());
}
let start_time = Instant::now();
self.federation_handletime
.write()
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
defer! {{
self.federation_handletime
.write()
.remove(room_id);
}};
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
.await?;
debug!(
elapsed = ?start_time.elapsed(),
"Handled prev_event",
);
Ok(())
}
+1 -2
View File
@@ -4,7 +4,6 @@ mod fetch_prev;
mod fetch_state; mod fetch_state;
mod handle_incoming_pdu; mod handle_incoming_pdu;
mod handle_outlier_pdu; mod handle_outlier_pdu;
mod handle_prev_pdu;
mod parse_incoming_pdu; mod parse_incoming_pdu;
mod policy_server; mod policy_server;
mod resolve_state; mod resolve_state;
@@ -15,6 +14,7 @@ use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant};
use async_trait::async_trait; use async_trait::async_trait;
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap}; use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
pub use fetch_and_handle_outliers::build_local_dag;
use ruma::{ use ruma::{
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent, OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
room_version_rules::RoomVersionRules, room_version_rules::RoomVersionRules,
@@ -22,7 +22,6 @@ use ruma::{
use tokio::sync::Notify; use tokio::sync::Notify;
use crate::{Dep, globals, rooms, sending, server_keys}; use crate::{Dep, globals, rooms, sending, server_keys};
pub struct Service { pub struct Service {
pub mutex_federation: RoomMutexMap, pub mutex_federation: RoomMutexMap,
pub federation_handletime: SyncRwLock<HandleTimeMap>, pub federation_handletime: SyncRwLock<HandleTimeMap>,
@@ -56,7 +56,10 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR
/// Parses every entry in an array as an event ID, returning an error if any /// Parses every entry in an array as an event ID, returning an error if any
/// step fails. /// step fails.
fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> { pub(super) fn expect_event_id_array(
value: &CanonicalJsonObject,
field: &str,
) -> Result<Vec<OwnedEventId>> {
value value
.get(field) .get(field)
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))? .ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
@@ -5,7 +5,7 @@ use std::{
}; };
use conduwuit::{ use conduwuit::{
Result, debug, err, error, implement, Result, debug, debug_error, err, error, implement,
matrix::{Event, StateMap}, matrix::{Event, StateMap},
trace, trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
@@ -37,6 +37,7 @@ where
.pdu_shortstatehash(prev_event) .pdu_shortstatehash(prev_event)
.await .await
else { else {
trace!("No shortstatehash for {prev_event}, cannot calculate one-degree state.");
return Ok(None); return Ok(None);
}; };
@@ -99,6 +100,7 @@ where
.map_ok(move |sstatehash| (sstatehash, prev_event)) .map_ok(move |sstatehash| (sstatehash, prev_event))
}) })
.try_collect::<HashMap<_, _>>() .try_collect::<HashMap<_, _>>()
.inspect_err(|e| debug_error!("failed to calculate N-degree short state hashes: {e}"))
.await .await
else { else {
return Ok(None); return Ok(None);
@@ -41,6 +41,7 @@ where
.get_pdu_id(incoming_pdu.event_id()) .get_pdu_id(incoming_pdu.event_id())
.await .await
{ {
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
return Ok(Some(pduid)); return Ok(Some(pduid));
} }
@@ -63,6 +64,7 @@ where
"Upgrading PDU from outlier to timeline" "Upgrading PDU from outlier to timeline"
); );
let timer = Instant::now(); let timer = Instant::now();
let min_depth = self.services.metadata.get_mindepth(room_id).await;
let room_version_rules = get_room_version_rules(create_event)?; let room_version_rules = get_room_version_rules(create_event)?;
// 10. Fetch missing state and auth chain events by calling /state_ids at // 10. Fetch missing state and auth chain events by calling /state_ids at
@@ -81,6 +83,7 @@ where
}; };
if state_at_incoming_event.is_none() { if state_at_incoming_event.is_none() {
trace!("Could not calculate incoming state, asking remote {origin} for it");
state_at_incoming_event = self state_at_incoming_event = self
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id()) .fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
.await?; .await?;
@@ -382,6 +385,11 @@ where
// Event has passed all auth/stateres checks // Event has passed all auth/stateres checks
drop(state_lock); drop(state_lock);
if incoming_pdu.depth > min_depth {
self.services
.metadata
.set_mindepth(room_id, incoming_pdu.depth.into());
}
Ok(pdu_id) Ok(pdu_id)
} }
+4
View File
@@ -626,6 +626,10 @@ impl Service {
room_id, room_id,
) )
.await?; .await?;
self.services
.metadata
.maybe_set_mindepth(room_id, parsed_join_pdu.depth.into())
.await;
info!("Setting final room state for new room"); info!("Setting final room state for new room");
// We set the room state after inserting the pdu, so that we never have a moment // We set the room state after inserting the pdu, so that we never have a moment
+28 -2
View File
@@ -1,9 +1,9 @@
use std::sync::Arc; use std::sync::Arc;
use conduwuit::{Result, implement, utils::stream::TryIgnore}; use conduwuit::{Result, implement, utils::stream::TryIgnore};
use database::Map; use database::{Deserialized, Map};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use ruma::{OwnedRoomId, RoomId}; use ruma::{OwnedRoomId, RoomId, UInt, uint};
use crate::{Dep, rooms}; use crate::{Dep, rooms};
@@ -17,6 +17,7 @@ struct Data {
bannedroomids: Arc<Map>, bannedroomids: Arc<Map>,
roomid_shortroomid: Arc<Map>, roomid_shortroomid: Arc<Map>,
pduid_pdu: Arc<Map>, pduid_pdu: Arc<Map>,
roomid_mindepth: Arc<Map>,
} }
struct Services { struct Services {
@@ -31,6 +32,7 @@ impl crate::Service for Service {
bannedroomids: args.db["bannedroomids"].clone(), bannedroomids: args.db["bannedroomids"].clone(),
roomid_shortroomid: args.db["roomid_shortroomid"].clone(), roomid_shortroomid: args.db["roomid_shortroomid"].clone(),
pduid_pdu: args.db["pduid_pdu"].clone(), pduid_pdu: args.db["pduid_pdu"].clone(),
roomid_mindepth: args.db["roomid_mindepth"].clone(),
}, },
services: Services { services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"), short: args.depend::<rooms::short::Service>("rooms::short"),
@@ -98,3 +100,27 @@ pub async fn is_disabled(&self, room_id: &RoomId) -> bool {
pub async fn is_banned(&self, room_id: &RoomId) -> bool { pub async fn is_banned(&self, room_id: &RoomId) -> bool {
self.db.bannedroomids.get(room_id).await.is_ok() self.db.bannedroomids.get(room_id).await.is_ok()
} }
#[implement(Service)]
pub async fn get_mindepth(&self, room_id: &RoomId) -> UInt {
self.db
.roomid_mindepth
.get(room_id)
.await
.deserialized::<UInt>()
.unwrap_or_else(|_| uint!(0))
}
#[implement(Service)]
pub fn set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
self.db
.roomid_mindepth
.put_raw(room_id.as_bytes(), min_depth.to_be_bytes());
}
#[implement(Service)]
pub async fn maybe_set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
if min_depth > self.get_mindepth(room_id).await.into() {
self.set_mindepth(room_id, min_depth);
}
}
+4
View File
@@ -173,6 +173,10 @@ impl Service {
self.db.get_non_outlier_pdu_json(event_id).await self.db.get_non_outlier_pdu_json(event_id).await
} }
pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> bool {
self.db.non_outlier_pdu_exists(event_id).await.is_ok()
}
/// Returns the pdu's id. /// Returns the pdu's id.
#[inline] #[inline]
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> { pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
+3 -2
View File
@@ -34,8 +34,9 @@ where
batch batch
}); });
if server_keys.is_empty() {
debug_assert!(!server_keys.is_empty(), "empty batch request to notary"); return Ok(vec![]);
}
let mut results = Vec::new(); let mut results = Vec::new();
while let Some(batch) = server_keys while let Some(batch) = server_keys