Files
continuwuity/src/service/rooms/threads/mod.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

196 lines
4.8 KiB
Rust
Raw Normal View History

2024-05-09 15:59:08 -07:00
use std::{collections::BTreeMap, sync::Arc};
2024-03-02 20:55:02 -05:00
2025-04-26 08:24:47 +00:00
use conduwuit_core::{
Event, Result, err,
matrix::pdu::{PduCount, PduEvent, PduId, RawPduId},
2024-12-04 00:00:40 +00:00
utils::{
ReadyExt,
stream::{TryIgnore, WidebandExt},
2024-12-04 00:00:40 +00:00
},
2024-11-06 21:02:23 +00:00
};
use conduwuit_database::{Deserialized, Map};
2024-11-06 21:02:23 +00:00
use futures::{Stream, StreamExt};
2023-06-25 19:31:40 +02:00
use ruma::{
CanonicalJsonValue, EventId, OwnedUserId, RoomId, UserId,
api::client::threads::get_threads::v1::IncludeThreads, events::relation::BundledThread, uint,
2023-06-25 19:31:40 +02:00
};
use serde_json::json;
2023-06-25 19:31:40 +02:00
use crate::{Dep, rooms, rooms::short::ShortRoomId};
2023-06-25 19:31:40 +02:00
2024-05-09 15:59:08 -07:00
pub struct Service {
2024-06-28 22:51:39 +00:00
db: Data,
2024-11-06 21:02:23 +00:00
services: Services,
2023-06-25 19:31:40 +02:00
}
2024-07-18 06:37:47 +00:00
struct Services {
2024-11-06 21:02:23 +00:00
short: Dep<rooms::short::Service>,
2024-07-18 06:37:47 +00:00
timeline: Dep<rooms::timeline::Service>,
}
2024-11-06 21:02:23 +00:00
/// Database maps backing the thread service.
pub(super) struct Data {
    /// Keyed by the raw PDU id of a thread root; the value is the list of
    /// participating user ids, written as their bytes joined with `0xFF`.
    threadid_userids: Arc<Map>,
}
2024-07-04 03:26:19 +00:00
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
2024-11-06 21:02:23 +00:00
db: Data {
threadid_userids: args.db["threadid_userids"].clone(),
},
2024-07-18 06:37:47 +00:00
services: Services {
2024-11-06 21:02:23 +00:00
short: args.depend::<rooms::short::Service>("rooms::short"),
2024-07-18 06:37:47 +00:00
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
},
2024-07-04 03:26:19 +00:00
}))
2024-05-27 03:17:20 +00:00
}
2024-07-04 03:26:19 +00:00
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub async fn add_to_thread<E>(&self, root_event_id: &EventId, event: &E) -> Result
2025-04-26 08:24:47 +00:00
where
E: Event + Send + Sync,
{
2024-07-18 06:37:47 +00:00
let root_id = self
.services
2023-06-25 19:31:40 +02:00
.timeline
2024-08-08 17:18:30 +00:00
.get_pdu_id(root_event_id)
.await
.map_err(|e| {
err!(Request(InvalidParam("Invalid event_id in thread message: {e:?}")))
})?;
2024-03-05 19:48:54 -05:00
2024-07-18 06:37:47 +00:00
let root_pdu = self
.services
2023-06-25 19:31:40 +02:00
.timeline
2024-08-08 17:18:30 +00:00
.get_pdu_from_id(&root_id)
.await
.map_err(|e| err!(Request(InvalidParam("Thread root not found: {e:?}"))))?;
2024-03-05 19:48:54 -05:00
2024-07-18 06:37:47 +00:00
let mut root_pdu_json = self
.services
2023-06-25 19:31:40 +02:00
.timeline
2024-08-08 17:18:30 +00:00
.get_pdu_json_from_id(&root_id)
.await
.map_err(|e| err!(Request(InvalidParam("Thread root pdu not found: {e:?}"))))?;
2024-03-05 19:48:54 -05:00
2023-06-25 19:31:40 +02:00
if let CanonicalJsonValue::Object(unsigned) = root_pdu_json
.entry("unsigned".to_owned())
2024-03-02 20:55:02 -05:00
.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
2023-06-25 19:31:40 +02:00
{
if let Some(mut relations) = unsigned
.get("m.relations")
.and_then(|r| r.as_object())
.and_then(|r| r.get("m.thread"))
.and_then(|relations| {
serde_json::from_value::<BundledThread>(relations.clone().into()).ok()
}) {
2023-06-25 19:31:40 +02:00
// Thread already existed
2024-07-07 04:46:16 +00:00
relations.count = relations.count.saturating_add(uint!(1));
2025-04-26 08:24:47 +00:00
relations.latest_event = event.to_format();
2024-03-05 19:48:54 -05:00
2023-06-25 19:31:40 +02:00
let content = serde_json::to_value(relations).expect("to_value always works");
2024-03-05 19:48:54 -05:00
2023-06-25 19:31:40 +02:00
unsigned.insert(
"m.relations".to_owned(),
2024-03-25 17:05:11 -04:00
json!({ "m.thread": content })
.try_into()
.expect("thread is valid json"),
2023-06-25 19:31:40 +02:00
);
} else {
// New thread
let relations = BundledThread {
2025-04-26 08:24:47 +00:00
latest_event: event.to_format(),
2023-06-25 19:31:40 +02:00
count: uint!(1),
current_user_participated: true,
};
2024-03-05 19:48:54 -05:00
2023-06-25 19:31:40 +02:00
let content = serde_json::to_value(relations).expect("to_value always works");
2024-03-05 19:48:54 -05:00
2023-06-25 19:31:40 +02:00
unsigned.insert(
"m.relations".to_owned(),
2024-03-25 17:05:11 -04:00
json!({ "m.thread": content })
.try_into()
.expect("thread is valid json"),
2023-06-25 19:31:40 +02:00
);
}
2024-03-05 19:48:54 -05:00
2024-07-18 06:37:47 +00:00
self.services
2024-03-25 17:05:11 -04:00
.timeline
.replace_pdu(&root_id, &root_pdu_json)
2024-08-08 17:18:30 +00:00
.await?;
2023-06-25 19:31:40 +02:00
}
2024-03-05 19:48:54 -05:00
2023-06-25 19:31:40 +02:00
let mut users = Vec::new();
match self.get_participants(&root_id).await {
| Ok(userids) => {
users.extend_from_slice(&userids);
},
| _ => {
users.push(root_pdu.sender().to_owned());
},
2023-06-25 19:31:40 +02:00
}
2025-04-26 08:24:47 +00:00
users.push(event.sender().to_owned());
2024-03-05 19:48:54 -05:00
2024-11-06 21:02:23 +00:00
self.update_participants(&root_id, &users)
}
pub async fn threads_until<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
shorteventid: PduCount,
_inc: &'a IncludeThreads,
2024-11-06 21:02:23 +00:00
) -> Result<impl Stream<Item = (PduCount, PduEvent)> + Send + 'a> {
let shortroomid: ShortRoomId = self.services.short.get_shortroomid(room_id).await?;
let current: RawPduId = PduId {
shortroomid,
2024-11-11 05:00:29 +00:00
shorteventid: shorteventid.saturating_sub(1),
2024-11-06 21:02:23 +00:00
}
.into();
let stream = self
.db
.threadid_userids
.rev_raw_keys_from(&current)
.ignore_err()
.map(RawPduId::from)
.ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes())
2024-12-04 00:00:40 +00:00
.wide_filter_map(move |pdu_id| async move {
2024-11-06 21:02:23 +00:00
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender() != user_id {
pdu.as_mut_pdu().remove_transaction_id().ok();
2024-11-06 21:02:23 +00:00
}
Some((pdu_id.shorteventid, pdu))
});
Ok(stream)
}
/// Stores `participants` as the participant list of the thread rooted at
/// `root_id`, overwriting whatever was stored under that key.
///
/// The value written is the user ids' bytes joined with a single `0xFF`
/// separator. Always returns `Ok(())`.
pub(super) fn update_participants(
    &self,
    root_id: &RawPduId,
    participants: &[OwnedUserId],
) -> Result {
    let id_bytes: Vec<&[u8]> = participants
        .iter()
        .map(|participant| participant.as_bytes())
        .collect();

    let encoded = id_bytes.join(&[0xFF][..]);
    self.db.threadid_userids.insert(root_id, &encoded);

    Ok(())
}
/// Loads the participant list stored for the thread rooted at `root_id`.
///
/// # Errors
///
/// Returns an error if the lookup or the deserialization into user ids
/// fails (presumably including the case where nothing is stored for
/// `root_id` -- verify against the database layer).
pub(super) async fn get_participants(&self, root_id: &RawPduId) -> Result<Vec<OwnedUserId>> {
    let stored = self.db.threadid_userids.get(root_id).await;
    stored.deserialized()
}
}