mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f2e588b294 | |||
| 7ec81c25f1 | |||
| 61f61826bb | |||
| 27d6604d14 | |||
| 1c7bd2f6fa | |||
| 56d7099011 | |||
| bc426e1bfc | |||
| 6c61b3ec5b | |||
| 9d9d1170b6 | |||
| 7be20abcad | |||
| 078275964c | |||
| bf200ad12d | |||
| 41e628892d | |||
| 44851ee6a2 | |||
| a7e6e6e83f | |||
| 8a561fcd3a | |||
| 25c305f473 | |||
| c900350164 | |||
| c565e6ffbc | |||
| 442f887c98 | |||
| 03220845e5 | |||
| f8c1e9bcde |
@@ -23,7 +23,7 @@ repos:
|
||||
- id: check-added-large-files
|
||||
|
||||
- repo: https://github.com/crate-ci/typos
|
||||
rev: v1.40.0
|
||||
rev: v1.41.0
|
||||
hooks:
|
||||
- id: typos
|
||||
- id: typos
|
||||
@@ -31,7 +31,7 @@ repos:
|
||||
stages: [commit-msg]
|
||||
|
||||
- repo: https://github.com/crate-ci/committed
|
||||
rev: v1.1.8
|
||||
rev: v1.1.9
|
||||
hooks:
|
||||
- id: committed
|
||||
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
Certain potentially dangerous admin commands are now restricted to only be usable in the admin room and server console.
|
||||
@@ -0,0 +1 @@
|
||||
Implemented a configuration defined admin list independent of the admin room. (@Terryiscool160).
|
||||
@@ -0,0 +1 @@
|
||||
Fixed unreliable room summary fetching and improved error messages. Contributed by @nex.
|
||||
@@ -0,0 +1,2 @@
|
||||
Client requested timeout parameter is now applied to e2ee key lookups and claims. Related federation requests are now
|
||||
also concurrent. Contributed by @nex.
|
||||
+26
-1
@@ -389,7 +389,15 @@
|
||||
#
|
||||
#appservice_idle_timeout = 300
|
||||
|
||||
# Notification gateway pusher idle connection pool timeout.
|
||||
# Notification gateway pusher request connection timeout (seconds).
|
||||
#
|
||||
#pusher_conn_timeout = 15
|
||||
|
||||
# Notification gateway pusher total request timeout (seconds).
|
||||
#
|
||||
#pusher_timeout = 60
|
||||
|
||||
# Notification gateway pusher idle connection pool timeout (seconds).
|
||||
#
|
||||
#pusher_idle_timeout = 15
|
||||
|
||||
@@ -1455,6 +1463,11 @@
|
||||
#
|
||||
#url_preview_max_spider_size = 256000
|
||||
|
||||
# Total request timeout for URL previews (seconds). This includes
|
||||
# connection, request, and response body reading time.
|
||||
#
|
||||
#url_preview_timeout = 120
|
||||
|
||||
# Option to decide whether you would like to run the domain allowlist
|
||||
# checks (contains and explicit) on the root domain or not. Does not apply
|
||||
# to URL contains allowlist. Defaults to false.
|
||||
@@ -1590,6 +1603,18 @@
|
||||
#
|
||||
#admin_room_tag = "m.server_notice"
|
||||
|
||||
# A list of Matrix IDs that are qualified as server admins.
|
||||
#
|
||||
# Any Matrix IDs within this list are regarded as an admin
|
||||
# regardless of whether they are in the admin room or not
|
||||
#
|
||||
#admins_list = []
|
||||
|
||||
# Defines whether those within the admin room are added to the
|
||||
# admins_list.
|
||||
#
|
||||
#admins_from_room = true
|
||||
|
||||
# Sentry.io crash/panic reporting, performance monitoring/metrics, etc.
|
||||
# This is NOT enabled by default.
|
||||
#
|
||||
|
||||
+15
-3
@@ -53,14 +53,26 @@ pub(super) async fn process(command: AdminCommand, context: &Context<'_>) -> Res
|
||||
use AdminCommand::*;
|
||||
|
||||
match command {
|
||||
| Appservices(command) => appservice::process(command, context).await,
|
||||
| Appservices(command) => {
|
||||
// appservice commands are all restricted
|
||||
context.bail_restricted()?;
|
||||
appservice::process(command, context).await
|
||||
},
|
||||
| Media(command) => media::process(command, context).await,
|
||||
| Users(command) => user::process(command, context).await,
|
||||
| Users(command) => {
|
||||
// user commands are all restricted
|
||||
context.bail_restricted()?;
|
||||
user::process(command, context).await
|
||||
},
|
||||
| Rooms(command) => room::process(command, context).await,
|
||||
| Federation(command) => federation::process(command, context).await,
|
||||
| Server(command) => server::process(command, context).await,
|
||||
| Debug(command) => debug::process(command, context).await,
|
||||
| Query(command) => query::process(command, context).await,
|
||||
| Query(command) => {
|
||||
// query commands are all restricted
|
||||
context.bail_restricted()?;
|
||||
query::process(command, context).await
|
||||
},
|
||||
| Check(command) => check::process(command, context).await,
|
||||
}
|
||||
}
|
||||
|
||||
+21
-1
@@ -1,6 +1,6 @@
|
||||
use std::{fmt, time::SystemTime};
|
||||
|
||||
use conduwuit::Result;
|
||||
use conduwuit::{Err, Result};
|
||||
use conduwuit_service::Services;
|
||||
use futures::{
|
||||
Future, FutureExt, TryFutureExt,
|
||||
@@ -8,6 +8,7 @@ use futures::{
|
||||
lock::Mutex,
|
||||
};
|
||||
use ruma::{EventId, UserId};
|
||||
use service::admin::InvocationSource;
|
||||
|
||||
pub(crate) struct Context<'a> {
|
||||
pub(crate) services: &'a Services,
|
||||
@@ -16,6 +17,7 @@ pub(crate) struct Context<'a> {
|
||||
pub(crate) reply_id: Option<&'a EventId>,
|
||||
pub(crate) sender: Option<&'a UserId>,
|
||||
pub(crate) output: Mutex<BufWriter<Vec<u8>>>,
|
||||
pub(crate) source: InvocationSource,
|
||||
}
|
||||
|
||||
impl Context<'_> {
|
||||
@@ -43,4 +45,22 @@ impl Context<'_> {
|
||||
self.sender
|
||||
.unwrap_or_else(|| self.services.globals.server_user.as_ref())
|
||||
}
|
||||
|
||||
/// Returns an Err if the [`Self::source`] of this context does not allow
|
||||
/// restricted commands to be executed.
|
||||
///
|
||||
/// This is intended to be placed at the start of restricted commands'
|
||||
/// implementations, like so:
|
||||
///
|
||||
/// ```ignore
|
||||
/// self.bail_restricted()?;
|
||||
/// // actual command impl
|
||||
/// ```
|
||||
pub(crate) fn bail_restricted(&self) -> Result {
|
||||
if self.source.allows_restricted() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err!("This command can only be used in the admin room.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -291,6 +291,8 @@ pub(super) async fn get_remote_pdu(
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_room_state(&self, room: OwnedRoomOrAliasId) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
let room_id = self.services.rooms.alias.resolve(&room).await?;
|
||||
let room_state: Vec<Raw<AnyStateEvent>> = self
|
||||
.services
|
||||
@@ -417,27 +419,6 @@ pub(super) async fn change_log_level(&self, filter: Option<String>, reset: bool)
|
||||
Err!("No log level was specified.")
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn sign_json(&self) -> Result {
|
||||
if self.body.len() < 2
|
||||
|| !self.body[0].trim().starts_with("```")
|
||||
|| self.body.last().unwrap_or(&"").trim() != "```"
|
||||
{
|
||||
return Err!("Expected code block in command body. Add --help for details.");
|
||||
}
|
||||
|
||||
let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n");
|
||||
match serde_json::from_str(&string) {
|
||||
| Err(e) => return Err!("Invalid json: {e}"),
|
||||
| Ok(mut value) => {
|
||||
self.services.server_keys.sign_json(&mut value)?;
|
||||
let json_text = serde_json::to_string_pretty(&value)?;
|
||||
write!(self, "{json_text}")
|
||||
},
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn verify_json(&self) -> Result {
|
||||
if self.body.len() < 2
|
||||
@@ -477,6 +458,8 @@ pub(super) async fn verify_pdu(&self, event_id: OwnedEventId) -> Result {
|
||||
#[admin_command]
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub(super) async fn first_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
@@ -502,6 +485,8 @@ pub(super) async fn first_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
|
||||
#[admin_command]
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub(super) async fn latest_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
@@ -532,6 +517,8 @@ pub(super) async fn force_set_room_state_from_server(
|
||||
server_name: OwnedServerName,
|
||||
at_event: Option<OwnedEventId>,
|
||||
) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
|
||||
@@ -47,9 +47,9 @@ pub enum DebugCommand {
|
||||
shorteventid: ShortEventId,
|
||||
},
|
||||
|
||||
/// - Attempts to retrieve a PDU from a remote server. Inserts it into our
|
||||
/// database/timeline if found and we do not have this PDU already
|
||||
/// (following normal event auth rules, handles it as an incoming PDU).
|
||||
/// - Attempts to retrieve a PDU from a remote server. **Does not** insert
|
||||
/// it into the database
|
||||
/// or persist it anywhere.
|
||||
GetRemotePdu {
|
||||
/// An event ID (a $ followed by the base64 reference hash)
|
||||
event_id: OwnedEventId,
|
||||
@@ -125,12 +125,6 @@ pub enum DebugCommand {
|
||||
reset: bool,
|
||||
},
|
||||
|
||||
/// - Sign JSON blob
|
||||
///
|
||||
/// This command needs a JSON blob provided in a Markdown code block below
|
||||
/// the command.
|
||||
SignJson,
|
||||
|
||||
/// - Verify JSON signatures
|
||||
///
|
||||
/// This command needs a JSON blob provided in a Markdown code block below
|
||||
|
||||
@@ -8,12 +8,14 @@ use crate::{admin_command, get_room_info};
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn disable_room(&self, room_id: OwnedRoomId) -> Result {
|
||||
self.bail_restricted()?;
|
||||
self.services.rooms.metadata.disable_room(&room_id, true);
|
||||
self.write_str("Room disabled.").await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn enable_room(&self, room_id: OwnedRoomId) -> Result {
|
||||
self.bail_restricted()?;
|
||||
self.services.rooms.metadata.disable_room(&room_id, false);
|
||||
self.write_str("Room enabled.").await
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ pub(super) async fn delete(
|
||||
mxc: Option<OwnedMxcUri>,
|
||||
event_id: Option<OwnedEventId>,
|
||||
) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
if event_id.is_some() && mxc.is_some() {
|
||||
return Err!("Please specify either an MXC or an event ID, not both.",);
|
||||
}
|
||||
@@ -176,6 +178,8 @@ pub(super) async fn delete(
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn delete_list(&self) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
if self.body.len() < 2
|
||||
|| !self.body[0].trim().starts_with("```")
|
||||
|| self.body.last().unwrap_or(&"").trim() != "```"
|
||||
@@ -231,6 +235,8 @@ pub(super) async fn delete_past_remote_media(
|
||||
after: bool,
|
||||
yes_i_want_to_delete_local_media: bool,
|
||||
) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
if before && after {
|
||||
return Err!("Please only pick one argument, --before or --after.",);
|
||||
}
|
||||
@@ -273,6 +279,8 @@ pub(super) async fn delete_all_from_server(
|
||||
server_name: OwnedServerName,
|
||||
yes_i_want_to_delete_local_media: bool,
|
||||
) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
if server_name == self.services.globals.server_name() && !yes_i_want_to_delete_local_media {
|
||||
return Err!("This command only works for remote media by default.",);
|
||||
}
|
||||
|
||||
@@ -59,6 +59,7 @@ async fn process_command(services: Arc<Services>, input: &CommandInput) -> Proce
|
||||
reply_id: input.reply_id.as_deref(),
|
||||
sender: input.sender.as_deref(),
|
||||
output: BufWriter::new(Vec::new()).into(),
|
||||
source: input.source,
|
||||
};
|
||||
|
||||
let (result, mut logs) = process(&context, command, &args).await;
|
||||
|
||||
@@ -24,6 +24,8 @@ pub(super) async fn uptime(&self) -> Result {
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn show_config(&self) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
self.write_str(&format!("{}", *self.services.server.config))
|
||||
.await
|
||||
}
|
||||
@@ -118,6 +120,8 @@ pub(super) async fn list_backups(&self) -> Result {
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn backup_database(&self) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
let db = Arc::clone(&self.services.db);
|
||||
let result = self
|
||||
.services
|
||||
@@ -144,6 +148,8 @@ pub(super) async fn admin_notice(&self, message: Vec<String>) -> Result {
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn reload_mods(&self) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
self.services.server.reload()?;
|
||||
|
||||
self.write_str("Reloading server...").await
|
||||
@@ -168,6 +174,8 @@ pub(super) async fn restart(&self, force: bool) -> Result {
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn shutdown(&self) -> Result {
|
||||
self.bail_restricted()?;
|
||||
|
||||
warn!("shutdown command");
|
||||
self.services.server.shutdown()?;
|
||||
|
||||
|
||||
+10
-24
@@ -461,9 +461,11 @@ pub(super) async fn force_join_list_of_local_users(
|
||||
);
|
||||
}
|
||||
|
||||
let Ok(admin_room) = self.services.admin.get_admin_room().await else {
|
||||
return Err!("There is not an admin room to check for server admins.",);
|
||||
};
|
||||
let server_admins = self.services.admin.get_admins().await;
|
||||
|
||||
if server_admins.is_empty() {
|
||||
return Err!("There are no admins set for this server.");
|
||||
}
|
||||
|
||||
let (room_id, servers) = self
|
||||
.services
|
||||
@@ -482,15 +484,6 @@ pub(super) async fn force_join_list_of_local_users(
|
||||
return Err!("We are not joined in this room.");
|
||||
}
|
||||
|
||||
let server_admins: Vec<_> = self
|
||||
.services
|
||||
.rooms
|
||||
.state_cache
|
||||
.active_local_users_in_room(&admin_room)
|
||||
.map(ToOwned::to_owned)
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
@@ -583,9 +576,11 @@ pub(super) async fn force_join_all_local_users(
|
||||
);
|
||||
}
|
||||
|
||||
let Ok(admin_room) = self.services.admin.get_admin_room().await else {
|
||||
return Err!("There is not an admin room to check for server admins.",);
|
||||
};
|
||||
let server_admins = self.services.admin.get_admins().await;
|
||||
|
||||
if server_admins.is_empty() {
|
||||
return Err!("There are no admins set for this server.");
|
||||
}
|
||||
|
||||
let (room_id, servers) = self
|
||||
.services
|
||||
@@ -604,15 +599,6 @@ pub(super) async fn force_join_all_local_users(
|
||||
return Err!("We are not joined in this room.");
|
||||
}
|
||||
|
||||
let server_admins: Vec<_> = self
|
||||
.services
|
||||
.rooms
|
||||
.state_cache
|
||||
.active_local_users_in_room(&admin_room)
|
||||
.map(ToOwned::to_owned)
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
|
||||
+57
-28
@@ -1,7 +1,15 @@
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use axum::extract::State;
|
||||
use conduwuit::{Err, Error, Result, debug, debug_warn, err, result::NotFound, utils};
|
||||
use conduwuit::{
|
||||
Err, Error, Result, debug, debug_warn, err,
|
||||
result::NotFound,
|
||||
utils,
|
||||
utils::{IterStream, stream::WidebandExt},
|
||||
};
|
||||
use conduwuit_service::{Services, users::parse_master_key};
|
||||
use futures::{StreamExt, stream::FuturesUnordered};
|
||||
use ruma::{
|
||||
@@ -134,6 +142,7 @@ pub(crate) async fn get_keys_route(
|
||||
&body.device_keys,
|
||||
|u| u == sender_user,
|
||||
true, // Always allow local users to see device names of other local users
|
||||
body.timeout.unwrap_or(Duration::from_secs(10)),
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -145,7 +154,12 @@ pub(crate) async fn claim_keys_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<claim_keys::v3::Request>,
|
||||
) -> Result<claim_keys::v3::Response> {
|
||||
claim_keys_helper(&services, &body.one_time_keys).await
|
||||
claim_keys_helper(
|
||||
&services,
|
||||
&body.one_time_keys,
|
||||
body.timeout.unwrap_or(Duration::from_secs(10)),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// # `POST /_matrix/client/r0/keys/device_signing/upload`
|
||||
@@ -421,6 +435,7 @@ pub(crate) async fn get_keys_helper<F>(
|
||||
device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
|
||||
allowed_signatures: F,
|
||||
include_display_names: bool,
|
||||
timeout: Duration,
|
||||
) -> Result<get_keys::v3::Response>
|
||||
where
|
||||
F: Fn(&UserId) -> bool + Send + Sync,
|
||||
@@ -512,9 +527,10 @@ where
|
||||
|
||||
let mut failures = BTreeMap::new();
|
||||
|
||||
let mut futures: FuturesUnordered<_> = get_over_federation
|
||||
let futures = get_over_federation
|
||||
.into_iter()
|
||||
.map(|(server, vec)| async move {
|
||||
.stream()
|
||||
.wide_filter_map(|(server, vec)| async move {
|
||||
let mut device_keys_input_fed = BTreeMap::new();
|
||||
for (user_id, keys) in vec {
|
||||
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
|
||||
@@ -522,17 +538,22 @@ where
|
||||
|
||||
let request =
|
||||
federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed };
|
||||
let response = tokio::time::timeout(
|
||||
timeout,
|
||||
services.sending.send_federation_request(server, request),
|
||||
)
|
||||
.await
|
||||
// Need to flatten the Result<Result<V, E>, E> into Result<V, E>
|
||||
.map_err(|_| err!(Request(Unknown("Timeout when getting keys over federation."))))
|
||||
.and_then(|res| res);
|
||||
|
||||
let response = services
|
||||
.sending
|
||||
.send_federation_request(server, request)
|
||||
.await;
|
||||
|
||||
(server, response)
|
||||
Some((server, response))
|
||||
})
|
||||
.collect();
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
.await
|
||||
.into_iter();
|
||||
|
||||
while let Some((server, response)) = futures.next().await {
|
||||
for (server, response) in futures {
|
||||
match response {
|
||||
| Ok(response) => {
|
||||
for (user, master_key) in response.master_keys {
|
||||
@@ -564,8 +585,8 @@ where
|
||||
self_signing_keys.extend(response.self_signing_keys);
|
||||
device_keys.extend(response.device_keys);
|
||||
},
|
||||
| _ => {
|
||||
failures.insert(server.to_string(), json!({}));
|
||||
| Err(e) => {
|
||||
failures.insert(server.to_string(), json!({ "error": e.to_string() }));
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -608,6 +629,7 @@ fn add_unsigned_device_display_name(
|
||||
pub(crate) async fn claim_keys_helper(
|
||||
services: &Services,
|
||||
one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>,
|
||||
timeout: Duration,
|
||||
) -> Result<claim_keys::v3::Response> {
|
||||
let mut one_time_keys = BTreeMap::new();
|
||||
|
||||
@@ -638,32 +660,39 @@ pub(crate) async fn claim_keys_helper(
|
||||
|
||||
let mut failures = BTreeMap::new();
|
||||
|
||||
let mut futures: FuturesUnordered<_> = get_over_federation
|
||||
let futures = get_over_federation
|
||||
.into_iter()
|
||||
.map(|(server, vec)| async move {
|
||||
.stream()
|
||||
.wide_filter_map(|(server, vec)| async move {
|
||||
let mut one_time_keys_input_fed = BTreeMap::new();
|
||||
for (user_id, keys) in vec {
|
||||
one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
|
||||
}
|
||||
(
|
||||
server,
|
||||
services
|
||||
.sending
|
||||
.send_federation_request(server, federation::keys::claim_keys::v1::Request {
|
||||
let response = tokio::time::timeout(
|
||||
timeout,
|
||||
services.sending.send_federation_request(
|
||||
server,
|
||||
federation::keys::claim_keys::v1::Request {
|
||||
one_time_keys: one_time_keys_input_fed,
|
||||
})
|
||||
.await,
|
||||
},
|
||||
),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| err!(Request(Unknown("Timeout when claiming keys over federation."))))
|
||||
.and_then(|res| res);
|
||||
Some((server, response))
|
||||
})
|
||||
.collect();
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
.await
|
||||
.into_iter();
|
||||
|
||||
while let Some((server, response)) = futures.next().await {
|
||||
for (server, response) in futures {
|
||||
match response {
|
||||
| Ok(keys) => {
|
||||
one_time_keys.extend(keys.one_time_keys);
|
||||
},
|
||||
| Err(_e) => {
|
||||
failures.insert(server.to_string(), json!({}));
|
||||
| Err(e) => {
|
||||
failures.insert(server.to_string(), json!({"error": e.to_string()}));
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{
|
||||
Err, Result, debug_warn, trace,
|
||||
Err, Result, debug, debug_warn, info, trace,
|
||||
utils::{IterStream, future::TryExtExt},
|
||||
};
|
||||
use futures::{
|
||||
FutureExt, StreamExt,
|
||||
FutureExt, StreamExt, TryFutureExt,
|
||||
future::{OptionFuture, join3},
|
||||
stream::FuturesUnordered,
|
||||
};
|
||||
@@ -79,9 +79,15 @@ async fn room_summary_response(
|
||||
.server_in_room(services.globals.server_name(), room_id)
|
||||
.await
|
||||
{
|
||||
return local_room_summary_response(services, room_id, sender_user)
|
||||
match local_room_summary_response(services, room_id, sender_user)
|
||||
.boxed()
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
| Ok(response) => return Ok(response),
|
||||
| Err(e) => {
|
||||
debug_warn!("Failed to get local room summary: {e:?}, falling back to remote");
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
let room =
|
||||
@@ -111,26 +117,27 @@ async fn local_room_summary_response(
|
||||
sender_user: Option<&UserId>,
|
||||
) -> Result<get_summary::msc3266::Response> {
|
||||
trace!(?sender_user, "Sending local room summary response for {room_id:?}");
|
||||
let join_rule = services.rooms.state_accessor.get_join_rules(room_id);
|
||||
|
||||
let world_readable = services.rooms.state_accessor.is_world_readable(room_id);
|
||||
|
||||
let guest_can_join = services.rooms.state_accessor.guest_can_join(room_id);
|
||||
|
||||
let (join_rule, world_readable, guest_can_join) =
|
||||
join3(join_rule, world_readable, guest_can_join).await;
|
||||
|
||||
trace!("{join_rule:?}, {world_readable:?}, {guest_can_join:?}");
|
||||
user_can_see_summary(
|
||||
services,
|
||||
room_id,
|
||||
&join_rule.clone().into(),
|
||||
guest_can_join,
|
||||
world_readable,
|
||||
join_rule.allowed_rooms(),
|
||||
sender_user,
|
||||
let (join_rule, world_readable, guest_can_join) = join3(
|
||||
services.rooms.state_accessor.get_join_rules(room_id),
|
||||
services.rooms.state_accessor.is_world_readable(room_id),
|
||||
services.rooms.state_accessor.guest_can_join(room_id),
|
||||
)
|
||||
.await?;
|
||||
.await;
|
||||
|
||||
// Synapse allows server admins to bypass visibility checks.
|
||||
// That seems neat so we'll copy that behaviour.
|
||||
if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await {
|
||||
user_can_see_summary(
|
||||
services,
|
||||
room_id,
|
||||
&join_rule.clone().into(),
|
||||
guest_can_join,
|
||||
world_readable,
|
||||
join_rule.allowed_rooms(),
|
||||
sender_user,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let canonical_alias = services
|
||||
.rooms
|
||||
@@ -231,15 +238,27 @@ async fn remote_room_summary_hierarchy_response(
|
||||
"Federaton of room {room_id} is currently disabled on this server."
|
||||
)));
|
||||
}
|
||||
if servers.is_empty() {
|
||||
return Err!(Request(MissingParam(
|
||||
"No servers were provided to fetch the room over federation"
|
||||
)));
|
||||
}
|
||||
|
||||
let request = get_hierarchy::v1::Request::new(room_id.to_owned());
|
||||
|
||||
let mut requests: FuturesUnordered<_> = servers
|
||||
.iter()
|
||||
.map(|server| {
|
||||
info!("Fetching room summary for {room_id} from server {server}");
|
||||
services
|
||||
.sending
|
||||
.send_federation_request(server, request.clone())
|
||||
.inspect_ok(move |v| {
|
||||
debug!("Fetched room summary for {room_id} from server {server}: {v:?}");
|
||||
})
|
||||
.inspect_err(move |e| {
|
||||
info!("Failed to fetch room summary for {room_id} from server {server}: {e}");
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -255,23 +274,23 @@ async fn remote_room_summary_hierarchy_response(
|
||||
continue;
|
||||
}
|
||||
|
||||
return user_can_see_summary(
|
||||
services,
|
||||
room_id,
|
||||
&room.join_rule,
|
||||
room.guest_can_join,
|
||||
room.world_readable,
|
||||
room.allowed_room_ids.iter().map(AsRef::as_ref),
|
||||
sender_user,
|
||||
)
|
||||
.await
|
||||
.map(|()| room);
|
||||
if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await {
|
||||
return user_can_see_summary(
|
||||
services,
|
||||
room_id,
|
||||
&room.join_rule,
|
||||
room.guest_can_join,
|
||||
room.world_readable,
|
||||
room.allowed_room_ids.iter().map(AsRef::as_ref),
|
||||
sender_user,
|
||||
)
|
||||
.await
|
||||
.map(|()| room);
|
||||
}
|
||||
return Ok(room);
|
||||
}
|
||||
|
||||
Err!(Request(NotFound(
|
||||
"Room is unknown to this server and was unable to fetch over federation with the \
|
||||
provided servers available"
|
||||
)))
|
||||
Err!(Request(NotFound("Room not found or is not accessible")))
|
||||
}
|
||||
|
||||
async fn user_can_see_summary<'a, I>(
|
||||
@@ -311,21 +330,14 @@ where
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err!(Request(Forbidden(
|
||||
"Room is not world readable, not publicly accessible/joinable, restricted room \
|
||||
conditions not met, and guest access is forbidden. Not allowed to see details \
|
||||
of this room."
|
||||
)))
|
||||
Err!(Request(Forbidden("Room is not accessible")))
|
||||
},
|
||||
| None => {
|
||||
if is_public_room || world_readable {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err!(Request(Forbidden(
|
||||
"Room is not world readable or publicly accessible/joinable, authentication is \
|
||||
required"
|
||||
)))
|
||||
Err!(Request(Forbidden("Room is not accessible")))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use axum::{Json, extract::State, response::IntoResponse};
|
||||
use conduwuit::{Error, Result};
|
||||
use futures::StreamExt;
|
||||
use ruma::api::client::{
|
||||
discovery::{
|
||||
discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo},
|
||||
@@ -71,21 +70,18 @@ pub(crate) async fn well_known_support(
|
||||
|
||||
// Try to add admin users as contacts if no contacts are configured
|
||||
if contacts.is_empty() {
|
||||
if let Ok(admin_room) = services.admin.get_admin_room().await {
|
||||
let admin_users = services.rooms.state_cache.room_members(&admin_room);
|
||||
let mut stream = admin_users;
|
||||
let admin_users = services.admin.get_admins().await;
|
||||
|
||||
while let Some(user_id) = stream.next().await {
|
||||
// Skip server user
|
||||
if *user_id == services.globals.server_user {
|
||||
continue;
|
||||
}
|
||||
contacts.push(Contact {
|
||||
role: role_value.clone(),
|
||||
email_address: None,
|
||||
matrix_id: Some(user_id.to_owned()),
|
||||
});
|
||||
for user_id in &admin_users {
|
||||
if *user_id == services.globals.server_user {
|
||||
continue;
|
||||
}
|
||||
|
||||
contacts.push(Contact {
|
||||
role: role_value.clone(),
|
||||
email_address: None,
|
||||
matrix_id: Some(user_id.to_owned()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::extract::State;
|
||||
use conduwuit::{Error, Result};
|
||||
use futures::{FutureExt, StreamExt, TryFutureExt};
|
||||
@@ -96,6 +98,7 @@ pub(crate) async fn get_keys_route(
|
||||
&body.device_keys,
|
||||
|u| Some(u.server_name()) == body.origin.as_deref(),
|
||||
services.globals.allow_device_name_federation(),
|
||||
Duration::from_secs(0),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -124,7 +127,8 @@ pub(crate) async fn claim_keys_route(
|
||||
));
|
||||
}
|
||||
|
||||
let result = claim_keys_helper(&services, &body.one_time_keys).await?;
|
||||
let result =
|
||||
claim_keys_helper(&services, &body.one_time_keys, Duration::from_secs(0)).await?;
|
||||
|
||||
Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys })
|
||||
}
|
||||
|
||||
+42
-1
@@ -500,7 +500,19 @@ pub struct Config {
|
||||
#[serde(default = "default_appservice_idle_timeout")]
|
||||
pub appservice_idle_timeout: u64,
|
||||
|
||||
/// Notification gateway pusher idle connection pool timeout.
|
||||
/// Notification gateway pusher request connection timeout (seconds).
|
||||
///
|
||||
/// default: 15
|
||||
#[serde(default = "default_pusher_conn_timeout")]
|
||||
pub pusher_conn_timeout: u64,
|
||||
|
||||
/// Notification gateway pusher total request timeout (seconds).
|
||||
///
|
||||
/// default: 60
|
||||
#[serde(default = "default_pusher_timeout")]
|
||||
pub pusher_timeout: u64,
|
||||
|
||||
/// Notification gateway pusher idle connection pool timeout (seconds).
|
||||
///
|
||||
/// default: 15
|
||||
#[serde(default = "default_pusher_idle_timeout")]
|
||||
@@ -1670,6 +1682,13 @@ pub struct Config {
|
||||
#[serde(default = "default_url_preview_max_spider_size")]
|
||||
pub url_preview_max_spider_size: usize,
|
||||
|
||||
/// Total request timeout for URL previews (seconds). This includes
|
||||
/// connection, request, and response body reading time.
|
||||
///
|
||||
/// default: 120
|
||||
#[serde(default = "default_url_preview_timeout")]
|
||||
pub url_preview_timeout: u64,
|
||||
|
||||
/// Option to decide whether you would like to run the domain allowlist
|
||||
/// checks (contains and explicit) on the root domain or not. Does not apply
|
||||
/// to URL contains allowlist. Defaults to false.
|
||||
@@ -1819,6 +1838,22 @@ pub struct Config {
|
||||
#[serde(default = "default_admin_room_tag")]
|
||||
pub admin_room_tag: String,
|
||||
|
||||
/// A list of Matrix IDs that are qualified as server admins.
|
||||
///
|
||||
/// Any Matrix IDs within this list are regarded as an admin
|
||||
/// regardless of whether they are in the admin room or not
|
||||
///
|
||||
/// default: []
|
||||
#[serde(default)]
|
||||
pub admins_list: Vec<OwnedUserId>,
|
||||
|
||||
/// Defines whether those within the admin room are added to the
|
||||
/// admins_list.
|
||||
///
|
||||
/// default: true
|
||||
#[serde(default = "true_fn")]
|
||||
pub admins_from_room: bool,
|
||||
|
||||
/// Sentry.io crash/panic reporting, performance monitoring/metrics, etc.
|
||||
/// This is NOT enabled by default.
|
||||
#[serde(default)]
|
||||
@@ -2408,6 +2443,10 @@ fn default_appservice_timeout() -> u64 { 35 }
|
||||
|
||||
fn default_appservice_idle_timeout() -> u64 { 300 }
|
||||
|
||||
fn default_pusher_conn_timeout() -> u64 { 15 }
|
||||
|
||||
fn default_pusher_timeout() -> u64 { 60 }
|
||||
|
||||
fn default_pusher_idle_timeout() -> u64 { 15 }
|
||||
|
||||
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
|
||||
@@ -2535,6 +2574,8 @@ fn default_url_preview_max_spider_size() -> usize {
|
||||
256_000 // 256KB
|
||||
}
|
||||
|
||||
fn default_url_preview_timeout() -> u64 { 120 }
|
||||
|
||||
fn default_new_user_displayname_suffix() -> String { "🏳️⚧️".to_owned() }
|
||||
|
||||
fn default_sentry_endpoint() -> Option<Url> { None }
|
||||
|
||||
@@ -66,7 +66,10 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
|
||||
.layer(RequestBodyTimeoutLayer::new(Duration::from_secs(
|
||||
server.config.client_receive_timeout,
|
||||
)))
|
||||
.layer(TimeoutLayer::with_status_code(StatusCode::REQUEST_TIMEOUT, Duration::from_secs(server.config.client_request_timeout)))
|
||||
.layer(TimeoutLayer::with_status_code(
|
||||
StatusCode::REQUEST_TIMEOUT,
|
||||
Duration::from_secs(server.config.client_request_timeout),
|
||||
))
|
||||
.layer(SetResponseHeaderLayer::if_not_present(
|
||||
HeaderName::from_static("origin-agent-cluster"), // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Origin-Agent-Cluster
|
||||
HeaderValue::from_static("?1"),
|
||||
|
||||
@@ -9,7 +9,10 @@ use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
|
||||
use termimad::MadSkin;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::{Dep, admin};
|
||||
use crate::{
|
||||
Dep,
|
||||
admin::{self, InvocationSource},
|
||||
};
|
||||
|
||||
pub struct Console {
|
||||
server: Arc<Server>,
|
||||
@@ -160,7 +163,11 @@ impl Console {
|
||||
}
|
||||
|
||||
async fn process(self: Arc<Self>, line: String) {
|
||||
match self.admin.command_in_place(line, None).await {
|
||||
match self
|
||||
.admin
|
||||
.command_in_place(line, None, InvocationSource::Console)
|
||||
.await
|
||||
{
|
||||
| Ok(Some(ref content)) => self.output(content),
|
||||
| Err(ref content) => self.output_err(content),
|
||||
| _ => unreachable!(),
|
||||
|
||||
@@ -2,6 +2,8 @@ use conduwuit::{Err, Result, debug, debug_info, error, implement, info};
|
||||
use ruma::events::room::message::RoomMessageEventContent;
|
||||
use tokio::time::{Duration, sleep};
|
||||
|
||||
use crate::admin::InvocationSource;
|
||||
|
||||
pub(super) const SIGNAL: &str = "SIGUSR2";
|
||||
|
||||
/// Possibly spawn the terminal console at startup if configured.
|
||||
@@ -88,7 +90,10 @@ pub(super) async fn signal_execute(&self) -> Result {
|
||||
async fn execute_command(&self, i: usize, command: String) -> Result {
|
||||
debug!("Execute command #{i}: executing {command:?}");
|
||||
|
||||
match self.command_in_place(command, None).await {
|
||||
match self
|
||||
.command_in_place(command, None, InvocationSource::Console)
|
||||
.await
|
||||
{
|
||||
| Ok(Some(output)) => Self::execute_command_output(i, &output),
|
||||
| Err(output) => Self::execute_command_error(i, &output),
|
||||
| Ok(None) => {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use conduwuit::{Err, Result, debug_info, debug_warn, error, implement, matrix::pdu::PduBuilder};
|
||||
use conduwuit::{
|
||||
Err, Result, debug_info, debug_warn, error, implement, matrix::pdu::PduBuilder, warn,
|
||||
};
|
||||
use ruma::{
|
||||
RoomId, UserId,
|
||||
events::{
|
||||
@@ -176,6 +178,19 @@ async fn set_room_tag(&self, room_id: &RoomId, user_id: &UserId, tag: &str) -> R
|
||||
pub async fn revoke_admin(&self, user_id: &UserId) -> Result {
|
||||
use MembershipState::{Invite, Join, Knock, Leave};
|
||||
|
||||
if self
|
||||
.services
|
||||
.server
|
||||
.config
|
||||
.admins_list
|
||||
.contains(&user_id.to_owned())
|
||||
{
|
||||
warn!(
|
||||
"Revoking the admin status of {user_id} will not work correctly as they are within \
|
||||
the admins_list config."
|
||||
);
|
||||
}
|
||||
|
||||
let Ok(room_id) = self.get_admin_room().await else {
|
||||
return Err!(error!("No admin room available or created."));
|
||||
};
|
||||
|
||||
+129
-61
@@ -14,10 +14,10 @@ use conduwuit_core::{
|
||||
Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
|
||||
};
|
||||
pub use create::create_admin_room;
|
||||
use futures::{Future, FutureExt, TryFutureExt};
|
||||
use futures::{Future, FutureExt, StreamExt, TryFutureExt};
|
||||
use loole::{Receiver, Sender};
|
||||
use ruma::{
|
||||
Mxc, OwnedEventId, OwnedMxcUri, OwnedRoomId, RoomId, UInt, UserId,
|
||||
Mxc, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
|
||||
events::{
|
||||
Mentions,
|
||||
room::{
|
||||
@@ -54,15 +54,37 @@ struct Services {
|
||||
media: Dep<crate::media::Service>,
|
||||
}
|
||||
|
||||
/// Inputs to a command are a multi-line string, optional reply_id, and optional
|
||||
/// sender.
|
||||
/// Inputs to a command are a multi-line string, invocation source, optional
|
||||
/// reply_id, and optional sender.
|
||||
#[derive(Debug)]
|
||||
pub struct CommandInput {
|
||||
pub command: String,
|
||||
pub reply_id: Option<OwnedEventId>,
|
||||
pub source: InvocationSource,
|
||||
pub sender: Option<Box<UserId>>,
|
||||
}
|
||||
|
||||
/// Where a command is being invoked from.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum InvocationSource {
|
||||
/// The server's private admin room
|
||||
AdminRoom,
|
||||
/// An escaped `\!admin` command in a public room
|
||||
EscapedCommand,
|
||||
/// The server's admin console
|
||||
Console,
|
||||
/// Some other trusted internal source
|
||||
Internal,
|
||||
}
|
||||
|
||||
impl InvocationSource {
|
||||
/// Returns whether this invocation source allows "restricted"
|
||||
/// commands, i.e. ones that could be potentially dangerous if executed by
|
||||
/// an attacker or in a public room.
|
||||
#[must_use]
|
||||
pub fn allows_restricted(&self) -> bool { !matches!(self, Self::EscapedCommand) }
|
||||
}
|
||||
|
||||
/// Prototype of the tab-completer. The input is buffered text when tab
|
||||
/// asserted; the output will fully replace the input buffer.
|
||||
pub type Completer = fn(&str) -> String;
|
||||
@@ -276,10 +298,15 @@ impl Service {
|
||||
/// Posts a command to the command processor queue and returns. Processing
|
||||
/// will take place on the service worker's task asynchronously. Errors if
|
||||
/// the queue is full.
|
||||
pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> {
|
||||
pub fn command(
|
||||
&self,
|
||||
command: String,
|
||||
reply_id: Option<OwnedEventId>,
|
||||
source: InvocationSource,
|
||||
) -> Result<()> {
|
||||
self.channel
|
||||
.0
|
||||
.send(CommandInput { command, reply_id, sender: None })
|
||||
.send(CommandInput { command, reply_id, source, sender: None })
|
||||
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
|
||||
}
|
||||
|
||||
@@ -290,11 +317,17 @@ impl Service {
|
||||
&self,
|
||||
command: String,
|
||||
reply_id: Option<OwnedEventId>,
|
||||
source: InvocationSource,
|
||||
sender: Box<UserId>,
|
||||
) -> Result<()> {
|
||||
self.channel
|
||||
.0
|
||||
.send(CommandInput { command, reply_id, sender: Some(sender) })
|
||||
.send(CommandInput {
|
||||
command,
|
||||
reply_id,
|
||||
source,
|
||||
sender: Some(sender),
|
||||
})
|
||||
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
|
||||
}
|
||||
|
||||
@@ -304,8 +337,9 @@ impl Service {
|
||||
&self,
|
||||
command: String,
|
||||
reply_id: Option<OwnedEventId>,
|
||||
source: InvocationSource,
|
||||
) -> ProcessorResult {
|
||||
self.process_command(CommandInput { command, reply_id, sender: None })
|
||||
self.process_command(CommandInput { command, reply_id, source, sender: None })
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -349,16 +383,50 @@ impl Service {
|
||||
handle(services, command).await
|
||||
}
|
||||
|
||||
/// Returns the list of admins for this server. First loads
|
||||
/// the admin_list from the configuration, then adds users from
|
||||
/// the admin room if applicable.
|
||||
pub async fn get_admins(&self) -> Vec<OwnedUserId> {
|
||||
let mut generated_admin_list: Vec<OwnedUserId> =
|
||||
self.services.server.config.admins_list.clone();
|
||||
|
||||
if self.services.server.config.admins_from_room {
|
||||
if let Ok(admin_room) = self.get_admin_room().await {
|
||||
let admin_users = self.services.state_cache.room_members(&admin_room);
|
||||
let mut stream = admin_users;
|
||||
|
||||
while let Some(user_id) = stream.next().await {
|
||||
generated_admin_list.push(user_id.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
generated_admin_list
|
||||
}
|
||||
|
||||
/// Checks whether a given user is an admin of this server
|
||||
pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
|
||||
let Ok(admin_room) = self.get_admin_room().await else {
|
||||
return false;
|
||||
};
|
||||
if self
|
||||
.services
|
||||
.server
|
||||
.config
|
||||
.admins_list
|
||||
.contains(&user_id.to_owned())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
self.services
|
||||
.state_cache
|
||||
.is_joined(user_id, &admin_room)
|
||||
.await
|
||||
if self.services.server.config.admins_from_room {
|
||||
if let Ok(admin_room) = self.get_admin_room().await {
|
||||
return self
|
||||
.services
|
||||
.state_cache
|
||||
.is_joined(user_id, &admin_room)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
/// Gets the room ID of the admin room
|
||||
@@ -459,59 +527,59 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn is_admin_command<E>(&self, event: &E, body: &str) -> bool
|
||||
pub async fn is_admin_command<E>(&self, event: &E, body: &str) -> Option<InvocationSource>
|
||||
where
|
||||
E: Event + Send + Sync,
|
||||
{
|
||||
// Server-side command-escape with public echo
|
||||
let is_escape = body.starts_with('\\');
|
||||
let is_public_escape = is_escape && body.trim_start_matches('\\').starts_with("!admin");
|
||||
|
||||
// Admin command with public echo (in admin room)
|
||||
let server_user = &self.services.globals.server_user;
|
||||
let is_public_prefix =
|
||||
body.starts_with("!admin") || body.starts_with(server_user.as_str());
|
||||
|
||||
// Expected backward branch
|
||||
if !is_public_escape && !is_public_prefix {
|
||||
return false;
|
||||
}
|
||||
|
||||
let user_is_local = self.services.globals.user_is_local(event.sender());
|
||||
|
||||
// only allow public escaped commands by local admins
|
||||
if is_public_escape && !user_is_local {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if server-side command-escape is disabled by configuration
|
||||
if is_public_escape && !self.services.server.config.admin_escape_commands {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Prevent unescaped !admin from being used outside of the admin room
|
||||
if event.room_id().is_some()
|
||||
&& is_public_prefix
|
||||
&& !self.is_admin_room(event.room_id().unwrap()).await
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
// Only senders who are admin can proceed
|
||||
// If the user isn't an admin they definitely can't run admin commands
|
||||
if !self.user_is_admin(event.sender()).await {
|
||||
return false;
|
||||
return None;
|
||||
}
|
||||
|
||||
// This will evaluate to false if the emergency password is set up so that
|
||||
// the administrator can execute commands as the server user
|
||||
let emergency_password_set = self.services.server.config.emergency_password.is_some();
|
||||
let from_server = event.sender() == server_user && !emergency_password_set;
|
||||
if from_server && self.is_admin_room(event.room_id().unwrap()).await {
|
||||
return false;
|
||||
}
|
||||
if let Some(room_id) = event.room_id()
|
||||
&& self.is_admin_room(room_id).await
|
||||
{
|
||||
// This is a message in the admin room
|
||||
|
||||
// Authentic admin command
|
||||
true
|
||||
// Ignore messages which aren't admin commands
|
||||
let server_user = &self.services.globals.server_user;
|
||||
if !(body.starts_with("!admin") || body.starts_with(server_user.as_str())) {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Ignore messages from the server user _unless_ the emergency password is set
|
||||
let emergency_password_set = self.services.server.config.emergency_password.is_some();
|
||||
if event.sender() == server_user && !emergency_password_set {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Looks good
|
||||
Some(InvocationSource::AdminRoom)
|
||||
} else {
|
||||
// This is a message outside the admin room
|
||||
|
||||
// Is it an escaped admin command? i.e. `\!admin --help`
|
||||
let is_public_escape =
|
||||
body.starts_with('\\') && body.trim_start_matches('\\').starts_with("!admin");
|
||||
|
||||
// Ignore the message if it's not
|
||||
if !is_public_escape {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Only admin users belonging to this server can use escaped commands
|
||||
if !self.services.globals.user_is_local(event.sender()) {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Check if escaped commands are disabled in the config
|
||||
if !self.services.server.config.admin_escape_commands {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Looks good
|
||||
Some(InvocationSource::EscapedCommand)
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
|
||||
@@ -47,6 +47,7 @@ impl crate::Service for Service {
|
||||
})?
|
||||
.local_address(url_preview_bind_addr)
|
||||
.dns_resolver(resolver.resolver.clone())
|
||||
.timeout(Duration::from_secs(config.url_preview_timeout))
|
||||
.redirect(redirect::Policy::limited(3))
|
||||
.build()?,
|
||||
|
||||
@@ -68,6 +69,11 @@ impl crate::Service for Service {
|
||||
.dns_resolver(resolver.resolver.hooked.clone())
|
||||
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
||||
.read_timeout(Duration::from_secs(config.federation_timeout))
|
||||
.timeout(Duration::from_secs(
|
||||
config
|
||||
.federation_timeout
|
||||
.saturating_add(config.federation_conn_timeout),
|
||||
))
|
||||
.pool_max_idle_per_host(config.federation_idle_per_host.into())
|
||||
.pool_idle_timeout(Duration::from_secs(config.federation_idle_timeout))
|
||||
.redirect(redirect::Policy::limited(3))
|
||||
@@ -77,6 +83,7 @@ impl crate::Service for Service {
|
||||
.dns_resolver(resolver.resolver.hooked.clone())
|
||||
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
||||
.read_timeout(Duration::from_secs(305))
|
||||
.timeout(Duration::from_secs(120))
|
||||
.pool_max_idle_per_host(0)
|
||||
.redirect(redirect::Policy::limited(3))
|
||||
.build()?,
|
||||
@@ -103,6 +110,8 @@ impl crate::Service for Service {
|
||||
|
||||
pusher: base(config)?
|
||||
.dns_resolver(resolver.resolver.clone())
|
||||
.connect_timeout(Duration::from_secs(config.pusher_conn_timeout))
|
||||
.timeout(Duration::from_secs(config.pusher_timeout))
|
||||
.pool_max_idle_per_host(1)
|
||||
.pool_idle_timeout(Duration::from_secs(config.pusher_idle_timeout))
|
||||
.redirect(redirect::Policy::limited(2))
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::{fmt::Debug, mem};
|
||||
use bytes::Bytes;
|
||||
use conduwuit::{
|
||||
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
|
||||
error::inspect_debug_log, implement, trace, utils::string::EMPTY,
|
||||
error::inspect_debug_log, implement, trace,
|
||||
};
|
||||
use http::{HeaderValue, header::AUTHORIZATION};
|
||||
use ipaddress::IPAddress;
|
||||
@@ -90,7 +90,9 @@ where
|
||||
|
||||
debug!(?method, ?url, "Sending request");
|
||||
match client.execute(request).await {
|
||||
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
|
||||
| Ok(response) =>
|
||||
self.handle_response::<T>(dest, actual, &method, &url, response)
|
||||
.await,
|
||||
| Err(error) =>
|
||||
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
|
||||
}
|
||||
@@ -119,7 +121,9 @@ fn validate_url(&self, url: &Url) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[implement(super::Service)]
|
||||
async fn handle_response<T>(
|
||||
&self,
|
||||
dest: &ServerName,
|
||||
actual: &ActualDest,
|
||||
method: &Method,
|
||||
@@ -162,7 +166,6 @@ async fn into_http_response(
|
||||
.expect("http::response::Builder is usable"),
|
||||
);
|
||||
|
||||
// TODO: handle timeout
|
||||
trace!("Waiting for response body...");
|
||||
let body = response
|
||||
.bytes()
|
||||
@@ -286,10 +289,13 @@ where
|
||||
T: OutgoingRequest + Send,
|
||||
{
|
||||
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_11];
|
||||
const SATIR: SendAccessToken<'_> = SendAccessToken::IfRequired(EMPTY);
|
||||
|
||||
let http_request = request
|
||||
.try_into_http_request::<Vec<u8>>(actual.string().as_str(), SATIR, &VERSIONS)
|
||||
.try_into_http_request::<Vec<u8>>(
|
||||
actual.string().as_str(),
|
||||
SendAccessToken::None,
|
||||
&VERSIONS,
|
||||
)
|
||||
.map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))?;
|
||||
|
||||
Ok(http_request)
|
||||
|
||||
+18
-18
@@ -198,7 +198,7 @@ impl Service {
|
||||
trace!("Push gateway destination: {dest}");
|
||||
|
||||
let http_request = request
|
||||
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::IfRequired(""), &VERSIONS)
|
||||
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::None, &VERSIONS)
|
||||
.map_err(|e| {
|
||||
err!(BadServerResponse(warn!(
|
||||
"Failed to find destination {dest} for push gateway: {e}"
|
||||
@@ -245,7 +245,7 @@ impl Service {
|
||||
.expect("http::response::Builder is usable"),
|
||||
);
|
||||
|
||||
let body = response.bytes().await?; // TODO: handle timeout
|
||||
let body = response.bytes().await?;
|
||||
|
||||
if !status.is_success() {
|
||||
debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
|
||||
@@ -288,7 +288,7 @@ impl Service {
|
||||
let mut notify = None;
|
||||
let mut tweaks = Vec::new();
|
||||
if event.room_id().is_none() {
|
||||
// TODO(hydra): does this matter?
|
||||
// This only affects v12+ create events
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -427,20 +427,20 @@ impl Service {
|
||||
}
|
||||
|
||||
let d = vec![device];
|
||||
let mut notifi = Notification::new(d);
|
||||
let mut notify = Notification::new(d);
|
||||
|
||||
notifi.event_id = Some(event.event_id().to_owned());
|
||||
notifi.room_id = Some(event.room_id().unwrap().to_owned());
|
||||
notify.event_id = Some(event.event_id().to_owned());
|
||||
notify.room_id = Some(event.room_id().unwrap().to_owned());
|
||||
if http
|
||||
.data
|
||||
.get("org.matrix.msc4076.disable_badge_count")
|
||||
.is_none() && http.data.get("disable_badge_count").is_none()
|
||||
{
|
||||
notifi.counts = NotificationCounts::new(unread, uint!(0));
|
||||
notify.counts = NotificationCounts::new(unread, uint!(0));
|
||||
} else {
|
||||
// counts will not be serialised if it's the default (0, 0)
|
||||
// skip_serializing_if = "NotificationCounts::is_default"
|
||||
notifi.counts = NotificationCounts::default();
|
||||
notify.counts = NotificationCounts::default();
|
||||
}
|
||||
|
||||
if !event_id_only {
|
||||
@@ -449,30 +449,30 @@ impl Service {
|
||||
.iter()
|
||||
.any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
|
||||
{
|
||||
notifi.prio = NotificationPriority::High;
|
||||
notify.prio = NotificationPriority::High;
|
||||
} else {
|
||||
notifi.prio = NotificationPriority::Low;
|
||||
notify.prio = NotificationPriority::Low;
|
||||
}
|
||||
notifi.sender = Some(event.sender().to_owned());
|
||||
notifi.event_type = Some(event.kind().to_owned());
|
||||
notifi.content = serde_json::value::to_raw_value(event.content()).ok();
|
||||
notify.sender = Some(event.sender().to_owned());
|
||||
notify.event_type = Some(event.kind().to_owned());
|
||||
notify.content = serde_json::value::to_raw_value(event.content()).ok();
|
||||
|
||||
if *event.kind() == TimelineEventType::RoomMember {
|
||||
notifi.user_is_target =
|
||||
notify.user_is_target =
|
||||
event.state_key() == Some(event.sender().as_str());
|
||||
}
|
||||
|
||||
notifi.sender_display_name =
|
||||
notify.sender_display_name =
|
||||
self.services.users.displayname(event.sender()).await.ok();
|
||||
|
||||
notifi.room_name = self
|
||||
notify.room_name = self
|
||||
.services
|
||||
.state_accessor
|
||||
.get_name(event.room_id().unwrap())
|
||||
.await
|
||||
.ok();
|
||||
|
||||
notifi.room_alias = self
|
||||
notify.room_alias = self
|
||||
.services
|
||||
.state_accessor
|
||||
.get_canonical_alias(event.room_id().unwrap())
|
||||
@@ -480,7 +480,7 @@ impl Service {
|
||||
.ok();
|
||||
}
|
||||
|
||||
self.send_request(&http.url, send_event_notification::v1::Request::new(notifi))
|
||||
self.send_request(&http.url, send_event_notification::v1::Request::new(notify))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -335,10 +335,11 @@ where
|
||||
if let Some(body) = content.body {
|
||||
self.services.search.index_pdu(shortroomid, &pdu_id, &body);
|
||||
|
||||
if self.services.admin.is_admin_command(pdu, &body).await {
|
||||
if let Some(source) = self.services.admin.is_admin_command(pdu, &body).await {
|
||||
self.services.admin.command_with_sender(
|
||||
body,
|
||||
Some((pdu.event_id()).into()),
|
||||
source,
|
||||
pdu.sender.clone().into(),
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use std::{fmt::Debug, mem};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use conduwuit::{Err, Result, debug_error, err, trace, utils, warn};
|
||||
use reqwest::Client;
|
||||
use conduwuit::{Err, Result, debug_error, err, implement, trace, utils, warn};
|
||||
use ruma::api::{
|
||||
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
|
||||
};
|
||||
@@ -11,8 +10,9 @@ use ruma::api::{
|
||||
///
|
||||
/// Only returns Ok(None) if there is no url specified in the appservice
|
||||
/// registration file
|
||||
pub(crate) async fn send_request<T>(
|
||||
client: &Client,
|
||||
#[implement(super::Service)]
|
||||
pub async fn send_appservice_request<T>(
|
||||
&self,
|
||||
registration: Registration,
|
||||
request: T,
|
||||
) -> Result<Option<T::IncomingResponse>>
|
||||
@@ -32,10 +32,10 @@ where
|
||||
trace!("Appservice URL \"{dest}\", Appservice ID: {}", registration.id);
|
||||
|
||||
let hs_token = registration.hs_token.as_str();
|
||||
let mut http_request = request
|
||||
let http_request = request
|
||||
.try_into_http_request::<BytesMut>(
|
||||
&dest,
|
||||
SendAccessToken::IfRequired(hs_token),
|
||||
SendAccessToken::Appservice(hs_token),
|
||||
&VERSIONS,
|
||||
)
|
||||
.map_err(|e| {
|
||||
@@ -45,19 +45,10 @@ where
|
||||
})?
|
||||
.map(BytesMut::freeze);
|
||||
|
||||
let mut parts = http_request.uri().clone().into_parts();
|
||||
let old_path_and_query = parts.path_and_query.unwrap().as_str().to_owned();
|
||||
let symbol = if old_path_and_query.contains('?') { "&" } else { "?" };
|
||||
|
||||
parts.path_and_query = Some(
|
||||
(old_path_and_query + symbol + "access_token=" + hs_token)
|
||||
.parse()
|
||||
.unwrap(),
|
||||
);
|
||||
*http_request.uri_mut() = parts.try_into().expect("our manipulation is always valid");
|
||||
|
||||
let reqwest_request = reqwest::Request::try_from(http_request)?;
|
||||
|
||||
let client = &self.services.client.appservice;
|
||||
|
||||
let mut response = client.execute(reqwest_request).await.map_err(|e| {
|
||||
warn!("Could not send request to appservice \"{}\" at {dest}: {e:?}", registration.id);
|
||||
e
|
||||
@@ -75,7 +66,7 @@ where
|
||||
.expect("http::response::Builder is usable"),
|
||||
);
|
||||
|
||||
let body = response.bytes().await?; // TODO: handle timeout
|
||||
let body = response.bytes().await?;
|
||||
|
||||
if !status.is_success() {
|
||||
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
|
||||
|
||||
@@ -18,10 +18,7 @@ use conduwuit::{
|
||||
warn,
|
||||
};
|
||||
use futures::{FutureExt, Stream, StreamExt};
|
||||
use ruma::{
|
||||
RoomId, ServerName, UserId,
|
||||
api::{OutgoingRequest, appservice::Registration},
|
||||
};
|
||||
use ruma::{RoomId, ServerName, UserId, api::OutgoingRequest};
|
||||
use tokio::{task, task::JoinSet};
|
||||
|
||||
use self::data::Data;
|
||||
@@ -318,22 +315,6 @@ impl Service {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Sends a request to an appservice
|
||||
///
|
||||
/// Only returns None if there is no url specified in the appservice
|
||||
/// registration file
|
||||
pub async fn send_appservice_request<T>(
|
||||
&self,
|
||||
registration: Registration,
|
||||
request: T,
|
||||
) -> Result<Option<T::IncomingResponse>>
|
||||
where
|
||||
T: OutgoingRequest + Debug + Send,
|
||||
{
|
||||
let client = &self.services.client.appservice;
|
||||
appservice::send_request(client, registration, request).await
|
||||
}
|
||||
|
||||
/// Clean up queued sending event data
|
||||
///
|
||||
/// Used after we remove an appservice registration or a user deletes a push
|
||||
|
||||
@@ -50,9 +50,7 @@ use ruma::{
|
||||
};
|
||||
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
|
||||
|
||||
use super::{
|
||||
Destination, EduBuf, EduVec, Msg, SendingEvent, Service, appservice, data::QueueItem,
|
||||
};
|
||||
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
|
||||
|
||||
#[derive(Debug)]
|
||||
enum TransactionStatus {
|
||||
@@ -720,18 +718,18 @@ impl Service {
|
||||
|
||||
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
||||
// transaction");
|
||||
let client = &self.services.client.appservice;
|
||||
match appservice::send_request(
|
||||
client,
|
||||
appservice,
|
||||
ruma::api::appservice::event::push_events::v1::Request {
|
||||
events: pdu_jsons,
|
||||
txn_id: txn_id.into(),
|
||||
ephemeral: edu_jsons,
|
||||
to_device: Vec::new(), // TODO
|
||||
},
|
||||
)
|
||||
.await
|
||||
|
||||
match self
|
||||
.send_appservice_request(
|
||||
appservice,
|
||||
ruma::api::appservice::event::push_events::v1::Request {
|
||||
events: pdu_jsons,
|
||||
txn_id: txn_id.into(),
|
||||
ephemeral: edu_jsons,
|
||||
to_device: Vec::new(), // TODO
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(_) => Ok(Destination::Appservice(id)),
|
||||
| Err(e) => Err((Destination::Appservice(id), e)),
|
||||
|
||||
Reference in New Issue
Block a user