//! Policy server integration for event spam checking in Matrix rooms. //! //! This module implements a check against a room-specific policy server, as //! described in the relevant Matrix spec proposal (see: https://github.com/matrix-org/matrix-spec-proposals/pull/4284). use std::{collections::BTreeMap, time::Duration}; use conduwuit::{ Err, Error, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, error, implement, info, state_res::EventTypeExt, trace, utils::to_canonical_object, warn, }; use http::StatusCode; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, KeyId, RoomId, ServerName, SigningKeyAlgorithm, api::{error::ErrorKind, federation::policy::sign_event}, canonical_json::redact, events::{StateEventType, room::policy::RoomPolicyEventContent}, room_version_rules::{RedactionRules, RoomVersionRules}, serde::{Base64, base64::Standard}, signatures::{to_canonical_json_string_for_signing, verify_canonical_json_bytes}, }; use serde_json::value::RawValue; use tokio::time::sleep; const POLICY_SERVER_KEY_ID_ED25519: &str = "ed25519:policy_server"; /// Checks that the given policy server signed the event with the given public /// key. /// /// Note: The caller MUST ensure event_id (and any other keys) are stripped /// BEFORE passing to this function - no mutation is performed beyond redacting. /// /// Parameters: /// /// - via: The policy server that should've signed the event. /// - ps_key: The public key part of the policy server's signing key. /// - pdu_json: The raw PDU JSON object (will be cloned). /// - redaction_rules: The redaction rules of the room version /// /// Returns: `true` if the signature is present, `false` if it is not (including /// if the signatures object is malformed). pub(super) fn verify_policy_signature( via: &ServerName, ps_key: &Base64>, pdu_json: &CanonicalJsonObject, redaction_rules: &RedactionRules, ) -> bool { #[cfg(debug_assertions)] { assert!( !pdu_json.contains_key("event_id"), "event_id should be removed from the JSON before verifying the policy server \ signature" ); let pretty = serde_json::to_string(pdu_json).unwrap(); trace!(data=%pretty, "Preparing to check policy server signature"); }; let Some(canonical_json) = redact(pdu_json.clone(), redaction_rules, None) .ok() .and_then(|r| to_canonical_object(r).ok()) else { debug_warn!("Failed to redact event"); return false; }; let Some(CanonicalJsonValue::Object(signature_map)) = pdu_json.get("signatures") else { debug_warn!("Signatures map is not present?"); return false; }; let Some(CanonicalJsonValue::Object(signature_set)) = signature_map.get(via.as_str()) else { debug!("Signature map does not contain via {}", via.as_str()); return false; }; let Some(signature) = signature_set .get(POLICY_SERVER_KEY_ID_ED25519) .and_then(|s| s.as_str()) .and_then(|s| Base64::::parse(s).ok()) else { debug!("No (valid) policy server signature present on event"); return false; }; trace!(%signature, "Verifying policy server signature"); let Ok(canonical_str) = to_canonical_json_string_for_signing(&canonical_json) else { debug_warn!("Could not convert canonical json object into string"); return false; }; verify_canonical_json_bytes( &SigningKeyAlgorithm::Ed25519, ps_key.as_bytes(), signature.as_bytes(), canonical_str.as_bytes(), ) .inspect_err(|e| debug_error!("Policy server verification failed: {e}")) .is_ok() } /// Verifies the existing policy server signature, and/or fetches a new one /// immediately. /// /// If `incoming` is `true`, the event is checked for an existing signature. If /// it has a valid one, `Ok` is returned. If it does not have a valid signature, /// the function falls through to fetching a new one (which may be a soft-fail /// in a future version). /// /// If the event is the `m.room.policy` configuration state event, /// this check is skipped. Similarly, if there is no policy server configured in /// the PDU's room, the configuration event is malformed, or the configured /// server is not present in the room, the check is also skipped. /// /// If the policy server marks the event as spam, the relevant error is /// returned. Otherwise, the incoming PDU JSON is mutated to include the new /// policy server signature. Transient errors such as rate-limits are handled, /// so any error returned by this function should be treated as final. #[implement(super::Service)] #[tracing::instrument(skip(self, pdu, pdu_json, room_version_rules), level = "info")] pub async fn policy_server_allows_event( &self, pdu: &PduEvent, pdu_json: &mut CanonicalJsonObject, room_id: &RoomId, room_version_rules: &RoomVersionRules, incoming: bool, ) -> Result<()> { if pdu.event_type().with_state_key("") == (StateEventType::RoomPolicy, "".into()) { return Ok(()); } let ps = match self .services .state_accessor .room_state_get_content::( room_id, &StateEventType::RoomPolicy, "", ) .await { | Ok(s) => s, | Err(e) => return if e.is_not_found() || e.kind() == ErrorKind::BadJson { debug!(%e, "no policy server configured"); Ok(()) } else { Err!("failed to load m.room.policy state event: {e}") }, }; let Some(ps_key) = ps.public_keys.get(&SigningKeyAlgorithm::Ed25519) else { debug!( "room has a policy server configured, but no valid public keys; skipping spam check" ); return Ok(()); }; if !self .services .state_cache .server_in_room(&ps.via, room_id) .await { debug!( via = %ps.via, "Policy server is not in the room, skipping spam check" ); return Ok(()); } if incoming { // Verify the signature instead of calling a check let event_id = pdu_json.remove("event_id"); let ps_allowed = verify_policy_signature(&ps.via, ps_key, pdu_json, &room_version_rules.redaction); if let Some(event_id) = event_id { pdu_json.insert("event_id".into(), event_id); } if ps_allowed { debug!( via = %ps.via, "Event is incoming and has a valid policy server signature" ); return Ok(()); } // N.B. In a future room version, this will be a soft failure specifically. debug_info!( via = %ps.via, "Event is incoming but does not have a valid policy server signature; asking policy \ server to sign it now" ); } if ps.via == self.services.globals.server_name() && !self.services.server.config.federation_loopback { warn!( %ps.via, %room_id, "Cannot ask ourselves for a policy signature if `federation_loopback=false`", ); return Ok(()); } let outgoing = self .services .sending .convert_to_outgoing_federation_event(pdu_json.clone()) .await; debug_info!( %ps.via, "Asking policy server to sign event" ); self.fetch_policy_server_signature(pdu, pdu_json, &ps.via, outgoing, room_id, ps_key, 0) .await?; // Verify that the policy server signature was made with the same public key as // is in the state event, not just that it was signed. if verify_policy_signature(&ps.via, ps_key, pdu_json, &room_version_rules.redaction) { Ok(()) } else { Err(Error::Request( ErrorKind::Unknown, "Policy server signature was made with a different key to the one advertised".into(), StatusCode::BAD_GATEWAY, )) } } /// Handles an error returned by the policy server. If the error is one that /// should be returned to the user, it is propagated, otherwise the request may /// be retried (for example, when rate-limited). #[allow(clippy::too_many_arguments)] #[implement(super::Service)] async fn handle_policy_server_error( &self, error: Error, pdu: &PduEvent, pdu_json: &mut CanonicalJsonObject, via: &ServerName, outgoing: Box, room_id: &RoomId, policy_server_key: &Base64>, retries: u8, timeout: Duration, ) -> Result<()> { match error.status_code() { | StatusCode::OK => unreachable!("ok response passed to handle_policy_server_error"), | StatusCode::BAD_REQUEST => { if matches!(error.kind(), ErrorKind::Forbidden) { warn!( via = %via, event_id = %pdu.event_id(), %room_id, error = ?error, "Policy server marked the event as spam" ); return Err(error); } error!( via = %via, event_id = %pdu.event_id(), %room_id, error = ?error.to_string(), "Policy server could not understand our request", ); Err!(BadServerResponse("Error communicating with policy server")) }, | StatusCode::FORBIDDEN => { Err!(Request(Forbidden( "Policy server refused to sign the event due to the room ACL" ))) }, | StatusCode::NOT_FOUND => { debug_info!( via = %via, event_id = %pdu.event_id(), %room_id, "Policy server is not actually a policy server or is not protecting this room: {}", error.message() ); Ok(()) }, | StatusCode::TOO_MANY_REQUESTS => { if let Some(retry_after) = error.retry_after() { if retries >= 5 { warn!( via = %via, event_id = %pdu.event_id(), room_id = %room_id, retries, "Policy server rate-limited us too many times; giving up" ); return Err(error); // Error should be passed to c2s } let saturated = retry_after.min(timeout); // ^ don't wait more than 60 seconds info!( via = %via, event_id = %pdu.event_id(), room_id = %room_id, retry_after = %saturated.as_secs(), retries, "Policy server rate-limited us; retrying after {retry_after:?}" ); tokio::select! { () = self.server_shutdown.notified() => (), () = sleep(saturated) => (), } if !self.services.server.running() { return Err(error); } return Box::pin(self.fetch_policy_server_signature( pdu, pdu_json, via, outgoing, room_id, policy_server_key, retries.saturating_add(1), )) .await; } warn!( via = %via, event_id = %pdu.event_id(), room_id = %room_id, retries, "Policy server rate-limited us without giving a retry window; giving up" ); Err(error) }, | _ => Err!(BadServerResponse( "Unexpected response from policy server: {}/{:?}", error.status_code(), error.kind() )), } } /// Asks a remote policy server for a signature on this event. /// If the policy server signs this event, the original data is mutated. /// Otherwise, the error is handled and potentially returned. #[allow(clippy::too_many_arguments)] #[implement(super::Service)] #[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via), level = "info")] pub async fn fetch_policy_server_signature( &self, pdu: &PduEvent, pdu_json: &mut CanonicalJsonObject, via: &ServerName, outgoing: Box, room_id: &RoomId, policy_server_key: &Base64>, retries: u8, ) -> Result<()> { let timeout = Duration::from_secs(self.services.server.config.policy_server_request_timeout); debug!("Requesting policy server signature"); let response = tokio::time::timeout( timeout, self.services .sending .send_federation_request(via, sign_event::v1::Request::new(outgoing.clone())), ) .await; let response = match response { | Ok(Ok(response)) => response, | Ok(Err(e)) => { debug_error!("Error from policy server: {:?}", e); return self .handle_policy_server_error( e, pdu, pdu_json, via, outgoing, room_id, policy_server_key, retries, timeout, ) .await; }, | Err(elapsed) => { warn!( %via, event_id = %pdu.event_id(), %room_id, %elapsed, "Policy server signature request timServerSignaturesed out" ); return Err!(Request(Forbidden("Policy server did not respond in time"))); }, }; let Some(signatures) = response.signatures.get(via) else { // NOTE: Legacy policy servers return a `200 {}` to indicate that the event was // flagged as spam. We'll make a distinction in the error message in case // it's unexpected. return Err!(Request(Forbidden("Policy server did not sign the event"))); }; if response.signatures.len() > 1 { warn!(?response.signatures, "Misbehaving policy server: returned signatures for extraneous servers"); return Ok(()); } let Some(signature) = signatures .get(&KeyId::parse(POLICY_SERVER_KEY_ID_ED25519).expect("policy server key ID is valid")) else { return Err!(BadServerResponse( "Policy server did not return a signature with the expected key ID", )); }; if signatures.len() > 1 { info!(?signatures, "Misbehaving policy server: returned extraneous signatures"); } pdu_json .entry("signatures".to_owned()) .or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default())) .as_object_mut() .expect("`signatures` field must be an object") .entry(via.to_string()) .or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default())) .as_object_mut() .expect("`signatures[via]` field must be an object") .insert( POLICY_SERVER_KEY_ID_ED25519.to_owned(), CanonicalJsonValue::String(signature.clone()), ); debug_info!("Policy server allowed event"); Ok(()) }