mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c7f8eec282 | |||
| 0d4bbe612d | |||
| c4d297ae3b | |||
| d15064871e | |||
| b925936195 | |||
| 56feba0ea0 | |||
| 8d89ba94d5 | |||
| 0b135c7717 | |||
| 30c9d6d2df | |||
| 74841b6711 | |||
| dabbdc7517 |
@@ -0,0 +1,2 @@
|
||||
Improved the performance and reliability of fetching missing events, improving network partition recovery. Contributed
|
||||
by @nex.
|
||||
@@ -297,7 +297,7 @@
|
||||
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
#
|
||||
#max_fetch_prev_events = 192
|
||||
#max_fetch_prev_events = 256
|
||||
|
||||
# How many incoming federation transactions the server is willing to be
|
||||
# processing at any given time before it becomes overloaded and starts
|
||||
|
||||
@@ -13,7 +13,7 @@ pub(crate) async fn ban_room(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<rooms::ban::v1::Request>,
|
||||
) -> Result<rooms::ban::v1::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if !services.users.is_admin(sender_user).await {
|
||||
return Err!(Request(Forbidden("Only server administrators can use this endpoint")));
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ pub(crate) async fn list_rooms(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<rooms::list::v1::Request>,
|
||||
) -> Result<rooms::list::v1::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if !services.users.is_admin(sender_user).await {
|
||||
return Err!(Request(Forbidden("Only server administrators can use this endpoint")));
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ use ruma::{
|
||||
use service::{mailer::messages, uiaa::Identity, users::HashedPassword};
|
||||
|
||||
use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH};
|
||||
use crate::Ruma;
|
||||
use crate::{Ruma, router::ClientIdentity};
|
||||
|
||||
pub(crate) mod register;
|
||||
pub(crate) mod threepid;
|
||||
@@ -75,13 +75,11 @@ pub(crate) async fn get_register_available_route(
|
||||
return Err!(Request(UserInUse("User ID is not available.")));
|
||||
}
|
||||
|
||||
if let Some(ref info) = body.appservice_info {
|
||||
if !info.is_user_match(&user_id) {
|
||||
return Err!(Request(Exclusive("Username is not in an appservice namespace.")));
|
||||
}
|
||||
}
|
||||
|
||||
if services.appservice.is_exclusive_user_id(&user_id).await {
|
||||
if let Some(ClientIdentity::Appservice { appservice_info, .. }) = &body.identity
|
||||
&& !appservice_info.is_user_match(&user_id)
|
||||
{
|
||||
return Err!(Request(Exclusive("Username is not in an appservice namespace.")));
|
||||
} else if services.appservice.is_exclusive_user_id(&user_id).await {
|
||||
return Err!(Request(Exclusive("Username is reserved by an appservice.")));
|
||||
}
|
||||
|
||||
@@ -111,7 +109,8 @@ pub(crate) async fn change_password_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<change_password::v3::Request>,
|
||||
) -> Result<change_password::v3::Response> {
|
||||
let identity = if let Some(ref user_id) = body.sender_user {
|
||||
let identity = if let Some(user_id) = body.identity.as_ref().map(ClientIdentity::sender_user)
|
||||
{
|
||||
// A signed-in user is trying to change their password, prompt them for their
|
||||
// existing one
|
||||
|
||||
@@ -157,7 +156,12 @@ pub(crate) async fn change_password_route(
|
||||
services
|
||||
.users
|
||||
.all_device_ids(&sender_user)
|
||||
.ready_filter(|id| *id != body.sender_device())
|
||||
.ready_filter(|id| {
|
||||
body.identity
|
||||
.as_ref()
|
||||
.and_then(|identity| identity.sender_device())
|
||||
.is_none_or(|sender_device| sender_device != *id)
|
||||
})
|
||||
.for_each(async |id| services.users.remove_device(&sender_user, &id).await)
|
||||
.await;
|
||||
|
||||
@@ -173,7 +177,12 @@ pub(crate) async fn change_password_route(
|
||||
.await
|
||||
.ok()
|
||||
.as_ref()
|
||||
.is_some_and(|pusher_device| pusher_device != body.sender_device())
|
||||
.is_some_and(|pusher_device| {
|
||||
body.identity
|
||||
.as_ref()
|
||||
.and_then(|identity| identity.sender_device())
|
||||
.is_none_or(|sender_device| sender_device != *pusher_device)
|
||||
})
|
||||
.then_some(pushkey)
|
||||
})
|
||||
.for_each(async |pushkey| {
|
||||
@@ -241,9 +250,11 @@ pub(crate) async fn whoami_route(
|
||||
State(_): State<crate::State>,
|
||||
body: Ruma<whoami::v3::Request>,
|
||||
) -> Result<whoami::v3::Response> {
|
||||
Ok(assign!(whoami::v3::Response::new(body.sender_user().to_owned(), false), {
|
||||
device_id: body.sender_device,
|
||||
}))
|
||||
Ok(
|
||||
assign!(whoami::v3::Response::new(body.identity.sender_user().to_owned(), false), {
|
||||
device_id: body.identity.sender_device().map(ToOwned::to_owned),
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// # `POST /_matrix/client/r0/account/deactivate`
|
||||
@@ -266,8 +277,9 @@ pub(crate) async fn deactivate_route(
|
||||
// Authentication for this endpoint is technically optional,
|
||||
// but we require the user to be logged in
|
||||
let sender_user = body
|
||||
.sender_user
|
||||
.identity
|
||||
.as_ref()
|
||||
.map(ClientIdentity::sender_user)
|
||||
.ok_or_else(|| err!(Request(MissingToken("Missing access token."))))?;
|
||||
|
||||
// Prompt the user to confirm with their password using UIAA
|
||||
|
||||
@@ -59,7 +59,7 @@ pub(crate) async fn register_route(
|
||||
let allow_registration =
|
||||
services.config.allow_registration || services.firstrun.is_first_run();
|
||||
|
||||
if !allow_registration && body.appservice_info.is_none() {
|
||||
if !allow_registration && body.identity.is_none() {
|
||||
info!(
|
||||
?body.username,
|
||||
?body.initial_device_display_name,
|
||||
@@ -71,7 +71,7 @@ pub(crate) async fn register_route(
|
||||
)));
|
||||
}
|
||||
|
||||
let identity = if body.appservice_info.is_some() {
|
||||
let identity = if body.identity.is_some() {
|
||||
// Appservices can skip auth
|
||||
None
|
||||
} else {
|
||||
@@ -107,7 +107,7 @@ pub(crate) async fn register_route(
|
||||
// For appservice logins, make sure that the user ID is in the appservice's
|
||||
// namespace
|
||||
|
||||
match body.appservice_info {
|
||||
match body.identity {
|
||||
| Some(ref info) =>
|
||||
if !info.is_user_match(&user_id) && !emergency_mode_enabled {
|
||||
return Err!(Request(Exclusive(
|
||||
@@ -125,7 +125,7 @@ pub(crate) async fn register_route(
|
||||
return Err!(Request(Exclusive("Username is reserved by an appservice.")));
|
||||
}
|
||||
|
||||
let password = if body.appservice_info.is_some() {
|
||||
let password = if body.identity.is_some() {
|
||||
None
|
||||
} else if let Some(password) = body.password.as_deref() {
|
||||
Some(HashedPassword::new(password)?)
|
||||
@@ -140,9 +140,7 @@ pub(crate) async fn register_route(
|
||||
let mut displayname = user_id.localpart().to_owned();
|
||||
|
||||
// Apply the new user displayname suffix, if it's set
|
||||
if !services.globals.new_user_displayname_suffix().is_empty()
|
||||
&& body.appservice_info.is_none()
|
||||
{
|
||||
if !services.globals.new_user_displayname_suffix().is_empty() && body.identity.is_none() {
|
||||
write!(displayname, " {}", services.server.config.new_user_displayname_suffix)?;
|
||||
}
|
||||
|
||||
@@ -209,7 +207,7 @@ pub(crate) async fn register_route(
|
||||
|
||||
let device_display_name = body.initial_device_display_name.as_deref().unwrap_or("");
|
||||
|
||||
if body.appservice_info.is_none() {
|
||||
if body.identity.is_none() {
|
||||
if !device_display_name.is_empty() {
|
||||
let notice = format!(
|
||||
"New user \"{user_id}\" registered on this server from IP {client} and device \
|
||||
@@ -255,7 +253,7 @@ pub(crate) async fn register_route(
|
||||
}
|
||||
}
|
||||
|
||||
if body.appservice_info.is_none() && !services.server.config.auto_join_rooms.is_empty() {
|
||||
if body.identity.is_none() && !services.server.config.auto_join_rooms.is_empty() {
|
||||
for room in &services.server.config.auto_join_rooms {
|
||||
let Ok(room_id) = services.rooms.alias.resolve(room).await else {
|
||||
error!(
|
||||
|
||||
@@ -13,7 +13,7 @@ use ruma::{
|
||||
};
|
||||
use service::{mailer::messages, uiaa::Identity};
|
||||
|
||||
use crate::Ruma;
|
||||
use crate::{Ruma, router::ClientIdentity};
|
||||
|
||||
/// # `GET _matrix/client/v3/account/3pid`
|
||||
///
|
||||
@@ -22,7 +22,7 @@ pub(crate) async fn third_party_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_3pids::v3::Request>,
|
||||
) -> Result<get_3pids::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let mut threepids = vec![];
|
||||
|
||||
if let Some(email) = services
|
||||
@@ -53,6 +53,14 @@ pub(crate) async fn request_3pid_management_token_via_email_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<request_3pid_management_token_via_email::v3::Request>,
|
||||
) -> Result<request_3pid_management_token_via_email::v3::Response> {
|
||||
// Authentication for this endpoint is technically optional,
|
||||
// but we require the user to be logged in
|
||||
let sender_user = body
|
||||
.identity
|
||||
.as_ref()
|
||||
.map(ClientIdentity::sender_user)
|
||||
.ok_or_else(|| err!(Request(MissingToken("Missing access token."))))?;
|
||||
|
||||
if !services.threepid.email_requirement().may_change() {
|
||||
return Err!(Request(Forbidden("You may not change your email address.")));
|
||||
}
|
||||
@@ -76,7 +84,7 @@ pub(crate) async fn request_3pid_management_token_via_email_route(
|
||||
Mailbox::new(None, email),
|
||||
|verification_link| messages::ChangeEmail {
|
||||
server_name: services.config.server_name.as_str(),
|
||||
user_id: body.sender_user.as_deref(),
|
||||
user_id: Some(sender_user),
|
||||
verification_link,
|
||||
},
|
||||
&body.client_secret,
|
||||
@@ -107,8 +115,6 @@ pub(crate) async fn add_3pid_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<add_3pid::v3::Request>,
|
||||
) -> Result<add_3pid::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
|
||||
if !services.threepid.email_requirement().may_change() {
|
||||
return Err!(Request(Forbidden("You may not change your email address.")));
|
||||
}
|
||||
@@ -116,7 +122,10 @@ pub(crate) async fn add_3pid_route(
|
||||
// Require password auth to add an email
|
||||
let _ = services
|
||||
.uiaa
|
||||
.authenticate_password(&body.auth, Some(Identity::from_user_id(sender_user)))
|
||||
.authenticate_password(
|
||||
&body.auth,
|
||||
Some(Identity::from_user_id(body.identity.sender_user())),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let email = services
|
||||
@@ -127,7 +136,7 @@ pub(crate) async fn add_3pid_route(
|
||||
|
||||
services
|
||||
.threepid
|
||||
.associate_localpart_email(sender_user.localpart(), &email)
|
||||
.associate_localpart_email(body.identity.sender_user().localpart(), &email)
|
||||
.await?;
|
||||
|
||||
Ok(add_3pid::v3::Response::new())
|
||||
@@ -138,8 +147,6 @@ pub(crate) async fn delete_3pid_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<delete_3pid::v3::Request>,
|
||||
) -> Result<delete_3pid::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
|
||||
if body.medium != Medium::Email {
|
||||
return Ok(delete_3pid::v3::Response::new(ThirdPartyIdRemovalStatus::NoSupport));
|
||||
}
|
||||
@@ -150,7 +157,7 @@ pub(crate) async fn delete_3pid_route(
|
||||
|
||||
if services
|
||||
.threepid
|
||||
.disassociate_localpart_email(sender_user.localpart())
|
||||
.disassociate_localpart_email(body.identity.sender_user().localpart())
|
||||
.await
|
||||
.is_none()
|
||||
{
|
||||
|
||||
@@ -22,9 +22,9 @@ pub(crate) async fn set_global_account_data_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<set_global_account_data::v3::Request>,
|
||||
) -> Result<set_global_account_data::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if sender_user != body.user_id && body.appservice_info.is_none() {
|
||||
if sender_user != body.user_id && !body.identity.is_appservice() {
|
||||
return Err!(Request(Forbidden("You cannot set account data for other users.")));
|
||||
}
|
||||
|
||||
@@ -47,9 +47,9 @@ pub(crate) async fn set_room_account_data_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<set_room_account_data::v3::Request>,
|
||||
) -> Result<set_room_account_data::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if sender_user != body.user_id && body.appservice_info.is_none() {
|
||||
if sender_user != body.user_id && !body.identity.is_appservice() {
|
||||
return Err!(Request(Forbidden("You cannot set account data for other users.")));
|
||||
}
|
||||
|
||||
@@ -72,9 +72,9 @@ pub(crate) async fn get_global_account_data_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_global_account_data::v3::Request>,
|
||||
) -> Result<get_global_account_data::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if sender_user != body.user_id && body.appservice_info.is_none() {
|
||||
if sender_user != body.user_id && !body.identity.is_appservice() {
|
||||
return Err!(Request(Forbidden("You cannot get account data of other users.")));
|
||||
}
|
||||
|
||||
@@ -94,9 +94,9 @@ pub(crate) async fn get_room_account_data_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_room_account_data::v3::Request>,
|
||||
) -> Result<get_room_account_data::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if sender_user != body.user_id && body.appservice_info.is_none() {
|
||||
if sender_user != body.user_id && !body.identity.is_appservice() {
|
||||
return Err!(Request(Forbidden("You cannot get account data of other users.")));
|
||||
}
|
||||
|
||||
|
||||
@@ -12,10 +12,11 @@ pub(crate) async fn get_suspended_status(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_suspended::v1::Request>,
|
||||
) -> Result<get_suspended::v1::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
|
||||
let (admin, active) =
|
||||
join(services.users.is_admin(sender_user), services.users.is_active(&body.user_id)).await;
|
||||
let (admin, active) = join(
|
||||
services.users.is_admin(body.identity.sender_user()),
|
||||
services.users.is_active(&body.user_id),
|
||||
)
|
||||
.await;
|
||||
if !admin {
|
||||
return Err!(Request(Forbidden("Only server administrators can use this endpoint")));
|
||||
}
|
||||
@@ -37,7 +38,7 @@ pub(crate) async fn put_suspended_status(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<set_suspended::v1::Request>,
|
||||
) -> Result<set_suspended::v1::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let (sender_admin, active, target_admin) = join3(
|
||||
services.users.is_admin(sender_user),
|
||||
|
||||
@@ -11,7 +11,7 @@ pub(crate) async fn create_alias_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<create_alias::v3::Request>,
|
||||
) -> Result<create_alias::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
@@ -19,7 +19,7 @@ pub(crate) async fn create_alias_route(
|
||||
services
|
||||
.rooms
|
||||
.alias
|
||||
.appservice_checks(&body.room_alias, &body.appservice_info)
|
||||
.appservice_checks(&body.room_alias, body.identity.appservice_info())
|
||||
.await?;
|
||||
|
||||
// this isn't apart of alias_checks or delete alias route because we should
|
||||
@@ -59,7 +59,7 @@ pub(crate) async fn delete_alias_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<delete_alias::v3::Request>,
|
||||
) -> Result<delete_alias::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
@@ -67,7 +67,7 @@ pub(crate) async fn delete_alias_route(
|
||||
services
|
||||
.rooms
|
||||
.alias
|
||||
.appservice_checks(&body.room_alias, &body.appservice_info)
|
||||
.appservice_checks(&body.room_alias, body.identity.appservice_info())
|
||||
.await?;
|
||||
|
||||
services
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use axum::extract::State;
|
||||
use conduwuit::{Err, Result, err};
|
||||
use conduwuit::{Err, Result};
|
||||
use ruma::{
|
||||
api::{appservice::ping, client::appservice::request_ping},
|
||||
assign,
|
||||
@@ -15,9 +15,7 @@ pub(crate) async fn appservice_ping(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<request_ping::v1::Request>,
|
||||
) -> Result<request_ping::v1::Response> {
|
||||
let appservice_info = body.appservice_info.as_ref().ok_or_else(|| {
|
||||
err!(Request(Forbidden("This endpoint can only be called by appservices.")))
|
||||
})?;
|
||||
let appservice_info = &body.identity;
|
||||
|
||||
if body.appservice_id != appservice_info.registration.id {
|
||||
return Err!(Request(Forbidden(
|
||||
|
||||
+50
-26
@@ -25,7 +25,7 @@ pub(crate) async fn create_backup_version_route(
|
||||
) -> Result<create_backup_version::v3::Response> {
|
||||
let version = services
|
||||
.key_backups
|
||||
.create_backup(body.sender_user(), &body.algorithm)?;
|
||||
.create_backup(body.identity.sender_user(), &body.algorithm)?;
|
||||
|
||||
Ok(create_backup_version::v3::Response::new(version))
|
||||
}
|
||||
@@ -40,7 +40,7 @@ pub(crate) async fn update_backup_version_route(
|
||||
) -> Result<update_backup_version::v3::Response> {
|
||||
services
|
||||
.key_backups
|
||||
.update_backup(body.sender_user(), &body.version, &body.algorithm)
|
||||
.update_backup(body.identity.sender_user(), &body.version, &body.algorithm)
|
||||
.await?;
|
||||
|
||||
Ok(update_backup_version::v3::Response::new())
|
||||
@@ -55,11 +55,11 @@ pub(crate) async fn get_latest_backup_info_route(
|
||||
) -> Result<get_latest_backup_info::v3::Response> {
|
||||
let (version, algorithm) = services
|
||||
.key_backups
|
||||
.get_latest_backup(body.sender_user())
|
||||
.get_latest_backup(body.identity.sender_user())
|
||||
.await
|
||||
.map_err(|_| err!(Request(NotFound("Key backup does not exist."))))?;
|
||||
|
||||
let (count, etag) = get_count_etag(&services, body.sender_user(), &version).await;
|
||||
let (count, etag) = get_count_etag(&services, body.identity.sender_user(), &version).await;
|
||||
|
||||
Ok(get_latest_backup_info::v3::Response::new(algorithm, count, etag, version))
|
||||
}
|
||||
@@ -73,13 +73,14 @@ pub(crate) async fn get_backup_info_route(
|
||||
) -> Result<get_backup_info::v3::Response> {
|
||||
let algorithm = services
|
||||
.key_backups
|
||||
.get_backup(body.sender_user(), &body.version)
|
||||
.get_backup(body.identity.sender_user(), &body.version)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
err!(Request(NotFound("Key backup does not exist at version {:?}", body.version)))
|
||||
})?;
|
||||
|
||||
let (count, etag) = get_count_etag(&services, body.sender_user(), &body.version).await;
|
||||
let (count, etag) =
|
||||
get_count_etag(&services, body.identity.sender_user(), &body.version).await;
|
||||
|
||||
Ok(get_backup_info::v3::Response::new(algorithm, count, etag, body.version.clone()))
|
||||
}
|
||||
@@ -96,7 +97,7 @@ pub(crate) async fn delete_backup_version_route(
|
||||
) -> Result<delete_backup_version::v3::Response> {
|
||||
services
|
||||
.key_backups
|
||||
.delete_backup(body.sender_user(), &body.version)
|
||||
.delete_backup(body.identity.sender_user(), &body.version)
|
||||
.await;
|
||||
|
||||
Ok(delete_backup_version::v3::Response::new())
|
||||
@@ -116,7 +117,7 @@ pub(crate) async fn add_backup_keys_route(
|
||||
) -> Result<add_backup_keys::v3::Response> {
|
||||
if services
|
||||
.key_backups
|
||||
.get_latest_backup_version(body.sender_user())
|
||||
.get_latest_backup_version(body.identity.sender_user())
|
||||
.await
|
||||
.is_ok_and(|version| version != body.version)
|
||||
{
|
||||
@@ -129,12 +130,19 @@ pub(crate) async fn add_backup_keys_route(
|
||||
for (session_id, key_data) in &room.sessions {
|
||||
services
|
||||
.key_backups
|
||||
.add_key(body.sender_user(), &body.version, room_id, session_id, key_data)
|
||||
.add_key(
|
||||
body.identity.sender_user(),
|
||||
&body.version,
|
||||
room_id,
|
||||
session_id,
|
||||
key_data,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
let (count, etag) = get_count_etag(&services, body.sender_user(), &body.version).await;
|
||||
let (count, etag) =
|
||||
get_count_etag(&services, body.identity.sender_user(), &body.version).await;
|
||||
|
||||
Ok(add_backup_keys::v3::Response::new(etag, count))
|
||||
}
|
||||
@@ -153,7 +161,7 @@ pub(crate) async fn add_backup_keys_for_room_route(
|
||||
) -> Result<add_backup_keys_for_room::v3::Response> {
|
||||
if services
|
||||
.key_backups
|
||||
.get_latest_backup_version(body.sender_user())
|
||||
.get_latest_backup_version(body.identity.sender_user())
|
||||
.await
|
||||
.is_ok_and(|version| version != body.version)
|
||||
{
|
||||
@@ -165,11 +173,18 @@ pub(crate) async fn add_backup_keys_for_room_route(
|
||||
for (session_id, key_data) in &body.sessions {
|
||||
services
|
||||
.key_backups
|
||||
.add_key(body.sender_user(), &body.version, &body.room_id, session_id, key_data)
|
||||
.add_key(
|
||||
body.identity.sender_user(),
|
||||
&body.version,
|
||||
&body.room_id,
|
||||
session_id,
|
||||
key_data,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let (count, etag) = get_count_etag(&services, body.sender_user(), &body.version).await;
|
||||
let (count, etag) =
|
||||
get_count_etag(&services, body.identity.sender_user(), &body.version).await;
|
||||
|
||||
Ok(add_backup_keys_for_room::v3::Response::new(etag, count))
|
||||
}
|
||||
@@ -188,7 +203,7 @@ pub(crate) async fn add_backup_keys_for_session_route(
|
||||
) -> Result<add_backup_keys_for_session::v3::Response> {
|
||||
if services
|
||||
.key_backups
|
||||
.get_latest_backup_version(body.sender_user())
|
||||
.get_latest_backup_version(body.identity.sender_user())
|
||||
.await
|
||||
.is_ok_and(|version| version != body.version)
|
||||
{
|
||||
@@ -201,7 +216,7 @@ pub(crate) async fn add_backup_keys_for_session_route(
|
||||
let mut ok_to_replace = true;
|
||||
if let Some(old_key) = &services
|
||||
.key_backups
|
||||
.get_session(body.sender_user(), &body.version, &body.room_id, &body.session_id)
|
||||
.get_session(body.identity.sender_user(), &body.version, &body.room_id, &body.session_id)
|
||||
.await
|
||||
.ok()
|
||||
{
|
||||
@@ -260,7 +275,7 @@ pub(crate) async fn add_backup_keys_for_session_route(
|
||||
services
|
||||
.key_backups
|
||||
.add_key(
|
||||
body.sender_user(),
|
||||
body.identity.sender_user(),
|
||||
&body.version,
|
||||
&body.room_id,
|
||||
&body.session_id,
|
||||
@@ -269,7 +284,8 @@ pub(crate) async fn add_backup_keys_for_session_route(
|
||||
.await?;
|
||||
}
|
||||
|
||||
let (count, etag) = get_count_etag(&services, body.sender_user(), &body.version).await;
|
||||
let (count, etag) =
|
||||
get_count_etag(&services, body.identity.sender_user(), &body.version).await;
|
||||
|
||||
Ok(add_backup_keys_for_session::v3::Response::new(etag, count))
|
||||
}
|
||||
@@ -283,7 +299,7 @@ pub(crate) async fn get_backup_keys_route(
|
||||
) -> Result<get_backup_keys::v3::Response> {
|
||||
let rooms = services
|
||||
.key_backups
|
||||
.get_all(body.sender_user(), &body.version)
|
||||
.get_all(body.identity.sender_user(), &body.version)
|
||||
.await;
|
||||
|
||||
Ok(get_backup_keys::v3::Response::new(rooms))
|
||||
@@ -298,7 +314,7 @@ pub(crate) async fn get_backup_keys_for_room_route(
|
||||
) -> Result<get_backup_keys_for_room::v3::Response> {
|
||||
let sessions = services
|
||||
.key_backups
|
||||
.get_room(body.sender_user(), &body.version, &body.room_id)
|
||||
.get_room(body.identity.sender_user(), &body.version, &body.room_id)
|
||||
.await;
|
||||
|
||||
Ok(get_backup_keys_for_room::v3::Response::new(sessions))
|
||||
@@ -313,7 +329,7 @@ pub(crate) async fn get_backup_keys_for_session_route(
|
||||
) -> Result<get_backup_keys_for_session::v3::Response> {
|
||||
let key_data = services
|
||||
.key_backups
|
||||
.get_session(body.sender_user(), &body.version, &body.room_id, &body.session_id)
|
||||
.get_session(body.identity.sender_user(), &body.version, &body.room_id, &body.session_id)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
err!(Request(NotFound(debug_error!("Backup key not found for this user's session."))))
|
||||
@@ -331,10 +347,11 @@ pub(crate) async fn delete_backup_keys_route(
|
||||
) -> Result<delete_backup_keys::v3::Response> {
|
||||
services
|
||||
.key_backups
|
||||
.delete_all_keys(body.sender_user(), &body.version)
|
||||
.delete_all_keys(body.identity.sender_user(), &body.version)
|
||||
.await;
|
||||
|
||||
let (count, etag) = get_count_etag(&services, body.sender_user(), &body.version).await;
|
||||
let (count, etag) =
|
||||
get_count_etag(&services, body.identity.sender_user(), &body.version).await;
|
||||
|
||||
Ok(delete_backup_keys::v3::Response::new(etag, count))
|
||||
}
|
||||
@@ -348,10 +365,11 @@ pub(crate) async fn delete_backup_keys_for_room_route(
|
||||
) -> Result<delete_backup_keys_for_room::v3::Response> {
|
||||
services
|
||||
.key_backups
|
||||
.delete_room_keys(body.sender_user(), &body.version, &body.room_id)
|
||||
.delete_room_keys(body.identity.sender_user(), &body.version, &body.room_id)
|
||||
.await;
|
||||
|
||||
let (count, etag) = get_count_etag(&services, body.sender_user(), &body.version).await;
|
||||
let (count, etag) =
|
||||
get_count_etag(&services, body.identity.sender_user(), &body.version).await;
|
||||
|
||||
Ok(delete_backup_keys_for_room::v3::Response::new(etag, count))
|
||||
}
|
||||
@@ -365,10 +383,16 @@ pub(crate) async fn delete_backup_keys_for_session_route(
|
||||
) -> Result<delete_backup_keys_for_session::v3::Response> {
|
||||
services
|
||||
.key_backups
|
||||
.delete_room_key(body.sender_user(), &body.version, &body.room_id, &body.session_id)
|
||||
.delete_room_key(
|
||||
body.identity.sender_user(),
|
||||
&body.version,
|
||||
&body.room_id,
|
||||
&body.session_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
let (count, etag) = get_count_etag(&services, body.sender_user(), &body.version).await;
|
||||
let (count, etag) =
|
||||
get_count_etag(&services, body.identity.sender_user(), &body.version).await;
|
||||
|
||||
Ok(delete_backup_keys_for_session::v3::Response::new(etag, count))
|
||||
}
|
||||
|
||||
@@ -48,11 +48,7 @@ pub(crate) async fn get_capabilities_route(
|
||||
json!({"enabled": services.config.forget_forced_upon_leave}),
|
||||
)?;
|
||||
|
||||
if services
|
||||
.users
|
||||
.is_admin(body.sender_user.as_ref().unwrap())
|
||||
.await
|
||||
{
|
||||
if services.users.is_admin(body.identity.sender_user()).await {
|
||||
// Advertise suspension API
|
||||
capabilities.set("uk.timedout.msc4323", json!({"suspend": true, "lock": false}))?;
|
||||
}
|
||||
|
||||
@@ -37,8 +37,8 @@ pub(crate) async fn get_context_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_context::v3::Request>,
|
||||
) -> Result<get_context::v3::Response> {
|
||||
let sender = body.sender();
|
||||
let (sender_user, sender_device) = sender;
|
||||
let sender_user = body.identity.sender_user();
|
||||
let sender_device = body.identity.sender_device();
|
||||
let room_id = &body.room_id;
|
||||
let event_id = &body.event_id;
|
||||
let filter = &body.filter;
|
||||
@@ -143,7 +143,7 @@ pub(crate) async fn get_context_route(
|
||||
|
||||
let lazy_loading_context = lazy_loading::Context {
|
||||
user_id: sender_user,
|
||||
device_id: Some(sender_device),
|
||||
device_id: sender_device,
|
||||
room_id,
|
||||
token: Some(base_count.into_unsigned()),
|
||||
options: Some(&filter.lazy_load_options),
|
||||
|
||||
@@ -25,16 +25,11 @@ pub(crate) async fn put_dehydrated_device_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<put_dehydrated_device::Request>,
|
||||
) -> Result<put_dehydrated_device::Response> {
|
||||
let sender_user = body
|
||||
.sender_user
|
||||
.as_deref()
|
||||
.expect("AccessToken authentication required");
|
||||
|
||||
let device_id = body.body.device_id.clone();
|
||||
let device_id = body.device_id.clone();
|
||||
|
||||
services
|
||||
.users
|
||||
.set_dehydrated_device(sender_user, body.body)
|
||||
.set_dehydrated_device(body.identity.sender_user(), body.body)
|
||||
.await?;
|
||||
|
||||
Ok(put_dehydrated_device::Response::new(device_id))
|
||||
@@ -49,7 +44,7 @@ pub(crate) async fn delete_dehydrated_device_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<delete_dehydrated_device::Request>,
|
||||
) -> Result<delete_dehydrated_device::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let device_id = services.users.get_dehydrated_device_id(sender_user).await?;
|
||||
|
||||
@@ -67,7 +62,7 @@ pub(crate) async fn get_dehydrated_device_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<get_dehydrated_device::Request>,
|
||||
) -> Result<get_dehydrated_device::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let device = services.users.get_dehydrated_device(sender_user).await?;
|
||||
|
||||
@@ -83,7 +78,7 @@ pub(crate) async fn get_dehydrated_events_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<get_events::Request>,
|
||||
) -> Result<get_events::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let device_id = &body.body.device_id;
|
||||
let existing_id = services.users.get_dehydrated_device_id(sender_user).await;
|
||||
|
||||
@@ -21,7 +21,7 @@ pub(crate) async fn get_devices_route(
|
||||
) -> Result<get_devices::v3::Response> {
|
||||
let devices: Vec<device::Device> = services
|
||||
.users
|
||||
.all_devices_metadata(body.sender_user())
|
||||
.all_devices_metadata(body.identity.sender_user())
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
@@ -37,7 +37,7 @@ pub(crate) async fn get_device_route(
|
||||
) -> Result<get_device::v3::Response> {
|
||||
let device = services
|
||||
.users
|
||||
.get_device_metadata(body.sender_user(), &body.body.device_id)
|
||||
.get_device_metadata(body.identity.sender_user(), &body.body.device_id)
|
||||
.await
|
||||
.map_err(|_| err!(Request(NotFound("Device not found."))))?;
|
||||
|
||||
@@ -53,8 +53,8 @@ pub(crate) async fn update_device_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<update_device::v3::Request>,
|
||||
) -> Result<update_device::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let appservice = body.appservice_info.as_ref();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let appservice = body.identity.appservice_info();
|
||||
|
||||
match services
|
||||
.users
|
||||
@@ -118,8 +118,8 @@ pub(crate) async fn delete_device_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<delete_device::v3::Request>,
|
||||
) -> Result<delete_device::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let appservice = body.appservice_info.as_ref();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let appservice = body.identity.appservice_info();
|
||||
|
||||
// Appservices get to skip UIAA for this endpoint
|
||||
if appservice.is_none() {
|
||||
@@ -154,8 +154,8 @@ pub(crate) async fn delete_devices_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<delete_devices::v3::Request>,
|
||||
) -> Result<delete_devices::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let appservice = body.appservice_info.as_ref();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let appservice = body.identity.appservice_info();
|
||||
|
||||
// Appservices get to skip UIAA for this endpoint
|
||||
if appservice.is_none() {
|
||||
|
||||
@@ -112,7 +112,7 @@ pub(crate) async fn set_room_visibility_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<set_room_visibility::v3::Request>,
|
||||
) -> Result<set_room_visibility::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if !services.rooms.metadata.exists(&body.room_id).await {
|
||||
// Return 404 if the room doesn't exist
|
||||
@@ -130,7 +130,7 @@ pub(crate) async fn set_room_visibility_route(
|
||||
| room::Visibility::Public => {
|
||||
if services.server.config.lockdown_public_room_directory
|
||||
&& !services.users.is_admin(sender_user).await
|
||||
&& body.appservice_info.is_none()
|
||||
&& !body.identity.is_appservice()
|
||||
{
|
||||
info!(
|
||||
"Non-admin user {sender_user} tried to publish {0} to the room directory \
|
||||
|
||||
@@ -15,7 +15,7 @@ pub(crate) async fn get_filter_route(
|
||||
) -> Result<get_filter::v3::Response> {
|
||||
services
|
||||
.users
|
||||
.get_filter(body.sender_user(), &body.filter_id)
|
||||
.get_filter(body.identity.sender_user(), &body.filter_id)
|
||||
.await
|
||||
.map(get_filter::v3::Response::new)
|
||||
.map_err(|_| err!(Request(NotFound("Filter not found."))))
|
||||
@@ -30,7 +30,7 @@ pub(crate) async fn create_filter_route(
|
||||
) -> Result<create_filter::v3::Response> {
|
||||
let filter_id = services
|
||||
.users
|
||||
.create_filter(body.sender_user(), &body.filter);
|
||||
.create_filter(body.identity.sender_user(), &body.filter);
|
||||
|
||||
Ok(create_filter::v3::Response::new(filter_id))
|
||||
}
|
||||
|
||||
@@ -41,7 +41,8 @@ pub(crate) async fn upload_keys_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<upload_keys::v3::Request>,
|
||||
) -> Result<upload_keys::v3::Response> {
|
||||
let (sender_user, sender_device) = body.sender();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let sender_device = body.identity.expect_sender_device()?;
|
||||
|
||||
for (key_id, one_time_key) in &body.one_time_keys {
|
||||
if one_time_key
|
||||
@@ -154,7 +155,7 @@ pub(crate) async fn get_keys_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_keys::v3::Request>,
|
||||
) -> Result<get_keys::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
get_keys_helper(
|
||||
&services,
|
||||
@@ -191,7 +192,7 @@ pub(crate) async fn upload_signing_keys_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<upload_signing_keys::v3::Request>,
|
||||
) -> Result<upload_signing_keys::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if uiaa_needed_to_upload_keys(
|
||||
services,
|
||||
@@ -287,7 +288,7 @@ pub(crate) async fn upload_signatures_route(
|
||||
return Ok(upload_signatures::v3::Response::new());
|
||||
}
|
||||
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
for (user_id, keys) in &body.signed_keys {
|
||||
for (key_id, key) in keys {
|
||||
@@ -340,7 +341,7 @@ pub(crate) async fn get_key_changes_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_key_changes::v3::Request>,
|
||||
) -> Result<get_key_changes::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let mut device_list_updates = HashSet::new();
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ pub(crate) async fn create_content_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<create_content::v3::Request>,
|
||||
) -> Result<create_content::v3::Response> {
|
||||
let user = body.sender_user();
|
||||
let user = body.identity.sender_user();
|
||||
if services.users.is_suspended(user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
@@ -92,7 +92,7 @@ pub(crate) async fn get_content_thumbnail_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<get_content_thumbnail::v1::Request>,
|
||||
) -> Result<get_content_thumbnail::v1::Response> {
|
||||
let user = body.sender_user();
|
||||
let user = body.identity.sender_user();
|
||||
|
||||
let dim = Dim::from_ruma(body.width, body.height, body.method.clone())?;
|
||||
let mxc = Mxc {
|
||||
@@ -142,7 +142,7 @@ pub(crate) async fn get_content_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<get_content::v1::Request>,
|
||||
) -> Result<get_content::v1::Response> {
|
||||
let user = body.sender_user();
|
||||
let user = body.identity.sender_user();
|
||||
|
||||
let mxc = Mxc {
|
||||
server_name: &body.server_name,
|
||||
@@ -189,7 +189,7 @@ pub(crate) async fn get_content_as_filename_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<get_content_as_filename::v1::Request>,
|
||||
) -> Result<get_content_as_filename::v1::Response> {
|
||||
let user = body.sender_user();
|
||||
let user = body.identity.sender_user();
|
||||
|
||||
let mxc = Mxc {
|
||||
server_name: &body.server_name,
|
||||
@@ -240,7 +240,7 @@ pub(crate) async fn get_media_preview_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<get_media_preview::v1::Request>,
|
||||
) -> Result<get_media_preview::v1::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let url = &body.url;
|
||||
let url = Url::parse(&body.url).map_err(|e| {
|
||||
|
||||
@@ -56,7 +56,7 @@ pub(crate) async fn get_media_preview_legacy_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<get_media_preview::v3::Request>,
|
||||
) -> Result<get_media_preview::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let url = &body.url;
|
||||
let url = Url::parse(&body.url).map_err(|e| {
|
||||
|
||||
@@ -15,7 +15,7 @@ pub(crate) async fn ban_user_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<ban_user::v3::Request>,
|
||||
) -> Result<ban_user::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if sender_user == body.user_id {
|
||||
return Err!(Request(Forbidden("You cannot ban yourself.")));
|
||||
|
||||
@@ -18,7 +18,7 @@ pub(crate) async fn forget_room_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<forget_room::v3::Request>,
|
||||
) -> Result<forget_room::v3::Response> {
|
||||
let user_id = body.sender_user();
|
||||
let user_id = body.identity.sender_user();
|
||||
let room_id = &body.room_id;
|
||||
|
||||
let joined = services.rooms.state_cache.is_joined(user_id, room_id);
|
||||
|
||||
@@ -29,7 +29,7 @@ pub(crate) async fn invite_user_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<invite_user::v3::Request>,
|
||||
) -> Result<invite_user::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ pub(crate) async fn join_room_by_id_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<join_room_by_id::v3::Request>,
|
||||
) -> Result<join_room_by_id::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
@@ -97,7 +97,7 @@ pub(crate) async fn join_room_by_id_or_alias_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<join_room_by_id_or_alias::v3::Request>,
|
||||
) -> Result<join_room_by_id_or_alias::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let body = &body.body;
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
|
||||
@@ -15,7 +15,7 @@ pub(crate) async fn kick_user_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<kick_user::v3::Request>,
|
||||
) -> Result<kick_user::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ pub(crate) async fn knock_room_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<knock_room::v3::Request>,
|
||||
) -> Result<knock_room::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let body = &body.body;
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
|
||||
@@ -32,7 +32,7 @@ pub(crate) async fn leave_room_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<leave_room::v3::Request>,
|
||||
) -> Result<leave_room::v3::Response> {
|
||||
leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone())
|
||||
leave_room(&services, body.identity.sender_user(), &body.room_id, body.reason.clone())
|
||||
.boxed()
|
||||
.await
|
||||
.map(|()| leave_room::v3::Response::new())
|
||||
|
||||
@@ -30,7 +30,7 @@ pub(crate) async fn get_member_events_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_member_events::v3::Request>,
|
||||
) -> Result<get_member_events::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let membership = body.membership.as_ref();
|
||||
let not_membership = body.not_membership.as_ref();
|
||||
|
||||
@@ -72,7 +72,7 @@ pub(crate) async fn joined_members_route(
|
||||
if !services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_see_state_events(body.sender_user(), &body.room_id)
|
||||
.user_can_see_state_events(body.identity.sender_user(), &body.room_id)
|
||||
.await
|
||||
{
|
||||
return Err!(Request(Forbidden("You don't have permission to view this room.")));
|
||||
|
||||
@@ -40,7 +40,7 @@ pub(crate) async fn joined_rooms_route(
|
||||
let joined_rooms = services
|
||||
.rooms
|
||||
.state_cache
|
||||
.rooms_joined(body.sender_user())
|
||||
.rooms_joined(body.identity.sender_user())
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ pub(crate) async fn unban_user_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<unban_user::v3::Request>,
|
||||
) -> Result<unban_user::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use conduwuit_service::{
|
||||
};
|
||||
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture, pin_mut};
|
||||
use ruma::{
|
||||
DeviceId, RoomId, UserId,
|
||||
RoomId, UserId,
|
||||
api::{
|
||||
Direction,
|
||||
client::{filter::RoomEventFilter, message::get_message_events},
|
||||
@@ -37,7 +37,6 @@ use ruma::{
|
||||
serde::Raw,
|
||||
};
|
||||
use ruminuwuity::invite_permission_config::FilterLevel;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::Ruma;
|
||||
|
||||
@@ -76,8 +75,8 @@ pub(crate) async fn get_message_events_route(
|
||||
ClientIp(client_ip): ClientIp,
|
||||
body: Ruma<get_message_events::v3::Request>,
|
||||
) -> Result<get_message_events::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_device = body.sender_device.as_deref();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let sender_device = body.identity.sender_device();
|
||||
let room_id = &body.room_id;
|
||||
let filter = &body.filter;
|
||||
|
||||
@@ -158,17 +157,7 @@ pub(crate) async fn get_message_events_route(
|
||||
|
||||
let lazy_loading_context = lazy_loading::Context {
|
||||
user_id: sender_user,
|
||||
device_id: sender_device.or_else(|| {
|
||||
if let Some(registration) = body.appservice_info.as_ref() {
|
||||
Some(<&DeviceId>::from(registration.registration.id.as_str()))
|
||||
} else {
|
||||
warn!(
|
||||
"No device_id provided and no appservice registration found, this should be \
|
||||
unreachable"
|
||||
);
|
||||
None
|
||||
}
|
||||
}),
|
||||
device_id: sender_device,
|
||||
room_id,
|
||||
token: Some(from.into_unsigned()),
|
||||
options: Some(&filter.lazy_load_options),
|
||||
|
||||
@@ -15,7 +15,7 @@ pub(crate) async fn get_mutual_rooms_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<mutual_rooms::unstable::Request>,
|
||||
) -> Result<mutual_rooms::unstable::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if sender_user == body.user_id {
|
||||
return Err!(Request(Unknown("You cannot request rooms in common with yourself.")));
|
||||
|
||||
@@ -16,7 +16,7 @@ pub(crate) async fn create_openid_token_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<account::request_openid_token::v3::Request>,
|
||||
) -> Result<account::request_openid_token::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if sender_user != body.user_id {
|
||||
return Err!(Request(InvalidParam(
|
||||
|
||||
@@ -20,13 +20,19 @@ pub(crate) async fn set_presence_route(
|
||||
return Err!(Request(Forbidden("Presence is disabled on this server")));
|
||||
}
|
||||
|
||||
if body.sender_user() != body.user_id && body.appservice_info.is_none() {
|
||||
if body.identity.sender_user() != body.user_id && !body.identity.is_appservice() {
|
||||
return Err!(Request(InvalidParam("Not allowed to set presence of other users")));
|
||||
}
|
||||
|
||||
services
|
||||
.presence
|
||||
.set_presence(body.sender_user(), &body.presence, None, None, body.status_msg.clone())
|
||||
.set_presence(
|
||||
body.identity.sender_user(),
|
||||
&body.presence,
|
||||
None,
|
||||
None,
|
||||
body.status_msg.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(set_presence::v3::Response::new())
|
||||
@@ -49,7 +55,7 @@ pub(crate) async fn get_presence_route(
|
||||
let has_shared_rooms = services
|
||||
.rooms
|
||||
.state_cache
|
||||
.user_sees_user(body.sender_user(), &body.user_id)
|
||||
.user_sees_user(body.identity.sender_user(), &body.user_id)
|
||||
.await;
|
||||
|
||||
if has_shared_rooms {
|
||||
|
||||
@@ -51,9 +51,12 @@ pub(crate) async fn set_profile_field_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<set_profile_field::v3::Request>,
|
||||
) -> Result<set_profile_field::v3::Response> {
|
||||
if body.user_id != body.sender_user()
|
||||
&& !(body.appservice_info.is_some()
|
||||
|| services.admin.user_is_admin(body.sender_user()).await)
|
||||
if body.user_id != body.identity.sender_user()
|
||||
&& !(body.identity.is_appservice()
|
||||
|| services
|
||||
.admin
|
||||
.user_is_admin(body.identity.sender_user())
|
||||
.await)
|
||||
{
|
||||
return Err!(Request(Forbidden("You may not change other users' profile data.")));
|
||||
}
|
||||
@@ -72,9 +75,12 @@ pub(crate) async fn delete_profile_field_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<delete_profile_field::v3::Request>,
|
||||
) -> Result<delete_profile_field::v3::Response> {
|
||||
if body.user_id != body.sender_user()
|
||||
&& !(body.appservice_info.is_some()
|
||||
|| services.admin.user_is_admin(body.sender_user()).await)
|
||||
if body.user_id != body.identity.sender_user()
|
||||
&& !(body.identity.is_appservice()
|
||||
|| services
|
||||
.admin
|
||||
.user_is_admin(body.identity.sender_user())
|
||||
.await)
|
||||
{
|
||||
return Err!(Request(Forbidden("You may not change other users' profile data.")));
|
||||
}
|
||||
|
||||
+13
-12
@@ -30,7 +30,7 @@ pub(crate) async fn get_pushrules_all_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_pushrules_all::v3::Request>,
|
||||
) -> Result<get_pushrules_all::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let Some(content_value) = services
|
||||
.account_data
|
||||
@@ -101,7 +101,7 @@ pub(crate) async fn get_pushrules_global_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_pushrules_global_scope::v3::Request>,
|
||||
) -> Result<get_pushrules_global_scope::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let Some(content_value) = services
|
||||
.account_data
|
||||
@@ -189,7 +189,7 @@ pub(crate) async fn get_pushrule_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_pushrule::v3::Request>,
|
||||
) -> Result<get_pushrule::v3::Response> {
|
||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
// remove old deprecated mentions push rules as per MSC4210
|
||||
#[allow(deprecated)]
|
||||
@@ -226,7 +226,7 @@ pub(crate) async fn set_pushrule_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<set_pushrule::v3::Request>,
|
||||
) -> Result<set_pushrule::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let body = &body.body;
|
||||
let mut account_data: PushRulesEvent = services
|
||||
.account_data
|
||||
@@ -282,7 +282,7 @@ pub(crate) async fn get_pushrule_actions_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_pushrule_actions::v3::Request>,
|
||||
) -> Result<get_pushrule_actions::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
// remove old deprecated mentions push rules as per MSC4210
|
||||
#[allow(deprecated)]
|
||||
@@ -316,7 +316,7 @@ pub(crate) async fn set_pushrule_actions_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<set_pushrule_actions::v3::Request>,
|
||||
) -> Result<set_pushrule_actions::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let mut account_data: PushRulesEvent = services
|
||||
.account_data
|
||||
@@ -349,7 +349,7 @@ pub(crate) async fn get_pushrule_enabled_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_pushrule_enabled::v3::Request>,
|
||||
) -> Result<get_pushrule_enabled::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
// remove old deprecated mentions push rules as per MSC4210
|
||||
#[allow(deprecated)]
|
||||
@@ -383,7 +383,7 @@ pub(crate) async fn set_pushrule_enabled_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<set_pushrule_enabled::v3::Request>,
|
||||
) -> Result<set_pushrule_enabled::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let mut account_data: PushRulesEvent = services
|
||||
.account_data
|
||||
@@ -416,7 +416,7 @@ pub(crate) async fn delete_pushrule_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<delete_pushrule::v3::Request>,
|
||||
) -> Result<delete_pushrule::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let mut account_data: PushRulesEvent = services
|
||||
.account_data
|
||||
@@ -458,7 +458,7 @@ pub(crate) async fn get_pushers_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_pushers::v3::Request>,
|
||||
) -> Result<get_pushers::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
Ok(get_pushers::v3::Response::new(services.pusher.get_pushers(sender_user).await))
|
||||
}
|
||||
@@ -472,11 +472,12 @@ pub(crate) async fn set_pushers_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<set_pusher::v3::Request>,
|
||||
) -> Result<set_pusher::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let sender_device = body.identity.expect_sender_device()?;
|
||||
|
||||
services
|
||||
.pusher
|
||||
.set_pusher(sender_user, body.sender_device(), &body.action)
|
||||
.set_pusher(sender_user, sender_device, &body.action)
|
||||
.await?;
|
||||
|
||||
Ok(set_pusher::v3::Response::new())
|
||||
|
||||
@@ -26,7 +26,7 @@ pub(crate) async fn set_read_marker_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<set_read_marker::v3::Request>,
|
||||
) -> Result<set_read_marker::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if let Some(event) = &body.fully_read {
|
||||
let fully_read_event = FullyReadEvent::new(FullyReadEventContent::new(event.to_owned()));
|
||||
@@ -118,10 +118,10 @@ pub(crate) async fn create_receipt_route(
|
||||
ClientIp(client_ip): ClientIp,
|
||||
body: Ruma<create_receipt::v3::Request>,
|
||||
) -> Result<create_receipt::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
services
|
||||
.users
|
||||
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
|
||||
.update_device_last_seen(sender_user, body.identity.sender_device(), client_ip)
|
||||
.await;
|
||||
|
||||
if matches!(
|
||||
|
||||
@@ -17,10 +17,10 @@ pub(crate) async fn redact_event_route(
|
||||
ClientIp(client_ip): ClientIp,
|
||||
body: Ruma<redact_event::v3::Request>,
|
||||
) -> Result<redact_event::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
services
|
||||
.users
|
||||
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
|
||||
.update_device_last_seen(sender_user, body.identity.sender_device(), client_ip)
|
||||
.await;
|
||||
let body = &body.body;
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
|
||||
@@ -28,7 +28,7 @@ pub(crate) async fn get_relating_events_with_rel_type_and_event_type_route(
|
||||
) -> Result<get_relating_events_with_rel_type_and_event_type::v1::Response> {
|
||||
paginate_relations_with_filter(
|
||||
&services,
|
||||
body.sender_user(),
|
||||
body.identity.sender_user(),
|
||||
&body.room_id,
|
||||
&body.event_id,
|
||||
body.event_type.clone().into(),
|
||||
@@ -56,7 +56,7 @@ pub(crate) async fn get_relating_events_with_rel_type_route(
|
||||
) -> Result<get_relating_events_with_rel_type::v1::Response> {
|
||||
paginate_relations_with_filter(
|
||||
&services,
|
||||
body.sender_user(),
|
||||
body.identity.sender_user(),
|
||||
&body.room_id,
|
||||
&body.event_id,
|
||||
None,
|
||||
@@ -84,7 +84,7 @@ pub(crate) async fn get_relating_events_route(
|
||||
) -> Result<get_relating_events::v1::Response> {
|
||||
paginate_relations_with_filter(
|
||||
&services,
|
||||
body.sender_user(),
|
||||
body.identity.sender_user(),
|
||||
&body.room_id,
|
||||
&body.event_id,
|
||||
None,
|
||||
|
||||
@@ -36,7 +36,7 @@ pub(crate) async fn report_room_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<report_room::v3::Request>,
|
||||
) -> Result<report_room::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
@@ -92,7 +92,7 @@ pub(crate) async fn report_event_route(
|
||||
body: Ruma<report_content::v3::Request>,
|
||||
) -> Result<report_content::v3::Response> {
|
||||
// user authentication
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
@@ -135,8 +135,8 @@ pub(crate) async fn report_user_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<report_user::v3::Request>,
|
||||
) -> Result<report_user::v3::Response> {
|
||||
// user authentication
|
||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ pub(crate) async fn get_room_aliases_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<aliases::v3::Request>,
|
||||
) -> Result<aliases::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if !services
|
||||
.rooms
|
||||
|
||||
@@ -61,10 +61,10 @@ pub(crate) async fn create_room_route(
|
||||
) -> Result<create_room::v3::Response> {
|
||||
use create_room::v3::RoomPreset;
|
||||
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if !services.globals.allow_room_creation()
|
||||
&& body.appservice_info.is_none()
|
||||
&& !body.identity.is_appservice()
|
||||
&& !services.users.is_admin(sender_user).await
|
||||
{
|
||||
return Err!(Request(Forbidden("Room creation has been disabled.",)));
|
||||
@@ -130,7 +130,7 @@ pub(crate) async fn create_room_route(
|
||||
if body.visibility == room::Visibility::Public
|
||||
&& services.server.config.lockdown_public_room_directory
|
||||
&& !services.users.is_admin(sender_user).await
|
||||
&& body.appservice_info.is_none()
|
||||
&& !body.identity.is_appservice()
|
||||
{
|
||||
warn!(
|
||||
"Non-admin user {sender_user} tried to publish {room_id:?} to the room directory \
|
||||
@@ -186,7 +186,7 @@ pub(crate) async fn create_room_route(
|
||||
|
||||
let alias: Option<OwnedRoomAliasId> = match body.room_alias_name.as_ref() {
|
||||
| Some(alias) =>
|
||||
Some(room_alias_check(&services, alias, body.appservice_info.as_ref()).await?),
|
||||
Some(room_alias_check(&services, alias, body.identity.appservice_info()).await?),
|
||||
| _ => None,
|
||||
};
|
||||
|
||||
|
||||
@@ -24,25 +24,25 @@ pub(crate) async fn get_room_event_route(
|
||||
let visible = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_see_event(body.sender_user(), room_id, event_id)
|
||||
.user_can_see_event(body.identity.sender_user(), room_id, event_id)
|
||||
.map(Ok);
|
||||
|
||||
let (mut event, visible) = try_join(event, visible).await?;
|
||||
|
||||
if !visible || is_ignored_pdu(services, &event, body.sender_user()).await? {
|
||||
if !visible || is_ignored_pdu(services, &event, body.identity.sender_user()).await? {
|
||||
return Err!(Request(Forbidden("You don't have permission to view this event.")));
|
||||
}
|
||||
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut event)
|
||||
.add_bundled_aggregations_to_pdu(body.identity.sender_user(), &mut event)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations to event: {e}");
|
||||
}
|
||||
|
||||
event.set_unsigned(body.sender_user.as_deref());
|
||||
event.set_unsigned(Some(body.identity.sender_user()));
|
||||
|
||||
Ok(get_room_event::v3::Response::new(event.into_format()))
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ pub(crate) async fn room_initial_sync_route(
|
||||
if !services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_see_state_events(body.sender_user(), room_id)
|
||||
.user_can_see_state_events(body.identity.sender_user(), room_id)
|
||||
.await
|
||||
{
|
||||
return Err!(Request(Forbidden("No room preview available.")));
|
||||
@@ -31,7 +31,7 @@ pub(crate) async fn room_initial_sync_route(
|
||||
let membership = services
|
||||
.rooms
|
||||
.state_cache
|
||||
.user_membership(body.sender_user(), room_id)
|
||||
.user_membership(body.identity.sender_user(), room_id)
|
||||
.map(Ok);
|
||||
|
||||
let visibility = services.rooms.directory.visibility(room_id).map(Ok);
|
||||
@@ -52,16 +52,14 @@ pub(crate) async fn room_initial_sync_route(
|
||||
.pdus_rev(room_id, None)
|
||||
.try_take(limit)
|
||||
.and_then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(body.sender_user.as_deref());
|
||||
if let Some(sender_user) = body.sender_user.as_deref() {
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||
}
|
||||
pdu.1.set_unsigned(Some(body.identity.sender_user()));
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(body.identity.sender_user(), &mut pdu.1)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||
}
|
||||
Ok(pdu)
|
||||
})
|
||||
|
||||
@@ -4,7 +4,7 @@ use conduwuit::{Err, Result};
|
||||
use ruma::api::client::room::get_summary;
|
||||
use service::rooms::summary::Accessibility;
|
||||
|
||||
use crate::Ruma;
|
||||
use crate::{Ruma, router::ClientIdentity};
|
||||
|
||||
/// # `GET /_matrix/client/v1/room_summary/{roomIdOrAlias}`
|
||||
///
|
||||
@@ -28,7 +28,11 @@ pub(crate) async fn get_room_summary(
|
||||
let summary = services
|
||||
.rooms
|
||||
.summary
|
||||
.get_room_summary_for_user(body.sender_user.as_deref(), &room_id, &servers)
|
||||
.get_room_summary_for_user(
|
||||
body.identity.as_ref().map(ClientIdentity::sender_user),
|
||||
&room_id,
|
||||
&servers,
|
||||
)
|
||||
.await?;
|
||||
|
||||
match summary {
|
||||
|
||||
@@ -277,7 +277,7 @@ pub(crate) async fn upgrade_room_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<upgrade_room::v3::Request>,
|
||||
) -> Result<upgrade_room::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let (supported, forbid_unstable, is_unstable) = (
|
||||
services.server.supported_room_version(&body.new_version),
|
||||
|
||||
@@ -43,7 +43,7 @@ pub(crate) async fn search_events_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<Request>,
|
||||
) -> Result<Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let next_batch = body.next_batch.as_deref();
|
||||
|
||||
let mut result_categories = ResultCategories::new();
|
||||
|
||||
@@ -22,16 +22,16 @@ pub(crate) async fn send_message_event_route(
|
||||
ClientIp(client_ip): ClientIp,
|
||||
body: Ruma<send_message_event::v3::Request>,
|
||||
) -> Result<send_message_event::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_device = body.sender_device.as_deref();
|
||||
let appservice_info = body.appservice_info.as_ref();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let sender_device = body.identity.sender_device();
|
||||
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||
}
|
||||
|
||||
services
|
||||
.users
|
||||
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
|
||||
.update_device_last_seen(sender_user, sender_device, client_ip)
|
||||
.await;
|
||||
|
||||
// Forbid m.room.encrypted if encryption is disabled
|
||||
@@ -83,7 +83,11 @@ pub(crate) async fn send_message_event_route(
|
||||
event_type: body.event_type.clone().into(),
|
||||
content,
|
||||
unsigned: Some(unsigned),
|
||||
timestamp: appservice_info.and(body.timestamp),
|
||||
timestamp: if body.identity.is_appservice() {
|
||||
body.timestamp
|
||||
} else {
|
||||
None
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
sender_user,
|
||||
|
||||
@@ -146,7 +146,7 @@ pub(crate) async fn login_route(
|
||||
}) => {
|
||||
debug!("Got appservice login type");
|
||||
|
||||
let Some(ref info) = body.appservice_info else {
|
||||
let Some(ref info) = body.identity else {
|
||||
return Err!(Request(MissingToken("Missing appservice token.")));
|
||||
};
|
||||
|
||||
@@ -254,7 +254,7 @@ pub(crate) async fn login_token_route(
|
||||
return Err!(Request(Forbidden("Login via an existing session is not enabled")));
|
||||
}
|
||||
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
// Prompt the user to confirm with their password using UIAA
|
||||
let _ = services
|
||||
@@ -286,7 +286,9 @@ pub(crate) async fn logout_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<logout::v3::Request>,
|
||||
) -> Result<logout::v3::Response> {
|
||||
let (sender_user, sender_device) = body.sender();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let sender_device = body.identity.expect_sender_device()?;
|
||||
|
||||
services
|
||||
.users
|
||||
.remove_device(sender_user, sender_device)
|
||||
@@ -332,7 +334,7 @@ pub(crate) async fn logout_all_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<logout_all::v3::Request>,
|
||||
) -> Result<logout_all::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
services
|
||||
.users
|
||||
.all_device_ids(sender_user)
|
||||
|
||||
@@ -27,7 +27,7 @@ pub(crate) async fn get_hierarchy_route(
|
||||
.rooms
|
||||
.summary
|
||||
.get_room_hierarchy_for_user(
|
||||
body.sender_user(),
|
||||
body.identity.sender_user(),
|
||||
body.room_id.clone(),
|
||||
max_depth,
|
||||
body.suggested_only,
|
||||
|
||||
@@ -38,10 +38,10 @@ pub(crate) async fn send_state_event_for_key_route(
|
||||
ClientIp(ip): ClientIp,
|
||||
body: Ruma<send_state_event::v3::Request>,
|
||||
) -> Result<send_state_event::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
services
|
||||
.users
|
||||
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
|
||||
.update_device_last_seen(sender_user, body.identity.sender_device(), ip)
|
||||
.await;
|
||||
|
||||
if services.users.is_suspended(sender_user).await? {
|
||||
@@ -55,7 +55,7 @@ pub(crate) async fn send_state_event_for_key_route(
|
||||
&body.event_type,
|
||||
&body.body.body,
|
||||
&body.state_key,
|
||||
if body.appservice_info.is_some() {
|
||||
if body.identity.is_appservice() {
|
||||
body.timestamp
|
||||
} else {
|
||||
None
|
||||
@@ -91,7 +91,7 @@ pub(crate) async fn get_state_events_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_state_events::v3::Request>,
|
||||
) -> Result<get_state_events::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if !services
|
||||
.rooms
|
||||
@@ -125,7 +125,7 @@ pub(crate) async fn get_state_event_for_key_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_state_event_for_key::v3::Request>,
|
||||
) -> Result<get_state_event_for_key::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
if !services
|
||||
.rooms
|
||||
|
||||
@@ -181,7 +181,8 @@ pub(crate) async fn sync_events_route(
|
||||
ClientIp(client_ip): ClientIp,
|
||||
body: Ruma<sync_events::v3::Request>,
|
||||
) -> Result<sync_events::v3::Response> {
|
||||
let (sender_user, sender_device) = body.sender();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let sender_device = body.identity.expect_sender_device()?;
|
||||
|
||||
// Presence update
|
||||
if services.config.allow_local_presence {
|
||||
@@ -225,7 +226,8 @@ pub(crate) async fn build_sync_events(
|
||||
services: &Services,
|
||||
body: &Ruma<sync_events::v3::Request>,
|
||||
) -> Result<sync_events::v3::Response> {
|
||||
let (syncing_user, syncing_device) = body.sender();
|
||||
let syncing_user = body.identity.sender_user();
|
||||
let syncing_device = body.identity.sender_device().expect("should have a device");
|
||||
|
||||
let current_count = services.globals.current_count()?;
|
||||
|
||||
|
||||
@@ -70,8 +70,8 @@ pub(crate) async fn sync_events_v5_route(
|
||||
body: Ruma<sync_events::v5::Request>,
|
||||
) -> Result<sync_events::v5::Response> {
|
||||
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
|
||||
let ref sender_user = body.sender_user().to_owned();
|
||||
let ref sender_device = body.sender_device().to_owned();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let sender_device = body.identity.expect_sender_device()?;
|
||||
|
||||
services
|
||||
.users
|
||||
@@ -93,7 +93,7 @@ pub(crate) async fn sync_events_v5_route(
|
||||
.and_then(|string| string.parse().ok())
|
||||
.unwrap_or(0);
|
||||
|
||||
let snake_key = into_snake_key(sender_user.as_ref(), sender_device.as_str(), conn_id);
|
||||
let snake_key = into_snake_key(sender_user, sender_device.as_str(), conn_id);
|
||||
|
||||
if globalsince != 0 && !services.sync.snake_connection_cached(&snake_key) {
|
||||
return Err!(Request(UnknownPos(
|
||||
|
||||
@@ -21,7 +21,7 @@ pub(crate) async fn update_tag_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<create_tag::v3::Request>,
|
||||
) -> Result<create_tag::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let mut tags_event = services
|
||||
.account_data
|
||||
@@ -56,7 +56,7 @@ pub(crate) async fn delete_tag_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<delete_tag::v3::Request>,
|
||||
) -> Result<delete_tag::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let mut tags_event = services
|
||||
.account_data
|
||||
@@ -88,7 +88,7 @@ pub(crate) async fn get_tags_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<get_tags::v3::Request>,
|
||||
) -> Result<get_tags::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
|
||||
let tags_event = services
|
||||
.account_data
|
||||
|
||||
@@ -34,14 +34,14 @@ pub(crate) async fn get_threads_route(
|
||||
let threads: Vec<(PduCount, PduEvent)> = services
|
||||
.rooms
|
||||
.threads
|
||||
.threads_until(body.sender_user(), &body.room_id, from, &body.include)
|
||||
.threads_until(body.identity.sender_user(), &body.room_id, from, &body.include)
|
||||
.await?
|
||||
.take(limit)
|
||||
.filter_map(|(count, pdu)| async move {
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_see_event(body.sender_user(), &body.room_id, &pdu.event_id)
|
||||
.user_can_see_event(body.identity.sender_user(), &body.room_id, &pdu.event_id)
|
||||
.await
|
||||
.then_some((count, pdu))
|
||||
})
|
||||
@@ -49,7 +49,7 @@ pub(crate) async fn get_threads_route(
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu)
|
||||
.add_bundled_aggregations_to_pdu(body.identity.sender_user(), &mut pdu)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations to thread: {e}");
|
||||
|
||||
@@ -22,8 +22,8 @@ pub(crate) async fn send_event_to_device_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<send_event_to_device::v3::Request>,
|
||||
) -> Result<send_event_to_device::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_device = body.sender_device.as_deref();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let sender_device = body.identity.sender_device();
|
||||
|
||||
// Check if this is a new transaction id
|
||||
if services
|
||||
|
||||
@@ -14,13 +14,13 @@ pub(crate) async fn create_typing_event_route(
|
||||
body: Ruma<create_typing_event::v3::Request>,
|
||||
) -> Result<create_typing_event::v3::Response> {
|
||||
use create_typing_event::v3::Typing;
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
services
|
||||
.users
|
||||
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
|
||||
.update_device_last_seen(sender_user, body.identity.sender_device(), ip)
|
||||
.await;
|
||||
|
||||
if sender_user != body.user_id && body.appservice_info.is_none() {
|
||||
if sender_user != body.user_id && !body.identity.is_appservice() {
|
||||
return Err!(Request(Forbidden("You cannot update typing status of other users.")));
|
||||
}
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ pub(crate) async fn search_users_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<search_users::v3::Request>,
|
||||
) -> Result<search_users::v3::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
let sender_user = body.identity.sender_user();
|
||||
let limit = usize::try_from(body.limit)
|
||||
.map_or(LIMIT_DEFAULT, usize::from)
|
||||
.min(LIMIT_MAX);
|
||||
|
||||
+3
-13
@@ -2,15 +2,13 @@ use std::time::{Duration, SystemTime};
|
||||
|
||||
use axum::extract::State;
|
||||
use base64::{Engine as _, engine::general_purpose};
|
||||
use conduwuit::{Err, Result, utils};
|
||||
use conduwuit::{Err, Result};
|
||||
use hmac::{Hmac, KeyInit, Mac};
|
||||
use ruma::{SecondsSinceUnixEpoch, UserId, api::client::voip::get_turn_server_info};
|
||||
use ruma::{SecondsSinceUnixEpoch, api::client::voip::get_turn_server_info};
|
||||
use sha1::Sha1;
|
||||
|
||||
use crate::Ruma;
|
||||
|
||||
const RANDOM_USER_ID_LENGTH: usize = 10;
|
||||
|
||||
type HmacSha1 = Hmac<Sha1>;
|
||||
|
||||
/// # `GET /_matrix/client/r0/voip/turnServer`
|
||||
@@ -35,15 +33,7 @@ pub(crate) async fn turn_server_route(
|
||||
)
|
||||
.expect("time is valid");
|
||||
|
||||
let user = body.sender_user.unwrap_or_else(|| {
|
||||
UserId::parse_with_server_name(
|
||||
utils::random_string(RANDOM_USER_ID_LENGTH).to_lowercase(),
|
||||
&services.server.name,
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let username: String = format!("{}:{}", expiry.get(), user);
|
||||
let username: String = format!("{}:{}", expiry.get(), body.identity.sender_user());
|
||||
|
||||
let mut mac = HmacSha1::new_from_slice(turn_secret.as_bytes())
|
||||
.expect("HMAC can take key of any size");
|
||||
|
||||
+1
-1
@@ -15,7 +15,7 @@ pub(super) use conduwuit_service::state::State;
|
||||
use http::{Uri, uri};
|
||||
|
||||
use self::handler::RouterExt;
|
||||
pub(super) use self::{args::Args as Ruma, response::RumaResponse};
|
||||
pub(super) use self::{args::Args as Ruma, auth::ClientIdentity, response::RumaResponse};
|
||||
use crate::{admin, client, server};
|
||||
|
||||
pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
||||
|
||||
+13
-68
@@ -6,17 +6,14 @@ use axum::{
|
||||
extract::{FromRequest, Path, Query},
|
||||
};
|
||||
use conduwuit::{Error, Result, err};
|
||||
use ruma::{
|
||||
CanonicalJsonObject, DeviceId, OwnedDeviceId, OwnedServerName, OwnedUserId, ServerName,
|
||||
UserId, api::IncomingRequest,
|
||||
};
|
||||
use ruma::{CanonicalJsonObject, api::IncomingRequest};
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::{State, router::auth::CheckAuth, service::appservice::RegistrationInfo};
|
||||
use crate::{State, router::auth::CheckAuth};
|
||||
|
||||
/// Query parameters needed to authenticate requests
|
||||
#[derive(Deserialize)]
|
||||
pub(super) struct AuthQueryParams {
|
||||
pub(crate) struct AuthQueryParams {
|
||||
pub(super) user_id: Option<String>,
|
||||
/// Device ID for appservice device masquerading (MSC3202/MSC4190).
|
||||
/// Can be provided as `device_id` or `org.matrix.msc3202.device_id`.
|
||||
@@ -25,67 +22,22 @@ pub(super) struct AuthQueryParams {
|
||||
}
|
||||
|
||||
/// Extractor for Ruma request structs
|
||||
pub(crate) struct Args<T> {
|
||||
pub(crate) struct Args<R: IncomingRequest<Authentication: CheckAuth> + Send + Sync + 'static> {
|
||||
/// Request struct body
|
||||
pub(crate) body: T,
|
||||
pub(crate) body: R,
|
||||
|
||||
/// Federation server authentication: X-Matrix origin
|
||||
/// None when not a federation server.
|
||||
pub(crate) origin: Option<OwnedServerName>,
|
||||
|
||||
/// Local user authentication: user_id.
|
||||
/// None when not an authenticated local user.
|
||||
pub(crate) sender_user: Option<OwnedUserId>,
|
||||
|
||||
/// Local user authentication: device_id.
|
||||
/// None when not an authenticated local user or no device.
|
||||
pub(crate) sender_device: Option<OwnedDeviceId>,
|
||||
|
||||
/// Appservice authentication; registration info.
|
||||
/// None when not an appservice.
|
||||
pub(crate) appservice_info: Option<RegistrationInfo>,
|
||||
|
||||
/// Parsed JSON content.
|
||||
/// None when body is not a valid string
|
||||
/// Parsed JSON body. None when body is not JSON.
|
||||
pub(crate) json_body: Option<CanonicalJsonObject>,
|
||||
|
||||
/// Identity of the requesting entity
|
||||
pub(crate) identity: <R::Authentication as CheckAuth>::Identity,
|
||||
}
|
||||
|
||||
impl<T> Args<T>
|
||||
impl<R> Deref for Args<R>
|
||||
where
|
||||
T: IncomingRequest + Send + Sync + 'static,
|
||||
R: IncomingRequest<Authentication: CheckAuth> + Send + Sync + 'static,
|
||||
{
|
||||
#[inline]
|
||||
pub(crate) fn sender(&self) -> (&UserId, &DeviceId) {
|
||||
(self.sender_user(), self.sender_device())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn sender_user(&self) -> &UserId {
|
||||
self.sender_user
|
||||
.as_deref()
|
||||
.expect("user must be authenticated for this handler")
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn sender_device(&self) -> &DeviceId {
|
||||
self.sender_device
|
||||
.as_deref()
|
||||
.expect("user must be authenticated and device identified")
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn origin(&self) -> &ServerName {
|
||||
self.origin
|
||||
.as_deref()
|
||||
.expect("server must be authenticated for this handler")
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for Args<T>
|
||||
where
|
||||
T: IncomingRequest + Send + Sync + 'static,
|
||||
{
|
||||
type Target = T;
|
||||
type Target = R;
|
||||
|
||||
fn deref(&self) -> &Self::Target { &self.body }
|
||||
}
|
||||
@@ -145,13 +97,6 @@ where
|
||||
let body = R::try_from_http_request(request, &path)
|
||||
.map_err(|e| err!(Request(BadJson(debug_warn!("{e}")))))?;
|
||||
|
||||
Ok(Self {
|
||||
body,
|
||||
origin: auth.origin,
|
||||
sender_user: auth.sender_user,
|
||||
sender_device: auth.sender_device,
|
||||
appservice_info: auth.appservice_info,
|
||||
json_body,
|
||||
})
|
||||
Ok(Self { body, json_body, identity: auth })
|
||||
}
|
||||
}
|
||||
|
||||
+136
-98
@@ -2,7 +2,7 @@ use std::any::{Any, TypeId};
|
||||
|
||||
use conduwuit::{Err, Result, err};
|
||||
use ruma::{
|
||||
OwnedDeviceId, OwnedServerName, OwnedUserId, UserId,
|
||||
DeviceId, OwnedDeviceId, OwnedServerName, OwnedUserId, UserId,
|
||||
api::{
|
||||
IncomingRequest,
|
||||
auth_scheme::{
|
||||
@@ -20,20 +20,57 @@ use service::{
|
||||
|
||||
use crate::{router::args::AuthQueryParams, service::appservice::RegistrationInfo};
|
||||
|
||||
#[derive(Default)]
|
||||
pub(super) struct Auth {
|
||||
pub(super) origin: Option<OwnedServerName>,
|
||||
pub(super) sender_user: Option<OwnedUserId>,
|
||||
pub(super) sender_device: Option<OwnedDeviceId>,
|
||||
pub(super) appservice_info: Option<RegistrationInfo>,
|
||||
pub(crate) enum ClientIdentity {
|
||||
User {
|
||||
sender_user: OwnedUserId,
|
||||
sender_device: OwnedDeviceId,
|
||||
},
|
||||
Appservice {
|
||||
sender_user: OwnedUserId,
|
||||
sender_device: Option<OwnedDeviceId>,
|
||||
appservice_info: Box<RegistrationInfo>,
|
||||
},
|
||||
}
|
||||
|
||||
pub(super) trait CheckAuth: AuthScheme {
|
||||
impl ClientIdentity {
|
||||
pub(crate) fn sender_user(&self) -> &UserId {
|
||||
match self {
|
||||
| Self::User { sender_user, .. } | Self::Appservice { sender_user, .. } =>
|
||||
sender_user,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn sender_device(&self) -> Option<&DeviceId> {
|
||||
match self {
|
||||
| Self::User { sender_device, .. } => Some(sender_device),
|
||||
| Self::Appservice { sender_device, .. } => sender_device.as_deref(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn expect_sender_device(&self) -> Result<&DeviceId> {
|
||||
self.sender_device().ok_or_else(|| {
|
||||
err!(Request(Forbidden("Appservices must masquerade to use this endpoint.")))
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn appservice_info(&self) -> Option<&RegistrationInfo> {
|
||||
match self {
|
||||
| Self::User { .. } => None,
|
||||
| Self::Appservice { appservice_info, .. } => Some(appservice_info),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_appservice(&self) -> bool { matches!(self, Self::Appservice { .. }) }
|
||||
}
|
||||
|
||||
pub(crate) trait CheckAuth: AuthScheme {
|
||||
type Identity: Send;
|
||||
|
||||
fn authenticate<R: IncomingRequest + Any, B: AsRef<[u8]> + Sync>(
|
||||
services: &Services,
|
||||
incoming_request: &hyper::Request<B>,
|
||||
query: AuthQueryParams,
|
||||
) -> impl Future<Output = Result<Auth>> + Send {
|
||||
) -> impl Future<Output = Result<Self::Identity>> + Send {
|
||||
async move {
|
||||
let route = TypeId::of::<R>();
|
||||
|
||||
@@ -54,17 +91,19 @@ pub(super) trait CheckAuth: AuthScheme {
|
||||
request: &hyper::Request<B>,
|
||||
query: AuthQueryParams,
|
||||
route: TypeId,
|
||||
) -> impl Future<Output = Result<Auth>> + Send;
|
||||
) -> impl Future<Output = Result<Self::Identity>> + Send;
|
||||
}
|
||||
|
||||
impl CheckAuth for ServerSignatures {
|
||||
type Identity = OwnedServerName;
|
||||
|
||||
async fn verify<B: AsRef<[u8]> + Sync>(
|
||||
services: &Services,
|
||||
output: Self::Output,
|
||||
request: &hyper::Request<B>,
|
||||
_query: AuthQueryParams,
|
||||
_route: TypeId,
|
||||
) -> Result<Auth> {
|
||||
) -> Result<Self::Identity> {
|
||||
let destination = services.globals.server_name();
|
||||
if output
|
||||
.destination
|
||||
@@ -96,10 +135,7 @@ impl CheckAuth for ServerSignatures {
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(Auth {
|
||||
origin: Some(output.origin.clone()),
|
||||
..Default::default()
|
||||
})
|
||||
Ok(output.origin)
|
||||
},
|
||||
| Err(err) =>
|
||||
Err!(Request(Unauthorized(warn!("Failed to verify X-Matrix header: {err}")))),
|
||||
@@ -108,162 +144,164 @@ impl CheckAuth for ServerSignatures {
|
||||
}
|
||||
|
||||
impl CheckAuth for AccessToken {
|
||||
type Identity = ClientIdentity;
|
||||
|
||||
async fn verify<B: AsRef<[u8]> + Sync>(
|
||||
services: &Services,
|
||||
output: Self::Output,
|
||||
_request: &hyper::Request<B>,
|
||||
query: AuthQueryParams,
|
||||
route: TypeId,
|
||||
) -> Result<Auth> {
|
||||
// Check for appservice tokens first
|
||||
|
||||
let (sender_user, sender_device, appservice_info) = {
|
||||
if let Ok((sender_user, sender_device)) =
|
||||
services.users.find_from_token(&output).await
|
||||
) -> Result<Self::Identity> {
|
||||
if let Ok((sender_user, sender_device)) = services.users.find_from_token(&output).await {
|
||||
// Locked users can only use /logout and /logout/all
|
||||
if services
|
||||
.users
|
||||
.is_locked(&sender_user)
|
||||
.await
|
||||
.is_ok_and(std::convert::identity)
|
||||
{
|
||||
// Locked users can only use /logout and /logout/all
|
||||
if services
|
||||
.users
|
||||
.is_locked(&sender_user)
|
||||
.await
|
||||
.is_ok_and(std::convert::identity)
|
||||
if !(route == TypeId::of::<client::session::logout::v3::Request>()
|
||||
|| route == TypeId::of::<client::session::logout_all::v3::Request>())
|
||||
{
|
||||
if !(route == TypeId::of::<client::session::logout::v3::Request>()
|
||||
|| route == TypeId::of::<client::session::logout_all::v3::Request>())
|
||||
{
|
||||
return Err!(Request(Unauthorized("Your account is locked.")));
|
||||
}
|
||||
return Err!(Request(Unauthorized("Your account is locked.")));
|
||||
}
|
||||
}
|
||||
|
||||
(Some(sender_user), Some(sender_device), None)
|
||||
} else if let Ok(appservice_info) = services.appservice.find_from_token(&output).await
|
||||
{
|
||||
let Ok(sender_user) = query.user_id.clone().map_or_else(
|
||||
|| {
|
||||
UserId::parse_with_server_name(
|
||||
appservice_info.registration.sender_localpart.as_str(),
|
||||
services.globals.server_name(),
|
||||
)
|
||||
},
|
||||
UserId::parse,
|
||||
) else {
|
||||
return Err!(Request(InvalidUsername("Username is invalid.")));
|
||||
Ok(ClientIdentity::User { sender_user, sender_device })
|
||||
} else if let Ok(appservice_info) = services.appservice.find_from_token(&output).await {
|
||||
let Ok(sender_user) = query.user_id.clone().map_or_else(
|
||||
|| {
|
||||
UserId::parse_with_server_name(
|
||||
appservice_info.registration.sender_localpart.as_str(),
|
||||
services.globals.server_name(),
|
||||
)
|
||||
},
|
||||
UserId::parse,
|
||||
) else {
|
||||
return Err!(Request(InvalidUsername("Username is invalid.")));
|
||||
};
|
||||
|
||||
if !appservice_info.is_user_match(&sender_user) {
|
||||
return Err!(Request(Exclusive("User is not in namespace.")));
|
||||
}
|
||||
|
||||
// MSC3202/MSC4190: Handle device_id masquerading for appservices.
|
||||
// The device_id can be provided via `device_id` or
|
||||
// `org.matrix.msc3202.device_id` query parameter.
|
||||
let sender_device =
|
||||
if let Some(device_id) = query.device_id.as_deref().map(Into::into) {
|
||||
// Verify the device exists for this user
|
||||
if services
|
||||
.users
|
||||
.get_device_metadata(&sender_user, device_id)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return Err!(Request(Forbidden(
|
||||
"Device does not exist for user or appservice cannot masquerade as \
|
||||
this device."
|
||||
)));
|
||||
}
|
||||
|
||||
Some(device_id.to_owned())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if !appservice_info.is_user_match(&sender_user) {
|
||||
return Err!(Request(Exclusive("User is not in namespace.")));
|
||||
}
|
||||
|
||||
// MSC3202/MSC4190: Handle device_id masquerading for appservices.
|
||||
// The device_id can be provided via `device_id` or
|
||||
// `org.matrix.msc3202.device_id` query parameter.
|
||||
let sender_device =
|
||||
if let Some(device_id) = query.device_id.as_deref().map(Into::into) {
|
||||
// Verify the device exists for this user
|
||||
if services
|
||||
.users
|
||||
.get_device_metadata(&sender_user, device_id)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return Err!(Request(Forbidden(
|
||||
"Device does not exist for user or appservice cannot masquerade \
|
||||
as this device."
|
||||
)));
|
||||
}
|
||||
|
||||
Some(device_id.to_owned())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
(Some(sender_user), sender_device, Some(appservice_info))
|
||||
} else {
|
||||
return Err!(Request(Unauthorized("Invalid access token.")));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Auth {
|
||||
sender_user,
|
||||
sender_device,
|
||||
appservice_info,
|
||||
..Default::default()
|
||||
})
|
||||
Ok(ClientIdentity::Appservice {
|
||||
sender_user,
|
||||
sender_device,
|
||||
appservice_info: Box::new(appservice_info),
|
||||
})
|
||||
} else {
|
||||
Err!(Request(Unauthorized("Invalid access token.")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CheckAuth for AccessTokenOptional {
|
||||
type Identity = Option<ClientIdentity>;
|
||||
|
||||
async fn verify<B: AsRef<[u8]> + Sync>(
|
||||
services: &Services,
|
||||
output: Self::Output,
|
||||
request: &hyper::Request<B>,
|
||||
query: AuthQueryParams,
|
||||
route: TypeId,
|
||||
) -> Result<Auth> {
|
||||
) -> Result<Self::Identity> {
|
||||
match output {
|
||||
| Some(token) =>
|
||||
<AccessToken as CheckAuth>::verify(services, token, request, query, route).await,
|
||||
| None => Ok(Auth::default()),
|
||||
<AccessToken as CheckAuth>::verify(services, token, request, query, route)
|
||||
.await
|
||||
.map(Some),
|
||||
| None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CheckAuth for AppserviceToken {
|
||||
type Identity = RegistrationInfo;
|
||||
|
||||
async fn verify<B: AsRef<[u8]> + Sync>(
|
||||
services: &Services,
|
||||
output: Self::Output,
|
||||
_request: &hyper::Request<B>,
|
||||
_query: AuthQueryParams,
|
||||
_route: TypeId,
|
||||
) -> Result<Auth> {
|
||||
) -> Result<Self::Identity> {
|
||||
let Ok(appservice_info) = services.appservice.find_from_token(&output).await else {
|
||||
return Err!(Request(Unauthorized("Invalid appservice token.")));
|
||||
};
|
||||
|
||||
Ok(Auth {
|
||||
appservice_info: Some(appservice_info),
|
||||
..Default::default()
|
||||
})
|
||||
Ok(appservice_info)
|
||||
}
|
||||
}
|
||||
|
||||
impl CheckAuth for AppserviceTokenOptional {
|
||||
type Identity = Option<RegistrationInfo>;
|
||||
|
||||
async fn verify<B: AsRef<[u8]> + Sync>(
|
||||
services: &Services,
|
||||
output: Self::Output,
|
||||
request: &hyper::Request<B>,
|
||||
query: AuthQueryParams,
|
||||
route: TypeId,
|
||||
) -> Result<Auth> {
|
||||
) -> Result<Self::Identity> {
|
||||
match output {
|
||||
| Some(token) =>
|
||||
<AppserviceToken as CheckAuth>::verify(services, token, request, query, route)
|
||||
.await,
|
||||
| None => Ok(Auth::default()),
|
||||
.await
|
||||
.map(Some),
|
||||
| None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CheckAuth for NoAuthentication {
|
||||
type Identity = ();
|
||||
|
||||
async fn verify<B: AsRef<[u8]> + Sync>(
|
||||
_services: &Services,
|
||||
_output: Self::Output,
|
||||
_request: &hyper::Request<B>,
|
||||
_query: AuthQueryParams,
|
||||
_route: TypeId,
|
||||
) -> Result<Auth> {
|
||||
Ok(Auth::default())
|
||||
) -> Result<Self::Identity> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl CheckAuth for NoAccessToken {
|
||||
type Identity = Option<ClientIdentity>;
|
||||
|
||||
async fn verify<B: AsRef<[u8]> + Sync>(
|
||||
services: &Services,
|
||||
_output: Self::Output,
|
||||
request: &hyper::Request<B>,
|
||||
query: AuthQueryParams,
|
||||
route: TypeId,
|
||||
) -> Result<Auth> {
|
||||
) -> Result<Self::Identity> {
|
||||
// We handle these the same as AccessTokenOptional
|
||||
let token = AccessTokenOptional::extract_authentication(request).map_err(|err| {
|
||||
err!(Request(Unauthorized(warn!("Failed to extract authorization: {}", err))))
|
||||
|
||||
@@ -28,7 +28,7 @@ pub(crate) async fn get_backfill_route(
|
||||
) -> Result<get_backfill::v1::Response> {
|
||||
AccessCheck {
|
||||
services: &services,
|
||||
origin: body.origin(),
|
||||
origin: &body.identity,
|
||||
room_id: &body.room_id,
|
||||
event_id: None,
|
||||
}
|
||||
@@ -41,7 +41,7 @@ pub(crate) async fn get_backfill_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
"Refusing to serve backfill for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
@@ -76,7 +76,7 @@ pub(crate) async fn get_backfill_route(
|
||||
Ok(services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.server_can_see_event(body.origin(), &pdu.room_id_or_hash(), &pdu.event_id)
|
||||
.server_can_see_event(&body.identity, &pdu.room_id_or_hash(), &pdu.event_id)
|
||||
.await
|
||||
.then_some(pdu))
|
||||
})
|
||||
|
||||
@@ -40,7 +40,7 @@ pub(crate) async fn get_event_route(
|
||||
|
||||
AccessCheck {
|
||||
services: &services,
|
||||
origin: body.origin(),
|
||||
origin: &body.identity,
|
||||
room_id,
|
||||
event_id: Some(&body.event_id),
|
||||
}
|
||||
@@ -54,7 +54,7 @@ pub(crate) async fn get_event_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
"Refusing to serve state for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
|
||||
@@ -19,7 +19,7 @@ pub(crate) async fn get_event_authorization_route(
|
||||
) -> Result<get_event_authorization::v1::Response> {
|
||||
AccessCheck {
|
||||
services: &services,
|
||||
origin: body.origin(),
|
||||
origin: &body.identity,
|
||||
room_id: &body.room_id,
|
||||
event_id: None,
|
||||
}
|
||||
@@ -42,7 +42,7 @@ pub(crate) async fn get_event_authorization_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
"Refusing to serve state for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
|
||||
@@ -22,7 +22,7 @@ pub(crate) async fn get_missing_events_route(
|
||||
) -> Result<get_missing_events::v1::Response> {
|
||||
AccessCheck {
|
||||
services: &services,
|
||||
origin: body.origin(),
|
||||
origin: &body.identity,
|
||||
room_id: &body.room_id,
|
||||
event_id: None,
|
||||
}
|
||||
@@ -36,7 +36,7 @@ pub(crate) async fn get_missing_events_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
"Refusing to serve state for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
@@ -91,10 +91,10 @@ pub(crate) async fn get_missing_events_route(
|
||||
if !services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.server_can_see_event(body.origin(), &body.room_id, pdu.event_id())
|
||||
.server_can_see_event(&body.identity, &body.room_id, pdu.event_id())
|
||||
.await
|
||||
{
|
||||
debug!(%next_event_id, origin = %body.origin(), "redacting event origin cannot see");
|
||||
debug!(%next_event_id, origin = %body.identity, "redacting event origin cannot see");
|
||||
pdu.redact(&room_version, json!({}))?;
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ pub(crate) async fn get_hierarchy_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
"Refusing to serve state for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
@@ -29,7 +29,7 @@ pub(crate) async fn get_hierarchy_route(
|
||||
let response = services
|
||||
.rooms
|
||||
.summary
|
||||
.get_local_room_summary_for_server(body.origin(), &body.room_id, body.suggested_only)
|
||||
.get_local_room_summary_for_server(&body.identity, &body.room_id, body.suggested_only)
|
||||
.await;
|
||||
|
||||
if let Accessibility::Accessible(response) = response {
|
||||
|
||||
@@ -32,7 +32,7 @@ pub(crate) async fn create_invite_route(
|
||||
services
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(body.origin(), &body.room_id)
|
||||
.acl_check(&body.identity, &body.room_id)
|
||||
.await?;
|
||||
|
||||
if !services.server.supported_room_version(&body.room_version) {
|
||||
@@ -54,12 +54,11 @@ pub(crate) async fn create_invite_route(
|
||||
|
||||
if services
|
||||
.moderation
|
||||
.is_remote_server_forbidden(body.origin())
|
||||
.is_remote_server_forbidden(&body.identity)
|
||||
{
|
||||
warn!(
|
||||
"Received federated/remote invite from banned server {} for room ID {}. Rejecting.",
|
||||
body.origin(),
|
||||
body.room_id
|
||||
body.identity, body.room_id
|
||||
);
|
||||
|
||||
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
|
||||
@@ -105,7 +104,7 @@ pub(crate) async fn create_invite_route(
|
||||
.and_then(Result::ok)
|
||||
.ok_or_else(|| err!(Request(InvalidParam("Invalid sender property"))))?;
|
||||
|
||||
if sender_user.server_name() != body.origin() {
|
||||
if sender_user.server_name() != body.identity {
|
||||
return Err!(Request(Forbidden("Sender's server does not match the origin server.",)));
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ use crate::Ruma;
|
||||
/// # `GET /_matrix/federation/v1/make_join/{roomId}/{userId}`
|
||||
///
|
||||
/// Creates a join template.
|
||||
#[tracing::instrument(skip_all, fields(room_id = %body.room_id, user_id = %body.user_id, origin = %body.origin()), level = "info")]
|
||||
#[tracing::instrument(skip_all, fields(room_id = %body.room_id, user_id = %body.user_id, origin = %body.identity), level = "info")]
|
||||
pub(crate) async fn create_join_event_template_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<prepare_join_event::v1::Request>,
|
||||
@@ -45,14 +45,14 @@ pub(crate) async fn create_join_event_template_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
room_id = %body.room_id,
|
||||
"Refusing to serve make_join for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
}
|
||||
|
||||
if body.user_id.server_name() != body.origin() {
|
||||
if body.user_id.server_name() != body.identity {
|
||||
return Err!(Request(BadJson("Not allowed to join on behalf of another server/user.")));
|
||||
}
|
||||
|
||||
@@ -60,19 +60,17 @@ pub(crate) async fn create_join_event_template_route(
|
||||
services
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(body.origin(), &body.room_id)
|
||||
.acl_check(&body.identity, &body.room_id)
|
||||
.await?;
|
||||
|
||||
if services
|
||||
.moderation
|
||||
.is_remote_server_forbidden(body.origin())
|
||||
.is_remote_server_forbidden(&body.identity)
|
||||
{
|
||||
warn!(
|
||||
"Server {} for remote user {} tried joining room ID {} which has a server name that \
|
||||
is globally forbidden. Rejecting.",
|
||||
body.origin(),
|
||||
&body.user_id,
|
||||
&body.room_id,
|
||||
body.identity, &body.user_id, &body.room_id,
|
||||
);
|
||||
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
|
||||
}
|
||||
|
||||
@@ -28,14 +28,14 @@ pub(crate) async fn create_knock_event_template_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
room_id = %body.room_id,
|
||||
"Refusing to serve make_knock for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
}
|
||||
|
||||
if body.user_id.server_name() != body.origin() {
|
||||
if body.user_id.server_name() != body.identity {
|
||||
return Err!(Request(BadJson("Not allowed to knock on behalf of another server/user.")));
|
||||
}
|
||||
|
||||
@@ -43,19 +43,17 @@ pub(crate) async fn create_knock_event_template_route(
|
||||
services
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(body.origin(), &body.room_id)
|
||||
.acl_check(&body.identity, &body.room_id)
|
||||
.await?;
|
||||
|
||||
if services
|
||||
.moderation
|
||||
.is_remote_server_forbidden(body.origin())
|
||||
.is_remote_server_forbidden(&body.identity)
|
||||
{
|
||||
warn!(
|
||||
"Server {} for remote user {} tried knocking room ID {} which has a server name \
|
||||
that is globally forbidden. Rejecting.",
|
||||
body.origin(),
|
||||
&body.user_id,
|
||||
&body.room_id,
|
||||
body.identity, &body.user_id, &body.room_id,
|
||||
);
|
||||
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
|
||||
}
|
||||
|
||||
@@ -26,13 +26,13 @@ pub(crate) async fn create_leave_event_template_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
"Refusing to serve make_leave for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
}
|
||||
|
||||
if body.user_id.server_name() != body.origin() {
|
||||
if body.user_id.server_name() != body.identity {
|
||||
return Err!(Request(Forbidden(
|
||||
"Not allowed to leave on behalf of another server/user."
|
||||
)));
|
||||
@@ -42,7 +42,7 @@ pub(crate) async fn create_leave_event_template_route(
|
||||
services
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(body.origin(), &body.room_id)
|
||||
.acl_check(&body.identity, &body.room_id)
|
||||
.await?;
|
||||
|
||||
let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
|
||||
|
||||
@@ -62,7 +62,7 @@ pub(crate) async fn send_transaction_message_route(
|
||||
ClientIp(client): ClientIp,
|
||||
body: Ruma<send_transaction_message::v1::Request>,
|
||||
) -> Result<send_transaction_message::v1::Response> {
|
||||
if body.origin() != body.body.origin {
|
||||
if body.identity != body.body.origin {
|
||||
return Err!(Request(Forbidden(
|
||||
"Not allowed to send transactions on behalf of other servers"
|
||||
)));
|
||||
@@ -80,7 +80,7 @@ pub(crate) async fn send_transaction_message_route(
|
||||
)));
|
||||
}
|
||||
|
||||
let txn_key = (body.origin().to_owned(), body.transaction_id.clone());
|
||||
let txn_key = (body.identity.clone(), body.transaction_id.clone());
|
||||
|
||||
// Atomically check cache, join active, or start new transaction
|
||||
match services
|
||||
@@ -136,7 +136,7 @@ async fn wait_for_result(
|
||||
skip_all,
|
||||
fields(
|
||||
id = ?body.transaction_id.as_str(),
|
||||
origin = ?body.origin()
|
||||
origin = ?body.identity
|
||||
)
|
||||
)]
|
||||
async fn process_inbound_transaction(
|
||||
@@ -164,7 +164,7 @@ async fn process_inbound_transaction(
|
||||
.stream();
|
||||
|
||||
debug!(pdus = body.pdus.len(), edus = body.edus.len(), "Processing transaction",);
|
||||
let results = match handle(&services, &client, body.origin(), pdus, edus).await {
|
||||
let results = match handle(&services, &client, &body.identity, pdus, edus).await {
|
||||
| Ok(results) => results,
|
||||
| Err(err) => {
|
||||
fail_federation_txn(services, &txn_key, &sender, err);
|
||||
@@ -381,6 +381,7 @@ async fn handle_room(
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
||||
.boxed()
|
||||
.await
|
||||
.map(|_| ());
|
||||
results.push((event_id, result));
|
||||
|
||||
@@ -304,7 +304,7 @@ pub(crate) async fn create_join_event_v2_route(
|
||||
) -> Result<create_join_event::v2::Response> {
|
||||
if services
|
||||
.moderation
|
||||
.is_remote_server_forbidden(body.origin())
|
||||
.is_remote_server_forbidden(&body.identity)
|
||||
{
|
||||
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
|
||||
}
|
||||
@@ -314,8 +314,7 @@ pub(crate) async fn create_join_event_v2_route(
|
||||
warn!(
|
||||
"Server {} tried joining room ID {} through us which has a server name that is \
|
||||
globally forbidden. Rejecting.",
|
||||
body.origin(),
|
||||
&body.room_id,
|
||||
body.identity, &body.room_id,
|
||||
);
|
||||
return Err!(Request(Forbidden(warn!(
|
||||
"Room ID server name {server} is banned on this homeserver."
|
||||
@@ -325,12 +324,12 @@ pub(crate) async fn create_join_event_v2_route(
|
||||
|
||||
let now = Instant::now();
|
||||
let room_state =
|
||||
create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members)
|
||||
create_join_event(&services, &body.identity, &body.room_id, &body.pdu, body.omit_members)
|
||||
.boxed()
|
||||
.await?;
|
||||
info!(
|
||||
"Finished sending a join for {} in {} in {:?}",
|
||||
body.origin(),
|
||||
body.identity,
|
||||
&body.room_id,
|
||||
now.elapsed()
|
||||
);
|
||||
|
||||
@@ -26,13 +26,12 @@ pub(crate) async fn create_knock_event_v1_route(
|
||||
) -> Result<create_knock_event::v1::Response> {
|
||||
if services
|
||||
.moderation
|
||||
.is_remote_server_forbidden(body.origin())
|
||||
.is_remote_server_forbidden(&body.identity)
|
||||
{
|
||||
warn!(
|
||||
"Server {} tried knocking room ID {} who has a server name that is globally \
|
||||
forbidden. Rejecting.",
|
||||
body.origin(),
|
||||
&body.room_id,
|
||||
body.identity, &body.room_id,
|
||||
);
|
||||
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
|
||||
}
|
||||
@@ -42,8 +41,7 @@ pub(crate) async fn create_knock_event_v1_route(
|
||||
warn!(
|
||||
"Server {} tried knocking room ID {} which has a server name that is globally \
|
||||
forbidden. Rejecting.",
|
||||
body.origin(),
|
||||
&body.room_id,
|
||||
body.identity, &body.room_id,
|
||||
);
|
||||
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
|
||||
}
|
||||
@@ -60,7 +58,7 @@ pub(crate) async fn create_knock_event_v1_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
"Refusing to serve send_knock for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
@@ -70,7 +68,7 @@ pub(crate) async fn create_knock_event_v1_route(
|
||||
services
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(body.origin(), &body.room_id)
|
||||
.acl_check(&body.identity, &body.room_id)
|
||||
.await?;
|
||||
|
||||
let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
|
||||
@@ -133,7 +131,7 @@ pub(crate) async fn create_knock_event_v1_route(
|
||||
.await?;
|
||||
|
||||
// check if origin server is trying to send for another server
|
||||
if sender.server_name() != body.origin() {
|
||||
if sender.server_name() != body.identity {
|
||||
return Err!(Request(BadJson("Not allowed to knock on behalf of another server/user.")));
|
||||
}
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ pub(crate) async fn create_leave_event_v2_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<create_leave_event::v2::Request>,
|
||||
) -> Result<create_leave_event::v2::Response> {
|
||||
create_leave_event(&services, body.origin(), &body.room_id, &body.pdu).await?;
|
||||
create_leave_event(&services, &body.identity, &body.room_id, &body.pdu).await?;
|
||||
|
||||
Ok(create_leave_event::v2::Response::new())
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ pub(crate) async fn get_room_state_route(
|
||||
) -> Result<get_room_state::v1::Response> {
|
||||
AccessCheck {
|
||||
services: &services,
|
||||
origin: body.origin(),
|
||||
origin: &body.identity,
|
||||
room_id: &body.room_id,
|
||||
event_id: None,
|
||||
}
|
||||
@@ -40,7 +40,7 @@ pub(crate) async fn get_room_state_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
"Refusing to serve state for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
|
||||
@@ -18,7 +18,7 @@ pub(crate) async fn get_room_state_ids_route(
|
||||
) -> Result<get_room_state_ids::v1::Response> {
|
||||
AccessCheck {
|
||||
services: &services,
|
||||
origin: body.origin(),
|
||||
origin: &body.identity,
|
||||
room_id: &body.room_id,
|
||||
event_id: None,
|
||||
}
|
||||
@@ -41,7 +41,7 @@ pub(crate) async fn get_room_state_ids_route(
|
||||
.await
|
||||
{
|
||||
info!(
|
||||
origin = body.origin().as_str(),
|
||||
origin = body.identity.as_str(),
|
||||
"Refusing to serve state for room we aren't participating in"
|
||||
);
|
||||
return Err!(Request(NotFound("This server is not participating in that room.")));
|
||||
|
||||
@@ -60,13 +60,13 @@ pub(crate) async fn get_devices_route(
|
||||
|
||||
let master_key = services
|
||||
.users
|
||||
.get_master_key(None, &body.user_id, &|u| u.server_name() == body.origin())
|
||||
.get_master_key(None, &body.user_id, &|u| u.server_name() == body.identity)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
let self_signing_key = services
|
||||
.users
|
||||
.get_self_signing_key(None, &body.user_id, &|u| u.server_name() == body.origin())
|
||||
.get_self_signing_key(None, &body.user_id, &|u| u.server_name() == body.identity)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
@@ -94,7 +94,7 @@ pub(crate) async fn get_keys_route(
|
||||
&services,
|
||||
None,
|
||||
&body.device_keys,
|
||||
|u| Some(u.server_name()) == body.origin.as_deref(),
|
||||
|u| u.server_name() == body.identity,
|
||||
services.globals.allow_device_name_federation(),
|
||||
Duration::from_secs(0),
|
||||
)
|
||||
|
||||
@@ -375,7 +375,7 @@ pub struct Config {
|
||||
#[serde(default = "default_max_request_size")]
|
||||
pub max_request_size: usize,
|
||||
|
||||
/// default: 192
|
||||
/// default: 256
|
||||
#[serde(default = "default_max_fetch_prev_events")]
|
||||
pub max_fetch_prev_events: u16,
|
||||
|
||||
@@ -2549,7 +2549,7 @@ fn default_pusher_timeout() -> u64 { 60 }
|
||||
|
||||
fn default_pusher_idle_timeout() -> u64 { 15 }
|
||||
|
||||
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
|
||||
fn default_max_fetch_prev_events() -> u16 { 256_u16 }
|
||||
|
||||
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
|
||||
|
||||
|
||||
@@ -187,6 +187,10 @@ pub(super) static MAPS: &[Descriptor] = &[
|
||||
val_size_hint: Some(8),
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "roomid_mindepth",
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "roomserverids",
|
||||
..descriptor::RANDOM_SMALL
|
||||
|
||||
@@ -294,7 +294,7 @@ impl Service {
|
||||
pub async fn appservice_checks(
|
||||
&self,
|
||||
room_alias: &RoomAliasId,
|
||||
appservice_info: &Option<RegistrationInfo>,
|
||||
appservice_info: Option<&RegistrationInfo>,
|
||||
) -> Result<()> {
|
||||
if !self
|
||||
.services
|
||||
|
||||
@@ -1,233 +1,456 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashSet, VecDeque, hash_map},
|
||||
collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use assign::assign;
|
||||
use conduwuit::{
|
||||
Event, PduEvent, debug, debug_warn, implement, matrix::event::gen_event_id_canonical_json,
|
||||
trace, utils::continue_exponential_backoff_secs, warn,
|
||||
Event, PduEvent, debug, debug_info, debug_warn, err, error,
|
||||
matrix::event::gen_event_id_canonical_json,
|
||||
state_res::lexicographical_topological_sort,
|
||||
trace,
|
||||
utils::{IterStream, continue_exponential_backoff_secs, stream::BroadbandExt},
|
||||
warn,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use ruma::{
|
||||
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
||||
api::federation::event::get_event,
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
|
||||
RoomId, ServerName, UInt,
|
||||
api::federation::event::{get_event, get_missing_events},
|
||||
int,
|
||||
};
|
||||
|
||||
use super::get_room_version_rules;
|
||||
|
||||
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
||||
/// it is appended to the outliers Tree.
|
||||
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
|
||||
/// returning them in a topologically sorted order.
|
||||
///
|
||||
/// Returns pdu and if we fetched it over federation the raw json.
|
||||
///
|
||||
/// a. Look in the main timeline (pduid_pdu tree)
|
||||
/// b. Look at outlier pdu tree
|
||||
/// c. Ask origin server over federation
|
||||
/// d. TODO: Ask other servers over federation?
|
||||
#[implement(super::Service)]
|
||||
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
||||
&self,
|
||||
origin: &'a ServerName,
|
||||
events: Events,
|
||||
create_event: &'a Pdu,
|
||||
room_id: &'a RoomId,
|
||||
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
||||
{
|
||||
let back_off = |id| match self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.entry(id)
|
||||
{
|
||||
| hash_map::Entry::Vacant(e) => {
|
||||
e.insert((Instant::now(), 1));
|
||||
},
|
||||
| hash_map::Entry::Occupied(mut e) => {
|
||||
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
|
||||
},
|
||||
};
|
||||
/// This is used to attempt to process PDUs in an order that respects their
|
||||
/// dependencies, however it is ultimately the sender's responsibility to send
|
||||
/// them in a processable order, so this is just a best effort attempt. It does
|
||||
/// not account for power levels or other tie breaks.
|
||||
pub async fn build_local_dag<S: std::hash::BuildHasher>(
|
||||
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject, S>,
|
||||
) -> conduwuit::Result<Vec<OwnedEventId>> {
|
||||
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
|
||||
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
|
||||
HashMap::with_capacity(pdu_map.len());
|
||||
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
|
||||
|
||||
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
|
||||
trace!("Fetching {} outlier pdus", events.clone().count());
|
||||
for (event_id, value) in pdu_map {
|
||||
// We already checked that these properties are correct in parse_incoming_pdu,
|
||||
// so it's safe to unwrap here.
|
||||
// We also filter to remove any prev_events that are not in this pdu_map, as we
|
||||
// need to have at least one event with zero out degrees for the lexico-topo
|
||||
// sort below. If there are multiple events with omitted prevs, they will be
|
||||
// ordered by timestamp, then event ID. At that point though, it's unlikely to
|
||||
// matter.
|
||||
let prev_events = value
|
||||
.get("prev_events")
|
||||
.unwrap()
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
|
||||
.filter(|id| pdu_map.contains_key(id))
|
||||
.collect();
|
||||
|
||||
for id in events {
|
||||
// a. Look in the main timeline (pduid_pdu tree)
|
||||
// b. Look at outlier pdu tree
|
||||
// (get_pdu_json checks both)
|
||||
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
|
||||
trace!("Found {id} in main timeline or outlier tree");
|
||||
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
|
||||
continue;
|
||||
}
|
||||
dag.insert(event_id.clone(), prev_events);
|
||||
let origin_server_ts = value
|
||||
.get("origin_server_ts")
|
||||
.and_then(CanonicalJsonValue::as_integer)
|
||||
.unwrap_or_default();
|
||||
id_origin_ts.insert(event_id.clone(), origin_server_ts);
|
||||
}
|
||||
|
||||
// c. Ask origin server over federation
|
||||
// We also handle its auth chain here so we don't get a stack overflow in
|
||||
// handle_outlier_pdu.
|
||||
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
|
||||
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
|
||||
debug!(count = dag.len(), "Sorting incoming events with partial graph");
|
||||
lexicographical_topological_sort(&dag, &async |node_id| {
|
||||
// Note: we don't bother fetching power levels because that would massively slow
|
||||
// this function down. This is a best-effort attempt to order events correctly
|
||||
// for processing, however ultimately that should be the sender's job.
|
||||
let ts = id_origin_ts
|
||||
.get(&node_id)
|
||||
.copied()
|
||||
.unwrap_or_else(|| int!(0))
|
||||
.to_string()
|
||||
.parse::<u64>()
|
||||
.ok()
|
||||
.and_then(UInt::new)
|
||||
.unwrap_or_default();
|
||||
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
|
||||
})
|
||||
.await
|
||||
.inspect(|sorted| {
|
||||
debug_assert_eq!(
|
||||
sorted.len(),
|
||||
pdu_map.len(),
|
||||
"Sorted graph was not the same size as the input graph"
|
||||
);
|
||||
})
|
||||
.map_err(|e| err!("failed to resolve local graph: {e}"))
|
||||
}
|
||||
|
||||
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
|
||||
while let Some(next_id) = todo_auth_events.pop_front() {
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(&*next_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 60 * 2;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 8;
|
||||
if continue_exponential_backoff_secs(
|
||||
MIN_DURATION,
|
||||
MAX_DURATION,
|
||||
time.elapsed(),
|
||||
*tries,
|
||||
) {
|
||||
debug_warn!(
|
||||
tried = ?*tries,
|
||||
elapsed = ?time.elapsed(),
|
||||
"Backing off from {next_id}",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
impl super::Service {
|
||||
/// Uses `/_matrix/federation/v1/get_missing_events` to fill gaps in the
|
||||
/// DAG.
|
||||
///
|
||||
/// When this function is called, the "earliest events" (current forward
|
||||
/// extremities) will be collected, and the function will loop with an
|
||||
/// exponentially incrementing limit (up to 100 per request) until it has
|
||||
/// filled the gap, i.e. when the remote says there's no more events.
|
||||
///
|
||||
/// This function will iterate until the remote returns no more events,
|
||||
/// increasing the limit by a factor of 10. If 100 iterations are reached or
|
||||
/// max_fetch_prev_events events are backfilled, the function will give up
|
||||
/// and return what it has, to avoid pulling in too much data (for example,
|
||||
/// absurdly large gaps).
|
||||
///
|
||||
/// This function does not persist the events. The caller is responsible for
|
||||
/// passing them through handle_incoming_pdu.
|
||||
///
|
||||
/// ## Parameters
|
||||
///
|
||||
/// - `room_id`: The room's ID.
|
||||
/// - `head`: The event we are potentially missing prev_events for.
|
||||
/// - `tail`: The most recently known events in the graph (typically forward
|
||||
/// extremities).
|
||||
/// - `via`: The server to ask for missing events.
|
||||
/// - `min_depth`: Don't process events with a `depth` lower than this
|
||||
/// value. Not massively useful, but can help short-circuit infinite loops
|
||||
/// and weird edge paths.
|
||||
pub async fn get_missing_events(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
head: &PduEvent,
|
||||
tail: Vec<OwnedEventId>,
|
||||
via: &ServerName,
|
||||
min_depth: UInt,
|
||||
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
let missing_count = head
|
||||
.prev_events()
|
||||
.stream()
|
||||
.broad_filter_map(|event_id| async move {
|
||||
match self
|
||||
.services
|
||||
.timeline
|
||||
.get_non_outlier_pdu_json(event_id)
|
||||
.await
|
||||
.inspect(|_| debug!("Found prev_event {event_id} locally."))
|
||||
.inspect_err(
|
||||
|e| debug!(%e, "Could not find prev_event {event_id} locally."),
|
||||
) {
|
||||
| Ok(_) => None,
|
||||
| Err(_) => Some(event_id),
|
||||
}
|
||||
})
|
||||
.count()
|
||||
.await;
|
||||
debug_assert_ne!(
|
||||
missing_count, 0,
|
||||
"event passed to get_missing_events is not missing any events (wasteful call)"
|
||||
);
|
||||
};
|
||||
|
||||
if events_all.contains(&next_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if self.services.timeline.pdu_exists(&next_id).await {
|
||||
trace!("Found {next_id} in db");
|
||||
continue;
|
||||
}
|
||||
|
||||
debug!("Fetching {next_id} over federation from {origin}.");
|
||||
match self
|
||||
let mut discovered = HashMap::with_capacity(20);
|
||||
let mut latest_events = vec![head.event_id().to_owned()];
|
||||
let mut iterations = 0_u8;
|
||||
loop {
|
||||
iterations = iterations.saturating_add(1);
|
||||
let limit = iterations.saturating_mul(10).min(100);
|
||||
debug_info!(%limit, %via, %iterations, discovered=discovered.len(), %min_depth, "Attempting to gap fill missing events");
|
||||
let response: get_missing_events::v1::Response = self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
origin,
|
||||
get_event::v1::Request::new((*next_id).to_owned()),
|
||||
via,
|
||||
assign!(
|
||||
get_missing_events::v1::Request::new(
|
||||
room_id.to_owned(),
|
||||
tail.clone(),
|
||||
latest_events.clone()
|
||||
),
|
||||
{limit: limit.into(), min_depth}
|
||||
),
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(res) => {
|
||||
debug!("Got {next_id} over federation from {origin}");
|
||||
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
|
||||
back_off((*next_id).to_owned());
|
||||
continue;
|
||||
};
|
||||
.await?;
|
||||
|
||||
let Ok((calculated_event_id, value)) =
|
||||
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
|
||||
else {
|
||||
back_off((*next_id).to_owned());
|
||||
continue;
|
||||
};
|
||||
if response.events.is_empty() {
|
||||
debug_info!(%via, "Finished gap filling missing events (remote returned no more events).");
|
||||
break;
|
||||
}
|
||||
debug_info!("Got {} events back from remote", response.events.len());
|
||||
|
||||
if calculated_event_id != *next_id {
|
||||
warn!(
|
||||
"Server didn't return event id we requested: requested: {next_id}, \
|
||||
we got {calculated_event_id}. Event: {:?}",
|
||||
&res.pdu
|
||||
);
|
||||
latest_events.clear();
|
||||
for raw_event in response.events {
|
||||
let (_, event_id, pdu_json) = self.parse_incoming_pdu(&raw_event).await?;
|
||||
let pdu = PduEvent::from_id_val(&event_id, pdu_json).map_err(|e| {
|
||||
err!(Request(BadJson("Failed to parse backfilled event {event_id}: {e}")))
|
||||
})?;
|
||||
|
||||
if pdu.depth < min_depth {
|
||||
debug_warn!(
|
||||
"Received PDU with depth {} below min_depth {}, ignoring",
|
||||
pdu.depth,
|
||||
min_depth
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
for prev_event_id in pdu.prev_events() {
|
||||
if discovered.contains_key(prev_event_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(auth_events) = value
|
||||
.get("auth_events")
|
||||
.and_then(CanonicalJsonValue::as_array)
|
||||
if self
|
||||
.services
|
||||
.timeline
|
||||
.non_outlier_pdu_exists(prev_event_id)
|
||||
.await
|
||||
{
|
||||
for auth_event in auth_events {
|
||||
match serde_json::from_value::<OwnedEventId>(
|
||||
auth_event.clone().into(),
|
||||
) {
|
||||
| Ok(auth_event) => {
|
||||
trace!(
|
||||
"Found auth event id {auth_event} for event {next_id}"
|
||||
);
|
||||
todo_auth_events.push_back(auth_event);
|
||||
},
|
||||
| _ => {
|
||||
warn!("Auth event id is not valid");
|
||||
},
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("Auth event list invalid");
|
||||
continue;
|
||||
}
|
||||
latest_events.push(prev_event_id.to_owned());
|
||||
break;
|
||||
}
|
||||
|
||||
events_in_reverse_order.push((next_id.clone(), value));
|
||||
events_all.insert(next_id);
|
||||
},
|
||||
| Err(e) => {
|
||||
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
|
||||
back_off((*next_id).to_owned());
|
||||
},
|
||||
discovered.insert(event_id.clone(), pdu);
|
||||
}
|
||||
|
||||
if latest_events.is_empty() {
|
||||
break;
|
||||
} else if discovered.len() > self.services.server.config.max_fetch_prev_events.into()
|
||||
|| iterations >= 20
|
||||
{
|
||||
error!(
|
||||
filled=discovered.len(),
|
||||
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
|
||||
%iterations,
|
||||
"Gap too large, giving up"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
|
||||
Ok(discovered)
|
||||
}
|
||||
|
||||
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
|
||||
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
|
||||
// a. Look in the main timeline (pduid_pdu tree)
|
||||
// b. Look at outlier pdu tree
|
||||
// (get_pdu_json checks both)
|
||||
if let Some(local_pdu) = local_pdu {
|
||||
trace!("Found {id} in main timeline or outlier tree");
|
||||
pdus.push((local_pdu.clone(), None));
|
||||
}
|
||||
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
||||
/// it is appended to the outliers Tree.
|
||||
///
|
||||
/// Returns pdu and if we fetched it over federation the raw json.
|
||||
///
|
||||
/// a. Look in the main timeline (pduid_pdu tree)
|
||||
/// b. Look at outlier pdu tree
|
||||
/// c. Ask origin server over federation
|
||||
/// d. TODO: Ask other servers over federation?
|
||||
#[deprecated]
|
||||
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
||||
&self,
|
||||
origin: &'a ServerName,
|
||||
events: Events,
|
||||
create_event: &'a Pdu,
|
||||
room_id: &'a RoomId,
|
||||
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
||||
{
|
||||
let back_off = |id| match self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.entry(id)
|
||||
{
|
||||
| hash_map::Entry::Vacant(e) => {
|
||||
e.insert((Instant::now(), 1));
|
||||
},
|
||||
| hash_map::Entry::Occupied(mut e) => {
|
||||
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
|
||||
},
|
||||
};
|
||||
|
||||
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(&*next_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 5 * 60;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||
if continue_exponential_backoff_secs(
|
||||
MIN_DURATION,
|
||||
MAX_DURATION,
|
||||
time.elapsed(),
|
||||
*tries,
|
||||
) {
|
||||
debug!("Backing off from {next_id}");
|
||||
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
|
||||
trace!("Fetching {} outlier pdus", events.clone().count());
|
||||
|
||||
for id in events {
|
||||
// a. Look in the main timeline (pduid_pdu tree)
|
||||
// b. Look at outlier pdu tree
|
||||
// (get_pdu_json checks both)
|
||||
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
|
||||
trace!("Found {id} in main timeline or outlier tree");
|
||||
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
|
||||
continue;
|
||||
}
|
||||
|
||||
// c. Ask origin server over federation
|
||||
// We also handle its auth chain here so we don't get a stack overflow in
|
||||
// handle_outlier_pdu.
|
||||
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
|
||||
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
|
||||
|
||||
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
|
||||
while let Some(next_id) = todo_auth_events.pop_front() {
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(&*next_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 60 * 2;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 8;
|
||||
if continue_exponential_backoff_secs(
|
||||
MIN_DURATION,
|
||||
MAX_DURATION,
|
||||
time.elapsed(),
|
||||
*tries,
|
||||
) {
|
||||
debug_warn!(
|
||||
tried = ?*tries,
|
||||
elapsed = ?time.elapsed(),
|
||||
"Backing off from {next_id}",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if events_all.contains(&next_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if self.services.timeline.pdu_exists(&next_id).await {
|
||||
trace!("Found {next_id} in db");
|
||||
continue;
|
||||
}
|
||||
|
||||
debug!("Fetching {next_id} over federation from {origin}.");
|
||||
match self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
origin,
|
||||
get_event::v1::Request::new((*next_id).to_owned()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(res) => {
|
||||
debug!("Got {next_id} over federation from {origin}");
|
||||
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
|
||||
back_off((*next_id).to_owned());
|
||||
continue;
|
||||
};
|
||||
|
||||
let Ok((calculated_event_id, value)) =
|
||||
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
|
||||
else {
|
||||
back_off((*next_id).to_owned());
|
||||
continue;
|
||||
};
|
||||
|
||||
if calculated_event_id != *next_id {
|
||||
warn!(
|
||||
"Server didn't return event id we requested: requested: \
|
||||
{next_id}, we got {calculated_event_id}. Event: {:?}",
|
||||
&res.pdu
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(auth_events) = value
|
||||
.get("auth_events")
|
||||
.and_then(CanonicalJsonValue::as_array)
|
||||
{
|
||||
for auth_event in auth_events {
|
||||
match serde_json::from_value::<OwnedEventId>(
|
||||
auth_event.clone().into(),
|
||||
) {
|
||||
| Ok(auth_event) => {
|
||||
trace!(
|
||||
"Found auth event id {auth_event} for event \
|
||||
{next_id}"
|
||||
);
|
||||
todo_auth_events.push_back(auth_event);
|
||||
},
|
||||
| _ => {
|
||||
warn!("Auth event id is not valid");
|
||||
},
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("Auth event list invalid");
|
||||
}
|
||||
|
||||
events_in_reverse_order.push((next_id.clone(), value));
|
||||
events_all.insert(next_id);
|
||||
},
|
||||
| Err(e) => {
|
||||
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
|
||||
back_off((*next_id).to_owned());
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Handling outlier {next_id}");
|
||||
match Box::pin(self.handle_outlier_pdu(
|
||||
origin,
|
||||
create_event,
|
||||
&next_id,
|
||||
room_id,
|
||||
value.clone(),
|
||||
true,
|
||||
))
|
||||
.await
|
||||
{
|
||||
| Ok((pdu, json)) =>
|
||||
if next_id == *id {
|
||||
trace!("Handled outlier {next_id} (original request)");
|
||||
pdus.push((pdu, Some(json)));
|
||||
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
|
||||
}
|
||||
|
||||
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
|
||||
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
|
||||
// a. Look in the main timeline (pduid_pdu tree)
|
||||
// b. Look at outlier pdu tree
|
||||
// (get_pdu_json checks both)
|
||||
if let Some(local_pdu) = local_pdu {
|
||||
trace!("Found {id} in main timeline or outlier tree");
|
||||
pdus.push((local_pdu.clone(), None));
|
||||
}
|
||||
|
||||
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(&*next_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 5 * 60;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||
if continue_exponential_backoff_secs(
|
||||
MIN_DURATION,
|
||||
MAX_DURATION,
|
||||
time.elapsed(),
|
||||
*tries,
|
||||
) {
|
||||
debug!("Backing off from {next_id}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Handling outlier {next_id}");
|
||||
match Box::pin(self.handle_outlier_pdu(
|
||||
origin,
|
||||
create_event,
|
||||
&next_id,
|
||||
room_id,
|
||||
value.clone(),
|
||||
true,
|
||||
))
|
||||
.await
|
||||
{
|
||||
| Ok((pdu, json)) =>
|
||||
if next_id == *id {
|
||||
trace!("Handled outlier {next_id} (original request)");
|
||||
pdus.push((pdu, Some(json)));
|
||||
},
|
||||
| Err(e) => {
|
||||
warn!("Authentication of event {next_id} failed: {e:?}");
|
||||
back_off(next_id);
|
||||
},
|
||||
| Err(e) => {
|
||||
warn!("Authentication of event {next_id} failed: {e:?}");
|
||||
back_off(next_id);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("Fetched and handled {} outlier pdus", pdus.len());
|
||||
pdus
|
||||
}
|
||||
trace!("Fetched and handled {} outlier pdus", pdus.len());
|
||||
pdus
|
||||
}
|
||||
|
||||
@@ -1,128 +1,96 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet, VecDeque},
|
||||
iter::once,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use conduwuit::{
|
||||
Event, PduEvent, Result, debug_warn, err, implement,
|
||||
state_res::{self},
|
||||
};
|
||||
use futures::{FutureExt, future};
|
||||
use ruma::{
|
||||
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
|
||||
int, uint,
|
||||
Event, PduEvent, debug, debug_info,
|
||||
utils::{BoolExt, IterStream, stream::BroadbandExt},
|
||||
warn,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use ruma::{RoomId, ServerName};
|
||||
|
||||
use super::check_room_id;
|
||||
use crate::rooms::event_handler::build_local_dag;
|
||||
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(%origin),
|
||||
)]
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub(super) async fn fetch_prev<'a, Pdu, Events>(
|
||||
&self,
|
||||
origin: &ServerName,
|
||||
create_event: &Pdu,
|
||||
room_id: &RoomId,
|
||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
||||
initial_set: Events,
|
||||
) -> Result<(
|
||||
Vec<OwnedEventId>,
|
||||
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
||||
)>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
||||
{
|
||||
let num_ids = initial_set.clone().count();
|
||||
let mut eventid_info = HashMap::new();
|
||||
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
|
||||
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
|
||||
initial_set.map(ToOwned::to_owned).collect();
|
||||
|
||||
let mut amount = 0;
|
||||
|
||||
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
|
||||
self.services.server.check_running()?;
|
||||
|
||||
match self
|
||||
.fetch_and_handle_outliers(
|
||||
origin,
|
||||
once(prev_event_id.as_ref()),
|
||||
create_event,
|
||||
room_id,
|
||||
)
|
||||
.boxed()
|
||||
.await
|
||||
.pop()
|
||||
{
|
||||
| Some((pdu, mut json_opt)) => {
|
||||
check_room_id(room_id, &pdu)?;
|
||||
|
||||
let limit = self.services.server.config.max_fetch_prev_events;
|
||||
if amount > limit {
|
||||
debug_warn!("Max prev event limit reached! Limit: {limit}");
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
continue;
|
||||
}
|
||||
|
||||
if json_opt.is_none() {
|
||||
json_opt = self
|
||||
.services
|
||||
.outlier
|
||||
.get_outlier_pdu_json(&prev_event_id)
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
|
||||
if let Some(json) = json_opt {
|
||||
if pdu.origin_server_ts() > first_ts_in_room {
|
||||
amount = amount.saturating_add(1);
|
||||
for prev_prev in pdu.prev_events() {
|
||||
if !graph.contains_key(prev_prev) {
|
||||
todo_outlier_stack.push_back(prev_prev.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
graph.insert(
|
||||
prev_event_id.clone(),
|
||||
pdu.prev_events().map(ToOwned::to_owned).collect(),
|
||||
);
|
||||
} else {
|
||||
// Time based check failed
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
}
|
||||
|
||||
eventid_info.insert(prev_event_id.clone(), (pdu, json));
|
||||
} else {
|
||||
// Get json failed, so this was not fetched over federation
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
}
|
||||
},
|
||||
| _ => {
|
||||
// Fetch and handle failed
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
},
|
||||
impl super::Service {
|
||||
/// Fetches any missing prev_events for this event and persists them before
|
||||
/// returning.
|
||||
pub(super) async fn fetch_prevs(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
create_event: &PduEvent,
|
||||
incoming_pdu: &PduEvent,
|
||||
origin: &ServerName,
|
||||
) -> conduwuit::Result<()> {
|
||||
let missing = incoming_pdu
|
||||
.prev_events()
|
||||
.stream()
|
||||
.broad_filter_map(|event_id| async move {
|
||||
self.services
|
||||
.timeline
|
||||
.get_non_outlier_pdu_json(event_id)
|
||||
.await
|
||||
.is_ok()
|
||||
.or(|| event_id.to_owned())
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
if missing.is_empty() {
|
||||
debug!(event_id=%incoming_pdu.event_id(), "No missing prev events.");
|
||||
return Ok(());
|
||||
}
|
||||
debug!(%room_id, event_id=%incoming_pdu.event_id(), ?missing, "Fetching previous events");
|
||||
let tail = self
|
||||
.services
|
||||
.state
|
||||
.get_forward_extremities(room_id)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
let backfilled = self
|
||||
.get_missing_events(
|
||||
room_id,
|
||||
incoming_pdu,
|
||||
tail,
|
||||
origin,
|
||||
self.services.metadata.get_mindepth(room_id).await,
|
||||
)
|
||||
.await?;
|
||||
debug_info!("Fetched {} missing events", backfilled.len());
|
||||
|
||||
// Persist all fetched events
|
||||
let mapped = backfilled
|
||||
.iter()
|
||||
.map(|(eid, evt)| {
|
||||
let mut obj = evt.to_canonical_object();
|
||||
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
|
||||
(eid.clone(), obj)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let to_persist = if mapped.len() <= 1 {
|
||||
mapped.keys().map(ToOwned::to_owned).collect()
|
||||
} else {
|
||||
build_local_dag(&mapped).await?
|
||||
};
|
||||
|
||||
for event_id in to_persist {
|
||||
debug_info!("Persisting fetched prev event {event_id}");
|
||||
let obj = mapped.get(&event_id).cloned().unwrap();
|
||||
match self
|
||||
.handle_outlier_pdu(origin, create_event, &event_id, room_id, obj, false)
|
||||
.await
|
||||
{
|
||||
| Ok((pdu, val)) =>
|
||||
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
|
||||
.await,
|
||||
| Err(e) => {
|
||||
warn!("Failed to persist prev_event {event_id}: {e}");
|
||||
continue;
|
||||
},
|
||||
}?;
|
||||
}
|
||||
|
||||
// NOTE because i keep forgetting: the caller persists incoming_pdu.
|
||||
// we only care about its prev events
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let event_fetch = |event_id| {
|
||||
let origin_server_ts = eventid_info
|
||||
.get(&event_id)
|
||||
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
|
||||
|
||||
// This return value is the key used for sorting events,
|
||||
// events are then sorted by power level, time,
|
||||
// and lexically by event_id.
|
||||
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
|
||||
};
|
||||
|
||||
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
|
||||
.await
|
||||
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;
|
||||
|
||||
Ok((sorted, eventid_info))
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::collections::{HashMap, hash_map};
|
||||
|
||||
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
|
||||
use futures::FutureExt;
|
||||
use ruma::{
|
||||
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
|
||||
events::StateEventType,
|
||||
@@ -42,7 +41,6 @@ where
|
||||
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
|
||||
let state_vec = self
|
||||
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
|
||||
.boxed()
|
||||
.await;
|
||||
|
||||
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, hash_map},
|
||||
time::Instant,
|
||||
};
|
||||
use std::{collections::BTreeMap, time::Instant};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
|
||||
implement, info, trace, utils::stream::IterStream, warn,
|
||||
implement, info, trace, warn,
|
||||
};
|
||||
use futures::{
|
||||
FutureExt, TryFutureExt, TryStreamExt,
|
||||
FutureExt,
|
||||
future::{OptionFuture, try_join4},
|
||||
};
|
||||
use ruma::{
|
||||
@@ -236,63 +233,21 @@ pub async fn handle_incoming_pdu<'a>(
|
||||
}
|
||||
|
||||
// Skip old events
|
||||
let first_ts_in_room = self
|
||||
.services
|
||||
.timeline
|
||||
.first_pdu_in_room(room_id)
|
||||
.await?
|
||||
.origin_server_ts();
|
||||
// let first_ts_in_room = self
|
||||
// .services
|
||||
// .timeline
|
||||
// .first_pdu_in_room(room_id)
|
||||
// .await?
|
||||
// .origin_server_ts();
|
||||
|
||||
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
|
||||
// These are timeline events
|
||||
let (sorted_prev_events, mut eventid_info) = self
|
||||
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
|
||||
.await?;
|
||||
debug!("Handling previous events");
|
||||
|
||||
debug!(
|
||||
events = ?sorted_prev_events,
|
||||
"Handling previous events"
|
||||
);
|
||||
|
||||
sorted_prev_events
|
||||
.iter()
|
||||
.try_stream()
|
||||
.map_ok(AsRef::as_ref)
|
||||
.try_for_each(|prev_id| {
|
||||
self.handle_prev_pdu(
|
||||
origin,
|
||||
event_id,
|
||||
room_id,
|
||||
eventid_info.remove(prev_id),
|
||||
create_event,
|
||||
first_ts_in_room,
|
||||
prev_id,
|
||||
)
|
||||
.inspect_err(move |e| {
|
||||
warn!("Prev {prev_id} failed: {e}");
|
||||
match self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.entry(prev_id.into())
|
||||
{
|
||||
| hash_map::Entry::Vacant(e) => {
|
||||
e.insert((Instant::now(), 1));
|
||||
},
|
||||
| hash_map::Entry::Occupied(mut e) => {
|
||||
let tries = e.get().1.saturating_add(1);
|
||||
*e.get_mut() = (Instant::now(), tries);
|
||||
},
|
||||
}
|
||||
})
|
||||
.map(|_| self.services.server.check_running())
|
||||
})
|
||||
.boxed()
|
||||
self.fetch_prevs(room_id, create_event, &incoming_pdu, origin)
|
||||
.await?;
|
||||
|
||||
// Done with prev events, now handling the incoming event
|
||||
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
|
||||
.boxed()
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use std::collections::{BTreeMap, HashMap, hash_map};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
|
||||
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, info, state_res,
|
||||
trace, warn,
|
||||
};
|
||||
use futures::future::ready;
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
||||
events::StateEventType,
|
||||
api::federation::authorization::get_event_authorization, events::StateEventType,
|
||||
};
|
||||
|
||||
use super::{check_room_id, get_room_version_rules};
|
||||
@@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
||||
event_id: &'a EventId,
|
||||
room_id: &'a RoomId,
|
||||
mut value: CanonicalJsonObject,
|
||||
auth_events_known: bool,
|
||||
_auth_events_known: bool,
|
||||
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
@@ -107,45 +107,52 @@ where
|
||||
}
|
||||
|
||||
// Fetch any missing ones & reject invalid ones
|
||||
let missing_auth_events = if auth_events_known {
|
||||
pdu_event
|
||||
.auth_events()
|
||||
.filter(|id| !auth_events.contains_key(*id))
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
pdu_event.auth_events().collect::<Vec<_>>()
|
||||
};
|
||||
if !missing_auth_events.is_empty() || !auth_events_known {
|
||||
debug_info!(
|
||||
"Fetching {} missing auth events for outlier event {event_id}",
|
||||
missing_auth_events.len()
|
||||
);
|
||||
for (pdu, _) in self
|
||||
.fetch_and_handle_outliers(
|
||||
if auth_events.len() != pdu_event.auth_events().count() {
|
||||
info!("Missing some auth events, asking remote for auth chain");
|
||||
let response: get_event_authorization::v1::Response = self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
origin,
|
||||
missing_auth_events.iter().copied(),
|
||||
create_event,
|
||||
room_id,
|
||||
get_event_authorization::v1::Request::new(
|
||||
room_id.to_owned(),
|
||||
event_id.to_owned(),
|
||||
),
|
||||
)
|
||||
.await
|
||||
{
|
||||
auth_events.insert(pdu.event_id().to_owned(), pdu);
|
||||
.map_err(|e| {
|
||||
err!(Request(Forbidden(
|
||||
"Remote server is not divulging incoming event's auth chain: {e}"
|
||||
)))
|
||||
})?;
|
||||
let mut auth_chain_map = HashMap::with_capacity(response.auth_chain.len());
|
||||
for auth_pdu_json in response.auth_chain {
|
||||
let (auth_event_room_id, auth_event_id, auth_pdu_json) =
|
||||
self.parse_incoming_pdu(&auth_pdu_json).await?;
|
||||
if auth_event_room_id != room_id {
|
||||
return Err!(Request(BadJson(
|
||||
"Auth event {auth_event_id} is in {auth_event_room_id}, not {room_id}."
|
||||
)));
|
||||
}
|
||||
let auth_pdu = PduEvent::from_id_val(&auth_event_id, auth_pdu_json)
|
||||
.map_err(|e| err!(Request(BadJson("Invalid PDU {auth_event_id}: {e}"))))?;
|
||||
auth_chain_map.insert(auth_event_id, auth_pdu);
|
||||
}
|
||||
} else {
|
||||
debug!("No missing auth events for outlier event {event_id}");
|
||||
}
|
||||
// reject if we are still missing some
|
||||
let still_missing = pdu_event
|
||||
.auth_events()
|
||||
.filter(|id| !auth_events.contains_key(*id))
|
||||
.collect::<Vec<_>>();
|
||||
if !still_missing.is_empty() {
|
||||
// Don't reject: this could be a temporary condition
|
||||
// TODO: use get_missing_events?
|
||||
return Err!(Request(InvalidParam(
|
||||
"Could not fetch all auth events for outlier event {event_id}, still missing: \
|
||||
{still_missing:?}"
|
||||
)));
|
||||
for aid in pdu_event.auth_events() {
|
||||
if auth_events.contains_key(aid) {
|
||||
continue;
|
||||
}
|
||||
if let Some(auth_event) = auth_chain_map.get(aid) {
|
||||
auth_events.insert(aid.to_owned(), auth_event.clone());
|
||||
} else {
|
||||
return Err!(Request(Forbidden(
|
||||
"Remote server is not divulging incoming event's auth events (missing: \
|
||||
{aid})"
|
||||
)));
|
||||
}
|
||||
}
|
||||
// TODO: do events received from auth chain need persisting? that sounds
|
||||
// awfully slow
|
||||
}
|
||||
|
||||
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
use std::{collections::BTreeMap, time::Instant};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
|
||||
utils::continue_exponential_backoff_secs,
|
||||
};
|
||||
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
|
||||
use tracing::debug;
|
||||
|
||||
#[implement(super::Service)]
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(
|
||||
name = "prev",
|
||||
level = INFO_SPAN_LEVEL,
|
||||
skip_all,
|
||||
fields(%prev_id),
|
||||
)]
|
||||
pub(super) async fn handle_prev_pdu<'a, Pdu>(
|
||||
&self,
|
||||
origin: &'a ServerName,
|
||||
event_id: &'a EventId,
|
||||
room_id: &'a RoomId,
|
||||
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
||||
create_event: &'a Pdu,
|
||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
||||
prev_id: &'a EventId,
|
||||
) -> Result
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
{
|
||||
// Check for disabled again because it might have changed
|
||||
if self.services.metadata.is_disabled(room_id).await {
|
||||
return Err!(Request(Forbidden(debug_warn!(
|
||||
"Federaton of room {room_id} is currently disabled on this server. Request by \
|
||||
origin {origin} and event ID {event_id}"
|
||||
))));
|
||||
}
|
||||
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(prev_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 5 * 60;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
|
||||
debug!(
|
||||
?tries,
|
||||
duration = ?time.elapsed(),
|
||||
"Backing off from prev_event"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let Some((pdu, json)) = eventid_info else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Skip old events
|
||||
if pdu.origin_server_ts() < first_ts_in_room {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let start_time = Instant::now();
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
|
||||
|
||||
defer! {{
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.remove(room_id);
|
||||
}};
|
||||
|
||||
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
elapsed = ?start_time.elapsed(),
|
||||
"Handled prev_event",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -4,7 +4,6 @@ mod fetch_prev;
|
||||
mod fetch_state;
|
||||
mod handle_incoming_pdu;
|
||||
mod handle_outlier_pdu;
|
||||
mod handle_prev_pdu;
|
||||
mod parse_incoming_pdu;
|
||||
mod policy_server;
|
||||
mod resolve_state;
|
||||
@@ -15,6 +14,7 @@ use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
|
||||
pub use fetch_and_handle_outliers::build_local_dag;
|
||||
use ruma::{
|
||||
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
|
||||
room_version_rules::RoomVersionRules,
|
||||
@@ -22,7 +22,6 @@ use ruma::{
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::{Dep, globals, rooms, sending, server_keys};
|
||||
|
||||
pub struct Service {
|
||||
pub mutex_federation: RoomMutexMap,
|
||||
pub federation_handletime: SyncRwLock<HandleTimeMap>,
|
||||
|
||||
@@ -56,7 +56,10 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR
|
||||
|
||||
/// Parses every entry in an array as an event ID, returning an error if any
|
||||
/// step fails.
|
||||
fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> {
|
||||
pub(super) fn expect_event_id_array(
|
||||
value: &CanonicalJsonObject,
|
||||
field: &str,
|
||||
) -> Result<Vec<OwnedEventId>> {
|
||||
value
|
||||
.get(field)
|
||||
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
|
||||
|
||||
@@ -204,17 +204,27 @@ pub async fn policy_server_allows_event(
|
||||
%ps.via,
|
||||
"Asking policy server to sign event"
|
||||
);
|
||||
self.fetch_policy_server_signature(pdu, pdu_json, &ps.via, outgoing, room_id, ps_key, 0)
|
||||
.await?;
|
||||
|
||||
// Verify that the policy server signature was made with the same public key as
|
||||
// is in the state event, not just that it was signed.
|
||||
if let Err(e) = self
|
||||
.fetch_policy_server_signature(pdu, pdu_json, &ps.via, outgoing, room_id, ps_key, 0)
|
||||
.await
|
||||
{
|
||||
if e.is_not_found() {
|
||||
return Ok(());
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
trace!(
|
||||
"Got successful response for fetching PS signature, ensuring it is signed with the \
|
||||
expected key."
|
||||
);
|
||||
if verify_policy_signature(&ps.via, ps_key, pdu_json, &room_version_rules.redaction) {
|
||||
Ok(())
|
||||
} else if incoming {
|
||||
Err!(Request(Forbidden("Policy server signature is invalid")))
|
||||
} else {
|
||||
Err(Error::Request(
|
||||
ErrorKind::Unknown,
|
||||
"Policy server signature was made with a different key to the one advertised".into(),
|
||||
"Policy server signature is invalid".into(),
|
||||
StatusCode::BAD_GATEWAY,
|
||||
))
|
||||
}
|
||||
@@ -272,7 +282,7 @@ async fn handle_policy_server_error(
|
||||
"Policy server is not actually a policy server or is not protecting this room: {}",
|
||||
error.message()
|
||||
);
|
||||
Ok(())
|
||||
Err(error)
|
||||
},
|
||||
| StatusCode::TOO_MANY_REQUESTS => {
|
||||
if let Some(retry_after) = error.retry_after() {
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::{
|
||||
};
|
||||
|
||||
use conduwuit::{
|
||||
Result, debug, err, error, implement,
|
||||
Result, debug, debug_error, err, error, implement,
|
||||
matrix::{Event, StateMap},
|
||||
trace,
|
||||
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
|
||||
@@ -37,6 +37,7 @@ where
|
||||
.pdu_shortstatehash(prev_event)
|
||||
.await
|
||||
else {
|
||||
trace!("No shortstatehash for {prev_event}, cannot calculate one-degree state.");
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
@@ -99,6 +100,7 @@ where
|
||||
.map_ok(move |sstatehash| (sstatehash, prev_event))
|
||||
})
|
||||
.try_collect::<HashMap<_, _>>()
|
||||
.inspect_err(|e| debug_error!("failed to calculate N-degree short state hashes: {e}"))
|
||||
.await
|
||||
else {
|
||||
return Ok(None);
|
||||
|
||||
@@ -41,6 +41,7 @@ where
|
||||
.get_pdu_id(incoming_pdu.event_id())
|
||||
.await
|
||||
{
|
||||
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
|
||||
return Ok(Some(pduid));
|
||||
}
|
||||
|
||||
@@ -63,6 +64,7 @@ where
|
||||
"Upgrading PDU from outlier to timeline"
|
||||
);
|
||||
let timer = Instant::now();
|
||||
let min_depth = self.services.metadata.get_mindepth(room_id).await;
|
||||
let room_version_rules = get_room_version_rules(create_event)?;
|
||||
|
||||
// 10. Fetch missing state and auth chain events by calling /state_ids at
|
||||
@@ -81,6 +83,7 @@ where
|
||||
};
|
||||
|
||||
if state_at_incoming_event.is_none() {
|
||||
trace!("Could not calculate incoming state, asking remote {origin} for it");
|
||||
state_at_incoming_event = self
|
||||
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
|
||||
.await?;
|
||||
@@ -382,6 +385,11 @@ where
|
||||
|
||||
// Event has passed all auth/stateres checks
|
||||
drop(state_lock);
|
||||
if incoming_pdu.depth > min_depth {
|
||||
self.services
|
||||
.metadata
|
||||
.set_mindepth(room_id, incoming_pdu.depth.into());
|
||||
}
|
||||
|
||||
Ok(pdu_id)
|
||||
}
|
||||
|
||||
@@ -626,6 +626,10 @@ impl Service {
|
||||
room_id,
|
||||
)
|
||||
.await?;
|
||||
self.services
|
||||
.metadata
|
||||
.maybe_set_mindepth(room_id, parsed_join_pdu.depth.into())
|
||||
.await;
|
||||
|
||||
info!("Setting final room state for new room");
|
||||
// We set the room state after inserting the pdu, so that we never have a moment
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{Result, implement, utils::stream::TryIgnore};
|
||||
use database::Map;
|
||||
use database::{Deserialized, Map};
|
||||
use futures::{Stream, StreamExt};
|
||||
use ruma::{OwnedRoomId, RoomId};
|
||||
use ruma::{OwnedRoomId, RoomId, UInt, uint};
|
||||
|
||||
use crate::{Dep, rooms};
|
||||
|
||||
@@ -17,6 +17,7 @@ struct Data {
|
||||
bannedroomids: Arc<Map>,
|
||||
roomid_shortroomid: Arc<Map>,
|
||||
pduid_pdu: Arc<Map>,
|
||||
roomid_mindepth: Arc<Map>,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
@@ -31,6 +32,7 @@ impl crate::Service for Service {
|
||||
bannedroomids: args.db["bannedroomids"].clone(),
|
||||
roomid_shortroomid: args.db["roomid_shortroomid"].clone(),
|
||||
pduid_pdu: args.db["pduid_pdu"].clone(),
|
||||
roomid_mindepth: args.db["roomid_mindepth"].clone(),
|
||||
},
|
||||
services: Services {
|
||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
||||
@@ -98,3 +100,27 @@ pub async fn is_disabled(&self, room_id: &RoomId) -> bool {
|
||||
pub async fn is_banned(&self, room_id: &RoomId) -> bool {
|
||||
self.db.bannedroomids.get(room_id).await.is_ok()
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn get_mindepth(&self, room_id: &RoomId) -> UInt {
|
||||
self.db
|
||||
.roomid_mindepth
|
||||
.get(room_id)
|
||||
.await
|
||||
.deserialized::<UInt>()
|
||||
.unwrap_or_else(|_| uint!(0))
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
|
||||
self.db
|
||||
.roomid_mindepth
|
||||
.put_raw(room_id.as_bytes(), min_depth.to_be_bytes());
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn maybe_set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
|
||||
if min_depth > self.get_mindepth(room_id).await.into() {
|
||||
self.set_mindepth(room_id, min_depth);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,6 +173,10 @@ impl Service {
|
||||
self.db.get_non_outlier_pdu_json(event_id).await
|
||||
}
|
||||
|
||||
pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> bool {
|
||||
self.db.non_outlier_pdu_exists(event_id).await.is_ok()
|
||||
}
|
||||
|
||||
/// Returns the pdu's id.
|
||||
#[inline]
|
||||
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
|
||||
|
||||
@@ -34,8 +34,9 @@ where
|
||||
|
||||
batch
|
||||
});
|
||||
|
||||
debug_assert!(!server_keys.is_empty(), "empty batch request to notary");
|
||||
if server_keys.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
while let Some(batch) = server_keys
|
||||
|
||||
Reference in New Issue
Block a user