Compare commits

..

3 Commits

Author SHA1 Message Date
Jade Ellis aa29b81ef6 fix: Don't store events that have already been redacted
This prevents clobbering
2025-06-14 19:40:43 +01:00
Jade Ellis 46b1eeb2c8 feat: Allow retrieving redacted message content (msc2815)
Still to do:
- Handling the difference between content that we have deleted and
content we never received
- Deleting the original content on command or expiry

Another question is if we have to store the full original content?
Can we get by with just storing the 'content' field?
2025-06-14 19:40:43 +01:00
Jade Ellis 88ecf61d49 feat: Store the original content of redacted PDUs 2025-06-14 19:40:42 +01:00
6 changed files with 128 additions and 43 deletions
+49 -2
View File
@@ -1,7 +1,7 @@
use axum::extract::State;
use conduwuit::{Err, Event, Result, err};
use conduwuit::{Err, Event, PduEvent, Result, err};
use futures::{FutureExt, TryFutureExt, future::try_join};
use ruma::api::client::room::get_room_event;
use ruma::api::client::{error::ErrorKind, room::get_room_event};
use crate::{Ruma, client::is_ignored_pdu};
@@ -14,6 +14,7 @@ pub(crate) async fn get_room_event_route(
) -> Result<get_room_event::v3::Response> {
let event_id = &body.event_id;
let room_id = &body.room_id;
let sender_user = body.sender_user();
let event = services
.rooms
@@ -33,6 +34,52 @@ pub(crate) async fn get_room_event_route(
return Err!(Request(Forbidden("You don't have permission to view this event.")));
}
let include_unredacted_content = body
.include_unredacted_content // User's file has this field name
.unwrap_or(false);
if include_unredacted_content && event.is_redacted() {
let is_server_admin = services
.users
.is_admin(sender_user)
.map(|is_admin| Ok(is_admin));
let can_redact_privilege = services
.rooms
.state_accessor
.user_can_redact(event_id, sender_user, room_id, false) // federation=false for local check
;
let (is_server_admin, can_redact_privilege) =
try_join(is_server_admin, can_redact_privilege).await?;
if !is_server_admin && !can_redact_privilege {
return Err!(Request(Forbidden(
"You don't have permission to view redacted content.",
)));
}
let pdu_id = match services.rooms.timeline.get_pdu_id(event_id).await {
| Ok(id) => id,
| Err(e) => {
return Err(e);
},
};
let original_content = services
.rooms
.timeline
.get_original_pdu_content(&pdu_id)
.await?;
if let Some(original_content) = original_content {
// If the original content is available, we can return it.
// event.content = to_raw_value(&original_content)?;
event = PduEvent::from_id_val(event_id, original_content)?;
} else {
return Err(conduwuit::Error::BadRequest(
ErrorKind::UnredactedContentDeleted { content_keep_ms: None },
"The original unredacted content is not in the database.",
));
}
}
debug_assert!(
event.event_id() == event_id && event.room_id() == room_id,
"Fetched PDU must match requested"
+1
View File
@@ -40,6 +40,7 @@ pub(crate) async fn get_supported_versions_route(
"v1.11".to_owned(),
],
unstable_features: BTreeMap::from_iter([
("fi.mau.msc2815".to_owned(), true),
("org.matrix.e2e_cross_signing".to_owned(), true),
("org.matrix.msc2285.stable".to_owned(), true), /* private read receipts (https://github.com/matrix-org/matrix-spec-proposals/pull/2285) */
("uk.half-shot.msc2666.query_mutual_rooms".to_owned(), true), /* query mutual rooms (https://github.com/matrix-org/matrix-spec-proposals/pull/2666) */
+9
View File
@@ -121,6 +121,15 @@ pub(super) static MAPS: &[Descriptor] = &[
index_size: 512,
..descriptor::SEQUENTIAL
},
Descriptor {
name: "pduid_originalcontent",
cache_disp: CacheDisp::SharedWith("pduid_pdu"),
key_size_hint: Some(16),
val_size_hint: Some(1520),
block_size: 2048,
index_size: 512,
..descriptor::RANDOM
},
Descriptor {
name: "publicroomids",
..descriptor::RANDOM_SMALL
+13 -39
View File
@@ -1,6 +1,6 @@
mod data;
use std::{collections::{BTreeMap, HashMap}, sync::Arc};
use std::{collections::BTreeMap, sync::Arc};
use conduwuit::{
Result, debug, err,
@@ -9,9 +9,12 @@ use conduwuit::{
};
use futures::{Stream, TryFutureExt, try_join};
use ruma::{
OwnedEventId, OwnedUserId, RoomId, UserId,
events::{
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType, Receipts}, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent
}, serde::Raw, OwnedEventId, OwnedUserId, RoomId, UserId
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
receipt::{ReceiptEvent, ReceiptEventContent, Receipts},
},
serde::Raw,
};
use self::data::{Data, ReceiptItem};
@@ -44,48 +47,19 @@ impl crate::Service for Service {
}
impl Service {
/// Updates the public read receipt (`m.read`) based on the incoming event.
/// If the event referenced by the new public receipt is newer than the current
/// private read marker (`m.read.private`), the private marker is also updated
/// to match the public receipt's position.
/// Replaces the previous read receipt.
pub async fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: &ReceiptEvent,
) {
debug!(target: "readreceipt", %room_id, %user_id, "Updating read receipt in database.");
// 2. Find the maximum PDU count for the m.read event(s) referenced in the new receipt
let mut max_new_public_pdu_count: Option<PduCount> = None;
for (event_id, receipts) in event.content.0.iter() {
// Check if this event_id has an m.read receipt for the target user
if let Some(user_receipts) = receipts.get(&ReceiptType::Read) {
if user_receipts.contains_key(user_id) {
// Try to get the PDU count (timeline position) for this event_id
match self.services.timeline.get_pdu_count(event_id).await {
Ok(count) => {
// Update the maximum count found so far
let current_max = max_new_public_pdu_count.unwrap_or(PduCount::Normal(0));
max_new_public_pdu_count = Some(current_max.max(count));
debug!(target: "readreceipt", %room_id, %user_id, %event_id, count, "Found PDU count for new public receipt event.");
}
Err(e) => {
warn!(
target: "readreceipt", %room_id, %user_id, %event_id,
"Failed to get PDU count for event ID from new public read receipt: {}",
e
);
}
}
}
}
}
// Flush the sending queue for the room to notify clients
if let Err(e) = self.services.sending.flush_room(room_id).await {
warn!(target: "readreceipt", %room_id, %user_id, "Failed to flush room after read receipt update: {}", e);
}
self.db.readreceipt_update(user_id, room_id, event).await;
self.services
.sending
.flush_room(room_id)
.await
.expect("room flush failed");
}
/// Gets the latest private read receipt from the user in the room
+21
View File
@@ -19,6 +19,8 @@ pub(super) struct Data {
pduid_pdu: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
userroomid_notificationcount: Arc<Map>,
/// Stores the original content of redacted PDUs.
pduid_originalcontent: Arc<Map>,
pub(super) db: Arc<Database>,
services: Services,
}
@@ -38,6 +40,7 @@ impl Data {
pduid_pdu: db["pduid_pdu"].clone(),
userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
pduid_originalcontent: db["pduid_originalcontent"].clone(), // Initialize new table
db: args.db.clone(),
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
@@ -177,6 +180,24 @@ impl Data {
self.pduid_pdu.get(pdu_id).await.deserialized()
}
/// Stores the original content of a PDU that is about to be redacted.
pub(super) async fn store_redacted_pdu_content(
&self,
pdu_id: &RawPduId,
pdu_json: &CanonicalJsonObject,
) -> Result<()> {
self.pduid_originalcontent.raw_put(pdu_id, Json(pdu_json));
Ok(())
}
/// Returns the original content of a redacted PDU.
pub(super) async fn get_original_pdu_content(
&self,
pdu_id: &RawPduId,
) -> Result<Option<CanonicalJsonObject>> {
self.pduid_originalcontent.get(pdu_id).await.deserialized()
}
pub(super) async fn append_pdu(
&self,
pdu_id: &RawPduId,
+35 -2
View File
@@ -260,6 +260,25 @@ impl Service {
self.db.replace_pdu(pdu_id, pdu_json, pdu).await
}
/// Stores the content of a to-be redacted pdu.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn store_redacted_pdu_content(
&self,
pdu_id: &RawPduId,
pdu_json: &CanonicalJsonObject,
) -> Result<()> {
self.db.store_redacted_pdu_content(pdu_id, pdu_json).await
}
/// Returns the original content of a redacted PDU.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn get_original_pdu_content(
&self,
pdu_id: &RawPduId,
) -> Result<Option<CanonicalJsonObject>> {
self.db.get_original_pdu_content(pdu_id).await
}
/// Creates a new persisted data unit and adds it to a room.
///
/// By this point the incoming event should be fully authenticated, no auth
@@ -472,7 +491,7 @@ impl Service {
.user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid).await?;
self.redact_pdu(redact_id, pdu, shortroomid, true).await?;
}
}
},
@@ -485,7 +504,7 @@ impl Service {
.user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid).await?;
self.redact_pdu(redact_id, pdu, shortroomid, true).await?;
}
}
},
@@ -1033,6 +1052,7 @@ impl Service {
event_id: &EventId,
reason: &PduEvent,
shortroomid: ShortRoomId,
keep_original_content: bool,
) -> Result {
// TODO: Don't reserialize, keep original json
let Ok(pdu_id) = self.get_pdu_id(event_id).await else {
@@ -1054,6 +1074,19 @@ impl Service {
let room_version_id = self.services.state.get_room_version(&pdu.room_id).await?;
if keep_original_content && !pdu.is_redacted() {
let original_pdu_json = utils::to_canonical_object(&pdu).map_err(|e| {
err!(Database(error!(
?event_id,
?e,
"Failed to convert PDU to canonical JSON for original content storage"
)))
})?;
self.db
.store_redacted_pdu_content(&pdu_id, &original_pdu_json)
.await?;
}
pdu.redact(&room_version_id, reason)?;
let obj = utils::to_canonical_object(&pdu).map_err(|e| {