Compare commits

..

6 Commits

Author SHA1 Message Date
timedout 27d6604d14 fix: Use a timeout instead of deadline 2026-01-03 17:08:47 +00:00
timedout 1c7bd2f6fa style: Remove unnecessary then() calls in chain 2026-01-03 16:22:49 +00:00
timedout 56d7099011 style: Include errors in key claim response too 2026-01-03 16:10:06 +00:00
timedout bc426e1bfc fix: Apply client-requested timeout to federated key queries
Also parallelised federation calls in related functions
2026-01-03 16:05:05 +00:00
timedout 6c61b3ec5b fix: Build error two: electric boogaloo 2025-12-31 21:15:28 +00:00
timedout 9d9d1170b6 fix: Build error 2025-12-31 21:04:06 +00:00
7 changed files with 78 additions and 36 deletions
+2
View File
@@ -0,0 +1,2 @@
Client requested timeout parameter is now applied to e2ee key lookups and claims. Related federation requests are now
also concurrent. Contributed by @nex.
+2 -2
View File
@@ -465,7 +465,7 @@ pub(super) async fn force_join_list_of_local_users(
if server_admins.is_empty() { if server_admins.is_empty() {
return Err!("There are no admins set for this server."); return Err!("There are no admins set for this server.");
}; }
let (room_id, servers) = self let (room_id, servers) = self
.services .services
@@ -580,7 +580,7 @@ pub(super) async fn force_join_all_local_users(
if server_admins.is_empty() { if server_admins.is_empty() {
return Err!("There are no admins set for this server."); return Err!("There are no admins set for this server.");
}; }
let (room_id, servers) = self let (room_id, servers) = self
.services .services
+57 -28
View File
@@ -1,7 +1,15 @@
use std::collections::{BTreeMap, HashMap, HashSet}; use std::{
collections::{BTreeMap, HashMap, HashSet},
time::Duration,
};
use axum::extract::State; use axum::extract::State;
use conduwuit::{Err, Error, Result, debug, debug_warn, err, result::NotFound, utils}; use conduwuit::{
Err, Error, Result, debug, debug_warn, err,
result::NotFound,
utils,
utils::{IterStream, stream::WidebandExt},
};
use conduwuit_service::{Services, users::parse_master_key}; use conduwuit_service::{Services, users::parse_master_key};
use futures::{StreamExt, stream::FuturesUnordered}; use futures::{StreamExt, stream::FuturesUnordered};
use ruma::{ use ruma::{
@@ -134,6 +142,7 @@ pub(crate) async fn get_keys_route(
&body.device_keys, &body.device_keys,
|u| u == sender_user, |u| u == sender_user,
true, // Always allow local users to see device names of other local users true, // Always allow local users to see device names of other local users
body.timeout.unwrap_or(Duration::from_secs(10)),
) )
.await .await
} }
@@ -145,7 +154,12 @@ pub(crate) async fn claim_keys_route(
State(services): State<crate::State>, State(services): State<crate::State>,
body: Ruma<claim_keys::v3::Request>, body: Ruma<claim_keys::v3::Request>,
) -> Result<claim_keys::v3::Response> { ) -> Result<claim_keys::v3::Response> {
claim_keys_helper(&services, &body.one_time_keys).await claim_keys_helper(
&services,
&body.one_time_keys,
body.timeout.unwrap_or(Duration::from_secs(10)),
)
.await
} }
/// # `POST /_matrix/client/r0/keys/device_signing/upload` /// # `POST /_matrix/client/r0/keys/device_signing/upload`
@@ -421,6 +435,7 @@ pub(crate) async fn get_keys_helper<F>(
device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>, device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
allowed_signatures: F, allowed_signatures: F,
include_display_names: bool, include_display_names: bool,
timeout: Duration,
) -> Result<get_keys::v3::Response> ) -> Result<get_keys::v3::Response>
where where
F: Fn(&UserId) -> bool + Send + Sync, F: Fn(&UserId) -> bool + Send + Sync,
@@ -512,9 +527,10 @@ where
let mut failures = BTreeMap::new(); let mut failures = BTreeMap::new();
let mut futures: FuturesUnordered<_> = get_over_federation let futures = get_over_federation
.into_iter() .into_iter()
.map(|(server, vec)| async move { .stream()
.wide_filter_map(|(server, vec)| async move {
let mut device_keys_input_fed = BTreeMap::new(); let mut device_keys_input_fed = BTreeMap::new();
for (user_id, keys) in vec { for (user_id, keys) in vec {
device_keys_input_fed.insert(user_id.to_owned(), keys.clone()); device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
@@ -522,17 +538,22 @@ where
let request = let request =
federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed }; federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed };
let response = tokio::time::timeout(
timeout,
services.sending.send_federation_request(server, request),
)
.await
// Need to flatten the Result<Result<V, E>, E> into Result<V, E>
.map_err(|_| err!(Request(Unknown("Timeout when getting keys over federation."))))
.and_then(|res| res);
let response = services Some((server, response))
.sending
.send_federation_request(server, request)
.await;
(server, response)
}) })
.collect(); .collect::<FuturesUnordered<_>>()
.await
.into_iter();
while let Some((server, response)) = futures.next().await { for (server, response) in futures {
match response { match response {
| Ok(response) => { | Ok(response) => {
for (user, master_key) in response.master_keys { for (user, master_key) in response.master_keys {
@@ -564,8 +585,8 @@ where
self_signing_keys.extend(response.self_signing_keys); self_signing_keys.extend(response.self_signing_keys);
device_keys.extend(response.device_keys); device_keys.extend(response.device_keys);
}, },
| _ => { | Err(e) => {
failures.insert(server.to_string(), json!({})); failures.insert(server.to_string(), json!({ "error": e.to_string() }));
}, },
} }
} }
@@ -608,6 +629,7 @@ fn add_unsigned_device_display_name(
pub(crate) async fn claim_keys_helper( pub(crate) async fn claim_keys_helper(
services: &Services, services: &Services,
one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>, one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>,
timeout: Duration,
) -> Result<claim_keys::v3::Response> { ) -> Result<claim_keys::v3::Response> {
let mut one_time_keys = BTreeMap::new(); let mut one_time_keys = BTreeMap::new();
@@ -638,32 +660,39 @@ pub(crate) async fn claim_keys_helper(
let mut failures = BTreeMap::new(); let mut failures = BTreeMap::new();
let mut futures: FuturesUnordered<_> = get_over_federation let futures = get_over_federation
.into_iter() .into_iter()
.map(|(server, vec)| async move { .stream()
.wide_filter_map(|(server, vec)| async move {
let mut one_time_keys_input_fed = BTreeMap::new(); let mut one_time_keys_input_fed = BTreeMap::new();
for (user_id, keys) in vec { for (user_id, keys) in vec {
one_time_keys_input_fed.insert(user_id.clone(), keys.clone()); one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
} }
( let response = tokio::time::timeout(
server, timeout,
services services.sending.send_federation_request(
.sending server,
.send_federation_request(server, federation::keys::claim_keys::v1::Request { federation::keys::claim_keys::v1::Request {
one_time_keys: one_time_keys_input_fed, one_time_keys: one_time_keys_input_fed,
}) },
.await, ),
) )
.await
.map_err(|_| err!(Request(Unknown("Timeout when claiming keys over federation."))))
.and_then(|res| res);
Some((server, response))
}) })
.collect(); .collect::<FuturesUnordered<_>>()
.await
.into_iter();
while let Some((server, response)) = futures.next().await { for (server, response) in futures {
match response { match response {
| Ok(keys) => { | Ok(keys) => {
one_time_keys.extend(keys.one_time_keys); one_time_keys.extend(keys.one_time_keys);
}, },
| Err(_e) => { | Err(e) => {
failures.insert(server.to_string(), json!({})); failures.insert(server.to_string(), json!({"error": e.to_string()}));
}, },
} }
} }
+1 -1
View File
@@ -72,7 +72,7 @@ pub(crate) async fn well_known_support(
if contacts.is_empty() { if contacts.is_empty() {
let admin_users = services.admin.get_admins().await; let admin_users = services.admin.get_admins().await;
for user_id in admin_users.iter() { for user_id in &admin_users {
if *user_id == services.globals.server_user { if *user_id == services.globals.server_user {
continue; continue;
} }
+5 -1
View File
@@ -1,3 +1,5 @@
use std::time::Duration;
use axum::extract::State; use axum::extract::State;
use conduwuit::{Error, Result}; use conduwuit::{Error, Result};
use futures::{FutureExt, StreamExt, TryFutureExt}; use futures::{FutureExt, StreamExt, TryFutureExt};
@@ -96,6 +98,7 @@ pub(crate) async fn get_keys_route(
&body.device_keys, &body.device_keys,
|u| Some(u.server_name()) == body.origin.as_deref(), |u| Some(u.server_name()) == body.origin.as_deref(),
services.globals.allow_device_name_federation(), services.globals.allow_device_name_federation(),
Duration::from_secs(0),
) )
.await?; .await?;
@@ -124,7 +127,8 @@ pub(crate) async fn claim_keys_route(
)); ));
} }
let result = claim_keys_helper(&services, &body.one_time_keys).await?; let result =
claim_keys_helper(&services, &body.one_time_keys, Duration::from_secs(0)).await?;
Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys }) Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys })
} }
+1 -1
View File
@@ -188,7 +188,7 @@ pub async fn revoke_admin(&self, user_id: &UserId) -> Result {
warn!( warn!(
"Revoking the admin status of {user_id} will not work correctly as they are within \ "Revoking the admin status of {user_id} will not work correctly as they are within \
the admins_list config." the admins_list config."
) );
} }
let Ok(room_id) = self.get_admin_room().await else { let Ok(room_id) = self.get_admin_room().await else {
+10 -3
View File
@@ -406,16 +406,23 @@ impl Service {
/// Checks whether a given user is an admin of this server /// Checks whether a given user is an admin of this server
pub async fn user_is_admin(&self, user_id: &UserId) -> bool { pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
if self.services.server.config.admins_list.contains(user_id) { if self
.services
.server
.config
.admins_list
.contains(&user_id.to_owned())
{
return true; return true;
} }
if self.services.server.config.admins_from_room { if self.services.server.config.admins_from_room {
if let Ok(admin_room) = self.get_admin_room().await { if let Ok(admin_room) = self.get_admin_room().await {
self.services return self
.services
.state_cache .state_cache
.is_joined(user_id, &admin_room) .is_joined(user_id, &admin_room)
.await .await;
} }
} }