mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
feat: Better prev event fetching
fix: Don't panic in debug mode when making an empty notary query
This commit is contained in:
@@ -381,6 +381,7 @@ async fn handle_room(
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
||||
.boxed()
|
||||
.await
|
||||
.map(|_| ());
|
||||
results.push((event_id, result));
|
||||
|
||||
@@ -6,17 +6,87 @@ use std::{
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, debug, debug_info, debug_warn, err,
|
||||
matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs,
|
||||
warn,
|
||||
matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort,
|
||||
trace, utils::continue_exponential_backoff_secs, warn,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use ruma::{
|
||||
CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, ServerName,
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
|
||||
OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt,
|
||||
api::federation::event::{get_event, get_missing_events},
|
||||
int,
|
||||
};
|
||||
|
||||
use super::get_room_version_rules;
|
||||
|
||||
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
|
||||
/// returning them in a topologically sorted order.
|
||||
///
|
||||
/// This is used to attempt to process PDUs in an order that respects their
|
||||
/// dependencies, however it is ultimately the sender's responsibility to send
|
||||
/// them in a processable order, so this is just a best effort attempt. It does
|
||||
/// not account for power levels or other tie breaks.
|
||||
pub async fn build_local_dag<S: std::hash::BuildHasher>(
|
||||
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject, S>,
|
||||
) -> conduwuit::Result<Vec<OwnedEventId>> {
|
||||
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
|
||||
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
|
||||
HashMap::with_capacity(pdu_map.len());
|
||||
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
|
||||
|
||||
for (event_id, value) in pdu_map {
|
||||
// We already checked that these properties are correct in parse_incoming_pdu,
|
||||
// so it's safe to unwrap here.
|
||||
// We also filter to remove any prev_events that are not in this pdu_map, as we
|
||||
// need to have at least one event with zero out degrees for the lexico-topo
|
||||
// sort below. If there are multiple events with omitted prevs, they will be
|
||||
// ordered by timestamp, then event ID. At that point though, it's unlikely to
|
||||
// matter.
|
||||
let prev_events = value
|
||||
.get("prev_events")
|
||||
.unwrap()
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
|
||||
.filter(|id| pdu_map.contains_key(id))
|
||||
.collect();
|
||||
|
||||
dag.insert(event_id.clone(), prev_events);
|
||||
let origin_server_ts = value
|
||||
.get("origin_server_ts")
|
||||
.and_then(CanonicalJsonValue::as_integer)
|
||||
.unwrap_or_default();
|
||||
id_origin_ts.insert(event_id.clone(), origin_server_ts);
|
||||
}
|
||||
|
||||
debug!(count = dag.len(), "Sorting incoming events with partial graph");
|
||||
lexicographical_topological_sort(&dag, &async |node_id| {
|
||||
// Note: we don't bother fetching power levels because that would massively slow
|
||||
// this function down. This is a best-effort attempt to order events correctly
|
||||
// for processing, however ultimately that should be the sender's job.
|
||||
let ts = id_origin_ts
|
||||
.get(&node_id)
|
||||
.copied()
|
||||
.unwrap_or_else(|| int!(0))
|
||||
.to_string()
|
||||
.parse::<u64>()
|
||||
.ok()
|
||||
.and_then(UInt::new)
|
||||
.unwrap_or_default();
|
||||
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
|
||||
})
|
||||
.await
|
||||
.inspect(|sorted| {
|
||||
debug_assert_eq!(
|
||||
sorted.len(),
|
||||
pdu_map.len(),
|
||||
"Sorted graph was not the same size as the input graph"
|
||||
);
|
||||
})
|
||||
.map_err(|e| err!("failed to resolve local graph: {e}"))
|
||||
}
|
||||
|
||||
impl super::Service {
|
||||
/// Uses `/_matrix/federation/v1/get_missing_events` to fill gaps in the
|
||||
/// DAG.
|
||||
@@ -31,13 +101,13 @@ impl super::Service {
|
||||
pub(super) async fn backfill_missing_events(
|
||||
&self,
|
||||
room_id: OwnedRoomId,
|
||||
latest_events: Vec<OwnedEventId>,
|
||||
head: Vec<OwnedEventId>,
|
||||
via: OwnedServerName,
|
||||
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
|
||||
if latest_events.is_empty() {
|
||||
if head.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
}
|
||||
let earliest_events = self
|
||||
let tail = self
|
||||
.services
|
||||
.state
|
||||
.get_forward_extremities(&room_id)
|
||||
@@ -45,40 +115,31 @@ impl super::Service {
|
||||
.await;
|
||||
// TODO: min_depth is probably necessary to avoid fetching the entire room
|
||||
// history if there are very long gaps
|
||||
let mut latest_events = latest_events;
|
||||
let mut latest_events: HashSet<OwnedEventId> = HashSet::from_iter(head.clone());
|
||||
let mut loop_count: u64 = 3;
|
||||
// Start with a base number of 3 so that we fetch 10, 16, 25, 36, etc
|
||||
// instead of 1, 2, 4, 9, so on.
|
||||
let mut backfilled_events = HashMap::with_capacity(10);
|
||||
|
||||
while !latest_events.is_empty() {
|
||||
let mut request = get_missing_events::v1::Request::new(
|
||||
room_id.clone(),
|
||||
earliest_events.clone(),
|
||||
latest_events.clone(),
|
||||
);
|
||||
request.limit = min(loop_count.saturating_pow(2), 100)
|
||||
let todo: Vec<OwnedEventId> = latest_events.clone().into_iter().collect();
|
||||
let mut request =
|
||||
get_missing_events::v1::Request::new(room_id.clone(), tail.clone(), todo.clone());
|
||||
let limit = min(loop_count.saturating_pow(2), 100);
|
||||
request.limit = limit
|
||||
.try_into()
|
||||
.expect("limit cannot be greater than 100, which fits into UInt");
|
||||
if backfilled_events.len() > 1000 {
|
||||
warn!(
|
||||
"Received {} missing events, refusing to fetch more (infinite loop?)",
|
||||
backfilled_events.len()
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
debug_info!(
|
||||
backfilled=%backfilled_events.len(),
|
||||
%loop_count,
|
||||
"Asking {via} for up to {} missing events",
|
||||
request.limit
|
||||
"Asking {via} for up to {limit} missing events",
|
||||
);
|
||||
trace!(
|
||||
?latest_events,
|
||||
?earliest_events,
|
||||
?tail,
|
||||
%via,
|
||||
limit=%request.limit,
|
||||
%limit,
|
||||
"Requesting missing events"
|
||||
);
|
||||
let response: get_missing_events::v1::Response = self
|
||||
@@ -100,47 +161,76 @@ impl super::Service {
|
||||
break;
|
||||
}
|
||||
for event in response.events {
|
||||
trace!("Parsing incoming event from backfill");
|
||||
let (incoming_room_id, event_id, pdu_json) =
|
||||
self.parse_incoming_pdu(&event).await.map_err(|e| {
|
||||
err!(BadServerResponse("{via} returned an invalid event: {e:?}"))
|
||||
})?;
|
||||
trace!(%incoming_room_id, %event_id, "Parsed incoming event from backfill");
|
||||
if incoming_room_id != room_id {
|
||||
return Err!(BadServerResponse(
|
||||
"{via} returned {event_id} in missing events which belongs to \
|
||||
{incoming_room_id}, not {room_id}"
|
||||
));
|
||||
}
|
||||
latest_events.remove(&event_id);
|
||||
if head.contains(&event_id) || tail.contains(&event_id) {
|
||||
debug!("Skipping known event {event_id}");
|
||||
continue;
|
||||
}
|
||||
if backfilled_events.contains_key(&event_id) {
|
||||
debug_warn!(%via, %event_id, "Remote retransmitted event");
|
||||
continue;
|
||||
}
|
||||
// TODO: Should this be scoped to the GME session? We might end up incorrectly
|
||||
// assuming we've caught up if we do this
|
||||
if self.services.timeline.pdu_exists(&event_id).await {
|
||||
if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await {
|
||||
debug!(%via, %event_id, "Already seen event in database");
|
||||
continue;
|
||||
backfilled_events.insert(event_id.clone(), pdu);
|
||||
} else {
|
||||
unseen = unseen.saturating_add(1);
|
||||
}
|
||||
unseen = unseen.saturating_add(1);
|
||||
let parsed = PduEvent::from_id_val(&event_id, pdu_json)
|
||||
.map_err(|e| err!(BadServerResponse("Unable to parse {event_id}: {e}")))?;
|
||||
backfilled_events.insert(event_id, parsed.clone());
|
||||
for prev_event_id in parsed.prev_events() {
|
||||
if backfilled_events.contains_key(prev_event_id)
|
||||
|| self.services.timeline.pdu_exists(prev_event_id).await
|
||||
// Verify that we have all of this event's prev_events. If we don't, add it to
|
||||
// the work queue
|
||||
if !(backfilled_events.contains_key(prev_event_id)
|
||||
|| self.services.timeline.pdu_exists(prev_event_id).await)
|
||||
{
|
||||
continue;
|
||||
latest_events.insert(prev_event_id.to_owned());
|
||||
break;
|
||||
}
|
||||
latest_events.push(prev_event_id.to_owned());
|
||||
continue;
|
||||
}
|
||||
backfilled_events.insert(event_id.clone(), parsed);
|
||||
}
|
||||
for event_id in todo {
|
||||
latest_events.remove(&event_id);
|
||||
if let hash_map::Entry::Vacant(e) = backfilled_events.entry(event_id.clone()) {
|
||||
let evt = self.services.timeline.get_pdu(&event_id).await?;
|
||||
e.insert(evt);
|
||||
}
|
||||
}
|
||||
latest_events.retain(|e| !backfilled_events.contains_key(e));
|
||||
debug!(
|
||||
chunk=%chunk_len,
|
||||
count=%chunk_len,
|
||||
new=%unseen,
|
||||
remaining=%latest_events.len(),
|
||||
"Got missing events"
|
||||
);
|
||||
if unseen == 0 {
|
||||
debug_warn!("Didn't see any new events, breaking cycle");
|
||||
break;
|
||||
} else if chunk_len < usize::try_from(limit)? {
|
||||
debug!(
|
||||
"Got less than the limit number of events, assuming there's no more to fetch"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
debug_info!("Successfully fetched {} missing events from {via}", backfilled_events.len());
|
||||
trace!("Missing_events: {backfilled_events:?}");
|
||||
Ok(backfilled_events)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,128 +1,118 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet, VecDeque},
|
||||
iter::once,
|
||||
};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
|
||||
use conduwuit::{
|
||||
Event, PduEvent, Result, debug_warn, err, implement,
|
||||
state_res::{self},
|
||||
};
|
||||
use futures::{FutureExt, future};
|
||||
use ruma::{
|
||||
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
|
||||
int, uint,
|
||||
};
|
||||
use conduwuit::{Event, PduEvent, debug, debug_info, error, trace};
|
||||
use ruma::{OwnedEventId, RoomId, ServerName};
|
||||
|
||||
use super::check_room_id;
|
||||
use crate::rooms::event_handler::build_local_dag;
|
||||
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(%origin),
|
||||
)]
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub(super) async fn fetch_prev<'a, Pdu, Events>(
|
||||
&self,
|
||||
origin: &ServerName,
|
||||
create_event: &Pdu,
|
||||
room_id: &RoomId,
|
||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
||||
initial_set: Events,
|
||||
) -> Result<(
|
||||
Vec<OwnedEventId>,
|
||||
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
||||
)>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
||||
{
|
||||
let num_ids = initial_set.clone().count();
|
||||
let mut eventid_info = HashMap::new();
|
||||
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
|
||||
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
|
||||
initial_set.map(ToOwned::to_owned).collect();
|
||||
impl super::Service {
|
||||
pub(super) async fn fetch_prevs(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
create_event: &PduEvent,
|
||||
incoming_pdu: &PduEvent,
|
||||
origin: &ServerName,
|
||||
) -> conduwuit::Result<()> {
|
||||
let mut queue: VecDeque<OwnedEventId> = VecDeque::new();
|
||||
queue.push_back(incoming_pdu.event_id().to_owned());
|
||||
|
||||
let mut amount = 0;
|
||||
|
||||
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
|
||||
self.services.server.check_running()?;
|
||||
|
||||
match self
|
||||
.fetch_and_handle_outliers(
|
||||
origin,
|
||||
once(prev_event_id.as_ref()),
|
||||
create_event,
|
||||
room_id,
|
||||
)
|
||||
.boxed()
|
||||
.await
|
||||
.pop()
|
||||
{
|
||||
| Some((pdu, mut json_opt)) => {
|
||||
check_room_id(room_id, &pdu)?;
|
||||
|
||||
let limit = self.services.server.config.max_fetch_prev_events;
|
||||
if amount > limit {
|
||||
debug_warn!("Max prev event limit reached! Limit: {limit}");
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
continue;
|
||||
}
|
||||
|
||||
if json_opt.is_none() {
|
||||
json_opt = self
|
||||
.services
|
||||
.outlier
|
||||
.get_outlier_pdu_json(&prev_event_id)
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
|
||||
if let Some(json) = json_opt {
|
||||
if pdu.origin_server_ts() > first_ts_in_room {
|
||||
amount = amount.saturating_add(1);
|
||||
for prev_prev in pdu.prev_events() {
|
||||
if !graph.contains_key(prev_prev) {
|
||||
todo_outlier_stack.push_back(prev_prev.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
graph.insert(
|
||||
prev_event_id.clone(),
|
||||
pdu.prev_events().map(ToOwned::to_owned).collect(),
|
||||
);
|
||||
} else {
|
||||
// Time based check failed
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
}
|
||||
|
||||
eventid_info.insert(prev_event_id.clone(), (pdu, json));
|
||||
while let Some(event_id) = queue.pop_front() {
|
||||
debug!(event_id=%incoming_pdu.event_id, "Fetching any missing prev_events");
|
||||
let mut missing_prev_events: HashSet<OwnedEventId> =
|
||||
incoming_pdu.prev_events().map(ToOwned::to_owned).collect();
|
||||
for pid in missing_prev_events.clone() {
|
||||
if self.services.timeline.pdu_exists(&pid).await {
|
||||
trace!("Found prev event {pid} for outlier event {event_id} locally");
|
||||
missing_prev_events.remove(&pid);
|
||||
} else {
|
||||
// Get json failed, so this was not fetched over federation
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
debug_info!(
|
||||
"Could not find prev event {pid} for outlier event {event_id} locally, \
|
||||
will fetch over federation"
|
||||
);
|
||||
}
|
||||
},
|
||||
| _ => {
|
||||
// Fetch and handle failed
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
},
|
||||
}
|
||||
if !missing_prev_events.is_empty() {
|
||||
debug_info!(
|
||||
"Fetching {} missing prev events for outlier event {event_id}",
|
||||
missing_prev_events.len()
|
||||
);
|
||||
let backfilled = self
|
||||
.backfill_missing_events(
|
||||
room_id.to_owned(),
|
||||
vec![event_id.clone()],
|
||||
origin.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
debug_info!("Fetched {} missing events for {event_id}", backfilled.len());
|
||||
let mapped = backfilled
|
||||
.iter()
|
||||
.map(|(eid, evt)| {
|
||||
let mut obj = evt.to_canonical_object();
|
||||
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
|
||||
(eid.clone(), obj)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
let local_dag = if mapped.len() == 1 {
|
||||
mapped.keys().map(ToOwned::to_owned).collect()
|
||||
} else {
|
||||
build_local_dag(&mapped).await?
|
||||
};
|
||||
debug_info!("Preparing to handle {} missing events", backfilled.len());
|
||||
for prev_event_id in local_dag {
|
||||
let obj = mapped
|
||||
.get(&prev_event_id)
|
||||
.expect("We should have this event in memory");
|
||||
debug_info!("Handling prev event {prev_event_id}");
|
||||
match self
|
||||
.handle_outlier_pdu(
|
||||
origin,
|
||||
create_event,
|
||||
&prev_event_id,
|
||||
room_id,
|
||||
obj.clone(),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(_) => {
|
||||
debug!("Successfully handled {prev_event_id} as an outlier");
|
||||
missing_prev_events.remove(&prev_event_id);
|
||||
},
|
||||
| Err(e) =>
|
||||
error!(error=?e, %prev_event_id, %event_id, "Failed to handle prev event"),
|
||||
}
|
||||
debug_info!("Finished handling prev");
|
||||
}
|
||||
}
|
||||
let outlier = self.services.timeline.get_pdu(&event_id).await;
|
||||
if missing_prev_events.is_empty()
|
||||
&& let Ok(pdu) = outlier
|
||||
{
|
||||
// promote any prevs first
|
||||
for prev_event_id in pdu.prev_events() {
|
||||
debug_info!("Promoting prev event {prev_event_id} to timeline");
|
||||
let prev_pdu = self.services.timeline.get_pdu(&event_id).await?;
|
||||
let val = prev_pdu.to_canonical_object();
|
||||
self.upgrade_outlier_to_timeline_pdu(
|
||||
prev_pdu,
|
||||
val,
|
||||
create_event,
|
||||
origin,
|
||||
room_id,
|
||||
)
|
||||
.await?;
|
||||
debug_info!("Finished prev promoting {prev_event_id} to timeline");
|
||||
}
|
||||
debug_info!("Promoting event {event_id} to timeline");
|
||||
let val = pdu.to_canonical_object();
|
||||
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
|
||||
.await?;
|
||||
debug_info!("Finished promoting {event_id} to timeline");
|
||||
} else {
|
||||
debug!(?missing_prev_events, ok=%outlier.is_ok(), "Not promoting {event_id}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let event_fetch = |event_id| {
|
||||
let origin_server_ts = eventid_info
|
||||
.get(&event_id)
|
||||
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
|
||||
|
||||
// This return value is the key used for sorting events,
|
||||
// events are then sorted by power level, time,
|
||||
// and lexically by event_id.
|
||||
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
|
||||
};
|
||||
|
||||
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
|
||||
.await
|
||||
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;
|
||||
|
||||
Ok((sorted, eventid_info))
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::collections::{HashMap, hash_map};
|
||||
|
||||
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
|
||||
use futures::FutureExt;
|
||||
use ruma::{
|
||||
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
|
||||
events::StateEventType,
|
||||
@@ -42,7 +41,6 @@ where
|
||||
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
|
||||
let state_vec = self
|
||||
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
|
||||
.boxed()
|
||||
.await;
|
||||
|
||||
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, hash_map},
|
||||
time::Instant,
|
||||
};
|
||||
use std::{collections::BTreeMap, time::Instant};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
|
||||
implement, info, trace, utils::stream::IterStream, warn,
|
||||
implement, info, trace, warn,
|
||||
};
|
||||
use futures::{
|
||||
FutureExt, TryFutureExt, TryStreamExt,
|
||||
FutureExt,
|
||||
future::{OptionFuture, try_join4},
|
||||
};
|
||||
use ruma::{
|
||||
@@ -236,63 +233,21 @@ pub async fn handle_incoming_pdu<'a>(
|
||||
}
|
||||
|
||||
// Skip old events
|
||||
let first_ts_in_room = self
|
||||
.services
|
||||
.timeline
|
||||
.first_pdu_in_room(room_id)
|
||||
.await?
|
||||
.origin_server_ts();
|
||||
// let first_ts_in_room = self
|
||||
// .services
|
||||
// .timeline
|
||||
// .first_pdu_in_room(room_id)
|
||||
// .await?
|
||||
// .origin_server_ts();
|
||||
|
||||
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
|
||||
// These are timeline events
|
||||
let (sorted_prev_events, mut eventid_info) = self
|
||||
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
|
||||
.await?;
|
||||
debug!("Handling previous events");
|
||||
|
||||
debug!(
|
||||
events = ?sorted_prev_events,
|
||||
"Handling previous events"
|
||||
);
|
||||
|
||||
sorted_prev_events
|
||||
.iter()
|
||||
.try_stream()
|
||||
.map_ok(AsRef::as_ref)
|
||||
.try_for_each(|prev_id| {
|
||||
self.handle_prev_pdu(
|
||||
origin,
|
||||
event_id,
|
||||
room_id,
|
||||
eventid_info.remove(prev_id),
|
||||
create_event,
|
||||
first_ts_in_room,
|
||||
prev_id,
|
||||
)
|
||||
.inspect_err(move |e| {
|
||||
warn!("Prev {prev_id} failed: {e}");
|
||||
match self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.entry(prev_id.into())
|
||||
{
|
||||
| hash_map::Entry::Vacant(e) => {
|
||||
e.insert((Instant::now(), 1));
|
||||
},
|
||||
| hash_map::Entry::Occupied(mut e) => {
|
||||
let tries = e.get().1.saturating_add(1);
|
||||
*e.get_mut() = (Instant::now(), tries);
|
||||
},
|
||||
}
|
||||
})
|
||||
.map(|_| self.services.server.check_running())
|
||||
})
|
||||
.boxed()
|
||||
self.fetch_prevs(room_id, create_event, &incoming_pdu, origin)
|
||||
.await?;
|
||||
|
||||
// Done with prev events, now handling the incoming event
|
||||
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
|
||||
.boxed()
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::collections::{BTreeMap, HashMap, hash_map};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
|
||||
@@ -10,7 +10,7 @@ use ruma::{
|
||||
events::StateEventType,
|
||||
};
|
||||
|
||||
use super::{check_room_id, get_room_version_rules};
|
||||
use super::{build_local_dag, check_room_id, get_room_version_rules};
|
||||
use crate::rooms::timeline::pdu_fits;
|
||||
|
||||
#[implement(super::Service)]
|
||||
@@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
||||
event_id: &'a EventId,
|
||||
room_id: &'a RoomId,
|
||||
mut value: CanonicalJsonObject,
|
||||
auth_events_known: bool,
|
||||
_auth_events_known: bool,
|
||||
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
@@ -107,44 +107,71 @@ where
|
||||
}
|
||||
|
||||
// Fetch any missing ones & reject invalid ones
|
||||
let missing_auth_events = if auth_events_known {
|
||||
pdu_event
|
||||
.auth_events()
|
||||
.filter(|id| !auth_events.contains_key(*id))
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
pdu_event.auth_events().collect::<Vec<_>>()
|
||||
};
|
||||
if !missing_auth_events.is_empty() || !auth_events_known {
|
||||
let mut missing_auth_events: HashSet<OwnedEventId> = pdu_event
|
||||
.auth_events()
|
||||
.filter(|id| !auth_events.contains_key(*id))
|
||||
.map(ToOwned::to_owned)
|
||||
.collect();
|
||||
|
||||
if !missing_auth_events.is_empty() {
|
||||
debug_info!(
|
||||
"Fetching {} missing auth events for outlier event {event_id}",
|
||||
missing_auth_events.len()
|
||||
);
|
||||
for (pdu, _) in self
|
||||
.fetch_and_handle_outliers(
|
||||
origin,
|
||||
missing_auth_events.iter().copied(),
|
||||
create_event,
|
||||
room_id,
|
||||
let backfilled = self
|
||||
.backfill_missing_events(
|
||||
room_id.to_owned(),
|
||||
vec![event_id.to_owned()],
|
||||
origin.to_owned(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
auth_events.insert(pdu.event_id().to_owned(), pdu);
|
||||
.await?;
|
||||
debug_info!("Fetched {} missing auth events for {event_id}", backfilled.len());
|
||||
let mapped = backfilled
|
||||
.iter()
|
||||
.map(|(eid, evt)| {
|
||||
let mut obj = evt.to_canonical_object();
|
||||
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
|
||||
(eid.clone(), obj)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
let local_dag = if mapped.len() == 1 {
|
||||
mapped.keys().map(ToOwned::to_owned).collect()
|
||||
} else {
|
||||
build_local_dag(&mapped).await?
|
||||
};
|
||||
debug_info!("Preparing to handle {} missing auth events", backfilled.len());
|
||||
for prev_event_id in local_dag {
|
||||
let obj = mapped
|
||||
.get(&prev_event_id)
|
||||
.expect("We should have this event in memory");
|
||||
debug_info!("Handling prev {prev_event_id}");
|
||||
let (prev, _) = Box::pin(self.handle_outlier_pdu(
|
||||
origin,
|
||||
create_event,
|
||||
&prev_event_id,
|
||||
room_id,
|
||||
obj.clone(),
|
||||
false,
|
||||
))
|
||||
.await?;
|
||||
if missing_auth_events.contains(&*prev_event_id) {
|
||||
missing_auth_events.remove(&prev_event_id);
|
||||
auth_events.insert(prev_event_id, prev);
|
||||
}
|
||||
debug_info!("Finished handling prev auth event");
|
||||
}
|
||||
} else {
|
||||
debug!("No missing auth events for outlier event {event_id}");
|
||||
}
|
||||
// reject if we are still missing some
|
||||
let still_missing = pdu_event
|
||||
.auth_events()
|
||||
.filter(|id| !auth_events.contains_key(*id))
|
||||
.collect::<Vec<_>>();
|
||||
if !still_missing.is_empty() {
|
||||
// reject if we are still missing some auth events.
|
||||
// If we're still missing prev events, we will fetch them individually later,
|
||||
// but there's no reason for us to be missing auth events now we've gapfilled
|
||||
// the DAG.
|
||||
if !missing_auth_events.is_empty() {
|
||||
// Don't reject: this could be a temporary condition
|
||||
// TODO: use get_missing_events?
|
||||
return Err!(Request(InvalidParam(
|
||||
"Could not fetch all auth events for outlier event {event_id}, still missing: \
|
||||
{still_missing:?}"
|
||||
{missing_auth_events:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
use std::{collections::BTreeMap, time::Instant};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
|
||||
utils::continue_exponential_backoff_secs,
|
||||
};
|
||||
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
|
||||
use tracing::debug;
|
||||
|
||||
#[implement(super::Service)]
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(
|
||||
name = "prev",
|
||||
level = INFO_SPAN_LEVEL,
|
||||
skip_all,
|
||||
fields(%prev_id),
|
||||
)]
|
||||
pub(super) async fn handle_prev_pdu<'a, Pdu>(
|
||||
&self,
|
||||
origin: &'a ServerName,
|
||||
event_id: &'a EventId,
|
||||
room_id: &'a RoomId,
|
||||
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
||||
create_event: &'a Pdu,
|
||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
||||
prev_id: &'a EventId,
|
||||
) -> Result
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
{
|
||||
// Check for disabled again because it might have changed
|
||||
if self.services.metadata.is_disabled(room_id).await {
|
||||
return Err!(Request(Forbidden(debug_warn!(
|
||||
"Federaton of room {room_id} is currently disabled on this server. Request by \
|
||||
origin {origin} and event ID {event_id}"
|
||||
))));
|
||||
}
|
||||
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(prev_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 5 * 60;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
|
||||
debug!(
|
||||
?tries,
|
||||
duration = ?time.elapsed(),
|
||||
"Backing off from prev_event"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let Some((pdu, json)) = eventid_info else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Skip old events
|
||||
if pdu.origin_server_ts() < first_ts_in_room {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let start_time = Instant::now();
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
|
||||
|
||||
defer! {{
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.remove(room_id);
|
||||
}};
|
||||
|
||||
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
elapsed = ?start_time.elapsed(),
|
||||
"Handled prev_event",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -4,7 +4,6 @@ mod fetch_prev;
|
||||
mod fetch_state;
|
||||
mod handle_incoming_pdu;
|
||||
mod handle_outlier_pdu;
|
||||
mod handle_prev_pdu;
|
||||
mod parse_incoming_pdu;
|
||||
mod policy_server;
|
||||
mod resolve_state;
|
||||
@@ -15,6 +14,7 @@ use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
|
||||
pub use fetch_and_handle_outliers::build_local_dag;
|
||||
use ruma::{
|
||||
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
|
||||
room_version_rules::RoomVersionRules,
|
||||
@@ -22,7 +22,6 @@ use ruma::{
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::{Dep, globals, rooms, sending, server_keys};
|
||||
|
||||
pub struct Service {
|
||||
pub mutex_federation: RoomMutexMap,
|
||||
pub federation_handletime: SyncRwLock<HandleTimeMap>,
|
||||
|
||||
@@ -41,6 +41,7 @@ where
|
||||
.get_pdu_id(incoming_pdu.event_id())
|
||||
.await
|
||||
{
|
||||
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
|
||||
return Ok(Some(pduid));
|
||||
}
|
||||
|
||||
@@ -81,6 +82,7 @@ where
|
||||
};
|
||||
|
||||
if state_at_incoming_event.is_none() {
|
||||
trace!("Could not calculate incoming state, asking remote {origin} for it");
|
||||
state_at_incoming_event = self
|
||||
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
|
||||
.await?;
|
||||
|
||||
@@ -34,8 +34,9 @@ where
|
||||
|
||||
batch
|
||||
});
|
||||
|
||||
debug_assert!(!server_keys.is_empty(), "empty batch request to notary");
|
||||
if server_keys.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
while let Some(batch) = server_keys
|
||||
|
||||
Reference in New Issue
Block a user