Files
continuwuity/src/service/rooms/auth_chain/mod.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

231 lines
6.4 KiB
Rust
Raw Normal View History

mod data;
2024-06-28 22:51:39 +00:00
2022-10-05 20:34:31 +02:00
use std::{
collections::{BTreeSet, HashSet},
fmt::Debug,
2022-10-05 20:34:31 +02:00
sync::Arc,
};
2022-09-06 23:15:09 +02:00
2024-08-08 17:18:30 +00:00
use conduit::{debug, debug_error, trace, utils::IterStream, validated, warn, Err, Result};
2024-11-15 03:44:04 +00:00
use futures::{Stream, StreamExt};
use ruma::{EventId, RoomId};
2021-08-12 23:04:00 +02:00
2024-07-18 06:37:47 +00:00
use self::data::Data;
2024-11-02 06:12:54 +00:00
use crate::{rooms, rooms::short::ShortEventId, Dep};
2021-08-14 19:07:50 +02:00
2024-05-09 15:59:08 -07:00
pub struct Service {
2024-07-18 06:37:47 +00:00
services: Services,
2024-05-27 03:17:20 +00:00
db: Data,
}
2022-06-20 12:08:58 +02:00
2024-07-18 06:37:47 +00:00
/// Sibling services this service depends on, obtained via `args.depend` in
/// `build`.
struct Services {
	/// Translates between full event IDs and their short (integer) IDs.
	short: Dep<rooms::short::Service>,
	/// Loads PDUs by event ID while walking auth events.
	timeline: Dep<rooms::timeline::Service>,
}
2024-07-04 03:26:19 +00:00
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
2024-07-18 06:37:47 +00:00
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
},
db: Data::new(&args),
2024-07-04 03:26:19 +00:00
}))
2024-05-27 03:17:20 +00:00
}
2024-07-04 03:26:19 +00:00
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub async fn event_ids_iter<'a, I>(
&'a self, room_id: &RoomId, starting_events: I,
) -> Result<impl Stream<Item = Arc<EventId>> + Send + '_>
where
I: Iterator<Item = &'a EventId> + Clone + Debug + ExactSizeIterator + Send + 'a,
{
let stream = self
.get_event_ids(room_id, starting_events)
.await?
.into_iter()
.stream();
Ok(stream)
}
pub async fn get_event_ids<'a, I>(&'a self, room_id: &RoomId, starting_events: I) -> Result<Vec<Arc<EventId>>>
where
I: Iterator<Item = &'a EventId> + Clone + Debug + ExactSizeIterator + Send + 'a,
{
2024-09-25 03:52:28 +00:00
let chain = self.get_auth_chain(room_id, starting_events).await?;
let event_ids = self
.services
.short
.multi_get_eventid_from_short(chain.into_iter())
.await
.into_iter()
.filter_map(Result::ok)
.collect();
2024-09-25 03:52:28 +00:00
Ok(event_ids)
2024-04-10 13:55:09 -07:00
}
2024-06-17 06:32:53 +00:00
#[tracing::instrument(skip_all, name = "auth_chain")]
pub async fn get_auth_chain<'a, I>(&'a self, room_id: &RoomId, starting_events: I) -> Result<Vec<ShortEventId>>
where
I: Iterator<Item = &'a EventId> + Clone + Debug + ExactSizeIterator + Send + 'a,
{
2024-07-07 06:17:58 +00:00
const NUM_BUCKETS: usize = 50; //TODO: change possible w/o disrupting db?
2024-04-10 13:55:09 -07:00
const BUCKET: BTreeSet<(u64, &EventId)> = BTreeSet::new();
let started = std::time::Instant::now();
2024-11-15 03:44:04 +00:00
let mut starting_ids = self
2024-07-18 06:37:47 +00:00
.services
2024-04-10 13:55:09 -07:00
.short
.multi_get_or_create_shorteventid(starting_events.clone())
.zip(starting_events.clone().stream())
2024-11-15 03:44:04 +00:00
.boxed();
let mut buckets = [BUCKET; NUM_BUCKETS];
while let Some((short, starting_event)) = starting_ids.next().await {
2024-07-07 06:17:58 +00:00
let bucket: usize = short.try_into()?;
let bucket: usize = validated!(bucket % NUM_BUCKETS);
buckets[bucket].insert((short, starting_event));
2024-03-05 19:48:54 -05:00
}
2024-04-10 13:55:09 -07:00
debug!(
starting_events = ?starting_events.count(),
2024-04-10 13:55:09 -07:00
elapsed = ?started.elapsed(),
"start",
);
2024-03-05 19:48:54 -05:00
let mut hits: usize = 0;
let mut misses: usize = 0;
let mut full_auth_chain = Vec::with_capacity(buckets.len());
2022-10-05 18:36:12 +02:00
for chunk in buckets {
if chunk.is_empty() {
continue;
}
2024-03-05 19:48:54 -05:00
2024-11-02 06:12:54 +00:00
let chunk_key: Vec<ShortEventId> = chunk.iter().map(|(short, _)| short).copied().collect();
2024-09-25 03:52:28 +00:00
if let Ok(cached) = self.get_cached_eventid_authchain(&chunk_key).await {
2024-05-04 19:24:48 +00:00
trace!("Found cache entry for whole chunk");
2022-10-05 18:36:12 +02:00
full_auth_chain.extend(cached.iter().copied());
hits = hits.saturating_add(1);
2022-10-05 18:36:12 +02:00
continue;
}
2024-03-05 19:48:54 -05:00
let mut hits2: usize = 0;
let mut misses2: usize = 0;
let mut chunk_cache = Vec::with_capacity(chunk.len());
2022-10-05 18:36:12 +02:00
for (sevent_id, event_id) in chunk {
2024-09-25 03:52:28 +00:00
if let Ok(cached) = self.get_cached_eventid_authchain(&[sevent_id]).await {
2024-05-04 19:24:48 +00:00
trace!(?event_id, "Found cache entry for event");
2022-10-05 18:36:12 +02:00
chunk_cache.extend(cached.iter().copied());
hits2 = hits2.saturating_add(1);
2022-10-05 18:36:12 +02:00
} else {
2024-08-08 17:18:30 +00:00
let auth_chain = self.get_auth_chain_inner(room_id, event_id).await?;
2024-09-25 03:52:28 +00:00
self.cache_auth_chain(vec![sevent_id], &auth_chain);
2024-04-10 13:55:09 -07:00
chunk_cache.extend(auth_chain.iter());
misses2 = misses2.saturating_add(1);
debug!(
event_id = ?event_id,
chain_length = ?auth_chain.len(),
2024-04-10 13:55:09 -07:00
chunk_cache_length = ?chunk_cache.len(),
elapsed = ?started.elapsed(),
"Cache missed event"
2022-10-05 18:36:12 +02:00
);
};
}
2024-04-10 13:55:09 -07:00
chunk_cache.sort_unstable();
chunk_cache.dedup();
2024-09-25 03:52:28 +00:00
self.cache_auth_chain_vec(chunk_key, &chunk_cache);
2024-04-10 13:55:09 -07:00
full_auth_chain.extend(chunk_cache.iter());
misses = misses.saturating_add(1);
debug!(
chunk_cache_length = ?chunk_cache.len(),
hits = ?hits2,
misses = ?misses2,
2024-04-10 13:55:09 -07:00
elapsed = ?started.elapsed(),
"Chunk missed",
2022-10-05 18:36:12 +02:00
);
}
2024-03-05 19:48:54 -05:00
2024-06-02 00:27:03 +00:00
full_auth_chain.sort_unstable();
2024-04-10 13:55:09 -07:00
full_auth_chain.dedup();
debug!(
chain_length = ?full_auth_chain.len(),
hits = ?hits,
misses = ?misses,
2024-04-10 13:55:09 -07:00
elapsed = ?started.elapsed(),
"done",
2022-10-05 18:36:12 +02:00
);
2024-03-05 19:48:54 -05:00
2024-04-10 13:55:09 -07:00
Ok(full_auth_chain)
2022-10-05 18:36:12 +02:00
}
2024-03-05 19:48:54 -05:00
2024-05-04 19:24:48 +00:00
#[tracing::instrument(skip(self, room_id))]
2024-11-02 06:12:54 +00:00
async fn get_auth_chain_inner(&self, room_id: &RoomId, event_id: &EventId) -> Result<HashSet<ShortEventId>> {
2022-10-05 18:36:12 +02:00
let mut todo = vec![Arc::from(event_id)];
let mut found = HashSet::new();
2024-03-05 19:48:54 -05:00
2022-10-05 18:36:12 +02:00
while let Some(event_id) = todo.pop() {
2024-05-04 19:24:48 +00:00
trace!(?event_id, "processing auth event");
2024-08-08 17:18:30 +00:00
match self.services.timeline.get_pdu(&event_id).await {
Err(e) => debug_error!(?event_id, ?e, "Could not find pdu mentioned in auth events"),
Ok(pdu) => {
2022-10-05 18:36:12 +02:00
if pdu.room_id != room_id {
return Err!(Request(Forbidden(error!(
?event_id,
?room_id,
wrong_room_id = ?pdu.room_id,
"auth event for incorrect room"
))));
2022-10-05 18:36:12 +02:00
}
2024-09-25 03:52:28 +00:00
2022-10-05 18:36:12 +02:00
for auth_event in &pdu.auth_events {
2024-08-08 17:18:30 +00:00
let sauthevent = self
.services
.short
.get_or_create_shorteventid(auth_event)
.await;
2024-03-05 19:48:54 -05:00
2024-04-08 07:58:49 -07:00
if found.insert(sauthevent) {
2024-05-04 19:24:48 +00:00
trace!(?event_id, ?auth_event, "adding auth event to processing queue");
2022-10-05 18:36:12 +02:00
todo.push(auth_event.clone());
2024-03-05 19:48:54 -05:00
}
2022-10-05 18:36:12 +02:00
}
},
}
}
2024-03-05 19:48:54 -05:00
2022-10-05 18:36:12 +02:00
Ok(found)
}
2024-09-25 03:52:28 +00:00
#[inline]
2024-11-02 06:12:54 +00:00
pub async fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Arc<[ShortEventId]>> {
2024-08-08 17:18:30 +00:00
self.db.get_cached_eventid_authchain(key).await
}
2024-07-07 19:03:15 +00:00
#[tracing::instrument(skip(self), level = "debug")]
2024-11-02 06:12:54 +00:00
pub fn cache_auth_chain(&self, key: Vec<u64>, auth_chain: &HashSet<ShortEventId>) {
let val = auth_chain.iter().copied().collect::<Arc<[ShortEventId]>>();
2024-09-25 03:52:28 +00:00
self.db.cache_auth_chain(key, val);
}
2024-04-10 13:55:09 -07:00
2024-07-07 19:03:15 +00:00
#[tracing::instrument(skip(self), level = "debug")]
2024-11-02 06:12:54 +00:00
pub fn cache_auth_chain_vec(&self, key: Vec<u64>, auth_chain: &Vec<ShortEventId>) {
let val = auth_chain.iter().copied().collect::<Arc<[ShortEventId]>>();
2024-09-25 03:52:28 +00:00
self.db.cache_auth_chain(key, val);
2024-04-10 13:55:09 -07:00
}
pub fn get_cache_usage(&self) -> (usize, usize) {
let cache = self.db.auth_chain_cache.lock().expect("locked");
(cache.len(), cache.capacity())
}
pub fn clear_cache(&self) { self.db.auth_chain_cache.lock().expect("locked").clear(); }
}