fix: Correctly send limited timelines again

This commit is contained in:
Ginger
2025-10-22 13:06:33 -04:00
parent c85b5bb122
commit 91d07a9bfc
4 changed files with 47 additions and 29 deletions
+40 -22
View File
@@ -1,6 +1,8 @@
mod v3; mod v3;
mod v5; mod v5;
use std::collections::VecDeque;
use conduwuit::{ use conduwuit::{
PduCount, Result, PduCount, Result,
matrix::pdu::PduEvent, matrix::pdu::PduEvent,
@@ -23,7 +25,7 @@ pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
#[derive(Default)] #[derive(Default)]
pub(crate) struct TimelinePdus { pub(crate) struct TimelinePdus {
pub pdus: Vec<(PduCount, PduEvent)>, pub pdus: VecDeque<(PduCount, PduEvent)>,
pub limited: bool, pub limited: bool,
} }
@@ -35,27 +37,36 @@ async fn load_timeline(
ending_count: Option<PduCount>, ending_count: Option<PduCount>,
limit: usize, limit: usize,
) -> Result<TimelinePdus> { ) -> Result<TimelinePdus> {
let last_timeline_count = services let mut pdu_stream = match starting_count {
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await?;
let mut pdus_between_counts = match starting_count {
| Some(starting_count) => { | Some(starting_count) => {
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await?;
if last_timeline_count <= starting_count { if last_timeline_count <= starting_count {
// no messages have been sent in this room since `starting_count`
return Ok(TimelinePdus::default()); return Ok(TimelinePdus::default());
} }
trace!(?last_timeline_count, ?starting_count, ?ending_count);
// Stream from the DB all PDUs which were sent after `starting_count` but before // for incremental sync, stream from the DB all PDUs which were sent after
// `ending_count`, including both endpoints // `starting_count` but before `ending_count`, including `ending_count` but
// not `starting_count`. this code is pretty similar to the initial sync
// branch, they're separate to allow for future optimization
services services
.rooms .rooms
.timeline .timeline
.pdus(Some(sender_user), room_id, Some(starting_count)) .pdus_rev(
Some(sender_user),
room_id,
ending_count.map(|count| count.saturating_add(1)),
)
.ignore_err() .ignore_err()
.ready_take_while(|&(pducount, _)| { .ready_take_while(move |&(pducount, ref pdu)| {
pducount <= ending_count.unwrap_or_else(PduCount::max) trace!(?pducount, ?pdu, "glubbins");
pducount > starting_count
}) })
.boxed() .boxed()
}, },
@@ -65,21 +76,28 @@ async fn load_timeline(
services services
.rooms .rooms
.timeline .timeline
.pdus_rev(Some(sender_user), room_id, ending_count) .pdus_rev(
Some(sender_user),
room_id,
ending_count.map(|count| count.saturating_add(1)),
)
.ignore_err() .ignore_err()
.boxed() .boxed()
}, },
}; };
// Return at most `limit` PDUs from the stream // Return at most `limit` PDUs from the stream
let mut pdus: Vec<_> = pdus_between_counts.by_ref().take(limit).collect().await; let pdus = pdu_stream
if starting_count.is_none() { .by_ref()
// `pdus_rev` returns PDUs in reverse order. fix that here .take(limit)
pdus.reverse(); .ready_fold(VecDeque::with_capacity(limit), |mut pdus, item| {
} pdus.push_front(item);
// The timeline is limited if more than `limit` PDUs exist in the DB after pdus
// `starting_count` })
let limited = pdus_between_counts.next().await.is_some(); .await;
// The timeline is limited if there are still more PDUs in the stream
let limited = pdu_stream.next().await.is_some();
trace!( trace!(
"syncing {:?} timeline pdus from {:?} to {:?} (limited = {:?})", "syncing {:?} timeline pdus from {:?} to {:?} (limited = {:?})",
+2 -2
View File
@@ -125,7 +125,7 @@ pub(super) async fn load_joined_room(
let is_initial_sync = since_shortstatehash.is_none(); let is_initial_sync = since_shortstatehash.is_none();
let timeline_start_shortstatehash = async { let timeline_start_shortstatehash = async {
if let Some((_, pdu)) = timeline_pdus.first() { if let Some((_, pdu)) = timeline_pdus.front() {
if let Ok(shortstatehash) = services if let Ok(shortstatehash) = services
.rooms .rooms
.state_accessor .state_accessor
@@ -349,7 +349,7 @@ pub(super) async fn load_joined_room(
.flatten(); .flatten();
let prev_batch = timeline_pdus let prev_batch = timeline_pdus
.first() .front()
.map(at!(0)) .map(at!(0))
.or_else(|| joined_sender_member.is_some().and(since).map(Into::into)); .or_else(|| joined_sender_member.is_some().and(since).map(Into::into));
+3 -3
View File
@@ -1,6 +1,6 @@
use std::{ use std::{
cmp::{self, Ordering}, cmp::{self, Ordering},
collections::{BTreeMap, BTreeSet, HashMap, HashSet}, collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
ops::Deref, ops::Deref,
time::Duration, time::Duration,
}; };
@@ -411,7 +411,7 @@ where
.await .await
.ok(); .ok();
(timeline_pdus, limited) = (Vec::new(), true); (timeline_pdus, limited) = (VecDeque::new(), true);
} else { } else {
TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline( TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline(
services, services,
@@ -501,7 +501,7 @@ where
} }
let prev_batch = timeline_pdus let prev_batch = timeline_pdus
.first() .front()
.map_or(Ok::<_, Error>(None), |(pdu_count, _)| { .map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
Ok(Some(match pdu_count { Ok(Some(match pdu_count {
| PduCount::Backfilled(_) => { | PduCount::Backfilled(_) => {
+2 -2
View File
@@ -243,7 +243,7 @@ impl Service {
self.pdus(Some(user_id), room_id, None).ignore_err() self.pdus(Some(user_id), room_id, None).ignore_err()
} }
/// Reverse iteration starting at from. /// Reverse iteration starting after `until`.
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "debug")]
pub fn pdus_rev<'a>( pub fn pdus_rev<'a>(
&'a self, &'a self,
@@ -255,7 +255,7 @@ impl Service {
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max)) .pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
} }
/// Forward iteration starting at from. /// Forward iteration starting after `from`.
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "debug")]
pub fn pdus<'a>( pub fn pdus<'a>(
&'a self, &'a self,