diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 80f6bfa43..fc92fc31c 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -1,57 +1,49 @@ +use crate::Ruma; +use axum::extract::State; +use axum_client_ip::ClientIp; +use conduwuit::utils::TryFutureExtExt; +use conduwuit::{ + debug, debug_warn, err, error, result::LogErr, state_res::lexicographical_topological_sort, trace, + utils::{ + millis_since_unix_epoch, stream::{automatic_width, BroadbandExt, TryBroadbandExt}, IterStream, + ReadyExt, + }, + warn, + Err, + Error, + Result, +}; +use conduwuit_service::{ + sending::{EDU_LIMIT, PDU_LIMIT}, + Services, +}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use http::StatusCode; +use itertools::Itertools; +use ruma::{api::{ + client::error::{ErrorKind, ErrorKind::LimitExceeded}, + federation::transactions::{ + edu::{ + DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent, + PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent, + TypingContent, + }, + send_transaction_message, + }, +}, events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, int, serde::Raw, to_device::DeviceIdOrAllDevices, uint, CanonicalJsonObject, Int, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UInt, UserId}; +use service::transactions::{ + FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse, +}; +use std::cmp::Reverse; +use std::hash::Hasher; use std::{ collections::{BTreeMap, HashMap, HashSet}, net::IpAddr, time::{Duration, Instant}, }; - -use axum::extract::State; -use axum_client_ip::ClientIp; -use conduwuit::{ - Err, Error, Result, debug, debug_warn, err, error, - result::LogErr, - state_res::lexicographical_topological_sort, - trace, - utils::{ - IterStream, ReadyExt, millis_since_unix_epoch, - stream::{BroadbandExt, TryBroadbandExt, automatic_width}, - }, - warn, -}; -use conduwuit_service::{ - Services, - sending::{EDU_LIMIT, PDU_LIMIT}, -}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; -use http::StatusCode; -use itertools::Itertools; -use ruma::{ - CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, - RoomId, ServerName, UserId, - api::{ - client::error::{ErrorKind, ErrorKind::LimitExceeded}, - federation::transactions::{ - edu::{ - DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent, - PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent, - TypingContent, - }, - send_transaction_message, - }, - }, - events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, - int, - serde::Raw, - to_device::DeviceIdOrAllDevices, - uint, -}; -use service::transactions::{ - FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse, -}; use tokio::sync::watch::{Receiver, Sender}; use tracing::instrument; -use crate::Ruma; - type ResolvedMap = BTreeMap; type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject); @@ -281,11 +273,16 @@ async fn build_local_dag( pdu_map: &HashMap, ) -> Result> { debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs"); - let mut dag: HashMap> = HashMap::new(); + let mut dag: HashMap> = HashMap::with_capacity(pdu_map.len()); + let mut id_origin_ts: HashMap = HashMap::with_capacity(pdu_map.len()); for (event_id, value) in pdu_map { // We already checked that these properties are correct in parse_incoming_pdu, // so it's safe to unwrap here. + // We also filter to remove any prev_events that are not in this pdu_map, as we need to have + // at least one event with zero out degrees for the lexico-topo sort below. If there are + // multiple events with omitted prevs, they will be ordered by timestamp, then event ID. At + // that point though, it's unlikely to matter. let prev_events = value .get("prev_events") .unwrap() @@ -293,18 +290,33 @@ async fn build_local_dag( .unwrap() .iter() .map(|v| OwnedEventId::parse(v.as_str().unwrap()).unwrap()) + .filter(|id| pdu_map.contains_key(id)) .collect(); dag.insert(event_id.clone(), prev_events); + let origin_server_ts = value.get("origin_server_ts") + .and_then(ruma::CanonicalJsonValue::as_integer) + .unwrap_or_default(); + id_origin_ts.insert(event_id.clone(), origin_server_ts); } - lexicographical_topological_sort(&dag, &|_| async { + + lexicographical_topological_sort(&dag, &async |node_id| { // Note: we don't bother fetching power levels because that would massively slow // this function down. This is a best-effort attempt to order events correctly // for processing, however ultimately that should be the sender's job. - Ok((int!(0), MilliSecondsSinceUnixEpoch(uint!(0)))) + let ts = id_origin_ts + .get(&node_id) + .copied() + .unwrap_or_else(|| int!(0)) + .to_string() + .parse::() + .ok() + .and_then(UInt::new) + .unwrap_or_default(); + Ok((int!(0), MilliSecondsSinceUnixEpoch(ts))) }) - .await - .map_err(|e| err!("failed to resolve local graph: {e}")) + .await + .map_err(|e| err!("failed to resolve local graph: {e}")) } async fn handle_room(