mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 624bd3796a | |||
| 7f21f0d6ab | |||
| b6158c73db | |||
| bf4c716c7f | |||
| 4facaa4440 | |||
| 28aed31874 | |||
| af8e28559e | |||
| 6a2480774d |
@@ -0,0 +1 @@
|
|||||||
|
Added support for Matrix 1.16's `state_after` feature, allowing clients which understand it to sync room state changes more reliably. Contributed by @ginger.
|
||||||
@@ -0,0 +1 @@
|
|||||||
|
Adjusted legacy sync logic to no longer use the `roomsynctoken_shortstatehash` database column. Once this change has been confirmed to be stable and reliable, a future update will remove it entirely, significantly decreasing database sizes. Contributed by @ginger.
|
||||||
@@ -48,6 +48,13 @@ async fn load_timeline(
|
|||||||
ending_count: Option<PduCount>,
|
ending_count: Option<PduCount>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Result<TimelinePdus> {
|
) -> Result<TimelinePdus> {
|
||||||
|
if let (Some(starting_count), Some(ending_count)) = (starting_count, ending_count) {
|
||||||
|
debug_assert!(
|
||||||
|
starting_count <= ending_count,
|
||||||
|
"starting count {starting_count} > ending count {ending_count}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
let mut pdu_stream = match starting_count {
|
let mut pdu_stream = match starting_count {
|
||||||
| Some(starting_count) => {
|
| Some(starting_count) => {
|
||||||
let last_timeline_count = services
|
let last_timeline_count = services
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ use ruma::{
|
|||||||
uint,
|
uint,
|
||||||
};
|
};
|
||||||
use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash};
|
use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash};
|
||||||
|
use tokio::pin;
|
||||||
|
|
||||||
use super::{load_timeline, share_encrypted_room};
|
use super::{load_timeline, share_encrypted_room};
|
||||||
use crate::client::{
|
use crate::client::{
|
||||||
@@ -96,12 +97,19 @@ pub(super) async fn load_joined_room(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let state_events =
|
||||||
|
StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect());
|
||||||
|
|
||||||
let joined_room = assign!(JoinedRoom::new(), {
|
let joined_room = assign!(JoinedRoom::new(), {
|
||||||
account_data,
|
account_data,
|
||||||
summary: summary.unwrap_or_default(),
|
summary: summary.unwrap_or_default(),
|
||||||
unread_notifications: notification_counts.unwrap_or_default(),
|
unread_notifications: notification_counts.unwrap_or_default(),
|
||||||
timeline,
|
timeline,
|
||||||
state: RoomState::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())),
|
state: if sync_context.use_state_after {
|
||||||
|
RoomState::After(state_events)
|
||||||
|
} else {
|
||||||
|
RoomState::Before(state_events)
|
||||||
|
},
|
||||||
ephemeral,
|
ephemeral,
|
||||||
unread_thread_notifications: BTreeMap::new(),
|
unread_thread_notifications: BTreeMap::new(),
|
||||||
});
|
});
|
||||||
@@ -344,7 +352,7 @@ struct ShortStateHashes {
|
|||||||
#[tracing::instrument(level = "debug", skip_all)]
|
#[tracing::instrument(level = "debug", skip_all)]
|
||||||
async fn fetch_shortstatehashes(
|
async fn fetch_shortstatehashes(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>,
|
SyncContext { last_sync_end_count, .. }: SyncContext<'_>,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
) -> Result<ShortStateHashes> {
|
) -> Result<ShortStateHashes> {
|
||||||
// the room state currently.
|
// the room state currently.
|
||||||
@@ -354,46 +362,41 @@ async fn fetch_shortstatehashes(
|
|||||||
.rooms
|
.rooms
|
||||||
.state
|
.state
|
||||||
.get_room_shortstatehash(room_id)
|
.get_room_shortstatehash(room_id)
|
||||||
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
|
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
|
||||||
|
.await?;
|
||||||
|
|
||||||
// the room state as of the end of the last sync.
|
// The room state as of the end of the last sync.
|
||||||
// this will be None if we are doing an initial sync or if we just joined this
|
// This will be None if we are doing an initial sync.
|
||||||
// room.
|
|
||||||
let last_sync_end_shortstatehash =
|
let last_sync_end_shortstatehash =
|
||||||
OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| {
|
OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| {
|
||||||
// look up the shortstatehash saved by the last sync's call to
|
pin! {
|
||||||
// `associate_token_shortstatehash`
|
let pdus = services
|
||||||
services
|
.rooms
|
||||||
.rooms
|
.timeline
|
||||||
.user
|
.pdus(room_id, Some(PduCount::Normal(last_sync_end_count)))
|
||||||
.get_token_shortstatehash(room_id, last_sync_end_count)
|
.ignore_err();
|
||||||
.inspect_err(move |_| {
|
}
|
||||||
debug_warn!(
|
|
||||||
token = last_sync_end_count,
|
match pdus.next().await {
|
||||||
"Room has no shortstatehash for this token"
|
| Some((_, pdu_after_last_sync_end)) => {
|
||||||
);
|
trace!(?pdu_after_last_sync_end.event_id, "pdu at last sync end");
|
||||||
})
|
|
||||||
.ok()
|
services
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.pdu_shortstatehash(&pdu_after_last_sync_end.event_id)
|
||||||
|
.await
|
||||||
|
.map_err(|err| err!("Last sync end PDU has no shortstatehash: {err}"))
|
||||||
|
},
|
||||||
|
| None => {
|
||||||
|
// No events have been sent since the last sync, or we just joined this room,
|
||||||
|
// so the state then is the same as the state now
|
||||||
|
Ok(current_shortstatehash)
|
||||||
|
},
|
||||||
|
}
|
||||||
}))
|
}))
|
||||||
.map(Option::flatten)
|
.await
|
||||||
.map(Ok);
|
.transpose()?;
|
||||||
|
|
||||||
let (current_shortstatehash, last_sync_end_shortstatehash) =
|
|
||||||
try_join(current_shortstatehash, last_sync_end_shortstatehash).await?;
|
|
||||||
|
|
||||||
/*
|
|
||||||
associate the `current_count` with the `current_shortstatehash`, so we can
|
|
||||||
use it on the next sync as the `last_sync_end_shortstatehash`.
|
|
||||||
|
|
||||||
TODO: the table written to by this call grows extremely fast, gaining one new entry for each
|
|
||||||
joined room on _every single sync request_. we need to find a better way to remember the shortstatehash
|
|
||||||
between syncs.
|
|
||||||
*/
|
|
||||||
services
|
|
||||||
.rooms
|
|
||||||
.user
|
|
||||||
.associate_token_shortstatehash(room_id, current_count, current_shortstatehash)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
Ok(ShortStateHashes {
|
Ok(ShortStateHashes {
|
||||||
current_shortstatehash,
|
current_shortstatehash,
|
||||||
@@ -452,6 +455,7 @@ async fn build_state_events(
|
|||||||
syncing_user,
|
syncing_user,
|
||||||
last_sync_end_count,
|
last_sync_end_count,
|
||||||
full_state,
|
full_state,
|
||||||
|
use_state_after,
|
||||||
..
|
..
|
||||||
} = sync_context;
|
} = sync_context;
|
||||||
|
|
||||||
@@ -460,32 +464,28 @@ async fn build_state_events(
|
|||||||
last_sync_end_shortstatehash,
|
last_sync_end_shortstatehash,
|
||||||
} = shortstatehashes;
|
} = shortstatehashes;
|
||||||
|
|
||||||
// the spec states that the `state` property only includes state events up to
|
let timeline_start_shortstatehash = if let Some((count, pdu)) = timeline.pdus.front() {
|
||||||
// the beginning of the timeline, so we determine the state of the syncing room
|
if matches!(count, PduCount::Backfilled(_)) {
|
||||||
// as of the first timeline event. NOTE: this explanation is not entirely
|
// We don't have shortstatehashes for backfilled PDUs, the best we can
|
||||||
// accurate; see the implementation of `build_state_incremental`.
|
// do is to use the current state
|
||||||
let timeline_start_shortstatehash = async {
|
current_shortstatehash
|
||||||
if let Some((_, pdu)) = timeline.pdus.front() {
|
} else {
|
||||||
if let Ok(shortstatehash) = services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.pdu_shortstatehash(&pdu.event_id)
|
.pdu_shortstatehash(&pdu.event_id)
|
||||||
.await
|
.await
|
||||||
{
|
.map_err(|err| err!("Timeline start has no shortstatehash: {err}"))?
|
||||||
return shortstatehash;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
current_shortstatehash
|
// if the timeline is empty there can't possibly be any changes to the state
|
||||||
|
return Ok(vec![]);
|
||||||
};
|
};
|
||||||
|
|
||||||
// the user IDs of members whose membership needs to be sent to the client, if
|
// the user IDs of members whose membership needs to be sent to the client, if
|
||||||
// lazy-loading is enabled.
|
// lazy-loading is enabled.
|
||||||
let lazily_loaded_members =
|
let lazily_loaded_members =
|
||||||
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
|
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await;
|
||||||
|
|
||||||
let (timeline_start_shortstatehash, lazily_loaded_members) =
|
|
||||||
join(timeline_start_shortstatehash, lazily_loaded_members).await;
|
|
||||||
|
|
||||||
// compute the state delta between the previous sync and this sync.
|
// compute the state delta between the previous sync and this sync.
|
||||||
match (last_sync_end_count, last_sync_end_shortstatehash) {
|
match (last_sync_end_count, last_sync_end_shortstatehash) {
|
||||||
@@ -494,16 +494,15 @@ async fn build_state_events(
|
|||||||
is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false,
|
is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false,
|
||||||
then use `build_state_incremental`.
|
then use `build_state_incremental`.
|
||||||
*/
|
*/
|
||||||
| (Some(last_sync_end_count), Some(last_sync_end_shortstatehash)) if !full_state =>
|
| (Some(_), Some(last_sync_end_shortstatehash)) if !full_state =>
|
||||||
build_state_incremental(
|
build_state_incremental(
|
||||||
services,
|
services,
|
||||||
syncing_user,
|
syncing_user,
|
||||||
room_id,
|
|
||||||
PduCount::Normal(last_sync_end_count),
|
|
||||||
last_sync_end_shortstatehash,
|
last_sync_end_shortstatehash,
|
||||||
timeline_start_shortstatehash,
|
timeline_start_shortstatehash,
|
||||||
current_shortstatehash,
|
current_shortstatehash,
|
||||||
timeline,
|
timeline,
|
||||||
|
use_state_after,
|
||||||
lazily_loaded_members.as_ref(),
|
lazily_loaded_members.as_ref(),
|
||||||
)
|
)
|
||||||
.boxed()
|
.boxed()
|
||||||
@@ -518,6 +517,8 @@ async fn build_state_events(
|
|||||||
services,
|
services,
|
||||||
syncing_user,
|
syncing_user,
|
||||||
timeline_start_shortstatehash,
|
timeline_start_shortstatehash,
|
||||||
|
current_shortstatehash,
|
||||||
|
use_state_after,
|
||||||
lazily_loaded_members.as_ref(),
|
lazily_loaded_members.as_ref(),
|
||||||
)
|
)
|
||||||
.boxed()
|
.boxed()
|
||||||
@@ -598,23 +599,25 @@ async fn check_joined_since_last_sync(
|
|||||||
ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes,
|
ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes,
|
||||||
SyncContext { syncing_user, .. }: SyncContext<'_>,
|
SyncContext { syncing_user, .. }: SyncContext<'_>,
|
||||||
) -> Result<bool> {
|
) -> Result<bool> {
|
||||||
// fetch the syncing user's membership event during the last sync.
|
let Some(last_sync_end_shortstatehash) = last_sync_end_shortstatehash else {
|
||||||
// this will be None if `previous_sync_end_shortstatehash` is None.
|
// For initial syncs always return false, since there's no "last sync" for the
|
||||||
let membership_during_previous_sync = match last_sync_end_shortstatehash {
|
// user to have joined since.
|
||||||
| Some(last_sync_end_shortstatehash) => services
|
return Ok(false);
|
||||||
.rooms
|
|
||||||
.state_accessor
|
|
||||||
.state_get_content(
|
|
||||||
last_sync_end_shortstatehash,
|
|
||||||
&StateEventType::RoomMember,
|
|
||||||
syncing_user.as_str(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.inspect_err(|_| debug_warn!("User has no previous membership"))
|
|
||||||
.ok(),
|
|
||||||
| None => None,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Fetch the syncing user's membership event during the last sync.
|
||||||
|
let membership_during_previous_sync = services
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.state_get_content(
|
||||||
|
last_sync_end_shortstatehash,
|
||||||
|
&StateEventType::RoomMember,
|
||||||
|
syncing_user.as_str(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.inspect_err(|_| debug_warn!("User has no previous membership"))
|
||||||
|
.ok();
|
||||||
|
|
||||||
// TODO: If the requesting user got state-reset out of the room, this
|
// TODO: If the requesting user got state-reset out of the room, this
|
||||||
// will be `true` when it shouldn't be. this function should never be called
|
// will be `true` when it shouldn't be. this function should never be called
|
||||||
// in that situation, but it may be if the membership cache didn't get updated.
|
// in that situation, but it may be if the membership cache didn't get updated.
|
||||||
|
|||||||
@@ -181,6 +181,9 @@ pub(super) async fn load_left_room(
|
|||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
let state_events =
|
||||||
|
StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect());
|
||||||
|
|
||||||
Ok(Some(assign!(LeftRoom::new(), {
|
Ok(Some(assign!(LeftRoom::new(), {
|
||||||
account_data: RoomAccountData::new(),
|
account_data: RoomAccountData::new(),
|
||||||
timeline: assign!(Timeline::new(), {
|
timeline: assign!(Timeline::new(), {
|
||||||
@@ -188,7 +191,11 @@ pub(super) async fn load_left_room(
|
|||||||
prev_batch: Some(current_count.to_string()),
|
prev_batch: Some(current_count.to_string()),
|
||||||
events: raw_timeline_pdus,
|
events: raw_timeline_pdus,
|
||||||
}),
|
}),
|
||||||
state: State::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())),
|
state: if sync_context.use_state_after {
|
||||||
|
State::After(state_events)
|
||||||
|
} else {
|
||||||
|
State::Before(state_events)
|
||||||
|
},
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -264,6 +271,8 @@ async fn build_left_state_and_timeline(
|
|||||||
services,
|
services,
|
||||||
syncing_user,
|
syncing_user,
|
||||||
timeline_start_shortstatehash,
|
timeline_start_shortstatehash,
|
||||||
|
leave_shortstatehash,
|
||||||
|
sync_context.use_state_after,
|
||||||
lazily_loaded_members.as_ref(),
|
lazily_loaded_members.as_ref(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|||||||
@@ -11,12 +11,11 @@ use std::{
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use axum_client_ip::ClientIp;
|
use axum_client_ip::ClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Result, at, extract_variant,
|
Err, Result, at, error, extract_variant,
|
||||||
utils::{
|
utils::{
|
||||||
ReadyExt, TryFutureExtExt,
|
ReadyExt, TryFutureExtExt,
|
||||||
stream::{BroadbandExt, Tools, WidebandExt},
|
stream::{BroadbandExt, Tools, WidebandExt},
|
||||||
},
|
},
|
||||||
warn,
|
|
||||||
};
|
};
|
||||||
use conduwuit_service::Services;
|
use conduwuit_service::Services;
|
||||||
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture};
|
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture};
|
||||||
@@ -110,6 +109,9 @@ struct SyncContext<'a> {
|
|||||||
/// The sync filter, which the client uses to specify what data should be
|
/// The sync filter, which the client uses to specify what data should be
|
||||||
/// included in the sync response.
|
/// included in the sync response.
|
||||||
filter: &'a FilterDefinition,
|
filter: &'a FilterDefinition,
|
||||||
|
/// Whether the state at the end of the timeline should be used when
|
||||||
|
/// calculating state diffs for sync.
|
||||||
|
use_state_after: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> SyncContext<'a> {
|
impl<'a> SyncContext<'a> {
|
||||||
@@ -263,6 +265,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
current_count,
|
current_count,
|
||||||
full_state,
|
full_state,
|
||||||
filter: &filter,
|
filter: &filter,
|
||||||
|
use_state_after: body.use_state_after,
|
||||||
};
|
};
|
||||||
|
|
||||||
let joined_rooms = services
|
let joined_rooms = services
|
||||||
@@ -275,7 +278,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
match joined_room {
|
match joined_room {
|
||||||
| Ok((room, updates)) => Some((room_id, room, updates)),
|
| Ok((room, updates)) => Some((room_id, room, updates)),
|
||||||
| Err(err) => {
|
| Err(err) => {
|
||||||
warn!(?err, %room_id, "error loading joined room");
|
error!(?err, %room_id, "error loading joined room");
|
||||||
None
|
None
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -304,7 +307,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
| Ok(Some(left_room)) => Some((room_id, left_room)),
|
| Ok(Some(left_room)) => Some((room_id, left_room)),
|
||||||
| Ok(None) => None,
|
| Ok(None) => None,
|
||||||
| Err(err) => {
|
| Err(err) => {
|
||||||
warn!(?err, %room_id, "error loading joined room");
|
error!(?err, %room_id, "error loading joined room");
|
||||||
None
|
None
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
+60
-143
@@ -1,11 +1,8 @@
|
|||||||
use std::{collections::BTreeSet, ops::ControlFlow};
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Result, at, is_equal_to,
|
Result, at,
|
||||||
matrix::{
|
matrix::{Event, pdu::PduEvent},
|
||||||
Event,
|
|
||||||
pdu::{PduCount, PduEvent},
|
|
||||||
},
|
|
||||||
utils::{
|
utils::{
|
||||||
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
||||||
stream::{BroadbandExt, TryIgnore},
|
stream::{BroadbandExt, TryIgnore},
|
||||||
@@ -16,9 +13,7 @@ use conduwuit_service::{
|
|||||||
rooms::{lazy_loading::MemberSet, short::ShortStateHash},
|
rooms::{lazy_loading::MemberSet, short::ShortStateHash},
|
||||||
};
|
};
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt};
|
||||||
use itertools::Itertools;
|
use ruma::{OwnedEventId, UserId, events::StateEventType};
|
||||||
use ruma::{OwnedEventId, RoomId, UserId, events::StateEventType};
|
|
||||||
use service::rooms::short::ShortEventId;
|
|
||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::client::TimelinePdus;
|
use crate::client::TimelinePdus;
|
||||||
@@ -39,13 +34,19 @@ pub(super) async fn build_state_initial(
|
|||||||
services: &Services,
|
services: &Services,
|
||||||
sender_user: &UserId,
|
sender_user: &UserId,
|
||||||
timeline_start_shortstatehash: ShortStateHash,
|
timeline_start_shortstatehash: ShortStateHash,
|
||||||
|
timeline_end_shortstatehash: ShortStateHash,
|
||||||
|
use_state_after: bool,
|
||||||
lazily_loaded_members: Option<&MemberSet>,
|
lazily_loaded_members: Option<&MemberSet>,
|
||||||
) -> Result<Vec<PduEvent>> {
|
) -> Result<Vec<PduEvent>> {
|
||||||
// load the keys and event IDs of the state events at the start of the timeline
|
// load the keys and event IDs of the state events at the start of the timeline
|
||||||
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
|
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.state_full_ids(timeline_start_shortstatehash)
|
.state_full_ids(if use_state_after {
|
||||||
|
timeline_end_shortstatehash
|
||||||
|
} else {
|
||||||
|
timeline_start_shortstatehash
|
||||||
|
})
|
||||||
.unzip()
|
.unzip()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -92,82 +93,34 @@ pub(super) async fn build_state_initial(
|
|||||||
pub(super) async fn build_state_incremental<'a>(
|
pub(super) async fn build_state_incremental<'a>(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
sender_user: &'a UserId,
|
sender_user: &'a UserId,
|
||||||
room_id: &RoomId,
|
|
||||||
last_sync_end_count: PduCount,
|
|
||||||
last_sync_end_shortstatehash: ShortStateHash,
|
last_sync_end_shortstatehash: ShortStateHash,
|
||||||
timeline_start_shortstatehash: ShortStateHash,
|
timeline_start_shortstatehash: ShortStateHash,
|
||||||
timeline_end_shortstatehash: ShortStateHash,
|
timeline_end_shortstatehash: ShortStateHash,
|
||||||
timeline: &TimelinePdus,
|
timeline: &TimelinePdus,
|
||||||
|
use_state_after: bool,
|
||||||
lazily_loaded_members: Option<&'a MemberSet>,
|
lazily_loaded_members: Option<&'a MemberSet>,
|
||||||
) -> Result<Vec<PduEvent>> {
|
) -> Result<Vec<PduEvent>> {
|
||||||
/*
|
let mut state_event_ids: HashSet<OwnedEventId> = HashSet::new();
|
||||||
NB: a limited sync is one where `timeline.limited == true`. Synapse calls this a "gappy" sync internally.
|
|
||||||
|
|
||||||
The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described
|
trace!(
|
||||||
by the Matrix specification. This is because the specification's description of the `state` property does not accurately
|
%use_state_after,
|
||||||
reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include:
|
%last_sync_end_shortstatehash,
|
||||||
1. We do not compute the delta using the naive approach of "every state event from the end of the last sync
|
%timeline_start_shortstatehash,
|
||||||
up to the start of this sync's timeline". see below for details.
|
%timeline_end_shortstatehash,
|
||||||
2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined
|
"computing state for incremental sync"
|
||||||
elsewhere and supplied to this function in the `lazily_loaded_members` parameter.
|
);
|
||||||
*/
|
|
||||||
|
|
||||||
/*
|
// Fetch lazy-loaded membership events if lazy-loading is enabled
|
||||||
the `state` property of an incremental sync which isn't limited are _usually_ empty.
|
if let Some(lazily_loaded_members) = lazily_loaded_members
|
||||||
(note: the specification says that the `state` property is _always_ empty for limited syncs, which is incorrect.)
|
&& !lazily_loaded_members.is_empty()
|
||||||
however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`),
|
{
|
||||||
the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state
|
trace!("including lazy membership events for members: {:?}", lazily_loaded_members);
|
||||||
at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline
|
|
||||||
merged a split in the DAG.
|
|
||||||
|
|
||||||
see: https://github.com/element-hq/synapse/issues/16941
|
services
|
||||||
*/
|
|
||||||
|
|
||||||
let timeline_is_linear = timeline.pdus.is_empty() || {
|
|
||||||
let last_pdu_of_last_sync = services
|
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.short
|
||||||
.pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1)))
|
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
|
||||||
.boxed()
|
lazily_loaded_members
|
||||||
.next()
|
|
||||||
.await
|
|
||||||
.transpose()
|
|
||||||
.expect("last sync should have had some PDUs")
|
|
||||||
.map(at!(1));
|
|
||||||
|
|
||||||
// make sure the prev_events of each pdu in the timeline refer only to the
|
|
||||||
// previous pdu
|
|
||||||
timeline
|
|
||||||
.pdus
|
|
||||||
.iter()
|
|
||||||
.try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| {
|
|
||||||
if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() {
|
|
||||||
if prev_event_id
|
|
||||||
.as_ref()
|
|
||||||
.is_none_or(is_equal_to!(pdu_prev_event_id))
|
|
||||||
{
|
|
||||||
return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(
|
|
||||||
"pdu {:?} has split prev_events (expected {:?}): {:?}",
|
|
||||||
pdu.event_id, prev_event_id, pdu.prev_events
|
|
||||||
);
|
|
||||||
ControlFlow::Break(())
|
|
||||||
})
|
|
||||||
.is_continue()
|
|
||||||
};
|
|
||||||
|
|
||||||
if timeline_is_linear && !timeline.limited {
|
|
||||||
// if there are no splits in the DAG and the timeline isn't limited, then
|
|
||||||
// `state` will always be empty unless lazy loading is enabled.
|
|
||||||
|
|
||||||
if let Some(lazily_loaded_members) = lazily_loaded_members {
|
|
||||||
if !timeline.pdus.is_empty() {
|
|
||||||
// lazy loading is enabled, so we return the membership events which were
|
|
||||||
// requested by the caller.
|
|
||||||
let lazy_membership_events: Vec<_> = lazily_loaded_members
|
|
||||||
.iter()
|
.iter()
|
||||||
.stream()
|
.stream()
|
||||||
.broad_filter_map(|user_id| async move {
|
.broad_filter_map(|user_id| async move {
|
||||||
@@ -178,71 +131,24 @@ pub(super) async fn build_state_incremental<'a>(
|
|||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.state_get(
|
.state_get_shortid(
|
||||||
timeline_start_shortstatehash,
|
timeline_start_shortstatehash,
|
||||||
&StateEventType::RoomMember,
|
&StateEventType::RoomMember,
|
||||||
user_id.as_str(),
|
user_id.as_str(),
|
||||||
)
|
)
|
||||||
.ok()
|
.ok()
|
||||||
.await
|
.await
|
||||||
})
|
}),
|
||||||
.collect()
|
)
|
||||||
.await;
|
.ignore_err()
|
||||||
|
.ready_for_each(|event_id| {
|
||||||
if !lazy_membership_events.is_empty() {
|
state_event_ids.insert(event_id);
|
||||||
trace!(
|
})
|
||||||
"syncing lazy membership events for members: {:?}",
|
.await;
|
||||||
lazy_membership_events
|
|
||||||
.iter()
|
|
||||||
.map(|pdu| pdu.state_key().unwrap())
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return Ok(lazy_membership_events);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// lazy loading is disabled, `state` is empty.
|
|
||||||
return Ok(vec![]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
// Fetch the state events added since the last sync.
|
||||||
at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates
|
services
|
||||||
computing the incremental state (which may be empty).
|
|
||||||
|
|
||||||
NOTE: this code path does not use the `lazy_membership_events` parameter. any changes to membership will be included
|
|
||||||
in the incremental state. therefore, the incremental state may include "redundant" membership events,
|
|
||||||
which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`,
|
|
||||||
and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that
|
|
||||||
the performance penalty is acceptable.
|
|
||||||
*/
|
|
||||||
|
|
||||||
trace!(%timeline_is_linear, %timeline.limited, "computing state for incremental sync");
|
|
||||||
|
|
||||||
// fetch the shorteventids of state events in the timeline
|
|
||||||
let state_events_in_timeline: BTreeSet<ShortEventId> = services
|
|
||||||
.rooms
|
|
||||||
.short
|
|
||||||
.multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| {
|
|
||||||
if pdu.state_key().is_some() {
|
|
||||||
Some(pdu.event_id.as_ref())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
.collect()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
trace!("{} state events in timeline", state_events_in_timeline.len());
|
|
||||||
|
|
||||||
/*
|
|
||||||
fetch the state events which were added since the last sync.
|
|
||||||
|
|
||||||
specifically we fetch the difference between the state at the last sync and the state at the _end_
|
|
||||||
of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched.
|
|
||||||
this is necessary to account for splits in the DAG, as explained above.
|
|
||||||
*/
|
|
||||||
let state_diff = services
|
|
||||||
.rooms
|
.rooms
|
||||||
.short
|
.short
|
||||||
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
|
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
|
||||||
@@ -252,18 +158,29 @@ pub(super) async fn build_state_incremental<'a>(
|
|||||||
.state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash))
|
.state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash))
|
||||||
.await?
|
.await?
|
||||||
.stream()
|
.stream()
|
||||||
.ready_filter_map(|(_, shorteventid)| {
|
.map(at!(1)),
|
||||||
if state_events_in_timeline.contains(&shorteventid) {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(shorteventid)
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
)
|
)
|
||||||
.ignore_err();
|
.ignore_err()
|
||||||
|
.ready_for_each(|event_id| {
|
||||||
|
state_event_ids.insert(event_id);
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
// finally, fetch the PDU contents and collect them into a vec
|
if !use_state_after {
|
||||||
let state_diff_pdus = state_diff
|
// If state_after isn't enabled, filter out state events which also exist
|
||||||
|
// in the timeline. If splits exist in the DAG, this may not be exactly the same
|
||||||
|
// thing as the state diff ending at the start of the timeline, but Synapse
|
||||||
|
// also does this and it's technically more useful behavior anyway.
|
||||||
|
// See: https://github.com/element-hq/synapse/issues/16941
|
||||||
|
|
||||||
|
for (_, pdu) in &timeline.pdus {
|
||||||
|
state_event_ids.remove(pdu.event_id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, fetch the PDU contents and collect them into a vec
|
||||||
|
let state_diff_pdus = state_event_ids
|
||||||
|
.stream()
|
||||||
.broad_filter_map(|event_id| async move {
|
.broad_filter_map(|event_id| async move {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ use conduwuit::{
|
|||||||
BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
||||||
future::ReadyEqExt,
|
future::ReadyEqExt,
|
||||||
math::{ruma_from_usize, usize_from_ruma},
|
math::{ruma_from_usize, usize_from_ruma},
|
||||||
stream::WidebandExt,
|
stream::{TryIgnore, WidebandExt},
|
||||||
},
|
},
|
||||||
warn,
|
warn,
|
||||||
};
|
};
|
||||||
@@ -41,6 +41,7 @@ use ruma::{
|
|||||||
uint,
|
uint,
|
||||||
};
|
};
|
||||||
use service::account_data::AnyRawAccountDataEvent;
|
use service::account_data::AnyRawAccountDataEvent;
|
||||||
|
use tokio::pin;
|
||||||
|
|
||||||
use super::share_encrypted_room;
|
use super::share_encrypted_room;
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -858,12 +859,31 @@ where
|
|||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
let since_shortstatehash = services
|
let since_shortstatehash = async {
|
||||||
.rooms
|
pin! {
|
||||||
.user
|
let pdus_rev = services
|
||||||
.get_token_shortstatehash(room_id, globalsince)
|
.rooms
|
||||||
.await
|
.timeline
|
||||||
.ok();
|
.pdus_rev(room_id, Some(PduCount::Normal(globalsince.saturating_sub(1))))
|
||||||
|
.ignore_err();
|
||||||
|
}
|
||||||
|
|
||||||
|
let (count, pdu_at_last_sync_end) = pdus_rev.next().await?;
|
||||||
|
|
||||||
|
if matches!(count, PduCount::Backfilled(_)) {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(
|
||||||
|
services
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.pdu_shortstatehash(&pdu_at_last_sync_end.event_id)
|
||||||
|
.await
|
||||||
|
.expect("pdu should have a shortstatehash"),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.await;
|
||||||
|
|
||||||
let encrypted_room = services
|
let encrypted_room = services
|
||||||
.rooms
|
.rooms
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ pub fn versions() -> Vec<String> {
|
|||||||
"v1.12".to_owned(),
|
"v1.12".to_owned(),
|
||||||
"v1.13".to_owned(),
|
"v1.13".to_owned(),
|
||||||
"v1.14".to_owned(),
|
"v1.14".to_owned(),
|
||||||
|
"v1.16".to_owned(),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -193,12 +193,7 @@ pub(super) static MAPS: &[Descriptor] = &[
|
|||||||
},
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "roomsynctoken_shortstatehash",
|
name: "roomsynctoken_shortstatehash",
|
||||||
file_shape: 3,
|
..descriptor::DROPPED
|
||||||
val_size_hint: Some(8),
|
|
||||||
block_size: 512,
|
|
||||||
compression_level: 3,
|
|
||||||
bottommost_level: Some(6),
|
|
||||||
..descriptor::SEQUENTIAL
|
|
||||||
},
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "roomuserdataid_accountdata",
|
name: "roomuserdataid_accountdata",
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use conduwuit::{Result, implement};
|
use conduwuit::{Result, implement};
|
||||||
use database::{Database, Deserialized, Map};
|
use database::{Deserialized, Map};
|
||||||
use ruma::{RoomId, UserId};
|
use ruma::{RoomId, UserId};
|
||||||
|
|
||||||
use crate::{Dep, globals, rooms, rooms::short::ShortStateHash};
|
use crate::{Dep, globals};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
db: Data,
|
db: Data,
|
||||||
@@ -12,32 +12,25 @@ pub struct Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct Data {
|
struct Data {
|
||||||
db: Arc<Database>,
|
|
||||||
userroomid_notificationcount: Arc<Map>,
|
userroomid_notificationcount: Arc<Map>,
|
||||||
userroomid_highlightcount: Arc<Map>,
|
userroomid_highlightcount: Arc<Map>,
|
||||||
roomuserid_lastnotificationread: Arc<Map>,
|
roomuserid_lastnotificationread: Arc<Map>,
|
||||||
roomsynctoken_shortstatehash: Arc<Map>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Services {
|
struct Services {
|
||||||
globals: Dep<globals::Service>,
|
globals: Dep<globals::Service>,
|
||||||
short: Dep<rooms::short::Service>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl crate::Service for Service {
|
impl crate::Service for Service {
|
||||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
db: Data {
|
db: Data {
|
||||||
db: args.db.clone(),
|
|
||||||
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
|
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
|
||||||
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
|
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
|
||||||
roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(),
|
roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(),
|
||||||
roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(),
|
|
||||||
},
|
},
|
||||||
|
|
||||||
services: Services {
|
services: Services {
|
||||||
globals: args.depend::<globals::Service>("globals"),
|
globals: args.depend::<globals::Service>("globals"),
|
||||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
|
||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
@@ -90,40 +83,3 @@ pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -
|
|||||||
.deserialized()
|
.deserialized()
|
||||||
.unwrap_or(0)
|
.unwrap_or(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
|
||||||
pub async fn associate_token_shortstatehash(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
token: u64,
|
|
||||||
shortstatehash: ShortStateHash,
|
|
||||||
) {
|
|
||||||
let shortroomid = self
|
|
||||||
.services
|
|
||||||
.short
|
|
||||||
.get_shortroomid(room_id)
|
|
||||||
.await
|
|
||||||
.expect("room exists");
|
|
||||||
|
|
||||||
let _cork = self.db.db.cork();
|
|
||||||
let key: &[u64] = &[shortroomid, token];
|
|
||||||
self.db
|
|
||||||
.roomsynctoken_shortstatehash
|
|
||||||
.put(key, shortstatehash);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[implement(Service)]
|
|
||||||
pub async fn get_token_shortstatehash(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
token: u64,
|
|
||||||
) -> Result<ShortStateHash> {
|
|
||||||
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
|
|
||||||
|
|
||||||
let key: &[u64] = &[shortroomid, token];
|
|
||||||
self.db
|
|
||||||
.roomsynctoken_shortstatehash
|
|
||||||
.qry(key)
|
|
||||||
.await
|
|
||||||
.deserialized()
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user