diff --git a/changelog.d/1487.feature b/changelog.d/1487.feature new file mode 100644 index 000000000..18231462f --- /dev/null +++ b/changelog.d/1487.feature @@ -0,0 +1 @@ +Updated [MSC4284: Policy Servers](https://github.com/matrix-org/matrix-spec-proposals/pull/4284) implementation to support the newly stabilised proposal. Contributed by @nex. diff --git a/conduwuit-example.toml b/conduwuit-example.toml index bce9e6419..51c0cade7 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -372,21 +372,18 @@ # #federation_timeout = 60 -# MSC4284 Policy server request timeout (seconds). Generally policy +# Policy server request timeout (seconds). Generally policy # servers should respond near instantly, however may slow down under # load. If a policy server doesn't respond in a short amount of time, the # room it is configured in may become unusable if this limit is set too -# high. 10 seconds is a good default, however dropping this to 3-5 seconds -# can be acceptable. +# high. 30 seconds is a good default, however lower values may be +# acceptable if temporary send failures are an okay trade-off. # -# Please be aware that policy requests are *NOT* currently re-tried, so if -# a spam check request fails, the event will be assumed to be not spam, -# which in some cases may result in spam being sent to or received from -# the room that would typically be prevented. # # About policy servers: https://matrix.org/blog/2025/04/introducing-policy-servers/ +# (Stabilized in Matrix v1.18) # -#policy_server_request_timeout = 10 +#policy_server_request_timeout = 30 # Federation client idle connection pool timeout (seconds). # @@ -1594,19 +1591,6 @@ # #block_non_admin_invites = false -# Enable or disable making requests to MSC4284 Policy Servers. -# It is recommended you keep this enabled unless you experience frequent -# connectivity issues, such as in a restricted networking environment. -# -#enable_msc4284_policy_servers = true - -# Enable running locally generated events through configured MSC4284 -# policy servers. You may wish to disable this if your server is -# single-user for a slight speed benefit in some rooms, but otherwise -# should leave it enabled. -# -#policy_server_check_own_events = true - # Allow admins to enter commands in rooms other than "#admins" (admin # room) by prefixing your message with "\!admin" or "\\!admin" followed up # a normal continuwuity admin command. The reply will be publicly visible diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 6083e5056..65a812b08 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -475,20 +475,18 @@ pub struct Config { #[serde(default = "default_federation_timeout")] pub federation_timeout: u64, - /// MSC4284 Policy server request timeout (seconds). Generally policy + /// Policy server request timeout (seconds). Generally policy /// servers should respond near instantly, however may slow down under /// load. If a policy server doesn't respond in a short amount of time, the /// room it is configured in may become unusable if this limit is set too - /// high. 10 seconds is a good default, however dropping this to 3-5 seconds - /// can be acceptable. + /// high. 30 seconds is a good default, however lower values may be + /// acceptable if temporary send failures are an okay trade-off. /// - /// Please be aware that policy requests are *NOT* currently re-tried, so if - /// a spam check request fails, the event will be assumed to be not spam, - /// which in some cases may result in spam being sent to or received from - /// the room that would typically be prevented. /// /// About policy servers: https://matrix.org/blog/2025/04/introducing-policy-servers/ - /// default: 10 + /// (Stabilized in Matrix v1.18) + /// + /// default: 30 #[serde(default = "default_policy_server_request_timeout")] pub policy_server_request_timeout: u64, @@ -1836,19 +1834,6 @@ pub struct Config { #[serde(default)] pub block_non_admin_invites: bool, - /// Enable or disable making requests to MSC4284 Policy Servers. - /// It is recommended you keep this enabled unless you experience frequent - /// connectivity issues, such as in a restricted networking environment. - #[serde(default = "true_fn")] - pub enable_msc4284_policy_servers: bool, - - /// Enable running locally generated events through configured MSC4284 - /// policy servers. You may wish to disable this if your server is - /// single-user for a slight speed benefit in some rooms, but otherwise - /// should leave it enabled. - #[serde(default = "true_fn")] - pub policy_server_check_own_events: bool, - /// Allow admins to enter commands in rooms other than "#admins" (admin /// room) by prefixing your message with "\!admin" or "\\!admin" followed up /// a normal continuwuity admin command. The reply will be publicly visible @@ -2537,7 +2522,7 @@ fn default_federation_conn_timeout() -> u64 { 10 } fn default_federation_timeout() -> u64 { 60 } -fn default_policy_server_request_timeout() -> u64 { 10 } +fn default_policy_server_request_timeout() -> u64 { 30 } fn default_federation_idle_timeout() -> u64 { 25 } diff --git a/src/core/error/mod.rs b/src/core/error/mod.rs index 37a86a193..703726043 100644 --- a/src/core/error/mod.rs +++ b/src/core/error/mod.rs @@ -6,7 +6,10 @@ mod serde; use std::{any::Any, borrow::Cow, convert::Infallible, error::Error as _, sync::PoisonError}; +use ruma::api::error::{ErrorKind, RetryAfter::Delay}; + pub use self::{err::visit, log::*}; +use crate::Error::BadRequest; #[derive(thiserror::Error)] pub enum Error { @@ -89,7 +92,7 @@ pub enum Error { #[error("Arithmetic operation failed: {0}")] Arithmetic(Cow<'static, str>), #[error("{0:?}: {1}")] - BadRequest(ruma::api::error::ErrorKind, &'static str), //TODO: remove + BadRequest(ErrorKind, &'static str), //TODO: remove #[error("{0}")] BadServerResponse(Cow<'static, str>), #[error(transparent)] @@ -117,7 +120,7 @@ pub enum Error { #[error("from {0}: {1}")] Redaction(ruma::OwnedServerName, ruma::canonical_json::RedactionError), #[error("{0:?}: {1}")] - Request(ruma::api::error::ErrorKind, Cow<'static, str>, http::StatusCode), + Request(ErrorKind, Cow<'static, str>, http::StatusCode), #[error(transparent)] Ruma(#[from] ruma::api::error::Error), #[error(transparent)] @@ -164,13 +167,13 @@ impl Error { /// Returns the Matrix error code / error kind #[inline] - pub fn kind(&self) -> ruma::api::error::ErrorKind { + pub fn kind(&self) -> ErrorKind { use ruma::api::error::ErrorKind::{Unknown, Unrecognized}; match self { | Self::Federation(_, error) | Self::Ruma(error) => response::ruma_error_kind(error).clone(), - | Self::BadRequest(kind, ..) | Self::Request(kind, ..) => kind.clone(), + | BadRequest(kind, ..) | Self::Request(kind, ..) => kind.clone(), | Self::FeatureDisabled(..) => Unrecognized, | _ => Unknown, } @@ -200,6 +203,15 @@ impl Error { /// Result where Ok(None) is instead Err(e) if e.is_not_found(). #[inline] pub fn is_not_found(&self) -> bool { self.status_code() == http::StatusCode::NOT_FOUND } + + pub fn retry_after(&self) -> Option { + if let BadRequest(ErrorKind::LimitExceeded(limit_data), ..) = self { + if let Some(Delay(after)) = limit_data.retry_after { + return Some(after); + } + } + None + } } impl std::fmt::Debug for Error { diff --git a/src/service/rooms/event_handler/policy_server.rs b/src/service/rooms/event_handler/policy_server.rs index 5d3a609a1..6641cf324 100644 --- a/src/service/rooms/event_handler/policy_server.rs +++ b/src/service/rooms/event_handler/policy_server.rs @@ -3,24 +3,72 @@ //! This module implements a check against a room-specific policy server, as //! described in the relevant Matrix spec proposal (see: https://github.com/matrix-org/matrix-spec-proposals/pull/4284). -use std::{collections::BTreeMap, sync::LazyLock, time::Duration}; +use std::{collections::BTreeMap, time::Duration}; use conduwuit::{ - Err, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, implement, trace, - warn, + Err, Error, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, error, + implement, info, state_res::EventTypeExt, trace, utils::to_canonical_object, warn, }; +use http::StatusCode; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, KeyId, OwnedKeyId, RoomId, ServerName, SigningKeyId, - events::StateEventType, -}; -use ruminuwuity::policy::{ - event::RoomPolicyEventContent, policy_check::unstable::Request as PolicyCheckRequest, - policy_sign::unstable::Request as PolicySignRequest, + CanonicalJsonObject, CanonicalJsonValue, KeyId, RoomId, ServerName, SigningKeyAlgorithm, + api::error::ErrorKind, + canonical_json::redact, + events::{StateEventType, room::policy::RoomPolicyEventContent}, + room_version_rules::{RedactionRules, RoomVersionRules}, + serde::{Base64, base64::Standard}, + signatures::{to_canonical_json_string_for_signing, verify_canonical_json_bytes}, }; +use ruminuwuity::policy::policy_sign::unstable::Request as PolicySignRequest; use serde_json::value::RawValue; +use tokio::time::sleep; -static POLICY_EVENT_TYPE_UNSTABLE: LazyLock = - LazyLock::new(|| StateEventType::from("org.matrix.msc4284.policy")); +pub(super) fn verify_policy_signature( + via: &ServerName, + ps_key: &Base64>, + pdu_json: &CanonicalJsonObject, + redaction_rules: &RedactionRules, +) -> bool { + trace!(data=?pdu_json, "Preparing to check policy server signature"); + let Some(canonical_json) = redact(pdu_json.clone(), redaction_rules, None) + .ok() + .and_then(|r| to_canonical_object(r).ok()) + else { + debug_warn!("Failed to redact event"); + return false; + }; + let Some(CanonicalJsonValue::Object(signature_map)) = pdu_json.get("signatures") else { + debug_warn!("Signatures map is not present?"); + return false; + }; + let Some(CanonicalJsonValue::Object(signature_set)) = signature_map.get(via.as_str()) else { + debug!("Signature map does not contain via {}", via.as_str()); + return false; + }; + let Some(signature) = signature_set + .get("ed25519:policy_server") + .and_then(|s| s.as_str()) + .and_then(|s| Base64::::parse(s).ok()) + else { + debug!("No (valid) policy server signature present on event"); + return false; + }; + + trace!(%signature, "Verifying policy server signature"); + let Ok(canonical_str) = to_canonical_json_string_for_signing(&canonical_json) else { + debug_warn!("Could not convert canonical json object into string"); + return false; + }; + + verify_canonical_json_bytes( + &SigningKeyAlgorithm::Ed25519, + ps_key.as_bytes(), + signature.as_bytes(), + canonical_str.as_bytes(), + ) + .inspect_err(|e| debug_error!("Policy server verification failed: {e}")) + .is_ok() +} /// Asks a remote policy server if the event is allowed. /// @@ -34,162 +82,216 @@ static POLICY_EVENT_TYPE_UNSTABLE: LazyLock = /// contacted for whatever reason, Err(e) is returned, which generally is a /// fail-open operation. #[implement(super::Service)] -#[tracing::instrument(skip(self, pdu, pdu_json, room_id), level = "info")] -pub async fn ask_policy_server( +#[tracing::instrument(skip(self, pdu, pdu_json, room_version_rules), level = "info")] +pub async fn policy_server_allows_event( &self, pdu: &PduEvent, pdu_json: &mut CanonicalJsonObject, room_id: &RoomId, + room_version_rules: &RoomVersionRules, incoming: bool, -) -> Result { - if !self.services.server.config.enable_msc4284_policy_servers { - trace!("policy server checking is disabled"); - return Ok(true); // don't ever contact policy servers - } - - if *pdu.event_type() == POLICY_EVENT_TYPE_UNSTABLE.clone().into() { - debug!( - room_id = %room_id, - event_type = ?pdu.event_type(), - "Skipping spam check for policy server meta-event" - ); - return Ok(true); - } - - let Ok(policyserver) = self - .services - .state_accessor - .room_state_get_content(room_id, &POLICY_EVENT_TYPE_UNSTABLE, "") - .await - .inspect_err(|e| { - if !e.is_not_found() { - debug_error!("failed to load room policy server state event: {e}"); - } - }) - .map(|c: RoomPolicyEventContent| c) - else { - debug!("room has no policy server configured"); - return Ok(true); +) -> Result<()> { + let ps = match pdu.event_type().with_state_key("").0 { + | StateEventType::RoomPolicy => return Ok(()), + | _ => + self.services + .state_accessor + .room_state_get_content::( + room_id, + &StateEventType::RoomPolicy, + "", + ) + .await, }; - - if self.services.server.config.policy_server_check_own_events - && !incoming - && policyserver.public_key.is_none() - { - // don't contact policy servers for locally generated events, but only when the - // policy server does not require signatures - trace!("won't contact policy server for locally generated event"); - return Ok(true); - } - - let via = match policyserver.via { - | Some(ref via) => ServerName::parse(via)?, - | None => { - trace!("No policy server configured for room {room_id}"); - return Ok(true); + let ps: RoomPolicyEventContent = match ps { + | Ok(ps) => ps, + | Err(e) => { + if e.is_not_found() { + trace!("no policy server configured"); + } else { + error!("failed to load policy server event: {e}"); + // TODO: Should this fail closed? + } + return Ok(()); }, }; + + let Some(ps_key) = ps.public_keys.get(&SigningKeyAlgorithm::Ed25519) else { + debug!( + "room has a policy server configured, but no valid public keys; skipping spam check" + ); + return Ok(()); + }; + if !self .services .state_cache - .server_in_room(&via, room_id) + .server_in_room(&ps.via, room_id) .await { debug!( - via = %via, + via = %ps.via, "Policy server is not in the room, skipping spam check" ); - return Ok(true); + return Ok(()); } + + if incoming { + // Verify the signature instead of calling a check + let event_id = pdu_json.remove("event_id"); + let ps_allowed = + verify_policy_signature(&ps.via, ps_key, pdu_json, &room_version_rules.redaction); + if let Some(event_id) = event_id { + pdu_json.insert("event_id".into(), event_id); + } + + if ps_allowed { + debug!( + via = %ps.via, + "Event is incoming and has a valid policy server signature" + ); + return Ok(()); + } + debug_info!( + via = %ps.via, + "Event is incoming but does not have a valid policy server signature; asking policy \ + server to sign it now" + ); + } + + if ps.via == self.services.globals.server_name() + && !self.services.server.config.federation_loopback + { + warn!( + %ps.via, + %room_id, + "Cannot ask ourselves for a policy signature if `federation_loopback=false`", + ); + return Ok(()); + } + let outgoing = self .services .sending .convert_to_outgoing_federation_event(pdu_json.clone()) .await; - if policyserver.public_key.is_some() { - if !incoming { + + debug_info!( + %ps.via, + "Asking policy server to sign event" + ); + self.fetch_policy_server_signature(pdu, pdu_json, &ps.via, outgoing, room_id, ps_key, 0) + .await +} +#[allow(clippy::too_many_arguments)] +#[implement(super::Service)] +async fn handle_policy_server_error( + &self, + error: Error, + pdu: &PduEvent, + pdu_json: &mut CanonicalJsonObject, + via: &ServerName, + outgoing: Box, + room_id: &RoomId, + policy_server_key: &Base64>, + retries: u8, + timeout: Duration, +) -> Result<()> { + match error.status_code() { + | StatusCode::OK => unreachable!("ok response passed to handle_policy_server_error"), + | StatusCode::BAD_REQUEST => { + if matches!(error.kind(), ErrorKind::Forbidden) { + warn!( + via = %via, + event_id = %pdu.event_id(), + %room_id, + error = ?error, + "Policy server marked the event as spam" + ); + return Err(error); + } + error!( + via = %via, + event_id = %pdu.event_id(), + %room_id, + error = ?error.to_string(), + "Policy server could not understand our request", + ); + Err!(BadServerResponse("Error communicating with policy server")) + }, + | StatusCode::FORBIDDEN => { + Err!(Request(Forbidden( + "Policy server refused to sign the event due to the room ACL" + ))) + }, + | StatusCode::NOT_FOUND => { debug_info!( via = %via, - outgoing = ?pdu_json, - "Getting policy server signature on event" + event_id = %pdu.event_id(), + %room_id, + "Policy server is not actually a policy server or is not protecting this room: {}", + error.message() ); - return self - .fetch_policy_server_signature(pdu, pdu_json, &via, outgoing, room_id) - .await; - } - // for incoming events, is it signed by with the key - // "ed25519:policy_server"? - if let Some(CanonicalJsonValue::Object(sigs)) = pdu_json.get("signatures") { - if let Some(CanonicalJsonValue::Object(server_sigs)) = sigs.get(via.as_str()) { - let wanted_key_id: OwnedKeyId = - SigningKeyId::parse("ed25519:policy_server")?; - if let Some(CanonicalJsonValue::String(_sig_value)) = - server_sigs.get(wanted_key_id.as_str()) - { - // TODO: verify signature - } - } - } - debug!( - "Event is not local and has no policy server signature, performing legacy spam check" - ); - } - debug_info!( - via = %via, - "Checking event for spam with policy server via legacy check" - ); - - let mut request = PolicyCheckRequest::new(pdu.event_id().to_owned()); - request.pdu = Some(outgoing); - - let response = tokio::time::timeout( - Duration::from_secs(self.services.server.config.policy_server_request_timeout), - self.services.sending.send_federation_request(&via, request), - ) - .await; - let response = match response { - | Ok(Ok(response)) => { - debug!("Response from policy server: {:?}", response); - response + Ok(()) }, - | Ok(Err(e)) => { + | StatusCode::TOO_MANY_REQUESTS => { + if let Some(retry_after) = error.retry_after() { + if retries >= 5 { + warn!( + via = %via, + event_id = %pdu.event_id(), + room_id = %room_id, + retries, + "Policy server rate-limited us too many times; giving up" + ); + return Err(error); // Error should be passed to c2s + } + let saturated = retry_after.min(timeout); + // ^ don't wait more than 60 seconds + info!( + via = %via, + event_id = %pdu.event_id(), + room_id = %room_id, + retry_after = %saturated.as_secs(), + retries, + "Policy server rate-limited us; retrying after {retry_after:?}" + ); + // TODO: select between this sleep and shutdown signal + sleep(saturated).await; + if !self.services.server.running() { + return Err(error); + } + return Box::pin(self.fetch_policy_server_signature( + pdu, + pdu_json, + via, + outgoing, + room_id, + policy_server_key, + retries.saturating_add(1), + )) + .await; + } warn!( via = %via, event_id = %pdu.event_id(), room_id = %room_id, - "Failed to contact policy server: {e}" + retries, + "Policy server rate-limited us without giving a retry window; giving up" ); - // Network or policy server errors are treated as non-fatal: event is allowed by - // default. - return Err(e); + Err(error) }, - | Err(elapsed) => { - warn!( - %via, - event_id = %pdu.event_id(), - %room_id, - %elapsed, - "Policy server request timed out after 10 seconds" - ); - return Err!("Request to policy server timed out"); - }, - }; - trace!("Recommendation from policy server was {}", response.recommendation); - if response.recommendation == "spam" { - warn!( - via = %via, - event_id = %pdu.event_id(), - room_id = %room_id, - "Event was marked as spam by policy server", - ); - return Ok(false); + | _ => Err!(BadServerResponse( + "Unexpected response from policy server: {}/{:?}", + error.status_code(), + error.kind() + )), } - - Ok(true) } /// Asks a remote policy server for a signature on this event. /// If the policy server signs this event, the original data is mutated. +#[allow(clippy::too_many_arguments)] #[implement(super::Service)] #[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via), level = "info")] pub async fn fetch_policy_server_signature( @@ -199,13 +301,16 @@ pub async fn fetch_policy_server_signature( via: &ServerName, outgoing: Box, room_id: &RoomId, -) -> Result { + policy_server_key: &Base64>, + retries: u8, +) -> Result<()> { + let timeout = Duration::from_secs(self.services.server.config.policy_server_request_timeout); debug!("Requesting policy server signature"); let response = tokio::time::timeout( - Duration::from_secs(self.services.server.config.policy_server_request_timeout), + timeout, self.services .sending - .send_federation_request(via, PolicySignRequest::new(outgoing)), + .send_federation_request(via, PolicySignRequest::new(outgoing.clone())), ) .await; @@ -215,15 +320,20 @@ pub async fn fetch_policy_server_signature( response }, | Ok(Err(e)) => { - warn!( - via = %via, - event_id = %pdu.event_id(), - room_id = %room_id, - "Failed to contact policy server: {e}" - ); - // Network or policy server errors are treated as non-fatal: event is allowed by - // default. - return Err(e); + debug_error!("Error from policy server: {:?}", e); + return self + .handle_policy_server_error( + e, + pdu, + pdu_json, + via, + outgoing, + room_id, + policy_server_key, + retries, + timeout, + ) + .await; }, | Err(elapsed) => { warn!( @@ -231,34 +341,41 @@ pub async fn fetch_policy_server_signature( event_id = %pdu.event_id(), %room_id, %elapsed, - "Policy server request timed out after 10 seconds" + "Policy server signature request timed out" ); - return Err!("Request to policy server timed out"); + return Err!(Request(Forbidden("Policy server did not respond in time"))); }, }; - if response.signatures.is_none() { - debug!("Policy server refused to sign event"); - return Ok(false); - } - let sigs: ruma::Signatures = - response.signatures.unwrap(); - if !sigs.contains_key(via) { - debug_warn!( - "Policy server returned signatures, but did not include the expected server name \ - '{}': {:?}", - via, - sigs + + if response + .signatures + .as_ref() + .is_none_or(|sigs| !sigs.contains_key(via)) + { + error!( + %via, + "Policy server did not sign event: {:?}", + response.signatures ); - return Ok(false); + return Err!(BadServerResponse( + "Policy server did not include expected server name in signatures" + )); } - let keypairs = sigs.get(via).unwrap(); + // Unwraps are safe here because we checked both in the above if statement + let signatures = response.signatures.unwrap(); + let keypairs = signatures.get(via).unwrap(); + + // TODO: need to be able to verify other algorithms let wanted_key_id = KeyId::parse("ed25519:policy_server")?; if !keypairs.contains_key(&wanted_key_id) { - debug_warn!( - "Policy server returned signature, but did not use the key ID \ + error!( + signatures = ?signatures, + "Policy server returned signatures, but did not use the key ID \ 'ed25519:policy_server'." ); - return Ok(false); + return Err!(BadServerResponse( + "Policy server signed the event, but did not use the expected key ID" + )); } let signatures_entry = pdu_json .entry("signatures".to_owned()) @@ -276,12 +393,12 @@ pub async fn fetch_policy_server_signature( ); }, | Some(_) => { - debug_warn!( + // This should never happen + unreachable!( "Existing `signatures[{}]` field is not an object; cannot insert policy \ signature", via ); - return Ok(false); }, | None => { let mut inner = BTreeMap::new(); @@ -296,11 +413,13 @@ pub async fn fetch_policy_server_signature( signatures_map.insert(via.as_str().to_owned(), CanonicalJsonValue::Object(inner)); }, } + // TODO: verify signature value was made with the policy_server_key + // rather than the expected key. } else { - debug_warn!( + unreachable!( "Existing `signatures` field is not an object; cannot insert policy signature" ); - return Ok(false); } - Ok(true) + debug_info!("Policy server allowed event"); + Ok(()) } diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 7bb343e49..f3874c1ee 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -11,7 +11,9 @@ use conduwuit::{ warn, }; use futures::{FutureExt, StreamExt, future::ready}; -use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; +use ruma::{ + CanonicalJsonValue, RoomId, ServerName, api::error::ErrorKind, events::StateEventType, +}; use tokio::join; use super::get_room_version_rules; @@ -250,28 +252,32 @@ where // 14-pre. If the event is not a state event, ask the policy server about it if incoming_pdu.state_key.is_none() { debug!(event_id = %incoming_pdu.event_id, "Checking policy server for event"); - match self - .ask_policy_server( + if let Err(e) = self + .policy_server_allows_event( &incoming_pdu, &mut incoming_pdu.to_canonical_object(), room_id, + &room_version_rules, true, ) .await { - | Ok(false) => { + if matches!(e.kind(), ErrorKind::Forbidden) { warn!( event_id = %incoming_pdu.event_id, - "Event has been marked as spam by policy server" + error = %e, + "Event has been marked as spam by policy server: {}", + e.message(), ); soft_fail = true; - }, - | _ => { - debug!( - event_id = %incoming_pdu.event_id, - "Event has passed policy server check or the policy server was unavailable." - ); - }, + } else { + return Err(e); + } + } else { + debug!( + event_id = %incoming_pdu.event_id, + "Event has passed policy server check." + ); } } diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs index ecb32e1de..e8c555f3b 100644 --- a/src/service/rooms/timeline/create.rs +++ b/src/service/rooms/timeline/create.rs @@ -9,7 +9,6 @@ use conduwuit_core::{ state_res, }, utils::{self, IterStream, ReadyExt, stream::TryIgnore}, - warn, }; use futures::{StreamExt, TryStreamExt, future, future::ready}; use ruma::{ @@ -316,23 +315,16 @@ pub async fn create_hash_and_sign_event( "Checking event in room {} with policy server", pdu.room_id.as_ref().map_or("None", |id| id.as_str()) ); - match self - .services + self.services .event_handler - .ask_policy_server(&pdu, &mut pdu_json, pdu.room_id().expect("has room ID"), false) - .await - { - | Ok(true) => {}, - | Ok(false) => { - return Err!(Request(Forbidden(debug_warn!( - "Policy server marked this event as spam" - )))); - }, - | Err(e) => { - // fail open - warn!("Failed to check event with policy server: {e}"); - }, - } + .policy_server_allows_event( + &pdu, + &mut pdu_json, + pdu.room_id().expect("has room ID"), + &room_version_rules, + false, + ) + .await?; } // Generate short event id