Compare commits

..

1 Commits

Author SHA1 Message Date
Jade Ellis 280b0f7400 WIP 2025-06-14 19:44:53 +01:00
6 changed files with 43 additions and 128 deletions
+2 -49
View File
@@ -1,7 +1,7 @@
use axum::extract::State;
use conduwuit::{Err, Event, PduEvent, Result, err};
use conduwuit::{Err, Event, Result, err};
use futures::{FutureExt, TryFutureExt, future::try_join};
use ruma::api::client::{error::ErrorKind, room::get_room_event};
use ruma::api::client::room::get_room_event;
use crate::{Ruma, client::is_ignored_pdu};
@@ -14,7 +14,6 @@ pub(crate) async fn get_room_event_route(
) -> Result<get_room_event::v3::Response> {
let event_id = &body.event_id;
let room_id = &body.room_id;
let sender_user = body.sender_user();
let event = services
.rooms
@@ -34,52 +33,6 @@ pub(crate) async fn get_room_event_route(
return Err!(Request(Forbidden("You don't have permission to view this event.")));
}
let include_unredacted_content = body
.include_unredacted_content // User's file has this field name
.unwrap_or(false);
if include_unredacted_content && event.is_redacted() {
let is_server_admin = services
.users
.is_admin(sender_user)
.map(|is_admin| Ok(is_admin));
let can_redact_privilege = services
.rooms
.state_accessor
.user_can_redact(event_id, sender_user, room_id, false) // federation=false for local check
;
let (is_server_admin, can_redact_privilege) =
try_join(is_server_admin, can_redact_privilege).await?;
if !is_server_admin && !can_redact_privilege {
return Err!(Request(Forbidden(
"You don't have permission to view redacted content.",
)));
}
let pdu_id = match services.rooms.timeline.get_pdu_id(event_id).await {
| Ok(id) => id,
| Err(e) => {
return Err(e);
},
};
let original_content = services
.rooms
.timeline
.get_original_pdu_content(&pdu_id)
.await?;
if let Some(original_content) = original_content {
// If the original content is available, we can return it.
// event.content = to_raw_value(&original_content)?;
event = PduEvent::from_id_val(event_id, original_content)?;
} else {
return Err(conduwuit::Error::BadRequest(
ErrorKind::UnredactedContentDeleted { content_keep_ms: None },
"The original unredacted content is not in the database.",
));
}
}
debug_assert!(
event.event_id() == event_id && event.room_id() == room_id,
"Fetched PDU must match requested"
-1
View File
@@ -40,7 +40,6 @@ pub(crate) async fn get_supported_versions_route(
"v1.11".to_owned(),
],
unstable_features: BTreeMap::from_iter([
("fi.mau.msc2815".to_owned(), true),
("org.matrix.e2e_cross_signing".to_owned(), true),
("org.matrix.msc2285.stable".to_owned(), true), /* private read receipts (https://github.com/matrix-org/matrix-spec-proposals/pull/2285) */
("uk.half-shot.msc2666.query_mutual_rooms".to_owned(), true), /* query mutual rooms (https://github.com/matrix-org/matrix-spec-proposals/pull/2666) */
-9
View File
@@ -121,15 +121,6 @@ pub(super) static MAPS: &[Descriptor] = &[
index_size: 512,
..descriptor::SEQUENTIAL
},
Descriptor {
name: "pduid_originalcontent",
cache_disp: CacheDisp::SharedWith("pduid_pdu"),
key_size_hint: Some(16),
val_size_hint: Some(1520),
block_size: 2048,
index_size: 512,
..descriptor::RANDOM
},
Descriptor {
name: "publicroomids",
..descriptor::RANDOM_SMALL
+39 -13
View File
@@ -1,6 +1,6 @@
mod data;
use std::{collections::BTreeMap, sync::Arc};
use std::{collections::{BTreeMap, HashMap}, sync::Arc};
use conduwuit::{
Result, debug, err,
@@ -9,12 +9,9 @@ use conduwuit::{
};
use futures::{Stream, TryFutureExt, try_join};
use ruma::{
OwnedEventId, OwnedUserId, RoomId, UserId,
events::{
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
receipt::{ReceiptEvent, ReceiptEventContent, Receipts},
},
serde::Raw,
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType, Receipts}, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent
}, serde::Raw, OwnedEventId, OwnedUserId, RoomId, UserId
};
use self::data::{Data, ReceiptItem};
@@ -47,19 +44,48 @@ impl crate::Service for Service {
}
impl Service {
/// Replaces the previous read receipt.
/// Updates the public read receipt (`m.read`) based on the incoming event.
/// If the event referenced by the new public receipt is newer than the current
/// private read marker (`m.read.private`), the private marker is also updated
/// to match the public receipt's position.
pub async fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: &ReceiptEvent,
) {
self.db.readreceipt_update(user_id, room_id, event).await;
self.services
.sending
.flush_room(room_id)
.await
.expect("room flush failed");
debug!(target: "readreceipt", %room_id, %user_id, "Updating read receipt in database.");
// 2. Find the maximum PDU count for the m.read event(s) referenced in the new receipt
let mut max_new_public_pdu_count: Option<PduCount> = None;
for (event_id, receipts) in event.content.0.iter() {
// Check if this event_id has an m.read receipt for the target user
if let Some(user_receipts) = receipts.get(&ReceiptType::Read) {
if user_receipts.contains_key(user_id) {
// Try to get the PDU count (timeline position) for this event_id
match self.services.timeline.get_pdu_count(event_id).await {
Ok(count) => {
// Update the maximum count found so far
let current_max = max_new_public_pdu_count.unwrap_or(PduCount::Normal(0));
max_new_public_pdu_count = Some(current_max.max(count));
debug!(target: "readreceipt", %room_id, %user_id, %event_id, count, "Found PDU count for new public receipt event.");
}
Err(e) => {
warn!(
target: "readreceipt", %room_id, %user_id, %event_id,
"Failed to get PDU count for event ID from new public read receipt: {}",
e
);
}
}
}
}
}
// Flush the sending queue for the room to notify clients
if let Err(e) = self.services.sending.flush_room(room_id).await {
warn!(target: "readreceipt", %room_id, %user_id, "Failed to flush room after read receipt update: {}", e);
}
}
/// Gets the latest private read receipt from the user in the room
-21
View File
@@ -19,8 +19,6 @@ pub(super) struct Data {
pduid_pdu: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
userroomid_notificationcount: Arc<Map>,
/// Stores the original content of redacted PDUs.
pduid_originalcontent: Arc<Map>,
pub(super) db: Arc<Database>,
services: Services,
}
@@ -40,7 +38,6 @@ impl Data {
pduid_pdu: db["pduid_pdu"].clone(),
userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
pduid_originalcontent: db["pduid_originalcontent"].clone(), // Initialize new table
db: args.db.clone(),
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
@@ -180,24 +177,6 @@ impl Data {
self.pduid_pdu.get(pdu_id).await.deserialized()
}
/// Stores the original content of a PDU that is about to be redacted.
pub(super) async fn store_redacted_pdu_content(
&self,
pdu_id: &RawPduId,
pdu_json: &CanonicalJsonObject,
) -> Result<()> {
self.pduid_originalcontent.raw_put(pdu_id, Json(pdu_json));
Ok(())
}
/// Returns the original content of a redacted PDU.
pub(super) async fn get_original_pdu_content(
&self,
pdu_id: &RawPduId,
) -> Result<Option<CanonicalJsonObject>> {
self.pduid_originalcontent.get(pdu_id).await.deserialized()
}
pub(super) async fn append_pdu(
&self,
pdu_id: &RawPduId,
+2 -35
View File
@@ -260,25 +260,6 @@ impl Service {
self.db.replace_pdu(pdu_id, pdu_json, pdu).await
}
/// Stores the content of a to-be redacted pdu.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn store_redacted_pdu_content(
&self,
pdu_id: &RawPduId,
pdu_json: &CanonicalJsonObject,
) -> Result<()> {
self.db.store_redacted_pdu_content(pdu_id, pdu_json).await
}
/// Returns the original content of a redacted PDU.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn get_original_pdu_content(
&self,
pdu_id: &RawPduId,
) -> Result<Option<CanonicalJsonObject>> {
self.db.get_original_pdu_content(pdu_id).await
}
/// Creates a new persisted data unit and adds it to a room.
///
/// By this point the incoming event should be fully authenticated, no auth
@@ -491,7 +472,7 @@ impl Service {
.user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid, true).await?;
self.redact_pdu(redact_id, pdu, shortroomid).await?;
}
}
},
@@ -504,7 +485,7 @@ impl Service {
.user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid, true).await?;
self.redact_pdu(redact_id, pdu, shortroomid).await?;
}
}
},
@@ -1052,7 +1033,6 @@ impl Service {
event_id: &EventId,
reason: &PduEvent,
shortroomid: ShortRoomId,
keep_original_content: bool,
) -> Result {
// TODO: Don't reserialize, keep original json
let Ok(pdu_id) = self.get_pdu_id(event_id).await else {
@@ -1074,19 +1054,6 @@ impl Service {
let room_version_id = self.services.state.get_room_version(&pdu.room_id).await?;
if keep_original_content && !pdu.is_redacted() {
let original_pdu_json = utils::to_canonical_object(&pdu).map_err(|e| {
err!(Database(error!(
?event_id,
?e,
"Failed to convert PDU to canonical JSON for original content storage"
)))
})?;
self.db
.store_redacted_pdu_content(&pdu_id, &original_pdu_json)
.await?;
}
pdu.redact(&room_version_id, reason)?;
let obj = utils::to_canonical_object(&pdu).map_err(|e| {