diff --git a/changelog.d/+85568e80.feature.md b/changelog.d/+85568e80.feature.md new file mode 100644 index 000000000..f9e5486eb --- /dev/null +++ b/changelog.d/+85568e80.feature.md @@ -0,0 +1 @@ +Added support for Matrix 1.16's `state_after` feature, allowing clients which understand it to sync room state changes more reliably. Contributed by @ginger. diff --git a/changelog.d/917.bugfix.md b/changelog.d/917.bugfix.md new file mode 100644 index 000000000..82dbaec18 --- /dev/null +++ b/changelog.d/917.bugfix.md @@ -0,0 +1 @@ +Adjusted legacy sync logic to no longer use the `roomsynctoken_shortstatehash` database column. Once this change has been confirmed to be stable and reliable, a future update will remove it entirely, significantly decreasing database sizes. Contributed by @ginger. diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 2b6d3d291..ebd74e8b3 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -48,6 +48,13 @@ async fn load_timeline( ending_count: Option, limit: usize, ) -> Result { + if let (Some(starting_count), Some(ending_count)) = (starting_count, ending_count) { + debug_assert!( + starting_count <= ending_count, + "starting count {starting_count} > ending count {ending_count}" + ); + } + let mut pdu_stream = match starting_count { | Some(starting_count) => { let last_timeline_count = services diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index 6b4af1dcc..5392776af 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -38,6 +38,7 @@ use ruma::{ uint, }; use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash}; +use tokio::pin; use super::{load_timeline, share_encrypted_room}; use crate::client::{ @@ -96,12 +97,19 @@ pub(super) async fn load_joined_room( ); } + let state_events = + StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect()); + let joined_room = assign!(JoinedRoom::new(), { account_data, summary: summary.unwrap_or_default(), unread_notifications: notification_counts.unwrap_or_default(), timeline, - state: RoomState::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())), + state: if sync_context.use_state_after { + RoomState::After(state_events) + } else { + RoomState::Before(state_events) + }, ephemeral, unread_thread_notifications: BTreeMap::new(), }); @@ -344,7 +352,7 @@ struct ShortStateHashes { #[tracing::instrument(level = "debug", skip_all)] async fn fetch_shortstatehashes( services: &Services, - SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>, + SyncContext { last_sync_end_count, .. }: SyncContext<'_>, room_id: &RoomId, ) -> Result { // the room state currently. @@ -354,46 +362,41 @@ async fn fetch_shortstatehashes( .rooms .state .get_room_shortstatehash(room_id) - .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); + .map_err(|_| err!(Database(error!("Room {room_id} has no state")))) + .await?; - // the room state as of the end of the last sync. - // this will be None if we are doing an initial sync or if we just joined this - // room. + // The room state as of the end of the last sync. + // This will be None if we are doing an initial sync. let last_sync_end_shortstatehash = - OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| { - // look up the shortstatehash saved by the last sync's call to - // `associate_token_shortstatehash` - services - .rooms - .user - .get_token_shortstatehash(room_id, last_sync_end_count) - .inspect_err(move |_| { - debug_warn!( - token = last_sync_end_count, - "Room has no shortstatehash for this token" - ); - }) - .ok() + OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| { + pin! { + let pdus = services + .rooms + .timeline + .pdus(room_id, Some(PduCount::Normal(last_sync_end_count))) + .ignore_err(); + } + + match pdus.next().await { + | Some((_, pdu_after_last_sync_end)) => { + trace!(?pdu_after_last_sync_end.event_id, "pdu at last sync end"); + + services + .rooms + .state_accessor + .pdu_shortstatehash(&pdu_after_last_sync_end.event_id) + .await + .map_err(|err| err!("Last sync end PDU has no shortstatehash: {err}")) + }, + | None => { + // No events have been sent since the last sync, or we just joined this room, + // so the state then is the same as the state now + Ok(current_shortstatehash) + }, + } })) - .map(Option::flatten) - .map(Ok); - - let (current_shortstatehash, last_sync_end_shortstatehash) = - try_join(current_shortstatehash, last_sync_end_shortstatehash).await?; - - /* - associate the `current_count` with the `current_shortstatehash`, so we can - use it on the next sync as the `last_sync_end_shortstatehash`. - - TODO: the table written to by this call grows extremely fast, gaining one new entry for each - joined room on _every single sync request_. we need to find a better way to remember the shortstatehash - between syncs. - */ - services - .rooms - .user - .associate_token_shortstatehash(room_id, current_count, current_shortstatehash) - .await; + .await + .transpose()?; Ok(ShortStateHashes { current_shortstatehash, @@ -452,6 +455,7 @@ async fn build_state_events( syncing_user, last_sync_end_count, full_state, + use_state_after, .. } = sync_context; @@ -460,32 +464,28 @@ async fn build_state_events( last_sync_end_shortstatehash, } = shortstatehashes; - // the spec states that the `state` property only includes state events up to - // the beginning of the timeline, so we determine the state of the syncing room - // as of the first timeline event. NOTE: this explanation is not entirely - // accurate; see the implementation of `build_state_incremental`. - let timeline_start_shortstatehash = async { - if let Some((_, pdu)) = timeline.pdus.front() { - if let Ok(shortstatehash) = services + let timeline_start_shortstatehash = if let Some((count, pdu)) = timeline.pdus.front() { + if matches!(count, PduCount::Backfilled(_)) { + // We don't have shortstatehashes for backfilled PDUs, the best we can + // do is to use the current state + current_shortstatehash + } else { + services .rooms .state_accessor .pdu_shortstatehash(&pdu.event_id) .await - { - return shortstatehash; - } + .map_err(|err| err!("Timeline start has no shortstatehash: {err}"))? } - - current_shortstatehash + } else { + // if the timeline is empty there can't possibly be any changes to the state + return Ok(vec![]); }; // the user IDs of members whose membership needs to be sent to the client, if // lazy-loading is enabled. let lazily_loaded_members = - prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()); - - let (timeline_start_shortstatehash, lazily_loaded_members) = - join(timeline_start_shortstatehash, lazily_loaded_members).await; + prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await; // compute the state delta between the previous sync and this sync. match (last_sync_end_count, last_sync_end_shortstatehash) { @@ -494,16 +494,15 @@ async fn build_state_events( is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false, then use `build_state_incremental`. */ - | (Some(last_sync_end_count), Some(last_sync_end_shortstatehash)) if !full_state => + | (Some(_), Some(last_sync_end_shortstatehash)) if !full_state => build_state_incremental( services, syncing_user, - room_id, - PduCount::Normal(last_sync_end_count), last_sync_end_shortstatehash, timeline_start_shortstatehash, current_shortstatehash, timeline, + use_state_after, lazily_loaded_members.as_ref(), ) .boxed() @@ -518,6 +517,8 @@ async fn build_state_events( services, syncing_user, timeline_start_shortstatehash, + current_shortstatehash, + use_state_after, lazily_loaded_members.as_ref(), ) .boxed() @@ -598,23 +599,25 @@ async fn check_joined_since_last_sync( ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes, SyncContext { syncing_user, .. }: SyncContext<'_>, ) -> Result { - // fetch the syncing user's membership event during the last sync. - // this will be None if `previous_sync_end_shortstatehash` is None. - let membership_during_previous_sync = match last_sync_end_shortstatehash { - | Some(last_sync_end_shortstatehash) => services - .rooms - .state_accessor - .state_get_content( - last_sync_end_shortstatehash, - &StateEventType::RoomMember, - syncing_user.as_str(), - ) - .await - .inspect_err(|_| debug_warn!("User has no previous membership")) - .ok(), - | None => None, + let Some(last_sync_end_shortstatehash) = last_sync_end_shortstatehash else { + // For initial syncs always return false, since there's no "last sync" for the + // user to have joined since. + return Ok(false); }; + // Fetch the syncing user's membership event during the last sync. + let membership_during_previous_sync = services + .rooms + .state_accessor + .state_get_content( + last_sync_end_shortstatehash, + &StateEventType::RoomMember, + syncing_user.as_str(), + ) + .await + .inspect_err(|_| debug_warn!("User has no previous membership")) + .ok(); + // TODO: If the requesting user got state-reset out of the room, this // will be `true` when it shouldn't be. this function should never be called // in that situation, but it may be if the membership cache didn't get updated. diff --git a/src/api/client/sync/v3/left.rs b/src/api/client/sync/v3/left.rs index d833bfdf9..63d14b9ce 100644 --- a/src/api/client/sync/v3/left.rs +++ b/src/api/client/sync/v3/left.rs @@ -181,6 +181,9 @@ pub(super) async fn load_left_room( .collect::>() .await; + let state_events = + StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect()); + Ok(Some(assign!(LeftRoom::new(), { account_data: RoomAccountData::new(), timeline: assign!(Timeline::new(), { @@ -188,7 +191,11 @@ pub(super) async fn load_left_room( prev_batch: Some(current_count.to_string()), events: raw_timeline_pdus, }), - state: State::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())), + state: if sync_context.use_state_after { + State::After(state_events) + } else { + State::Before(state_events) + }, }))) } @@ -264,6 +271,8 @@ async fn build_left_state_and_timeline( services, syncing_user, timeline_start_shortstatehash, + leave_shortstatehash, + sync_context.use_state_after, lazily_loaded_members.as_ref(), ) .await?; diff --git a/src/api/client/sync/v3/mod.rs b/src/api/client/sync/v3/mod.rs index 1c3612b97..6b0d81f85 100644 --- a/src/api/client/sync/v3/mod.rs +++ b/src/api/client/sync/v3/mod.rs @@ -11,12 +11,11 @@ use std::{ use axum::extract::State; use axum_client_ip::ClientIp; use conduwuit::{ - Err, Result, at, extract_variant, + Err, Result, at, error, extract_variant, utils::{ ReadyExt, TryFutureExtExt, stream::{BroadbandExt, Tools, WidebandExt}, }, - warn, }; use conduwuit_service::Services; use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture}; @@ -110,6 +109,9 @@ struct SyncContext<'a> { /// The sync filter, which the client uses to specify what data should be /// included in the sync response. filter: &'a FilterDefinition, + /// Whether the state at the end of the timeline should be used when + /// calculating state diffs for sync. + use_state_after: bool, } impl<'a> SyncContext<'a> { @@ -261,6 +263,7 @@ pub(crate) async fn build_sync_events( current_count, full_state, filter: &filter, + use_state_after: body.use_state_after, }; let joined_rooms = services @@ -273,7 +276,7 @@ pub(crate) async fn build_sync_events( match joined_room { | Ok((room, updates)) => Some((room_id, room, updates)), | Err(err) => { - warn!(?err, %room_id, "error loading joined room"); + error!(?err, %room_id, "error loading joined room"); None }, } @@ -302,7 +305,7 @@ pub(crate) async fn build_sync_events( | Ok(Some(left_room)) => Some((room_id, left_room)), | Ok(None) => None, | Err(err) => { - warn!(?err, %room_id, "error loading joined room"); + error!(?err, %room_id, "error loading joined room"); None }, } diff --git a/src/api/client/sync/v3/state.rs b/src/api/client/sync/v3/state.rs index b22074d89..0da053d65 100644 --- a/src/api/client/sync/v3/state.rs +++ b/src/api/client/sync/v3/state.rs @@ -1,11 +1,8 @@ -use std::{collections::BTreeSet, ops::ControlFlow}; +use std::collections::HashSet; use conduwuit::{ - Result, at, is_equal_to, - matrix::{ - Event, - pdu::{PduCount, PduEvent}, - }, + Result, at, + matrix::{Event, pdu::PduEvent}, utils::{ BoolExt, IterStream, ReadyExt, TryFutureExtExt, stream::{BroadbandExt, TryIgnore}, @@ -16,9 +13,7 @@ use conduwuit_service::{ rooms::{lazy_loading::MemberSet, short::ShortStateHash}, }; use futures::{FutureExt, StreamExt}; -use itertools::Itertools; -use ruma::{OwnedEventId, RoomId, UserId, events::StateEventType}; -use service::rooms::short::ShortEventId; +use ruma::{OwnedEventId, UserId, events::StateEventType}; use tracing::trace; use crate::client::TimelinePdus; @@ -39,13 +34,19 @@ pub(super) async fn build_state_initial( services: &Services, sender_user: &UserId, timeline_start_shortstatehash: ShortStateHash, + timeline_end_shortstatehash: ShortStateHash, + use_state_after: bool, lazily_loaded_members: Option<&MemberSet>, ) -> Result> { // load the keys and event IDs of the state events at the start of the timeline let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services .rooms .state_accessor - .state_full_ids(timeline_start_shortstatehash) + .state_full_ids(if use_state_after { + timeline_end_shortstatehash + } else { + timeline_start_shortstatehash + }) .unzip() .await; @@ -92,82 +93,34 @@ pub(super) async fn build_state_initial( pub(super) async fn build_state_incremental<'a>( services: &Services, sender_user: &'a UserId, - room_id: &RoomId, - last_sync_end_count: PduCount, last_sync_end_shortstatehash: ShortStateHash, timeline_start_shortstatehash: ShortStateHash, timeline_end_shortstatehash: ShortStateHash, timeline: &TimelinePdus, + use_state_after: bool, lazily_loaded_members: Option<&'a MemberSet>, ) -> Result> { - /* - NB: a limited sync is one where `timeline.limited == true`. Synapse calls this a "gappy" sync internally. + let mut state_event_ids: HashSet = HashSet::new(); - The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described - by the Matrix specification. This is because the specification's description of the `state` property does not accurately - reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include: - 1. We do not compute the delta using the naive approach of "every state event from the end of the last sync - up to the start of this sync's timeline". see below for details. - 2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined - elsewhere and supplied to this function in the `lazily_loaded_members` parameter. - */ + trace!( + %use_state_after, + %last_sync_end_shortstatehash, + %timeline_start_shortstatehash, + %timeline_end_shortstatehash, + "computing state for incremental sync" + ); - /* - the `state` property of an incremental sync which isn't limited are _usually_ empty. - (note: the specification says that the `state` property is _always_ empty for limited syncs, which is incorrect.) - however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`), - the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state - at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline - merged a split in the DAG. + // Fetch lazy-loaded membership events if lazy-loading is enabled + if let Some(lazily_loaded_members) = lazily_loaded_members + && !lazily_loaded_members.is_empty() + { + trace!("including lazy membership events for members: {:?}", lazily_loaded_members); - see: https://github.com/element-hq/synapse/issues/16941 - */ - - let timeline_is_linear = timeline.pdus.is_empty() || { - let last_pdu_of_last_sync = services + services .rooms - .timeline - .pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1))) - .boxed() - .next() - .await - .transpose() - .expect("last sync should have had some PDUs") - .map(at!(1)); - - // make sure the prev_events of each pdu in the timeline refer only to the - // previous pdu - timeline - .pdus - .iter() - .try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| { - if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() { - if prev_event_id - .as_ref() - .is_none_or(is_equal_to!(pdu_prev_event_id)) - { - return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned())); - } - } - - trace!( - "pdu {:?} has split prev_events (expected {:?}): {:?}", - pdu.event_id, prev_event_id, pdu.prev_events - ); - ControlFlow::Break(()) - }) - .is_continue() - }; - - if timeline_is_linear && !timeline.limited { - // if there are no splits in the DAG and the timeline isn't limited, then - // `state` will always be empty unless lazy loading is enabled. - - if let Some(lazily_loaded_members) = lazily_loaded_members { - if !timeline.pdus.is_empty() { - // lazy loading is enabled, so we return the membership events which were - // requested by the caller. - let lazy_membership_events: Vec<_> = lazily_loaded_members + .short + .multi_get_eventid_from_short::<'_, OwnedEventId, _>( + lazily_loaded_members .iter() .stream() .broad_filter_map(|user_id| async move { @@ -178,71 +131,24 @@ pub(super) async fn build_state_incremental<'a>( services .rooms .state_accessor - .state_get( + .state_get_shortid( timeline_start_shortstatehash, &StateEventType::RoomMember, user_id.as_str(), ) .ok() .await - }) - .collect() - .await; - - if !lazy_membership_events.is_empty() { - trace!( - "syncing lazy membership events for members: {:?}", - lazy_membership_events - .iter() - .map(|pdu| pdu.state_key().unwrap()) - .collect::>() - ); - } - return Ok(lazy_membership_events); - } - } - - // lazy loading is disabled, `state` is empty. - return Ok(vec![]); + }), + ) + .ignore_err() + .ready_for_each(|event_id| { + state_event_ids.insert(event_id); + }) + .await; } - /* - at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates - computing the incremental state (which may be empty). - - NOTE: this code path does not use the `lazy_membership_events` parameter. any changes to membership will be included - in the incremental state. therefore, the incremental state may include "redundant" membership events, - which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`, - and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that - the performance penalty is acceptable. - */ - - trace!(%timeline_is_linear, %timeline.limited, "computing state for incremental sync"); - - // fetch the shorteventids of state events in the timeline - let state_events_in_timeline: BTreeSet = services - .rooms - .short - .multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| { - if pdu.state_key().is_some() { - Some(pdu.event_id.as_ref()) - } else { - None - } - })) - .collect() - .await; - - trace!("{} state events in timeline", state_events_in_timeline.len()); - - /* - fetch the state events which were added since the last sync. - - specifically we fetch the difference between the state at the last sync and the state at the _end_ - of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched. - this is necessary to account for splits in the DAG, as explained above. - */ - let state_diff = services + // Fetch the state events added since the last sync. + services .rooms .short .multi_get_eventid_from_short::<'_, OwnedEventId, _>( @@ -252,18 +158,29 @@ pub(super) async fn build_state_incremental<'a>( .state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash)) .await? .stream() - .ready_filter_map(|(_, shorteventid)| { - if state_events_in_timeline.contains(&shorteventid) { - None - } else { - Some(shorteventid) - } - }), + .map(at!(1)), ) - .ignore_err(); + .ignore_err() + .ready_for_each(|event_id| { + state_event_ids.insert(event_id); + }) + .await; - // finally, fetch the PDU contents and collect them into a vec - let state_diff_pdus = state_diff + if !use_state_after { + // If state_after isn't enabled, filter out state events which also exist + // in the timeline. If splits exist in the DAG, this may not be exactly the same + // thing as the state diff ending at the start of the timeline, but Synapse + // also does this and it's technically more useful behavior anyway. + // See: https://github.com/element-hq/synapse/issues/16941 + + for (_, pdu) in &timeline.pdus { + state_event_ids.remove(pdu.event_id()); + } + } + + // Finally, fetch the PDU contents and collect them into a vec + let state_diff_pdus = state_event_ids + .stream() .broad_filter_map(|event_id| async move { services .rooms diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index c396cbdb9..59a79e768 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -15,7 +15,7 @@ use conduwuit::{ BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, future::ReadyEqExt, math::{ruma_from_usize, usize_from_ruma}, - stream::WidebandExt, + stream::{TryIgnore, WidebandExt}, }, warn, }; @@ -41,6 +41,7 @@ use ruma::{ uint, }; use service::account_data::AnyRawAccountDataEvent; +use tokio::pin; use super::share_encrypted_room; use crate::{ @@ -858,12 +859,27 @@ where continue; }; - let since_shortstatehash = services - .rooms - .user - .get_token_shortstatehash(room_id, globalsince) - .await - .ok(); + let since_shortstatehash = async { + pin! { + let pdus_rev = services + .rooms + .timeline + .pdus_rev(room_id, Some(PduCount::Normal(globalsince.saturating_sub(1)))) + .ignore_err(); + } + + let (_, pdu_at_last_sync_end) = pdus_rev.next().await?; + + Some( + services + .rooms + .state_accessor + .pdu_shortstatehash(&pdu_at_last_sync_end.event_id) + .await + .expect("pdu should have a shortstatehash"), + ) + } + .await; let encrypted_room = services .rooms diff --git a/src/database/maps.rs b/src/database/maps.rs index e8eb02331..03fbbda72 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -193,12 +193,7 @@ pub(super) static MAPS: &[Descriptor] = &[ }, Descriptor { name: "roomsynctoken_shortstatehash", - file_shape: 3, - val_size_hint: Some(8), - block_size: 512, - compression_level: 3, - bottommost_level: Some(6), - ..descriptor::SEQUENTIAL + ..descriptor::DROPPED }, Descriptor { name: "roomuserdataid_accountdata", diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs index bd76f1f4c..c6a9f9cca 100644 --- a/src/service/rooms/user/mod.rs +++ b/src/service/rooms/user/mod.rs @@ -1,10 +1,10 @@ use std::sync::Arc; use conduwuit::{Result, implement}; -use database::{Database, Deserialized, Map}; +use database::{Deserialized, Map}; use ruma::{RoomId, UserId}; -use crate::{Dep, globals, rooms, rooms::short::ShortStateHash}; +use crate::{Dep, globals}; pub struct Service { db: Data, @@ -12,32 +12,25 @@ pub struct Service { } struct Data { - db: Arc, userroomid_notificationcount: Arc, userroomid_highlightcount: Arc, roomuserid_lastnotificationread: Arc, - roomsynctoken_shortstatehash: Arc, } struct Services { globals: Dep, - short: Dep, } impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { db: Data { - db: args.db.clone(), userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(), - roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(), }, - services: Services { globals: args.depend::("globals"), - short: args.depend::("rooms::short"), }, })) } @@ -90,40 +83,3 @@ pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) - .deserialized() .unwrap_or(0) } - -#[implement(Service)] -pub async fn associate_token_shortstatehash( - &self, - room_id: &RoomId, - token: u64, - shortstatehash: ShortStateHash, -) { - let shortroomid = self - .services - .short - .get_shortroomid(room_id) - .await - .expect("room exists"); - - let _cork = self.db.db.cork(); - let key: &[u64] = &[shortroomid, token]; - self.db - .roomsynctoken_shortstatehash - .put(key, shortstatehash); -} - -#[implement(Service)] -pub async fn get_token_shortstatehash( - &self, - room_id: &RoomId, - token: u64, -) -> Result { - let shortroomid = self.services.short.get_shortroomid(room_id).await?; - - let key: &[u64] = &[shortroomid, token]; - self.db - .roomsynctoken_shortstatehash - .qry(key) - .await - .deserialized() -}