diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 2b6d3d291..ebd74e8b3 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -48,6 +48,13 @@ async fn load_timeline( ending_count: Option, limit: usize, ) -> Result { + if let (Some(starting_count), Some(ending_count)) = (starting_count, ending_count) { + debug_assert!( + starting_count <= ending_count, + "starting count {starting_count} > ending count {ending_count}" + ); + } + let mut pdu_stream = match starting_count { | Some(starting_count) => { let last_timeline_count = services diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index 6b4af1dcc..4de0ce3e3 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -38,6 +38,7 @@ use ruma::{ uint, }; use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash}; +use tokio::pin; use super::{load_timeline, share_encrypted_room}; use crate::client::{ @@ -344,7 +345,7 @@ struct ShortStateHashes { #[tracing::instrument(level = "debug", skip_all)] async fn fetch_shortstatehashes( services: &Services, - SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>, + SyncContext { last_sync_end_count, .. }: SyncContext<'_>, room_id: &RoomId, ) -> Result { // the room state currently. @@ -360,20 +361,25 @@ async fn fetch_shortstatehashes( // this will be None if we are doing an initial sync or if we just joined this // room. let last_sync_end_shortstatehash = - OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| { - // look up the shortstatehash saved by the last sync's call to - // `associate_token_shortstatehash` - services - .rooms - .user - .get_token_shortstatehash(room_id, last_sync_end_count) - .inspect_err(move |_| { - debug_warn!( - token = last_sync_end_count, - "Room has no shortstatehash for this token" - ); - }) - .ok() + OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| { + pin! { + let pdus_rev = services + .rooms + .timeline + .pdus_rev(room_id, Some(PduCount::Normal(last_sync_end_count.saturating_sub(1)))) + .ignore_err(); + } + + let (_, pdu_at_last_sync_end) = pdus_rev.next().await?; + + Some( + services + .rooms + .state_accessor + .pdu_shortstatehash(&pdu_at_last_sync_end.event_id) + .await + .expect("pdu should have a shortstatehash"), + ) })) .map(Option::flatten) .map(Ok); @@ -381,20 +387,6 @@ async fn fetch_shortstatehashes( let (current_shortstatehash, last_sync_end_shortstatehash) = try_join(current_shortstatehash, last_sync_end_shortstatehash).await?; - /* - associate the `current_count` with the `current_shortstatehash`, so we can - use it on the next sync as the `last_sync_end_shortstatehash`. - - TODO: the table written to by this call grows extremely fast, gaining one new entry for each - joined room on _every single sync request_. we need to find a better way to remember the shortstatehash - between syncs. - */ - services - .rooms - .user - .associate_token_shortstatehash(room_id, current_count, current_shortstatehash) - .await; - Ok(ShortStateHashes { current_shortstatehash, last_sync_end_shortstatehash, diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 900619c5f..a42722666 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -15,7 +15,7 @@ use conduwuit::{ BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, future::ReadyEqExt, math::{ruma_from_usize, usize_from_ruma}, - stream::WidebandExt, + stream::{TryIgnore, WidebandExt}, }, warn, }; @@ -41,6 +41,7 @@ use ruma::{ uint, }; use service::account_data::AnyRawAccountDataEvent; +use tokio::pin; use super::share_encrypted_room; use crate::{ @@ -858,12 +859,27 @@ where continue; }; - let since_shortstatehash = services - .rooms - .user - .get_token_shortstatehash(room_id, globalsince) - .await - .ok(); + let since_shortstatehash = async { + pin! { + let pdus_rev = services + .rooms + .timeline + .pdus_rev(room_id, Some(PduCount::Normal(globalsince.saturating_sub(1)))) + .ignore_err(); + } + + let (_, pdu_at_last_sync_end) = pdus_rev.next().await?; + + Some( + services + .rooms + .state_accessor + .pdu_shortstatehash(&pdu_at_last_sync_end.event_id) + .await + .expect("pdu should have a shortstatehash"), + ) + } + .await; let encrypted_room = services .rooms diff --git a/src/database/maps.rs b/src/database/maps.rs index e8eb02331..03fbbda72 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -193,12 +193,7 @@ pub(super) static MAPS: &[Descriptor] = &[ }, Descriptor { name: "roomsynctoken_shortstatehash", - file_shape: 3, - val_size_hint: Some(8), - block_size: 512, - compression_level: 3, - bottommost_level: Some(6), - ..descriptor::SEQUENTIAL + ..descriptor::DROPPED }, Descriptor { name: "roomuserdataid_accountdata", diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs index bd76f1f4c..c6a9f9cca 100644 --- a/src/service/rooms/user/mod.rs +++ b/src/service/rooms/user/mod.rs @@ -1,10 +1,10 @@ use std::sync::Arc; use conduwuit::{Result, implement}; -use database::{Database, Deserialized, Map}; +use database::{Deserialized, Map}; use ruma::{RoomId, UserId}; -use crate::{Dep, globals, rooms, rooms::short::ShortStateHash}; +use crate::{Dep, globals}; pub struct Service { db: Data, @@ -12,32 +12,25 @@ pub struct Service { } struct Data { - db: Arc, userroomid_notificationcount: Arc, userroomid_highlightcount: Arc, roomuserid_lastnotificationread: Arc, - roomsynctoken_shortstatehash: Arc, } struct Services { globals: Dep, - short: Dep, } impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { db: Data { - db: args.db.clone(), userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(), - roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(), }, - services: Services { globals: args.depend::("globals"), - short: args.depend::("rooms::short"), }, })) } @@ -90,40 +83,3 @@ pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) - .deserialized() .unwrap_or(0) } - -#[implement(Service)] -pub async fn associate_token_shortstatehash( - &self, - room_id: &RoomId, - token: u64, - shortstatehash: ShortStateHash, -) { - let shortroomid = self - .services - .short - .get_shortroomid(room_id) - .await - .expect("room exists"); - - let _cork = self.db.db.cork(); - let key: &[u64] = &[shortroomid, token]; - self.db - .roomsynctoken_shortstatehash - .put(key, shortstatehash); -} - -#[implement(Service)] -pub async fn get_token_shortstatehash( - &self, - room_id: &RoomId, - token: u64, -) -> Result { - let shortroomid = self.services.short.get_shortroomid(room_id).await?; - - let key: &[u64] = &[shortroomid, token]; - self.db - .roomsynctoken_shortstatehash - .qry(key) - .await - .deserialized() -}