mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 27d6604d14 | |||
| 1c7bd2f6fa | |||
| 56d7099011 | |||
| bc426e1bfc | |||
| 6c61b3ec5b | |||
| 9d9d1170b6 |
@@ -0,0 +1,2 @@
|
|||||||
|
Client requested timeout parameter is now applied to e2ee key lookups and claims. Related federation requests are now
|
||||||
|
also concurrent. Contributed by @nex.
|
||||||
@@ -465,7 +465,7 @@ pub(super) async fn force_join_list_of_local_users(
|
|||||||
|
|
||||||
if server_admins.is_empty() {
|
if server_admins.is_empty() {
|
||||||
return Err!("There are no admins set for this server.");
|
return Err!("There are no admins set for this server.");
|
||||||
};
|
}
|
||||||
|
|
||||||
let (room_id, servers) = self
|
let (room_id, servers) = self
|
||||||
.services
|
.services
|
||||||
@@ -580,7 +580,7 @@ pub(super) async fn force_join_all_local_users(
|
|||||||
|
|
||||||
if server_admins.is_empty() {
|
if server_admins.is_empty() {
|
||||||
return Err!("There are no admins set for this server.");
|
return Err!("There are no admins set for this server.");
|
||||||
};
|
}
|
||||||
|
|
||||||
let (room_id, servers) = self
|
let (room_id, servers) = self
|
||||||
.services
|
.services
|
||||||
|
|||||||
+57
-28
@@ -1,7 +1,15 @@
|
|||||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
use std::{
|
||||||
|
collections::{BTreeMap, HashMap, HashSet},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{Err, Error, Result, debug, debug_warn, err, result::NotFound, utils};
|
use conduwuit::{
|
||||||
|
Err, Error, Result, debug, debug_warn, err,
|
||||||
|
result::NotFound,
|
||||||
|
utils,
|
||||||
|
utils::{IterStream, stream::WidebandExt},
|
||||||
|
};
|
||||||
use conduwuit_service::{Services, users::parse_master_key};
|
use conduwuit_service::{Services, users::parse_master_key};
|
||||||
use futures::{StreamExt, stream::FuturesUnordered};
|
use futures::{StreamExt, stream::FuturesUnordered};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
@@ -134,6 +142,7 @@ pub(crate) async fn get_keys_route(
|
|||||||
&body.device_keys,
|
&body.device_keys,
|
||||||
|u| u == sender_user,
|
|u| u == sender_user,
|
||||||
true, // Always allow local users to see device names of other local users
|
true, // Always allow local users to see device names of other local users
|
||||||
|
body.timeout.unwrap_or(Duration::from_secs(10)),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -145,7 +154,12 @@ pub(crate) async fn claim_keys_route(
|
|||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
body: Ruma<claim_keys::v3::Request>,
|
body: Ruma<claim_keys::v3::Request>,
|
||||||
) -> Result<claim_keys::v3::Response> {
|
) -> Result<claim_keys::v3::Response> {
|
||||||
claim_keys_helper(&services, &body.one_time_keys).await
|
claim_keys_helper(
|
||||||
|
&services,
|
||||||
|
&body.one_time_keys,
|
||||||
|
body.timeout.unwrap_or(Duration::from_secs(10)),
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # `POST /_matrix/client/r0/keys/device_signing/upload`
|
/// # `POST /_matrix/client/r0/keys/device_signing/upload`
|
||||||
@@ -421,6 +435,7 @@ pub(crate) async fn get_keys_helper<F>(
|
|||||||
device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
|
device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
|
||||||
allowed_signatures: F,
|
allowed_signatures: F,
|
||||||
include_display_names: bool,
|
include_display_names: bool,
|
||||||
|
timeout: Duration,
|
||||||
) -> Result<get_keys::v3::Response>
|
) -> Result<get_keys::v3::Response>
|
||||||
where
|
where
|
||||||
F: Fn(&UserId) -> bool + Send + Sync,
|
F: Fn(&UserId) -> bool + Send + Sync,
|
||||||
@@ -512,9 +527,10 @@ where
|
|||||||
|
|
||||||
let mut failures = BTreeMap::new();
|
let mut failures = BTreeMap::new();
|
||||||
|
|
||||||
let mut futures: FuturesUnordered<_> = get_over_federation
|
let futures = get_over_federation
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(server, vec)| async move {
|
.stream()
|
||||||
|
.wide_filter_map(|(server, vec)| async move {
|
||||||
let mut device_keys_input_fed = BTreeMap::new();
|
let mut device_keys_input_fed = BTreeMap::new();
|
||||||
for (user_id, keys) in vec {
|
for (user_id, keys) in vec {
|
||||||
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
|
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
|
||||||
@@ -522,17 +538,22 @@ where
|
|||||||
|
|
||||||
let request =
|
let request =
|
||||||
federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed };
|
federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed };
|
||||||
|
let response = tokio::time::timeout(
|
||||||
|
timeout,
|
||||||
|
services.sending.send_federation_request(server, request),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
// Need to flatten the Result<Result<V, E>, E> into Result<V, E>
|
||||||
|
.map_err(|_| err!(Request(Unknown("Timeout when getting keys over federation."))))
|
||||||
|
.and_then(|res| res);
|
||||||
|
|
||||||
let response = services
|
Some((server, response))
|
||||||
.sending
|
|
||||||
.send_federation_request(server, request)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
(server, response)
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect::<FuturesUnordered<_>>()
|
||||||
|
.await
|
||||||
|
.into_iter();
|
||||||
|
|
||||||
while let Some((server, response)) = futures.next().await {
|
for (server, response) in futures {
|
||||||
match response {
|
match response {
|
||||||
| Ok(response) => {
|
| Ok(response) => {
|
||||||
for (user, master_key) in response.master_keys {
|
for (user, master_key) in response.master_keys {
|
||||||
@@ -564,8 +585,8 @@ where
|
|||||||
self_signing_keys.extend(response.self_signing_keys);
|
self_signing_keys.extend(response.self_signing_keys);
|
||||||
device_keys.extend(response.device_keys);
|
device_keys.extend(response.device_keys);
|
||||||
},
|
},
|
||||||
| _ => {
|
| Err(e) => {
|
||||||
failures.insert(server.to_string(), json!({}));
|
failures.insert(server.to_string(), json!({ "error": e.to_string() }));
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -608,6 +629,7 @@ fn add_unsigned_device_display_name(
|
|||||||
pub(crate) async fn claim_keys_helper(
|
pub(crate) async fn claim_keys_helper(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>,
|
one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>,
|
||||||
|
timeout: Duration,
|
||||||
) -> Result<claim_keys::v3::Response> {
|
) -> Result<claim_keys::v3::Response> {
|
||||||
let mut one_time_keys = BTreeMap::new();
|
let mut one_time_keys = BTreeMap::new();
|
||||||
|
|
||||||
@@ -638,32 +660,39 @@ pub(crate) async fn claim_keys_helper(
|
|||||||
|
|
||||||
let mut failures = BTreeMap::new();
|
let mut failures = BTreeMap::new();
|
||||||
|
|
||||||
let mut futures: FuturesUnordered<_> = get_over_federation
|
let futures = get_over_federation
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(server, vec)| async move {
|
.stream()
|
||||||
|
.wide_filter_map(|(server, vec)| async move {
|
||||||
let mut one_time_keys_input_fed = BTreeMap::new();
|
let mut one_time_keys_input_fed = BTreeMap::new();
|
||||||
for (user_id, keys) in vec {
|
for (user_id, keys) in vec {
|
||||||
one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
|
one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
|
||||||
}
|
}
|
||||||
(
|
let response = tokio::time::timeout(
|
||||||
server,
|
timeout,
|
||||||
services
|
services.sending.send_federation_request(
|
||||||
.sending
|
server,
|
||||||
.send_federation_request(server, federation::keys::claim_keys::v1::Request {
|
federation::keys::claim_keys::v1::Request {
|
||||||
one_time_keys: one_time_keys_input_fed,
|
one_time_keys: one_time_keys_input_fed,
|
||||||
})
|
},
|
||||||
.await,
|
),
|
||||||
)
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| err!(Request(Unknown("Timeout when claiming keys over federation."))))
|
||||||
|
.and_then(|res| res);
|
||||||
|
Some((server, response))
|
||||||
})
|
})
|
||||||
.collect();
|
.collect::<FuturesUnordered<_>>()
|
||||||
|
.await
|
||||||
|
.into_iter();
|
||||||
|
|
||||||
while let Some((server, response)) = futures.next().await {
|
for (server, response) in futures {
|
||||||
match response {
|
match response {
|
||||||
| Ok(keys) => {
|
| Ok(keys) => {
|
||||||
one_time_keys.extend(keys.one_time_keys);
|
one_time_keys.extend(keys.one_time_keys);
|
||||||
},
|
},
|
||||||
| Err(_e) => {
|
| Err(e) => {
|
||||||
failures.insert(server.to_string(), json!({}));
|
failures.insert(server.to_string(), json!({"error": e.to_string()}));
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ pub(crate) async fn well_known_support(
|
|||||||
if contacts.is_empty() {
|
if contacts.is_empty() {
|
||||||
let admin_users = services.admin.get_admins().await;
|
let admin_users = services.admin.get_admins().await;
|
||||||
|
|
||||||
for user_id in admin_users.iter() {
|
for user_id in &admin_users {
|
||||||
if *user_id == services.globals.server_user {
|
if *user_id == services.globals.server_user {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{Error, Result};
|
use conduwuit::{Error, Result};
|
||||||
use futures::{FutureExt, StreamExt, TryFutureExt};
|
use futures::{FutureExt, StreamExt, TryFutureExt};
|
||||||
@@ -96,6 +98,7 @@ pub(crate) async fn get_keys_route(
|
|||||||
&body.device_keys,
|
&body.device_keys,
|
||||||
|u| Some(u.server_name()) == body.origin.as_deref(),
|
|u| Some(u.server_name()) == body.origin.as_deref(),
|
||||||
services.globals.allow_device_name_federation(),
|
services.globals.allow_device_name_federation(),
|
||||||
|
Duration::from_secs(0),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -124,7 +127,8 @@ pub(crate) async fn claim_keys_route(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = claim_keys_helper(&services, &body.one_time_keys).await?;
|
let result =
|
||||||
|
claim_keys_helper(&services, &body.one_time_keys, Duration::from_secs(0)).await?;
|
||||||
|
|
||||||
Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys })
|
Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys })
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -188,7 +188,7 @@ pub async fn revoke_admin(&self, user_id: &UserId) -> Result {
|
|||||||
warn!(
|
warn!(
|
||||||
"Revoking the admin status of {user_id} will not work correctly as they are within \
|
"Revoking the admin status of {user_id} will not work correctly as they are within \
|
||||||
the admins_list config."
|
the admins_list config."
|
||||||
)
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let Ok(room_id) = self.get_admin_room().await else {
|
let Ok(room_id) = self.get_admin_room().await else {
|
||||||
|
|||||||
@@ -406,16 +406,23 @@ impl Service {
|
|||||||
|
|
||||||
/// Checks whether a given user is an admin of this server
|
/// Checks whether a given user is an admin of this server
|
||||||
pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
|
pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
|
||||||
if self.services.server.config.admins_list.contains(user_id) {
|
if self
|
||||||
|
.services
|
||||||
|
.server
|
||||||
|
.config
|
||||||
|
.admins_list
|
||||||
|
.contains(&user_id.to_owned())
|
||||||
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.services.server.config.admins_from_room {
|
if self.services.server.config.admins_from_room {
|
||||||
if let Ok(admin_room) = self.get_admin_room().await {
|
if let Ok(admin_room) = self.get_admin_room().await {
|
||||||
self.services
|
return self
|
||||||
|
.services
|
||||||
.state_cache
|
.state_cache
|
||||||
.is_joined(user_id, &admin_room)
|
.is_joined(user_id, &admin_room)
|
||||||
.await
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user