mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
feat: Update policy server implementation to be closer to latest spec
Untested chore: Add news fragment feat: Support stable policy servers feat: Don't attempt erroneous loopback federation for policy server checks refactor: Update PS upgrade to use new ruma fix: Only check loopback via after attempting incoming verification
This commit is contained in:
@@ -0,0 +1 @@
|
||||
Updated [MSC4284: Policy Servers](https://github.com/matrix-org/matrix-spec-proposals/pull/4284) implementation to support the newly stabilised proposal. Contributed by @nex.
|
||||
+5
-21
@@ -372,21 +372,18 @@
|
||||
#
|
||||
#federation_timeout = 60
|
||||
|
||||
# MSC4284 Policy server request timeout (seconds). Generally policy
|
||||
# Policy server request timeout (seconds). Generally policy
|
||||
# servers should respond near instantly, however may slow down under
|
||||
# load. If a policy server doesn't respond in a short amount of time, the
|
||||
# room it is configured in may become unusable if this limit is set too
|
||||
# high. 10 seconds is a good default, however dropping this to 3-5 seconds
|
||||
# can be acceptable.
|
||||
# high. 30 seconds is a good default, however lower values may be
|
||||
# acceptable if temporary send failures are an okay trade-off.
|
||||
#
|
||||
# Please be aware that policy requests are *NOT* currently re-tried, so if
|
||||
# a spam check request fails, the event will be assumed to be not spam,
|
||||
# which in some cases may result in spam being sent to or received from
|
||||
# the room that would typically be prevented.
|
||||
#
|
||||
# About policy servers: https://matrix.org/blog/2025/04/introducing-policy-servers/
|
||||
# (Stabilized in Matrix v1.18)
|
||||
#
|
||||
#policy_server_request_timeout = 10
|
||||
#policy_server_request_timeout = 30
|
||||
|
||||
# Federation client idle connection pool timeout (seconds).
|
||||
#
|
||||
@@ -1594,19 +1591,6 @@
|
||||
#
|
||||
#block_non_admin_invites = false
|
||||
|
||||
# Enable or disable making requests to MSC4284 Policy Servers.
|
||||
# It is recommended you keep this enabled unless you experience frequent
|
||||
# connectivity issues, such as in a restricted networking environment.
|
||||
#
|
||||
#enable_msc4284_policy_servers = true
|
||||
|
||||
# Enable running locally generated events through configured MSC4284
|
||||
# policy servers. You may wish to disable this if your server is
|
||||
# single-user for a slight speed benefit in some rooms, but otherwise
|
||||
# should leave it enabled.
|
||||
#
|
||||
#policy_server_check_own_events = true
|
||||
|
||||
# Allow admins to enter commands in rooms other than "#admins" (admin
|
||||
# room) by prefixing your message with "\!admin" or "\\!admin" followed up
|
||||
# a normal continuwuity admin command. The reply will be publicly visible
|
||||
|
||||
+7
-22
@@ -475,20 +475,18 @@ pub struct Config {
|
||||
#[serde(default = "default_federation_timeout")]
|
||||
pub federation_timeout: u64,
|
||||
|
||||
/// MSC4284 Policy server request timeout (seconds). Generally policy
|
||||
/// Policy server request timeout (seconds). Generally policy
|
||||
/// servers should respond near instantly, however may slow down under
|
||||
/// load. If a policy server doesn't respond in a short amount of time, the
|
||||
/// room it is configured in may become unusable if this limit is set too
|
||||
/// high. 10 seconds is a good default, however dropping this to 3-5 seconds
|
||||
/// can be acceptable.
|
||||
/// high. 30 seconds is a good default, however lower values may be
|
||||
/// acceptable if temporary send failures are an okay trade-off.
|
||||
///
|
||||
/// Please be aware that policy requests are *NOT* currently re-tried, so if
|
||||
/// a spam check request fails, the event will be assumed to be not spam,
|
||||
/// which in some cases may result in spam being sent to or received from
|
||||
/// the room that would typically be prevented.
|
||||
///
|
||||
/// About policy servers: https://matrix.org/blog/2025/04/introducing-policy-servers/
|
||||
/// default: 10
|
||||
/// (Stabilized in Matrix v1.18)
|
||||
///
|
||||
/// default: 30
|
||||
#[serde(default = "default_policy_server_request_timeout")]
|
||||
pub policy_server_request_timeout: u64,
|
||||
|
||||
@@ -1836,19 +1834,6 @@ pub struct Config {
|
||||
#[serde(default)]
|
||||
pub block_non_admin_invites: bool,
|
||||
|
||||
/// Enable or disable making requests to MSC4284 Policy Servers.
|
||||
/// It is recommended you keep this enabled unless you experience frequent
|
||||
/// connectivity issues, such as in a restricted networking environment.
|
||||
#[serde(default = "true_fn")]
|
||||
pub enable_msc4284_policy_servers: bool,
|
||||
|
||||
/// Enable running locally generated events through configured MSC4284
|
||||
/// policy servers. You may wish to disable this if your server is
|
||||
/// single-user for a slight speed benefit in some rooms, but otherwise
|
||||
/// should leave it enabled.
|
||||
#[serde(default = "true_fn")]
|
||||
pub policy_server_check_own_events: bool,
|
||||
|
||||
/// Allow admins to enter commands in rooms other than "#admins" (admin
|
||||
/// room) by prefixing your message with "\!admin" or "\\!admin" followed up
|
||||
/// a normal continuwuity admin command. The reply will be publicly visible
|
||||
@@ -2537,7 +2522,7 @@ fn default_federation_conn_timeout() -> u64 { 10 }
|
||||
|
||||
fn default_federation_timeout() -> u64 { 60 }
|
||||
|
||||
fn default_policy_server_request_timeout() -> u64 { 10 }
|
||||
fn default_policy_server_request_timeout() -> u64 { 30 }
|
||||
|
||||
fn default_federation_idle_timeout() -> u64 { 25 }
|
||||
|
||||
|
||||
+16
-4
@@ -6,7 +6,10 @@ mod serde;
|
||||
|
||||
use std::{any::Any, borrow::Cow, convert::Infallible, error::Error as _, sync::PoisonError};
|
||||
|
||||
use ruma::api::error::{ErrorKind, RetryAfter::Delay};
|
||||
|
||||
pub use self::{err::visit, log::*};
|
||||
use crate::Error::BadRequest;
|
||||
|
||||
#[derive(thiserror::Error)]
|
||||
pub enum Error {
|
||||
@@ -89,7 +92,7 @@ pub enum Error {
|
||||
#[error("Arithmetic operation failed: {0}")]
|
||||
Arithmetic(Cow<'static, str>),
|
||||
#[error("{0:?}: {1}")]
|
||||
BadRequest(ruma::api::error::ErrorKind, &'static str), //TODO: remove
|
||||
BadRequest(ErrorKind, &'static str), //TODO: remove
|
||||
#[error("{0}")]
|
||||
BadServerResponse(Cow<'static, str>),
|
||||
#[error(transparent)]
|
||||
@@ -117,7 +120,7 @@ pub enum Error {
|
||||
#[error("from {0}: {1}")]
|
||||
Redaction(ruma::OwnedServerName, ruma::canonical_json::RedactionError),
|
||||
#[error("{0:?}: {1}")]
|
||||
Request(ruma::api::error::ErrorKind, Cow<'static, str>, http::StatusCode),
|
||||
Request(ErrorKind, Cow<'static, str>, http::StatusCode),
|
||||
#[error(transparent)]
|
||||
Ruma(#[from] ruma::api::error::Error),
|
||||
#[error(transparent)]
|
||||
@@ -164,13 +167,13 @@ impl Error {
|
||||
|
||||
/// Returns the Matrix error code / error kind
|
||||
#[inline]
|
||||
pub fn kind(&self) -> ruma::api::error::ErrorKind {
|
||||
pub fn kind(&self) -> ErrorKind {
|
||||
use ruma::api::error::ErrorKind::{Unknown, Unrecognized};
|
||||
|
||||
match self {
|
||||
| Self::Federation(_, error) | Self::Ruma(error) =>
|
||||
response::ruma_error_kind(error).clone(),
|
||||
| Self::BadRequest(kind, ..) | Self::Request(kind, ..) => kind.clone(),
|
||||
| BadRequest(kind, ..) | Self::Request(kind, ..) => kind.clone(),
|
||||
| Self::FeatureDisabled(..) => Unrecognized,
|
||||
| _ => Unknown,
|
||||
}
|
||||
@@ -200,6 +203,15 @@ impl Error {
|
||||
/// Result where Ok(None) is instead Err(e) if e.is_not_found().
|
||||
#[inline]
|
||||
pub fn is_not_found(&self) -> bool { self.status_code() == http::StatusCode::NOT_FOUND }
|
||||
|
||||
pub fn retry_after(&self) -> Option<std::time::Duration> {
|
||||
if let BadRequest(ErrorKind::LimitExceeded(limit_data), ..) = self {
|
||||
if let Some(Delay(after)) = limit_data.retry_after {
|
||||
return Some(after);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Error {
|
||||
|
||||
@@ -3,24 +3,72 @@
|
||||
//! This module implements a check against a room-specific policy server, as
|
||||
//! described in the relevant Matrix spec proposal (see: https://github.com/matrix-org/matrix-spec-proposals/pull/4284).
|
||||
|
||||
use std::{collections::BTreeMap, sync::LazyLock, time::Duration};
|
||||
use std::{collections::BTreeMap, time::Duration};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, implement, trace,
|
||||
warn,
|
||||
Err, Error, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, error,
|
||||
implement, info, state_res::EventTypeExt, trace, utils::to_canonical_object, warn,
|
||||
};
|
||||
use http::StatusCode;
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, KeyId, OwnedKeyId, RoomId, ServerName, SigningKeyId,
|
||||
events::StateEventType,
|
||||
};
|
||||
use ruminuwuity::policy::{
|
||||
event::RoomPolicyEventContent, policy_check::unstable::Request as PolicyCheckRequest,
|
||||
policy_sign::unstable::Request as PolicySignRequest,
|
||||
CanonicalJsonObject, CanonicalJsonValue, KeyId, RoomId, ServerName, SigningKeyAlgorithm,
|
||||
api::error::ErrorKind,
|
||||
canonical_json::redact,
|
||||
events::{StateEventType, room::policy::RoomPolicyEventContent},
|
||||
room_version_rules::{RedactionRules, RoomVersionRules},
|
||||
serde::{Base64, base64::Standard},
|
||||
signatures::{to_canonical_json_string_for_signing, verify_canonical_json_bytes},
|
||||
};
|
||||
use ruminuwuity::policy::policy_sign::unstable::Request as PolicySignRequest;
|
||||
use serde_json::value::RawValue;
|
||||
use tokio::time::sleep;
|
||||
|
||||
static POLICY_EVENT_TYPE_UNSTABLE: LazyLock<StateEventType> =
|
||||
LazyLock::new(|| StateEventType::from("org.matrix.msc4284.policy"));
|
||||
pub(super) fn verify_policy_signature(
|
||||
via: &ServerName,
|
||||
ps_key: &Base64<Standard, Vec<u8>>,
|
||||
pdu_json: &CanonicalJsonObject,
|
||||
redaction_rules: &RedactionRules,
|
||||
) -> bool {
|
||||
trace!(data=?pdu_json, "Preparing to check policy server signature");
|
||||
let Some(canonical_json) = redact(pdu_json.clone(), redaction_rules, None)
|
||||
.ok()
|
||||
.and_then(|r| to_canonical_object(r).ok())
|
||||
else {
|
||||
debug_warn!("Failed to redact event");
|
||||
return false;
|
||||
};
|
||||
let Some(CanonicalJsonValue::Object(signature_map)) = pdu_json.get("signatures") else {
|
||||
debug_warn!("Signatures map is not present?");
|
||||
return false;
|
||||
};
|
||||
let Some(CanonicalJsonValue::Object(signature_set)) = signature_map.get(via.as_str()) else {
|
||||
debug!("Signature map does not contain via {}", via.as_str());
|
||||
return false;
|
||||
};
|
||||
let Some(signature) = signature_set
|
||||
.get("ed25519:policy_server")
|
||||
.and_then(|s| s.as_str())
|
||||
.and_then(|s| Base64::<Standard>::parse(s).ok())
|
||||
else {
|
||||
debug!("No (valid) policy server signature present on event");
|
||||
return false;
|
||||
};
|
||||
|
||||
trace!(%signature, "Verifying policy server signature");
|
||||
let Ok(canonical_str) = to_canonical_json_string_for_signing(&canonical_json) else {
|
||||
debug_warn!("Could not convert canonical json object into string");
|
||||
return false;
|
||||
};
|
||||
|
||||
verify_canonical_json_bytes(
|
||||
&SigningKeyAlgorithm::Ed25519,
|
||||
ps_key.as_bytes(),
|
||||
signature.as_bytes(),
|
||||
canonical_str.as_bytes(),
|
||||
)
|
||||
.inspect_err(|e| debug_error!("Policy server verification failed: {e}"))
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
/// Asks a remote policy server if the event is allowed.
|
||||
///
|
||||
@@ -34,162 +82,216 @@ static POLICY_EVENT_TYPE_UNSTABLE: LazyLock<StateEventType> =
|
||||
/// contacted for whatever reason, Err(e) is returned, which generally is a
|
||||
/// fail-open operation.
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip(self, pdu, pdu_json, room_id), level = "info")]
|
||||
pub async fn ask_policy_server(
|
||||
#[tracing::instrument(skip(self, pdu, pdu_json, room_version_rules), level = "info")]
|
||||
pub async fn policy_server_allows_event(
|
||||
&self,
|
||||
pdu: &PduEvent,
|
||||
pdu_json: &mut CanonicalJsonObject,
|
||||
room_id: &RoomId,
|
||||
room_version_rules: &RoomVersionRules,
|
||||
incoming: bool,
|
||||
) -> Result<bool> {
|
||||
if !self.services.server.config.enable_msc4284_policy_servers {
|
||||
trace!("policy server checking is disabled");
|
||||
return Ok(true); // don't ever contact policy servers
|
||||
}
|
||||
|
||||
if *pdu.event_type() == POLICY_EVENT_TYPE_UNSTABLE.clone().into() {
|
||||
debug!(
|
||||
room_id = %room_id,
|
||||
event_type = ?pdu.event_type(),
|
||||
"Skipping spam check for policy server meta-event"
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let Ok(policyserver) = self
|
||||
.services
|
||||
.state_accessor
|
||||
.room_state_get_content(room_id, &POLICY_EVENT_TYPE_UNSTABLE, "")
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
if !e.is_not_found() {
|
||||
debug_error!("failed to load room policy server state event: {e}");
|
||||
}
|
||||
})
|
||||
.map(|c: RoomPolicyEventContent| c)
|
||||
else {
|
||||
debug!("room has no policy server configured");
|
||||
return Ok(true);
|
||||
) -> Result<()> {
|
||||
let ps = match pdu.event_type().with_state_key("").0 {
|
||||
| StateEventType::RoomPolicy => return Ok(()),
|
||||
| _ =>
|
||||
self.services
|
||||
.state_accessor
|
||||
.room_state_get_content::<RoomPolicyEventContent>(
|
||||
room_id,
|
||||
&StateEventType::RoomPolicy,
|
||||
"",
|
||||
)
|
||||
.await,
|
||||
};
|
||||
|
||||
if self.services.server.config.policy_server_check_own_events
|
||||
&& !incoming
|
||||
&& policyserver.public_key.is_none()
|
||||
{
|
||||
// don't contact policy servers for locally generated events, but only when the
|
||||
// policy server does not require signatures
|
||||
trace!("won't contact policy server for locally generated event");
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let via = match policyserver.via {
|
||||
| Some(ref via) => ServerName::parse(via)?,
|
||||
| None => {
|
||||
trace!("No policy server configured for room {room_id}");
|
||||
return Ok(true);
|
||||
let ps: RoomPolicyEventContent = match ps {
|
||||
| Ok(ps) => ps,
|
||||
| Err(e) => {
|
||||
if e.is_not_found() {
|
||||
trace!("no policy server configured");
|
||||
} else {
|
||||
error!("failed to load policy server event: {e}");
|
||||
// TODO: Should this fail closed?
|
||||
}
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
|
||||
let Some(ps_key) = ps.public_keys.get(&SigningKeyAlgorithm::Ed25519) else {
|
||||
debug!(
|
||||
"room has a policy server configured, but no valid public keys; skipping spam check"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if !self
|
||||
.services
|
||||
.state_cache
|
||||
.server_in_room(&via, room_id)
|
||||
.server_in_room(&ps.via, room_id)
|
||||
.await
|
||||
{
|
||||
debug!(
|
||||
via = %via,
|
||||
via = %ps.via,
|
||||
"Policy server is not in the room, skipping spam check"
|
||||
);
|
||||
return Ok(true);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if incoming {
|
||||
// Verify the signature instead of calling a check
|
||||
let event_id = pdu_json.remove("event_id");
|
||||
let ps_allowed =
|
||||
verify_policy_signature(&ps.via, ps_key, pdu_json, &room_version_rules.redaction);
|
||||
if let Some(event_id) = event_id {
|
||||
pdu_json.insert("event_id".into(), event_id);
|
||||
}
|
||||
|
||||
if ps_allowed {
|
||||
debug!(
|
||||
via = %ps.via,
|
||||
"Event is incoming and has a valid policy server signature"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
debug_info!(
|
||||
via = %ps.via,
|
||||
"Event is incoming but does not have a valid policy server signature; asking policy \
|
||||
server to sign it now"
|
||||
);
|
||||
}
|
||||
|
||||
if ps.via == self.services.globals.server_name()
|
||||
&& !self.services.server.config.federation_loopback
|
||||
{
|
||||
warn!(
|
||||
%ps.via,
|
||||
%room_id,
|
||||
"Cannot ask ourselves for a policy signature if `federation_loopback=false`",
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let outgoing = self
|
||||
.services
|
||||
.sending
|
||||
.convert_to_outgoing_federation_event(pdu_json.clone())
|
||||
.await;
|
||||
if policyserver.public_key.is_some() {
|
||||
if !incoming {
|
||||
|
||||
debug_info!(
|
||||
%ps.via,
|
||||
"Asking policy server to sign event"
|
||||
);
|
||||
self.fetch_policy_server_signature(pdu, pdu_json, &ps.via, outgoing, room_id, ps_key, 0)
|
||||
.await
|
||||
}
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[implement(super::Service)]
|
||||
async fn handle_policy_server_error(
|
||||
&self,
|
||||
error: Error,
|
||||
pdu: &PduEvent,
|
||||
pdu_json: &mut CanonicalJsonObject,
|
||||
via: &ServerName,
|
||||
outgoing: Box<RawValue>,
|
||||
room_id: &RoomId,
|
||||
policy_server_key: &Base64<Standard, Vec<u8>>,
|
||||
retries: u8,
|
||||
timeout: Duration,
|
||||
) -> Result<()> {
|
||||
match error.status_code() {
|
||||
| StatusCode::OK => unreachable!("ok response passed to handle_policy_server_error"),
|
||||
| StatusCode::BAD_REQUEST => {
|
||||
if matches!(error.kind(), ErrorKind::Forbidden) {
|
||||
warn!(
|
||||
via = %via,
|
||||
event_id = %pdu.event_id(),
|
||||
%room_id,
|
||||
error = ?error,
|
||||
"Policy server marked the event as spam"
|
||||
);
|
||||
return Err(error);
|
||||
}
|
||||
error!(
|
||||
via = %via,
|
||||
event_id = %pdu.event_id(),
|
||||
%room_id,
|
||||
error = ?error.to_string(),
|
||||
"Policy server could not understand our request",
|
||||
);
|
||||
Err!(BadServerResponse("Error communicating with policy server"))
|
||||
},
|
||||
| StatusCode::FORBIDDEN => {
|
||||
Err!(Request(Forbidden(
|
||||
"Policy server refused to sign the event due to the room ACL"
|
||||
)))
|
||||
},
|
||||
| StatusCode::NOT_FOUND => {
|
||||
debug_info!(
|
||||
via = %via,
|
||||
outgoing = ?pdu_json,
|
||||
"Getting policy server signature on event"
|
||||
event_id = %pdu.event_id(),
|
||||
%room_id,
|
||||
"Policy server is not actually a policy server or is not protecting this room: {}",
|
||||
error.message()
|
||||
);
|
||||
return self
|
||||
.fetch_policy_server_signature(pdu, pdu_json, &via, outgoing, room_id)
|
||||
.await;
|
||||
}
|
||||
// for incoming events, is it signed by <via> with the key
|
||||
// "ed25519:policy_server"?
|
||||
if let Some(CanonicalJsonValue::Object(sigs)) = pdu_json.get("signatures") {
|
||||
if let Some(CanonicalJsonValue::Object(server_sigs)) = sigs.get(via.as_str()) {
|
||||
let wanted_key_id: OwnedKeyId<ruma::SigningKeyAlgorithm, ruma::Base64PublicKey> =
|
||||
SigningKeyId::parse("ed25519:policy_server")?;
|
||||
if let Some(CanonicalJsonValue::String(_sig_value)) =
|
||||
server_sigs.get(wanted_key_id.as_str())
|
||||
{
|
||||
// TODO: verify signature
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!(
|
||||
"Event is not local and has no policy server signature, performing legacy spam check"
|
||||
);
|
||||
}
|
||||
debug_info!(
|
||||
via = %via,
|
||||
"Checking event for spam with policy server via legacy check"
|
||||
);
|
||||
|
||||
let mut request = PolicyCheckRequest::new(pdu.event_id().to_owned());
|
||||
request.pdu = Some(outgoing);
|
||||
|
||||
let response = tokio::time::timeout(
|
||||
Duration::from_secs(self.services.server.config.policy_server_request_timeout),
|
||||
self.services.sending.send_federation_request(&via, request),
|
||||
)
|
||||
.await;
|
||||
let response = match response {
|
||||
| Ok(Ok(response)) => {
|
||||
debug!("Response from policy server: {:?}", response);
|
||||
response
|
||||
Ok(())
|
||||
},
|
||||
| Ok(Err(e)) => {
|
||||
| StatusCode::TOO_MANY_REQUESTS => {
|
||||
if let Some(retry_after) = error.retry_after() {
|
||||
if retries >= 5 {
|
||||
warn!(
|
||||
via = %via,
|
||||
event_id = %pdu.event_id(),
|
||||
room_id = %room_id,
|
||||
retries,
|
||||
"Policy server rate-limited us too many times; giving up"
|
||||
);
|
||||
return Err(error); // Error should be passed to c2s
|
||||
}
|
||||
let saturated = retry_after.min(timeout);
|
||||
// ^ don't wait more than 60 seconds
|
||||
info!(
|
||||
via = %via,
|
||||
event_id = %pdu.event_id(),
|
||||
room_id = %room_id,
|
||||
retry_after = %saturated.as_secs(),
|
||||
retries,
|
||||
"Policy server rate-limited us; retrying after {retry_after:?}"
|
||||
);
|
||||
// TODO: select between this sleep and shutdown signal
|
||||
sleep(saturated).await;
|
||||
if !self.services.server.running() {
|
||||
return Err(error);
|
||||
}
|
||||
return Box::pin(self.fetch_policy_server_signature(
|
||||
pdu,
|
||||
pdu_json,
|
||||
via,
|
||||
outgoing,
|
||||
room_id,
|
||||
policy_server_key,
|
||||
retries.saturating_add(1),
|
||||
))
|
||||
.await;
|
||||
}
|
||||
warn!(
|
||||
via = %via,
|
||||
event_id = %pdu.event_id(),
|
||||
room_id = %room_id,
|
||||
"Failed to contact policy server: {e}"
|
||||
retries,
|
||||
"Policy server rate-limited us without giving a retry window; giving up"
|
||||
);
|
||||
// Network or policy server errors are treated as non-fatal: event is allowed by
|
||||
// default.
|
||||
return Err(e);
|
||||
Err(error)
|
||||
},
|
||||
| Err(elapsed) => {
|
||||
warn!(
|
||||
%via,
|
||||
event_id = %pdu.event_id(),
|
||||
%room_id,
|
||||
%elapsed,
|
||||
"Policy server request timed out after 10 seconds"
|
||||
);
|
||||
return Err!("Request to policy server timed out");
|
||||
},
|
||||
};
|
||||
trace!("Recommendation from policy server was {}", response.recommendation);
|
||||
if response.recommendation == "spam" {
|
||||
warn!(
|
||||
via = %via,
|
||||
event_id = %pdu.event_id(),
|
||||
room_id = %room_id,
|
||||
"Event was marked as spam by policy server",
|
||||
);
|
||||
return Ok(false);
|
||||
| _ => Err!(BadServerResponse(
|
||||
"Unexpected response from policy server: {}/{:?}",
|
||||
error.status_code(),
|
||||
error.kind()
|
||||
)),
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Asks a remote policy server for a signature on this event.
|
||||
/// If the policy server signs this event, the original data is mutated.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via), level = "info")]
|
||||
pub async fn fetch_policy_server_signature(
|
||||
@@ -199,13 +301,16 @@ pub async fn fetch_policy_server_signature(
|
||||
via: &ServerName,
|
||||
outgoing: Box<RawValue>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<bool> {
|
||||
policy_server_key: &Base64<Standard, Vec<u8>>,
|
||||
retries: u8,
|
||||
) -> Result<()> {
|
||||
let timeout = Duration::from_secs(self.services.server.config.policy_server_request_timeout);
|
||||
debug!("Requesting policy server signature");
|
||||
let response = tokio::time::timeout(
|
||||
Duration::from_secs(self.services.server.config.policy_server_request_timeout),
|
||||
timeout,
|
||||
self.services
|
||||
.sending
|
||||
.send_federation_request(via, PolicySignRequest::new(outgoing)),
|
||||
.send_federation_request(via, PolicySignRequest::new(outgoing.clone())),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -215,15 +320,20 @@ pub async fn fetch_policy_server_signature(
|
||||
response
|
||||
},
|
||||
| Ok(Err(e)) => {
|
||||
warn!(
|
||||
via = %via,
|
||||
event_id = %pdu.event_id(),
|
||||
room_id = %room_id,
|
||||
"Failed to contact policy server: {e}"
|
||||
);
|
||||
// Network or policy server errors are treated as non-fatal: event is allowed by
|
||||
// default.
|
||||
return Err(e);
|
||||
debug_error!("Error from policy server: {:?}", e);
|
||||
return self
|
||||
.handle_policy_server_error(
|
||||
e,
|
||||
pdu,
|
||||
pdu_json,
|
||||
via,
|
||||
outgoing,
|
||||
room_id,
|
||||
policy_server_key,
|
||||
retries,
|
||||
timeout,
|
||||
)
|
||||
.await;
|
||||
},
|
||||
| Err(elapsed) => {
|
||||
warn!(
|
||||
@@ -231,34 +341,41 @@ pub async fn fetch_policy_server_signature(
|
||||
event_id = %pdu.event_id(),
|
||||
%room_id,
|
||||
%elapsed,
|
||||
"Policy server request timed out after 10 seconds"
|
||||
"Policy server signature request timed out"
|
||||
);
|
||||
return Err!("Request to policy server timed out");
|
||||
return Err!(Request(Forbidden("Policy server did not respond in time")));
|
||||
},
|
||||
};
|
||||
if response.signatures.is_none() {
|
||||
debug!("Policy server refused to sign event");
|
||||
return Ok(false);
|
||||
}
|
||||
let sigs: ruma::Signatures<ruma::OwnedServerName, ruma::ServerSigningKeyVersion> =
|
||||
response.signatures.unwrap();
|
||||
if !sigs.contains_key(via) {
|
||||
debug_warn!(
|
||||
"Policy server returned signatures, but did not include the expected server name \
|
||||
'{}': {:?}",
|
||||
via,
|
||||
sigs
|
||||
|
||||
if response
|
||||
.signatures
|
||||
.as_ref()
|
||||
.is_none_or(|sigs| !sigs.contains_key(via))
|
||||
{
|
||||
error!(
|
||||
%via,
|
||||
"Policy server did not sign event: {:?}",
|
||||
response.signatures
|
||||
);
|
||||
return Ok(false);
|
||||
return Err!(BadServerResponse(
|
||||
"Policy server did not include expected server name in signatures"
|
||||
));
|
||||
}
|
||||
let keypairs = sigs.get(via).unwrap();
|
||||
// Unwraps are safe here because we checked both in the above if statement
|
||||
let signatures = response.signatures.unwrap();
|
||||
let keypairs = signatures.get(via).unwrap();
|
||||
|
||||
// TODO: need to be able to verify other algorithms
|
||||
let wanted_key_id = KeyId::parse("ed25519:policy_server")?;
|
||||
if !keypairs.contains_key(&wanted_key_id) {
|
||||
debug_warn!(
|
||||
"Policy server returned signature, but did not use the key ID \
|
||||
error!(
|
||||
signatures = ?signatures,
|
||||
"Policy server returned signatures, but did not use the key ID \
|
||||
'ed25519:policy_server'."
|
||||
);
|
||||
return Ok(false);
|
||||
return Err!(BadServerResponse(
|
||||
"Policy server signed the event, but did not use the expected key ID"
|
||||
));
|
||||
}
|
||||
let signatures_entry = pdu_json
|
||||
.entry("signatures".to_owned())
|
||||
@@ -276,12 +393,12 @@ pub async fn fetch_policy_server_signature(
|
||||
);
|
||||
},
|
||||
| Some(_) => {
|
||||
debug_warn!(
|
||||
// This should never happen
|
||||
unreachable!(
|
||||
"Existing `signatures[{}]` field is not an object; cannot insert policy \
|
||||
signature",
|
||||
via
|
||||
);
|
||||
return Ok(false);
|
||||
},
|
||||
| None => {
|
||||
let mut inner = BTreeMap::new();
|
||||
@@ -296,11 +413,13 @@ pub async fn fetch_policy_server_signature(
|
||||
signatures_map.insert(via.as_str().to_owned(), CanonicalJsonValue::Object(inner));
|
||||
},
|
||||
}
|
||||
// TODO: verify signature value was made with the policy_server_key
|
||||
// rather than the expected key.
|
||||
} else {
|
||||
debug_warn!(
|
||||
unreachable!(
|
||||
"Existing `signatures` field is not an object; cannot insert policy signature"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
Ok(true)
|
||||
debug_info!("Policy server allowed event");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -11,7 +11,9 @@ use conduwuit::{
|
||||
warn,
|
||||
};
|
||||
use futures::{FutureExt, StreamExt, future::ready};
|
||||
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
|
||||
use ruma::{
|
||||
CanonicalJsonValue, RoomId, ServerName, api::error::ErrorKind, events::StateEventType,
|
||||
};
|
||||
use tokio::join;
|
||||
|
||||
use super::get_room_version_rules;
|
||||
@@ -250,28 +252,32 @@ where
|
||||
// 14-pre. If the event is not a state event, ask the policy server about it
|
||||
if incoming_pdu.state_key.is_none() {
|
||||
debug!(event_id = %incoming_pdu.event_id, "Checking policy server for event");
|
||||
match self
|
||||
.ask_policy_server(
|
||||
if let Err(e) = self
|
||||
.policy_server_allows_event(
|
||||
&incoming_pdu,
|
||||
&mut incoming_pdu.to_canonical_object(),
|
||||
room_id,
|
||||
&room_version_rules,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(false) => {
|
||||
if matches!(e.kind(), ErrorKind::Forbidden) {
|
||||
warn!(
|
||||
event_id = %incoming_pdu.event_id,
|
||||
"Event has been marked as spam by policy server"
|
||||
error = %e,
|
||||
"Event has been marked as spam by policy server: {}",
|
||||
e.message(),
|
||||
);
|
||||
soft_fail = true;
|
||||
},
|
||||
| _ => {
|
||||
debug!(
|
||||
event_id = %incoming_pdu.event_id,
|
||||
"Event has passed policy server check or the policy server was unavailable."
|
||||
);
|
||||
},
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
event_id = %incoming_pdu.event_id,
|
||||
"Event has passed policy server check."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ use conduwuit_core::{
|
||||
state_res,
|
||||
},
|
||||
utils::{self, IterStream, ReadyExt, stream::TryIgnore},
|
||||
warn,
|
||||
};
|
||||
use futures::{StreamExt, TryStreamExt, future, future::ready};
|
||||
use ruma::{
|
||||
@@ -316,23 +315,16 @@ pub async fn create_hash_and_sign_event(
|
||||
"Checking event in room {} with policy server",
|
||||
pdu.room_id.as_ref().map_or("None", |id| id.as_str())
|
||||
);
|
||||
match self
|
||||
.services
|
||||
self.services
|
||||
.event_handler
|
||||
.ask_policy_server(&pdu, &mut pdu_json, pdu.room_id().expect("has room ID"), false)
|
||||
.await
|
||||
{
|
||||
| Ok(true) => {},
|
||||
| Ok(false) => {
|
||||
return Err!(Request(Forbidden(debug_warn!(
|
||||
"Policy server marked this event as spam"
|
||||
))));
|
||||
},
|
||||
| Err(e) => {
|
||||
// fail open
|
||||
warn!("Failed to check event with policy server: {e}");
|
||||
},
|
||||
}
|
||||
.policy_server_allows_event(
|
||||
&pdu,
|
||||
&mut pdu_json,
|
||||
pdu.room_id().expect("has room ID"),
|
||||
&room_version_rules,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Generate short event id
|
||||
|
||||
Reference in New Issue
Block a user