Compare commits

...

9 Commits

Author SHA1 Message Date
Jade Ellis f2e588b294 fix: Stop forcing the appservice token into the URL 2026-01-05 18:14:16 +00:00
Jade Ellis 7ec81c25f1 fix: Use correct token handlers for Ruma 2026-01-05 18:12:20 +00:00
Jade Ellis 61f61826bb fix: Apply timeouts in more places 2026-01-05 18:09:18 +00:00
timedout 27d6604d14 fix: Use a timeout instead of deadline 2026-01-03 17:08:47 +00:00
timedout 1c7bd2f6fa style: Remove unnecessary then() calls in chain 2026-01-03 16:22:49 +00:00
timedout 56d7099011 style: Include errors in key claim response too 2026-01-03 16:10:06 +00:00
timedout bc426e1bfc fix: Apply client-requested timeout to federated key queries
Also parallelised federation calls in related functions
2026-01-03 16:05:05 +00:00
timedout 6c61b3ec5b fix: Build error two: electric boogaloo 2025-12-31 21:15:28 +00:00
timedout 9d9d1170b6 fix: Build error 2025-12-31 21:04:06 +00:00
15 changed files with 164 additions and 99 deletions
+2
View File
@@ -0,0 +1,2 @@
Client requested timeout parameter is now applied to e2ee key lookups and claims. Related federation requests are now
also concurrent. Contributed by @nex.
+14 -1
View File
@@ -389,7 +389,15 @@
#
#appservice_idle_timeout = 300
# Notification gateway pusher idle connection pool timeout.
# Notification gateway pusher request connection timeout (seconds).
#
#pusher_conn_timeout = 15
# Notification gateway pusher total request timeout (seconds).
#
#pusher_timeout = 60
# Notification gateway pusher idle connection pool timeout (seconds).
#
#pusher_idle_timeout = 15
@@ -1455,6 +1463,11 @@
#
#url_preview_max_spider_size = 256000
# Total request timeout for URL previews (seconds). This includes
# connection, request, and response body reading time.
#
#url_preview_timeout = 120
# Option to decide whether you would like to run the domain allowlist
# checks (contains and explicit) on the root domain or not. Does not apply
# to URL contains allowlist. Defaults to false.
+2 -2
View File
@@ -465,7 +465,7 @@ pub(super) async fn force_join_list_of_local_users(
if server_admins.is_empty() {
return Err!("There are no admins set for this server.");
};
}
let (room_id, servers) = self
.services
@@ -580,7 +580,7 @@ pub(super) async fn force_join_all_local_users(
if server_admins.is_empty() {
return Err!("There are no admins set for this server.");
};
}
let (room_id, servers) = self
.services
+57 -28
View File
@@ -1,7 +1,15 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::{
collections::{BTreeMap, HashMap, HashSet},
time::Duration,
};
use axum::extract::State;
use conduwuit::{Err, Error, Result, debug, debug_warn, err, result::NotFound, utils};
use conduwuit::{
Err, Error, Result, debug, debug_warn, err,
result::NotFound,
utils,
utils::{IterStream, stream::WidebandExt},
};
use conduwuit_service::{Services, users::parse_master_key};
use futures::{StreamExt, stream::FuturesUnordered};
use ruma::{
@@ -134,6 +142,7 @@ pub(crate) async fn get_keys_route(
&body.device_keys,
|u| u == sender_user,
true, // Always allow local users to see device names of other local users
body.timeout.unwrap_or(Duration::from_secs(10)),
)
.await
}
@@ -145,7 +154,12 @@ pub(crate) async fn claim_keys_route(
State(services): State<crate::State>,
body: Ruma<claim_keys::v3::Request>,
) -> Result<claim_keys::v3::Response> {
claim_keys_helper(&services, &body.one_time_keys).await
claim_keys_helper(
&services,
&body.one_time_keys,
body.timeout.unwrap_or(Duration::from_secs(10)),
)
.await
}
/// # `POST /_matrix/client/r0/keys/device_signing/upload`
@@ -421,6 +435,7 @@ pub(crate) async fn get_keys_helper<F>(
device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
allowed_signatures: F,
include_display_names: bool,
timeout: Duration,
) -> Result<get_keys::v3::Response>
where
F: Fn(&UserId) -> bool + Send + Sync,
@@ -512,9 +527,10 @@ where
let mut failures = BTreeMap::new();
let mut futures: FuturesUnordered<_> = get_over_federation
let futures = get_over_federation
.into_iter()
.map(|(server, vec)| async move {
.stream()
.wide_filter_map(|(server, vec)| async move {
let mut device_keys_input_fed = BTreeMap::new();
for (user_id, keys) in vec {
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
@@ -522,17 +538,22 @@ where
let request =
federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed };
let response = tokio::time::timeout(
timeout,
services.sending.send_federation_request(server, request),
)
.await
// Need to flatten the Result<Result<V, E>, E> into Result<V, E>
.map_err(|_| err!(Request(Unknown("Timeout when getting keys over federation."))))
.and_then(|res| res);
let response = services
.sending
.send_federation_request(server, request)
.await;
(server, response)
Some((server, response))
})
.collect();
.collect::<FuturesUnordered<_>>()
.await
.into_iter();
while let Some((server, response)) = futures.next().await {
for (server, response) in futures {
match response {
| Ok(response) => {
for (user, master_key) in response.master_keys {
@@ -564,8 +585,8 @@ where
self_signing_keys.extend(response.self_signing_keys);
device_keys.extend(response.device_keys);
},
| _ => {
failures.insert(server.to_string(), json!({}));
| Err(e) => {
failures.insert(server.to_string(), json!({ "error": e.to_string() }));
},
}
}
@@ -608,6 +629,7 @@ fn add_unsigned_device_display_name(
pub(crate) async fn claim_keys_helper(
services: &Services,
one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>,
timeout: Duration,
) -> Result<claim_keys::v3::Response> {
let mut one_time_keys = BTreeMap::new();
@@ -638,32 +660,39 @@ pub(crate) async fn claim_keys_helper(
let mut failures = BTreeMap::new();
let mut futures: FuturesUnordered<_> = get_over_federation
let futures = get_over_federation
.into_iter()
.map(|(server, vec)| async move {
.stream()
.wide_filter_map(|(server, vec)| async move {
let mut one_time_keys_input_fed = BTreeMap::new();
for (user_id, keys) in vec {
one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
}
(
server,
services
.sending
.send_federation_request(server, federation::keys::claim_keys::v1::Request {
let response = tokio::time::timeout(
timeout,
services.sending.send_federation_request(
server,
federation::keys::claim_keys::v1::Request {
one_time_keys: one_time_keys_input_fed,
})
.await,
},
),
)
.await
.map_err(|_| err!(Request(Unknown("Timeout when claiming keys over federation."))))
.and_then(|res| res);
Some((server, response))
})
.collect();
.collect::<FuturesUnordered<_>>()
.await
.into_iter();
while let Some((server, response)) = futures.next().await {
for (server, response) in futures {
match response {
| Ok(keys) => {
one_time_keys.extend(keys.one_time_keys);
},
| Err(_e) => {
failures.insert(server.to_string(), json!({}));
| Err(e) => {
failures.insert(server.to_string(), json!({"error": e.to_string()}));
},
}
}
+1 -1
View File
@@ -72,7 +72,7 @@ pub(crate) async fn well_known_support(
if contacts.is_empty() {
let admin_users = services.admin.get_admins().await;
for user_id in admin_users.iter() {
for user_id in &admin_users {
if *user_id == services.globals.server_user {
continue;
}
+5 -1
View File
@@ -1,3 +1,5 @@
use std::time::Duration;
use axum::extract::State;
use conduwuit::{Error, Result};
use futures::{FutureExt, StreamExt, TryFutureExt};
@@ -96,6 +98,7 @@ pub(crate) async fn get_keys_route(
&body.device_keys,
|u| Some(u.server_name()) == body.origin.as_deref(),
services.globals.allow_device_name_federation(),
Duration::from_secs(0),
)
.await?;
@@ -124,7 +127,8 @@ pub(crate) async fn claim_keys_route(
));
}
let result = claim_keys_helper(&services, &body.one_time_keys).await?;
let result =
claim_keys_helper(&services, &body.one_time_keys, Duration::from_secs(0)).await?;
Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys })
}
+26 -1
View File
@@ -500,7 +500,19 @@ pub struct Config {
#[serde(default = "default_appservice_idle_timeout")]
pub appservice_idle_timeout: u64,
/// Notification gateway pusher idle connection pool timeout.
/// Notification gateway pusher request connection timeout (seconds).
///
/// default: 15
#[serde(default = "default_pusher_conn_timeout")]
pub pusher_conn_timeout: u64,
/// Notification gateway pusher total request timeout (seconds).
///
/// default: 60
#[serde(default = "default_pusher_timeout")]
pub pusher_timeout: u64,
/// Notification gateway pusher idle connection pool timeout (seconds).
///
/// default: 15
#[serde(default = "default_pusher_idle_timeout")]
@@ -1670,6 +1682,13 @@ pub struct Config {
#[serde(default = "default_url_preview_max_spider_size")]
pub url_preview_max_spider_size: usize,
/// Total request timeout for URL previews (seconds). This includes
/// connection, request, and response body reading time.
///
/// default: 120
#[serde(default = "default_url_preview_timeout")]
pub url_preview_timeout: u64,
/// Option to decide whether you would like to run the domain allowlist
/// checks (contains and explicit) on the root domain or not. Does not apply
/// to URL contains allowlist. Defaults to false.
@@ -2424,6 +2443,10 @@ fn default_appservice_timeout() -> u64 { 35 }
fn default_appservice_idle_timeout() -> u64 { 300 }
fn default_pusher_conn_timeout() -> u64 { 15 }
fn default_pusher_timeout() -> u64 { 60 }
fn default_pusher_idle_timeout() -> u64 { 15 }
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
@@ -2551,6 +2574,8 @@ fn default_url_preview_max_spider_size() -> usize {
256_000 // 256KB
}
fn default_url_preview_timeout() -> u64 { 120 }
fn default_new_user_displayname_suffix() -> String { "🏳️‍⚧️".to_owned() }
fn default_sentry_endpoint() -> Option<Url> { None }
+1 -1
View File
@@ -188,7 +188,7 @@ pub async fn revoke_admin(&self, user_id: &UserId) -> Result {
warn!(
"Revoking the admin status of {user_id} will not work correctly as they are within \
the admins_list config."
)
);
}
let Ok(room_id) = self.get_admin_room().await else {
+10 -3
View File
@@ -406,16 +406,23 @@ impl Service {
/// Checks whether a given user is an admin of this server
pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
if self.services.server.config.admins_list.contains(user_id) {
if self
.services
.server
.config
.admins_list
.contains(&user_id.to_owned())
{
return true;
}
if self.services.server.config.admins_from_room {
if let Ok(admin_room) = self.get_admin_room().await {
self.services
return self
.services
.state_cache
.is_joined(user_id, &admin_room)
.await
.await;
}
}
+9
View File
@@ -47,6 +47,7 @@ impl crate::Service for Service {
})?
.local_address(url_preview_bind_addr)
.dns_resolver(resolver.resolver.clone())
.timeout(Duration::from_secs(config.url_preview_timeout))
.redirect(redirect::Policy::limited(3))
.build()?,
@@ -68,6 +69,11 @@ impl crate::Service for Service {
.dns_resolver(resolver.resolver.hooked.clone())
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
.read_timeout(Duration::from_secs(config.federation_timeout))
.timeout(Duration::from_secs(
config
.federation_timeout
.saturating_add(config.federation_conn_timeout),
))
.pool_max_idle_per_host(config.federation_idle_per_host.into())
.pool_idle_timeout(Duration::from_secs(config.federation_idle_timeout))
.redirect(redirect::Policy::limited(3))
@@ -77,6 +83,7 @@ impl crate::Service for Service {
.dns_resolver(resolver.resolver.hooked.clone())
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
.read_timeout(Duration::from_secs(305))
.timeout(Duration::from_secs(120))
.pool_max_idle_per_host(0)
.redirect(redirect::Policy::limited(3))
.build()?,
@@ -103,6 +110,8 @@ impl crate::Service for Service {
pusher: base(config)?
.dns_resolver(resolver.resolver.clone())
.connect_timeout(Duration::from_secs(config.pusher_conn_timeout))
.timeout(Duration::from_secs(config.pusher_timeout))
.pool_max_idle_per_host(1)
.pool_idle_timeout(Duration::from_secs(config.pusher_idle_timeout))
.redirect(redirect::Policy::limited(2))
+11 -5
View File
@@ -3,7 +3,7 @@ use std::{fmt::Debug, mem};
use bytes::Bytes;
use conduwuit::{
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
error::inspect_debug_log, implement, trace, utils::string::EMPTY,
error::inspect_debug_log, implement, trace,
};
use http::{HeaderValue, header::AUTHORIZATION};
use ipaddress::IPAddress;
@@ -90,7 +90,9 @@ where
debug!(?method, ?url, "Sending request");
match client.execute(request).await {
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
| Ok(response) =>
self.handle_response::<T>(dest, actual, &method, &url, response)
.await,
| Err(error) =>
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
}
@@ -119,7 +121,9 @@ fn validate_url(&self, url: &Url) -> Result<()> {
Ok(())
}
#[implement(super::Service)]
async fn handle_response<T>(
&self,
dest: &ServerName,
actual: &ActualDest,
method: &Method,
@@ -162,7 +166,6 @@ async fn into_http_response(
.expect("http::response::Builder is usable"),
);
// TODO: handle timeout
trace!("Waiting for response body...");
let body = response
.bytes()
@@ -286,10 +289,13 @@ where
T: OutgoingRequest + Send,
{
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_11];
const SATIR: SendAccessToken<'_> = SendAccessToken::IfRequired(EMPTY);
let http_request = request
.try_into_http_request::<Vec<u8>>(actual.string().as_str(), SATIR, &VERSIONS)
.try_into_http_request::<Vec<u8>>(
actual.string().as_str(),
SendAccessToken::None,
&VERSIONS,
)
.map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))?;
Ok(http_request)
+3 -3
View File
@@ -198,7 +198,7 @@ impl Service {
trace!("Push gateway destination: {dest}");
let http_request = request
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::IfRequired(""), &VERSIONS)
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::None, &VERSIONS)
.map_err(|e| {
err!(BadServerResponse(warn!(
"Failed to find destination {dest} for push gateway: {e}"
@@ -245,7 +245,7 @@ impl Service {
.expect("http::response::Builder is usable"),
);
let body = response.bytes().await?; // TODO: handle timeout
let body = response.bytes().await?;
if !status.is_success() {
debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
@@ -288,7 +288,7 @@ impl Service {
let mut notify = None;
let mut tweaks = Vec::new();
if event.room_id().is_none() {
// TODO(hydra): does this matter?
// This only affects v12+ create events
return Ok(());
}
+9 -18
View File
@@ -1,8 +1,7 @@
use std::{fmt::Debug, mem};
use bytes::BytesMut;
use conduwuit::{Err, Result, debug_error, err, trace, utils, warn};
use reqwest::Client;
use conduwuit::{Err, Result, debug_error, err, implement, trace, utils, warn};
use ruma::api::{
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
};
@@ -11,8 +10,9 @@ use ruma::api::{
///
/// Only returns Ok(None) if there is no url specified in the appservice
/// registration file
pub(crate) async fn send_request<T>(
client: &Client,
#[implement(super::Service)]
pub async fn send_appservice_request<T>(
&self,
registration: Registration,
request: T,
) -> Result<Option<T::IncomingResponse>>
@@ -32,10 +32,10 @@ where
trace!("Appservice URL \"{dest}\", Appservice ID: {}", registration.id);
let hs_token = registration.hs_token.as_str();
let mut http_request = request
let http_request = request
.try_into_http_request::<BytesMut>(
&dest,
SendAccessToken::IfRequired(hs_token),
SendAccessToken::Appservice(hs_token),
&VERSIONS,
)
.map_err(|e| {
@@ -45,19 +45,10 @@ where
})?
.map(BytesMut::freeze);
let mut parts = http_request.uri().clone().into_parts();
let old_path_and_query = parts.path_and_query.unwrap().as_str().to_owned();
let symbol = if old_path_and_query.contains('?') { "&" } else { "?" };
parts.path_and_query = Some(
(old_path_and_query + symbol + "access_token=" + hs_token)
.parse()
.unwrap(),
);
*http_request.uri_mut() = parts.try_into().expect("our manipulation is always valid");
let reqwest_request = reqwest::Request::try_from(http_request)?;
let client = &self.services.client.appservice;
let mut response = client.execute(reqwest_request).await.map_err(|e| {
warn!("Could not send request to appservice \"{}\" at {dest}: {e:?}", registration.id);
e
@@ -75,7 +66,7 @@ where
.expect("http::response::Builder is usable"),
);
let body = response.bytes().await?; // TODO: handle timeout
let body = response.bytes().await?;
if !status.is_success() {
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
+1 -20
View File
@@ -18,10 +18,7 @@ use conduwuit::{
warn,
};
use futures::{FutureExt, Stream, StreamExt};
use ruma::{
RoomId, ServerName, UserId,
api::{OutgoingRequest, appservice::Registration},
};
use ruma::{RoomId, ServerName, UserId, api::OutgoingRequest};
use tokio::{task, task::JoinSet};
use self::data::Data;
@@ -318,22 +315,6 @@ impl Service {
.await
}
/// Sends a request to an appservice
///
/// Only returns None if there is no url specified in the appservice
/// registration file
pub async fn send_appservice_request<T>(
&self,
registration: Registration,
request: T,
) -> Result<Option<T::IncomingResponse>>
where
T: OutgoingRequest + Debug + Send,
{
let client = &self.services.client.appservice;
appservice::send_request(client, registration, request).await
}
/// Clean up queued sending event data
///
/// Used after we remove an appservice registration or a user deletes a push
+13 -15
View File
@@ -50,9 +50,7 @@ use ruma::{
};
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
use super::{
Destination, EduBuf, EduVec, Msg, SendingEvent, Service, appservice, data::QueueItem,
};
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
#[derive(Debug)]
enum TransactionStatus {
@@ -720,18 +718,18 @@ impl Service {
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
let client = &self.services.client.appservice;
match appservice::send_request(
client,
appservice,
ruma::api::appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: txn_id.into(),
ephemeral: edu_jsons,
to_device: Vec::new(), // TODO
},
)
.await
match self
.send_appservice_request(
appservice,
ruma::api::appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: txn_id.into(),
ephemeral: edu_jsons,
to_device: Vec::new(), // TODO
},
)
.await
{
| Ok(_) => Ok(Destination::Appservice(id)),
| Err(e) => Err((Destination::Appservice(id), e)),