Compare commits

..

8 Commits

Author SHA1 Message Date
timedout c7f8eec282 chore: Add newsfrag 2026-05-26 21:26:57 +01:00
timedout 0d4bbe612d feat: Keep track of a min_depth value
Should prevent weird situations where we accidentally gapfill into backfill territory
2026-05-26 21:22:32 +01:00
timedout c4d297ae3b perf: Increase default max_fetch_prev_events to 256 2026-05-26 20:25:37 +01:00
timedout d15064871e perf: Make max gap depth fetch configurable 2026-05-26 20:22:45 +01:00
timedout b925936195 perf: Improve gap filling, handle missing auth events better 2026-05-26 20:22:45 +01:00
timedout 56feba0ea0 fix: This is some bullshit I tell you 2026-05-26 20:22:40 +01:00
timedout 8d89ba94d5 feat: Better prev event fetching
fix: Don't panic in debug mode when making an empty notary query
2026-05-26 20:22:10 +01:00
timedout 0b135c7717 feat: Add backfill_missing_events helper 2026-05-26 20:22:10 +01:00
29 changed files with 903 additions and 700 deletions
-1
View File
@@ -1 +0,0 @@
Added support for Matrix 1.16's `state_after` feature, allowing clients which understand it to sync room state changes more reliably. Contributed by @ginger.
+2
View File
@@ -0,0 +1,2 @@
Improved the performance and reliability of fetching missing events, improving network partition recovery. Contributed
by @nex.
-1
View File
@@ -1 +0,0 @@
Adjusted legacy sync logic to no longer use the `roomsynctoken_shortstatehash` database column. Once this change has been confirmed to be stable and reliable, a future update will remove it entirely, significantly decreasing database sizes. Contributed by @ginger.
+1 -1
View File
@@ -297,7 +297,7 @@
# This item is undocumented. Please contribute documentation for it. # This item is undocumented. Please contribute documentation for it.
# #
#max_fetch_prev_events = 192 #max_fetch_prev_events = 256
# How many incoming federation transactions the server is willing to be # How many incoming federation transactions the server is willing to be
# processing at any given time before it becomes overloaded and starts # processing at any given time before it becomes overloaded and starts
-7
View File
@@ -48,13 +48,6 @@ async fn load_timeline(
ending_count: Option<PduCount>, ending_count: Option<PduCount>,
limit: usize, limit: usize,
) -> Result<TimelinePdus> { ) -> Result<TimelinePdus> {
if let (Some(starting_count), Some(ending_count)) = (starting_count, ending_count) {
debug_assert!(
starting_count <= ending_count,
"starting count {starting_count} > ending count {ending_count}"
);
}
let mut pdu_stream = match starting_count { let mut pdu_stream = match starting_count {
| Some(starting_count) => { | Some(starting_count) => {
let last_timeline_count = services let last_timeline_count = services
+73 -76
View File
@@ -38,7 +38,6 @@ use ruma::{
uint, uint,
}; };
use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash}; use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash};
use tokio::pin;
use super::{load_timeline, share_encrypted_room}; use super::{load_timeline, share_encrypted_room};
use crate::client::{ use crate::client::{
@@ -97,19 +96,12 @@ pub(super) async fn load_joined_room(
); );
} }
let state_events =
StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect());
let joined_room = assign!(JoinedRoom::new(), { let joined_room = assign!(JoinedRoom::new(), {
account_data, account_data,
summary: summary.unwrap_or_default(), summary: summary.unwrap_or_default(),
unread_notifications: notification_counts.unwrap_or_default(), unread_notifications: notification_counts.unwrap_or_default(),
timeline, timeline,
state: if sync_context.use_state_after { state: RoomState::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())),
RoomState::After(state_events)
} else {
RoomState::Before(state_events)
},
ephemeral, ephemeral,
unread_thread_notifications: BTreeMap::new(), unread_thread_notifications: BTreeMap::new(),
}); });
@@ -352,7 +344,7 @@ struct ShortStateHashes {
#[tracing::instrument(level = "debug", skip_all)] #[tracing::instrument(level = "debug", skip_all)]
async fn fetch_shortstatehashes( async fn fetch_shortstatehashes(
services: &Services, services: &Services,
SyncContext { last_sync_end_count, .. }: SyncContext<'_>, SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>,
room_id: &RoomId, room_id: &RoomId,
) -> Result<ShortStateHashes> { ) -> Result<ShortStateHashes> {
// the room state currently. // the room state currently.
@@ -362,41 +354,46 @@ async fn fetch_shortstatehashes(
.rooms .rooms
.state .state
.get_room_shortstatehash(room_id) .get_room_shortstatehash(room_id)
.map_err(|_| err!(Database(error!("Room {room_id} has no state")))) .map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
.await?;
// The room state as of the end of the last sync. // the room state as of the end of the last sync.
// This will be None if we are doing an initial sync. // this will be None if we are doing an initial sync or if we just joined this
// room.
let last_sync_end_shortstatehash = let last_sync_end_shortstatehash =
OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| { OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| {
pin! { // look up the shortstatehash saved by the last sync's call to
let pdus = services // `associate_token_shortstatehash`
.rooms services
.timeline .rooms
.pdus(room_id, Some(PduCount::Normal(last_sync_end_count))) .user
.ignore_err(); .get_token_shortstatehash(room_id, last_sync_end_count)
} .inspect_err(move |_| {
debug_warn!(
match pdus.next().await { token = last_sync_end_count,
| Some((_, pdu_after_last_sync_end)) => { "Room has no shortstatehash for this token"
trace!(?pdu_after_last_sync_end.event_id, "pdu at last sync end"); );
})
services .ok()
.rooms
.state_accessor
.pdu_shortstatehash(&pdu_after_last_sync_end.event_id)
.await
.map_err(|err| err!("Last sync end PDU has no shortstatehash: {err}"))
},
| None => {
// No events have been sent since the last sync, or we just joined this room,
// so the state then is the same as the state now
Ok(current_shortstatehash)
},
}
})) }))
.await .map(Option::flatten)
.transpose()?; .map(Ok);
let (current_shortstatehash, last_sync_end_shortstatehash) =
try_join(current_shortstatehash, last_sync_end_shortstatehash).await?;
/*
associate the `current_count` with the `current_shortstatehash`, so we can
use it on the next sync as the `last_sync_end_shortstatehash`.
TODO: the table written to by this call grows extremely fast, gaining one new entry for each
joined room on _every single sync request_. we need to find a better way to remember the shortstatehash
between syncs.
*/
services
.rooms
.user
.associate_token_shortstatehash(room_id, current_count, current_shortstatehash)
.await;
Ok(ShortStateHashes { Ok(ShortStateHashes {
current_shortstatehash, current_shortstatehash,
@@ -455,7 +452,6 @@ async fn build_state_events(
syncing_user, syncing_user,
last_sync_end_count, last_sync_end_count,
full_state, full_state,
use_state_after,
.. ..
} = sync_context; } = sync_context;
@@ -464,28 +460,32 @@ async fn build_state_events(
last_sync_end_shortstatehash, last_sync_end_shortstatehash,
} = shortstatehashes; } = shortstatehashes;
let timeline_start_shortstatehash = if let Some((count, pdu)) = timeline.pdus.front() { // the spec states that the `state` property only includes state events up to
if matches!(count, PduCount::Backfilled(_)) { // the beginning of the timeline, so we determine the state of the syncing room
// We don't have shortstatehashes for backfilled PDUs, the best we can // as of the first timeline event. NOTE: this explanation is not entirely
// do is to use the current state // accurate; see the implementation of `build_state_incremental`.
current_shortstatehash let timeline_start_shortstatehash = async {
} else { if let Some((_, pdu)) = timeline.pdus.front() {
services if let Ok(shortstatehash) = services
.rooms .rooms
.state_accessor .state_accessor
.pdu_shortstatehash(&pdu.event_id) .pdu_shortstatehash(&pdu.event_id)
.await .await
.map_err(|err| err!("Timeline start has no shortstatehash: {err}"))? {
return shortstatehash;
}
} }
} else {
// if the timeline is empty there can't possibly be any changes to the state current_shortstatehash
return Ok(vec![]);
}; };
// the user IDs of members whose membership needs to be sent to the client, if // the user IDs of members whose membership needs to be sent to the client, if
// lazy-loading is enabled. // lazy-loading is enabled.
let lazily_loaded_members = let lazily_loaded_members =
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await; prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
let (timeline_start_shortstatehash, lazily_loaded_members) =
join(timeline_start_shortstatehash, lazily_loaded_members).await;
// compute the state delta between the previous sync and this sync. // compute the state delta between the previous sync and this sync.
match (last_sync_end_count, last_sync_end_shortstatehash) { match (last_sync_end_count, last_sync_end_shortstatehash) {
@@ -494,15 +494,16 @@ async fn build_state_events(
is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false, is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false,
then use `build_state_incremental`. then use `build_state_incremental`.
*/ */
| (Some(_), Some(last_sync_end_shortstatehash)) if !full_state => | (Some(last_sync_end_count), Some(last_sync_end_shortstatehash)) if !full_state =>
build_state_incremental( build_state_incremental(
services, services,
syncing_user, syncing_user,
room_id,
PduCount::Normal(last_sync_end_count),
last_sync_end_shortstatehash, last_sync_end_shortstatehash,
timeline_start_shortstatehash, timeline_start_shortstatehash,
current_shortstatehash, current_shortstatehash,
timeline, timeline,
use_state_after,
lazily_loaded_members.as_ref(), lazily_loaded_members.as_ref(),
) )
.boxed() .boxed()
@@ -517,8 +518,6 @@ async fn build_state_events(
services, services,
syncing_user, syncing_user,
timeline_start_shortstatehash, timeline_start_shortstatehash,
current_shortstatehash,
use_state_after,
lazily_loaded_members.as_ref(), lazily_loaded_members.as_ref(),
) )
.boxed() .boxed()
@@ -599,25 +598,23 @@ async fn check_joined_since_last_sync(
ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes, ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes,
SyncContext { syncing_user, .. }: SyncContext<'_>, SyncContext { syncing_user, .. }: SyncContext<'_>,
) -> Result<bool> { ) -> Result<bool> {
let Some(last_sync_end_shortstatehash) = last_sync_end_shortstatehash else { // fetch the syncing user's membership event during the last sync.
// For initial syncs always return false, since there's no "last sync" for the // this will be None if `previous_sync_end_shortstatehash` is None.
// user to have joined since. let membership_during_previous_sync = match last_sync_end_shortstatehash {
return Ok(false); | Some(last_sync_end_shortstatehash) => services
.rooms
.state_accessor
.state_get_content(
last_sync_end_shortstatehash,
&StateEventType::RoomMember,
syncing_user.as_str(),
)
.await
.inspect_err(|_| debug_warn!("User has no previous membership"))
.ok(),
| None => None,
}; };
// Fetch the syncing user's membership event during the last sync.
let membership_during_previous_sync = services
.rooms
.state_accessor
.state_get_content(
last_sync_end_shortstatehash,
&StateEventType::RoomMember,
syncing_user.as_str(),
)
.await
.inspect_err(|_| debug_warn!("User has no previous membership"))
.ok();
// TODO: If the requesting user got state-reset out of the room, this // TODO: If the requesting user got state-reset out of the room, this
// will be `true` when it shouldn't be. this function should never be called // will be `true` when it shouldn't be. this function should never be called
// in that situation, but it may be if the membership cache didn't get updated. // in that situation, but it may be if the membership cache didn't get updated.
+1 -10
View File
@@ -181,9 +181,6 @@ pub(super) async fn load_left_room(
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await; .await;
let state_events =
StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect());
Ok(Some(assign!(LeftRoom::new(), { Ok(Some(assign!(LeftRoom::new(), {
account_data: RoomAccountData::new(), account_data: RoomAccountData::new(),
timeline: assign!(Timeline::new(), { timeline: assign!(Timeline::new(), {
@@ -191,11 +188,7 @@ pub(super) async fn load_left_room(
prev_batch: Some(current_count.to_string()), prev_batch: Some(current_count.to_string()),
events: raw_timeline_pdus, events: raw_timeline_pdus,
}), }),
state: if sync_context.use_state_after { state: State::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())),
State::After(state_events)
} else {
State::Before(state_events)
},
}))) })))
} }
@@ -271,8 +264,6 @@ async fn build_left_state_and_timeline(
services, services,
syncing_user, syncing_user,
timeline_start_shortstatehash, timeline_start_shortstatehash,
leave_shortstatehash,
sync_context.use_state_after,
lazily_loaded_members.as_ref(), lazily_loaded_members.as_ref(),
) )
.await?; .await?;
+4 -7
View File
@@ -11,11 +11,12 @@ use std::{
use axum::extract::State; use axum::extract::State;
use axum_client_ip::ClientIp; use axum_client_ip::ClientIp;
use conduwuit::{ use conduwuit::{
Err, Result, at, error, extract_variant, Err, Result, at, extract_variant,
utils::{ utils::{
ReadyExt, TryFutureExtExt, ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, Tools, WidebandExt}, stream::{BroadbandExt, Tools, WidebandExt},
}, },
warn,
}; };
use conduwuit_service::Services; use conduwuit_service::Services;
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture}; use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture};
@@ -109,9 +110,6 @@ struct SyncContext<'a> {
/// The sync filter, which the client uses to specify what data should be /// The sync filter, which the client uses to specify what data should be
/// included in the sync response. /// included in the sync response.
filter: &'a FilterDefinition, filter: &'a FilterDefinition,
/// Whether the state at the end of the timeline should be used when
/// calculating state diffs for sync.
use_state_after: bool,
} }
impl<'a> SyncContext<'a> { impl<'a> SyncContext<'a> {
@@ -265,7 +263,6 @@ pub(crate) async fn build_sync_events(
current_count, current_count,
full_state, full_state,
filter: &filter, filter: &filter,
use_state_after: body.use_state_after,
}; };
let joined_rooms = services let joined_rooms = services
@@ -278,7 +275,7 @@ pub(crate) async fn build_sync_events(
match joined_room { match joined_room {
| Ok((room, updates)) => Some((room_id, room, updates)), | Ok((room, updates)) => Some((room_id, room, updates)),
| Err(err) => { | Err(err) => {
error!(?err, %room_id, "error loading joined room"); warn!(?err, %room_id, "error loading joined room");
None None
}, },
} }
@@ -307,7 +304,7 @@ pub(crate) async fn build_sync_events(
| Ok(Some(left_room)) => Some((room_id, left_room)), | Ok(Some(left_room)) => Some((room_id, left_room)),
| Ok(None) => None, | Ok(None) => None,
| Err(err) => { | Err(err) => {
error!(?err, %room_id, "error loading joined room"); warn!(?err, %room_id, "error loading joined room");
None None
}, },
} }
+143 -60
View File
@@ -1,8 +1,11 @@
use std::collections::HashSet; use std::{collections::BTreeSet, ops::ControlFlow};
use conduwuit::{ use conduwuit::{
Result, at, Result, at, is_equal_to,
matrix::{Event, pdu::PduEvent}, matrix::{
Event,
pdu::{PduCount, PduEvent},
},
utils::{ utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt, BoolExt, IterStream, ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, TryIgnore}, stream::{BroadbandExt, TryIgnore},
@@ -13,7 +16,9 @@ use conduwuit_service::{
rooms::{lazy_loading::MemberSet, short::ShortStateHash}, rooms::{lazy_loading::MemberSet, short::ShortStateHash},
}; };
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use ruma::{OwnedEventId, UserId, events::StateEventType}; use itertools::Itertools;
use ruma::{OwnedEventId, RoomId, UserId, events::StateEventType};
use service::rooms::short::ShortEventId;
use tracing::trace; use tracing::trace;
use crate::client::TimelinePdus; use crate::client::TimelinePdus;
@@ -34,19 +39,13 @@ pub(super) async fn build_state_initial(
services: &Services, services: &Services,
sender_user: &UserId, sender_user: &UserId,
timeline_start_shortstatehash: ShortStateHash, timeline_start_shortstatehash: ShortStateHash,
timeline_end_shortstatehash: ShortStateHash,
use_state_after: bool,
lazily_loaded_members: Option<&MemberSet>, lazily_loaded_members: Option<&MemberSet>,
) -> Result<Vec<PduEvent>> { ) -> Result<Vec<PduEvent>> {
// load the keys and event IDs of the state events at the start of the timeline // load the keys and event IDs of the state events at the start of the timeline
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
.rooms .rooms
.state_accessor .state_accessor
.state_full_ids(if use_state_after { .state_full_ids(timeline_start_shortstatehash)
timeline_end_shortstatehash
} else {
timeline_start_shortstatehash
})
.unzip() .unzip()
.await; .await;
@@ -93,34 +92,82 @@ pub(super) async fn build_state_initial(
pub(super) async fn build_state_incremental<'a>( pub(super) async fn build_state_incremental<'a>(
services: &Services, services: &Services,
sender_user: &'a UserId, sender_user: &'a UserId,
room_id: &RoomId,
last_sync_end_count: PduCount,
last_sync_end_shortstatehash: ShortStateHash, last_sync_end_shortstatehash: ShortStateHash,
timeline_start_shortstatehash: ShortStateHash, timeline_start_shortstatehash: ShortStateHash,
timeline_end_shortstatehash: ShortStateHash, timeline_end_shortstatehash: ShortStateHash,
timeline: &TimelinePdus, timeline: &TimelinePdus,
use_state_after: bool,
lazily_loaded_members: Option<&'a MemberSet>, lazily_loaded_members: Option<&'a MemberSet>,
) -> Result<Vec<PduEvent>> { ) -> Result<Vec<PduEvent>> {
let mut state_event_ids: HashSet<OwnedEventId> = HashSet::new(); /*
NB: a limited sync is one where `timeline.limited == true`. Synapse calls this a "gappy" sync internally.
trace!( The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described
%use_state_after, by the Matrix specification. This is because the specification's description of the `state` property does not accurately
%last_sync_end_shortstatehash, reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include:
%timeline_start_shortstatehash, 1. We do not compute the delta using the naive approach of "every state event from the end of the last sync
%timeline_end_shortstatehash, up to the start of this sync's timeline". see below for details.
"computing state for incremental sync" 2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined
); elsewhere and supplied to this function in the `lazily_loaded_members` parameter.
*/
// Fetch lazy-loaded membership events if lazy-loading is enabled /*
if let Some(lazily_loaded_members) = lazily_loaded_members the `state` property of an incremental sync which isn't limited are _usually_ empty.
&& !lazily_loaded_members.is_empty() (note: the specification says that the `state` property is _always_ empty for limited syncs, which is incorrect.)
{ however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`),
trace!("including lazy membership events for members: {:?}", lazily_loaded_members); the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state
at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline
merged a split in the DAG.
services see: https://github.com/element-hq/synapse/issues/16941
*/
let timeline_is_linear = timeline.pdus.is_empty() || {
let last_pdu_of_last_sync = services
.rooms .rooms
.short .timeline
.multi_get_eventid_from_short::<'_, OwnedEventId, _>( .pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1)))
lazily_loaded_members .boxed()
.next()
.await
.transpose()
.expect("last sync should have had some PDUs")
.map(at!(1));
// make sure the prev_events of each pdu in the timeline refer only to the
// previous pdu
timeline
.pdus
.iter()
.try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| {
if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() {
if prev_event_id
.as_ref()
.is_none_or(is_equal_to!(pdu_prev_event_id))
{
return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned()));
}
}
trace!(
"pdu {:?} has split prev_events (expected {:?}): {:?}",
pdu.event_id, prev_event_id, pdu.prev_events
);
ControlFlow::Break(())
})
.is_continue()
};
if timeline_is_linear && !timeline.limited {
// if there are no splits in the DAG and the timeline isn't limited, then
// `state` will always be empty unless lazy loading is enabled.
if let Some(lazily_loaded_members) = lazily_loaded_members {
if !timeline.pdus.is_empty() {
// lazy loading is enabled, so we return the membership events which were
// requested by the caller.
let lazy_membership_events: Vec<_> = lazily_loaded_members
.iter() .iter()
.stream() .stream()
.broad_filter_map(|user_id| async move { .broad_filter_map(|user_id| async move {
@@ -131,24 +178,71 @@ pub(super) async fn build_state_incremental<'a>(
services services
.rooms .rooms
.state_accessor .state_accessor
.state_get_shortid( .state_get(
timeline_start_shortstatehash, timeline_start_shortstatehash,
&StateEventType::RoomMember, &StateEventType::RoomMember,
user_id.as_str(), user_id.as_str(),
) )
.ok() .ok()
.await .await
}), })
) .collect()
.ignore_err() .await;
.ready_for_each(|event_id| {
state_event_ids.insert(event_id); if !lazy_membership_events.is_empty() {
}) trace!(
.await; "syncing lazy membership events for members: {:?}",
lazy_membership_events
.iter()
.map(|pdu| pdu.state_key().unwrap())
.collect::<Vec<_>>()
);
}
return Ok(lazy_membership_events);
}
}
// lazy loading is disabled, `state` is empty.
return Ok(vec![]);
} }
// Fetch the state events added since the last sync. /*
services at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates
computing the incremental state (which may be empty).
NOTE: this code path does not use the `lazy_membership_events` parameter. any changes to membership will be included
in the incremental state. therefore, the incremental state may include "redundant" membership events,
which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`,
and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that
the performance penalty is acceptable.
*/
trace!(%timeline_is_linear, %timeline.limited, "computing state for incremental sync");
// fetch the shorteventids of state events in the timeline
let state_events_in_timeline: BTreeSet<ShortEventId> = services
.rooms
.short
.multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| {
if pdu.state_key().is_some() {
Some(pdu.event_id.as_ref())
} else {
None
}
}))
.collect()
.await;
trace!("{} state events in timeline", state_events_in_timeline.len());
/*
fetch the state events which were added since the last sync.
specifically we fetch the difference between the state at the last sync and the state at the _end_
of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched.
this is necessary to account for splits in the DAG, as explained above.
*/
let state_diff = services
.rooms .rooms
.short .short
.multi_get_eventid_from_short::<'_, OwnedEventId, _>( .multi_get_eventid_from_short::<'_, OwnedEventId, _>(
@@ -158,29 +252,18 @@ pub(super) async fn build_state_incremental<'a>(
.state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash)) .state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash))
.await? .await?
.stream() .stream()
.map(at!(1)), .ready_filter_map(|(_, shorteventid)| {
if state_events_in_timeline.contains(&shorteventid) {
None
} else {
Some(shorteventid)
}
}),
) )
.ignore_err() .ignore_err();
.ready_for_each(|event_id| {
state_event_ids.insert(event_id);
})
.await;
if !use_state_after { // finally, fetch the PDU contents and collect them into a vec
// If state_after isn't enabled, filter out state events which also exist let state_diff_pdus = state_diff
// in the timeline. If splits exist in the DAG, this may not be exactly the same
// thing as the state diff ending at the start of the timeline, but Synapse
// also does this and it's technically more useful behavior anyway.
// See: https://github.com/element-hq/synapse/issues/16941
for (_, pdu) in &timeline.pdus {
state_event_ids.remove(pdu.event_id());
}
}
// Finally, fetch the PDU contents and collect them into a vec
let state_diff_pdus = state_event_ids
.stream()
.broad_filter_map(|event_id| async move { .broad_filter_map(|event_id| async move {
services services
.rooms .rooms
+7 -27
View File
@@ -15,7 +15,7 @@ use conduwuit::{
BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
future::ReadyEqExt, future::ReadyEqExt,
math::{ruma_from_usize, usize_from_ruma}, math::{ruma_from_usize, usize_from_ruma},
stream::{TryIgnore, WidebandExt}, stream::WidebandExt,
}, },
warn, warn,
}; };
@@ -41,7 +41,6 @@ use ruma::{
uint, uint,
}; };
use service::account_data::AnyRawAccountDataEvent; use service::account_data::AnyRawAccountDataEvent;
use tokio::pin;
use super::share_encrypted_room; use super::share_encrypted_room;
use crate::{ use crate::{
@@ -859,31 +858,12 @@ where
continue; continue;
}; };
let since_shortstatehash = async { let since_shortstatehash = services
pin! { .rooms
let pdus_rev = services .user
.rooms .get_token_shortstatehash(room_id, globalsince)
.timeline .await
.pdus_rev(room_id, Some(PduCount::Normal(globalsince.saturating_sub(1)))) .ok();
.ignore_err();
}
let (count, pdu_at_last_sync_end) = pdus_rev.next().await?;
if matches!(count, PduCount::Backfilled(_)) {
None
} else {
Some(
services
.rooms
.state_accessor
.pdu_shortstatehash(&pdu_at_last_sync_end.event_id)
.await
.expect("pdu should have a shortstatehash"),
)
}
}
.await;
let encrypted_room = services let encrypted_room = services
.rooms .rooms
+1
View File
@@ -381,6 +381,7 @@ async fn handle_room(
.rooms .rooms
.event_handler .event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true) .handle_incoming_pdu(origin, room_id, &event_id, value, true)
.boxed()
.await .await
.map(|_| ()); .map(|_| ());
results.push((event_id, result)); results.push((event_id, result));
+2 -2
View File
@@ -375,7 +375,7 @@ pub struct Config {
#[serde(default = "default_max_request_size")] #[serde(default = "default_max_request_size")]
pub max_request_size: usize, pub max_request_size: usize,
/// default: 192 /// default: 256
#[serde(default = "default_max_fetch_prev_events")] #[serde(default = "default_max_fetch_prev_events")]
pub max_fetch_prev_events: u16, pub max_fetch_prev_events: u16,
@@ -2549,7 +2549,7 @@ fn default_pusher_timeout() -> u64 { 60 }
fn default_pusher_idle_timeout() -> u64 { 15 } fn default_pusher_idle_timeout() -> u64 { 15 }
fn default_max_fetch_prev_events() -> u16 { 192_u16 } fn default_max_fetch_prev_events() -> u16 { 256_u16 }
fn default_max_concurrent_inbound_transactions() -> usize { 150 } fn default_max_concurrent_inbound_transactions() -> usize { 150 }
-1
View File
@@ -21,7 +21,6 @@ pub fn versions() -> Vec<String> {
"v1.12".to_owned(), "v1.12".to_owned(),
"v1.13".to_owned(), "v1.13".to_owned(),
"v1.14".to_owned(), "v1.14".to_owned(),
"v1.16".to_owned(),
] ]
} }
+10 -1
View File
@@ -187,13 +187,22 @@ pub(super) static MAPS: &[Descriptor] = &[
val_size_hint: Some(8), val_size_hint: Some(8),
..descriptor::RANDOM_SMALL ..descriptor::RANDOM_SMALL
}, },
Descriptor {
name: "roomid_mindepth",
..descriptor::RANDOM_SMALL
},
Descriptor { Descriptor {
name: "roomserverids", name: "roomserverids",
..descriptor::RANDOM_SMALL ..descriptor::RANDOM_SMALL
}, },
Descriptor { Descriptor {
name: "roomsynctoken_shortstatehash", name: "roomsynctoken_shortstatehash",
..descriptor::DROPPED file_shape: 3,
val_size_hint: Some(8),
block_size: 512,
compression_level: 3,
bottommost_level: Some(6),
..descriptor::SEQUENTIAL
}, },
Descriptor { Descriptor {
name: "roomuserdataid_accountdata", name: "roomuserdataid_accountdata",
@@ -1,233 +1,456 @@
use std::{ use std::{
collections::{BTreeMap, HashSet, VecDeque, hash_map}, collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
time::Instant, time::Instant,
}; };
use assign::assign;
use conduwuit::{ use conduwuit::{
Event, PduEvent, debug, debug_warn, implement, matrix::event::gen_event_id_canonical_json, Event, PduEvent, debug, debug_info, debug_warn, err, error,
trace, utils::continue_exponential_backoff_secs, warn, matrix::event::gen_event_id_canonical_json,
state_res::lexicographical_topological_sort,
trace,
utils::{IterStream, continue_exponential_backoff_secs, stream::BroadbandExt},
warn,
}; };
use futures::StreamExt;
use ruma::{ use ruma::{
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
api::federation::event::get_event, RoomId, ServerName, UInt,
api::federation::event::{get_event, get_missing_events},
int,
}; };
use super::get_room_version_rules; use super::get_room_version_rules;
/// Find the event and auth it. Once the event is validated (steps 1 - 8) /// Attempts to build a localised directed acyclic graph out of the given PDUs,
/// it is appended to the outliers Tree. /// returning them in a topologically sorted order.
/// ///
/// Returns pdu and if we fetched it over federation the raw json. /// This is used to attempt to process PDUs in an order that respects their
/// /// dependencies, however it is ultimately the sender's responsibility to send
/// a. Look in the main timeline (pduid_pdu tree) /// them in a processable order, so this is just a best effort attempt. It does
/// b. Look at outlier pdu tree /// not account for power levels or other tie breaks.
/// c. Ask origin server over federation pub async fn build_local_dag<S: std::hash::BuildHasher>(
/// d. TODO: Ask other servers over federation? pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject, S>,
#[implement(super::Service)] ) -> conduwuit::Result<Vec<OwnedEventId>> {
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>( debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
&self, let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
origin: &'a ServerName, HashMap::with_capacity(pdu_map.len());
events: Events, let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let back_off = |id| match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(id)
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
};
let mut events_with_auth_events = Vec::with_capacity(events.clone().count()); for (event_id, value) in pdu_map {
trace!("Fetching {} outlier pdus", events.clone().count()); // We already checked that these properties are correct in parse_incoming_pdu,
// so it's safe to unwrap here.
// We also filter to remove any prev_events that are not in this pdu_map, as we
// need to have at least one event with zero out degrees for the lexico-topo
// sort below. If there are multiple events with omitted prevs, they will be
// ordered by timestamp, then event ID. At that point though, it's unlikely to
// matter.
let prev_events = value
.get("prev_events")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
.filter(|id| pdu_map.contains_key(id))
.collect();
for id in events { dag.insert(event_id.clone(), prev_events);
// a. Look in the main timeline (pduid_pdu tree) let origin_server_ts = value
// b. Look at outlier pdu tree .get("origin_server_ts")
// (get_pdu_json checks both) .and_then(CanonicalJsonValue::as_integer)
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await { .unwrap_or_default();
trace!("Found {id} in main timeline or outlier tree"); id_origin_ts.insert(event_id.clone(), origin_server_ts);
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![])); }
continue;
}
// c. Ask origin server over federation debug!(count = dag.len(), "Sorting incoming events with partial graph");
// We also handle its auth chain here so we don't get a stack overflow in lexicographical_topological_sort(&dag, &async |node_id| {
// handle_outlier_pdu. // Note: we don't bother fetching power levels because that would massively slow
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into(); // this function down. This is a best-effort attempt to order events correctly
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len()); // for processing, however ultimately that should be the sender's job.
let ts = id_origin_ts
.get(&node_id)
.copied()
.unwrap_or_else(|| int!(0))
.to_string()
.parse::<u64>()
.ok()
.and_then(UInt::new)
.unwrap_or_default();
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
})
.await
.inspect(|sorted| {
debug_assert_eq!(
sorted.len(),
pdu_map.len(),
"Sorted graph was not the same size as the input graph"
);
})
.map_err(|e| err!("failed to resolve local graph: {e}"))
}
let mut events_all = HashSet::with_capacity(todo_auth_events.len()); impl super::Service {
while let Some(next_id) = todo_auth_events.pop_front() { /// Uses `/_matrix/federation/v1/get_missing_events` to fill gaps in the
if let Some((time, tries)) = self /// DAG.
.services ///
.globals /// When this function is called, the "earliest events" (current forward
.bad_event_ratelimiter /// extremities) will be collected, and the function will loop with an
.read() /// exponentially incrementing limit (up to 100 per request) until it has
.get(&*next_id) /// filled the gap, i.e. when the remote says there's no more events.
{ ///
// Exponential backoff /// This function will iterate until the remote returns no more events,
const MIN_DURATION: u64 = 60 * 2; /// increasing the limit by a factor of 10. If 100 iterations are reached or
const MAX_DURATION: u64 = 60 * 60 * 8; /// max_fetch_prev_events events are backfilled, the function will give up
if continue_exponential_backoff_secs( /// and return what it has, to avoid pulling in too much data (for example,
MIN_DURATION, /// absurdly large gaps).
MAX_DURATION, ///
time.elapsed(), /// This function does not persist the events. The caller is responsible for
*tries, /// passing them through handle_incoming_pdu.
) { ///
debug_warn!( /// ## Parameters
tried = ?*tries, ///
elapsed = ?time.elapsed(), /// - `room_id`: The room's ID.
"Backing off from {next_id}", /// - `head`: The event we are potentially missing prev_events for.
); /// - `tail`: The most recently known events in the graph (typically forward
continue; /// extremities).
} /// - `via`: The server to ask for missing events.
} /// - `min_depth`: Don't process events with a `depth` lower than this
/// value. Not massively useful, but can help short-circuit infinite loops
/// and weird edge paths.
pub async fn get_missing_events(
&self,
room_id: &RoomId,
head: &PduEvent,
tail: Vec<OwnedEventId>,
via: &ServerName,
min_depth: UInt,
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
#[cfg(debug_assertions)]
{
let missing_count = head
.prev_events()
.stream()
.broad_filter_map(|event_id| async move {
match self
.services
.timeline
.get_non_outlier_pdu_json(event_id)
.await
.inspect(|_| debug!("Found prev_event {event_id} locally."))
.inspect_err(
|e| debug!(%e, "Could not find prev_event {event_id} locally."),
) {
| Ok(_) => None,
| Err(_) => Some(event_id),
}
})
.count()
.await;
debug_assert_ne!(
missing_count, 0,
"event passed to get_missing_events is not missing any events (wasteful call)"
);
};
if events_all.contains(&next_id) { let mut discovered = HashMap::with_capacity(20);
continue; let mut latest_events = vec![head.event_id().to_owned()];
} let mut iterations = 0_u8;
loop {
if self.services.timeline.pdu_exists(&next_id).await { iterations = iterations.saturating_add(1);
trace!("Found {next_id} in db"); let limit = iterations.saturating_mul(10).min(100);
continue; debug_info!(%limit, %via, %iterations, discovered=discovered.len(), %min_depth, "Attempting to gap fill missing events");
} let response: get_missing_events::v1::Response = self
debug!("Fetching {next_id} over federation from {origin}.");
match self
.services .services
.sending .sending
.send_federation_request( .send_federation_request(
origin, via,
get_event::v1::Request::new((*next_id).to_owned()), assign!(
get_missing_events::v1::Request::new(
room_id.to_owned(),
tail.clone(),
latest_events.clone()
),
{limit: limit.into(), min_depth}
),
) )
.await .await?;
{
| Ok(res) => {
debug!("Got {next_id} over federation from {origin}");
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
back_off((*next_id).to_owned());
continue;
};
let Ok((calculated_event_id, value)) = if response.events.is_empty() {
gen_event_id_canonical_json(&res.pdu, &room_version_rules) debug_info!(%via, "Finished gap filling missing events (remote returned no more events).");
else { break;
back_off((*next_id).to_owned()); }
continue; debug_info!("Got {} events back from remote", response.events.len());
};
if calculated_event_id != *next_id { latest_events.clear();
warn!( for raw_event in response.events {
"Server didn't return event id we requested: requested: {next_id}, \ let (_, event_id, pdu_json) = self.parse_incoming_pdu(&raw_event).await?;
we got {calculated_event_id}. Event: {:?}", let pdu = PduEvent::from_id_val(&event_id, pdu_json).map_err(|e| {
&res.pdu err!(Request(BadJson("Failed to parse backfilled event {event_id}: {e}")))
); })?;
if pdu.depth < min_depth {
debug_warn!(
"Received PDU with depth {} below min_depth {}, ignoring",
pdu.depth,
min_depth
);
continue;
}
for prev_event_id in pdu.prev_events() {
if discovered.contains_key(prev_event_id) {
continue;
} }
if self
if let Some(auth_events) = value .services
.get("auth_events") .timeline
.and_then(CanonicalJsonValue::as_array) .non_outlier_pdu_exists(prev_event_id)
.await
{ {
for auth_event in auth_events { continue;
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
trace!(
"Found auth event id {auth_event} for event {next_id}"
);
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
} }
latest_events.push(prev_event_id.to_owned());
break;
}
events_in_reverse_order.push((next_id.clone(), value)); discovered.insert(event_id.clone(), pdu);
events_all.insert(next_id); }
},
| Err(e) => { if latest_events.is_empty() {
warn!("Failed to fetch auth event {next_id} from {origin}: {e}"); break;
back_off((*next_id).to_owned()); } else if discovered.len() > self.services.server.config.max_fetch_prev_events.into()
}, || iterations >= 20
{
error!(
filled=discovered.len(),
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
%iterations,
"Gap too large, giving up"
);
break;
} }
} }
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order)); Ok(discovered)
} }
let mut pdus = Vec::with_capacity(events_with_auth_events.len()); /// Find the event and auth it. Once the event is validated (steps 1 - 8)
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events { /// it is appended to the outliers Tree.
// a. Look in the main timeline (pduid_pdu tree) ///
// b. Look at outlier pdu tree /// Returns pdu and if we fetched it over federation the raw json.
// (get_pdu_json checks both) ///
if let Some(local_pdu) = local_pdu { /// a. Look in the main timeline (pduid_pdu tree)
trace!("Found {id} in main timeline or outlier tree"); /// b. Look at outlier pdu tree
pdus.push((local_pdu.clone(), None)); /// c. Ask origin server over federation
} /// d. TODO: Ask other servers over federation?
#[deprecated]
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
&self,
origin: &'a ServerName,
events: Events,
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let back_off = |id| match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(id)
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
};
for (next_id, value) in events_in_reverse_order.into_iter().rev() { let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
if let Some((time, tries)) = self trace!("Fetching {} outlier pdus", events.clone().count());
.services
.globals for id in events {
.bad_event_ratelimiter // a. Look in the main timeline (pduid_pdu tree)
.read() // b. Look at outlier pdu tree
.get(&*next_id) // (get_pdu_json checks both)
{ if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
// Exponential backoff trace!("Found {id} in main timeline or outlier tree");
const MIN_DURATION: u64 = 5 * 60; events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
const MAX_DURATION: u64 = 60 * 60 * 24; continue;
if continue_exponential_backoff_secs( }
MIN_DURATION,
MAX_DURATION, // c. Ask origin server over federation
time.elapsed(), // We also handle its auth chain here so we don't get a stack overflow in
*tries, // handle_outlier_pdu.
) { let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
debug!("Backing off from {next_id}"); let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug_warn!(
tried = ?*tries,
elapsed = ?time.elapsed(),
"Backing off from {next_id}",
);
continue;
}
}
if events_all.contains(&next_id) {
continue; continue;
} }
if self.services.timeline.pdu_exists(&next_id).await {
trace!("Found {next_id} in db");
continue;
}
debug!("Fetching {next_id} over federation from {origin}.");
match self
.services
.sending
.send_federation_request(
origin,
get_event::v1::Request::new((*next_id).to_owned()),
)
.await
{
| Ok(res) => {
debug!("Got {next_id} over federation from {origin}");
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
back_off((*next_id).to_owned());
continue;
};
let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
else {
back_off((*next_id).to_owned());
continue;
};
if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: \
{next_id}, we got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}
if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
trace!(
"Found auth event id {auth_event} for event \
{next_id}"
);
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
back_off((*next_id).to_owned());
},
}
} }
trace!("Handling outlier {next_id}"); events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
match Box::pin(self.handle_outlier_pdu( }
origin,
create_event, let mut pdus = Vec::with_capacity(events_with_auth_events.len());
&next_id, for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
room_id, // a. Look in the main timeline (pduid_pdu tree)
value.clone(), // b. Look at outlier pdu tree
true, // (get_pdu_json checks both)
)) if let Some(local_pdu) = local_pdu {
.await trace!("Found {id} in main timeline or outlier tree");
{ pdus.push((local_pdu.clone(), None));
| Ok((pdu, json)) => }
if next_id == *id {
trace!("Handled outlier {next_id} (original request)"); for (next_id, value) in events_in_reverse_order.into_iter().rev() {
pdus.push((pdu, Some(json))); if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!("Backing off from {next_id}");
continue;
}
}
trace!("Handling outlier {next_id}");
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&next_id,
room_id,
value.clone(),
true,
))
.await
{
| Ok((pdu, json)) =>
if next_id == *id {
trace!("Handled outlier {next_id} (original request)");
pdus.push((pdu, Some(json)));
},
| Err(e) => {
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id);
}, },
| Err(e) => { }
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id);
},
} }
} }
trace!("Fetched and handled {} outlier pdus", pdus.len());
pdus
} }
trace!("Fetched and handled {} outlier pdus", pdus.len());
pdus
} }
+88 -120
View File
@@ -1,128 +1,96 @@
use std::{ use std::collections::HashMap;
collections::{BTreeMap, HashMap, HashSet, VecDeque},
iter::once,
};
use conduwuit::{ use conduwuit::{
Event, PduEvent, Result, debug_warn, err, implement, Event, PduEvent, debug, debug_info,
state_res::{self}, utils::{BoolExt, IterStream, stream::BroadbandExt},
}; warn,
use futures::{FutureExt, future};
use ruma::{
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
int, uint,
}; };
use futures::StreamExt;
use ruma::{RoomId, ServerName};
use super::check_room_id; use crate::rooms::event_handler::build_local_dag;
#[implement(super::Service)] impl super::Service {
#[tracing::instrument( /// Fetches any missing prev_events for this event and persists them before
level = "debug", /// returning.
skip_all, pub(super) async fn fetch_prevs(
fields(%origin), &self,
)] room_id: &RoomId,
#[allow(clippy::type_complexity)] create_event: &PduEvent,
pub(super) async fn fetch_prev<'a, Pdu, Events>( incoming_pdu: &PduEvent,
&self, origin: &ServerName,
origin: &ServerName, ) -> conduwuit::Result<()> {
create_event: &Pdu, let missing = incoming_pdu
room_id: &RoomId, .prev_events()
first_ts_in_room: MilliSecondsSinceUnixEpoch, .stream()
initial_set: Events, .broad_filter_map(|event_id| async move {
) -> Result<( self.services
Vec<OwnedEventId>, .timeline
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>, .get_non_outlier_pdu_json(event_id)
)> .await
where .is_ok()
Pdu: Event + Send + Sync, .or(|| event_id.to_owned())
Events: Iterator<Item = &'a EventId> + Clone + Send, })
{ .collect::<Vec<_>>()
let num_ids = initial_set.clone().count(); .await;
let mut eventid_info = HashMap::new(); if missing.is_empty() {
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids); debug!(event_id=%incoming_pdu.event_id(), "No missing prev events.");
let mut todo_outlier_stack: VecDeque<OwnedEventId> = return Ok(());
initial_set.map(ToOwned::to_owned).collect();
let mut amount = 0;
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
self.services.server.check_running()?;
match self
.fetch_and_handle_outliers(
origin,
once(prev_event_id.as_ref()),
create_event,
room_id,
)
.boxed()
.await
.pop()
{
| Some((pdu, mut json_opt)) => {
check_room_id(room_id, &pdu)?;
let limit = self.services.server.config.max_fetch_prev_events;
if amount > limit {
debug_warn!("Max prev event limit reached! Limit: {limit}");
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
}
if json_opt.is_none() {
json_opt = self
.services
.outlier
.get_outlier_pdu_json(&prev_event_id)
.await
.ok();
}
if let Some(json) = json_opt {
if pdu.origin_server_ts() > first_ts_in_room {
amount = amount.saturating_add(1);
for prev_prev in pdu.prev_events() {
if !graph.contains_key(prev_prev) {
todo_outlier_stack.push_back(prev_prev.to_owned());
}
}
graph.insert(
prev_event_id.clone(),
pdu.prev_events().map(ToOwned::to_owned).collect(),
);
} else {
// Time based check failed
graph.insert(prev_event_id.clone(), HashSet::new());
}
eventid_info.insert(prev_event_id.clone(), (pdu, json));
} else {
// Get json failed, so this was not fetched over federation
graph.insert(prev_event_id.clone(), HashSet::new());
}
},
| _ => {
// Fetch and handle failed
graph.insert(prev_event_id.clone(), HashSet::new());
},
} }
debug!(%room_id, event_id=%incoming_pdu.event_id(), ?missing, "Fetching previous events");
let tail = self
.services
.state
.get_forward_extremities(room_id)
.collect::<Vec<_>>()
.await;
let backfilled = self
.get_missing_events(
room_id,
incoming_pdu,
tail,
origin,
self.services.metadata.get_mindepth(room_id).await,
)
.await?;
debug_info!("Fetched {} missing events", backfilled.len());
// Persist all fetched events
let mapped = backfilled
.iter()
.map(|(eid, evt)| {
let mut obj = evt.to_canonical_object();
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
(eid.clone(), obj)
})
.collect::<HashMap<_, _>>();
let to_persist = if mapped.len() <= 1 {
mapped.keys().map(ToOwned::to_owned).collect()
} else {
build_local_dag(&mapped).await?
};
for event_id in to_persist {
debug_info!("Persisting fetched prev event {event_id}");
let obj = mapped.get(&event_id).cloned().unwrap();
match self
.handle_outlier_pdu(origin, create_event, &event_id, room_id, obj, false)
.await
{
| Ok((pdu, val)) =>
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
.await,
| Err(e) => {
warn!("Failed to persist prev_event {event_id}: {e}");
continue;
},
}?;
}
// NOTE because i keep forgetting: the caller persists incoming_pdu.
// we only care about its prev events
Ok(())
} }
let event_fetch = |event_id| {
let origin_server_ts = eventid_info
.get(&event_id)
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
// This return value is the key used for sorting events,
// events are then sorted by power level, time,
// and lexically by event_id.
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
};
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
.await
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;
Ok((sorted, eventid_info))
} }
@@ -1,7 +1,6 @@
use std::collections::{HashMap, hash_map}; use std::collections::{HashMap, hash_map};
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement}; use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
use futures::FutureExt;
use ruma::{ use ruma::{
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids, EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
events::StateEventType, events::StateEventType,
@@ -42,7 +41,6 @@ where
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref); let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
let state_vec = self let state_vec = self
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id) .fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
.boxed()
.await; .await;
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len()); let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
@@ -1,14 +1,11 @@
use std::{ use std::{collections::BTreeMap, time::Instant};
collections::{BTreeMap, hash_map},
time::Instant,
};
use conduwuit::{ use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err, Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
implement, info, trace, utils::stream::IterStream, warn, implement, info, trace, warn,
}; };
use futures::{ use futures::{
FutureExt, TryFutureExt, TryStreamExt, FutureExt,
future::{OptionFuture, try_join4}, future::{OptionFuture, try_join4},
}; };
use ruma::{ use ruma::{
@@ -236,63 +233,21 @@ pub async fn handle_incoming_pdu<'a>(
} }
// Skip old events // Skip old events
let first_ts_in_room = self // let first_ts_in_room = self
.services // .services
.timeline // .timeline
.first_pdu_in_room(room_id) // .first_pdu_in_room(room_id)
.await? // .await?
.origin_server_ts(); // .origin_server_ts();
// 9. Fetch any missing prev events doing all checks listed here starting at 1. // 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events // These are timeline events
let (sorted_prev_events, mut eventid_info) = self debug!("Handling previous events");
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
.await?;
debug!( self.fetch_prevs(room_id, create_event, &incoming_pdu, origin)
events = ?sorted_prev_events,
"Handling previous events"
);
sorted_prev_events
.iter()
.try_stream()
.map_ok(AsRef::as_ref)
.try_for_each(|prev_id| {
self.handle_prev_pdu(
origin,
event_id,
room_id,
eventid_info.remove(prev_id),
create_event,
first_ts_in_room,
prev_id,
)
.inspect_err(move |e| {
warn!("Prev {prev_id} failed: {e}");
match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(prev_id.into())
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
let tries = e.get().1.saturating_add(1);
*e.get_mut() = (Instant::now(), tries);
},
}
})
.map(|_| self.services.server.check_running())
})
.boxed()
.await?; .await?;
// Done with prev events, now handling the incoming event // Done with prev events, now handling the incoming event
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id) self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
.boxed()
.await .await
} }
@@ -1,13 +1,13 @@
use std::collections::{BTreeMap, HashMap, hash_map}; use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{ use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, info, state_res,
trace, warn, trace, warn,
}; };
use futures::future::ready; use futures::future::ready;
use ruma::{ use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
events::StateEventType, api::federation::authorization::get_event_authorization, events::StateEventType,
}; };
use super::{check_room_id, get_room_version_rules}; use super::{check_room_id, get_room_version_rules};
@@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
event_id: &'a EventId, event_id: &'a EventId,
room_id: &'a RoomId, room_id: &'a RoomId,
mut value: CanonicalJsonObject, mut value: CanonicalJsonObject,
auth_events_known: bool, _auth_events_known: bool,
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)> ) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
where where
Pdu: Event + Send + Sync, Pdu: Event + Send + Sync,
@@ -107,45 +107,52 @@ where
} }
// Fetch any missing ones & reject invalid ones // Fetch any missing ones & reject invalid ones
let missing_auth_events = if auth_events_known { if auth_events.len() != pdu_event.auth_events().count() {
pdu_event info!("Missing some auth events, asking remote for auth chain");
.auth_events() let response: get_event_authorization::v1::Response = self
.filter(|id| !auth_events.contains_key(*id)) .services
.collect::<Vec<_>>() .sending
} else { .send_federation_request(
pdu_event.auth_events().collect::<Vec<_>>()
};
if !missing_auth_events.is_empty() || !auth_events_known {
debug_info!(
"Fetching {} missing auth events for outlier event {event_id}",
missing_auth_events.len()
);
for (pdu, _) in self
.fetch_and_handle_outliers(
origin, origin,
missing_auth_events.iter().copied(), get_event_authorization::v1::Request::new(
create_event, room_id.to_owned(),
room_id, event_id.to_owned(),
),
) )
.await .await
{ .map_err(|e| {
auth_events.insert(pdu.event_id().to_owned(), pdu); err!(Request(Forbidden(
"Remote server is not divulging incoming event's auth chain: {e}"
)))
})?;
let mut auth_chain_map = HashMap::with_capacity(response.auth_chain.len());
for auth_pdu_json in response.auth_chain {
let (auth_event_room_id, auth_event_id, auth_pdu_json) =
self.parse_incoming_pdu(&auth_pdu_json).await?;
if auth_event_room_id != room_id {
return Err!(Request(BadJson(
"Auth event {auth_event_id} is in {auth_event_room_id}, not {room_id}."
)));
}
let auth_pdu = PduEvent::from_id_val(&auth_event_id, auth_pdu_json)
.map_err(|e| err!(Request(BadJson("Invalid PDU {auth_event_id}: {e}"))))?;
auth_chain_map.insert(auth_event_id, auth_pdu);
} }
} else { for aid in pdu_event.auth_events() {
debug!("No missing auth events for outlier event {event_id}"); if auth_events.contains_key(aid) {
} continue;
// reject if we are still missing some }
let still_missing = pdu_event if let Some(auth_event) = auth_chain_map.get(aid) {
.auth_events() auth_events.insert(aid.to_owned(), auth_event.clone());
.filter(|id| !auth_events.contains_key(*id)) } else {
.collect::<Vec<_>>(); return Err!(Request(Forbidden(
if !still_missing.is_empty() { "Remote server is not divulging incoming event's auth events (missing: \
// Don't reject: this could be a temporary condition {aid})"
// TODO: use get_missing_events? )));
return Err!(Request(InvalidParam( }
"Could not fetch all auth events for outlier event {event_id}, still missing: \ }
{still_missing:?}" // TODO: do events received from auth chain need persisting? that sounds
))); // awfully slow
} }
// 6. Reject "due to auth events" if the event doesn't pass auth based on the // 6. Reject "due to auth events" if the event doesn't pass auth based on the
@@ -1,89 +0,0 @@
use std::{collections::BTreeMap, time::Instant};
use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
utils::continue_exponential_backoff_secs,
};
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
use tracing::debug;
#[implement(super::Service)]
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "prev",
level = INFO_SPAN_LEVEL,
skip_all,
fields(%prev_id),
)]
pub(super) async fn handle_prev_pdu<'a, Pdu>(
&self,
origin: &'a ServerName,
event_id: &'a EventId,
room_id: &'a RoomId,
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
create_event: &'a Pdu,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
prev_id: &'a EventId,
) -> Result
where
Pdu: Event + Send + Sync,
{
// Check for disabled again because it might have changed
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(debug_warn!(
"Federaton of room {room_id} is currently disabled on this server. Request by \
origin {origin} and event ID {event_id}"
))));
}
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(prev_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
debug!(
?tries,
duration = ?time.elapsed(),
"Backing off from prev_event"
);
return Ok(());
}
}
let Some((pdu, json)) = eventid_info else {
return Ok(());
};
// Skip old events
if pdu.origin_server_ts() < first_ts_in_room {
return Ok(());
}
let start_time = Instant::now();
self.federation_handletime
.write()
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
defer! {{
self.federation_handletime
.write()
.remove(room_id);
}};
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
.await?;
debug!(
elapsed = ?start_time.elapsed(),
"Handled prev_event",
);
Ok(())
}
+1 -2
View File
@@ -4,7 +4,6 @@ mod fetch_prev;
mod fetch_state; mod fetch_state;
mod handle_incoming_pdu; mod handle_incoming_pdu;
mod handle_outlier_pdu; mod handle_outlier_pdu;
mod handle_prev_pdu;
mod parse_incoming_pdu; mod parse_incoming_pdu;
mod policy_server; mod policy_server;
mod resolve_state; mod resolve_state;
@@ -15,6 +14,7 @@ use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant};
use async_trait::async_trait; use async_trait::async_trait;
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap}; use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
pub use fetch_and_handle_outliers::build_local_dag;
use ruma::{ use ruma::{
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent, OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
room_version_rules::RoomVersionRules, room_version_rules::RoomVersionRules,
@@ -22,7 +22,6 @@ use ruma::{
use tokio::sync::Notify; use tokio::sync::Notify;
use crate::{Dep, globals, rooms, sending, server_keys}; use crate::{Dep, globals, rooms, sending, server_keys};
pub struct Service { pub struct Service {
pub mutex_federation: RoomMutexMap, pub mutex_federation: RoomMutexMap,
pub federation_handletime: SyncRwLock<HandleTimeMap>, pub federation_handletime: SyncRwLock<HandleTimeMap>,
@@ -56,7 +56,10 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR
/// Parses every entry in an array as an event ID, returning an error if any /// Parses every entry in an array as an event ID, returning an error if any
/// step fails. /// step fails.
fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> { pub(super) fn expect_event_id_array(
value: &CanonicalJsonObject,
field: &str,
) -> Result<Vec<OwnedEventId>> {
value value
.get(field) .get(field)
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))? .ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
@@ -5,7 +5,7 @@ use std::{
}; };
use conduwuit::{ use conduwuit::{
Result, debug, err, error, implement, Result, debug, debug_error, err, error, implement,
matrix::{Event, StateMap}, matrix::{Event, StateMap},
trace, trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
@@ -37,6 +37,7 @@ where
.pdu_shortstatehash(prev_event) .pdu_shortstatehash(prev_event)
.await .await
else { else {
trace!("No shortstatehash for {prev_event}, cannot calculate one-degree state.");
return Ok(None); return Ok(None);
}; };
@@ -99,6 +100,7 @@ where
.map_ok(move |sstatehash| (sstatehash, prev_event)) .map_ok(move |sstatehash| (sstatehash, prev_event))
}) })
.try_collect::<HashMap<_, _>>() .try_collect::<HashMap<_, _>>()
.inspect_err(|e| debug_error!("failed to calculate N-degree short state hashes: {e}"))
.await .await
else { else {
return Ok(None); return Ok(None);
@@ -41,6 +41,7 @@ where
.get_pdu_id(incoming_pdu.event_id()) .get_pdu_id(incoming_pdu.event_id())
.await .await
{ {
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
return Ok(Some(pduid)); return Ok(Some(pduid));
} }
@@ -63,6 +64,7 @@ where
"Upgrading PDU from outlier to timeline" "Upgrading PDU from outlier to timeline"
); );
let timer = Instant::now(); let timer = Instant::now();
let min_depth = self.services.metadata.get_mindepth(room_id).await;
let room_version_rules = get_room_version_rules(create_event)?; let room_version_rules = get_room_version_rules(create_event)?;
// 10. Fetch missing state and auth chain events by calling /state_ids at // 10. Fetch missing state and auth chain events by calling /state_ids at
@@ -81,6 +83,7 @@ where
}; };
if state_at_incoming_event.is_none() { if state_at_incoming_event.is_none() {
trace!("Could not calculate incoming state, asking remote {origin} for it");
state_at_incoming_event = self state_at_incoming_event = self
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id()) .fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
.await?; .await?;
@@ -382,6 +385,11 @@ where
// Event has passed all auth/stateres checks // Event has passed all auth/stateres checks
drop(state_lock); drop(state_lock);
if incoming_pdu.depth > min_depth {
self.services
.metadata
.set_mindepth(room_id, incoming_pdu.depth.into());
}
Ok(pdu_id) Ok(pdu_id)
} }
+4
View File
@@ -626,6 +626,10 @@ impl Service {
room_id, room_id,
) )
.await?; .await?;
self.services
.metadata
.maybe_set_mindepth(room_id, parsed_join_pdu.depth.into())
.await;
info!("Setting final room state for new room"); info!("Setting final room state for new room");
// We set the room state after inserting the pdu, so that we never have a moment // We set the room state after inserting the pdu, so that we never have a moment
+28 -2
View File
@@ -1,9 +1,9 @@
use std::sync::Arc; use std::sync::Arc;
use conduwuit::{Result, implement, utils::stream::TryIgnore}; use conduwuit::{Result, implement, utils::stream::TryIgnore};
use database::Map; use database::{Deserialized, Map};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use ruma::{OwnedRoomId, RoomId}; use ruma::{OwnedRoomId, RoomId, UInt, uint};
use crate::{Dep, rooms}; use crate::{Dep, rooms};
@@ -17,6 +17,7 @@ struct Data {
bannedroomids: Arc<Map>, bannedroomids: Arc<Map>,
roomid_shortroomid: Arc<Map>, roomid_shortroomid: Arc<Map>,
pduid_pdu: Arc<Map>, pduid_pdu: Arc<Map>,
roomid_mindepth: Arc<Map>,
} }
struct Services { struct Services {
@@ -31,6 +32,7 @@ impl crate::Service for Service {
bannedroomids: args.db["bannedroomids"].clone(), bannedroomids: args.db["bannedroomids"].clone(),
roomid_shortroomid: args.db["roomid_shortroomid"].clone(), roomid_shortroomid: args.db["roomid_shortroomid"].clone(),
pduid_pdu: args.db["pduid_pdu"].clone(), pduid_pdu: args.db["pduid_pdu"].clone(),
roomid_mindepth: args.db["roomid_mindepth"].clone(),
}, },
services: Services { services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"), short: args.depend::<rooms::short::Service>("rooms::short"),
@@ -98,3 +100,27 @@ pub async fn is_disabled(&self, room_id: &RoomId) -> bool {
pub async fn is_banned(&self, room_id: &RoomId) -> bool { pub async fn is_banned(&self, room_id: &RoomId) -> bool {
self.db.bannedroomids.get(room_id).await.is_ok() self.db.bannedroomids.get(room_id).await.is_ok()
} }
#[implement(Service)]
pub async fn get_mindepth(&self, room_id: &RoomId) -> UInt {
self.db
.roomid_mindepth
.get(room_id)
.await
.deserialized::<UInt>()
.unwrap_or_else(|_| uint!(0))
}
#[implement(Service)]
pub fn set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
self.db
.roomid_mindepth
.put_raw(room_id.as_bytes(), min_depth.to_be_bytes());
}
#[implement(Service)]
pub async fn maybe_set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
if min_depth > self.get_mindepth(room_id).await.into() {
self.set_mindepth(room_id, min_depth);
}
}
+4
View File
@@ -173,6 +173,10 @@ impl Service {
self.db.get_non_outlier_pdu_json(event_id).await self.db.get_non_outlier_pdu_json(event_id).await
} }
pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> bool {
self.db.non_outlier_pdu_exists(event_id).await.is_ok()
}
/// Returns the pdu's id. /// Returns the pdu's id.
#[inline] #[inline]
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> { pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
+46 -2
View File
@@ -1,10 +1,10 @@
use std::sync::Arc; use std::sync::Arc;
use conduwuit::{Result, implement}; use conduwuit::{Result, implement};
use database::{Deserialized, Map}; use database::{Database, Deserialized, Map};
use ruma::{RoomId, UserId}; use ruma::{RoomId, UserId};
use crate::{Dep, globals}; use crate::{Dep, globals, rooms, rooms::short::ShortStateHash};
pub struct Service { pub struct Service {
db: Data, db: Data,
@@ -12,25 +12,32 @@ pub struct Service {
} }
struct Data { struct Data {
db: Arc<Database>,
userroomid_notificationcount: Arc<Map>, userroomid_notificationcount: Arc<Map>,
userroomid_highlightcount: Arc<Map>, userroomid_highlightcount: Arc<Map>,
roomuserid_lastnotificationread: Arc<Map>, roomuserid_lastnotificationread: Arc<Map>,
roomsynctoken_shortstatehash: Arc<Map>,
} }
struct Services { struct Services {
globals: Dep<globals::Service>, globals: Dep<globals::Service>,
short: Dep<rooms::short::Service>,
} }
impl crate::Service for Service { impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> { fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self { Ok(Arc::new(Self {
db: Data { db: Data {
db: args.db.clone(),
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(), roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(),
roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(),
}, },
services: Services { services: Services {
globals: args.depend::<globals::Service>("globals"), globals: args.depend::<globals::Service>("globals"),
short: args.depend::<rooms::short::Service>("rooms::short"),
}, },
})) }))
} }
@@ -83,3 +90,40 @@ pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -
.deserialized() .deserialized()
.unwrap_or(0) .unwrap_or(0)
} }
#[implement(Service)]
pub async fn associate_token_shortstatehash(
&self,
room_id: &RoomId,
token: u64,
shortstatehash: ShortStateHash,
) {
let shortroomid = self
.services
.short
.get_shortroomid(room_id)
.await
.expect("room exists");
let _cork = self.db.db.cork();
let key: &[u64] = &[shortroomid, token];
self.db
.roomsynctoken_shortstatehash
.put(key, shortstatehash);
}
#[implement(Service)]
pub async fn get_token_shortstatehash(
&self,
room_id: &RoomId,
token: u64,
) -> Result<ShortStateHash> {
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
let key: &[u64] = &[shortroomid, token];
self.db
.roomsynctoken_shortstatehash
.qry(key)
.await
.deserialized()
}
+3 -2
View File
@@ -34,8 +34,9 @@ where
batch batch
}); });
if server_keys.is_empty() {
debug_assert!(!server_keys.is_empty(), "empty batch request to notary"); return Ok(vec![]);
}
let mut results = Vec::new(); let mut results = Vec::new();
while let Some(batch) = server_keys while let Some(batch) = server_keys