mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
feat: Remove all uses of roomsynctoken_shortstatehash
This commit is contained in:
@@ -48,6 +48,13 @@ async fn load_timeline(
|
||||
ending_count: Option<PduCount>,
|
||||
limit: usize,
|
||||
) -> Result<TimelinePdus> {
|
||||
if let (Some(starting_count), Some(ending_count)) = (starting_count, ending_count) {
|
||||
debug_assert!(
|
||||
starting_count <= ending_count,
|
||||
"starting count {starting_count} > ending count {ending_count}"
|
||||
);
|
||||
}
|
||||
|
||||
let mut pdu_stream = match starting_count {
|
||||
| Some(starting_count) => {
|
||||
let last_timeline_count = services
|
||||
|
||||
@@ -38,6 +38,7 @@ use ruma::{
|
||||
uint,
|
||||
};
|
||||
use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash};
|
||||
use tokio::pin;
|
||||
|
||||
use super::{load_timeline, share_encrypted_room};
|
||||
use crate::client::{
|
||||
@@ -344,7 +345,7 @@ struct ShortStateHashes {
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn fetch_shortstatehashes(
|
||||
services: &Services,
|
||||
SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>,
|
||||
SyncContext { last_sync_end_count, .. }: SyncContext<'_>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<ShortStateHashes> {
|
||||
// the room state currently.
|
||||
@@ -360,20 +361,25 @@ async fn fetch_shortstatehashes(
|
||||
// this will be None if we are doing an initial sync or if we just joined this
|
||||
// room.
|
||||
let last_sync_end_shortstatehash =
|
||||
OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| {
|
||||
// look up the shortstatehash saved by the last sync's call to
|
||||
// `associate_token_shortstatehash`
|
||||
OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| {
|
||||
pin! {
|
||||
let pdus_rev = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(room_id, Some(PduCount::Normal(last_sync_end_count.saturating_sub(1))))
|
||||
.ignore_err();
|
||||
}
|
||||
|
||||
let (_, pdu_at_last_sync_end) = pdus_rev.next().await?;
|
||||
|
||||
Some(
|
||||
services
|
||||
.rooms
|
||||
.user
|
||||
.get_token_shortstatehash(room_id, last_sync_end_count)
|
||||
.inspect_err(move |_| {
|
||||
debug_warn!(
|
||||
token = last_sync_end_count,
|
||||
"Room has no shortstatehash for this token"
|
||||
);
|
||||
})
|
||||
.ok()
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(&pdu_at_last_sync_end.event_id)
|
||||
.await
|
||||
.expect("pdu should have a shortstatehash"),
|
||||
)
|
||||
}))
|
||||
.map(Option::flatten)
|
||||
.map(Ok);
|
||||
@@ -381,20 +387,6 @@ async fn fetch_shortstatehashes(
|
||||
let (current_shortstatehash, last_sync_end_shortstatehash) =
|
||||
try_join(current_shortstatehash, last_sync_end_shortstatehash).await?;
|
||||
|
||||
/*
|
||||
associate the `current_count` with the `current_shortstatehash`, so we can
|
||||
use it on the next sync as the `last_sync_end_shortstatehash`.
|
||||
|
||||
TODO: the table written to by this call grows extremely fast, gaining one new entry for each
|
||||
joined room on _every single sync request_. we need to find a better way to remember the shortstatehash
|
||||
between syncs.
|
||||
*/
|
||||
services
|
||||
.rooms
|
||||
.user
|
||||
.associate_token_shortstatehash(room_id, current_count, current_shortstatehash)
|
||||
.await;
|
||||
|
||||
Ok(ShortStateHashes {
|
||||
current_shortstatehash,
|
||||
last_sync_end_shortstatehash,
|
||||
|
||||
@@ -15,7 +15,7 @@ use conduwuit::{
|
||||
BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
||||
future::ReadyEqExt,
|
||||
math::{ruma_from_usize, usize_from_ruma},
|
||||
stream::WidebandExt,
|
||||
stream::{TryIgnore, WidebandExt},
|
||||
},
|
||||
warn,
|
||||
};
|
||||
@@ -41,6 +41,7 @@ use ruma::{
|
||||
uint,
|
||||
};
|
||||
use service::account_data::AnyRawAccountDataEvent;
|
||||
use tokio::pin;
|
||||
|
||||
use super::share_encrypted_room;
|
||||
use crate::{
|
||||
@@ -858,12 +859,27 @@ where
|
||||
continue;
|
||||
};
|
||||
|
||||
let since_shortstatehash = services
|
||||
let since_shortstatehash = async {
|
||||
pin! {
|
||||
let pdus_rev = services
|
||||
.rooms
|
||||
.user
|
||||
.get_token_shortstatehash(room_id, globalsince)
|
||||
.timeline
|
||||
.pdus_rev(room_id, Some(PduCount::Normal(globalsince.saturating_sub(1))))
|
||||
.ignore_err();
|
||||
}
|
||||
|
||||
let (_, pdu_at_last_sync_end) = pdus_rev.next().await?;
|
||||
|
||||
Some(
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(&pdu_at_last_sync_end.event_id)
|
||||
.await
|
||||
.ok();
|
||||
.expect("pdu should have a shortstatehash"),
|
||||
)
|
||||
}
|
||||
.await;
|
||||
|
||||
let encrypted_room = services
|
||||
.rooms
|
||||
|
||||
@@ -193,12 +193,7 @@ pub(super) static MAPS: &[Descriptor] = &[
|
||||
},
|
||||
Descriptor {
|
||||
name: "roomsynctoken_shortstatehash",
|
||||
file_shape: 3,
|
||||
val_size_hint: Some(8),
|
||||
block_size: 512,
|
||||
compression_level: 3,
|
||||
bottommost_level: Some(6),
|
||||
..descriptor::SEQUENTIAL
|
||||
..descriptor::DROPPED
|
||||
},
|
||||
Descriptor {
|
||||
name: "roomuserdataid_accountdata",
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{Result, implement};
|
||||
use database::{Database, Deserialized, Map};
|
||||
use database::{Deserialized, Map};
|
||||
use ruma::{RoomId, UserId};
|
||||
|
||||
use crate::{Dep, globals, rooms, rooms::short::ShortStateHash};
|
||||
use crate::{Dep, globals};
|
||||
|
||||
pub struct Service {
|
||||
db: Data,
|
||||
@@ -12,32 +12,25 @@ pub struct Service {
|
||||
}
|
||||
|
||||
struct Data {
|
||||
db: Arc<Database>,
|
||||
userroomid_notificationcount: Arc<Map>,
|
||||
userroomid_highlightcount: Arc<Map>,
|
||||
roomuserid_lastnotificationread: Arc<Map>,
|
||||
roomsynctoken_shortstatehash: Arc<Map>,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
globals: Dep<globals::Service>,
|
||||
short: Dep<rooms::short::Service>,
|
||||
}
|
||||
|
||||
impl crate::Service for Service {
|
||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
db: Data {
|
||||
db: args.db.clone(),
|
||||
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
|
||||
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
|
||||
roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(),
|
||||
roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(),
|
||||
},
|
||||
|
||||
services: Services {
|
||||
globals: args.depend::<globals::Service>("globals"),
|
||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
||||
},
|
||||
}))
|
||||
}
|
||||
@@ -90,40 +83,3 @@ pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -
|
||||
.deserialized()
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn associate_token_shortstatehash(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
token: u64,
|
||||
shortstatehash: ShortStateHash,
|
||||
) {
|
||||
let shortroomid = self
|
||||
.services
|
||||
.short
|
||||
.get_shortroomid(room_id)
|
||||
.await
|
||||
.expect("room exists");
|
||||
|
||||
let _cork = self.db.db.cork();
|
||||
let key: &[u64] = &[shortroomid, token];
|
||||
self.db
|
||||
.roomsynctoken_shortstatehash
|
||||
.put(key, shortstatehash);
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn get_token_shortstatehash(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
token: u64,
|
||||
) -> Result<ShortStateHash> {
|
||||
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
|
||||
|
||||
let key: &[u64] = &[shortroomid, token];
|
||||
self.db
|
||||
.roomsynctoken_shortstatehash
|
||||
.qry(key)
|
||||
.await
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user