mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c7f8eec282 | |||
| 0d4bbe612d | |||
| c4d297ae3b | |||
| d15064871e | |||
| b925936195 | |||
| 56feba0ea0 | |||
| 8d89ba94d5 | |||
| 0b135c7717 |
@@ -0,0 +1,2 @@
|
|||||||
|
Improved the performance and reliability of fetching missing events, improving network partition recovery. Contributed
|
||||||
|
by @nex.
|
||||||
@@ -297,7 +297,7 @@
|
|||||||
|
|
||||||
# This item is undocumented. Please contribute documentation for it.
|
# This item is undocumented. Please contribute documentation for it.
|
||||||
#
|
#
|
||||||
#max_fetch_prev_events = 192
|
#max_fetch_prev_events = 256
|
||||||
|
|
||||||
# How many incoming federation transactions the server is willing to be
|
# How many incoming federation transactions the server is willing to be
|
||||||
# processing at any given time before it becomes overloaded and starts
|
# processing at any given time before it becomes overloaded and starts
|
||||||
|
|||||||
@@ -381,6 +381,7 @@ async fn handle_room(
|
|||||||
.rooms
|
.rooms
|
||||||
.event_handler
|
.event_handler
|
||||||
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
||||||
|
.boxed()
|
||||||
.await
|
.await
|
||||||
.map(|_| ());
|
.map(|_| ());
|
||||||
results.push((event_id, result));
|
results.push((event_id, result));
|
||||||
|
|||||||
@@ -375,7 +375,7 @@ pub struct Config {
|
|||||||
#[serde(default = "default_max_request_size")]
|
#[serde(default = "default_max_request_size")]
|
||||||
pub max_request_size: usize,
|
pub max_request_size: usize,
|
||||||
|
|
||||||
/// default: 192
|
/// default: 256
|
||||||
#[serde(default = "default_max_fetch_prev_events")]
|
#[serde(default = "default_max_fetch_prev_events")]
|
||||||
pub max_fetch_prev_events: u16,
|
pub max_fetch_prev_events: u16,
|
||||||
|
|
||||||
@@ -2549,7 +2549,7 @@ fn default_pusher_timeout() -> u64 { 60 }
|
|||||||
|
|
||||||
fn default_pusher_idle_timeout() -> u64 { 15 }
|
fn default_pusher_idle_timeout() -> u64 { 15 }
|
||||||
|
|
||||||
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
|
fn default_max_fetch_prev_events() -> u16 { 256_u16 }
|
||||||
|
|
||||||
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
|
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
|
||||||
|
|
||||||
|
|||||||
@@ -187,6 +187,10 @@ pub(super) static MAPS: &[Descriptor] = &[
|
|||||||
val_size_hint: Some(8),
|
val_size_hint: Some(8),
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL
|
||||||
},
|
},
|
||||||
|
Descriptor {
|
||||||
|
name: "roomid_mindepth",
|
||||||
|
..descriptor::RANDOM_SMALL
|
||||||
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "roomserverids",
|
name: "roomserverids",
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL
|
||||||
|
|||||||
@@ -1,233 +1,456 @@
|
|||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, HashSet, VecDeque, hash_map},
|
collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use assign::assign;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Event, PduEvent, debug, debug_warn, implement, matrix::event::gen_event_id_canonical_json,
|
Event, PduEvent, debug, debug_info, debug_warn, err, error,
|
||||||
trace, utils::continue_exponential_backoff_secs, warn,
|
matrix::event::gen_event_id_canonical_json,
|
||||||
|
state_res::lexicographical_topological_sort,
|
||||||
|
trace,
|
||||||
|
utils::{IterStream, continue_exponential_backoff_secs, stream::BroadbandExt},
|
||||||
|
warn,
|
||||||
};
|
};
|
||||||
|
use futures::StreamExt;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
|
||||||
api::federation::event::get_event,
|
RoomId, ServerName, UInt,
|
||||||
|
api::federation::event::{get_event, get_missing_events},
|
||||||
|
int,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::get_room_version_rules;
|
use super::get_room_version_rules;
|
||||||
|
|
||||||
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
|
||||||
/// it is appended to the outliers Tree.
|
/// returning them in a topologically sorted order.
|
||||||
///
|
///
|
||||||
/// Returns pdu and if we fetched it over federation the raw json.
|
/// This is used to attempt to process PDUs in an order that respects their
|
||||||
///
|
/// dependencies, however it is ultimately the sender's responsibility to send
|
||||||
/// a. Look in the main timeline (pduid_pdu tree)
|
/// them in a processable order, so this is just a best effort attempt. It does
|
||||||
/// b. Look at outlier pdu tree
|
/// not account for power levels or other tie breaks.
|
||||||
/// c. Ask origin server over federation
|
pub async fn build_local_dag<S: std::hash::BuildHasher>(
|
||||||
/// d. TODO: Ask other servers over federation?
|
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject, S>,
|
||||||
#[implement(super::Service)]
|
) -> conduwuit::Result<Vec<OwnedEventId>> {
|
||||||
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
|
||||||
&self,
|
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
|
||||||
origin: &'a ServerName,
|
HashMap::with_capacity(pdu_map.len());
|
||||||
events: Events,
|
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
|
||||||
create_event: &'a Pdu,
|
|
||||||
room_id: &'a RoomId,
|
|
||||||
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
|
|
||||||
where
|
|
||||||
Pdu: Event + Send + Sync,
|
|
||||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
|
||||||
{
|
|
||||||
let back_off = |id| match self
|
|
||||||
.services
|
|
||||||
.globals
|
|
||||||
.bad_event_ratelimiter
|
|
||||||
.write()
|
|
||||||
.entry(id)
|
|
||||||
{
|
|
||||||
| hash_map::Entry::Vacant(e) => {
|
|
||||||
e.insert((Instant::now(), 1));
|
|
||||||
},
|
|
||||||
| hash_map::Entry::Occupied(mut e) => {
|
|
||||||
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
|
for (event_id, value) in pdu_map {
|
||||||
trace!("Fetching {} outlier pdus", events.clone().count());
|
// We already checked that these properties are correct in parse_incoming_pdu,
|
||||||
|
// so it's safe to unwrap here.
|
||||||
|
// We also filter to remove any prev_events that are not in this pdu_map, as we
|
||||||
|
// need to have at least one event with zero out degrees for the lexico-topo
|
||||||
|
// sort below. If there are multiple events with omitted prevs, they will be
|
||||||
|
// ordered by timestamp, then event ID. At that point though, it's unlikely to
|
||||||
|
// matter.
|
||||||
|
let prev_events = value
|
||||||
|
.get("prev_events")
|
||||||
|
.unwrap()
|
||||||
|
.as_array()
|
||||||
|
.unwrap()
|
||||||
|
.iter()
|
||||||
|
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
|
||||||
|
.filter(|id| pdu_map.contains_key(id))
|
||||||
|
.collect();
|
||||||
|
|
||||||
for id in events {
|
dag.insert(event_id.clone(), prev_events);
|
||||||
// a. Look in the main timeline (pduid_pdu tree)
|
let origin_server_ts = value
|
||||||
// b. Look at outlier pdu tree
|
.get("origin_server_ts")
|
||||||
// (get_pdu_json checks both)
|
.and_then(CanonicalJsonValue::as_integer)
|
||||||
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
|
.unwrap_or_default();
|
||||||
trace!("Found {id} in main timeline or outlier tree");
|
id_origin_ts.insert(event_id.clone(), origin_server_ts);
|
||||||
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
|
}
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// c. Ask origin server over federation
|
debug!(count = dag.len(), "Sorting incoming events with partial graph");
|
||||||
// We also handle its auth chain here so we don't get a stack overflow in
|
lexicographical_topological_sort(&dag, &async |node_id| {
|
||||||
// handle_outlier_pdu.
|
// Note: we don't bother fetching power levels because that would massively slow
|
||||||
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
|
// this function down. This is a best-effort attempt to order events correctly
|
||||||
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
|
// for processing, however ultimately that should be the sender's job.
|
||||||
|
let ts = id_origin_ts
|
||||||
|
.get(&node_id)
|
||||||
|
.copied()
|
||||||
|
.unwrap_or_else(|| int!(0))
|
||||||
|
.to_string()
|
||||||
|
.parse::<u64>()
|
||||||
|
.ok()
|
||||||
|
.and_then(UInt::new)
|
||||||
|
.unwrap_or_default();
|
||||||
|
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.inspect(|sorted| {
|
||||||
|
debug_assert_eq!(
|
||||||
|
sorted.len(),
|
||||||
|
pdu_map.len(),
|
||||||
|
"Sorted graph was not the same size as the input graph"
|
||||||
|
);
|
||||||
|
})
|
||||||
|
.map_err(|e| err!("failed to resolve local graph: {e}"))
|
||||||
|
}
|
||||||
|
|
||||||
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
|
impl super::Service {
|
||||||
while let Some(next_id) = todo_auth_events.pop_front() {
|
/// Uses `/_matrix/federation/v1/get_missing_events` to fill gaps in the
|
||||||
if let Some((time, tries)) = self
|
/// DAG.
|
||||||
.services
|
///
|
||||||
.globals
|
/// When this function is called, the "earliest events" (current forward
|
||||||
.bad_event_ratelimiter
|
/// extremities) will be collected, and the function will loop with an
|
||||||
.read()
|
/// exponentially incrementing limit (up to 100 per request) until it has
|
||||||
.get(&*next_id)
|
/// filled the gap, i.e. when the remote says there's no more events.
|
||||||
{
|
///
|
||||||
// Exponential backoff
|
/// This function will iterate until the remote returns no more events,
|
||||||
const MIN_DURATION: u64 = 60 * 2;
|
/// increasing the limit by a factor of 10. If 100 iterations are reached or
|
||||||
const MAX_DURATION: u64 = 60 * 60 * 8;
|
/// max_fetch_prev_events events are backfilled, the function will give up
|
||||||
if continue_exponential_backoff_secs(
|
/// and return what it has, to avoid pulling in too much data (for example,
|
||||||
MIN_DURATION,
|
/// absurdly large gaps).
|
||||||
MAX_DURATION,
|
///
|
||||||
time.elapsed(),
|
/// This function does not persist the events. The caller is responsible for
|
||||||
*tries,
|
/// passing them through handle_incoming_pdu.
|
||||||
) {
|
///
|
||||||
debug_warn!(
|
/// ## Parameters
|
||||||
tried = ?*tries,
|
///
|
||||||
elapsed = ?time.elapsed(),
|
/// - `room_id`: The room's ID.
|
||||||
"Backing off from {next_id}",
|
/// - `head`: The event we are potentially missing prev_events for.
|
||||||
);
|
/// - `tail`: The most recently known events in the graph (typically forward
|
||||||
continue;
|
/// extremities).
|
||||||
}
|
/// - `via`: The server to ask for missing events.
|
||||||
}
|
/// - `min_depth`: Don't process events with a `depth` lower than this
|
||||||
|
/// value. Not massively useful, but can help short-circuit infinite loops
|
||||||
|
/// and weird edge paths.
|
||||||
|
pub async fn get_missing_events(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
head: &PduEvent,
|
||||||
|
tail: Vec<OwnedEventId>,
|
||||||
|
via: &ServerName,
|
||||||
|
min_depth: UInt,
|
||||||
|
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
{
|
||||||
|
let missing_count = head
|
||||||
|
.prev_events()
|
||||||
|
.stream()
|
||||||
|
.broad_filter_map(|event_id| async move {
|
||||||
|
match self
|
||||||
|
.services
|
||||||
|
.timeline
|
||||||
|
.get_non_outlier_pdu_json(event_id)
|
||||||
|
.await
|
||||||
|
.inspect(|_| debug!("Found prev_event {event_id} locally."))
|
||||||
|
.inspect_err(
|
||||||
|
|e| debug!(%e, "Could not find prev_event {event_id} locally."),
|
||||||
|
) {
|
||||||
|
| Ok(_) => None,
|
||||||
|
| Err(_) => Some(event_id),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.count()
|
||||||
|
.await;
|
||||||
|
debug_assert_ne!(
|
||||||
|
missing_count, 0,
|
||||||
|
"event passed to get_missing_events is not missing any events (wasteful call)"
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
if events_all.contains(&next_id) {
|
let mut discovered = HashMap::with_capacity(20);
|
||||||
continue;
|
let mut latest_events = vec![head.event_id().to_owned()];
|
||||||
}
|
let mut iterations = 0_u8;
|
||||||
|
loop {
|
||||||
if self.services.timeline.pdu_exists(&next_id).await {
|
iterations = iterations.saturating_add(1);
|
||||||
trace!("Found {next_id} in db");
|
let limit = iterations.saturating_mul(10).min(100);
|
||||||
continue;
|
debug_info!(%limit, %via, %iterations, discovered=discovered.len(), %min_depth, "Attempting to gap fill missing events");
|
||||||
}
|
let response: get_missing_events::v1::Response = self
|
||||||
|
|
||||||
debug!("Fetching {next_id} over federation from {origin}.");
|
|
||||||
match self
|
|
||||||
.services
|
.services
|
||||||
.sending
|
.sending
|
||||||
.send_federation_request(
|
.send_federation_request(
|
||||||
origin,
|
via,
|
||||||
get_event::v1::Request::new((*next_id).to_owned()),
|
assign!(
|
||||||
|
get_missing_events::v1::Request::new(
|
||||||
|
room_id.to_owned(),
|
||||||
|
tail.clone(),
|
||||||
|
latest_events.clone()
|
||||||
|
),
|
||||||
|
{limit: limit.into(), min_depth}
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await?;
|
||||||
{
|
|
||||||
| Ok(res) => {
|
|
||||||
debug!("Got {next_id} over federation from {origin}");
|
|
||||||
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
|
|
||||||
back_off((*next_id).to_owned());
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
let Ok((calculated_event_id, value)) =
|
if response.events.is_empty() {
|
||||||
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
|
debug_info!(%via, "Finished gap filling missing events (remote returned no more events).");
|
||||||
else {
|
break;
|
||||||
back_off((*next_id).to_owned());
|
}
|
||||||
continue;
|
debug_info!("Got {} events back from remote", response.events.len());
|
||||||
};
|
|
||||||
|
|
||||||
if calculated_event_id != *next_id {
|
latest_events.clear();
|
||||||
warn!(
|
for raw_event in response.events {
|
||||||
"Server didn't return event id we requested: requested: {next_id}, \
|
let (_, event_id, pdu_json) = self.parse_incoming_pdu(&raw_event).await?;
|
||||||
we got {calculated_event_id}. Event: {:?}",
|
let pdu = PduEvent::from_id_val(&event_id, pdu_json).map_err(|e| {
|
||||||
&res.pdu
|
err!(Request(BadJson("Failed to parse backfilled event {event_id}: {e}")))
|
||||||
);
|
})?;
|
||||||
|
|
||||||
|
if pdu.depth < min_depth {
|
||||||
|
debug_warn!(
|
||||||
|
"Received PDU with depth {} below min_depth {}, ignoring",
|
||||||
|
pdu.depth,
|
||||||
|
min_depth
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for prev_event_id in pdu.prev_events() {
|
||||||
|
if discovered.contains_key(prev_event_id) {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
if self
|
||||||
if let Some(auth_events) = value
|
.services
|
||||||
.get("auth_events")
|
.timeline
|
||||||
.and_then(CanonicalJsonValue::as_array)
|
.non_outlier_pdu_exists(prev_event_id)
|
||||||
|
.await
|
||||||
{
|
{
|
||||||
for auth_event in auth_events {
|
continue;
|
||||||
match serde_json::from_value::<OwnedEventId>(
|
|
||||||
auth_event.clone().into(),
|
|
||||||
) {
|
|
||||||
| Ok(auth_event) => {
|
|
||||||
trace!(
|
|
||||||
"Found auth event id {auth_event} for event {next_id}"
|
|
||||||
);
|
|
||||||
todo_auth_events.push_back(auth_event);
|
|
||||||
},
|
|
||||||
| _ => {
|
|
||||||
warn!("Auth event id is not valid");
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
warn!("Auth event list invalid");
|
|
||||||
}
|
}
|
||||||
|
latest_events.push(prev_event_id.to_owned());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
events_in_reverse_order.push((next_id.clone(), value));
|
discovered.insert(event_id.clone(), pdu);
|
||||||
events_all.insert(next_id);
|
}
|
||||||
},
|
|
||||||
| Err(e) => {
|
if latest_events.is_empty() {
|
||||||
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
|
break;
|
||||||
back_off((*next_id).to_owned());
|
} else if discovered.len() > self.services.server.config.max_fetch_prev_events.into()
|
||||||
},
|
|| iterations >= 20
|
||||||
|
{
|
||||||
|
error!(
|
||||||
|
filled=discovered.len(),
|
||||||
|
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
|
||||||
|
%iterations,
|
||||||
|
"Gap too large, giving up"
|
||||||
|
);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
|
Ok(discovered)
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
|
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
||||||
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
|
/// it is appended to the outliers Tree.
|
||||||
// a. Look in the main timeline (pduid_pdu tree)
|
///
|
||||||
// b. Look at outlier pdu tree
|
/// Returns pdu and if we fetched it over federation the raw json.
|
||||||
// (get_pdu_json checks both)
|
///
|
||||||
if let Some(local_pdu) = local_pdu {
|
/// a. Look in the main timeline (pduid_pdu tree)
|
||||||
trace!("Found {id} in main timeline or outlier tree");
|
/// b. Look at outlier pdu tree
|
||||||
pdus.push((local_pdu.clone(), None));
|
/// c. Ask origin server over federation
|
||||||
}
|
/// d. TODO: Ask other servers over federation?
|
||||||
|
#[deprecated]
|
||||||
|
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
||||||
|
&self,
|
||||||
|
origin: &'a ServerName,
|
||||||
|
events: Events,
|
||||||
|
create_event: &'a Pdu,
|
||||||
|
room_id: &'a RoomId,
|
||||||
|
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
|
||||||
|
where
|
||||||
|
Pdu: Event + Send + Sync,
|
||||||
|
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
||||||
|
{
|
||||||
|
let back_off = |id| match self
|
||||||
|
.services
|
||||||
|
.globals
|
||||||
|
.bad_event_ratelimiter
|
||||||
|
.write()
|
||||||
|
.entry(id)
|
||||||
|
{
|
||||||
|
| hash_map::Entry::Vacant(e) => {
|
||||||
|
e.insert((Instant::now(), 1));
|
||||||
|
},
|
||||||
|
| hash_map::Entry::Occupied(mut e) => {
|
||||||
|
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
|
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
|
||||||
if let Some((time, tries)) = self
|
trace!("Fetching {} outlier pdus", events.clone().count());
|
||||||
.services
|
|
||||||
.globals
|
for id in events {
|
||||||
.bad_event_ratelimiter
|
// a. Look in the main timeline (pduid_pdu tree)
|
||||||
.read()
|
// b. Look at outlier pdu tree
|
||||||
.get(&*next_id)
|
// (get_pdu_json checks both)
|
||||||
{
|
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
|
||||||
// Exponential backoff
|
trace!("Found {id} in main timeline or outlier tree");
|
||||||
const MIN_DURATION: u64 = 5 * 60;
|
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
|
||||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
continue;
|
||||||
if continue_exponential_backoff_secs(
|
}
|
||||||
MIN_DURATION,
|
|
||||||
MAX_DURATION,
|
// c. Ask origin server over federation
|
||||||
time.elapsed(),
|
// We also handle its auth chain here so we don't get a stack overflow in
|
||||||
*tries,
|
// handle_outlier_pdu.
|
||||||
) {
|
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
|
||||||
debug!("Backing off from {next_id}");
|
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
|
||||||
|
|
||||||
|
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
|
||||||
|
while let Some(next_id) = todo_auth_events.pop_front() {
|
||||||
|
if let Some((time, tries)) = self
|
||||||
|
.services
|
||||||
|
.globals
|
||||||
|
.bad_event_ratelimiter
|
||||||
|
.read()
|
||||||
|
.get(&*next_id)
|
||||||
|
{
|
||||||
|
// Exponential backoff
|
||||||
|
const MIN_DURATION: u64 = 60 * 2;
|
||||||
|
const MAX_DURATION: u64 = 60 * 60 * 8;
|
||||||
|
if continue_exponential_backoff_secs(
|
||||||
|
MIN_DURATION,
|
||||||
|
MAX_DURATION,
|
||||||
|
time.elapsed(),
|
||||||
|
*tries,
|
||||||
|
) {
|
||||||
|
debug_warn!(
|
||||||
|
tried = ?*tries,
|
||||||
|
elapsed = ?time.elapsed(),
|
||||||
|
"Backing off from {next_id}",
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if events_all.contains(&next_id) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if self.services.timeline.pdu_exists(&next_id).await {
|
||||||
|
trace!("Found {next_id} in db");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("Fetching {next_id} over federation from {origin}.");
|
||||||
|
match self
|
||||||
|
.services
|
||||||
|
.sending
|
||||||
|
.send_federation_request(
|
||||||
|
origin,
|
||||||
|
get_event::v1::Request::new((*next_id).to_owned()),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
| Ok(res) => {
|
||||||
|
debug!("Got {next_id} over federation from {origin}");
|
||||||
|
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
|
||||||
|
back_off((*next_id).to_owned());
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let Ok((calculated_event_id, value)) =
|
||||||
|
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
|
||||||
|
else {
|
||||||
|
back_off((*next_id).to_owned());
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
if calculated_event_id != *next_id {
|
||||||
|
warn!(
|
||||||
|
"Server didn't return event id we requested: requested: \
|
||||||
|
{next_id}, we got {calculated_event_id}. Event: {:?}",
|
||||||
|
&res.pdu
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(auth_events) = value
|
||||||
|
.get("auth_events")
|
||||||
|
.and_then(CanonicalJsonValue::as_array)
|
||||||
|
{
|
||||||
|
for auth_event in auth_events {
|
||||||
|
match serde_json::from_value::<OwnedEventId>(
|
||||||
|
auth_event.clone().into(),
|
||||||
|
) {
|
||||||
|
| Ok(auth_event) => {
|
||||||
|
trace!(
|
||||||
|
"Found auth event id {auth_event} for event \
|
||||||
|
{next_id}"
|
||||||
|
);
|
||||||
|
todo_auth_events.push_back(auth_event);
|
||||||
|
},
|
||||||
|
| _ => {
|
||||||
|
warn!("Auth event id is not valid");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warn!("Auth event list invalid");
|
||||||
|
}
|
||||||
|
|
||||||
|
events_in_reverse_order.push((next_id.clone(), value));
|
||||||
|
events_all.insert(next_id);
|
||||||
|
},
|
||||||
|
| Err(e) => {
|
||||||
|
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
|
||||||
|
back_off((*next_id).to_owned());
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("Handling outlier {next_id}");
|
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
|
||||||
match Box::pin(self.handle_outlier_pdu(
|
}
|
||||||
origin,
|
|
||||||
create_event,
|
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
|
||||||
&next_id,
|
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
|
||||||
room_id,
|
// a. Look in the main timeline (pduid_pdu tree)
|
||||||
value.clone(),
|
// b. Look at outlier pdu tree
|
||||||
true,
|
// (get_pdu_json checks both)
|
||||||
))
|
if let Some(local_pdu) = local_pdu {
|
||||||
.await
|
trace!("Found {id} in main timeline or outlier tree");
|
||||||
{
|
pdus.push((local_pdu.clone(), None));
|
||||||
| Ok((pdu, json)) =>
|
}
|
||||||
if next_id == *id {
|
|
||||||
trace!("Handled outlier {next_id} (original request)");
|
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
|
||||||
pdus.push((pdu, Some(json)));
|
if let Some((time, tries)) = self
|
||||||
|
.services
|
||||||
|
.globals
|
||||||
|
.bad_event_ratelimiter
|
||||||
|
.read()
|
||||||
|
.get(&*next_id)
|
||||||
|
{
|
||||||
|
// Exponential backoff
|
||||||
|
const MIN_DURATION: u64 = 5 * 60;
|
||||||
|
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||||
|
if continue_exponential_backoff_secs(
|
||||||
|
MIN_DURATION,
|
||||||
|
MAX_DURATION,
|
||||||
|
time.elapsed(),
|
||||||
|
*tries,
|
||||||
|
) {
|
||||||
|
debug!("Backing off from {next_id}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!("Handling outlier {next_id}");
|
||||||
|
match Box::pin(self.handle_outlier_pdu(
|
||||||
|
origin,
|
||||||
|
create_event,
|
||||||
|
&next_id,
|
||||||
|
room_id,
|
||||||
|
value.clone(),
|
||||||
|
true,
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
| Ok((pdu, json)) =>
|
||||||
|
if next_id == *id {
|
||||||
|
trace!("Handled outlier {next_id} (original request)");
|
||||||
|
pdus.push((pdu, Some(json)));
|
||||||
|
},
|
||||||
|
| Err(e) => {
|
||||||
|
warn!("Authentication of event {next_id} failed: {e:?}");
|
||||||
|
back_off(next_id);
|
||||||
},
|
},
|
||||||
| Err(e) => {
|
}
|
||||||
warn!("Authentication of event {next_id} failed: {e:?}");
|
|
||||||
back_off(next_id);
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
trace!("Fetched and handled {} outlier pdus", pdus.len());
|
||||||
|
pdus
|
||||||
}
|
}
|
||||||
trace!("Fetched and handled {} outlier pdus", pdus.len());
|
|
||||||
pdus
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,128 +1,96 @@
|
|||||||
use std::{
|
use std::collections::HashMap;
|
||||||
collections::{BTreeMap, HashMap, HashSet, VecDeque},
|
|
||||||
iter::once,
|
|
||||||
};
|
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Event, PduEvent, Result, debug_warn, err, implement,
|
Event, PduEvent, debug, debug_info,
|
||||||
state_res::{self},
|
utils::{BoolExt, IterStream, stream::BroadbandExt},
|
||||||
};
|
warn,
|
||||||
use futures::{FutureExt, future};
|
|
||||||
use ruma::{
|
|
||||||
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
|
|
||||||
int, uint,
|
|
||||||
};
|
};
|
||||||
|
use futures::StreamExt;
|
||||||
|
use ruma::{RoomId, ServerName};
|
||||||
|
|
||||||
use super::check_room_id;
|
use crate::rooms::event_handler::build_local_dag;
|
||||||
|
|
||||||
#[implement(super::Service)]
|
impl super::Service {
|
||||||
#[tracing::instrument(
|
/// Fetches any missing prev_events for this event and persists them before
|
||||||
level = "debug",
|
/// returning.
|
||||||
skip_all,
|
pub(super) async fn fetch_prevs(
|
||||||
fields(%origin),
|
&self,
|
||||||
)]
|
room_id: &RoomId,
|
||||||
#[allow(clippy::type_complexity)]
|
create_event: &PduEvent,
|
||||||
pub(super) async fn fetch_prev<'a, Pdu, Events>(
|
incoming_pdu: &PduEvent,
|
||||||
&self,
|
origin: &ServerName,
|
||||||
origin: &ServerName,
|
) -> conduwuit::Result<()> {
|
||||||
create_event: &Pdu,
|
let missing = incoming_pdu
|
||||||
room_id: &RoomId,
|
.prev_events()
|
||||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
.stream()
|
||||||
initial_set: Events,
|
.broad_filter_map(|event_id| async move {
|
||||||
) -> Result<(
|
self.services
|
||||||
Vec<OwnedEventId>,
|
.timeline
|
||||||
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
.get_non_outlier_pdu_json(event_id)
|
||||||
)>
|
.await
|
||||||
where
|
.is_ok()
|
||||||
Pdu: Event + Send + Sync,
|
.or(|| event_id.to_owned())
|
||||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
})
|
||||||
{
|
.collect::<Vec<_>>()
|
||||||
let num_ids = initial_set.clone().count();
|
.await;
|
||||||
let mut eventid_info = HashMap::new();
|
if missing.is_empty() {
|
||||||
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
|
debug!(event_id=%incoming_pdu.event_id(), "No missing prev events.");
|
||||||
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
|
return Ok(());
|
||||||
initial_set.map(ToOwned::to_owned).collect();
|
|
||||||
|
|
||||||
let mut amount = 0;
|
|
||||||
|
|
||||||
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
|
|
||||||
self.services.server.check_running()?;
|
|
||||||
|
|
||||||
match self
|
|
||||||
.fetch_and_handle_outliers(
|
|
||||||
origin,
|
|
||||||
once(prev_event_id.as_ref()),
|
|
||||||
create_event,
|
|
||||||
room_id,
|
|
||||||
)
|
|
||||||
.boxed()
|
|
||||||
.await
|
|
||||||
.pop()
|
|
||||||
{
|
|
||||||
| Some((pdu, mut json_opt)) => {
|
|
||||||
check_room_id(room_id, &pdu)?;
|
|
||||||
|
|
||||||
let limit = self.services.server.config.max_fetch_prev_events;
|
|
||||||
if amount > limit {
|
|
||||||
debug_warn!("Max prev event limit reached! Limit: {limit}");
|
|
||||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if json_opt.is_none() {
|
|
||||||
json_opt = self
|
|
||||||
.services
|
|
||||||
.outlier
|
|
||||||
.get_outlier_pdu_json(&prev_event_id)
|
|
||||||
.await
|
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(json) = json_opt {
|
|
||||||
if pdu.origin_server_ts() > first_ts_in_room {
|
|
||||||
amount = amount.saturating_add(1);
|
|
||||||
for prev_prev in pdu.prev_events() {
|
|
||||||
if !graph.contains_key(prev_prev) {
|
|
||||||
todo_outlier_stack.push_back(prev_prev.to_owned());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
graph.insert(
|
|
||||||
prev_event_id.clone(),
|
|
||||||
pdu.prev_events().map(ToOwned::to_owned).collect(),
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// Time based check failed
|
|
||||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
|
||||||
}
|
|
||||||
|
|
||||||
eventid_info.insert(prev_event_id.clone(), (pdu, json));
|
|
||||||
} else {
|
|
||||||
// Get json failed, so this was not fetched over federation
|
|
||||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
|
||||||
}
|
|
||||||
},
|
|
||||||
| _ => {
|
|
||||||
// Fetch and handle failed
|
|
||||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
debug!(%room_id, event_id=%incoming_pdu.event_id(), ?missing, "Fetching previous events");
|
||||||
|
let tail = self
|
||||||
|
.services
|
||||||
|
.state
|
||||||
|
.get_forward_extremities(room_id)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let backfilled = self
|
||||||
|
.get_missing_events(
|
||||||
|
room_id,
|
||||||
|
incoming_pdu,
|
||||||
|
tail,
|
||||||
|
origin,
|
||||||
|
self.services.metadata.get_mindepth(room_id).await,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
debug_info!("Fetched {} missing events", backfilled.len());
|
||||||
|
|
||||||
|
// Persist all fetched events
|
||||||
|
let mapped = backfilled
|
||||||
|
.iter()
|
||||||
|
.map(|(eid, evt)| {
|
||||||
|
let mut obj = evt.to_canonical_object();
|
||||||
|
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
|
||||||
|
(eid.clone(), obj)
|
||||||
|
})
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
let to_persist = if mapped.len() <= 1 {
|
||||||
|
mapped.keys().map(ToOwned::to_owned).collect()
|
||||||
|
} else {
|
||||||
|
build_local_dag(&mapped).await?
|
||||||
|
};
|
||||||
|
|
||||||
|
for event_id in to_persist {
|
||||||
|
debug_info!("Persisting fetched prev event {event_id}");
|
||||||
|
let obj = mapped.get(&event_id).cloned().unwrap();
|
||||||
|
match self
|
||||||
|
.handle_outlier_pdu(origin, create_event, &event_id, room_id, obj, false)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
| Ok((pdu, val)) =>
|
||||||
|
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
|
||||||
|
.await,
|
||||||
|
| Err(e) => {
|
||||||
|
warn!("Failed to persist prev_event {event_id}: {e}");
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
}?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE because i keep forgetting: the caller persists incoming_pdu.
|
||||||
|
// we only care about its prev events
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
let event_fetch = |event_id| {
|
|
||||||
let origin_server_ts = eventid_info
|
|
||||||
.get(&event_id)
|
|
||||||
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
|
|
||||||
|
|
||||||
// This return value is the key used for sorting events,
|
|
||||||
// events are then sorted by power level, time,
|
|
||||||
// and lexically by event_id.
|
|
||||||
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
|
|
||||||
};
|
|
||||||
|
|
||||||
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
|
|
||||||
.await
|
|
||||||
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;
|
|
||||||
|
|
||||||
Ok((sorted, eventid_info))
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
use std::collections::{HashMap, hash_map};
|
use std::collections::{HashMap, hash_map};
|
||||||
|
|
||||||
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
|
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
|
||||||
use futures::FutureExt;
|
|
||||||
use ruma::{
|
use ruma::{
|
||||||
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
|
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
|
||||||
events::StateEventType,
|
events::StateEventType,
|
||||||
@@ -42,7 +41,6 @@ where
|
|||||||
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
|
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
|
||||||
let state_vec = self
|
let state_vec = self
|
||||||
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
|
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
|
||||||
.boxed()
|
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
|
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
|
||||||
|
|||||||
@@ -1,14 +1,11 @@
|
|||||||
use std::{
|
use std::{collections::BTreeMap, time::Instant};
|
||||||
collections::{BTreeMap, hash_map},
|
|
||||||
time::Instant,
|
|
||||||
};
|
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
|
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
|
||||||
implement, info, trace, utils::stream::IterStream, warn,
|
implement, info, trace, warn,
|
||||||
};
|
};
|
||||||
use futures::{
|
use futures::{
|
||||||
FutureExt, TryFutureExt, TryStreamExt,
|
FutureExt,
|
||||||
future::{OptionFuture, try_join4},
|
future::{OptionFuture, try_join4},
|
||||||
};
|
};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
@@ -236,63 +233,21 @@ pub async fn handle_incoming_pdu<'a>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Skip old events
|
// Skip old events
|
||||||
let first_ts_in_room = self
|
// let first_ts_in_room = self
|
||||||
.services
|
// .services
|
||||||
.timeline
|
// .timeline
|
||||||
.first_pdu_in_room(room_id)
|
// .first_pdu_in_room(room_id)
|
||||||
.await?
|
// .await?
|
||||||
.origin_server_ts();
|
// .origin_server_ts();
|
||||||
|
|
||||||
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
|
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
|
||||||
// These are timeline events
|
// These are timeline events
|
||||||
let (sorted_prev_events, mut eventid_info) = self
|
debug!("Handling previous events");
|
||||||
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
debug!(
|
self.fetch_prevs(room_id, create_event, &incoming_pdu, origin)
|
||||||
events = ?sorted_prev_events,
|
|
||||||
"Handling previous events"
|
|
||||||
);
|
|
||||||
|
|
||||||
sorted_prev_events
|
|
||||||
.iter()
|
|
||||||
.try_stream()
|
|
||||||
.map_ok(AsRef::as_ref)
|
|
||||||
.try_for_each(|prev_id| {
|
|
||||||
self.handle_prev_pdu(
|
|
||||||
origin,
|
|
||||||
event_id,
|
|
||||||
room_id,
|
|
||||||
eventid_info.remove(prev_id),
|
|
||||||
create_event,
|
|
||||||
first_ts_in_room,
|
|
||||||
prev_id,
|
|
||||||
)
|
|
||||||
.inspect_err(move |e| {
|
|
||||||
warn!("Prev {prev_id} failed: {e}");
|
|
||||||
match self
|
|
||||||
.services
|
|
||||||
.globals
|
|
||||||
.bad_event_ratelimiter
|
|
||||||
.write()
|
|
||||||
.entry(prev_id.into())
|
|
||||||
{
|
|
||||||
| hash_map::Entry::Vacant(e) => {
|
|
||||||
e.insert((Instant::now(), 1));
|
|
||||||
},
|
|
||||||
| hash_map::Entry::Occupied(mut e) => {
|
|
||||||
let tries = e.get().1.saturating_add(1);
|
|
||||||
*e.get_mut() = (Instant::now(), tries);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.map(|_| self.services.server.check_running())
|
|
||||||
})
|
|
||||||
.boxed()
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Done with prev events, now handling the incoming event
|
// Done with prev events, now handling the incoming event
|
||||||
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
|
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
|
||||||
.boxed()
|
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,13 +1,13 @@
|
|||||||
use std::collections::{BTreeMap, HashMap, hash_map};
|
use std::collections::{BTreeMap, HashMap, hash_map};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
|
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, info, state_res,
|
||||||
trace, warn,
|
trace, warn,
|
||||||
};
|
};
|
||||||
use futures::future::ready;
|
use futures::future::ready;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
||||||
events::StateEventType,
|
api::federation::authorization::get_event_authorization, events::StateEventType,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{check_room_id, get_room_version_rules};
|
use super::{check_room_id, get_room_version_rules};
|
||||||
@@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
|||||||
event_id: &'a EventId,
|
event_id: &'a EventId,
|
||||||
room_id: &'a RoomId,
|
room_id: &'a RoomId,
|
||||||
mut value: CanonicalJsonObject,
|
mut value: CanonicalJsonObject,
|
||||||
auth_events_known: bool,
|
_auth_events_known: bool,
|
||||||
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
|
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
|
||||||
where
|
where
|
||||||
Pdu: Event + Send + Sync,
|
Pdu: Event + Send + Sync,
|
||||||
@@ -107,45 +107,52 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Fetch any missing ones & reject invalid ones
|
// Fetch any missing ones & reject invalid ones
|
||||||
let missing_auth_events = if auth_events_known {
|
if auth_events.len() != pdu_event.auth_events().count() {
|
||||||
pdu_event
|
info!("Missing some auth events, asking remote for auth chain");
|
||||||
.auth_events()
|
let response: get_event_authorization::v1::Response = self
|
||||||
.filter(|id| !auth_events.contains_key(*id))
|
.services
|
||||||
.collect::<Vec<_>>()
|
.sending
|
||||||
} else {
|
.send_federation_request(
|
||||||
pdu_event.auth_events().collect::<Vec<_>>()
|
|
||||||
};
|
|
||||||
if !missing_auth_events.is_empty() || !auth_events_known {
|
|
||||||
debug_info!(
|
|
||||||
"Fetching {} missing auth events for outlier event {event_id}",
|
|
||||||
missing_auth_events.len()
|
|
||||||
);
|
|
||||||
for (pdu, _) in self
|
|
||||||
.fetch_and_handle_outliers(
|
|
||||||
origin,
|
origin,
|
||||||
missing_auth_events.iter().copied(),
|
get_event_authorization::v1::Request::new(
|
||||||
create_event,
|
room_id.to_owned(),
|
||||||
room_id,
|
event_id.to_owned(),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
.map_err(|e| {
|
||||||
auth_events.insert(pdu.event_id().to_owned(), pdu);
|
err!(Request(Forbidden(
|
||||||
|
"Remote server is not divulging incoming event's auth chain: {e}"
|
||||||
|
)))
|
||||||
|
})?;
|
||||||
|
let mut auth_chain_map = HashMap::with_capacity(response.auth_chain.len());
|
||||||
|
for auth_pdu_json in response.auth_chain {
|
||||||
|
let (auth_event_room_id, auth_event_id, auth_pdu_json) =
|
||||||
|
self.parse_incoming_pdu(&auth_pdu_json).await?;
|
||||||
|
if auth_event_room_id != room_id {
|
||||||
|
return Err!(Request(BadJson(
|
||||||
|
"Auth event {auth_event_id} is in {auth_event_room_id}, not {room_id}."
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let auth_pdu = PduEvent::from_id_val(&auth_event_id, auth_pdu_json)
|
||||||
|
.map_err(|e| err!(Request(BadJson("Invalid PDU {auth_event_id}: {e}"))))?;
|
||||||
|
auth_chain_map.insert(auth_event_id, auth_pdu);
|
||||||
}
|
}
|
||||||
} else {
|
for aid in pdu_event.auth_events() {
|
||||||
debug!("No missing auth events for outlier event {event_id}");
|
if auth_events.contains_key(aid) {
|
||||||
}
|
continue;
|
||||||
// reject if we are still missing some
|
}
|
||||||
let still_missing = pdu_event
|
if let Some(auth_event) = auth_chain_map.get(aid) {
|
||||||
.auth_events()
|
auth_events.insert(aid.to_owned(), auth_event.clone());
|
||||||
.filter(|id| !auth_events.contains_key(*id))
|
} else {
|
||||||
.collect::<Vec<_>>();
|
return Err!(Request(Forbidden(
|
||||||
if !still_missing.is_empty() {
|
"Remote server is not divulging incoming event's auth events (missing: \
|
||||||
// Don't reject: this could be a temporary condition
|
{aid})"
|
||||||
// TODO: use get_missing_events?
|
)));
|
||||||
return Err!(Request(InvalidParam(
|
}
|
||||||
"Could not fetch all auth events for outlier event {event_id}, still missing: \
|
}
|
||||||
{still_missing:?}"
|
// TODO: do events received from auth chain need persisting? that sounds
|
||||||
)));
|
// awfully slow
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
|
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
|
||||||
|
|||||||
@@ -1,89 +0,0 @@
|
|||||||
use std::{collections::BTreeMap, time::Instant};
|
|
||||||
|
|
||||||
use conduwuit::{
|
|
||||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
|
|
||||||
utils::continue_exponential_backoff_secs,
|
|
||||||
};
|
|
||||||
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
|
|
||||||
use tracing::debug;
|
|
||||||
|
|
||||||
#[implement(super::Service)]
|
|
||||||
#[allow(clippy::type_complexity)]
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
|
||||||
#[tracing::instrument(
|
|
||||||
name = "prev",
|
|
||||||
level = INFO_SPAN_LEVEL,
|
|
||||||
skip_all,
|
|
||||||
fields(%prev_id),
|
|
||||||
)]
|
|
||||||
pub(super) async fn handle_prev_pdu<'a, Pdu>(
|
|
||||||
&self,
|
|
||||||
origin: &'a ServerName,
|
|
||||||
event_id: &'a EventId,
|
|
||||||
room_id: &'a RoomId,
|
|
||||||
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
|
||||||
create_event: &'a Pdu,
|
|
||||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
|
||||||
prev_id: &'a EventId,
|
|
||||||
) -> Result
|
|
||||||
where
|
|
||||||
Pdu: Event + Send + Sync,
|
|
||||||
{
|
|
||||||
// Check for disabled again because it might have changed
|
|
||||||
if self.services.metadata.is_disabled(room_id).await {
|
|
||||||
return Err!(Request(Forbidden(debug_warn!(
|
|
||||||
"Federaton of room {room_id} is currently disabled on this server. Request by \
|
|
||||||
origin {origin} and event ID {event_id}"
|
|
||||||
))));
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some((time, tries)) = self
|
|
||||||
.services
|
|
||||||
.globals
|
|
||||||
.bad_event_ratelimiter
|
|
||||||
.read()
|
|
||||||
.get(prev_id)
|
|
||||||
{
|
|
||||||
// Exponential backoff
|
|
||||||
const MIN_DURATION: u64 = 5 * 60;
|
|
||||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
|
||||||
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
|
|
||||||
debug!(
|
|
||||||
?tries,
|
|
||||||
duration = ?time.elapsed(),
|
|
||||||
"Backing off from prev_event"
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let Some((pdu, json)) = eventid_info else {
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
// Skip old events
|
|
||||||
if pdu.origin_server_ts() < first_ts_in_room {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let start_time = Instant::now();
|
|
||||||
self.federation_handletime
|
|
||||||
.write()
|
|
||||||
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
|
|
||||||
|
|
||||||
defer! {{
|
|
||||||
self.federation_handletime
|
|
||||||
.write()
|
|
||||||
.remove(room_id);
|
|
||||||
}};
|
|
||||||
|
|
||||||
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
elapsed = ?start_time.elapsed(),
|
|
||||||
"Handled prev_event",
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
@@ -4,7 +4,6 @@ mod fetch_prev;
|
|||||||
mod fetch_state;
|
mod fetch_state;
|
||||||
mod handle_incoming_pdu;
|
mod handle_incoming_pdu;
|
||||||
mod handle_outlier_pdu;
|
mod handle_outlier_pdu;
|
||||||
mod handle_prev_pdu;
|
|
||||||
mod parse_incoming_pdu;
|
mod parse_incoming_pdu;
|
||||||
mod policy_server;
|
mod policy_server;
|
||||||
mod resolve_state;
|
mod resolve_state;
|
||||||
@@ -15,6 +14,7 @@ use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant};
|
|||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
|
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
|
||||||
|
pub use fetch_and_handle_outliers::build_local_dag;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
|
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
|
||||||
room_version_rules::RoomVersionRules,
|
room_version_rules::RoomVersionRules,
|
||||||
@@ -22,7 +22,6 @@ use ruma::{
|
|||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
use crate::{Dep, globals, rooms, sending, server_keys};
|
use crate::{Dep, globals, rooms, sending, server_keys};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
pub mutex_federation: RoomMutexMap,
|
pub mutex_federation: RoomMutexMap,
|
||||||
pub federation_handletime: SyncRwLock<HandleTimeMap>,
|
pub federation_handletime: SyncRwLock<HandleTimeMap>,
|
||||||
|
|||||||
@@ -56,7 +56,10 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR
|
|||||||
|
|
||||||
/// Parses every entry in an array as an event ID, returning an error if any
|
/// Parses every entry in an array as an event ID, returning an error if any
|
||||||
/// step fails.
|
/// step fails.
|
||||||
fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> {
|
pub(super) fn expect_event_id_array(
|
||||||
|
value: &CanonicalJsonObject,
|
||||||
|
field: &str,
|
||||||
|
) -> Result<Vec<OwnedEventId>> {
|
||||||
value
|
value
|
||||||
.get(field)
|
.get(field)
|
||||||
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
|
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Result, debug, err, error, implement,
|
Result, debug, debug_error, err, error, implement,
|
||||||
matrix::{Event, StateMap},
|
matrix::{Event, StateMap},
|
||||||
trace,
|
trace,
|
||||||
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
|
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
|
||||||
@@ -37,6 +37,7 @@ where
|
|||||||
.pdu_shortstatehash(prev_event)
|
.pdu_shortstatehash(prev_event)
|
||||||
.await
|
.await
|
||||||
else {
|
else {
|
||||||
|
trace!("No shortstatehash for {prev_event}, cannot calculate one-degree state.");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -99,6 +100,7 @@ where
|
|||||||
.map_ok(move |sstatehash| (sstatehash, prev_event))
|
.map_ok(move |sstatehash| (sstatehash, prev_event))
|
||||||
})
|
})
|
||||||
.try_collect::<HashMap<_, _>>()
|
.try_collect::<HashMap<_, _>>()
|
||||||
|
.inspect_err(|e| debug_error!("failed to calculate N-degree short state hashes: {e}"))
|
||||||
.await
|
.await
|
||||||
else {
|
else {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ where
|
|||||||
.get_pdu_id(incoming_pdu.event_id())
|
.get_pdu_id(incoming_pdu.event_id())
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
|
||||||
return Ok(Some(pduid));
|
return Ok(Some(pduid));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,6 +64,7 @@ where
|
|||||||
"Upgrading PDU from outlier to timeline"
|
"Upgrading PDU from outlier to timeline"
|
||||||
);
|
);
|
||||||
let timer = Instant::now();
|
let timer = Instant::now();
|
||||||
|
let min_depth = self.services.metadata.get_mindepth(room_id).await;
|
||||||
let room_version_rules = get_room_version_rules(create_event)?;
|
let room_version_rules = get_room_version_rules(create_event)?;
|
||||||
|
|
||||||
// 10. Fetch missing state and auth chain events by calling /state_ids at
|
// 10. Fetch missing state and auth chain events by calling /state_ids at
|
||||||
@@ -81,6 +83,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
if state_at_incoming_event.is_none() {
|
if state_at_incoming_event.is_none() {
|
||||||
|
trace!("Could not calculate incoming state, asking remote {origin} for it");
|
||||||
state_at_incoming_event = self
|
state_at_incoming_event = self
|
||||||
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
|
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
|
||||||
.await?;
|
.await?;
|
||||||
@@ -382,6 +385,11 @@ where
|
|||||||
|
|
||||||
// Event has passed all auth/stateres checks
|
// Event has passed all auth/stateres checks
|
||||||
drop(state_lock);
|
drop(state_lock);
|
||||||
|
if incoming_pdu.depth > min_depth {
|
||||||
|
self.services
|
||||||
|
.metadata
|
||||||
|
.set_mindepth(room_id, incoming_pdu.depth.into());
|
||||||
|
}
|
||||||
|
|
||||||
Ok(pdu_id)
|
Ok(pdu_id)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -626,6 +626,10 @@ impl Service {
|
|||||||
room_id,
|
room_id,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
self.services
|
||||||
|
.metadata
|
||||||
|
.maybe_set_mindepth(room_id, parsed_join_pdu.depth.into())
|
||||||
|
.await;
|
||||||
|
|
||||||
info!("Setting final room state for new room");
|
info!("Setting final room state for new room");
|
||||||
// We set the room state after inserting the pdu, so that we never have a moment
|
// We set the room state after inserting the pdu, so that we never have a moment
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use conduwuit::{Result, implement, utils::stream::TryIgnore};
|
use conduwuit::{Result, implement, utils::stream::TryIgnore};
|
||||||
use database::Map;
|
use database::{Deserialized, Map};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use ruma::{OwnedRoomId, RoomId};
|
use ruma::{OwnedRoomId, RoomId, UInt, uint};
|
||||||
|
|
||||||
use crate::{Dep, rooms};
|
use crate::{Dep, rooms};
|
||||||
|
|
||||||
@@ -17,6 +17,7 @@ struct Data {
|
|||||||
bannedroomids: Arc<Map>,
|
bannedroomids: Arc<Map>,
|
||||||
roomid_shortroomid: Arc<Map>,
|
roomid_shortroomid: Arc<Map>,
|
||||||
pduid_pdu: Arc<Map>,
|
pduid_pdu: Arc<Map>,
|
||||||
|
roomid_mindepth: Arc<Map>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Services {
|
struct Services {
|
||||||
@@ -31,6 +32,7 @@ impl crate::Service for Service {
|
|||||||
bannedroomids: args.db["bannedroomids"].clone(),
|
bannedroomids: args.db["bannedroomids"].clone(),
|
||||||
roomid_shortroomid: args.db["roomid_shortroomid"].clone(),
|
roomid_shortroomid: args.db["roomid_shortroomid"].clone(),
|
||||||
pduid_pdu: args.db["pduid_pdu"].clone(),
|
pduid_pdu: args.db["pduid_pdu"].clone(),
|
||||||
|
roomid_mindepth: args.db["roomid_mindepth"].clone(),
|
||||||
},
|
},
|
||||||
services: Services {
|
services: Services {
|
||||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
short: args.depend::<rooms::short::Service>("rooms::short"),
|
||||||
@@ -98,3 +100,27 @@ pub async fn is_disabled(&self, room_id: &RoomId) -> bool {
|
|||||||
pub async fn is_banned(&self, room_id: &RoomId) -> bool {
|
pub async fn is_banned(&self, room_id: &RoomId) -> bool {
|
||||||
self.db.bannedroomids.get(room_id).await.is_ok()
|
self.db.bannedroomids.get(room_id).await.is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[implement(Service)]
|
||||||
|
pub async fn get_mindepth(&self, room_id: &RoomId) -> UInt {
|
||||||
|
self.db
|
||||||
|
.roomid_mindepth
|
||||||
|
.get(room_id)
|
||||||
|
.await
|
||||||
|
.deserialized::<UInt>()
|
||||||
|
.unwrap_or_else(|_| uint!(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(Service)]
|
||||||
|
pub fn set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
|
||||||
|
self.db
|
||||||
|
.roomid_mindepth
|
||||||
|
.put_raw(room_id.as_bytes(), min_depth.to_be_bytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(Service)]
|
||||||
|
pub async fn maybe_set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
|
||||||
|
if min_depth > self.get_mindepth(room_id).await.into() {
|
||||||
|
self.set_mindepth(room_id, min_depth);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -173,6 +173,10 @@ impl Service {
|
|||||||
self.db.get_non_outlier_pdu_json(event_id).await
|
self.db.get_non_outlier_pdu_json(event_id).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> bool {
|
||||||
|
self.db.non_outlier_pdu_exists(event_id).await.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the pdu's id.
|
/// Returns the pdu's id.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
|
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
|
||||||
|
|||||||
@@ -34,8 +34,9 @@ where
|
|||||||
|
|
||||||
batch
|
batch
|
||||||
});
|
});
|
||||||
|
if server_keys.is_empty() {
|
||||||
debug_assert!(!server_keys.is_empty(), "empty batch request to notary");
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
let mut results = Vec::new();
|
let mut results = Vec::new();
|
||||||
while let Some(batch) = server_keys
|
while let Some(batch) = server_keys
|
||||||
|
|||||||
Reference in New Issue
Block a user