mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
437 lines
13 KiB
Rust
437 lines
13 KiB
Rust
//! Policy server integration for event spam checking in Matrix rooms.
|
|
//!
|
|
//! This module implements a check against a room-specific policy server, as
|
|
//! described in the relevant Matrix spec proposal (see: https://github.com/matrix-org/matrix-spec-proposals/pull/4284).
|
|
|
|
use std::{collections::BTreeMap, time::Duration};
|
|
|
|
use conduwuit::{
|
|
Err, Error, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, error,
|
|
implement, info, state_res::EventTypeExt, trace, utils::to_canonical_object, warn,
|
|
};
|
|
use http::StatusCode;
|
|
use ruma::{
|
|
CanonicalJsonObject, CanonicalJsonValue, KeyId, RoomId, ServerName, SigningKeyAlgorithm,
|
|
api::{error::ErrorKind, federation::policy::sign_event},
|
|
canonical_json::redact,
|
|
events::{StateEventType, room::policy::RoomPolicyEventContent},
|
|
room_version_rules::{RedactionRules, RoomVersionRules},
|
|
serde::{Base64, base64::Standard},
|
|
signatures::{to_canonical_json_string_for_signing, verify_canonical_json_bytes},
|
|
};
|
|
use serde_json::value::RawValue;
|
|
use tokio::time::sleep;
|
|
|
|
const POLICY_SERVER_KEY_ID_ED25519: &str = "ed25519:policy_server";
|
|
|
|
/// Checks that the given policy server signed the event with the given public
|
|
/// key.
|
|
///
|
|
/// Note: The caller MUST ensure event_id (and any other keys) are stripped
|
|
/// BEFORE passing to this function - no mutation is performed beyond redacting.
|
|
///
|
|
/// Parameters:
|
|
///
|
|
/// - via: The policy server that should've signed the event.
|
|
/// - ps_key: The public key part of the policy server's signing key.
|
|
/// - pdu_json: The raw PDU JSON object (will be cloned).
|
|
/// - redaction_rules: The redaction rules of the room version
|
|
///
|
|
/// Returns: `true` if the signature is present, `false` if it is not (including
|
|
/// if the signatures object is malformed).
|
|
pub(super) fn verify_policy_signature(
|
|
via: &ServerName,
|
|
ps_key: &Base64<Standard, Vec<u8>>,
|
|
pdu_json: &CanonicalJsonObject,
|
|
redaction_rules: &RedactionRules,
|
|
) -> bool {
|
|
#[cfg(debug_assertions)]
|
|
{
|
|
assert!(
|
|
!pdu_json.contains_key("event_id"),
|
|
"event_id should be removed from the JSON before verifying the policy server \
|
|
signature"
|
|
);
|
|
let pretty = serde_json::to_string(pdu_json).unwrap();
|
|
trace!(data=%pretty, "Preparing to check policy server signature");
|
|
};
|
|
let Some(canonical_json) = redact(pdu_json.clone(), redaction_rules, None)
|
|
.ok()
|
|
.and_then(|r| to_canonical_object(r).ok())
|
|
else {
|
|
debug_warn!("Failed to redact event");
|
|
return false;
|
|
};
|
|
let Some(signature) = extract_signature(pdu_json, via, POLICY_SERVER_KEY_ID_ED25519) else {
|
|
debug!("No (valid) policy server signature present on event");
|
|
return false;
|
|
};
|
|
|
|
trace!(%signature, "Verifying policy server signature");
|
|
let Ok(canonical_str) = to_canonical_json_string_for_signing(&canonical_json) else {
|
|
debug_warn!("Could not convert canonical json object into string");
|
|
return false;
|
|
};
|
|
|
|
verify_canonical_json_bytes(
|
|
&SigningKeyAlgorithm::Ed25519,
|
|
ps_key.as_bytes(),
|
|
signature.as_bytes(),
|
|
canonical_str.as_bytes(),
|
|
)
|
|
.inspect_err(|e| debug_error!("Policy server verification failed: {e}"))
|
|
.is_ok()
|
|
}
|
|
|
|
pub(super) fn extract_signature(
|
|
pdu_json: &CanonicalJsonObject,
|
|
server_name: &ServerName,
|
|
key_id: &str,
|
|
) -> Option<Base64<Standard, Vec<u8>>> {
|
|
pdu_json
|
|
.get("signatures")?
|
|
.as_object()?
|
|
.get(server_name.as_str())?
|
|
.as_object()?
|
|
.get(key_id)?
|
|
.as_str()
|
|
.and_then(|signature| Base64::<Standard>::parse(signature).ok())
|
|
}
|
|
|
|
/// Verifies the existing policy server signature, and/or fetches a new one
|
|
/// immediately.
|
|
///
|
|
/// If `incoming` is `true`, the event is checked for an existing signature. If
|
|
/// it has a valid one, `Ok` is returned. If it does not have a valid signature,
|
|
/// the function falls through to fetching a new one (which may be a soft-fail
|
|
/// in a future version).
|
|
///
|
|
/// If the event is the `m.room.policy` configuration state event,
|
|
/// this check is skipped. Similarly, if there is no policy server configured in
|
|
/// the PDU's room, the configuration event is malformed, or the configured
|
|
/// server is not present in the room, the check is also skipped.
|
|
///
|
|
/// If the policy server marks the event as spam, the relevant error is
|
|
/// returned. Otherwise, the incoming PDU JSON is mutated to include the new
|
|
/// policy server signature. Transient errors such as rate-limits are handled,
|
|
/// so any error returned by this function should be treated as final.
|
|
#[implement(super::Service)]
|
|
#[tracing::instrument(skip(self, pdu, pdu_json, room_version_rules), level = "info")]
|
|
pub async fn policy_server_allows_event(
|
|
&self,
|
|
pdu: &PduEvent,
|
|
pdu_json: &mut CanonicalJsonObject,
|
|
room_id: &RoomId,
|
|
room_version_rules: &RoomVersionRules,
|
|
incoming: bool,
|
|
) -> Result<()> {
|
|
if pdu.event_type().with_state_key("") == (StateEventType::RoomPolicy, "".into()) {
|
|
return Ok(());
|
|
}
|
|
let ps = match self
|
|
.services
|
|
.state_accessor
|
|
.room_state_get_content::<RoomPolicyEventContent>(
|
|
room_id,
|
|
&StateEventType::RoomPolicy,
|
|
"",
|
|
)
|
|
.await
|
|
{
|
|
| Ok(s) => s,
|
|
| Err(e) =>
|
|
return if e.is_not_found() || e.kind() == ErrorKind::BadJson {
|
|
debug!(%e, "no policy server configured");
|
|
Ok(())
|
|
} else {
|
|
Err!("failed to load m.room.policy state event: {e}")
|
|
},
|
|
};
|
|
|
|
let Some(ps_key) = ps.public_keys.get(&SigningKeyAlgorithm::Ed25519) else {
|
|
debug!(
|
|
"room has a policy server configured, but no valid public keys; skipping spam check"
|
|
);
|
|
return Ok(());
|
|
};
|
|
|
|
if !self
|
|
.services
|
|
.state_cache
|
|
.server_in_room(&ps.via, room_id)
|
|
.await
|
|
{
|
|
debug!(
|
|
via = %ps.via,
|
|
"Policy server is not in the room, skipping spam check"
|
|
);
|
|
return Ok(());
|
|
}
|
|
|
|
if incoming {
|
|
// Verify the signature instead of calling a check
|
|
let event_id = pdu_json.remove("event_id");
|
|
let ps_allowed =
|
|
verify_policy_signature(&ps.via, ps_key, pdu_json, &room_version_rules.redaction);
|
|
if let Some(event_id) = event_id {
|
|
pdu_json.insert("event_id".into(), event_id);
|
|
}
|
|
|
|
if ps_allowed {
|
|
debug!(
|
|
via = %ps.via,
|
|
"Event is incoming and has a valid policy server signature"
|
|
);
|
|
return Ok(());
|
|
}
|
|
// N.B. In a future room version, this will be a soft failure specifically.
|
|
debug_info!(
|
|
via = %ps.via,
|
|
"Event is incoming but does not have a valid policy server signature; asking policy \
|
|
server to sign it now"
|
|
);
|
|
}
|
|
|
|
if ps.via == self.services.globals.server_name()
|
|
&& !self.services.server.config.federation_loopback
|
|
{
|
|
warn!(
|
|
%ps.via,
|
|
%room_id,
|
|
"Cannot ask ourselves for a policy signature if `federation_loopback=false`",
|
|
);
|
|
return Ok(());
|
|
}
|
|
|
|
let outgoing = self
|
|
.services
|
|
.sending
|
|
.convert_to_outgoing_federation_event(pdu_json.clone())
|
|
.await;
|
|
|
|
debug_info!(
|
|
%ps.via,
|
|
"Asking policy server to sign event"
|
|
);
|
|
self.fetch_policy_server_signature(pdu, pdu_json, &ps.via, outgoing, room_id, ps_key, 0)
|
|
.await?;
|
|
|
|
// Verify that the policy server signature was made with the same public key as
|
|
// is in the state event, not just that it was signed.
|
|
if verify_policy_signature(&ps.via, ps_key, pdu_json, &room_version_rules.redaction) {
|
|
Ok(())
|
|
} else {
|
|
Err(Error::Request(
|
|
ErrorKind::Unknown,
|
|
"Policy server signature was made with a different key to the one advertised".into(),
|
|
StatusCode::BAD_GATEWAY,
|
|
))
|
|
}
|
|
}
|
|
|
|
/// Handles an error returned by the policy server. If the error is one that
|
|
/// should be returned to the user, it is propagated, otherwise the request may
|
|
/// be retried (for example, when rate-limited).
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[implement(super::Service)]
|
|
async fn handle_policy_server_error(
|
|
&self,
|
|
error: Error,
|
|
pdu: &PduEvent,
|
|
pdu_json: &mut CanonicalJsonObject,
|
|
via: &ServerName,
|
|
outgoing: Box<RawValue>,
|
|
room_id: &RoomId,
|
|
policy_server_key: &Base64<Standard, Vec<u8>>,
|
|
retries: u8,
|
|
timeout: Duration,
|
|
) -> Result<()> {
|
|
match error.status_code() {
|
|
| StatusCode::OK => unreachable!("ok response passed to handle_policy_server_error"),
|
|
| StatusCode::BAD_REQUEST => {
|
|
if matches!(error.kind(), ErrorKind::Forbidden) {
|
|
warn!(
|
|
via = %via,
|
|
event_id = %pdu.event_id(),
|
|
%room_id,
|
|
error = ?error,
|
|
"Policy server marked the event as spam"
|
|
);
|
|
return Err(error);
|
|
}
|
|
error!(
|
|
via = %via,
|
|
event_id = %pdu.event_id(),
|
|
%room_id,
|
|
error = ?error.to_string(),
|
|
"Policy server could not understand our request",
|
|
);
|
|
Err!(BadServerResponse("Error communicating with policy server"))
|
|
},
|
|
| StatusCode::FORBIDDEN => {
|
|
Err!(Request(Forbidden(
|
|
"Policy server refused to sign the event due to the room ACL"
|
|
)))
|
|
},
|
|
| StatusCode::NOT_FOUND => {
|
|
debug_info!(
|
|
via = %via,
|
|
event_id = %pdu.event_id(),
|
|
%room_id,
|
|
"Policy server is not actually a policy server or is not protecting this room: {}",
|
|
error.message()
|
|
);
|
|
Ok(())
|
|
},
|
|
| StatusCode::TOO_MANY_REQUESTS => {
|
|
if let Some(retry_after) = error.retry_after() {
|
|
if retries >= 5 {
|
|
warn!(
|
|
via = %via,
|
|
event_id = %pdu.event_id(),
|
|
room_id = %room_id,
|
|
retries,
|
|
"Policy server rate-limited us too many times; giving up"
|
|
);
|
|
return Err(error); // Error should be passed to c2s
|
|
}
|
|
let saturated = retry_after.min(timeout);
|
|
// ^ don't wait more than 60 seconds
|
|
info!(
|
|
via = %via,
|
|
event_id = %pdu.event_id(),
|
|
room_id = %room_id,
|
|
retry_after = %saturated.as_secs(),
|
|
retries,
|
|
"Policy server rate-limited us; retrying after {retry_after:?}"
|
|
);
|
|
tokio::select! {
|
|
() = self.server_shutdown.notified() => (),
|
|
() = sleep(saturated) => (),
|
|
}
|
|
if !self.services.server.running() {
|
|
return Err(error);
|
|
}
|
|
return Box::pin(self.fetch_policy_server_signature(
|
|
pdu,
|
|
pdu_json,
|
|
via,
|
|
outgoing,
|
|
room_id,
|
|
policy_server_key,
|
|
retries.saturating_add(1),
|
|
))
|
|
.await;
|
|
}
|
|
warn!(
|
|
via = %via,
|
|
event_id = %pdu.event_id(),
|
|
room_id = %room_id,
|
|
retries,
|
|
"Policy server rate-limited us without giving a retry window; giving up"
|
|
);
|
|
Err(error)
|
|
},
|
|
| _ => Err!(BadServerResponse(
|
|
"Unexpected response from policy server: {}/{:?}",
|
|
error.status_code(),
|
|
error.kind()
|
|
)),
|
|
}
|
|
}
|
|
|
|
/// Asks a remote policy server for a signature on this event.
|
|
/// If the policy server signs this event, the original data is mutated.
|
|
/// Otherwise, the error is handled and potentially returned.
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[implement(super::Service)]
|
|
#[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via), level = "info")]
|
|
pub async fn fetch_policy_server_signature(
|
|
&self,
|
|
pdu: &PduEvent,
|
|
pdu_json: &mut CanonicalJsonObject,
|
|
via: &ServerName,
|
|
outgoing: Box<RawValue>,
|
|
room_id: &RoomId,
|
|
policy_server_key: &Base64<Standard, Vec<u8>>,
|
|
retries: u8,
|
|
) -> Result<()> {
|
|
let timeout = Duration::from_secs(self.services.server.config.policy_server_request_timeout);
|
|
debug!("Requesting policy server signature");
|
|
let response = tokio::time::timeout(
|
|
timeout,
|
|
self.services
|
|
.sending
|
|
.send_federation_request(via, sign_event::v1::Request::new(outgoing.clone())),
|
|
)
|
|
.await;
|
|
|
|
let response = match response {
|
|
| Ok(Ok(response)) => response,
|
|
| Ok(Err(e)) => {
|
|
debug_error!("Error from policy server: {:?}", e);
|
|
return self
|
|
.handle_policy_server_error(
|
|
e,
|
|
pdu,
|
|
pdu_json,
|
|
via,
|
|
outgoing,
|
|
room_id,
|
|
policy_server_key,
|
|
retries,
|
|
timeout,
|
|
)
|
|
.await;
|
|
},
|
|
| Err(elapsed) => {
|
|
warn!(
|
|
%via,
|
|
event_id = %pdu.event_id(),
|
|
%room_id,
|
|
%elapsed,
|
|
"Policy server signature request timServerSignaturesed out"
|
|
);
|
|
return Err!(Request(Forbidden("Policy server did not respond in time")));
|
|
},
|
|
};
|
|
|
|
let Some(signatures) = response.signatures.get(via) else {
|
|
// NOTE: Legacy policy servers return a `200 {}` to indicate that the event was
|
|
// flagged as spam. We'll make a distinction in the error message in case
|
|
// it's unexpected.
|
|
return Err!(Request(Forbidden("Policy server did not sign the event")));
|
|
};
|
|
if response.signatures.len() > 1 {
|
|
warn!(?response.signatures, "Misbehaving policy server: returned signatures for extraneous servers");
|
|
return Ok(());
|
|
}
|
|
|
|
let Some(signature) = signatures
|
|
.get(&KeyId::parse(POLICY_SERVER_KEY_ID_ED25519).expect("policy server key ID is valid"))
|
|
else {
|
|
return Err!(BadServerResponse(
|
|
"Policy server did not return a signature with the expected key ID",
|
|
));
|
|
};
|
|
if signatures.len() > 1 {
|
|
info!(?signatures, "Misbehaving policy server: returned extraneous signatures");
|
|
}
|
|
|
|
pdu_json
|
|
.entry("signatures".to_owned())
|
|
.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
|
|
.as_object_mut()
|
|
.expect("`signatures` field must be an object")
|
|
.entry(via.to_string())
|
|
.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
|
|
.as_object_mut()
|
|
.expect("`signatures[via]` field must be an object")
|
|
.insert(
|
|
POLICY_SERVER_KEY_ID_ED25519.to_owned(),
|
|
CanonicalJsonValue::String(signature.clone()),
|
|
);
|
|
debug_info!("Policy server allowed event");
|
|
Ok(())
|
|
}
|