feat: Assert that no events were dropped during sorting

This commit is contained in:
timedout
2026-04-27 22:39:41 +01:00
committed by Ellis Git
parent 8b0e86a05d
commit fe7cfd96e7
+43 -30
View File
@@ -1,12 +1,15 @@
use crate::Ruma; use std::{
collections::{BTreeMap, HashMap, HashSet},
net::IpAddr,
time::{Duration, Instant},
};
use axum::extract::State; use axum::extract::State;
use axum_client_ip::ClientIp; use axum_client_ip::ClientIp;
use conduwuit::utils::TryFutureExtExt;
use conduwuit::{ use conduwuit::{
debug, debug_warn, err, error, result::LogErr, state_res::lexicographical_topological_sort, trace, debug, debug_warn, err, error, result::LogErr, state_res::lexicographical_topological_sort, trace,
utils::{ utils::{
millis_since_unix_epoch, stream::{automatic_width, BroadbandExt, TryBroadbandExt}, IterStream, millis_since_unix_epoch, stream::{automatic_width, BroadbandExt, TryBroadbandExt}, IterStream, ReadyExt,
ReadyExt,
}, },
warn, warn,
Err, Err,
@@ -20,30 +23,33 @@ use conduwuit_service::{
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use http::StatusCode; use http::StatusCode;
use itertools::Itertools; use itertools::Itertools;
use ruma::{api::{ use ruma::{
client::error::{ErrorKind, ErrorKind::LimitExceeded}, api::{
federation::transactions::{ client::error::{ErrorKind, ErrorKind::LimitExceeded},
edu::{ federation::transactions::{
DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent, edu::{
PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent, DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
TypingContent, PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent,
TypingContent,
},
send_transaction_message,
}, },
send_transaction_message, }, events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, int, serde::Raw, to_device::DeviceIdOrAllDevices,
}, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
}, events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, int, serde::Raw, to_device::DeviceIdOrAllDevices, uint, CanonicalJsonObject, Int, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UInt, UserId}; OwnedUserId,
RoomId,
ServerName,
UInt,
UserId,
};
use service::transactions::{ use service::transactions::{
FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse, FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse,
}; };
use std::cmp::Reverse;
use std::hash::Hasher;
use std::{
collections::{BTreeMap, HashMap, HashSet},
net::IpAddr,
time::{Duration, Instant},
};
use tokio::sync::watch::{Receiver, Sender}; use tokio::sync::watch::{Receiver, Sender};
use tracing::instrument; use tracing::instrument;
use crate::Ruma;
type ResolvedMap = BTreeMap<OwnedEventId, Result>; type ResolvedMap = BTreeMap<OwnedEventId, Result>;
type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject); type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
@@ -273,16 +279,18 @@ async fn build_local_dag(
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject>, pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject>,
) -> Result<Vec<OwnedEventId>> { ) -> Result<Vec<OwnedEventId>> {
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs"); debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> = HashMap::with_capacity(pdu_map.len()); let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
HashMap::with_capacity(pdu_map.len());
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len()); let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
for (event_id, value) in pdu_map { for (event_id, value) in pdu_map {
// We already checked that these properties are correct in parse_incoming_pdu, // We already checked that these properties are correct in parse_incoming_pdu,
// so it's safe to unwrap here. // so it's safe to unwrap here.
// We also filter to remove any prev_events that are not in this pdu_map, as we need to have // We also filter to remove any prev_events that are not in this pdu_map, as we
// at least one event with zero out degrees for the lexico-topo sort below. If there are // need to have at least one event with zero out degrees for the lexico-topo
// multiple events with omitted prevs, they will be ordered by timestamp, then event ID. At // sort below. If there are multiple events with omitted prevs, they will be
// that point though, it's unlikely to matter. // ordered by timestamp, then event ID. At that point though, it's unlikely to
// matter.
let prev_events = value let prev_events = value
.get("prev_events") .get("prev_events")
.unwrap() .unwrap()
@@ -294,13 +302,14 @@ async fn build_local_dag(
.collect(); .collect();
dag.insert(event_id.clone(), prev_events); dag.insert(event_id.clone(), prev_events);
let origin_server_ts = value.get("origin_server_ts") let origin_server_ts = value
.get("origin_server_ts")
.and_then(ruma::CanonicalJsonValue::as_integer) .and_then(ruma::CanonicalJsonValue::as_integer)
.unwrap_or_default(); .unwrap_or_default();
id_origin_ts.insert(event_id.clone(), origin_server_ts); id_origin_ts.insert(event_id.clone(), origin_server_ts);
} }
lexicographical_topological_sort(&dag, &async |node_id| { let sorted = lexicographical_topological_sort(&dag, &async |node_id| {
// Note: we don't bother fetching power levels because that would massively slow // Note: we don't bother fetching power levels because that would massively slow
// this function down. This is a best-effort attempt to order events correctly // this function down. This is a best-effort attempt to order events correctly
// for processing, however ultimately that should be the sender's job. // for processing, however ultimately that should be the sender's job.
@@ -315,8 +324,12 @@ async fn build_local_dag(
.unwrap_or_default(); .unwrap_or_default();
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts))) Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
}) })
.await .await
.map_err(|e| err!("failed to resolve local graph: {e}")) .map_err(|e| err!("failed to resolve local graph: {e}"));
if let Ok(ref s) = sorted {
assert_eq!(s.len(), pdu_map.len(), "Sorted graph was not the same size as the input graph");
};
sorted
} }
async fn handle_room( async fn handle_room(