Files
continuwuity/src/service/rooms/timeline/data.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

318 lines
9.3 KiB
Rust
Raw Normal View History

use std::{
collections::{hash_map, HashMap},
mem::size_of,
2024-08-08 17:18:30 +00:00
sync::Arc,
};
2022-09-06 23:15:09 +02:00
2024-08-08 17:18:30 +00:00
use conduit::{
err, expected,
result::{LogErr, NotFound},
utils,
2024-10-16 11:33:24 +00:00
utils::{future::TryExtExt, stream::TryIgnore, u64_from_u8, ReadyExt},
2024-08-08 17:18:30 +00:00
Err, PduCount, PduEvent, Result,
};
2024-10-07 17:54:27 +00:00
use database::{Database, Deserialized, Json, KeyVal, Map};
2024-10-16 11:33:24 +00:00
use futures::{Stream, StreamExt};
2024-08-08 17:18:30 +00:00
use ruma::{CanonicalJsonObject, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId};
use tokio::sync::Mutex;
2022-09-06 23:15:09 +02:00
2024-07-18 06:37:47 +00:00
use crate::{rooms, Dep};
2023-02-20 22:59:45 +01:00
2024-06-28 22:51:39 +00:00
pub(super) struct Data {
2024-07-18 06:37:47 +00:00
eventid_outlierpdu: Arc<Map>,
2024-06-28 22:51:39 +00:00
eventid_pduid: Arc<Map>,
pduid_pdu: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
2024-07-18 06:37:47 +00:00
userroomid_notificationcount: Arc<Map>,
pub(super) lasttimelinecount_cache: LastTimelineCountCache,
2024-07-18 06:37:47 +00:00
pub(super) db: Arc<Database>,
services: Services,
}
/// Sibling services that `Data` depends on.
struct Services {
/// Short-id service; `count_to_id` uses it to resolve a room id to its
/// short room id.
short: Dep<rooms::short::Service>,
}
2024-05-26 21:29:19 +00:00
2024-08-08 17:18:30 +00:00
/// Item yielded by the PDU streams (`pdus_until` / `pdus_after`): the
/// event's count paired with the event itself.
pub type PdusIterItem = (PduCount, PduEvent);
/// Per-room cache of a `PduCount`, guarded by an async mutex.
type LastTimelineCountCache = Mutex<HashMap<OwnedRoomId, PduCount>>;
2024-06-28 23:05:45 +00:00
2024-05-27 03:17:20 +00:00
impl Data {
2024-07-18 06:37:47 +00:00
pub(super) fn new(args: &crate::Args<'_>) -> Self {
let db = &args.db;
2024-05-27 03:17:20 +00:00
Self {
2024-07-18 06:37:47 +00:00
eventid_outlierpdu: db["eventid_outlierpdu"].clone(),
2024-06-28 22:51:39 +00:00
eventid_pduid: db["eventid_pduid"].clone(),
pduid_pdu: db["pduid_pdu"].clone(),
userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
2024-07-18 06:37:47 +00:00
userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
lasttimelinecount_cache: Mutex::new(HashMap::new()),
2024-07-18 06:37:47 +00:00
db: args.db.clone(),
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
},
2024-05-27 03:17:20 +00:00
}
}
2024-08-08 17:18:30 +00:00
pub(super) async fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount> {
2024-05-26 21:29:19 +00:00
match self
.lasttimelinecount_cache
.lock()
2024-08-08 17:18:30 +00:00
.await
2024-05-26 21:29:19 +00:00
.entry(room_id.to_owned())
{
hash_map::Entry::Vacant(v) => {
if let Some(last_count) = self
2024-08-08 17:18:30 +00:00
.pdus_until(sender_user, room_id, PduCount::max())
.await?
.next()
.await
{
2024-05-26 21:29:19 +00:00
Ok(*v.insert(last_count.0))
} else {
Ok(PduCount::Normal(0))
}
},
hash_map::Entry::Occupied(o) => Ok(*o.get()),
}
}
/// Returns the `count` of this pdu's id.
2024-08-08 17:18:30 +00:00
pub(super) async fn get_pdu_count(&self, event_id: &EventId) -> Result<PduCount> {
2024-05-26 21:29:19 +00:00
self.eventid_pduid
.get(event_id)
2024-08-08 17:18:30 +00:00
.await
2024-05-26 21:29:19 +00:00
.map(|pdu_id| pdu_count(&pdu_id))
}
/// Returns the json of a pdu.
2024-08-08 17:18:30 +00:00
pub(super) async fn get_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
if let Ok(pdu) = self.get_non_outlier_pdu_json(event_id).await {
return Ok(pdu);
}
self.eventid_outlierpdu.get(event_id).await.deserialized()
2024-05-26 21:29:19 +00:00
}
/// Returns the json of a pdu.
2024-08-08 17:18:30 +00:00
pub(super) async fn get_non_outlier_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
let pduid = self.get_pdu_id(event_id).await?;
self.pduid_pdu.get(&pduid).await.deserialized()
2024-05-26 21:29:19 +00:00
}
/// Returns the pdu's id.
#[inline]
2024-08-08 17:18:30 +00:00
pub(super) async fn get_pdu_id(&self, event_id: &EventId) -> Result<database::Handle<'_>> {
self.eventid_pduid.get(event_id).await
2024-05-27 03:17:20 +00:00
}
2024-05-26 21:29:19 +00:00
/// Returns the pdu directly from `eventid_pduid` only.
2024-08-08 17:18:30 +00:00
pub(super) async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
let pduid = self.get_pdu_id(event_id).await?;
self.pduid_pdu.get(&pduid).await.deserialized()
2024-08-08 17:18:30 +00:00
}
/// Like get_non_outlier_pdu(), but without the expense of fetching and
/// parsing the PduEvent
2024-10-16 11:33:24 +00:00
pub(super) async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> Result {
2024-08-08 17:18:30 +00:00
let pduid = self.get_pdu_id(event_id).await?;
2024-10-16 11:33:24 +00:00
self.pduid_pdu.get(&pduid).await.map(|_| ())
2024-05-26 21:29:19 +00:00
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
2024-08-08 17:18:30 +00:00
pub(super) async fn get_pdu(&self, event_id: &EventId) -> Result<Arc<PduEvent>> {
if let Ok(pdu) = self.get_non_outlier_pdu(event_id).await {
return Ok(Arc::new(pdu));
2024-05-26 21:29:19 +00:00
}
2024-08-08 17:18:30 +00:00
self.eventid_outlierpdu
.get(event_id)
2024-08-08 17:18:30 +00:00
.await
2024-09-28 15:14:48 +00:00
.deserialized()
2024-08-08 17:18:30 +00:00
.map(Arc::new)
}
/// Like get_non_outlier_pdu(), but without the expense of fetching and
/// parsing the PduEvent
2024-10-16 11:33:24 +00:00
pub(super) async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result {
self.eventid_outlierpdu.get(event_id).await.map(|_| ())
2024-08-08 17:18:30 +00:00
}
/// Like get_pdu(), but only reports whether the event is stored, without
/// fetching and parsing the data.
///
/// The outlier map is only probed when the timeline lookup fails.
pub(super) async fn pdu_exists(&self, event_id: &EventId) -> bool {
    //TODO: parallelize
    if self.non_outlier_pdu_exists(event_id).is_ok().await {
        return true;
    }

    self.outlier_pdu_exists(event_id).is_ok().await
}
/// Returns the pdu.
///
/// This does __NOT__ check the outliers `Tree`.
2024-08-08 17:18:30 +00:00
pub(super) async fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result<PduEvent> {
self.pduid_pdu.get(pdu_id).await.deserialized()
2024-05-26 21:29:19 +00:00
}
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
2024-08-08 17:18:30 +00:00
pub(super) async fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result<CanonicalJsonObject> {
self.pduid_pdu.get(pdu_id).await.deserialized()
2024-05-26 21:29:19 +00:00
}
2024-08-08 17:18:30 +00:00
pub(super) async fn append_pdu(&self, pdu_id: &[u8], pdu: &PduEvent, json: &CanonicalJsonObject, count: u64) {
2024-10-07 17:54:27 +00:00
self.pduid_pdu.raw_put(pdu_id, Json(json));
self.lasttimelinecount_cache
2024-05-26 21:29:19 +00:00
.lock()
2024-08-08 17:18:30 +00:00
.await
2024-05-26 21:29:19 +00:00
.insert(pdu.room_id.clone(), PduCount::Normal(count));
2024-08-08 17:18:30 +00:00
self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id);
self.eventid_outlierpdu.remove(pdu.event_id.as_bytes());
2024-05-26 21:29:19 +00:00
}
2024-08-08 17:18:30 +00:00
pub(super) fn prepend_backfill_pdu(&self, pdu_id: &[u8], event_id: &EventId, json: &CanonicalJsonObject) {
2024-10-07 17:54:27 +00:00
self.pduid_pdu.raw_put(pdu_id, Json(json));
self.eventid_pduid.insert(event_id, pdu_id);
self.eventid_outlierpdu.remove(event_id);
2024-05-26 21:29:19 +00:00
}
/// Removes a pdu and creates a new one with the same id.
2024-10-16 11:33:24 +00:00
pub(super) async fn replace_pdu(&self, pdu_id: &[u8], pdu_json: &CanonicalJsonObject, _pdu: &PduEvent) -> Result {
if self.pduid_pdu.get(pdu_id).await.is_not_found() {
2024-08-08 17:18:30 +00:00
return Err!(Request(NotFound("PDU does not exist.")));
2024-05-26 21:29:19 +00:00
}
2024-10-16 11:33:24 +00:00
self.pduid_pdu.raw_put(pdu_id, Json(pdu_json));
2024-08-08 17:18:30 +00:00
2024-05-26 21:29:19 +00:00
Ok(())
}
/// Returns an iterator over all events and their tokens in a room that
/// happened before the event with id `until` in reverse-chronological
/// order.
2024-08-08 17:18:30 +00:00
pub(super) async fn pdus_until<'a>(
&'a self, user_id: &'a UserId, room_id: &'a RoomId, until: PduCount,
) -> Result<impl Stream<Item = PdusIterItem> + Send + 'a> {
let (prefix, current) = self.count_to_id(room_id, until, 1, true).await?;
let stream = self
.pduid_pdu
.rev_raw_stream_from(&current)
.ignore_err()
.ready_take_while(move |(key, _)| key.starts_with(&prefix))
.map(move |item| Self::each_pdu(item, user_id));
Ok(stream)
2024-05-26 21:29:19 +00:00
}
2024-08-08 17:18:30 +00:00
/// Returns a stream over the events of a room and their counts, walking
/// forwards from just after the count `from`.
///
/// The stream ends as soon as a key no longer starts with the room's
/// prefix. Each event is post-processed by `each_pdu` for `user_id`.
pub(super) async fn pdus_after<'a>(
    &'a self, user_id: &'a UserId, room_id: &'a RoomId, from: PduCount,
) -> Result<impl Stream<Item = PdusIterItem> + Send + 'a> {
    let (prefix, start) = self.count_to_id(room_id, from, 1, false).await?;

    let raw = self.pduid_pdu.raw_stream_from(&start).ignore_err();
    let pdus = raw
        .ready_take_while(move |(key, _)| key.starts_with(&prefix))
        .map(move |entry| Self::each_pdu(entry, user_id));

    Ok(pdus)
}
/// Turns one raw `pduid_pdu` entry into a `(count, event)` stream item.
///
/// `remove_transaction_id()` is called unless `user_id` is the event's
/// sender, and `add_age()` is always called; errors from either go through
/// `log_err()` and are otherwise ignored.
///
/// # Panics
///
/// Panics if the stored value cannot be parsed as a `PduEvent`.
fn each_pdu((key, value): KeyVal<'_>, user_id: &UserId) -> PdusIterItem {
    let mut event: PduEvent =
        serde_json::from_slice(value).expect("PduEvent in pduid_pdu database column is invalid JSON");

    let requested_by_sender = event.sender == user_id;
    if !requested_by_sender {
        event.remove_transaction_id().log_err().ok();
    }
    event.add_age().log_err().ok();

    (pdu_count(key), event)
}
2024-05-27 03:17:20 +00:00
pub(super) fn increment_notification_counts(
2024-05-26 21:29:19 +00:00
&self, room_id: &RoomId, notifies: Vec<OwnedUserId>, highlights: Vec<OwnedUserId>,
2024-08-08 17:18:30 +00:00
) {
let _cork = self.db.cork();
2024-05-26 21:29:19 +00:00
for user in notifies {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
2024-08-08 17:18:30 +00:00
increment(&self.userroomid_notificationcount, &userroom_id);
2024-05-26 21:29:19 +00:00
}
2024-08-08 17:18:30 +00:00
2024-05-26 21:29:19 +00:00
for user in highlights {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
2024-08-08 17:18:30 +00:00
increment(&self.userroomid_highlightcount, &userroom_id);
2024-05-26 21:29:19 +00:00
}
}
2024-07-18 06:37:47 +00:00
2024-08-08 17:18:30 +00:00
pub(super) async fn count_to_id(
2024-07-18 06:37:47 +00:00
&self, room_id: &RoomId, count: PduCount, offset: u64, subtract: bool,
) -> Result<(Vec<u8>, Vec<u8>)> {
let prefix = self
.services
.short
2024-08-08 17:18:30 +00:00
.get_shortroomid(room_id)
.await
.map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?
2024-07-18 06:37:47 +00:00
.to_be_bytes()
.to_vec();
2024-08-08 17:18:30 +00:00
2024-07-18 06:37:47 +00:00
let mut pdu_id = prefix.clone();
// +1 so we don't send the base event
let count_raw = match count {
PduCount::Normal(x) => {
if subtract {
x.saturating_sub(offset)
} else {
x.saturating_add(offset)
}
},
PduCount::Backfilled(x) => {
pdu_id.extend_from_slice(&0_u64.to_be_bytes());
let num = u64::MAX.saturating_sub(x);
if subtract {
num.saturating_sub(offset)
} else {
num.saturating_add(offset)
}
},
};
pdu_id.extend_from_slice(&count_raw.to_be_bytes());
Ok((prefix, pdu_id))
}
2024-05-26 21:29:19 +00:00
}
/// Returns the `count` of this pdu's id.
2024-08-08 17:18:30 +00:00
pub(super) fn pdu_count(pdu_id: &[u8]) -> PduCount {
const STRIDE: usize = size_of::<u64>();
2024-07-07 04:46:16 +00:00
let pdu_id_len = pdu_id.len();
2024-08-08 17:18:30 +00:00
let last_u64 = u64_from_u8(&pdu_id[expected!(pdu_id_len - STRIDE)..]);
let second_last_u64 = u64_from_u8(&pdu_id[expected!(pdu_id_len - 2 * STRIDE)..expected!(pdu_id_len - STRIDE)]);
2024-05-26 21:29:19 +00:00
2024-08-08 17:18:30 +00:00
if second_last_u64 == 0 {
PduCount::Backfilled(u64::MAX.saturating_sub(last_u64))
2024-05-26 21:29:19 +00:00
} else {
2024-08-08 17:18:30 +00:00
PduCount::Normal(last_u64)
2024-05-26 21:29:19 +00:00
}
}
2024-08-08 17:18:30 +00:00
//TODO: this is an ABA
fn increment(db: &Arc<Map>, key: &[u8]) {
2024-09-29 07:37:43 +00:00
let old = db.get_blocking(key);
2024-08-08 17:18:30 +00:00
let new = utils::increment(old.ok().as_deref());
2024-10-07 17:54:27 +00:00
db.insert(key, new);
2024-08-08 17:18:30 +00:00
}