mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
fix: Calculate state at end of last sync correctly
This commit is contained in:
@@ -362,41 +362,41 @@ async fn fetch_shortstatehashes(
|
|||||||
.rooms
|
.rooms
|
||||||
.state
|
.state
|
||||||
.get_room_shortstatehash(room_id)
|
.get_room_shortstatehash(room_id)
|
||||||
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
|
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
|
||||||
|
.await?;
|
||||||
|
|
||||||
// the room state as of the end of the last sync.
|
// The room state as of the end of the last sync.
|
||||||
// this will be None if we are doing an initial sync or if we just joined this
|
// This will be None if we are doing an initial sync or if we just joined this
|
||||||
// room.
|
// room.
|
||||||
let last_sync_end_shortstatehash =
|
let last_sync_end_shortstatehash =
|
||||||
OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| {
|
OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| {
|
||||||
pin! {
|
pin! {
|
||||||
let pdus_rev = services
|
let pdus = services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
// We add 2 to the count here because `pdu_shortstatehash` returns
|
.pdus(room_id, Some(PduCount::Normal(last_sync_end_count)))
|
||||||
// state _before_ the event ID (i.e. not including the event itself if it's a state event)
|
|
||||||
// and `pdus_rev` does exclusive iteration (i.e. not including an event that has _exactly_
|
|
||||||
// the provided count).
|
|
||||||
.pdus_rev(room_id, Some(PduCount::Normal(last_sync_end_count).saturating_add(2)))
|
|
||||||
.ignore_err();
|
.ignore_err();
|
||||||
}
|
}
|
||||||
|
|
||||||
let (_, pdu_at_last_sync_end) = pdus_rev.next().await?;
|
match pdus.next().await {
|
||||||
|
| Some((_, pdu_after_last_sync_end)) => {
|
||||||
|
trace!(?pdu_after_last_sync_end.event_id, "pdu at last sync end");
|
||||||
|
|
||||||
Some(
|
services
|
||||||
services
|
.rooms
|
||||||
.rooms
|
.state_accessor
|
||||||
.state_accessor
|
.pdu_shortstatehash(&pdu_after_last_sync_end.event_id)
|
||||||
.pdu_shortstatehash(&pdu_at_last_sync_end.event_id)
|
.await
|
||||||
.await
|
.expect("pdu should have a shortstatehash")
|
||||||
.expect("pdu should have a shortstatehash"),
|
},
|
||||||
)
|
| None => {
|
||||||
|
// No events have been sent since the last sync,
|
||||||
|
// so the state then is the same as the state now
|
||||||
|
current_shortstatehash
|
||||||
|
},
|
||||||
|
}
|
||||||
}))
|
}))
|
||||||
.map(Option::flatten)
|
.await;
|
||||||
.map(Ok);
|
|
||||||
|
|
||||||
let (current_shortstatehash, last_sync_end_shortstatehash) =
|
|
||||||
try_join(current_shortstatehash, last_sync_end_shortstatehash).await?;
|
|
||||||
|
|
||||||
Ok(ShortStateHashes {
|
Ok(ShortStateHashes {
|
||||||
current_shortstatehash,
|
current_shortstatehash,
|
||||||
@@ -464,28 +464,22 @@ async fn build_state_events(
|
|||||||
last_sync_end_shortstatehash,
|
last_sync_end_shortstatehash,
|
||||||
} = shortstatehashes;
|
} = shortstatehashes;
|
||||||
|
|
||||||
let timeline_start_shortstatehash = async {
|
let timeline_start_shortstatehash = if let Some((_, pdu)) = timeline.pdus.front() {
|
||||||
if let Some((_, pdu)) = timeline.pdus.front() {
|
services
|
||||||
if let Ok(shortstatehash) = services
|
.rooms
|
||||||
.rooms
|
.state_accessor
|
||||||
.state_accessor
|
.pdu_shortstatehash(&pdu.event_id)
|
||||||
.pdu_shortstatehash(&pdu.event_id)
|
.await
|
||||||
.await
|
.expect("event should have shortstatehash")
|
||||||
{
|
} else {
|
||||||
return shortstatehash;
|
// if the timeline is empty there can't possibly be any changes to the state
|
||||||
}
|
return Ok(vec![]);
|
||||||
}
|
|
||||||
|
|
||||||
current_shortstatehash
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// the user IDs of members whose membership needs to be sent to the client, if
|
// the user IDs of members whose membership needs to be sent to the client, if
|
||||||
// lazy-loading is enabled.
|
// lazy-loading is enabled.
|
||||||
let lazily_loaded_members =
|
let lazily_loaded_members =
|
||||||
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
|
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await;
|
||||||
|
|
||||||
let (timeline_start_shortstatehash, lazily_loaded_members) =
|
|
||||||
join(timeline_start_shortstatehash, lazily_loaded_members).await;
|
|
||||||
|
|
||||||
// compute the state delta between the previous sync and this sync.
|
// compute the state delta between the previous sync and this sync.
|
||||||
match (last_sync_end_count, last_sync_end_shortstatehash) {
|
match (last_sync_end_count, last_sync_end_shortstatehash) {
|
||||||
|
|||||||
@@ -100,12 +100,15 @@ pub(super) async fn build_state_incremental<'a>(
|
|||||||
use_state_after: bool,
|
use_state_after: bool,
|
||||||
lazily_loaded_members: Option<&'a MemberSet>,
|
lazily_loaded_members: Option<&'a MemberSet>,
|
||||||
) -> Result<Vec<PduEvent>> {
|
) -> Result<Vec<PduEvent>> {
|
||||||
// NB: a limited sync is one where `timeline.limited == true`. Synapse calls
|
|
||||||
// this a "gappy" sync internally.
|
|
||||||
|
|
||||||
let mut state_event_ids: HashSet<OwnedEventId> = HashSet::new();
|
let mut state_event_ids: HashSet<OwnedEventId> = HashSet::new();
|
||||||
|
|
||||||
trace!(%timeline.limited, "computing state for incremental sync");
|
trace!(
|
||||||
|
%use_state_after,
|
||||||
|
%last_sync_end_shortstatehash,
|
||||||
|
%timeline_start_shortstatehash,
|
||||||
|
%timeline_end_shortstatehash,
|
||||||
|
"computing state for incremental sync"
|
||||||
|
);
|
||||||
|
|
||||||
// Fetch lazy-loaded membership events if lazy-loading is enabled
|
// Fetch lazy-loaded membership events if lazy-loading is enabled
|
||||||
if let Some(lazily_loaded_members) = lazily_loaded_members
|
if let Some(lazily_loaded_members) = lazily_loaded_members
|
||||||
|
|||||||
Reference in New Issue
Block a user