Compare commits

...

20 Commits

Author SHA1 Message Date
Jade Ellis aed15f246a refactor: Clean up logging issues
Primary issues: Double escapes (debug fmt), spans without levels
2026-01-05 18:28:57 +00:00
timedout 27d6604d14 fix: Use a timeout instead of deadline 2026-01-03 17:08:47 +00:00
timedout 1c7bd2f6fa style: Remove unnecessary then() calls in chain 2026-01-03 16:22:49 +00:00
timedout 56d7099011 style: Include errors in key claim response too 2026-01-03 16:10:06 +00:00
timedout bc426e1bfc fix: Apply client-requested timeout to federated key queries
Also parallelised federation calls in related functions
2026-01-03 16:05:05 +00:00
timedout 6c61b3ec5b fix: Build error two: electric boogaloo 2025-12-31 21:15:28 +00:00
timedout 9d9d1170b6 fix: Build error 2025-12-31 21:04:06 +00:00
Jade Ellis 7be20abcad style: Fix typo 2025-12-31 20:08:53 +00:00
Jade Ellis 078275964c chore: Update precommit hooks 2025-12-31 20:08:53 +00:00
timedout bf200ad12d fix: Resolve compile errors
me and cargo check are oops now
2025-12-31 20:01:29 +00:00
timedout 41e628892d chore: Add news fragment 2025-12-31 20:01:29 +00:00
timedout 44851ee6a2 feat: Fall back to remote room summary if local fails 2025-12-31 20:01:29 +00:00
timedout a7e6e6e83f feat: Allow local server admins to bypass summary visibility checks
feat: Allow local server admins to bypass summary visibility checks

Also improve error messages so they aren't so damn long.
2025-12-31 20:01:29 +00:00
Ginger 8a561fcd3a chore: Clippy fixes 2025-12-31 19:56:35 +00:00
Ginger 25c305f473 chore: Fix comment formatting 2025-12-31 19:56:35 +00:00
Ginger c900350164 chore: Add news fragment 2025-12-31 19:56:35 +00:00
Ginger c565e6ffbc feat: Restrict where certain admin commands may be used 2025-12-31 19:56:31 +00:00
Jade Ellis 442f887c98 style: Improve warning regarding admin removal 2025-12-31 19:40:42 +00:00
Terry 03220845e5 docs: Changelog 2025-12-31 19:35:53 +00:00
Terry f8c1e9bcde feat: Config defined admin list
Closes !1246
2025-12-31 19:35:40 +00:00
86 changed files with 620 additions and 425 deletions
+2 -2
View File
@@ -23,7 +23,7 @@ repos:
- id: check-added-large-files - id: check-added-large-files
- repo: https://github.com/crate-ci/typos - repo: https://github.com/crate-ci/typos
rev: v1.40.0 rev: v1.41.0
hooks: hooks:
- id: typos - id: typos
- id: typos - id: typos
@@ -31,7 +31,7 @@ repos:
stages: [commit-msg] stages: [commit-msg]
- repo: https://github.com/crate-ci/committed - repo: https://github.com/crate-ci/committed
rev: v1.1.8 rev: v1.1.9
hooks: hooks:
- id: committed - id: committed
Generated
+7 -7
View File
@@ -1750,7 +1750,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys 0.52.0", "windows-sys 0.61.2",
] ]
[[package]] [[package]]
@@ -2405,7 +2405,7 @@ dependencies = [
"libc", "libc",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"socket2 0.5.10", "socket2 0.6.1",
"tokio", "tokio",
"tower-service", "tower-service",
"tracing", "tracing",
@@ -3121,7 +3121,7 @@ version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [ dependencies = [
"windows-sys 0.60.2", "windows-sys 0.61.2",
] ]
[[package]] [[package]]
@@ -3749,7 +3749,7 @@ dependencies = [
"quinn-udp", "quinn-udp",
"rustc-hash", "rustc-hash",
"rustls", "rustls",
"socket2 0.5.10", "socket2 0.6.1",
"thiserror 2.0.17", "thiserror 2.0.17",
"tokio", "tokio",
"tracing", "tracing",
@@ -3786,9 +3786,9 @@ dependencies = [
"cfg_aliases", "cfg_aliases",
"libc", "libc",
"once_cell", "once_cell",
"socket2 0.5.10", "socket2 0.6.1",
"tracing", "tracing",
"windows-sys 0.52.0", "windows-sys 0.60.2",
] ]
[[package]] [[package]]
@@ -4325,7 +4325,7 @@ dependencies = [
"errno", "errno",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys 0.52.0", "windows-sys 0.61.2",
] ]
[[package]] [[package]]
+1
View File
@@ -0,0 +1 @@
Certain potentially dangerous admin commands are now restricted to only be usable in the admin room and server console.
+1
View File
@@ -0,0 +1 @@
Implemented a configuration defined admin list independent of the admin room. (@Terryiscool160).
+1
View File
@@ -0,0 +1 @@
Fixed unreliable room summary fetching and improved error messages. Contributed by @nex.
+2
View File
@@ -0,0 +1,2 @@
Client requested timeout parameter is now applied to e2ee key lookups and claims. Related federation requests are now
also concurrent. Contributed by @nex.
+12
View File
@@ -1590,6 +1590,18 @@
# #
#admin_room_tag = "m.server_notice" #admin_room_tag = "m.server_notice"
# A list of Matrix IDs that are qualified as server admins.
#
# Any Matrix IDs within this list are regarded as an admin
# regardless of whether they are in the admin room or not
#
#admins_list = []
# Defines whether those within the admin room are added to the
# admins_list.
#
#admins_from_room = true
# Sentry.io crash/panic reporting, performance monitoring/metrics, etc. # Sentry.io crash/panic reporting, performance monitoring/metrics, etc.
# This is NOT enabled by default. # This is NOT enabled by default.
# #
+2 -2
View File
@@ -128,7 +128,7 @@ Keep in mind the frequency that the log will be reached, and the relevancy to a
```rs ```rs
// Good // Good
error!( error!(
error = %err, error = ?err,
room_id = %room_id, room_id = %room_id,
"Failed to send event to room" "Failed to send event to room"
); );
@@ -264,7 +264,7 @@ pub async fn send_federation_request(
warn!( warn!(
destination = %destination, destination = %destination,
attempt = attempt, attempt = attempt,
error = %err, error = ?err,
retry_delay_ms = retry_delay.as_millis(), retry_delay_ms = retry_delay.as_millis(),
"Federation request failed, retrying" "Federation request failed, retrying"
); );
+16 -4
View File
@@ -48,19 +48,31 @@ pub enum AdminCommand {
Query(QueryCommand), Query(QueryCommand),
} }
#[tracing::instrument(skip_all, name = "command")] #[tracing::instrument(skip_all, name = "command", level = "info")]
pub(super) async fn process(command: AdminCommand, context: &Context<'_>) -> Result { pub(super) async fn process(command: AdminCommand, context: &Context<'_>) -> Result {
use AdminCommand::*; use AdminCommand::*;
match command { match command {
| Appservices(command) => appservice::process(command, context).await, | Appservices(command) => {
// appservice commands are all restricted
context.bail_restricted()?;
appservice::process(command, context).await
},
| Media(command) => media::process(command, context).await, | Media(command) => media::process(command, context).await,
| Users(command) => user::process(command, context).await, | Users(command) => {
// user commands are all restricted
context.bail_restricted()?;
user::process(command, context).await
},
| Rooms(command) => room::process(command, context).await, | Rooms(command) => room::process(command, context).await,
| Federation(command) => federation::process(command, context).await, | Federation(command) => federation::process(command, context).await,
| Server(command) => server::process(command, context).await, | Server(command) => server::process(command, context).await,
| Debug(command) => debug::process(command, context).await, | Debug(command) => debug::process(command, context).await,
| Query(command) => query::process(command, context).await, | Query(command) => {
// query commands are all restricted
context.bail_restricted()?;
query::process(command, context).await
},
| Check(command) => check::process(command, context).await, | Check(command) => check::process(command, context).await,
} }
} }
+21 -1
View File
@@ -1,6 +1,6 @@
use std::{fmt, time::SystemTime}; use std::{fmt, time::SystemTime};
use conduwuit::Result; use conduwuit::{Err, Result};
use conduwuit_service::Services; use conduwuit_service::Services;
use futures::{ use futures::{
Future, FutureExt, TryFutureExt, Future, FutureExt, TryFutureExt,
@@ -8,6 +8,7 @@ use futures::{
lock::Mutex, lock::Mutex,
}; };
use ruma::{EventId, UserId}; use ruma::{EventId, UserId};
use service::admin::InvocationSource;
pub(crate) struct Context<'a> { pub(crate) struct Context<'a> {
pub(crate) services: &'a Services, pub(crate) services: &'a Services,
@@ -16,6 +17,7 @@ pub(crate) struct Context<'a> {
pub(crate) reply_id: Option<&'a EventId>, pub(crate) reply_id: Option<&'a EventId>,
pub(crate) sender: Option<&'a UserId>, pub(crate) sender: Option<&'a UserId>,
pub(crate) output: Mutex<BufWriter<Vec<u8>>>, pub(crate) output: Mutex<BufWriter<Vec<u8>>>,
pub(crate) source: InvocationSource,
} }
impl Context<'_> { impl Context<'_> {
@@ -43,4 +45,22 @@ impl Context<'_> {
self.sender self.sender
.unwrap_or_else(|| self.services.globals.server_user.as_ref()) .unwrap_or_else(|| self.services.globals.server_user.as_ref())
} }
/// Returns an Err if the [`Self::source`] of this context does not allow
/// restricted commands to be executed.
///
/// This is intended to be placed at the start of restricted commands'
/// implementations, like so:
///
/// ```ignore
/// self.bail_restricted()?;
/// // actual command impl
/// ```
pub(crate) fn bail_restricted(&self) -> Result {
if self.source.allows_restricted() {
Ok(())
} else {
Err!("This command can only be used in the admin room.")
}
}
} }
+11 -24
View File
@@ -291,6 +291,8 @@ pub(super) async fn get_remote_pdu(
#[admin_command] #[admin_command]
pub(super) async fn get_room_state(&self, room: OwnedRoomOrAliasId) -> Result { pub(super) async fn get_room_state(&self, room: OwnedRoomOrAliasId) -> Result {
self.bail_restricted()?;
let room_id = self.services.rooms.alias.resolve(&room).await?; let room_id = self.services.rooms.alias.resolve(&room).await?;
let room_state: Vec<Raw<AnyStateEvent>> = self let room_state: Vec<Raw<AnyStateEvent>> = self
.services .services
@@ -417,27 +419,6 @@ pub(super) async fn change_log_level(&self, filter: Option<String>, reset: bool)
Err!("No log level was specified.") Err!("No log level was specified.")
} }
#[admin_command]
pub(super) async fn sign_json(&self) -> Result {
if self.body.len() < 2
|| !self.body[0].trim().starts_with("```")
|| self.body.last().unwrap_or(&"").trim() != "```"
{
return Err!("Expected code block in command body. Add --help for details.");
}
let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n");
match serde_json::from_str(&string) {
| Err(e) => return Err!("Invalid json: {e}"),
| Ok(mut value) => {
self.services.server_keys.sign_json(&mut value)?;
let json_text = serde_json::to_string_pretty(&value)?;
write!(self, "{json_text}")
},
}
.await
}
#[admin_command] #[admin_command]
pub(super) async fn verify_json(&self) -> Result { pub(super) async fn verify_json(&self) -> Result {
if self.body.len() < 2 if self.body.len() < 2
@@ -475,8 +456,10 @@ pub(super) async fn verify_pdu(&self, event_id: OwnedEventId) -> Result {
} }
#[admin_command] #[admin_command]
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self), level = "info")]
pub(super) async fn first_pdu_in_room(&self, room_id: OwnedRoomId) -> Result { pub(super) async fn first_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
self.bail_restricted()?;
if !self if !self
.services .services
.rooms .rooms
@@ -500,8 +483,10 @@ pub(super) async fn first_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
} }
#[admin_command] #[admin_command]
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self), level = "info")]
pub(super) async fn latest_pdu_in_room(&self, room_id: OwnedRoomId) -> Result { pub(super) async fn latest_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
self.bail_restricted()?;
if !self if !self
.services .services
.rooms .rooms
@@ -525,13 +510,15 @@ pub(super) async fn latest_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
} }
#[admin_command] #[admin_command]
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self), level = "info")]
pub(super) async fn force_set_room_state_from_server( pub(super) async fn force_set_room_state_from_server(
&self, &self,
room_id: OwnedRoomId, room_id: OwnedRoomId,
server_name: OwnedServerName, server_name: OwnedServerName,
at_event: Option<OwnedEventId>, at_event: Option<OwnedEventId>,
) -> Result { ) -> Result {
self.bail_restricted()?;
if !self if !self
.services .services
.rooms .rooms
+3 -9
View File
@@ -47,9 +47,9 @@ pub enum DebugCommand {
shorteventid: ShortEventId, shorteventid: ShortEventId,
}, },
/// - Attempts to retrieve a PDU from a remote server. Inserts it into our /// - Attempts to retrieve a PDU from a remote server. **Does not** insert
/// database/timeline if found and we do not have this PDU already /// it into the database
/// (following normal event auth rules, handles it as an incoming PDU). /// or persist it anywhere.
GetRemotePdu { GetRemotePdu {
/// An event ID (a $ followed by the base64 reference hash) /// An event ID (a $ followed by the base64 reference hash)
event_id: OwnedEventId, event_id: OwnedEventId,
@@ -125,12 +125,6 @@ pub enum DebugCommand {
reset: bool, reset: bool,
}, },
/// - Sign JSON blob
///
/// This command needs a JSON blob provided in a Markdown code block below
/// the command.
SignJson,
/// - Verify JSON signatures /// - Verify JSON signatures
/// ///
/// This command needs a JSON blob provided in a Markdown code block below /// This command needs a JSON blob provided in a Markdown code block below
+2
View File
@@ -8,12 +8,14 @@ use crate::{admin_command, get_room_info};
#[admin_command] #[admin_command]
pub(super) async fn disable_room(&self, room_id: OwnedRoomId) -> Result { pub(super) async fn disable_room(&self, room_id: OwnedRoomId) -> Result {
self.bail_restricted()?;
self.services.rooms.metadata.disable_room(&room_id, true); self.services.rooms.metadata.disable_room(&room_id, true);
self.write_str("Room disabled.").await self.write_str("Room disabled.").await
} }
#[admin_command] #[admin_command]
pub(super) async fn enable_room(&self, room_id: OwnedRoomId) -> Result { pub(super) async fn enable_room(&self, room_id: OwnedRoomId) -> Result {
self.bail_restricted()?;
self.services.rooms.metadata.disable_room(&room_id, false); self.services.rooms.metadata.disable_room(&room_id, false);
self.write_str("Room enabled.").await self.write_str("Room enabled.").await
} }
+8
View File
@@ -16,6 +16,8 @@ pub(super) async fn delete(
mxc: Option<OwnedMxcUri>, mxc: Option<OwnedMxcUri>,
event_id: Option<OwnedEventId>, event_id: Option<OwnedEventId>,
) -> Result { ) -> Result {
self.bail_restricted()?;
if event_id.is_some() && mxc.is_some() { if event_id.is_some() && mxc.is_some() {
return Err!("Please specify either an MXC or an event ID, not both.",); return Err!("Please specify either an MXC or an event ID, not both.",);
} }
@@ -176,6 +178,8 @@ pub(super) async fn delete(
#[admin_command] #[admin_command]
pub(super) async fn delete_list(&self) -> Result { pub(super) async fn delete_list(&self) -> Result {
self.bail_restricted()?;
if self.body.len() < 2 if self.body.len() < 2
|| !self.body[0].trim().starts_with("```") || !self.body[0].trim().starts_with("```")
|| self.body.last().unwrap_or(&"").trim() != "```" || self.body.last().unwrap_or(&"").trim() != "```"
@@ -231,6 +235,8 @@ pub(super) async fn delete_past_remote_media(
after: bool, after: bool,
yes_i_want_to_delete_local_media: bool, yes_i_want_to_delete_local_media: bool,
) -> Result { ) -> Result {
self.bail_restricted()?;
if before && after { if before && after {
return Err!("Please only pick one argument, --before or --after.",); return Err!("Please only pick one argument, --before or --after.",);
} }
@@ -273,6 +279,8 @@ pub(super) async fn delete_all_from_server(
server_name: OwnedServerName, server_name: OwnedServerName,
yes_i_want_to_delete_local_media: bool, yes_i_want_to_delete_local_media: bool,
) -> Result { ) -> Result {
self.bail_restricted()?;
if server_name == self.services.globals.server_name() && !yes_i_want_to_delete_local_media { if server_name == self.services.globals.server_name() && !yes_i_want_to_delete_local_media {
return Err!("This command only works for remote media by default.",); return Err!("This command only works for remote media by default.",);
} }
+2 -1
View File
@@ -37,7 +37,7 @@ pub(super) fn dispatch(services: Arc<Services>, command: CommandInput) -> Proces
Box::pin(handle_command(services, command)) Box::pin(handle_command(services, command))
} }
#[tracing::instrument(skip_all, name = "admin")] #[tracing::instrument(skip_all, name = "admin", level = "info")]
async fn handle_command(services: Arc<Services>, command: CommandInput) -> ProcessorResult { async fn handle_command(services: Arc<Services>, command: CommandInput) -> ProcessorResult {
AssertUnwindSafe(Box::pin(process_command(services, &command))) AssertUnwindSafe(Box::pin(process_command(services, &command)))
.catch_unwind() .catch_unwind()
@@ -59,6 +59,7 @@ async fn process_command(services: Arc<Services>, input: &CommandInput) -> Proce
reply_id: input.reply_id.as_deref(), reply_id: input.reply_id.as_deref(),
sender: input.sender.as_deref(), sender: input.sender.as_deref(),
output: BufWriter::new(Vec::new()).into(), output: BufWriter::new(Vec::new()).into(),
source: input.source,
}; };
let (result, mut logs) = process(&context, command, &args).await; let (result, mut logs) = process(&context, command, &args).await;
+3 -3
View File
@@ -98,7 +98,7 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result {
{ {
| Ok((room_id, servers)) => { | Ok((room_id, servers)) => {
debug!( debug!(
?room_id, %room_id,
?servers, ?servers,
"Got federation response fetching room ID for room {room}" "Got federation response fetching room ID for room {room}"
); );
@@ -240,7 +240,7 @@ async fn ban_list_of_rooms(&self) -> Result {
{ {
| Ok((room_id, servers)) => { | Ok((room_id, servers)) => {
debug!( debug!(
?room_id, %room_id,
?servers, ?servers,
"Got federation response fetching room ID for \ "Got federation response fetching room ID for \
{room}", {room}",
@@ -397,7 +397,7 @@ async fn unban_room(&self, room: OwnedRoomOrAliasId) -> Result {
{ {
| Ok((room_id, servers)) => { | Ok((room_id, servers)) => {
debug!( debug!(
?room_id, %room_id,
?servers, ?servers,
"Got federation response fetching room ID for room {room}" "Got federation response fetching room ID for room {room}"
); );
+8
View File
@@ -24,6 +24,8 @@ pub(super) async fn uptime(&self) -> Result {
#[admin_command] #[admin_command]
pub(super) async fn show_config(&self) -> Result { pub(super) async fn show_config(&self) -> Result {
self.bail_restricted()?;
self.write_str(&format!("{}", *self.services.server.config)) self.write_str(&format!("{}", *self.services.server.config))
.await .await
} }
@@ -118,6 +120,8 @@ pub(super) async fn list_backups(&self) -> Result {
#[admin_command] #[admin_command]
pub(super) async fn backup_database(&self) -> Result { pub(super) async fn backup_database(&self) -> Result {
self.bail_restricted()?;
let db = Arc::clone(&self.services.db); let db = Arc::clone(&self.services.db);
let result = self let result = self
.services .services
@@ -144,6 +148,8 @@ pub(super) async fn admin_notice(&self, message: Vec<String>) -> Result {
#[admin_command] #[admin_command]
pub(super) async fn reload_mods(&self) -> Result { pub(super) async fn reload_mods(&self) -> Result {
self.bail_restricted()?;
self.services.server.reload()?; self.services.server.reload()?;
self.write_str("Reloading server...").await self.write_str("Reloading server...").await
@@ -168,6 +174,8 @@ pub(super) async fn restart(&self, force: bool) -> Result {
#[admin_command] #[admin_command]
pub(super) async fn shutdown(&self) -> Result { pub(super) async fn shutdown(&self) -> Result {
self.bail_restricted()?;
warn!("shutdown command"); warn!("shutdown command");
self.services.server.shutdown()?; self.services.server.shutdown()?;
+10 -24
View File
@@ -461,9 +461,11 @@ pub(super) async fn force_join_list_of_local_users(
); );
} }
let Ok(admin_room) = self.services.admin.get_admin_room().await else { let server_admins = self.services.admin.get_admins().await;
return Err!("There is not an admin room to check for server admins.",);
}; if server_admins.is_empty() {
return Err!("There are no admins set for this server.");
}
let (room_id, servers) = self let (room_id, servers) = self
.services .services
@@ -482,15 +484,6 @@ pub(super) async fn force_join_list_of_local_users(
return Err!("We are not joined in this room."); return Err!("We are not joined in this room.");
} }
let server_admins: Vec<_> = self
.services
.rooms
.state_cache
.active_local_users_in_room(&admin_room)
.map(ToOwned::to_owned)
.collect()
.await;
if !self if !self
.services .services
.rooms .rooms
@@ -583,9 +576,11 @@ pub(super) async fn force_join_all_local_users(
); );
} }
let Ok(admin_room) = self.services.admin.get_admin_room().await else { let server_admins = self.services.admin.get_admins().await;
return Err!("There is not an admin room to check for server admins.",);
}; if server_admins.is_empty() {
return Err!("There are no admins set for this server.");
}
let (room_id, servers) = self let (room_id, servers) = self
.services .services
@@ -604,15 +599,6 @@ pub(super) async fn force_join_all_local_users(
return Err!("We are not joined in this room."); return Err!("We are not joined in this room.");
} }
let server_admins: Vec<_> = self
.services
.rooms
.state_cache
.active_local_users_in_room(&admin_room)
.map(ToOwned::to_owned)
.collect()
.await;
if !self if !self
.services .services
.rooms .rooms
+4 -4
View File
@@ -49,7 +49,7 @@ const RANDOM_USER_ID_LENGTH: usize = 10;
/// ///
/// Note: This will not reserve the username, so the username might become /// Note: This will not reserve the username, so the username might become
/// invalid when trying to register /// invalid when trying to register
#[tracing::instrument(skip_all, fields(%client), name = "register_available")] #[tracing::instrument(skip_all, fields(%client), name = "register_available", level = "info")]
pub(crate) async fn get_register_available_route( pub(crate) async fn get_register_available_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -138,7 +138,7 @@ pub(crate) async fn get_register_available_route(
/// - If `inhibit_login` is false: Creates a device and returns device id and /// - If `inhibit_login` is false: Creates a device and returns device id and
/// access_token /// access_token
#[allow(clippy::doc_markdown)] #[allow(clippy::doc_markdown)]
#[tracing::instrument(skip_all, fields(%client), name = "register")] #[tracing::instrument(skip_all, fields(%client), name = "register", level = "info")]
pub(crate) async fn register_route( pub(crate) async fn register_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -603,7 +603,7 @@ pub(crate) async fn register_route(
/// last seen ts) /// last seen ts)
/// - Forgets to-device events /// - Forgets to-device events
/// - Triggers device list updates /// - Triggers device list updates
#[tracing::instrument(skip_all, fields(%client), name = "change_password")] #[tracing::instrument(skip_all, fields(%client), name = "change_password", level = "info")]
pub(crate) async fn change_password_route( pub(crate) async fn change_password_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -727,7 +727,7 @@ pub(crate) async fn whoami_route(
/// - Forgets all to-device events /// - Forgets all to-device events
/// - Triggers device list updates /// - Triggers device list updates
/// - Removes ability to log in again /// - Removes ability to log in again
#[tracing::instrument(skip_all, fields(%client), name = "deactivate")] #[tracing::instrument(skip_all, fields(%client), name = "deactivate", level = "info")]
pub(crate) async fn deactivate_route( pub(crate) async fn deactivate_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
+1 -1
View File
@@ -102,7 +102,7 @@ pub(crate) async fn get_alias_route(
}; };
let servers = room_available_servers(&services, &room_id, &room_alias, servers).await; let servers = room_available_servers(&services, &room_id, &room_alias, servers).await;
debug!(?room_alias, ?room_id, "available servers: {servers:?}"); debug!(%room_alias, %room_id, "available servers: {servers:?}");
Ok(get_alias::v3::Response::new(room_id, servers)) Ok(get_alias::v3::Response::new(room_id, servers))
} }
+1 -1
View File
@@ -74,7 +74,7 @@ pub(crate) async fn get_context_route(
} }
if !visible { if !visible {
debug_warn!(req_evt = ?event_id, ?base_id, ?room_id, "Event requested by {sender_user} but is not allowed to see it, returning 404"); debug_warn!(req_evt = %event_id, ?base_id, %room_id, "Event requested by {sender_user} but is not allowed to see it, returning 404");
return Err!(Request(NotFound("Event not found."))); return Err!(Request(NotFound("Event not found.")));
} }
+1 -1
View File
@@ -49,7 +49,7 @@ pub(crate) async fn get_device_route(
/// # `PUT /_matrix/client/r0/devices/{deviceId}` /// # `PUT /_matrix/client/r0/devices/{deviceId}`
/// ///
/// Updates the metadata on a given device of the sender user. /// Updates the metadata on a given device of the sender user.
#[tracing::instrument(skip_all, fields(%client), name = "update_device")] #[tracing::instrument(skip_all, fields(%client), name = "update_device", level = "debug")]
pub(crate) async fn update_device_route( pub(crate) async fn update_device_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
+3 -3
View File
@@ -44,7 +44,7 @@ use crate::Ruma;
/// Lists the public rooms on this server. /// Lists the public rooms on this server.
/// ///
/// - Rooms are ordered by the number of joined members /// - Rooms are ordered by the number of joined members
#[tracing::instrument(skip_all, fields(%client), name = "publicrooms")] #[tracing::instrument(skip_all, fields(%client), name = "publicrooms", level = "info")]
pub(crate) async fn get_public_rooms_filtered_route( pub(crate) async fn get_public_rooms_filtered_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -80,7 +80,7 @@ pub(crate) async fn get_public_rooms_filtered_route(
/// Lists the public rooms on this server. /// Lists the public rooms on this server.
/// ///
/// - Rooms are ordered by the number of joined members /// - Rooms are ordered by the number of joined members
#[tracing::instrument(skip_all, fields(%client), name = "publicrooms")] #[tracing::instrument(skip_all, fields(%client), name = "publicrooms", level = "info")]
pub(crate) async fn get_public_rooms_route( pub(crate) async fn get_public_rooms_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -116,7 +116,7 @@ pub(crate) async fn get_public_rooms_route(
/// # `PUT /_matrix/client/r0/directory/list/room/{roomId}` /// # `PUT /_matrix/client/r0/directory/list/room/{roomId}`
/// ///
/// Sets the visibility of a given room in the room directory. /// Sets the visibility of a given room in the room directory.
#[tracing::instrument(skip_all, fields(%client), name = "room_directory")] #[tracing::instrument(skip_all, fields(%client), name = "room_directory", level = "info")]
pub(crate) async fn set_room_visibility_route( pub(crate) async fn set_room_visibility_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
+61 -32
View File
@@ -1,7 +1,15 @@
use std::collections::{BTreeMap, HashMap, HashSet}; use std::{
collections::{BTreeMap, HashMap, HashSet},
time::Duration,
};
use axum::extract::State; use axum::extract::State;
use conduwuit::{Err, Error, Result, debug, debug_warn, err, result::NotFound, utils}; use conduwuit::{
Err, Error, Result, debug, debug_warn, err,
result::NotFound,
utils,
utils::{IterStream, stream::WidebandExt},
};
use conduwuit_service::{Services, users::parse_master_key}; use conduwuit_service::{Services, users::parse_master_key};
use futures::{StreamExt, stream::FuturesUnordered}; use futures::{StreamExt, stream::FuturesUnordered};
use ruma::{ use ruma::{
@@ -44,7 +52,7 @@ pub(crate) async fn upload_keys_route(
.deserialize() .deserialize()
.inspect_err(|e| { .inspect_err(|e| {
debug_warn!( debug_warn!(
?key_id, %key_id,
?one_time_key, ?one_time_key,
"Invalid one time key JSON submitted by client, skipping: {e}" "Invalid one time key JSON submitted by client, skipping: {e}"
); );
@@ -86,8 +94,8 @@ pub(crate) async fn upload_keys_route(
{ {
if existing_keys.json().get() == device_keys.json().get() { if existing_keys.json().get() == device_keys.json().get() {
debug!( debug!(
?sender_user, %sender_user,
?sender_device, %sender_device,
?device_keys, ?device_keys,
"Ignoring user uploaded keys as they are an exact copy already in the \ "Ignoring user uploaded keys as they are an exact copy already in the \
database" database"
@@ -134,6 +142,7 @@ pub(crate) async fn get_keys_route(
&body.device_keys, &body.device_keys,
|u| u == sender_user, |u| u == sender_user,
true, // Always allow local users to see device names of other local users true, // Always allow local users to see device names of other local users
body.timeout.unwrap_or(Duration::from_secs(10)),
) )
.await .await
} }
@@ -145,7 +154,12 @@ pub(crate) async fn claim_keys_route(
State(services): State<crate::State>, State(services): State<crate::State>,
body: Ruma<claim_keys::v3::Request>, body: Ruma<claim_keys::v3::Request>,
) -> Result<claim_keys::v3::Response> { ) -> Result<claim_keys::v3::Response> {
claim_keys_helper(&services, &body.one_time_keys).await claim_keys_helper(
&services,
&body.one_time_keys,
body.timeout.unwrap_or(Duration::from_secs(10)),
)
.await
} }
/// # `POST /_matrix/client/r0/keys/device_signing/upload` /// # `POST /_matrix/client/r0/keys/device_signing/upload`
@@ -324,7 +338,7 @@ pub(crate) async fn upload_signatures_route(
for (user_id, keys) in &body.signed_keys { for (user_id, keys) in &body.signed_keys {
for (key_id, key) in keys { for (key_id, key) in keys {
let Ok(key) = serde_json::to_value(key) let Ok(key) = serde_json::to_value(key)
.inspect_err(|e| debug_warn!(?key_id, "Invalid \"key\" JSON: {e}")) .inspect_err(|e| debug_warn!(%key_id, "Invalid \"key\" JSON: {e}"))
else { else {
continue; continue;
}; };
@@ -421,6 +435,7 @@ pub(crate) async fn get_keys_helper<F>(
device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>, device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
allowed_signatures: F, allowed_signatures: F,
include_display_names: bool, include_display_names: bool,
timeout: Duration,
) -> Result<get_keys::v3::Response> ) -> Result<get_keys::v3::Response>
where where
F: Fn(&UserId) -> bool + Send + Sync, F: Fn(&UserId) -> bool + Send + Sync,
@@ -512,9 +527,10 @@ where
let mut failures = BTreeMap::new(); let mut failures = BTreeMap::new();
let mut futures: FuturesUnordered<_> = get_over_federation let futures = get_over_federation
.into_iter() .into_iter()
.map(|(server, vec)| async move { .stream()
.wide_filter_map(|(server, vec)| async move {
let mut device_keys_input_fed = BTreeMap::new(); let mut device_keys_input_fed = BTreeMap::new();
for (user_id, keys) in vec { for (user_id, keys) in vec {
device_keys_input_fed.insert(user_id.to_owned(), keys.clone()); device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
@@ -522,17 +538,22 @@ where
let request = let request =
federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed }; federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed };
let response = tokio::time::timeout(
timeout,
services.sending.send_federation_request(server, request),
)
.await
// Need to flatten the Result<Result<V, E>, E> into Result<V, E>
.map_err(|_| err!(Request(Unknown("Timeout when getting keys over federation."))))
.and_then(|res| res);
let response = services Some((server, response))
.sending
.send_federation_request(server, request)
.await;
(server, response)
}) })
.collect(); .collect::<FuturesUnordered<_>>()
.await
.into_iter();
while let Some((server, response)) = futures.next().await { for (server, response) in futures {
match response { match response {
| Ok(response) => { | Ok(response) => {
for (user, master_key) in response.master_keys { for (user, master_key) in response.master_keys {
@@ -564,8 +585,8 @@ where
self_signing_keys.extend(response.self_signing_keys); self_signing_keys.extend(response.self_signing_keys);
device_keys.extend(response.device_keys); device_keys.extend(response.device_keys);
}, },
| _ => { | Err(e) => {
failures.insert(server.to_string(), json!({})); failures.insert(server.to_string(), json!({ "error": e.to_string() }));
}, },
} }
} }
@@ -608,6 +629,7 @@ fn add_unsigned_device_display_name(
pub(crate) async fn claim_keys_helper( pub(crate) async fn claim_keys_helper(
services: &Services, services: &Services,
one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>, one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>,
timeout: Duration,
) -> Result<claim_keys::v3::Response> { ) -> Result<claim_keys::v3::Response> {
let mut one_time_keys = BTreeMap::new(); let mut one_time_keys = BTreeMap::new();
@@ -638,32 +660,39 @@ pub(crate) async fn claim_keys_helper(
let mut failures = BTreeMap::new(); let mut failures = BTreeMap::new();
let mut futures: FuturesUnordered<_> = get_over_federation let futures = get_over_federation
.into_iter() .into_iter()
.map(|(server, vec)| async move { .stream()
.wide_filter_map(|(server, vec)| async move {
let mut one_time_keys_input_fed = BTreeMap::new(); let mut one_time_keys_input_fed = BTreeMap::new();
for (user_id, keys) in vec { for (user_id, keys) in vec {
one_time_keys_input_fed.insert(user_id.clone(), keys.clone()); one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
} }
( let response = tokio::time::timeout(
server, timeout,
services services.sending.send_federation_request(
.sending server,
.send_federation_request(server, federation::keys::claim_keys::v1::Request { federation::keys::claim_keys::v1::Request {
one_time_keys: one_time_keys_input_fed, one_time_keys: one_time_keys_input_fed,
}) },
.await, ),
) )
.await
.map_err(|_| err!(Request(Unknown("Timeout when claiming keys over federation."))))
.and_then(|res| res);
Some((server, response))
}) })
.collect(); .collect::<FuturesUnordered<_>>()
.await
.into_iter();
while let Some((server, response)) = futures.next().await { for (server, response) in futures {
match response { match response {
| Ok(keys) => { | Ok(keys) => {
one_time_keys.extend(keys.one_time_keys); one_time_keys.extend(keys.one_time_keys);
}, },
| Err(_e) => { | Err(e) => {
failures.insert(server.to_string(), json!({})); failures.insert(server.to_string(), json!({"error": e.to_string()}));
}, },
} }
} }
+1 -1
View File
@@ -21,7 +21,7 @@ use crate::Ruma;
/// # `POST /_matrix/client/r0/rooms/{roomId}/invite` /// # `POST /_matrix/client/r0/rooms/{roomId}/invite`
/// ///
/// Tries to send an invite event into the room. /// Tries to send an invite event into the room.
#[tracing::instrument(skip_all, fields(%client), name = "invite")] #[tracing::instrument(skip_all, fields(%client), name = "invite", level = "info")]
pub(crate) async fn invite_user_route( pub(crate) async fn invite_user_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
+4 -4
View File
@@ -59,7 +59,7 @@ use crate::Ruma;
/// rules locally /// rules locally
/// - If the server does not know about the room: asks other servers over /// - If the server does not know about the room: asks other servers over
/// federation /// federation
#[tracing::instrument(skip_all, fields(%client), name = "join")] #[tracing::instrument(skip_all, fields(%client), name = "join", level = "info")]
pub(crate) async fn join_room_by_id_route( pub(crate) async fn join_room_by_id_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -131,7 +131,7 @@ pub(crate) async fn join_room_by_id_route(
/// - If the server does not know about the room: use the server name query /// - If the server does not know about the room: use the server name query
/// param if specified. if not specified, asks other servers over federation /// param if specified. if not specified, asks other servers over federation
/// via room alias server name and room ID server name /// via room alias server name and room ID server name
#[tracing::instrument(skip_all, fields(%client), name = "join")] #[tracing::instrument(skip_all, fields(%client), name = "join", level = "info")]
pub(crate) async fn join_room_by_id_or_alias_route( pub(crate) async fn join_room_by_id_or_alias_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -351,7 +351,7 @@ pub async fn join_room_by_id_helper(
Ok(join_room_by_id::v3::Response::new(room_id.to_owned())) Ok(join_room_by_id::v3::Response::new(room_id.to_owned()))
} }
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote")] #[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote", level = "info")]
async fn join_room_by_id_helper_remote( async fn join_room_by_id_helper_remote(
services: &Services, services: &Services,
sender_user: &UserId, sender_user: &UserId,
@@ -709,7 +709,7 @@ async fn join_room_by_id_helper_remote(
Ok(()) Ok(())
} }
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local")] #[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local", level = "info")]
async fn join_room_by_id_helper_local( async fn join_room_by_id_helper_local(
services: &Services, services: &Services,
sender_user: &UserId, sender_user: &UserId,
+1 -1
View File
@@ -44,7 +44,7 @@ use crate::Ruma;
/// # `POST /_matrix/client/*/knock/{roomIdOrAlias}` /// # `POST /_matrix/client/*/knock/{roomIdOrAlias}`
/// ///
/// Tries to knock the room to ask permission to join for the sender user. /// Tries to knock the room to ask permission to join for the sender user.
#[tracing::instrument(skip_all, fields(%client), name = "knock")] #[tracing::instrument(skip_all, fields(%client), name = "knock", level = "info")]
pub(crate) async fn knock_room_route( pub(crate) async fn knock_room_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
+1 -1
View File
@@ -63,7 +63,7 @@ pub(crate) async fn joined_rooms_route(
/// ///
/// Performs automatic deactivation if `auto_deactivate_banned_room_attempts` is /// Performs automatic deactivation if `auto_deactivate_banned_room_attempts` is
/// enabled /// enabled
#[tracing::instrument(skip(services))] #[tracing::instrument(skip(services), level = "info")]
pub(crate) async fn banned_room_check( pub(crate) async fn banned_room_check(
services: &Services, services: &Services,
user_id: &UserId, user_id: &UserId,
+1 -1
View File
@@ -115,7 +115,7 @@ async fn paginate_relations_with_filter(
.user_can_see_event(sender_user, room_id, target) .user_can_see_event(sender_user, room_id, target)
.await .await
{ {
debug_warn!(req_evt = ?target, ?room_id, "Event relations requested by {sender_user} but is not allowed to see it, returning 404"); debug_warn!(req_evt = %target, %room_id, "Event relations requested by {sender_user} but is not allowed to see it, returning 404");
return Err!(Request(NotFound("Event not found."))); return Err!(Request(NotFound("Event not found.")));
} }
+3 -3
View File
@@ -29,7 +29,7 @@ struct Report {
/// # `POST /_matrix/client/v3/rooms/{roomId}/report` /// # `POST /_matrix/client/v3/rooms/{roomId}/report`
/// ///
/// Reports an abusive room to homeserver admins /// Reports an abusive room to homeserver admins
#[tracing::instrument(skip_all, fields(%client), name = "report_room")] #[tracing::instrument(skip_all, fields(%client), name = "report_room", level = "info")]
pub(crate) async fn report_room_route( pub(crate) async fn report_room_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -85,7 +85,7 @@ pub(crate) async fn report_room_route(
/// # `POST /_matrix/client/v3/rooms/{roomId}/report/{eventId}` /// # `POST /_matrix/client/v3/rooms/{roomId}/report/{eventId}`
/// ///
/// Reports an inappropriate event to homeserver admins /// Reports an inappropriate event to homeserver admins
#[tracing::instrument(skip_all, fields(%client), name = "report_event")] #[tracing::instrument(skip_all, fields(%client), name = "report_event", level = "info")]
pub(crate) async fn report_event_route( pub(crate) async fn report_event_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -133,7 +133,7 @@ pub(crate) async fn report_event_route(
Ok(report_content::v3::Response {}) Ok(report_content::v3::Response {})
} }
#[tracing::instrument(skip_all, fields(%client), name = "report_user")] #[tracing::instrument(skip_all, fields(%client), name = "report_user", level = "info")]
pub(crate) async fn report_user_route( pub(crate) async fn report_user_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
+4 -4
View File
@@ -492,7 +492,7 @@ pub(crate) async fn create_room_route(
.boxed() .boxed()
.await .await
{ {
warn!(%e, "Failed to send invite"); warn!(?e, "Failed to send invite");
} }
} }
@@ -627,7 +627,7 @@ async fn room_alias_check(
.map_err(|e| { .map_err(|e| {
err!(Request(InvalidParam(debug_error!( err!(Request(InvalidParam(debug_error!(
?e, ?e,
?room_alias_name, %room_alias_name,
"Failed to parse room alias.", "Failed to parse room alias.",
)))) ))))
})?; })?;
@@ -711,7 +711,7 @@ fn custom_room_id_check(services: &Services, custom_room_id: &str) -> Result<Own
} }
}) })
.inspect(|full_room_id| { .inspect(|full_room_id| {
debug_info!(?full_room_id, "Full custom room ID"); debug_info!(%full_room_id, "Full custom room ID");
}) })
.inspect_err(|e| warn!(?e, ?custom_room_id, "Failed to create room with custom room ID",)) .inspect_err(|e| warn!(?e, %custom_room_id, "Failed to create room with custom room ID",))
} }
+65 -50
View File
@@ -1,11 +1,11 @@
use axum::extract::State; use axum::extract::State;
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use conduwuit::{ use conduwuit::{
Err, Result, debug_warn, trace, Err, Result, debug, debug_warn, info, trace,
utils::{IterStream, future::TryExtExt}, utils::{IterStream, future::TryExtExt},
}; };
use futures::{ use futures::{
FutureExt, StreamExt, FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join3}, future::{OptionFuture, join3},
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
@@ -46,7 +46,7 @@ pub(crate) async fn get_room_summary_legacy(
/// # `GET /_matrix/client/v1/room_summary/{roomIdOrAlias}` /// # `GET /_matrix/client/v1/room_summary/{roomIdOrAlias}`
/// ///
/// Returns a short description of the state of a room. /// Returns a short description of the state of a room.
#[tracing::instrument(skip_all, fields(%client), name = "room_summary")] #[tracing::instrument(skip_all, fields(%client), name = "room_summary", level = "info")]
pub(crate) async fn get_room_summary( pub(crate) async fn get_room_summary(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -79,9 +79,15 @@ async fn room_summary_response(
.server_in_room(services.globals.server_name(), room_id) .server_in_room(services.globals.server_name(), room_id)
.await .await
{ {
return local_room_summary_response(services, room_id, sender_user) match local_room_summary_response(services, room_id, sender_user)
.boxed() .boxed()
.await; .await
{
| Ok(response) => return Ok(response),
| Err(e) => {
debug_warn!("Failed to get local room summary: {e:?}, falling back to remote");
},
}
} }
let room = let room =
@@ -110,27 +116,31 @@ async fn local_room_summary_response(
room_id: &RoomId, room_id: &RoomId,
sender_user: Option<&UserId>, sender_user: Option<&UserId>,
) -> Result<get_summary::msc3266::Response> { ) -> Result<get_summary::msc3266::Response> {
trace!(?sender_user, "Sending local room summary response for {room_id:?}"); trace!(
let join_rule = services.rooms.state_accessor.get_join_rules(room_id); sender_user = sender_user.map(tracing::field::display),
"Sending local room summary response for {room_id:?}"
let world_readable = services.rooms.state_accessor.is_world_readable(room_id); );
let (join_rule, world_readable, guest_can_join) = join3(
let guest_can_join = services.rooms.state_accessor.guest_can_join(room_id); services.rooms.state_accessor.get_join_rules(room_id),
services.rooms.state_accessor.is_world_readable(room_id),
let (join_rule, world_readable, guest_can_join) = services.rooms.state_accessor.guest_can_join(room_id),
join3(join_rule, world_readable, guest_can_join).await;
trace!("{join_rule:?}, {world_readable:?}, {guest_can_join:?}");
user_can_see_summary(
services,
room_id,
&join_rule.clone().into(),
guest_can_join,
world_readable,
join_rule.allowed_rooms(),
sender_user,
) )
.await?; .await;
// Synapse allows server admins to bypass visibility checks.
// That seems neat so we'll copy that behaviour.
if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await {
user_can_see_summary(
services,
room_id,
&join_rule.clone().into(),
guest_can_join,
world_readable,
join_rule.allowed_rooms(),
sender_user,
)
.await?;
}
let canonical_alias = services let canonical_alias = services
.rooms .rooms
@@ -221,7 +231,7 @@ async fn remote_room_summary_hierarchy_response(
servers: &[OwnedServerName], servers: &[OwnedServerName],
sender_user: Option<&UserId>, sender_user: Option<&UserId>,
) -> Result<SpaceHierarchyParentSummary> { ) -> Result<SpaceHierarchyParentSummary> {
trace!(?sender_user, ?servers, "Sending remote room summary response for {room_id:?}"); trace!(sender_user = ?sender_user.map(tracing::field::display), ?servers, "Sending remote room summary response for {room_id:?}");
if !services.config.allow_federation { if !services.config.allow_federation {
return Err!(Request(Forbidden("Federation is disabled."))); return Err!(Request(Forbidden("Federation is disabled.")));
} }
@@ -231,15 +241,27 @@ async fn remote_room_summary_hierarchy_response(
"Federaton of room {room_id} is currently disabled on this server." "Federaton of room {room_id} is currently disabled on this server."
))); )));
} }
if servers.is_empty() {
return Err!(Request(MissingParam(
"No servers were provided to fetch the room over federation"
)));
}
let request = get_hierarchy::v1::Request::new(room_id.to_owned()); let request = get_hierarchy::v1::Request::new(room_id.to_owned());
let mut requests: FuturesUnordered<_> = servers let mut requests: FuturesUnordered<_> = servers
.iter() .iter()
.map(|server| { .map(|server| {
info!("Fetching room summary for {room_id} from server {server}");
services services
.sending .sending
.send_federation_request(server, request.clone()) .send_federation_request(server, request.clone())
.inspect_ok(move |v| {
debug!("Fetched room summary for {room_id} from server {server}: {v:?}");
})
.inspect_err(move |e| {
info!("Failed to fetch room summary for {room_id} from server {server}: {e}");
})
}) })
.collect(); .collect();
@@ -255,23 +277,23 @@ async fn remote_room_summary_hierarchy_response(
continue; continue;
} }
return user_can_see_summary( if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await {
services, return user_can_see_summary(
room_id, services,
&room.join_rule, room_id,
room.guest_can_join, &room.join_rule,
room.world_readable, room.guest_can_join,
room.allowed_room_ids.iter().map(AsRef::as_ref), room.world_readable,
sender_user, room.allowed_room_ids.iter().map(AsRef::as_ref),
) sender_user,
.await )
.map(|()| room); .await
.map(|()| room);
}
return Ok(room);
} }
Err!(Request(NotFound( Err!(Request(NotFound("Room not found or is not accessible")))
"Room is unknown to this server and was unable to fetch over federation with the \
provided servers available"
)))
} }
async fn user_can_see_summary<'a, I>( async fn user_can_see_summary<'a, I>(
@@ -311,21 +333,14 @@ where
return Ok(()); return Ok(());
} }
Err!(Request(Forbidden( Err!(Request(Forbidden("Room is not accessible")))
"Room is not world readable, not publicly accessible/joinable, restricted room \
conditions not met, and guest access is forbidden. Not allowed to see details \
of this room."
)))
}, },
| None => { | None => {
if is_public_room || world_readable { if is_public_room || world_readable {
return Ok(()); return Ok(());
} }
Err!(Request(Forbidden( Err!(Request(Forbidden("Room is not accessible")))
"Room is not world readable or publicly accessible/joinable, authentication is \
required"
)))
}, },
} }
} }
+7 -7
View File
@@ -35,7 +35,7 @@ use crate::Ruma;
/// ///
/// Get the supported login types of this server. One of these should be used as /// Get the supported login types of this server. One of these should be used as
/// the `type` field when logging in. /// the `type` field when logging in.
#[tracing::instrument(skip_all, fields(%client), name = "login")] #[tracing::instrument(skip_all, fields(%client), name = "login", level = "info")]
pub(crate) async fn get_login_types_route( pub(crate) async fn get_login_types_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -53,7 +53,7 @@ pub(crate) async fn get_login_types_route(
/// Authenticates the given user by its ID and its password. /// Authenticates the given user by its ID and its password.
/// ///
/// Returns the user ID if successful, and an error otherwise. /// Returns the user ID if successful, and an error otherwise.
#[tracing::instrument(skip_all, fields(%user_id), name = "password")] #[tracing::instrument(skip_all, fields(%user_id), name = "password", level = "debug")]
pub(crate) async fn password_login( pub(crate) async fn password_login(
services: &Services, services: &Services,
user_id: &UserId, user_id: &UserId,
@@ -96,7 +96,7 @@ pub(crate) async fn password_login(
/// ///
/// Creates the user if the user is found in the LDAP and do not already have an /// Creates the user if the user is found in the LDAP and do not already have an
/// account. /// account.
#[tracing::instrument(skip_all, fields(%user_id), name = "ldap")] #[tracing::instrument(skip_all, fields(%user_id), name = "ldap", level = "debug")]
pub(super) async fn ldap_login( pub(super) async fn ldap_login(
services: &Services, services: &Services,
user_id: &UserId, user_id: &UserId,
@@ -212,7 +212,7 @@ pub(crate) async fn handle_login(
/// Note: You can use [`GET /// Note: You can use [`GET
/// /_matrix/client/r0/login`](fn.get_supported_versions_route.html) to see /// /_matrix/client/r0/login`](fn.get_supported_versions_route.html) to see
/// supported login types. /// supported login types.
#[tracing::instrument(skip_all, fields(%client), name = "login")] #[tracing::instrument(skip_all, fields(%client), name = "login", level = "info")]
pub(crate) async fn login_route( pub(crate) async fn login_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -345,7 +345,7 @@ pub(crate) async fn login_route(
/// to log in with the m.login.token flow. /// to log in with the m.login.token flow.
/// ///
/// <https://spec.matrix.org/v1.13/client-server-api/#post_matrixclientv1loginget_token> /// <https://spec.matrix.org/v1.13/client-server-api/#post_matrixclientv1loginget_token>
#[tracing::instrument(skip_all, fields(%client), name = "login_token")] #[tracing::instrument(skip_all, fields(%client), name = "login_token", level = "info")]
pub(crate) async fn login_token_route( pub(crate) async fn login_token_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -413,7 +413,7 @@ pub(crate) async fn login_token_route(
/// last seen ts) /// last seen ts)
/// - Forgets to-device events /// - Forgets to-device events
/// - Triggers device list updates /// - Triggers device list updates
#[tracing::instrument(skip_all, fields(%client), name = "logout")] #[tracing::instrument(skip_all, fields(%client), name = "logout", level = "info")]
pub(crate) async fn logout_route( pub(crate) async fn logout_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
@@ -440,7 +440,7 @@ pub(crate) async fn logout_route(
/// Note: This is equivalent to calling [`GET /// Note: This is equivalent to calling [`GET
/// /_matrix/client/r0/logout`](fn.logout_route.html) from each device of this /// /_matrix/client/r0/logout`](fn.logout_route.html) from each device of this
/// user. /// user.
#[tracing::instrument(skip_all, fields(%client), name = "logout")] #[tracing::instrument(skip_all, fields(%client), name = "logout", level = "info")]
pub(crate) async fn logout_all_route( pub(crate) async fn logout_all_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
+7 -7
View File
@@ -140,8 +140,8 @@ pub(crate) async fn get_state_events_for_key_route(
.await .await
.map_err(|_| { .map_err(|_| {
err!(Request(NotFound(debug_warn!( err!(Request(NotFound(debug_warn!(
room_id = ?body.room_id, room_id = %body.room_id,
event_type = ?body.event_type, event_type = %body.event_type,
"State event not found in room.", "State event not found in room.",
)))) ))))
})?; })?;
@@ -226,7 +226,7 @@ async fn allowed_to_send_state_event(
match event_type { match event_type {
| StateEventType::RoomCreate => { | StateEventType::RoomCreate => {
return Err!(Request(BadJson(debug_warn!( return Err!(Request(BadJson(debug_warn!(
?room_id, %room_id,
"You cannot update m.room.create after a room has been created." "You cannot update m.room.create after a room has been created."
)))); ))));
}, },
@@ -237,7 +237,7 @@ async fn allowed_to_send_state_event(
| Ok(acl_content) => { | Ok(acl_content) => {
if acl_content.allow_is_empty() { if acl_content.allow_is_empty() {
return Err!(Request(BadJson(debug_warn!( return Err!(Request(BadJson(debug_warn!(
?room_id, %room_id,
"Sending an ACL event with an empty allow key will permanently \ "Sending an ACL event with an empty allow key will permanently \
brick the room for non-conduwuit's as this equates to no servers \ brick the room for non-conduwuit's as this equates to no servers \
being allowed to participate in this room." being allowed to participate in this room."
@@ -246,7 +246,7 @@ async fn allowed_to_send_state_event(
if acl_content.deny_contains("*") && acl_content.allow_contains("*") { if acl_content.deny_contains("*") && acl_content.allow_contains("*") {
return Err!(Request(BadJson(debug_warn!( return Err!(Request(BadJson(debug_warn!(
?room_id, %room_id,
"Sending an ACL event with a deny and allow key value of \"*\" will \ "Sending an ACL event with a deny and allow key value of \"*\" will \
permanently brick the room for non-conduwuit's as this equates to \ permanently brick the room for non-conduwuit's as this equates to \
no servers being allowed to participate in this room." no servers being allowed to participate in this room."
@@ -258,7 +258,7 @@ async fn allowed_to_send_state_event(
&& !acl_content.allow_contains(services.globals.server_name().as_str()) && !acl_content.allow_contains(services.globals.server_name().as_str())
{ {
return Err!(Request(BadJson(debug_warn!( return Err!(Request(BadJson(debug_warn!(
?room_id, %room_id,
"Sending an ACL event with a deny key value of \"*\" and without \ "Sending an ACL event with a deny key value of \"*\" and without \
your own server name in the allow key will result in you being \ your own server name in the allow key will result in you being \
unable to participate in this room." unable to participate in this room."
@@ -270,7 +270,7 @@ async fn allowed_to_send_state_event(
&& !acl_content.allow_contains(services.globals.server_name().as_str()) && !acl_content.allow_contains(services.globals.server_name().as_str())
{ {
return Err!(Request(BadJson(debug_warn!( return Err!(Request(BadJson(debug_warn!(
?room_id, %room_id,
"Sending an ACL event for an allow key without \"*\" and without \ "Sending an ACL event for an allow key without \"*\" and without \
your own server name in the allow key will result in you being \ your own server name in the allow key will result in you being \
unable to participate in this room." unable to participate in this room."
+5 -5
View File
@@ -50,8 +50,8 @@ use crate::client::{
level = "debug", level = "debug",
skip_all, skip_all,
fields( fields(
room_id = ?room_id, room_id = %room_id,
syncing_user = ?sync_context.syncing_user, syncing_user = %sync_context.syncing_user,
), ),
)] )]
pub(super) async fn load_joined_room( pub(super) async fn load_joined_room(
@@ -578,7 +578,7 @@ async fn build_notification_counts(
) )
.await; .await;
trace!(?notification_count, ?highlight_count, "syncing new notification counts"); trace!(%notification_count, %highlight_count, "syncing new notification counts");
Ok(Some(UnreadNotificationsCount { Ok(Some(UnreadNotificationsCount {
notification_count: Some(notification_count), notification_count: Some(notification_count),
@@ -692,8 +692,8 @@ async fn build_room_summary(
}; };
trace!( trace!(
?joined_member_count, %joined_member_count,
?invited_member_count, %invited_member_count,
heroes_length = heroes.as_ref().map(HashSet::len), heroes_length = heroes.as_ref().map(HashSet::len),
"syncing updated summary" "syncing updated summary"
); );
+2 -2
View File
@@ -307,8 +307,8 @@ async fn build_left_state_and_timeline(
} }
trace!( trace!(
?timeline_start_count, %timeline_start_count,
?timeline_end_count, %timeline_end_count,
"syncing {} timeline events (limited = {}) and {} state events", "syncing {} timeline events (limited = {}) and {} state events",
timeline.pdus.len(), timeline.pdus.len(),
timeline.limited, timeline.limited,
+1 -1
View File
@@ -275,7 +275,7 @@ pub(crate) async fn build_sync_events(
match joined_room { match joined_room {
| Ok((room, updates)) => Some((room_id, room, updates)), | Ok((room, updates)) => Some((room_id, room, updates)),
| Err(err) => { | Err(err) => {
warn!(?err, ?room_id, "error loading joined room {}", room_id); warn!(?err, %room_id, "error loading joined room");
None None
}, },
} }
+1 -1
View File
@@ -217,7 +217,7 @@ pub(super) async fn build_state_incremental<'a>(
the performance penalty is acceptable. the performance penalty is acceptable.
*/ */
trace!(?timeline_is_linear, ?timeline.limited, "computing state for incremental sync"); trace!(%timeline_is_linear, %timeline.limited, "computing state for incremental sync");
// fetch the shorteventids of state events in the timeline // fetch the shorteventids of state events in the timeline
let state_events_in_timeline: BTreeSet<ShortEventId> = services let state_events_in_timeline: BTreeSet<ShortEventId> = services
+1 -1
View File
@@ -26,7 +26,7 @@ use crate::Ruma;
/// TODO: Implement pagination, currently this just returns everything /// TODO: Implement pagination, currently this just returns everything
/// ///
/// An implementation of [MSC2666](https://github.com/matrix-org/matrix-spec-proposals/pull/2666) /// An implementation of [MSC2666](https://github.com/matrix-org/matrix-spec-proposals/pull/2666)
#[tracing::instrument(skip_all, fields(%client), name = "mutual_rooms")] #[tracing::instrument(skip_all, fields(%client), name = "mutual_rooms", level = "info")]
pub(crate) async fn get_mutual_rooms_route( pub(crate) async fn get_mutual_rooms_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
+10 -14
View File
@@ -1,6 +1,5 @@
use axum::{Json, extract::State, response::IntoResponse}; use axum::{Json, extract::State, response::IntoResponse};
use conduwuit::{Error, Result}; use conduwuit::{Error, Result};
use futures::StreamExt;
use ruma::api::client::{ use ruma::api::client::{
discovery::{ discovery::{
discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo}, discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo},
@@ -71,21 +70,18 @@ pub(crate) async fn well_known_support(
// Try to add admin users as contacts if no contacts are configured // Try to add admin users as contacts if no contacts are configured
if contacts.is_empty() { if contacts.is_empty() {
if let Ok(admin_room) = services.admin.get_admin_room().await { let admin_users = services.admin.get_admins().await;
let admin_users = services.rooms.state_cache.room_members(&admin_room);
let mut stream = admin_users;
while let Some(user_id) = stream.next().await { for user_id in &admin_users {
// Skip server user if *user_id == services.globals.server_user {
if *user_id == services.globals.server_user { continue;
continue;
}
contacts.push(Contact {
role: role_value.clone(),
email_address: None,
matrix_id: Some(user_id.to_owned()),
});
} }
contacts.push(Contact {
role: role_value.clone(),
email_address: None,
matrix_id: Some(user_id.to_owned()),
});
} }
} }
+3 -3
View File
@@ -40,7 +40,7 @@ pub(crate) async fn get_missing_events_route(
while i < queued_events.len() && events.len() < limit { while i < queued_events.len() && events.len() < limit {
let Ok(pdu) = services.rooms.timeline.get_pdu(&queued_events[i]).await else { let Ok(pdu) = services.rooms.timeline.get_pdu(&queued_events[i]).await else {
debug!( debug!(
?body.origin, body.origin = body.origin.as_ref().map(tracing::field::display),
"Event {} does not exist locally, skipping", &queued_events[i] "Event {} does not exist locally, skipping", &queued_events[i]
); );
i = i.saturating_add(1); i = i.saturating_add(1);
@@ -59,7 +59,7 @@ pub(crate) async fn get_missing_events_route(
.await .await
{ {
debug!( debug!(
?body.origin, body.origin = body.origin.as_ref().map(tracing::field::display),
"Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id "Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id
); );
i = i.saturating_add(1); i = i.saturating_add(1);
@@ -68,7 +68,7 @@ pub(crate) async fn get_missing_events_route(
let Ok(event) = to_canonical_object(&pdu) else { let Ok(event) = to_canonical_object(&pdu) else {
debug_error!( debug_error!(
?body.origin, body.origin = body.origin.as_ref().map(tracing::field::display),
"Failed to convert PDU in database to canonical JSON: {pdu:?}" "Failed to convert PDU in database to canonical JSON: {pdu:?}"
); );
i = i.saturating_add(1); i = i.saturating_add(1);
+1 -1
View File
@@ -19,7 +19,7 @@ use crate::Ruma;
/// # `PUT /_matrix/federation/v2/invite/{roomId}/{eventId}` /// # `PUT /_matrix/federation/v2/invite/{roomId}/{eventId}`
/// ///
/// Invites a remote user to a room. /// Invites a remote user to a room.
#[tracing::instrument(skip_all, fields(%client), name = "invite")] #[tracing::instrument(skip_all, fields(%client), name = "invite", level = "info")]
pub(crate) async fn create_invite_route( pub(crate) async fn create_invite_route(
State(services): State<crate::State>, State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
+1 -1
View File
@@ -22,7 +22,7 @@ use crate::Ruma;
/// # `GET /_matrix/federation/v1/make_join/{roomId}/{userId}` /// # `GET /_matrix/federation/v1/make_join/{roomId}/{userId}`
/// ///
/// Creates a join template. /// Creates a join template.
#[tracing::instrument(skip_all, fields(room_id = %body.room_id, user_id = %body.user_id, origin = %body.origin()))] #[tracing::instrument(skip_all, fields(room_id = %body.room_id, user_id = %body.user_id, origin = %body.origin()), level = "info")]
pub(crate) async fn create_join_event_template_route( pub(crate) async fn create_join_event_template_route(
State(services): State<crate::State>, State(services): State<crate::State>,
body: Ruma<prepare_join_event::v1::Request>, body: Ruma<prepare_join_event::v1::Request>,
+6 -8
View File
@@ -3,9 +3,7 @@ use std::{collections::BTreeMap, net::IpAddr, time::Instant};
use axum::extract::State; use axum::extract::State;
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use conduwuit::{ use conduwuit::{
Err, Error, Result, debug, Err, Error, Result, debug, debug_warn, err, error,
debug::INFO_SPAN_LEVEL,
debug_warn, err, error,
result::LogErr, result::LogErr,
trace, trace,
utils::{ utils::{
@@ -48,7 +46,7 @@ type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
/// Push EDUs and PDUs to this server. /// Push EDUs and PDUs to this server.
#[tracing::instrument( #[tracing::instrument(
name = "txn", name = "txn",
level = INFO_SPAN_LEVEL, level = "debug",
skip_all, skip_all,
fields( fields(
%client, %client,
@@ -83,8 +81,8 @@ pub(crate) async fn send_transaction_message_route(
pdus = body.pdus.len(), pdus = body.pdus.len(),
edus = body.edus.len(), edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(), elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id, id = %body.transaction_id,
origin =?body.origin(), origin = %body.origin(),
"Starting txn", "Starting txn",
); );
@@ -110,8 +108,8 @@ pub(crate) async fn send_transaction_message_route(
pdus = body.pdus.len(), pdus = body.pdus.len(),
edus = body.edus.len(), edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(), elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id, id = %body.transaction_id,
origin =?body.origin(), origin = %body.origin(),
"Finished txn", "Finished txn",
); );
for (id, result) in &results { for (id, result) in &results {
+1 -1
View File
@@ -26,7 +26,7 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
use crate::Ruma; use crate::Ruma;
/// helper method for /send_join v1 and v2 /// helper method for /send_join v1 and v2
#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))] #[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()), level = "info")]
async fn create_join_event( async fn create_join_event(
services: &Services, services: &Services,
origin: &ServerName, origin: &ServerName,
+5 -1
View File
@@ -1,3 +1,5 @@
use std::time::Duration;
use axum::extract::State; use axum::extract::State;
use conduwuit::{Error, Result}; use conduwuit::{Error, Result};
use futures::{FutureExt, StreamExt, TryFutureExt}; use futures::{FutureExt, StreamExt, TryFutureExt};
@@ -96,6 +98,7 @@ pub(crate) async fn get_keys_route(
&body.device_keys, &body.device_keys,
|u| Some(u.server_name()) == body.origin.as_deref(), |u| Some(u.server_name()) == body.origin.as_deref(),
services.globals.allow_device_name_federation(), services.globals.allow_device_name_federation(),
Duration::from_secs(0),
) )
.await?; .await?;
@@ -124,7 +127,8 @@ pub(crate) async fn claim_keys_route(
)); ));
} }
let result = claim_keys_helper(&services, &body.one_time_keys).await?; let result =
claim_keys_helper(&services, &body.one_time_keys, Duration::from_secs(0)).await?;
Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys }) Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys })
} }
+2 -2
View File
@@ -340,7 +340,7 @@ where
#[tracing::instrument( #[tracing::instrument(
name = "get", name = "get",
level = "trace" level = "trace",
skip_all, skip_all,
fields(?key) fields(?key)
)] )]
@@ -357,7 +357,7 @@ where
#[tracing::instrument( #[tracing::instrument(
name = "xchg", name = "xchg",
level = "trace" level = "trace",
skip_all, skip_all,
fields(?key, ?val) fields(?key, ?val)
)] )]
+1 -1
View File
@@ -59,7 +59,7 @@ impl Deref for Manager {
/// Update the active configuration, returning prior configuration. /// Update the active configuration, returning prior configuration.
#[implement(Manager)] #[implement(Manager)]
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all, level = "info")]
pub fn update(&self, config: Config) -> Result<Arc<Config>> { pub fn update(&self, config: Config) -> Result<Arc<Config>> {
let config = Arc::new(config); let config = Arc::new(config);
let new = Arc::into_raw(config); let new = Arc::into_raw(config);
+16
View File
@@ -1819,6 +1819,22 @@ pub struct Config {
#[serde(default = "default_admin_room_tag")] #[serde(default = "default_admin_room_tag")]
pub admin_room_tag: String, pub admin_room_tag: String,
/// A list of Matrix IDs that are qualified as server admins.
///
/// Any Matrix IDs within this list are regarded as an admin
/// regardless of whether they are in the admin room or not
///
/// default: []
#[serde(default)]
pub admins_list: Vec<OwnedUserId>,
/// Defines whether those within the admin room are added to the
/// admins_list.
///
/// default: true
#[serde(default = "true_fn")]
pub admins_from_room: bool,
/// Sentry.io crash/panic reporting, performance monitoring/metrics, etc. /// Sentry.io crash/panic reporting, performance monitoring/metrics, etc.
/// This is NOT enabled by default. /// This is NOT enabled by default.
#[serde(default)] #[serde(default)]
+11 -10
View File
@@ -532,8 +532,8 @@ where
if sender_power_level < invite_level { if sender_power_level < invite_level {
warn!( warn!(
%sender, %sender,
has=?sender_power_level, has=%sender_power_level,
required=?invite_level, required=%invite_level,
"sender cannot send invites in this room" "sender cannot send invites in this room"
); );
return Ok(false); return Ok(false);
@@ -605,8 +605,8 @@ where
if !check_redaction(room_version, incoming_event, sender_power_level, redact_level)? { if !check_redaction(room_version, incoming_event, sender_power_level, redact_level)? {
warn!( warn!(
%sender, %sender,
?sender_power_level, %sender_power_level,
?redact_level, %redact_level,
"redaction event was not allowed" "redaction event was not allowed"
); );
return Ok(false); return Ok(false);
@@ -772,11 +772,12 @@ where
power_levels_event.as_ref().is_some(), power_levels_event.as_ref().is_some(),
) || auth_user_pl >= invite_level; ) || auth_user_pl >= invite_level;
trace!( trace!(
auth_user_pl=?auth_user_pl, %auth_user_pl,
invite_level=?invite_level, %auth_user_pl,
user_joined=?user_joined, %invite_level,
okay_power=?okay_power, %user_joined,
passing=?(user_joined && okay_power), %okay_power,
passing=%(user_joined && okay_power),
"user for join auth is valid check details" "user for join auth is valid check details"
); );
user_joined && okay_power user_joined && okay_power
@@ -1211,7 +1212,7 @@ fn can_send_event(event: &impl Event, ple: Option<&impl Event>, user_level: Int)
{ {
warn!( warn!(
%user_level, %user_level,
required=?event_type_power_level, required=%event_type_power_level,
state_key=?event.state_key(), state_key=?event.state_key(),
sender=%event.sender(), sender=%event.sender(),
"state_key starts with @ but does not match sender", "state_key starts with @ but does not match sender",
+1 -1
View File
@@ -7,7 +7,7 @@ use super::Engine;
use crate::util::map_err; use crate::util::map_err;
#[implement(Engine)] #[implement(Engine)]
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self), level = "info")]
pub fn backup(&self) -> Result { pub fn backup(&self) -> Result {
let mut engine = self.backup_engine()?; let mut engine = self.backup_engine()?;
let config = &self.ctx.server.config; let config = &self.ctx.server.config;
+1 -1
View File
@@ -4,7 +4,7 @@ use rocksdb::LogLevel;
#[tracing::instrument( #[tracing::instrument(
parent = None, parent = None,
name = "rocksdb", name = "rocksdb",
level = "trace" level = "trace",
skip(msg), skip(msg),
)] )]
pub(crate) fn handle(level: LogLevel, msg: &str) { pub(crate) fn handle(level: LogLevel, msg: &str) {
+3 -3
View File
@@ -17,7 +17,7 @@ use super::{
use crate::{Context, or_else}; use crate::{Context, or_else};
#[implement(Engine)] #[implement(Engine)]
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all, level = "info")]
pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<Self>> { pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<Self>> {
let server = &ctx.server; let server = &ctx.server;
let config = &server.config; let config = &server.config;
@@ -63,7 +63,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
} }
#[implement(Engine)] #[implement(Engine)]
#[tracing::instrument(name = "configure", skip_all)] #[tracing::instrument(name = "configure", skip_all, level = "debug")]
fn configure_cfds( fn configure_cfds(
ctx: &Arc<Context>, ctx: &Arc<Context>,
db_opts: &Options, db_opts: &Options,
@@ -119,7 +119,7 @@ fn configure_cfds(
} }
#[implement(Engine)] #[implement(Engine)]
#[tracing::instrument(name = "discover", skip_all)] #[tracing::instrument(name = "discover", skip_all, level = "debug")]
fn discover_cfs(path: &Path, opts: &Options) -> BTreeSet<String> { fn discover_cfs(path: &Path, opts: &Options) -> BTreeSet<String> {
Db::list_cf(opts, path) Db::list_cf(opts, path)
.unwrap_or_default() .unwrap_or_default()
+1 -1
View File
@@ -26,7 +26,7 @@ pub struct Options {
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument( #[tracing::instrument(
name = "compact", name = "compact",
level = "info" level = "info",
skip(self), skip(self),
fields(%self), fields(%self),
)] )]
+4 -4
View File
@@ -113,7 +113,7 @@ impl Drop for Pool {
} }
#[implement(Pool)] #[implement(Pool)]
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all, level = "debug")]
pub(crate) fn close(&self) { pub(crate) fn close(&self) {
let workers = take(&mut *self.workers.lock()); let workers = take(&mut *self.workers.lock());
@@ -147,8 +147,8 @@ pub(crate) fn close(&self) {
.map(|result| result.map_err(Error::from_panic)) .map(|result| result.map_err(Error::from_panic))
.enumerate() .enumerate()
.for_each(|(id, result)| match result { .for_each(|(id, result)| match result {
| Ok(()) => trace!(?id, "worker joined"), | Ok(()) => trace!(%id, "worker joined"),
| Err(error) => error!(?id, "worker joined with error: {error}"), | Err(error) => error!(%id, "worker joined with error: {error}"),
}); });
} }
@@ -297,7 +297,7 @@ fn worker_init(&self, id: usize) {
} }
debug!( debug!(
?group, %group,
affinity = ?affinity.collect::<Vec<_>>(), affinity = ?affinity.collect::<Vec<_>>(),
"worker ready" "worker ready"
); );
+9 -9
View File
@@ -105,8 +105,8 @@ pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>)
.unwrap_or("None"), .unwrap_or("None"),
?worker_counts, ?worker_counts,
?queue_sizes, ?queue_sizes,
?total_workers, %total_workers,
stream_width = ?stream::automatic_width(), stream_width = %stream::automatic_width(),
"Frontend topology", "Frontend topology",
); );
@@ -139,13 +139,13 @@ fn update_stream_width(server: &Arc<Server>, num_queues: usize, total_workers: u
let (old_width, new_width) = stream::set_width(req_width); let (old_width, new_width) = stream::set_width(req_width);
let (old_amp, new_amp) = stream::set_amplification(req_amp); let (old_amp, new_amp) = stream::set_amplification(req_amp);
debug!( debug!(
scale = ?config.stream_width_scale, scale = %config.stream_width_scale,
?num_queues, %num_queues,
?req_width, %req_width,
?old_width, %old_width,
?new_width, %new_width,
?old_amp, %old_amp,
?new_amp, %new_amp,
"Updated global stream width" "Updated global stream width"
); );
} }
+3 -2
View File
@@ -6,6 +6,7 @@ use conduwuit_core::{
debug_warn, err, debug_warn, err,
log::{ConsoleFormat, ConsoleWriter, LogLevelReloadHandles, capture, fmt_span}, log::{ConsoleFormat, ConsoleWriter, LogLevelReloadHandles, capture, fmt_span},
result::UnwrapOrErr, result::UnwrapOrErr,
warn,
}; };
#[cfg(feature = "otlp_telemetry")] #[cfg(feature = "otlp_telemetry")]
use opentelemetry::trace::TracerProvider; use opentelemetry::trace::TracerProvider;
@@ -85,7 +86,7 @@ pub(crate) fn init(
let exporter = match config.otlp_protocol.as_str() { let exporter = match config.otlp_protocol.as_str() {
| "grpc" => opentelemetry_otlp::SpanExporter::builder() | "grpc" => opentelemetry_otlp::SpanExporter::builder()
.with_tonic() .with_tonic()
.with_protocol(opentelemetry_otlp::Protocol::Grpc) .with_protocol(opentelemetry_otlp::Protocol::Grpc) // TODO: build from env when 0.32 is released
.build() .build()
.expect("Failed to create OTLP gRPC exporter"), .expect("Failed to create OTLP gRPC exporter"),
| "http" => opentelemetry_otlp::SpanExporter::builder() | "http" => opentelemetry_otlp::SpanExporter::builder()
@@ -93,7 +94,7 @@ pub(crate) fn init(
.build() .build()
.expect("Failed to create OTLP HTTP exporter"), .expect("Failed to create OTLP HTTP exporter"),
| protocol => { | protocol => {
debug_warn!( warn!(
"Invalid OTLP protocol '{}', falling back to HTTP. Valid options are \ "Invalid OTLP protocol '{}', falling back to HTTP. Valid options are \
'http' or 'grpc'.", 'http' or 'grpc'.",
protocol protocol
+2 -1
View File
@@ -50,7 +50,8 @@ pub fn run_with_args(args: &Args) -> Result<()> {
#[tracing::instrument( #[tracing::instrument(
name = "main", name = "main",
parent = None, parent = None,
skip_all skip_all,
level = "info"
)] )]
async fn async_main(server: &Arc<Server>) -> Result<(), Error> { async fn async_main(server: &Arc<Server>) -> Result<(), Error> {
extern crate conduwuit_router as router; extern crate conduwuit_router as router;
+3 -3
View File
@@ -6,7 +6,7 @@ use tokio::signal;
use super::server::Server; use super::server::Server;
#[cfg(unix)] #[cfg(unix)]
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all, level = "info")]
pub(super) async fn signal(server: Arc<Server>) { pub(super) async fn signal(server: Arc<Server>) {
use signal::unix; use signal::unix;
use unix::SignalKind; use unix::SignalKind;
@@ -39,13 +39,13 @@ pub(super) async fn signal(server: Arc<Server>) {
}; };
if let Err(e) = result { if let Err(e) = result {
debug_error!(?sig, "signal: {e}"); debug_error!(%sig, "signal: {e}");
} }
} }
} }
#[cfg(not(unix))] #[cfg(not(unix))]
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all, level = "info")]
pub(super) async fn signal(server: Arc<Server>) { pub(super) async fn signal(server: Arc<Server>) {
loop { loop {
tokio::select! { tokio::select! {
+4 -1
View File
@@ -66,7 +66,10 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
.layer(RequestBodyTimeoutLayer::new(Duration::from_secs( .layer(RequestBodyTimeoutLayer::new(Duration::from_secs(
server.config.client_receive_timeout, server.config.client_receive_timeout,
))) )))
.layer(TimeoutLayer::with_status_code(StatusCode::REQUEST_TIMEOUT, Duration::from_secs(server.config.client_request_timeout))) .layer(TimeoutLayer::with_status_code(
StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(server.config.client_request_timeout),
))
.layer(SetResponseHeaderLayer::if_not_present( .layer(SetResponseHeaderLayer::if_not_present(
HeaderName::from_static("origin-agent-cluster"), // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Origin-Agent-Cluster HeaderName::from_static("origin-agent-cluster"), // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Origin-Agent-Cluster
HeaderValue::from_static("?1"), HeaderValue::from_static("?1"),
+4 -4
View File
@@ -102,13 +102,13 @@ fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Respons
let reason = status.canonical_reason().unwrap_or("Unknown Reason"); let reason = status.canonical_reason().unwrap_or("Unknown Reason");
if status.is_server_error() { if status.is_server_error() {
error!(method = ?method, uri = ?uri, "{code} {reason}"); error!(%method, %uri, "{code} {reason}");
} else if status.is_client_error() { } else if status.is_client_error() {
debug_error!(method = ?method, uri = ?uri, "{code} {reason}"); debug_error!(%method, %uri, "{code} {reason}");
} else if status.is_redirection() { } else if status.is_redirection() {
debug!(method = ?method, uri = ?uri, "{code} {reason}"); debug!(%method, %uri, "{code} {reason}");
} else { } else {
trace!(method = ?method, uri = ?uri, "{code} {reason}"); trace!(%method, %uri, "{code} {reason}");
} }
if status == StatusCode::METHOD_NOT_ALLOWED { if status == StatusCode::METHOD_NOT_ALLOWED {
+5 -5
View File
@@ -19,7 +19,7 @@ use tokio::{
use crate::serve; use crate::serve;
/// Main loop base /// Main loop base
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all, level = "info")]
pub(crate) async fn run(services: Arc<Services>) -> Result<()> { pub(crate) async fn run(services: Arc<Services>) -> Result<()> {
let server = &services.server; let server = &services.server;
debug!("Start"); debug!("Start");
@@ -58,7 +58,7 @@ pub(crate) async fn run(services: Arc<Services>) -> Result<()> {
} }
/// Async initializations /// Async initializations
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all, level = "info")]
pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> { pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> {
debug!("Starting..."); debug!("Starting...");
@@ -73,7 +73,7 @@ pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> {
} }
/// Async destructions /// Async destructions
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all, level = "info")]
pub(crate) async fn stop(services: Arc<Services>) -> Result<()> { pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
debug!("Shutting down..."); debug!("Shutting down...");
@@ -108,7 +108,7 @@ pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
Ok(()) Ok(())
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all, level = "info")]
async fn signal(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) { async fn signal(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) {
server server
.clone() .clone()
@@ -126,7 +126,7 @@ async fn handle_shutdown(server: Arc<Server>, tx: Sender<()>, handle: axum_serve
let timeout = Duration::from_secs(timeout); let timeout = Duration::from_secs(timeout);
debug!( debug!(
?timeout, ?timeout,
handle_active = ?server.metrics.requests_handle_active.load(Ordering::Relaxed), handle_active = %server.metrics.requests_handle_active.load(Ordering::Relaxed),
"Notifying for graceful shutdown" "Notifying for graceful shutdown"
); );
+9 -2
View File
@@ -9,7 +9,10 @@ use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
use termimad::MadSkin; use termimad::MadSkin;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::{Dep, admin}; use crate::{
Dep,
admin::{self, InvocationSource},
};
pub struct Console { pub struct Console {
server: Arc<Server>, server: Arc<Server>,
@@ -160,7 +163,11 @@ impl Console {
} }
async fn process(self: Arc<Self>, line: String) { async fn process(self: Arc<Self>, line: String) {
match self.admin.command_in_place(line, None).await { match self
.admin
.command_in_place(line, None, InvocationSource::Console)
.await
{
| Ok(Some(ref content)) => self.output(content), | Ok(Some(ref content)) => self.output(content),
| Err(ref content) => self.output_err(content), | Err(ref content) => self.output_err(content),
| _ => unreachable!(), | _ => unreachable!(),
+6 -1
View File
@@ -2,6 +2,8 @@ use conduwuit::{Err, Result, debug, debug_info, error, implement, info};
use ruma::events::room::message::RoomMessageEventContent; use ruma::events::room::message::RoomMessageEventContent;
use tokio::time::{Duration, sleep}; use tokio::time::{Duration, sleep};
use crate::admin::InvocationSource;
pub(super) const SIGNAL: &str = "SIGUSR2"; pub(super) const SIGNAL: &str = "SIGUSR2";
/// Possibly spawn the terminal console at startup if configured. /// Possibly spawn the terminal console at startup if configured.
@@ -88,7 +90,10 @@ pub(super) async fn signal_execute(&self) -> Result {
async fn execute_command(&self, i: usize, command: String) -> Result { async fn execute_command(&self, i: usize, command: String) -> Result {
debug!("Execute command #{i}: executing {command:?}"); debug!("Execute command #{i}: executing {command:?}");
match self.command_in_place(command, None).await { match self
.command_in_place(command, None, InvocationSource::Console)
.await
{
| Ok(Some(output)) => Self::execute_command_output(i, &output), | Ok(Some(output)) => Self::execute_command_output(i, &output),
| Err(output) => Self::execute_command_error(i, &output), | Err(output) => Self::execute_command_error(i, &output),
| Ok(None) => { | Ok(None) => {
+17 -2
View File
@@ -1,6 +1,8 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use conduwuit::{Err, Result, debug_info, debug_warn, error, implement, matrix::pdu::PduBuilder}; use conduwuit::{
Err, Result, debug_info, debug_warn, error, implement, matrix::pdu::PduBuilder, warn,
};
use ruma::{ use ruma::{
RoomId, UserId, RoomId, UserId,
events::{ events::{
@@ -120,7 +122,7 @@ pub async fn make_user_admin(&self, user_id: &UserId) -> Result {
let room_tag = self.services.server.config.admin_room_tag.as_str(); let room_tag = self.services.server.config.admin_room_tag.as_str();
if !room_tag.is_empty() { if !room_tag.is_empty() {
if let Err(e) = self.set_room_tag(&room_id, user_id, room_tag).await { if let Err(e) = self.set_room_tag(&room_id, user_id, room_tag).await {
error!(?room_id, ?user_id, ?room_tag, "Failed to set tag for admin grant: {e}"); error!(%room_id, %user_id, %room_tag, "Failed to set tag for admin grant: {e}");
} }
} }
@@ -176,6 +178,19 @@ async fn set_room_tag(&self, room_id: &RoomId, user_id: &UserId, tag: &str) -> R
pub async fn revoke_admin(&self, user_id: &UserId) -> Result { pub async fn revoke_admin(&self, user_id: &UserId) -> Result {
use MembershipState::{Invite, Join, Knock, Leave}; use MembershipState::{Invite, Join, Knock, Leave};
if self
.services
.server
.config
.admins_list
.contains(&user_id.to_owned())
{
warn!(
"Revoking the admin status of {user_id} will not work correctly as they are within \
the admins_list config."
);
}
let Ok(room_id) = self.get_admin_room().await else { let Ok(room_id) = self.get_admin_room().await else {
return Err!(error!("No admin room available or created.")); return Err!(error!("No admin room available or created."));
}; };
+129 -61
View File
@@ -14,10 +14,10 @@ use conduwuit_core::{
Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder, Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
}; };
pub use create::create_admin_room; pub use create::create_admin_room;
use futures::{Future, FutureExt, TryFutureExt}; use futures::{Future, FutureExt, StreamExt, TryFutureExt};
use loole::{Receiver, Sender}; use loole::{Receiver, Sender};
use ruma::{ use ruma::{
Mxc, OwnedEventId, OwnedMxcUri, OwnedRoomId, RoomId, UInt, UserId, Mxc, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
events::{ events::{
Mentions, Mentions,
room::{ room::{
@@ -54,15 +54,37 @@ struct Services {
media: Dep<crate::media::Service>, media: Dep<crate::media::Service>,
} }
/// Inputs to a command are a multi-line string, optional reply_id, and optional /// Inputs to a command are a multi-line string, invocation source, optional
/// sender. /// reply_id, and optional sender.
#[derive(Debug)] #[derive(Debug)]
pub struct CommandInput { pub struct CommandInput {
pub command: String, pub command: String,
pub reply_id: Option<OwnedEventId>, pub reply_id: Option<OwnedEventId>,
pub source: InvocationSource,
pub sender: Option<Box<UserId>>, pub sender: Option<Box<UserId>>,
} }
/// Where a command is being invoked from.
#[derive(Debug, Clone, Copy)]
pub enum InvocationSource {
/// The server's private admin room
AdminRoom,
/// An escaped `\!admin` command in a public room
EscapedCommand,
/// The server's admin console
Console,
/// Some other trusted internal source
Internal,
}
impl InvocationSource {
/// Returns whether this invocation source allows "restricted"
/// commands, i.e. ones that could be potentially dangerous if executed by
/// an attacker or in a public room.
#[must_use]
pub fn allows_restricted(&self) -> bool { !matches!(self, Self::EscapedCommand) }
}
/// Prototype of the tab-completer. The input is buffered text when tab /// Prototype of the tab-completer. The input is buffered text when tab
/// asserted; the output will fully replace the input buffer. /// asserted; the output will fully replace the input buffer.
pub type Completer = fn(&str) -> String; pub type Completer = fn(&str) -> String;
@@ -276,10 +298,15 @@ impl Service {
/// Posts a command to the command processor queue and returns. Processing /// Posts a command to the command processor queue and returns. Processing
/// will take place on the service worker's task asynchronously. Errors if /// will take place on the service worker's task asynchronously. Errors if
/// the queue is full. /// the queue is full.
pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> { pub fn command(
&self,
command: String,
reply_id: Option<OwnedEventId>,
source: InvocationSource,
) -> Result<()> {
self.channel self.channel
.0 .0
.send(CommandInput { command, reply_id, sender: None }) .send(CommandInput { command, reply_id, source, sender: None })
.map_err(|e| err!("Failed to enqueue admin command: {e:?}")) .map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
} }
@@ -290,11 +317,17 @@ impl Service {
&self, &self,
command: String, command: String,
reply_id: Option<OwnedEventId>, reply_id: Option<OwnedEventId>,
source: InvocationSource,
sender: Box<UserId>, sender: Box<UserId>,
) -> Result<()> { ) -> Result<()> {
self.channel self.channel
.0 .0
.send(CommandInput { command, reply_id, sender: Some(sender) }) .send(CommandInput {
command,
reply_id,
source,
sender: Some(sender),
})
.map_err(|e| err!("Failed to enqueue admin command: {e:?}")) .map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
} }
@@ -304,8 +337,9 @@ impl Service {
&self, &self,
command: String, command: String,
reply_id: Option<OwnedEventId>, reply_id: Option<OwnedEventId>,
source: InvocationSource,
) -> ProcessorResult { ) -> ProcessorResult {
self.process_command(CommandInput { command, reply_id, sender: None }) self.process_command(CommandInput { command, reply_id, source, sender: None })
.await .await
} }
@@ -349,16 +383,50 @@ impl Service {
handle(services, command).await handle(services, command).await
} }
/// Returns the list of admins for this server. First loads
/// the admin_list from the configuration, then adds users from
/// the admin room if applicable.
pub async fn get_admins(&self) -> Vec<OwnedUserId> {
let mut generated_admin_list: Vec<OwnedUserId> =
self.services.server.config.admins_list.clone();
if self.services.server.config.admins_from_room {
if let Ok(admin_room) = self.get_admin_room().await {
let admin_users = self.services.state_cache.room_members(&admin_room);
let mut stream = admin_users;
while let Some(user_id) = stream.next().await {
generated_admin_list.push(user_id.to_owned());
}
}
}
generated_admin_list
}
/// Checks whether a given user is an admin of this server /// Checks whether a given user is an admin of this server
pub async fn user_is_admin(&self, user_id: &UserId) -> bool { pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
let Ok(admin_room) = self.get_admin_room().await else { if self
return false; .services
}; .server
.config
.admins_list
.contains(&user_id.to_owned())
{
return true;
}
self.services if self.services.server.config.admins_from_room {
.state_cache if let Ok(admin_room) = self.get_admin_room().await {
.is_joined(user_id, &admin_room) return self
.await .services
.state_cache
.is_joined(user_id, &admin_room)
.await;
}
}
false
} }
/// Gets the room ID of the admin room /// Gets the room ID of the admin room
@@ -459,59 +527,59 @@ impl Service {
Ok(()) Ok(())
} }
pub async fn is_admin_command<E>(&self, event: &E, body: &str) -> bool pub async fn is_admin_command<E>(&self, event: &E, body: &str) -> Option<InvocationSource>
where where
E: Event + Send + Sync, E: Event + Send + Sync,
{ {
// Server-side command-escape with public echo // If the user isn't an admin they definitely can't run admin commands
let is_escape = body.starts_with('\\');
let is_public_escape = is_escape && body.trim_start_matches('\\').starts_with("!admin");
// Admin command with public echo (in admin room)
let server_user = &self.services.globals.server_user;
let is_public_prefix =
body.starts_with("!admin") || body.starts_with(server_user.as_str());
// Expected backward branch
if !is_public_escape && !is_public_prefix {
return false;
}
let user_is_local = self.services.globals.user_is_local(event.sender());
// only allow public escaped commands by local admins
if is_public_escape && !user_is_local {
return false;
}
// Check if server-side command-escape is disabled by configuration
if is_public_escape && !self.services.server.config.admin_escape_commands {
return false;
}
// Prevent unescaped !admin from being used outside of the admin room
if event.room_id().is_some()
&& is_public_prefix
&& !self.is_admin_room(event.room_id().unwrap()).await
{
return false;
}
// Only senders who are admin can proceed
if !self.user_is_admin(event.sender()).await { if !self.user_is_admin(event.sender()).await {
return false; return None;
} }
// This will evaluate to false if the emergency password is set up so that if let Some(room_id) = event.room_id()
// the administrator can execute commands as the server user && self.is_admin_room(room_id).await
let emergency_password_set = self.services.server.config.emergency_password.is_some(); {
let from_server = event.sender() == server_user && !emergency_password_set; // This is a message in the admin room
if from_server && self.is_admin_room(event.room_id().unwrap()).await {
return false;
}
// Authentic admin command // Ignore messages which aren't admin commands
true let server_user = &self.services.globals.server_user;
if !(body.starts_with("!admin") || body.starts_with(server_user.as_str())) {
return None;
}
// Ignore messages from the server user _unless_ the emergency password is set
let emergency_password_set = self.services.server.config.emergency_password.is_some();
if event.sender() == server_user && !emergency_password_set {
return None;
}
// Looks good
Some(InvocationSource::AdminRoom)
} else {
// This is a message outside the admin room
// Is it an escaped admin command? i.e. `\!admin --help`
let is_public_escape =
body.starts_with('\\') && body.trim_start_matches('\\').starts_with("!admin");
// Ignore the message if it's not
if !is_public_escape {
return None;
}
// Only admin users belonging to this server can use escaped commands
if !self.services.globals.user_is_local(event.sender()) {
return None;
}
// Check if escaped commands are disabled in the config
if !self.services.server.config.admin_escape_commands {
return None;
}
// Looks good
Some(InvocationSource::EscapedCommand)
}
} }
#[must_use] #[must_use]
+1 -1
View File
@@ -96,7 +96,7 @@ impl crate::Service for Service {
} }
if let Err(e) = self.check().await { if let Err(e) = self.check().await {
warn!(%e, "Failed to check for announcements"); warn!(?e, "Failed to check for announcements");
} }
} }
+7 -7
View File
@@ -88,7 +88,7 @@ where
let url = request.url().clone(); let url = request.url().clone();
let method = request.method().clone(); let method = request.method().clone();
debug!(?method, ?url, "Sending request"); debug!(%method, %url, "Sending request");
match client.execute(request).await { match client.execute(request).await {
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await, | Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
| Err(error) => | Err(error) =>
@@ -144,9 +144,9 @@ async fn into_http_response(
) -> Result<http::Response<Bytes>> { ) -> Result<http::Response<Bytes>> {
let status = response.status(); let status = response.status();
trace!( trace!(
?status, ?method, %status, %method,
request_url = ?url, request_url = %url,
response_url = ?response.url(), response_url = %response.url(),
"Received response from {}", "Received response from {}",
actual.string(), actual.string(),
); );
@@ -196,9 +196,9 @@ fn handle_error(
debug_warn!("{e:?}"); debug_warn!("{e:?}");
} else if e.is_redirect() { } else if e.is_redirect() {
debug_error!( debug_error!(
method = ?method, %method,
url = ?url, %url,
final_url = ?e.url(), final_url = e.url().map(tracing::field::display),
"Redirect loop {}: {}", "Redirect loop {}: {}",
actual.host, actual.host,
e, e,
+3 -3
View File
@@ -124,16 +124,16 @@ impl Data {
.next() .next()
.map(string_from_bytes) .map(string_from_bytes)
.transpose() .transpose()
.map_err(|e| err!(Database(error!(?mxc, "Content-type is invalid: {e}"))))?; .map_err(|e| err!(Database(error!(%mxc, "Content-type is invalid: {e}"))))?;
let content_disposition = parts let content_disposition = parts
.next() .next()
.map(Some) .map(Some)
.ok_or_else(|| err!(Database(error!(?mxc, "Media ID in db is invalid."))))? .ok_or_else(|| err!(Database(error!(%mxc, "Media ID in db is invalid."))))?
.filter(|bytes| !bytes.is_empty()) .filter(|bytes| !bytes.is_empty())
.map(string_from_bytes) .map(string_from_bytes)
.transpose() .transpose()
.map_err(|e| err!(Database(error!(?mxc, "Content-type is invalid: {e}"))))? .map_err(|e| err!(Database(error!(%mxc, "Content-type is invalid: {e}"))))?
.as_deref() .as_deref()
.map(str::parse) .map(str::parse)
.transpose()?; .transpose()?;
+7 -7
View File
@@ -118,14 +118,14 @@ impl Service {
match self.db.search_mxc_metadata_prefix(mxc).await { match self.db.search_mxc_metadata_prefix(mxc).await {
| Ok(keys) => { | Ok(keys) => {
for key in keys { for key in keys {
trace!(?mxc, "MXC Key: {key:?}"); trace!(%mxc, "MXC Key: {key:?}");
debug_info!(?mxc, "Deleting from filesystem"); debug_info!(%mxc, "Deleting from filesystem");
if let Err(e) = self.remove_media_file(&key).await { if let Err(e) = self.remove_media_file(&key).await {
debug_error!(?mxc, "Failed to remove media file: {e}"); debug_error!(%mxc, "Failed to remove media file: {e}");
} }
debug_info!(?mxc, "Deleting from database"); debug_info!(%mxc, "Deleting from database");
self.db.delete_file_mxc(mxc).await; self.db.delete_file_mxc(mxc).await;
} }
@@ -148,7 +148,7 @@ impl Service {
for mxc in mxcs { for mxc in mxcs {
let Ok(mxc) = mxc.as_str().try_into().inspect_err(|e| { let Ok(mxc) = mxc.as_str().try_into().inspect_err(|e| {
debug_error!(?mxc, "Failed to parse MXC URI from database: {e}"); debug_error!(%mxc, "Failed to parse MXC URI from database: {e}");
}) else { }) else {
continue; continue;
}; };
@@ -210,7 +210,7 @@ impl Service {
let Some(mxc_s) = mxc else { let Some(mxc_s) = mxc else {
debug_warn!( debug_warn!(
?mxc, mxc,
"Parsed MXC URL unicode bytes from database but is still invalid" "Parsed MXC URL unicode bytes from database but is still invalid"
); );
continue; continue;
@@ -256,7 +256,7 @@ impl Service {
let Some(mxc_s) = mxc else { let Some(mxc_s) = mxc else {
debug_warn!( debug_warn!(
?mxc, mxc,
"Parsed MXC URL unicode bytes from database but is still invalid" "Parsed MXC URL unicode bytes from database but is still invalid"
); );
continue; continue;
+2 -2
View File
@@ -71,10 +71,10 @@ async fn request_url_preview(&self, url: &Url) -> Result<UrlPreviewData> {
let client = &self.services.client.url_preview; let client = &self.services.client.url_preview;
let response = client.head(url.as_str()).send().await?; let response = client.head(url.as_str()).send().await?;
debug!(?url, "URL preview response headers: {:?}", response.headers()); debug!(%url, "URL preview response headers: {:?}", response.headers());
if let Some(remote_addr) = response.remote_addr() { if let Some(remote_addr) = response.remote_addr() {
debug!(?url, "URL preview response remote address: {:?}", remote_addr); debug!(%url, "URL preview response remote address: {:?}", remote_addr);
if let Ok(ip) = IPAddress::parse(remote_addr.ip().to_string()) { if let Ok(ip) = IPAddress::parse(remote_addr.ip().to_string()) {
if !self.services.client.valid_cidr_range(&ip) { if !self.services.client.valid_cidr_range(&ip) {
+2 -2
View File
@@ -247,7 +247,7 @@ async fn handle_location(
) -> Result<FileMeta> { ) -> Result<FileMeta> {
self.location_request(location).await.map_err(|error| { self.location_request(location).await.map_err(|error| {
err!(Request(NotFound( err!(Request(NotFound(
debug_warn!(%mxc, ?user, ?location, ?error, "Fetching media from location failed") debug_warn!(%mxc, user = user.map(tracing::field::display), ?location, ?error, "Fetching media from location failed")
))) )))
}) })
} }
@@ -320,7 +320,7 @@ fn handle_federation_error(
) -> Error { ) -> Error {
let fallback = || { let fallback = || {
err!(Request(NotFound( err!(Request(NotFound(
debug_error!(%mxc, ?user, ?server, ?error, "Remote media not found") debug_error!(%mxc, user = user.map(tracing::field::display), server = server.map(tracing::field::display), ?error, "Remote media not found")
))) )))
}; };
+1 -1
View File
@@ -120,7 +120,7 @@ async fn get_thumbnail_generate(
let mut cursor = std::io::Cursor::new(&mut thumbnail_bytes); let mut cursor = std::io::Cursor::new(&mut thumbnail_bytes);
thumbnail thumbnail
.write_to(&mut cursor, image::ImageFormat::Png) .write_to(&mut cursor, image::ImageFormat::Png)
.map_err(|error| err!(error!(?error, "Error writing PNG thumbnail.")))?; .map_err(|error| err!(error!(%error, "Error writing PNG thumbnail.")))?;
// Save thumbnail in database so we don't have to generate it again next time // Save thumbnail in database so we don't have to generate it again next time
let thumbnail_key = self.db.create_file_metadata( let thumbnail_key = self.db.create_file_metadata(
+3 -3
View File
@@ -677,7 +677,7 @@ async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
shortstatehash_cache.insert(room_id.to_owned(), shortstatehash); shortstatehash_cache.insert(room_id.to_owned(), shortstatehash);
shortstatehash shortstatehash
} else { } else {
warn!(?room_id, ?user_id, "room has no shortstatehash"); warn!(%room_id, %user_id, "room has no shortstatehash");
return Ok((total, fixed, shortstatehash_cache)); return Ok((total, fixed, shortstatehash_cache));
}; };
@@ -698,8 +698,8 @@ async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
}, },
| Err(_) => { | Err(_) => {
warn!( warn!(
?room_id, %room_id,
?user_id, %user_id,
"room cached as left has no leave event for user, removing \ "room cached as left has no leave event for user, removing \
cache entry" cache entry"
); );
+2 -2
View File
@@ -198,11 +198,11 @@ impl Service {
presence.presence, presence.presence,
PresenceState::Unavailable | PresenceState::Online | PresenceState::Busy PresenceState::Unavailable | PresenceState::Online | PresenceState::Busy
) { ) {
trace!(?user_id, ?presence, "Skipping user"); trace!(%user_id, ?presence, "Skipping user");
continue; continue;
} }
trace!(?user_id, ?presence, "Resetting presence to offline"); trace!(%user_id, ?presence, "Resetting presence to offline");
_ = self _ = self
.set_presence( .set_presence(
+15 -15
View File
@@ -427,20 +427,20 @@ impl Service {
} }
let d = vec![device]; let d = vec![device];
let mut notifi = Notification::new(d); let mut notify = Notification::new(d);
notifi.event_id = Some(event.event_id().to_owned()); notify.event_id = Some(event.event_id().to_owned());
notifi.room_id = Some(event.room_id().unwrap().to_owned()); notify.room_id = Some(event.room_id().unwrap().to_owned());
if http if http
.data .data
.get("org.matrix.msc4076.disable_badge_count") .get("org.matrix.msc4076.disable_badge_count")
.is_none() && http.data.get("disable_badge_count").is_none() .is_none() && http.data.get("disable_badge_count").is_none()
{ {
notifi.counts = NotificationCounts::new(unread, uint!(0)); notify.counts = NotificationCounts::new(unread, uint!(0));
} else { } else {
// counts will not be serialised if it's the default (0, 0) // counts will not be serialised if it's the default (0, 0)
// skip_serializing_if = "NotificationCounts::is_default" // skip_serializing_if = "NotificationCounts::is_default"
notifi.counts = NotificationCounts::default(); notify.counts = NotificationCounts::default();
} }
if !event_id_only { if !event_id_only {
@@ -449,30 +449,30 @@ impl Service {
.iter() .iter()
.any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_))) .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
{ {
notifi.prio = NotificationPriority::High; notify.prio = NotificationPriority::High;
} else { } else {
notifi.prio = NotificationPriority::Low; notify.prio = NotificationPriority::Low;
} }
notifi.sender = Some(event.sender().to_owned()); notify.sender = Some(event.sender().to_owned());
notifi.event_type = Some(event.kind().to_owned()); notify.event_type = Some(event.kind().to_owned());
notifi.content = serde_json::value::to_raw_value(event.content()).ok(); notify.content = serde_json::value::to_raw_value(event.content()).ok();
if *event.kind() == TimelineEventType::RoomMember { if *event.kind() == TimelineEventType::RoomMember {
notifi.user_is_target = notify.user_is_target =
event.state_key() == Some(event.sender().as_str()); event.state_key() == Some(event.sender().as_str());
} }
notifi.sender_display_name = notify.sender_display_name =
self.services.users.displayname(event.sender()).await.ok(); self.services.users.displayname(event.sender()).await.ok();
notifi.room_name = self notify.room_name = self
.services .services
.state_accessor .state_accessor
.get_name(event.room_id().unwrap()) .get_name(event.room_id().unwrap())
.await .await
.ok(); .ok();
notifi.room_alias = self notify.room_alias = self
.services .services
.state_accessor .state_accessor
.get_canonical_alias(event.room_id().unwrap()) .get_canonical_alias(event.room_id().unwrap())
@@ -480,7 +480,7 @@ impl Service {
.ok(); .ok();
} }
self.send_request(&http.url, send_event_notification::v1::Request::new(notifi)) self.send_request(&http.url, send_event_notification::v1::Request::new(notify))
.await?; .await?;
Ok(()) Ok(())
+6 -6
View File
@@ -151,7 +151,7 @@ async fn get_auth_chain_outer(
let auth_chain = self.get_auth_chain_inner(room_id, event_id).await?; let auth_chain = self.get_auth_chain_inner(room_id, event_id).await?;
self.cache_auth_chain_vec(vec![shortid], auth_chain.as_slice()); self.cache_auth_chain_vec(vec![shortid], auth_chain.as_slice());
debug!( debug!(
?event_id, %event_id,
elapsed = ?started.elapsed(), elapsed = ?started.elapsed(),
"Cache missed event" "Cache missed event"
); );
@@ -188,18 +188,18 @@ async fn get_auth_chain_inner(
let mut found = HashSet::new(); let mut found = HashSet::new();
while let Some(event_id) = todo.pop_front() { while let Some(event_id) = todo.pop_front() {
trace!(?event_id, "processing auth event"); trace!(%event_id, "processing auth event");
match self.services.timeline.get_pdu(&event_id).await { match self.services.timeline.get_pdu(&event_id).await {
| Err(e) => { | Err(e) => {
debug_error!(?event_id, ?e, "Could not find pdu mentioned in auth events"); debug_error!(%event_id, ?e, "Could not find pdu mentioned in auth events");
}, },
| Ok(pdu) => { | Ok(pdu) => {
if let Some(claimed_room_id) = pdu.room_id.clone() { if let Some(claimed_room_id) = pdu.room_id.clone() {
if claimed_room_id != *room_id { if claimed_room_id != *room_id {
return Err!(Request(Forbidden(error!( return Err!(Request(Forbidden(error!(
?event_id, %event_id,
?room_id, %room_id,
wrong_room_id = ?pdu.room_id.unwrap(), wrong_room_id = ?pdu.room_id.unwrap(),
"auth event for incorrect room" "auth event for incorrect room"
)))); ))));
@@ -214,7 +214,7 @@ async fn get_auth_chain_inner(
.await; .await;
if found.insert(sauthevent) { if found.insert(sauthevent) {
trace!(?event_id, ?auth_event, "adding auth event to processing queue"); trace!(%event_id, ?auth_event, "adding auth event to processing queue");
todo.push_back(auth_event.clone()); todo.push_back(auth_event.clone());
} }
+3 -3
View File
@@ -104,9 +104,9 @@ fn check_room_id<Pdu: Event>(room_id: &RoomId, pdu: &Pdu) -> Result {
.is_some_and(|claimed_room_id| claimed_room_id != room_id) .is_some_and(|claimed_room_id| claimed_room_id != room_id)
{ {
return Err!(Request(InvalidParam(error!( return Err!(Request(InvalidParam(error!(
pdu_event_id = ?pdu.event_id(), pdu_event_id = %pdu.event_id(),
pdu_room_id = ?pdu.room_id(), pdu_room_id = pdu.room_id().map(tracing::field::display),
?room_id, %room_id,
"Found event from room in room", "Found event from room in room",
)))); ))));
} }
@@ -31,7 +31,7 @@ use serde_json::value::RawValue;
/// contacted for whatever reason, Err(e) is returned, which generally is a /// contacted for whatever reason, Err(e) is returned, which generally is a
/// fail-open operation. /// fail-open operation.
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument(skip(self, pdu, pdu_json, room_id))] #[tracing::instrument(skip(self, pdu, pdu_json, room_id), level = "info")]
pub async fn ask_policy_server( pub async fn ask_policy_server(
&self, &self,
pdu: &PduEvent, pdu: &PduEvent,
@@ -184,7 +184,7 @@ pub async fn ask_policy_server(
/// Asks a remote policy server for a signature on this event. /// Asks a remote policy server for a signature on this event.
/// If the policy server signs this event, the original data is mutated. /// If the policy server signs this event, the original data is mutated.
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via))] #[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via), level = "info")]
pub async fn fetch_policy_server_signature( pub async fn fetch_policy_server_signature(
&self, &self,
pdu: &PduEvent, pdu: &PduEvent,
+2 -1
View File
@@ -335,10 +335,11 @@ where
if let Some(body) = content.body { if let Some(body) = content.body {
self.services.search.index_pdu(shortroomid, &pdu_id, &body); self.services.search.index_pdu(shortroomid, &pdu_id, &body);
if self.services.admin.is_admin_command(pdu, &body).await { if let Some(source) = self.services.admin.is_admin_command(pdu, &body).await {
self.services.admin.command_with_sender( self.services.admin.command_with_sender(
body, body,
Some((pdu.event_id()).into()), Some((pdu.event_id()).into()),
source,
pdu.sender.clone().into(), pdu.sender.clone().into(),
)?; )?;
} }
+2 -2
View File
@@ -28,7 +28,7 @@ pub async fn redact_pdu<Pdu: Event + Send + Sync>(
.await .await
.map(Event::into_pdu) .map(Event::into_pdu)
.map_err(|e| { .map_err(|e| {
err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU."))) err!(Database(error!(?pdu_id, %event_id, ?e, "PDU ID points to invalid PDU.")))
})?; })?;
if let Ok(content) = pdu.get_content::<ExtractBody>() { if let Ok(content) = pdu.get_content::<ExtractBody>() {
@@ -48,7 +48,7 @@ pub async fn redact_pdu<Pdu: Event + Send + Sync>(
pdu.redact(&room_version_id, reason.to_value())?; pdu.redact(&room_version_id, reason.to_value())?;
let obj = utils::to_canonical_object(&pdu).map_err(|e| { let obj = utils::to_canonical_object(&pdu).map_err(|e| {
err!(Database(error!(?event_id, ?e, "Failed to convert PDU to canonical JSON"))) err!(Database(error!(%event_id, ?e, "Failed to convert PDU to canonical JSON")))
})?; })?;
self.replace_pdu(&pdu_id, &obj).await self.replace_pdu(&pdu_id, &obj).await
+3 -3
View File
@@ -535,12 +535,12 @@ impl Service {
} }
let Ok(event) = serde_json::from_str(read_receipt.json().get()) else { let Ok(event) = serde_json::from_str(read_receipt.json().get()) else {
error!(?user_id, ?count, ?read_receipt, "Invalid edu event in read_receipts."); error!(%user_id, %count, ?read_receipt, "Invalid edu event in read_receipts.");
continue; continue;
}; };
let AnySyncEphemeralRoomEvent::Receipt(r) = event else { let AnySyncEphemeralRoomEvent::Receipt(r) = event else {
error!(?user_id, ?count, ?event, "Invalid event type in read_receipts"); error!(%user_id, %count, ?event, "Invalid event type in read_receipts");
continue; continue;
}; };
@@ -755,7 +755,7 @@ impl Service {
let Ok(pusher) = self.services.pusher.get_pusher(&user_id, &pushkey).await else { let Ok(pusher) = self.services.pusher.get_pusher(&user_id, &pushkey).await else {
return Err(( return Err((
Destination::Push(user_id.clone(), pushkey.clone()), Destination::Push(user_id.clone(), pushkey.clone()),
err!(Database(error!(?user_id, ?pushkey, "Missing pusher"))), err!(Database(error!(%user_id, ?pushkey, "Missing pusher"))),
)); ));
}; };
+3 -3
View File
@@ -118,7 +118,7 @@ where
} }
for (server, key_ids) in missing { for (server, key_ids) in missing {
debug_warn!(?server, ?key_ids, "missing"); debug_warn!(%server, ?key_ids, "missing");
} }
} }
@@ -174,8 +174,8 @@ async fn acquire_origin(
timeout: Instant, timeout: Instant,
) -> (OwnedServerName, Vec<OwnedServerSigningKeyId>) { ) -> (OwnedServerName, Vec<OwnedServerSigningKeyId>) {
match timeout_at(timeout, self.server_request(&origin)).await { match timeout_at(timeout, self.server_request(&origin)).await {
| Err(e) => debug_warn!(?origin, "timed out: {e}"), | Err(e) => debug_warn!(%origin, "timed out: {e}"),
| Ok(Err(e)) => debug_error!(?origin, "{e}"), | Ok(Err(e)) => debug_error!(%origin, "{e}"),
| Ok(Ok(server_keys)) => { | Ok(Ok(server_keys)) => {
trace!( trace!(
%origin, %origin,
+3 -3
View File
@@ -63,7 +63,7 @@ where
} }
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self), level = "debug")]
pub async fn get_verify_key( pub async fn get_verify_key(
&self, &self,
origin: &ServerName, origin: &ServerName,
@@ -96,8 +96,8 @@ pub async fn get_verify_key(
} }
Err!(BadServerResponse(debug_error!( Err!(BadServerResponse(debug_error!(
?key_id, %key_id,
?origin, %origin,
"Failed to fetch federation signing-key" "Failed to fetch federation signing-key"
))) )))
} }
+2 -2
View File
@@ -112,7 +112,7 @@ async fn add_signing_keys(&self, new_keys: ServerSigningKeys) {
} }
#[implement(Service)] #[implement(Service)]
#[tracing::instrument(skip(self, object))] #[tracing::instrument(skip(self, object), level = "debug")]
pub async fn required_keys_exist( pub async fn required_keys_exist(
&self, &self,
object: &CanonicalJsonObject, object: &CanonicalJsonObject,
@@ -135,7 +135,7 @@ pub async fn required_keys_exist(
} }
#[implement(Service)] #[implement(Service)]
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self), level = "debug")]
pub async fn verify_key_exists(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> bool { pub async fn verify_key_exists(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> bool {
type KeysMap<'a> = BTreeMap<&'a ServerSigningKeyId, &'a RawJsonValue>; type KeysMap<'a> = BTreeMap<&'a ServerSigningKeyId, &'a RawJsonValue>;
+6 -6
View File
@@ -494,8 +494,8 @@ impl Service {
let key = (user_id, device_id); let key = (user_id, device_id);
if self.db.userdeviceid_metadata.qry(&key).await.is_err() { if self.db.userdeviceid_metadata.qry(&key).await.is_err() {
return Err!(Database(error!( return Err!(Database(error!(
?user_id, %user_id,
?device_id, %device_id,
"User does not exist or device has no metadata." "User does not exist or device has no metadata."
))); )));
} }
@@ -539,8 +539,8 @@ impl Service {
let key = (user_id, device_id); let key = (user_id, device_id);
if self.db.userdeviceid_metadata.qry(&key).await.is_err() { if self.db.userdeviceid_metadata.qry(&key).await.is_err() {
return Err!(Database(error!( return Err!(Database(error!(
?user_id, %user_id,
?device_id, %device_id,
"User does not exist or device has no metadata." "User does not exist or device has no metadata."
))); )));
} }
@@ -1153,7 +1153,7 @@ impl Service {
let (expires_at, user_id): (u64, OwnedUserId) = value.deserialized()?; let (expires_at, user_id): (u64, OwnedUserId) = value.deserialized()?;
if expires_at < utils::millis_since_unix_epoch() { if expires_at < utils::millis_since_unix_epoch() {
trace!(?user_id, ?token, "Removing expired login token"); trace!(%user_id, ?token, "Removing expired login token");
self.db.logintoken_expiresatuserid.remove(token); self.db.logintoken_expiresatuserid.remove(token);
@@ -1231,7 +1231,7 @@ impl Service {
debug!(?uri, "LDAP creating connection..."); debug!(?uri, "LDAP creating connection...");
let (conn, mut ldap) = LdapConnAsync::new(uri.as_str()) let (conn, mut ldap) = LdapConnAsync::new(uri.as_str())
.await .await
.map_err(|e| err!(Ldap(error!(?user_id, "LDAP connection setup error: {e}"))))?; .map_err(|e| err!(Ldap(error!(%user_id, "LDAP connection setup error: {e}"))))?;
let driver = self.services.server.runtime().spawn(async move { let driver = self.services.server.runtime().spawn(async move {
match conn.drive().await { match conn.drive().await {