Files
continuwuity/src/service/federation/execute.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

297 lines
7.6 KiB
Rust
Raw Normal View History

use std::{fmt::Debug, mem};
use bytes::Bytes;
2024-12-14 21:58:01 -05:00
use conduwuit::{
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
error::inspect_debug_log, implement, trace, utils::string::EMPTY,
2024-07-18 06:37:47 +00:00
};
use http::{HeaderValue, header::AUTHORIZATION};
use ipaddress::IPAddress;
2024-04-22 10:35:12 -07:00
use reqwest::{Client, Method, Request, Response, Url};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, ServerName, ServerSigningKeyId,
api::{
EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken,
2025-03-06 00:14:24 -05:00
client::error::Error as RumaError, federation::authentication::XMatrix,
},
serde::Base64,
};
use crate::resolver::actual::ActualDest;
2024-04-16 20:54:16 -07:00
/// Dispatches `request` to the remote server `dest` over federation.
///
/// This is a thin wrapper around `execute_on` that selects the
/// `services.client.federation` HTTP client.
#[implement(super::Service)]
#[tracing::instrument(skip_all, name = "request", level = "debug")]
pub async fn execute<T>(&self, dest: &ServerName, request: T) -> Result<T::IncomingResponse>
where
    T: OutgoingRequest + Debug + Send,
{
    self.execute_on(&self.services.client.federation, dest, request)
        .await
}
/// Same as `execute()`, but routed through the `services.client.synapse`
/// HTTP client, which is meant for requests needing a very large timeout.
#[implement(super::Service)]
#[tracing::instrument(skip_all, name = "synapse", level = "debug")]
pub async fn execute_synapse<T>(
    &self,
    dest: &ServerName,
    request: T,
) -> Result<T::IncomingResponse>
where
    T: OutgoingRequest + Debug + Send,
{
    self.execute_on(&self.services.client.synapse, dest, request)
        .await
}
/// Sends `request` to `dest` using the caller-supplied HTTP `client`.
///
/// The request is refused up front when federation is disabled in the
/// config or when the moderation service forbids `dest`. Otherwise the
/// destination is resolved, the request is converted to HTTP form, signed
/// and validated by `prepare`, and finally sent by `perform`.
#[implement(super::Service)]
#[tracing::instrument(
    name = "fed",
    level = INFO_SPAN_LEVEL,
    skip(self, client, request),
)]
pub async fn execute_on<T>(
    &self,
    client: &Client,
    dest: &ServerName,
    request: T,
) -> Result<T::IncomingResponse>
where
    T: OutgoingRequest + Send,
{
    let federation_enabled = self.services.server.config.allow_federation;
    if !federation_enabled {
        return Err!(Config("allow_federation", "Federation is disabled."));
    }

    let forbidden = self.services.moderation.is_remote_server_forbidden(dest);
    if forbidden {
        return Err!(Request(Forbidden(debug_warn!("Federation with {dest} is not allowed."))));
    }

    // Resolve where the request actually has to go before building it.
    let actual = self.services.resolver.get_actual_dest(dest).await?;

    // Convert to an HTTP request, then sign and validate it.
    let prepared = into_http_request::<T>(&actual, request)
        .and_then(|http_request| self.prepare(dest, http_request))?;

    self.perform::<T>(dest, &actual, prepared, client).await
}
2024-04-23 15:31:40 -07:00
#[implement(super::Service)]
async fn perform<T>(
&self,
dest: &ServerName,
actual: &ActualDest,
request: Request,
client: &Client,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
{
let url = request.url().clone();
let method = request.method().clone();
debug!(?method, ?url, "Sending request");
match client.execute(request).await {
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
| Err(error) =>
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
2024-07-18 06:37:47 +00:00
}
}
2024-04-23 15:31:40 -07:00
/// Turns a raw `http::Request` into a signed, validated `reqwest::Request`.
///
/// Steps, in order: attach the federation `Authorization` header via
/// `sign_request`, convert into a `reqwest::Request`, run `validate_url`
/// on the resulting URL, and confirm via `check_running` that the server
/// is still running. Any failing step aborts with its error.
#[implement(super::Service)]
fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Result<Request> {
    self.sign_request(&mut request, dest);

    let signed = Request::try_from(request)?;

    self.validate_url(signed.url())?;
    self.services.server.check_running()?;

    Ok(signed)
}
/// Checks the request URL's host when it is a literal IP address.
///
/// URLs without a host, or whose host does not parse as an IP address,
/// pass unchecked. A parsed IP is handed to the resolver's `validate_ip`,
/// whose error (if any) is returned.
#[implement(super::Service)]
fn validate_url(&self, url: &Url) -> Result<()> {
    let Some(host) = url.host_str() else {
        return Ok(());
    };

    let Ok(ip) = IPAddress::parse(host) else {
        return Ok(());
    };

    trace!("Checking request URL IP {ip:?}");
    self.services.resolver.validate_ip(&ip)?;

    Ok(())
}
2024-04-16 20:54:16 -07:00
async fn handle_response<T>(
dest: &ServerName,
actual: &ActualDest,
method: &Method,
url: &Url,
response: Response,
2024-04-16 20:54:16 -07:00
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
2024-04-16 20:54:16 -07:00
{
let response = into_http_response(dest, actual, method, url, response).await?;
T::IncomingResponse::try_from_http_response(response)
.map_err(|e| err!(BadServerResponse("Server returned bad 200 response: {e:?}")))
}
async fn into_http_response(
dest: &ServerName,
actual: &ActualDest,
method: &Method,
url: &Url,
mut response: Response,
) -> Result<http::Response<Bytes>> {
2024-04-16 20:54:16 -07:00
let status = response.status();
2024-07-18 06:37:47 +00:00
trace!(
?status, ?method,
request_url = ?url,
response_url = ?response.url(),
"Received response from {}",
2024-10-28 06:49:25 +00:00
actual.string(),
2024-07-18 06:37:47 +00:00
);
2024-04-16 20:54:16 -07:00
let mut http_response_builder = http::Response::builder()
.status(status)
.version(response.version());
2024-04-16 20:54:16 -07:00
mem::swap(
response.headers_mut(),
http_response_builder
.headers_mut()
.expect("http::response::Builder is usable"),
);
2024-07-18 06:37:47 +00:00
// TODO: handle timeout
trace!("Waiting for response body...");
let body = response
.bytes()
.await
.inspect_err(inspect_debug_log)
.unwrap_or_else(|_| Vec::new().into());
2024-04-16 20:54:16 -07:00
let http_response = http_response_builder
.body(body)
.expect("reqwest body is valid http body");
debug!("Got {status:?} for {method} {url}");
if !status.is_success() {
return Err(Error::Federation(
dest.to_owned(),
RumaError::from_http_response(http_response),
));
2024-04-16 20:54:16 -07:00
}
Ok(http_response)
2024-04-16 20:54:16 -07:00
}
fn handle_error(
actual: &ActualDest,
method: &Method,
url: &Url,
mut e: reqwest::Error,
) -> Result {
2024-04-22 10:35:12 -07:00
if e.is_timeout() || e.is_connect() {
e = e.without_url();
debug_warn!("{e:?}");
2024-04-16 20:54:16 -07:00
} else if e.is_redirect() {
debug_error!(
2024-04-16 20:54:16 -07:00
method = ?method,
url = ?url,
final_url = ?e.url(),
"Redirect loop {}: {}",
actual.host,
2024-04-16 20:54:16 -07:00
e,
);
} else {
2024-04-22 10:35:12 -07:00
debug_error!("{e:?}");
2024-04-16 20:54:16 -07:00
}
Err(e.into())
}
#[implement(super::Service)]
2024-10-29 23:31:53 +00:00
fn sign_request(&self, http_request: &mut http::Request<Vec<u8>>, dest: &ServerName) {
type Member = (String, Value);
type Value = CanonicalJsonValue;
type Object = CanonicalJsonObject;
let origin = &self.services.server.name;
2024-10-29 23:31:53 +00:00
let body = http_request.body();
let uri = http_request
.uri()
.path_and_query()
.expect("http::Request missing path_and_query");
let mut req: Object = if !body.is_empty() {
let content: CanonicalJsonValue =
serde_json::from_slice(body).expect("failed to serialize body");
2024-10-29 23:31:53 +00:00
let authorization: [Member; 5] = [
("content".into(), content),
("destination".into(), dest.as_str().into()),
("method".into(), http_request.method().as_str().into()),
("origin".into(), origin.as_str().into()),
("uri".into(), uri.to_string().into()),
];
authorization.into()
} else {
let authorization: [Member; 4] = [
("destination".into(), dest.as_str().into()),
("method".into(), http_request.method().as_str().into()),
("origin".into(), origin.as_str().into()),
("uri".into(), uri.to_string().into()),
];
authorization.into()
2024-04-16 20:54:16 -07:00
};
self.services
.server_keys
2024-10-29 23:31:53 +00:00
.sign_json(&mut req)
.expect("request signing failed");
2024-04-16 20:54:16 -07:00
2024-10-29 23:31:53 +00:00
let signatures = req["signatures"]
2024-04-16 20:54:16 -07:00
.as_object()
2024-10-29 23:31:53 +00:00
.and_then(|object| object[origin.as_str()].as_object())
.expect("origin signatures object");
let key: &ServerSigningKeyId = signatures
.keys()
.next()
.map(|k| k.as_str().try_into())
.expect("at least one signature from this origin")
.expect("keyid is json string");
let sig: Base64 = signatures
2024-04-16 20:54:16 -07:00
.values()
2024-10-29 23:31:53 +00:00
.next()
.map(|s| s.as_str().map(Base64::parse))
.expect("at least one signature from this origin")
.expect("signature is json string")
.expect("signature is valid base64");
let x_matrix = XMatrix::new(origin.into(), dest.into(), key.into(), sig);
let authorization = HeaderValue::from(&x_matrix);
let authorization = http_request
.headers_mut()
.insert(AUTHORIZATION, authorization);
debug_assert!(authorization.is_none(), "Authorization header already present");
2024-04-16 20:54:16 -07:00
}
/// Serializes a typed `request` into an `http::Request` aimed at the
/// resolved destination `actual`.
///
/// The request is built against Matrix version 1.11 with an empty access
/// token that is sent only if required. A conversion failure is reported
/// as `BadServerResponse`.
fn into_http_request<T>(actual: &ActualDest, request: T) -> Result<http::Request<Vec<u8>>>
where
    T: OutgoingRequest + Send,
{
    const SUPPORTED_VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_11];
    const ACCESS_TOKEN: SendAccessToken<'_> = SendAccessToken::IfRequired(EMPTY);

    let base_url = actual.string();

    request
        .try_into_http_request::<Vec<u8>>(base_url.as_str(), ACCESS_TOKEN, &SUPPORTED_VERSIONS)
        .map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))
}