mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
fix(sync/v3): Cache shortstatehashes to speed up migration
This commit is contained in:
+25
-15
@@ -1,7 +1,7 @@
|
|||||||
use std::cmp;
|
use std::{cmp, collections::HashMap};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Pdu, Result, debug, debug_info, debug_warn, error, info, pair_of,
|
Err, Pdu, Result, debug, debug_info, debug_warn, error, info,
|
||||||
result::NotFound,
|
result::NotFound,
|
||||||
utils::{
|
utils::{
|
||||||
IterStream, ReadyExt,
|
IterStream, ReadyExt,
|
||||||
@@ -13,7 +13,7 @@ use database::Json;
|
|||||||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
OwnedUserId, RoomId, UserId,
|
OwnedRoomId, OwnedUserId, RoomId, UserId,
|
||||||
events::{
|
events::{
|
||||||
GlobalAccountDataEventType, StateEventType, push_rules::PushRulesEvent,
|
GlobalAccountDataEventType, StateEventType, push_rules::PushRulesEvent,
|
||||||
room::member::MembershipState,
|
room::member::MembershipState,
|
||||||
@@ -22,7 +22,7 @@ use ruma::{
|
|||||||
serde::Raw,
|
serde::Raw,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{Services, media};
|
use crate::{Services, media, rooms::short::ShortStateHash};
|
||||||
|
|
||||||
/// The current schema version.
|
/// The current schema version.
|
||||||
/// - If database is opened at greater version we reject with error. The
|
/// - If database is opened at greater version we reject with error. The
|
||||||
@@ -649,20 +649,30 @@ async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
|
|||||||
let cork = db.cork_and_sync();
|
let cork = db.cork_and_sync();
|
||||||
let userroomid_leftstate = db["userroomid_leftstate"].clone();
|
let userroomid_leftstate = db["userroomid_leftstate"].clone();
|
||||||
|
|
||||||
let (total, fixed) = userroomid_leftstate
|
let (total, fixed, _) = userroomid_leftstate
|
||||||
.stream()
|
.stream()
|
||||||
.try_fold(
|
.try_fold(
|
||||||
(0_usize, 0_usize),
|
(0_usize, 0_usize, HashMap::<OwnedRoomId, ShortStateHash>::new()),
|
||||||
async |(mut total, mut fixed): pair_of!(usize),
|
async |(mut total, mut fixed, mut shortstatehash_cache): (
|
||||||
|
usize,
|
||||||
|
usize,
|
||||||
|
HashMap<_, _>,
|
||||||
|
),
|
||||||
((user_id, room_id), state): KeyVal<'_>|
|
((user_id, room_id), state): KeyVal<'_>|
|
||||||
-> Result<pair_of!(usize)> {
|
-> Result<(usize, usize, HashMap<_, _>)> {
|
||||||
if matches!(state.deserialize(), Err(_)) {
|
if matches!(state.deserialize(), Err(_)) {
|
||||||
let Ok(latest_shortstatehash) =
|
let latest_shortstatehash =
|
||||||
services.rooms.state.get_room_shortstatehash(room_id).await
|
if let Some(shortstatehash) = shortstatehash_cache.get(room_id) {
|
||||||
else {
|
*shortstatehash
|
||||||
warn!(?room_id, ?user_id, "room has no shortstatehash");
|
} else if let Ok(shortstatehash) =
|
||||||
return Ok((total, fixed));
|
services.rooms.state.get_room_shortstatehash(room_id).await
|
||||||
};
|
{
|
||||||
|
shortstatehash_cache.insert(room_id.to_owned(), shortstatehash);
|
||||||
|
shortstatehash
|
||||||
|
} else {
|
||||||
|
warn!(?room_id, ?user_id, "room has no shortstatehash");
|
||||||
|
return Ok((total, fixed, shortstatehash_cache));
|
||||||
|
};
|
||||||
|
|
||||||
let leave_state_event = services
|
let leave_state_event = services
|
||||||
.rooms
|
.rooms
|
||||||
@@ -692,7 +702,7 @@ async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
|
|||||||
}
|
}
|
||||||
|
|
||||||
total = total.saturating_add(1);
|
total = total.saturating_add(1);
|
||||||
Ok((total, fixed))
|
Ok((total, fixed, shortstatehash_cache))
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|||||||
Reference in New Issue
Block a user