Files
continuwuity/src/service/rooms/event_handler/policy_server.rs
T
2026-05-25 18:16:55 +01:00

430 lines
12 KiB
Rust

//! Policy server integration for event spam checking in Matrix rooms.
//!
//! This module implements a check against a room-specific policy server, as
//! described in Matrix spec proposal [MSC4284](https://github.com/matrix-org/matrix-spec-proposals/pull/4284).
use std::{collections::BTreeMap, time::Duration};
use conduwuit::{
debug, debug_error, debug_info, debug_warn, error, implement, info, state_res::EventTypeExt, trace, utils::to_canonical_object,
warn, Err, Error, Event, PduEvent, Result,
};
use http::StatusCode;
use ruma::{
api::error::ErrorKind, canonical_json::redact, events::{room::policy::RoomPolicyEventContent, StateEventType}, room_version_rules::{RedactionRules, RoomVersionRules}, serde::{base64::Standard, Base64}, signatures::{to_canonical_json_string_for_signing, verify_canonical_json_bytes},
CanonicalJsonObject,
CanonicalJsonValue,
KeyId,
RoomId,
ServerName,
SigningKeyAlgorithm,
};
use ruminuwuity::policy::policy_sign::unstable::Request as PolicySignRequest;
use serde_json::value::RawValue;
use tokio::time::sleep;
/// Checks whether `pdu_json` already carries a valid `ed25519:policy_server`
/// signature from the policy server `via`, made with `ps_key`.
///
/// The signature is verified against the redacted form of the event, computed
/// with `redaction_rules`. Returns `false` (after logging the reason at debug
/// level) if the signature is absent, malformed, or fails verification; this
/// function never returns an error.
pub(super) fn verify_policy_signature(
	via: &ServerName,
	ps_key: &Base64<Standard, Vec<u8>>,
	pdu_json: &CanonicalJsonObject,
	redaction_rules: &RedactionRules,
) -> bool {
	trace!(data=?pdu_json, "Preparing to check policy server signature");

	// Look the signature up first. These lookups are cheap, and they let us skip
	// cloning and redacting the whole event when no policy server signature is
	// present at all (the common case for events that still need signing).
	let Some(CanonicalJsonValue::Object(signature_map)) = pdu_json.get("signatures") else {
		debug_warn!("Signatures map is not present?");
		return false;
	};
	let Some(CanonicalJsonValue::Object(signature_set)) = signature_map.get(via.as_str()) else {
		debug!("Signature map does not contain via {}", via.as_str());
		return false;
	};
	let Some(signature) = signature_set
		.get("ed25519:policy_server")
		.and_then(|s| s.as_str())
		.and_then(|s| Base64::<Standard>::parse(s).ok())
	else {
		debug!("No (valid) policy server signature present on event");
		return false;
	};

	// Only now pay for the clone + redaction that the verification needs.
	let Some(canonical_json) = redact(pdu_json.clone(), redaction_rules, None)
		.ok()
		.and_then(|r| to_canonical_object(r).ok())
	else {
		debug_warn!("Failed to redact event");
		return false;
	};

	trace!(%signature, "Verifying policy server signature");
	let Ok(canonical_str) = to_canonical_json_string_for_signing(&canonical_json) else {
		debug_warn!("Could not convert canonical json object into string");
		return false;
	};

	verify_canonical_json_bytes(
		&SigningKeyAlgorithm::Ed25519,
		ps_key.as_bytes(),
		signature.as_bytes(),
		canonical_str.as_bytes(),
	)
	.inspect_err(|e| debug_error!("Policy server verification failed: {e}"))
	.is_ok()
}
/// Consults the room's policy server, if one is configured, about `pdu`.
///
/// The check is skipped (returning `Ok(())`) when any of these hold:
/// - the event is itself the room's policy configuration state event;
/// - no policy event can be loaded for the room (a load failure other than
///   "not found" is logged at error level but still skips the check);
/// - the policy event lists no Ed25519 public key;
/// - the policy server named by `via` is not in the room;
/// - `via` is this server and `federation_loopback` is disabled.
///
/// For `incoming` events, an `ed25519:policy_server` signature already present
/// on the event is verified locally first, and a valid one is accepted without a
/// network request. Otherwise the policy server is asked to sign the event via
/// `fetch_policy_server_signature`, whose result is returned. That call either
/// adds the policy server's signature to `pdu_json` or returns an error (for
/// example, when the event is marked as spam).
#[implement(super::Service)]
#[tracing::instrument(skip(self, pdu, pdu_json, room_version_rules), level = "info")]
pub async fn policy_server_allows_event(
	&self,
	pdu: &PduEvent,
	pdu_json: &mut CanonicalJsonObject,
	room_id: &RoomId,
	room_version_rules: &RoomVersionRules,
	incoming: bool,
) -> Result<()> {
	// The policy configuration event itself is never sent for a policy check.
	if matches!(pdu.event_type().with_state_key("").0, StateEventType::RoomPolicy) {
		return Ok(());
	}

	let policy_lookup = self
		.services
		.state_accessor
		.room_state_get_content::<RoomPolicyEventContent>(room_id, &StateEventType::RoomPolicy, "")
		.await;

	let ps: RoomPolicyEventContent = match policy_lookup {
		| Ok(content) => content,
		| Err(e) if e.is_not_found() => {
			trace!("no policy server configured");
			return Ok(());
		},
		| Err(e) => {
			error!("failed to load policy server event: {e}");
			// TODO: Should this fail closed?
			return Ok(());
		},
	};

	let Some(ps_key) = ps.public_keys.get(&SigningKeyAlgorithm::Ed25519) else {
		debug!(
			"room has a policy server configured, but no valid public keys; skipping spam check"
		);
		return Ok(());
	};

	let via_in_room = self
		.services
		.state_cache
		.server_in_room(&ps.via, room_id)
		.await;
	if !via_in_room {
		debug!(
			via = %ps.via,
			"Policy server is not in the room, skipping spam check"
		);
		return Ok(());
	}

	if incoming {
		// An attached signature is checked locally instead of asking the server.
		// `event_id` is taken out for the duration of the check and restored after.
		let stashed_event_id = pdu_json.remove("event_id");
		let already_signed =
			verify_policy_signature(&ps.via, ps_key, pdu_json, &room_version_rules.redaction);
		if let Some(id) = stashed_event_id {
			pdu_json.insert("event_id".into(), id);
		}

		if already_signed {
			debug!(
				via = %ps.via,
				"Event is incoming and has a valid policy server signature"
			);
			return Ok(());
		}

		// N.B. In a future room version, this will be a soft failure specifically.
		debug_info!(
			via = %ps.via,
			"Event is incoming but does not have a valid policy server signature; asking policy \
			 server to sign it now"
		);
	}

	let via_is_us = ps.via == self.services.globals.server_name();
	let loopback_disabled = !self.services.server.config.federation_loopback;
	if via_is_us && loopback_disabled {
		warn!(
			%ps.via,
			%room_id,
			"Cannot ask ourselves for a policy signature if `federation_loopback=false`",
		);
		return Ok(());
	}

	let outgoing = self
		.services
		.sending
		.convert_to_outgoing_federation_event(pdu_json.clone())
		.await;

	debug_info!(
		%ps.via,
		"Asking policy server to sign event"
	);

	self.fetch_policy_server_signature(pdu, pdu_json, &ps.via, outgoing, room_id, ps_key, 0)
		.await
}
/// Handles an error returned by the policy server for a signing request.
///
/// Outcomes, by HTTP status of `error`:
/// - `400` with `M_FORBIDDEN`: the event was marked as spam; `error` is returned.
///   Any other `400` is reported as a `BadServerResponse`.
/// - `403`: the policy server refused because of the room ACL; a `Forbidden`
///   request error is returned.
/// - `404`: `via` is not acting as a policy server for this room; the event is
///   allowed (`Ok(())`).
/// - `429` with a retry window: waits for that window, capped at `timeout`, then
///   retries through `fetch_policy_server_signature`. After
///   `MAX_RATE_LIMIT_RETRIES` retries, on shutdown, or without a retry window,
///   `error` is returned.
/// - anything else, including an error that unexpectedly carries `200 OK`, is
///   reported as a `BadServerResponse` rather than panicking.
///
/// `retries` is the number of rate-limit retries already made for this event.
#[allow(clippy::too_many_arguments)]
#[implement(super::Service)]
async fn handle_policy_server_error(
	&self,
	error: Error,
	pdu: &PduEvent,
	pdu_json: &mut CanonicalJsonObject,
	via: &ServerName,
	outgoing: Box<RawValue>,
	room_id: &RoomId,
	policy_server_key: &Base64<Standard, Vec<u8>>,
	retries: u8,
	timeout: Duration,
) -> Result<()> {
	/// Number of rate-limited retries after which we stop asking.
	const MAX_RATE_LIMIT_RETRIES: u8 = 5;

	match error.status_code() {
		| StatusCode::BAD_REQUEST => {
			if matches!(error.kind(), ErrorKind::Forbidden) {
				warn!(
					via = %via,
					event_id = %pdu.event_id(),
					%room_id,
					error = ?error,
					"Policy server marked the event as spam"
				);
				return Err(error);
			}
			error!(
				via = %via,
				event_id = %pdu.event_id(),
				%room_id,
				error = ?error.to_string(),
				"Policy server could not understand our request",
			);
			Err!(BadServerResponse("Error communicating with policy server"))
		},
		| StatusCode::FORBIDDEN => {
			warn!(
				via = %via,
				event_id = %pdu.event_id(),
				%room_id,
				"Policy server refused to sign the event due to the room ACL"
			);
			Err!(Request(Forbidden(
				"Policy server refused to sign the event due to the room ACL"
			)))
		},
		| StatusCode::NOT_FOUND => {
			debug_info!(
				via = %via,
				event_id = %pdu.event_id(),
				%room_id,
				"Policy server is not actually a policy server or is not protecting this room: {}",
				error.message()
			);
			Ok(())
		},
		| StatusCode::TOO_MANY_REQUESTS => {
			let Some(retry_after) = error.retry_after() else {
				warn!(
					via = %via,
					event_id = %pdu.event_id(),
					room_id = %room_id,
					retries,
					"Policy server rate-limited us without giving a retry window; giving up"
				);
				return Err(error);
			};

			if retries >= MAX_RATE_LIMIT_RETRIES {
				warn!(
					via = %via,
					event_id = %pdu.event_id(),
					room_id = %room_id,
					retries,
					"Policy server rate-limited us too many times; giving up"
				);
				return Err(error); // Error should be passed to c2s
			}

			// Never wait longer than the configured policy server request timeout.
			let wait = retry_after.min(timeout);
			info!(
				via = %via,
				event_id = %pdu.event_id(),
				room_id = %room_id,
				retry_after = %wait.as_secs(),
				retries,
				"Policy server rate-limited us; retrying after {wait:?}"
			);
			// TODO: select between this sleep and shutdown signal
			sleep(wait).await;
			if !self.services.server.running() {
				return Err(error);
			}

			// Boxed because this recursion goes back through an async fn.
			Box::pin(self.fetch_policy_server_signature(
				pdu,
				pdu_json,
				via,
				outgoing,
				room_id,
				policy_server_key,
				retries.saturating_add(1),
			))
			.await
		},
		// Also catches `200 OK`. An error value carrying a success status means
		// something upstream misbehaved, which must not bring the task down.
		| _ => Err!(BadServerResponse(
			"Unexpected response from policy server: {}/{:?}",
			error.status_code(),
			error.kind()
		)),
	}
}
/// Asks the policy server `via` to sign this event.
///
/// On success, the `ed25519:policy_server` signature returned by `via` is
/// written into `pdu_json` at `signatures[via]` and `Ok(())` is returned.
/// The request is bounded by the `policy_server_request_timeout` config value
/// (in seconds). A timeout is treated as a refusal and returns a `Forbidden`
/// request error. An error response is delegated to
/// `handle_policy_server_error`, which may retry by calling back into this
/// function with `outgoing` and an incremented `retries`, or return an error.
///
/// `outgoing` is the federation-formatted event sent to the policy server, and
/// `retries` counts the rate-limit retries already made for this event.
#[allow(clippy::too_many_arguments)]
#[implement(super::Service)]
#[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via), level = "info")]
pub async fn fetch_policy_server_signature(
	&self,
	pdu: &PduEvent,
	pdu_json: &mut CanonicalJsonObject,
	via: &ServerName,
	outgoing: Box<RawValue>,
	room_id: &RoomId,
	policy_server_key: &Base64<Standard, Vec<u8>>,
	retries: u8,
) -> Result<()> {
	let timeout = Duration::from_secs(self.services.server.config.policy_server_request_timeout);
	debug!("Requesting policy server signature");
	let response = tokio::time::timeout(
		timeout,
		self.services
			.sending
			.send_federation_request(via, PolicySignRequest::new(outgoing.clone())),
	)
	.await;

	let response = match response {
		| Ok(Ok(response)) => {
			debug!("Response from policy server: {:?}", response);
			response
		},
		| Ok(Err(e)) => {
			debug_error!("Error from policy server: {:?}", e);
			return self
				.handle_policy_server_error(
					e,
					pdu,
					pdu_json,
					via,
					outgoing,
					room_id,
					policy_server_key,
					retries,
					timeout,
				)
				.await;
		},
		| Err(elapsed) => {
			warn!(
				%via,
				event_id = %pdu.event_id(),
				%room_id,
				%elapsed,
				"Policy server signature request timed out"
			);
			return Err!(Request(Forbidden("Policy server did not respond in time")));
		},
	};

	// Single lookup each instead of `contains_key` followed by `unwrap()`.
	let Some(via_keypairs) = response
		.signatures
		.as_ref()
		.and_then(|signatures| signatures.get(via))
	else {
		error!(
			%via,
			"Policy server did not sign event: {:?}",
			response.signatures
		);
		return Err!(BadServerResponse("Policy server did not sign the event"));
	};

	// TODO: need to be able to verify other algorithms
	let wanted_key_id = KeyId::parse("ed25519:policy_server")?;
	let Some(signature) = via_keypairs.get(&wanted_key_id) else {
		error!(
			signatures = ?response.signatures,
			"Policy server returned signatures, but did not use the key ID \
			 'ed25519:policy_server'."
		);
		return Err!(BadServerResponse(
			"Policy server signed the event, but did not use the expected key ID"
		));
	};
	let signature = signature.to_owned();

	// TODO: verify that `signature` was actually produced with
	// `policy_server_key` before accepting it.
	let signatures_map = json_object_entry(pdu_json, "signatures");
	let via_signatures = json_object_entry(signatures_map, via.as_str());
	trace!("inserting PS signature: {}", signature);
	via_signatures.insert("ed25519:policy_server".to_owned(), CanonicalJsonValue::String(signature));

	debug_info!("Policy server allowed event");
	Ok(())
}

/// Returns the JSON object stored under `key` in `map`, inserting an empty
/// object when the key is absent.
///
/// A value that is present but is not an object is replaced by an empty object
/// (with a warning) instead of panicking. The event JSON can arrive from a remote
/// server (incoming events), so its `signatures` field is not trusted to be
/// well-formed.
fn json_object_entry<'a>(
	map: &'a mut CanonicalJsonObject,
	key: &str,
) -> &'a mut CanonicalJsonObject {
	let entry = map
		.entry(key.to_owned())
		.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::new()));
	if !matches!(*entry, CanonicalJsonValue::Object(_)) {
		warn!(
			"Replacing malformed non-object `{key}` value while inserting policy server signature"
		);
		*entry = CanonicalJsonValue::Object(BTreeMap::new());
	}
	match entry {
		| CanonicalJsonValue::Object(object) => object,
		| _ => unreachable!("`{key}` was just set to an object"),
	}
}