Files
continuwuity/src/api/client/sync/mod.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

144 lines
3.4 KiB
Rust
Raw Normal View History

2024-10-16 05:32:27 +00:00
mod v3;
mod v5;
2024-10-16 05:32:27 +00:00
2025-10-22 13:06:33 -04:00
use std::collections::VecDeque;
2024-12-14 21:58:01 -05:00
use conduwuit::{
Event, PduCount, Result, err,
matrix::pdu::PduEvent,
2025-10-27 17:24:02 -04:00
ref_at, trace,
2025-04-06 21:59:18 +00:00
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
2024-12-04 00:00:40 +00:00
};
use conduwuit_service::Services;
use futures::StreamExt;
use ruma::{
2025-10-27 17:24:02 -04:00
OwnedUserId, RoomId, UserId,
events::TimelineEventType::{
self, Beacon, CallInvite, PollStart, RoomEncrypted, RoomMessage, Sticker,
},
};
2024-10-16 05:32:27 +00:00
pub(crate) use self::{v3::sync_events_route, v5::sync_events_v5_route};
2024-10-16 05:32:27 +00:00
pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
&[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker];
#[derive(Default)]
pub(crate) struct TimelinePdus {
2025-10-22 13:06:33 -04:00
pub pdus: VecDeque<(PduCount, PduEvent)>,
pub limited: bool,
}
2025-10-27 17:24:02 -04:00
impl TimelinePdus {
fn senders(&self) -> impl Iterator<Item = OwnedUserId> {
self.pdus
.iter()
.map(ref_at!(1))
.map(Event::sender)
.map(Into::into)
}
}
/// Load up to `limit` PDUs in the range (starting_count, ending_count].
2024-10-16 05:32:27 +00:00
async fn load_timeline(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
starting_count: Option<PduCount>,
ending_count: Option<PduCount>,
limit: usize,
) -> Result<TimelinePdus> {
2025-10-22 13:06:33 -04:00
let mut pdu_stream = match starting_count {
| Some(starting_count) => {
2025-10-22 13:06:33 -04:00
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await
.map_err(|err| {
err!(Database(warn!("Failed to fetch end of room timeline: {}", err)))
})?;
2025-10-22 13:06:33 -04:00
if last_timeline_count <= starting_count {
2025-10-22 13:06:33 -04:00
// no messages have been sent in this room since `starting_count`
return Ok(TimelinePdus::default());
}
2024-10-16 06:58:37 +00:00
2025-10-22 13:06:33 -04:00
// for incremental sync, stream from the DB all PDUs which were sent after
// `starting_count` but before `ending_count`, including `ending_count` but
// not `starting_count`. this code is pretty similar to the initial sync
// branch, they're separate to allow for future optimization
services
.rooms
.timeline
2025-10-22 13:06:33 -04:00
.pdus_rev(
Some(sender_user),
room_id,
ending_count.map(|count| count.saturating_add(1)),
)
.ignore_err()
.ready_take_while(move |&(pducount, _)| pducount > starting_count)
.boxed()
},
| None => {
// For initial sync, stream from the DB all PDUs before and including
// `ending_count` in reverse order
services
.rooms
.timeline
2025-10-22 13:06:33 -04:00
.pdus_rev(
Some(sender_user),
room_id,
ending_count.map(|count| count.saturating_add(1)),
)
.ignore_err()
.boxed()
},
};
// Return at most `limit` PDUs from the stream
2025-10-22 13:06:33 -04:00
let pdus = pdu_stream
.by_ref()
.take(limit)
.ready_fold(VecDeque::with_capacity(limit), |mut pdus, item| {
pdus.push_front(item);
pdus
})
.await;
// The timeline is limited if there are still more PDUs in the stream
let limited = pdu_stream.next().await.is_some();
2024-10-16 05:32:27 +00:00
trace!(
"syncing {:?} timeline pdus from {:?} to {:?} (limited = {:?})",
pdus.len(),
starting_count,
ending_count,
limited,
);
2024-10-16 05:32:27 +00:00
Ok(TimelinePdus { pdus, limited })
2024-10-16 05:32:27 +00:00
}
async fn share_encrypted_room(
services: &Services,
sender_user: &UserId,
user_id: &UserId,
ignore_room: Option<&RoomId>,
2024-10-16 05:32:27 +00:00
) -> bool {
services
.rooms
2024-11-20 20:21:31 +00:00
.state_cache
2024-10-16 05:32:27 +00:00
.get_shared_rooms(sender_user, user_id)
.ready_filter(|&room_id| Some(room_id) != ignore_room)
2025-02-21 17:47:44 +00:00
.map(ToOwned::to_owned)
.broad_any(|other_room_id| async move {
2024-10-16 05:32:27 +00:00
services
.rooms
.state_accessor
2025-02-21 17:47:44 +00:00
.is_encrypted_room(&other_room_id)
.await
2024-10-16 05:32:27 +00:00
})
.await
}