mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c40cc3b236 | |||
| 754959e80d | |||
| 37888fb670 | |||
| 7207398a9e | |||
| 1a7bda209b | |||
| 7e1950b3d2 |
Generated
+26
-11
@@ -887,7 +887,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit"
|
name = "conduwuit"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"clap",
|
"clap",
|
||||||
"conduwuit_admin",
|
"conduwuit_admin",
|
||||||
@@ -919,7 +919,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit_admin"
|
name = "conduwuit_admin"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"clap",
|
"clap",
|
||||||
"conduwuit_api",
|
"conduwuit_api",
|
||||||
@@ -940,7 +940,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit_api"
|
name = "conduwuit_api"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"axum",
|
"axum",
|
||||||
@@ -972,14 +972,14 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit_build_metadata"
|
name = "conduwuit_build_metadata"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"built",
|
"built",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit_core"
|
name = "conduwuit_core"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"argon2",
|
"argon2",
|
||||||
"arrayvec",
|
"arrayvec",
|
||||||
@@ -1041,7 +1041,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit_database"
|
name = "conduwuit_database"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-channel",
|
"async-channel",
|
||||||
"conduwuit_core",
|
"conduwuit_core",
|
||||||
@@ -1059,7 +1059,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit_macros"
|
name = "conduwuit_macros"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"itertools 0.14.0",
|
"itertools 0.14.0",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
@@ -1069,7 +1069,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit_router"
|
name = "conduwuit_router"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"axum",
|
"axum",
|
||||||
"axum-client-ip",
|
"axum-client-ip",
|
||||||
@@ -1103,7 +1103,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit_service"
|
name = "conduwuit_service"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"askama",
|
"askama",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -1145,7 +1145,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "conduwuit_web"
|
name = "conduwuit_web"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"askama",
|
"askama",
|
||||||
"axum",
|
"axum",
|
||||||
@@ -4055,12 +4055,14 @@ dependencies = [
|
|||||||
"sync_wrapper",
|
"sync_wrapper",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
|
"tokio-util",
|
||||||
"tower",
|
"tower",
|
||||||
"tower-http",
|
"tower-http",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"url",
|
"url",
|
||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
"wasm-bindgen-futures",
|
"wasm-bindgen-futures",
|
||||||
|
"wasm-streams",
|
||||||
"web-sys",
|
"web-sys",
|
||||||
"webpki-roots",
|
"webpki-roots",
|
||||||
]
|
]
|
||||||
@@ -5868,6 +5870,19 @@ dependencies = [
|
|||||||
"wasmparser",
|
"wasmparser",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasm-streams"
|
||||||
|
version = "0.4.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"js-sys",
|
||||||
|
"wasm-bindgen",
|
||||||
|
"wasm-bindgen-futures",
|
||||||
|
"web-sys",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "wasmparser"
|
name = "wasmparser"
|
||||||
version = "0.244.0"
|
version = "0.244.0"
|
||||||
@@ -6332,7 +6347,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "xtask"
|
name = "xtask"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"askama",
|
"askama",
|
||||||
"cargo_metadata",
|
"cargo_metadata",
|
||||||
|
|||||||
+3
-1
@@ -12,7 +12,7 @@ license = "Apache-2.0"
|
|||||||
# See also `rust-toolchain.toml`
|
# See also `rust-toolchain.toml`
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
repository = "https://forgejo.ellis.link/continuwuation/continuwuity"
|
repository = "https://forgejo.ellis.link/continuwuation/continuwuity"
|
||||||
version = "0.5.5"
|
version = "0.5.6-alpha"
|
||||||
|
|
||||||
[workspace.metadata.crane]
|
[workspace.metadata.crane]
|
||||||
name = "conduwuit"
|
name = "conduwuit"
|
||||||
@@ -144,6 +144,7 @@ features = [
|
|||||||
"socks",
|
"socks",
|
||||||
"hickory-dns",
|
"hickory-dns",
|
||||||
"http2",
|
"http2",
|
||||||
|
"stream",
|
||||||
]
|
]
|
||||||
|
|
||||||
[workspace.dependencies.serde]
|
[workspace.dependencies.serde]
|
||||||
@@ -363,6 +364,7 @@ features = [
|
|||||||
"unstable-msc2870",
|
"unstable-msc2870",
|
||||||
"unstable-msc3026",
|
"unstable-msc3026",
|
||||||
"unstable-msc3061",
|
"unstable-msc3061",
|
||||||
|
"unstable-msc3814",
|
||||||
"unstable-msc3245",
|
"unstable-msc3245",
|
||||||
"unstable-msc3266",
|
"unstable-msc3266",
|
||||||
"unstable-msc3381", # polls
|
"unstable-msc3381", # polls
|
||||||
|
|||||||
@@ -0,0 +1 @@
|
|||||||
|
Added MSC3814 Dehydrated Devices - you can now decrypt messages sent while all devices were logged out.
|
||||||
+15
-3
@@ -15,6 +15,18 @@ disallowed-macros = [
|
|||||||
{ path = "log::trace", reason = "use conduwuit_core::trace" },
|
{ path = "log::trace", reason = "use conduwuit_core::trace" },
|
||||||
]
|
]
|
||||||
|
|
||||||
disallowed-methods = [
|
[[disallowed-methods]]
|
||||||
{ path = "tokio::spawn", reason = "use and pass conduuwit_core::server::Server::runtime() to spawn from" },
|
path = "tokio::spawn"
|
||||||
]
|
reason = "use and pass conduwuit_core::server::Server::runtime() to spawn from"
|
||||||
|
|
||||||
|
[[disallowed-methods]]
|
||||||
|
path = "reqwest::Response::bytes"
|
||||||
|
reason = "bytes is unsafe, use limit_read via the conduwuit_core::utils::LimitReadExt trait instead"
|
||||||
|
|
||||||
|
[[disallowed-methods]]
|
||||||
|
path = "reqwest::Response::text"
|
||||||
|
reason = "text is unsafe, use limit_read_text via the conduwuit_core::utils::LimitReadExt trait instead"
|
||||||
|
|
||||||
|
[[disallowed-methods]]
|
||||||
|
path = "reqwest::Response::json"
|
||||||
|
reason = "json is unsafe, use limit_read_text via the conduwuit_core::utils::LimitReadExt trait instead"
|
||||||
|
|||||||
+7
-2
@@ -180,6 +180,11 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
|||||||
export RUSTFLAGS="${RUSTFLAGS}"
|
export RUSTFLAGS="${RUSTFLAGS}"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
RUST_PROFILE_DIR="${RUST_PROFILE}"
|
||||||
|
if [[ "${RUST_PROFILE}" == "dev" ]]; then
|
||||||
|
RUST_PROFILE_DIR="debug"
|
||||||
|
fi
|
||||||
|
|
||||||
TARGET_DIR=($(cargo metadata --no-deps --format-version 1 | \
|
TARGET_DIR=($(cargo metadata --no-deps --format-version 1 | \
|
||||||
jq -r ".target_directory"))
|
jq -r ".target_directory"))
|
||||||
mkdir /out/sbin
|
mkdir /out/sbin
|
||||||
@@ -191,8 +196,8 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
|||||||
jq -r ".packages[] | select(.name == \"$PACKAGE\") | .targets[] | select( .kind | map(. == \"bin\") | any ) | .name"))
|
jq -r ".packages[] | select(.name == \"$PACKAGE\") | .targets[] | select( .kind | map(. == \"bin\") | any ) | .name"))
|
||||||
for BINARY in "${BINARIES[@]}"; do
|
for BINARY in "${BINARIES[@]}"; do
|
||||||
echo $BINARY
|
echo $BINARY
|
||||||
xx-verify $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE}/$BINARY
|
xx-verify $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE_DIR}/$BINARY
|
||||||
cp $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE}/$BINARY /out/sbin/$BINARY
|
cp $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE_DIR}/$BINARY /out/sbin/$BINARY
|
||||||
done
|
done
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
|
|
||||||
use conduwuit::{Err, Result};
|
use conduwuit::{Err, Result, utils::response::LimitReadExt};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use ruma::{OwnedRoomId, OwnedServerName, OwnedUserId};
|
use ruma::{OwnedRoomId, OwnedServerName, OwnedUserId};
|
||||||
|
|
||||||
@@ -55,7 +55,15 @@ pub(super) async fn fetch_support_well_known(&self, server_name: OwnedServerName
|
|||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let text = response.text().await?;
|
let text = response
|
||||||
|
.limit_read_text(
|
||||||
|
self.services
|
||||||
|
.config
|
||||||
|
.max_request_size
|
||||||
|
.try_into()
|
||||||
|
.expect("u64 fits into usize"),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
if text.is_empty() {
|
if text.is_empty() {
|
||||||
return Err!("Response text/body is empty.");
|
return Err!("Response text/body is empty.");
|
||||||
|
|||||||
@@ -0,0 +1,121 @@
|
|||||||
|
use axum::extract::State;
|
||||||
|
use axum_client_ip::InsecureClientIp;
|
||||||
|
use conduwuit::{Err, Result, at};
|
||||||
|
use futures::StreamExt;
|
||||||
|
use ruma::api::client::dehydrated_device::{
|
||||||
|
delete_dehydrated_device::unstable as delete_dehydrated_device,
|
||||||
|
get_dehydrated_device::unstable as get_dehydrated_device, get_events::unstable as get_events,
|
||||||
|
put_dehydrated_device::unstable as put_dehydrated_device,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::Ruma;
|
||||||
|
|
||||||
|
const MAX_BATCH_EVENTS: usize = 50;
|
||||||
|
|
||||||
|
/// # `PUT /_matrix/client/../dehydrated_device`
|
||||||
|
///
|
||||||
|
/// Creates or overwrites the user's dehydrated device.
|
||||||
|
#[tracing::instrument(skip_all, fields(%client))]
|
||||||
|
pub(crate) async fn put_dehydrated_device_route(
|
||||||
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client): InsecureClientIp,
|
||||||
|
body: Ruma<put_dehydrated_device::Request>,
|
||||||
|
) -> Result<put_dehydrated_device::Response> {
|
||||||
|
let sender_user = body
|
||||||
|
.sender_user
|
||||||
|
.as_deref()
|
||||||
|
.expect("AccessToken authentication required");
|
||||||
|
|
||||||
|
let device_id = body.body.device_id.clone();
|
||||||
|
|
||||||
|
services
|
||||||
|
.users
|
||||||
|
.set_dehydrated_device(sender_user, body.body)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(put_dehydrated_device::Response { device_id })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # `DELETE /_matrix/client/../dehydrated_device`
|
||||||
|
///
|
||||||
|
/// Deletes the user's dehydrated device without replacement.
|
||||||
|
#[tracing::instrument(skip_all, fields(%client))]
|
||||||
|
pub(crate) async fn delete_dehydrated_device_route(
|
||||||
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client): InsecureClientIp,
|
||||||
|
body: Ruma<delete_dehydrated_device::Request>,
|
||||||
|
) -> Result<delete_dehydrated_device::Response> {
|
||||||
|
let sender_user = body.sender_user();
|
||||||
|
|
||||||
|
let device_id = services.users.get_dehydrated_device_id(sender_user).await?;
|
||||||
|
|
||||||
|
services.users.remove_device(sender_user, &device_id).await;
|
||||||
|
|
||||||
|
Ok(delete_dehydrated_device::Response { device_id })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # `GET /_matrix/client/../dehydrated_device`
|
||||||
|
///
|
||||||
|
/// Gets the user's dehydrated device
|
||||||
|
#[tracing::instrument(skip_all, fields(%client))]
|
||||||
|
pub(crate) async fn get_dehydrated_device_route(
|
||||||
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client): InsecureClientIp,
|
||||||
|
body: Ruma<get_dehydrated_device::Request>,
|
||||||
|
) -> Result<get_dehydrated_device::Response> {
|
||||||
|
let sender_user = body.sender_user();
|
||||||
|
|
||||||
|
let device = services.users.get_dehydrated_device(sender_user).await?;
|
||||||
|
|
||||||
|
Ok(get_dehydrated_device::Response {
|
||||||
|
device_id: device.device_id,
|
||||||
|
device_data: device.device_data,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # `GET /_matrix/client/../dehydrated_device/{device_id}/events`
|
||||||
|
///
|
||||||
|
/// Paginates the events of the dehydrated device.
|
||||||
|
#[tracing::instrument(skip_all, fields(%client))]
|
||||||
|
pub(crate) async fn get_dehydrated_events_route(
|
||||||
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client): InsecureClientIp,
|
||||||
|
body: Ruma<get_events::Request>,
|
||||||
|
) -> Result<get_events::Response> {
|
||||||
|
let sender_user = body.sender_user();
|
||||||
|
|
||||||
|
let device_id = &body.body.device_id;
|
||||||
|
let existing_id = services.users.get_dehydrated_device_id(sender_user).await;
|
||||||
|
|
||||||
|
if existing_id.as_ref().is_err()
|
||||||
|
|| existing_id
|
||||||
|
.as_ref()
|
||||||
|
.is_ok_and(|existing_id| existing_id != device_id)
|
||||||
|
{
|
||||||
|
return Err!(Request(Forbidden("Not the dehydrated device_id.")));
|
||||||
|
}
|
||||||
|
|
||||||
|
let since: Option<u64> = body
|
||||||
|
.body
|
||||||
|
.next_batch
|
||||||
|
.as_deref()
|
||||||
|
.map(str::parse)
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
let mut next_batch: Option<u64> = None;
|
||||||
|
let events = services
|
||||||
|
.users
|
||||||
|
.get_to_device_events(sender_user, device_id, since, None)
|
||||||
|
.take(MAX_BATCH_EVENTS)
|
||||||
|
.inspect(|&(count, _)| {
|
||||||
|
next_batch.replace(count);
|
||||||
|
})
|
||||||
|
.map(at!(1))
|
||||||
|
.collect()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
Ok(get_events::Response {
|
||||||
|
events,
|
||||||
|
next_batch: next_batch.as_ref().map(ToString::to_string),
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -6,6 +6,7 @@ pub(super) mod appservice;
|
|||||||
pub(super) mod backup;
|
pub(super) mod backup;
|
||||||
pub(super) mod capabilities;
|
pub(super) mod capabilities;
|
||||||
pub(super) mod context;
|
pub(super) mod context;
|
||||||
|
pub(super) mod dehydrated_device;
|
||||||
pub(super) mod device;
|
pub(super) mod device;
|
||||||
pub(super) mod directory;
|
pub(super) mod directory;
|
||||||
pub(super) mod filter;
|
pub(super) mod filter;
|
||||||
@@ -49,6 +50,7 @@ pub(super) use appservice::*;
|
|||||||
pub(super) use backup::*;
|
pub(super) use backup::*;
|
||||||
pub(super) use capabilities::*;
|
pub(super) use capabilities::*;
|
||||||
pub(super) use context::*;
|
pub(super) use context::*;
|
||||||
|
pub(super) use dehydrated_device::*;
|
||||||
pub(super) use device::*;
|
pub(super) use device::*;
|
||||||
pub(super) use directory::*;
|
pub(super) use directory::*;
|
||||||
pub(super) use filter::*;
|
pub(super) use filter::*;
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ use std::{
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use axum_client_ip::InsecureClientIp;
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Result, extract_variant,
|
Result, at, extract_variant,
|
||||||
utils::{
|
utils::{
|
||||||
ReadyExt, TryFutureExtExt,
|
ReadyExt, TryFutureExtExt,
|
||||||
stream::{BroadbandExt, Tools, WidebandExt},
|
stream::{BroadbandExt, Tools, WidebandExt},
|
||||||
@@ -385,6 +385,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
last_sync_end_count,
|
last_sync_end_count,
|
||||||
Some(current_count),
|
Some(current_count),
|
||||||
)
|
)
|
||||||
|
.map(at!(1))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let device_one_time_keys_count = services
|
let device_one_time_keys_count = services
|
||||||
|
|||||||
@@ -1029,6 +1029,7 @@ async fn collect_to_device(
|
|||||||
events: services
|
events: services
|
||||||
.users
|
.users
|
||||||
.get_to_device_events(sender_user, sender_device, None, Some(next_batch))
|
.get_to_device_events(sender_user, sender_device, None, Some(next_batch))
|
||||||
|
.map(at!(1))
|
||||||
.collect()
|
.collect()
|
||||||
.await,
|
.await,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ pub(crate) async fn get_supported_versions_route(
|
|||||||
("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */
|
("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */
|
||||||
("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */
|
("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */
|
||||||
("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */
|
("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */
|
||||||
|
("org.matrix.msc3814".to_owned(), true), /* dehydrated devices */
|
||||||
("org.matrix.msc3827".to_owned(), true), /* filtering of /publicRooms by room type (https://github.com/matrix-org/matrix-spec-proposals/pull/3827) */
|
("org.matrix.msc3827".to_owned(), true), /* filtering of /publicRooms by room type (https://github.com/matrix-org/matrix-spec-proposals/pull/3827) */
|
||||||
("org.matrix.msc3952_intentional_mentions".to_owned(), true), /* intentional mentions (https://github.com/matrix-org/matrix-spec-proposals/pull/3952) */
|
("org.matrix.msc3952_intentional_mentions".to_owned(), true), /* intentional mentions (https://github.com/matrix-org/matrix-spec-proposals/pull/3952) */
|
||||||
("org.matrix.msc3916.stable".to_owned(), true), /* authenticated media (https://github.com/matrix-org/matrix-spec-proposals/pull/3916) */
|
("org.matrix.msc3916.stable".to_owned(), true), /* authenticated media (https://github.com/matrix-org/matrix-spec-proposals/pull/3916) */
|
||||||
|
|||||||
@@ -160,6 +160,10 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
|||||||
.ruma_route(&client::update_device_route)
|
.ruma_route(&client::update_device_route)
|
||||||
.ruma_route(&client::delete_device_route)
|
.ruma_route(&client::delete_device_route)
|
||||||
.ruma_route(&client::delete_devices_route)
|
.ruma_route(&client::delete_devices_route)
|
||||||
|
.ruma_route(&client::put_dehydrated_device_route)
|
||||||
|
.ruma_route(&client::delete_dehydrated_device_route)
|
||||||
|
.ruma_route(&client::get_dehydrated_device_route)
|
||||||
|
.ruma_route(&client::get_dehydrated_events_route)
|
||||||
.ruma_route(&client::get_tags_route)
|
.ruma_route(&client::get_tags_route)
|
||||||
.ruma_route(&client::update_tag_route)
|
.ruma_route(&client::update_tag_route)
|
||||||
.ruma_route(&client::delete_tag_route)
|
.ruma_route(&client::delete_tag_route)
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ pub mod json;
|
|||||||
pub mod math;
|
pub mod math;
|
||||||
pub mod mutex_map;
|
pub mod mutex_map;
|
||||||
pub mod rand;
|
pub mod rand;
|
||||||
|
pub mod response;
|
||||||
pub mod result;
|
pub mod result;
|
||||||
pub mod set;
|
pub mod set;
|
||||||
pub mod stream;
|
pub mod stream;
|
||||||
|
|||||||
@@ -0,0 +1,51 @@
|
|||||||
|
use futures::StreamExt;
|
||||||
|
use num_traits::ToPrimitive;
|
||||||
|
|
||||||
|
use crate::Err;
|
||||||
|
|
||||||
|
/// Reads the response body while enforcing a maximum size limit to prevent
|
||||||
|
/// memory exhaustion.
|
||||||
|
pub async fn limit_read(response: reqwest::Response, max_size: u64) -> crate::Result<Vec<u8>> {
|
||||||
|
if response.content_length().is_some_and(|len| len > max_size) {
|
||||||
|
return Err!(BadServerResponse("Response too large"));
|
||||||
|
}
|
||||||
|
let mut data = Vec::new();
|
||||||
|
let mut reader = response.bytes_stream();
|
||||||
|
|
||||||
|
while let Some(chunk) = reader.next().await {
|
||||||
|
let chunk = chunk?;
|
||||||
|
data.extend_from_slice(&chunk);
|
||||||
|
|
||||||
|
if data.len() > max_size.to_usize().expect("max_size must fit in usize") {
|
||||||
|
return Err!(BadServerResponse("Response too large"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reads the response body as text while enforcing a maximum size limit to
|
||||||
|
/// prevent memory exhaustion.
|
||||||
|
pub async fn limit_read_text(
|
||||||
|
response: reqwest::Response,
|
||||||
|
max_size: u64,
|
||||||
|
) -> crate::Result<String> {
|
||||||
|
let text = String::from_utf8(limit_read(response, max_size).await?)?;
|
||||||
|
Ok(text)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(async_fn_in_trait)]
|
||||||
|
pub trait LimitReadExt {
|
||||||
|
async fn limit_read(self, max_size: u64) -> crate::Result<Vec<u8>>;
|
||||||
|
async fn limit_read_text(self, max_size: u64) -> crate::Result<String>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LimitReadExt for reqwest::Response {
|
||||||
|
async fn limit_read(self, max_size: u64) -> crate::Result<Vec<u8>> {
|
||||||
|
limit_read(self, max_size).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn limit_read_text(self, max_size: u64) -> crate::Result<String> {
|
||||||
|
limit_read_text(self, max_size).await
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -362,6 +362,10 @@ pub(super) static MAPS: &[Descriptor] = &[
|
|||||||
name: "userid_blurhash",
|
name: "userid_blurhash",
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL
|
||||||
},
|
},
|
||||||
|
Descriptor {
|
||||||
|
name: "userid_dehydrateddevice",
|
||||||
|
..descriptor::RANDOM_SMALL
|
||||||
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "userid_devicelistversion",
|
name: "userid_devicelistversion",
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL
|
||||||
|
|||||||
@@ -530,7 +530,12 @@ impl Service {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn is_admin_command<E>(&self, event: &E, body: &str) -> Option<InvocationSource>
|
pub async fn is_admin_command<E>(
|
||||||
|
&self,
|
||||||
|
event: &E,
|
||||||
|
body: &str,
|
||||||
|
sent_locally: bool,
|
||||||
|
) -> Option<InvocationSource>
|
||||||
where
|
where
|
||||||
E: Event + Send + Sync,
|
E: Event + Send + Sync,
|
||||||
{
|
{
|
||||||
@@ -580,6 +585,15 @@ impl Service {
|
|||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Escaped commands must be sent locally (via client API), not via federation
|
||||||
|
if !sent_locally {
|
||||||
|
conduwuit::warn!(
|
||||||
|
"Ignoring escaped admin command from {} that arrived via federation",
|
||||||
|
event.sender()
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
// Looks good
|
// Looks good
|
||||||
Some(InvocationSource::EscapedCommand)
|
Some(InvocationSource::EscapedCommand)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@
|
|||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use conduwuit::{Result, Server, debug, error, warn};
|
use conduwuit::{Result, Server, debug, error, utils::response::LimitReadExt, warn};
|
||||||
use database::{Deserialized, Map};
|
use database::{Deserialized, Map};
|
||||||
use ruma::events::{Mentions, room::message::RoomMessageEventContent};
|
use ruma::events::{Mentions, room::message::RoomMessageEventContent};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
@@ -137,7 +137,7 @@ impl Service {
|
|||||||
.get(CHECK_FOR_ANNOUNCEMENTS_URL)
|
.get(CHECK_FOR_ANNOUNCEMENTS_URL)
|
||||||
.send()
|
.send()
|
||||||
.await?
|
.await?
|
||||||
.text()
|
.limit_read_text(1024 * 1024)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let response = serde_json::from_str::<CheckForAnnouncementsResponse>(&response)?;
|
let response = serde_json::from_str::<CheckForAnnouncementsResponse>(&response)?;
|
||||||
|
|||||||
@@ -2,8 +2,8 @@ use std::{fmt::Debug, mem};
|
|||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
|
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, implement,
|
||||||
error::inspect_debug_log, implement, trace,
|
trace, utils::response::LimitReadExt,
|
||||||
};
|
};
|
||||||
use http::{HeaderValue, header::AUTHORIZATION};
|
use http::{HeaderValue, header::AUTHORIZATION};
|
||||||
use ipaddress::IPAddress;
|
use ipaddress::IPAddress;
|
||||||
@@ -133,7 +133,22 @@ async fn handle_response<T>(
|
|||||||
where
|
where
|
||||||
T: OutgoingRequest + Send,
|
T: OutgoingRequest + Send,
|
||||||
{
|
{
|
||||||
let response = into_http_response(dest, actual, method, url, response).await?;
|
const HUGE_ENDPOINTS: [&str; 2] =
|
||||||
|
["/_matrix/federation/v2/send_join/", "/_matrix/federation/v2/state/"];
|
||||||
|
let size_limit: u64 = if HUGE_ENDPOINTS.iter().any(|e| url.path().starts_with(e)) {
|
||||||
|
// Some federation endpoints can return huge response bodies, so we'll bump the
|
||||||
|
// limit for those endpoints specifically.
|
||||||
|
self.services
|
||||||
|
.server
|
||||||
|
.config
|
||||||
|
.max_request_size
|
||||||
|
.saturating_mul(10)
|
||||||
|
} else {
|
||||||
|
self.services.server.config.max_request_size
|
||||||
|
}
|
||||||
|
.try_into()
|
||||||
|
.expect("size_limit (usize) should fit within a u64");
|
||||||
|
let response = into_http_response(dest, actual, method, url, response, size_limit).await?;
|
||||||
|
|
||||||
T::IncomingResponse::try_from_http_response(response)
|
T::IncomingResponse::try_from_http_response(response)
|
||||||
.map_err(|e| err!(BadServerResponse("Server returned bad 200 response: {e:?}")))
|
.map_err(|e| err!(BadServerResponse("Server returned bad 200 response: {e:?}")))
|
||||||
@@ -145,6 +160,7 @@ async fn into_http_response(
|
|||||||
method: &Method,
|
method: &Method,
|
||||||
url: &Url,
|
url: &Url,
|
||||||
mut response: Response,
|
mut response: Response,
|
||||||
|
max_size: u64,
|
||||||
) -> Result<http::Response<Bytes>> {
|
) -> Result<http::Response<Bytes>> {
|
||||||
let status = response.status();
|
let status = response.status();
|
||||||
trace!(
|
trace!(
|
||||||
@@ -167,14 +183,14 @@ async fn into_http_response(
|
|||||||
);
|
);
|
||||||
|
|
||||||
trace!("Waiting for response body...");
|
trace!("Waiting for response body...");
|
||||||
let body = response
|
|
||||||
.bytes()
|
|
||||||
.await
|
|
||||||
.inspect_err(inspect_debug_log)
|
|
||||||
.unwrap_or_else(|_| Vec::new().into());
|
|
||||||
|
|
||||||
let http_response = http_response_builder
|
let http_response = http_response_builder
|
||||||
.body(body)
|
.body(
|
||||||
|
response
|
||||||
|
.limit_read(max_size)
|
||||||
|
.await
|
||||||
|
.unwrap_or_default()
|
||||||
|
.into(),
|
||||||
|
)
|
||||||
.expect("reqwest body is valid http body");
|
.expect("reqwest body is valid http body");
|
||||||
|
|
||||||
debug!("Got {status:?} for {method} {url}");
|
debug!("Got {status:?} for {method} {url}");
|
||||||
|
|||||||
@@ -7,7 +7,7 @@
|
|||||||
|
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
|
|
||||||
use conduwuit::{Err, Result, debug, err};
|
use conduwuit::{Err, Result, debug, err, utils::response::LimitReadExt};
|
||||||
use conduwuit_core::implement;
|
use conduwuit_core::implement;
|
||||||
use ipaddress::IPAddress;
|
use ipaddress::IPAddress;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
@@ -112,8 +112,22 @@ pub async fn download_image(&self, url: &str) -> Result<UrlPreviewData> {
|
|||||||
use image::ImageReader;
|
use image::ImageReader;
|
||||||
use ruma::Mxc;
|
use ruma::Mxc;
|
||||||
|
|
||||||
let image = self.services.client.url_preview.get(url).send().await?;
|
let image = self
|
||||||
let image = image.bytes().await?;
|
.services
|
||||||
|
.client
|
||||||
|
.url_preview
|
||||||
|
.get(url)
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.limit_read(
|
||||||
|
self.services
|
||||||
|
.server
|
||||||
|
.config
|
||||||
|
.max_request_size
|
||||||
|
.try_into()
|
||||||
|
.expect("u64 should fit in usize"),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
let mxc = Mxc {
|
let mxc = Mxc {
|
||||||
server_name: self.services.globals.server_name(),
|
server_name: self.services.globals.server_name(),
|
||||||
media_id: &random_string(super::MXC_LENGTH),
|
media_id: &random_string(super::MXC_LENGTH),
|
||||||
@@ -151,24 +165,20 @@ async fn download_html(&self, url: &str) -> Result<UrlPreviewData> {
|
|||||||
use webpage::HTML;
|
use webpage::HTML;
|
||||||
|
|
||||||
let client = &self.services.client.url_preview;
|
let client = &self.services.client.url_preview;
|
||||||
let mut response = client.get(url).send().await?;
|
let body = client
|
||||||
|
.get(url)
|
||||||
let mut bytes: Vec<u8> = Vec::new();
|
.send()
|
||||||
while let Some(chunk) = response.chunk().await? {
|
.await?
|
||||||
bytes.extend_from_slice(&chunk);
|
.limit_read_text(
|
||||||
if bytes.len() > self.services.globals.url_preview_max_spider_size() {
|
self.services
|
||||||
debug!(
|
.server
|
||||||
"Response body from URL {} exceeds url_preview_max_spider_size ({}), not \
|
.config
|
||||||
processing the rest of the response body and assuming our necessary data is in \
|
.max_request_size
|
||||||
this range.",
|
.try_into()
|
||||||
url,
|
.expect("u64 should fit in usize"),
|
||||||
self.services.globals.url_preview_max_spider_size()
|
)
|
||||||
);
|
.await?;
|
||||||
break;
|
let Ok(html) = HTML::from_string(body.clone(), Some(url.to_owned())) else {
|
||||||
}
|
|
||||||
}
|
|
||||||
let body = String::from_utf8_lossy(&bytes);
|
|
||||||
let Ok(html) = HTML::from_string(body.to_string(), Some(url.to_owned())) else {
|
|
||||||
return Err!(Request(Unknown("Failed to parse HTML")));
|
return Err!(Request(Unknown("Failed to parse HTML")));
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ use std::{fmt::Debug, time::Duration};
|
|||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Error, Result, debug_warn, err, implement,
|
Err, Error, Result, debug_warn, err, implement,
|
||||||
utils::content_disposition::make_content_disposition,
|
utils::{content_disposition::make_content_disposition, response::LimitReadExt},
|
||||||
};
|
};
|
||||||
use http::header::{CONTENT_DISPOSITION, CONTENT_TYPE, HeaderValue};
|
use http::header::{CONTENT_DISPOSITION, CONTENT_TYPE, HeaderValue};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
@@ -286,10 +286,15 @@ async fn location_request(&self, location: &str) -> Result<FileMeta> {
|
|||||||
.and_then(Result::ok);
|
.and_then(Result::ok);
|
||||||
|
|
||||||
response
|
response
|
||||||
.bytes()
|
.limit_read(
|
||||||
|
self.services
|
||||||
|
.server
|
||||||
|
.config
|
||||||
|
.max_request_size
|
||||||
|
.try_into()
|
||||||
|
.expect("u64 should fit in usize"),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.map(Vec::from)
|
|
||||||
.map_err(Into::into)
|
|
||||||
.map(|content| FileMeta {
|
.map(|content| FileMeta {
|
||||||
content: Some(content),
|
content: Some(content),
|
||||||
content_type: content_type.clone(),
|
content_type: content_type.clone(),
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::{fmt::Debug, mem, sync::Arc};
|
use std::{fmt::Debug, mem, sync::Arc};
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
use conduwuit::utils::response::LimitReadExt;
|
||||||
use conduwuit_core::{
|
use conduwuit_core::{
|
||||||
Err, Event, Result, debug_warn, err, trace,
|
Err, Event, Result, debug_warn, err, trace,
|
||||||
utils::{stream::TryIgnore, string_from_bytes},
|
utils::{stream::TryIgnore, string_from_bytes},
|
||||||
@@ -30,7 +31,7 @@ use ruma::{
|
|||||||
uint,
|
uint,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{Dep, client, globals, rooms, sending, users};
|
use crate::{Dep, client, config, globals, rooms, sending, users};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
db: Data,
|
db: Data,
|
||||||
@@ -39,6 +40,7 @@ pub struct Service {
|
|||||||
|
|
||||||
struct Services {
|
struct Services {
|
||||||
globals: Dep<globals::Service>,
|
globals: Dep<globals::Service>,
|
||||||
|
config: Dep<config::Service>,
|
||||||
client: Dep<client::Service>,
|
client: Dep<client::Service>,
|
||||||
state_accessor: Dep<rooms::state_accessor::Service>,
|
state_accessor: Dep<rooms::state_accessor::Service>,
|
||||||
state_cache: Dep<rooms::state_cache::Service>,
|
state_cache: Dep<rooms::state_cache::Service>,
|
||||||
@@ -61,6 +63,7 @@ impl crate::Service for Service {
|
|||||||
services: Services {
|
services: Services {
|
||||||
globals: args.depend::<globals::Service>("globals"),
|
globals: args.depend::<globals::Service>("globals"),
|
||||||
client: args.depend::<client::Service>("client"),
|
client: args.depend::<client::Service>("client"),
|
||||||
|
config: args.depend::<config::Service>("config"),
|
||||||
state_accessor: args
|
state_accessor: args
|
||||||
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
|
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
|
||||||
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
|
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
|
||||||
@@ -245,7 +248,15 @@ impl Service {
|
|||||||
.expect("http::response::Builder is usable"),
|
.expect("http::response::Builder is usable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let body = response.bytes().await?;
|
let body = response
|
||||||
|
.limit_read(
|
||||||
|
self.services
|
||||||
|
.config
|
||||||
|
.max_request_size
|
||||||
|
.try_into()
|
||||||
|
.expect("usize fits into u64"),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
|
debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
use conduwuit::{Result, debug, debug_error, debug_info, debug_warn, implement, trace};
|
use conduwuit::{
|
||||||
|
Result, debug, debug_error, debug_info, implement, trace, utils::response::LimitReadExt,
|
||||||
|
};
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(name = "well-known", level = "debug", skip(self, dest))]
|
#[tracing::instrument(name = "well-known", level = "debug", skip(self, dest))]
|
||||||
@@ -24,12 +26,8 @@ pub(super) async fn request_well_known(&self, dest: &str) -> Result<Option<Strin
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let text = response.text().await?;
|
let text = response.limit_read_text(8192).await?;
|
||||||
trace!("response text: {text:?}");
|
trace!("response text: {text:?}");
|
||||||
if text.len() >= 12288 {
|
|
||||||
debug_warn!("response contains junk");
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
let body: serde_json::Value = serde_json::from_str(&text).unwrap_or_default();
|
let body: serde_json::Value = serde_json::from_str(&text).unwrap_or_default();
|
||||||
|
|
||||||
|
|||||||
@@ -72,6 +72,26 @@ where
|
|||||||
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock, room_id)
|
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock, room_id)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// Process admin commands for federation events
|
||||||
|
if *pdu.kind() == TimelineEventType::RoomMessage {
|
||||||
|
let content: ExtractBody = pdu.get_content()?;
|
||||||
|
if let Some(body) = content.body {
|
||||||
|
if let Some(source) = self
|
||||||
|
.services
|
||||||
|
.admin
|
||||||
|
.is_admin_command(pdu, &body, false)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
self.services.admin.command_with_sender(
|
||||||
|
body,
|
||||||
|
Some(pdu.event_id().into()),
|
||||||
|
source,
|
||||||
|
pdu.sender.clone().into(),
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Some(pdu_id))
|
Ok(Some(pdu_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -334,15 +354,6 @@ where
|
|||||||
let content: ExtractBody = pdu.get_content()?;
|
let content: ExtractBody = pdu.get_content()?;
|
||||||
if let Some(body) = content.body {
|
if let Some(body) = content.body {
|
||||||
self.services.search.index_pdu(shortroomid, &pdu_id, &body);
|
self.services.search.index_pdu(shortroomid, &pdu_id, &body);
|
||||||
|
|
||||||
if let Some(source) = self.services.admin.is_admin_command(pdu, &body).await {
|
|
||||||
self.services.admin.command_with_sender(
|
|
||||||
body,
|
|
||||||
Some((pdu.event_id()).into()),
|
|
||||||
source,
|
|
||||||
pdu.sender.clone().into(),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
| _ => {},
|
| _ => {},
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use ruma::{
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::RoomMutexGuard;
|
use super::{ExtractBody, RoomMutexGuard};
|
||||||
|
|
||||||
/// Creates a new persisted data unit and adds it to a room. This function
|
/// Creates a new persisted data unit and adds it to a room. This function
|
||||||
/// takes a roomid_mutex_state, meaning that only this function is able to
|
/// takes a roomid_mutex_state, meaning that only this function is able to
|
||||||
@@ -126,6 +126,26 @@ pub async fn build_and_append_pdu(
|
|||||||
.boxed()
|
.boxed()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// Process admin commands for locally sent events
|
||||||
|
if *pdu.kind() == TimelineEventType::RoomMessage {
|
||||||
|
let content: ExtractBody = pdu.get_content()?;
|
||||||
|
if let Some(body) = content.body {
|
||||||
|
if let Some(source) = self
|
||||||
|
.services
|
||||||
|
.admin
|
||||||
|
.is_admin_command(&pdu, &body, true)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
self.services.admin.command_with_sender(
|
||||||
|
body,
|
||||||
|
Some(pdu.event_id().into()),
|
||||||
|
source,
|
||||||
|
pdu.sender.clone().into(),
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// We set the room state after inserting the pdu, so that we never have a moment
|
// We set the room state after inserting the pdu, so that we never have a moment
|
||||||
// in time where events in the current room state do not exist
|
// in time where events in the current room state do not exist
|
||||||
trace!("Setting room state for room {room_id}");
|
trace!("Setting room state for room {room_id}");
|
||||||
@@ -167,6 +187,8 @@ pub async fn build_and_append_pdu(
|
|||||||
Ok(pdu.event_id().to_owned())
|
Ok(pdu.event_id().to_owned())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Assert invariants about the admin room, to prevent (for example) all admins
|
||||||
|
/// from leaving or being banned from the room
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(skip_all, level = "debug")]
|
#[tracing::instrument(skip_all, level = "debug")]
|
||||||
async fn check_pdu_for_admin_room<Pdu>(&self, pdu: &Pdu, sender: &UserId) -> Result
|
async fn check_pdu_for_admin_room<Pdu>(&self, pdu: &Pdu, sender: &UserId) -> Result
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use std::{fmt::Debug, mem};
|
use std::{fmt::Debug, mem};
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use conduwuit::{Err, Result, debug_error, err, utils, warn};
|
use conduwuit::{Err, Result, debug_error, err, utils, utils::response::LimitReadExt, warn};
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use ruma::api::{IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken};
|
use ruma::api::{IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken};
|
||||||
|
|
||||||
@@ -38,7 +38,7 @@ where
|
|||||||
.expect("http::response::Builder is usable"),
|
.expect("http::response::Builder is usable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let body = response.bytes().await?; // TODO: handle timeout
|
let body = response.limit_read(65535).await?; // TODO: handle timeout
|
||||||
|
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
debug_error!("Antispam response bytes: {:?}", utils::string_from_bytes(&body));
|
debug_error!("Antispam response bytes: {:?}", utils::string_from_bytes(&body));
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
use std::{fmt::Debug, mem};
|
use std::{fmt::Debug, mem};
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use conduwuit::{Err, Result, debug_error, err, implement, trace, utils, warn};
|
use conduwuit::{
|
||||||
|
Err, Result, debug_error, err, implement, trace, utils, utils::response::LimitReadExt, warn,
|
||||||
|
};
|
||||||
use ruma::api::{
|
use ruma::api::{
|
||||||
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
|
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
|
||||||
};
|
};
|
||||||
@@ -77,7 +79,15 @@ where
|
|||||||
.expect("http::response::Builder is usable"),
|
.expect("http::response::Builder is usable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let body = response.bytes().await?;
|
let body = response
|
||||||
|
.limit_read(
|
||||||
|
self.server
|
||||||
|
.config
|
||||||
|
.max_request_size
|
||||||
|
.try_into()
|
||||||
|
.expect("usize fits into u64"),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
|
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ use std::{
|
|||||||
|
|
||||||
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
|
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
|
||||||
use conduwuit_core::{
|
use conduwuit_core::{
|
||||||
Error, Event, Result, debug, err, error,
|
Error, Event, Result, at, debug, err, error,
|
||||||
result::LogErr,
|
result::LogErr,
|
||||||
trace,
|
trace,
|
||||||
utils::{
|
utils::{
|
||||||
@@ -175,7 +175,7 @@ impl Service {
|
|||||||
if !new_events.is_empty() {
|
if !new_events.is_empty() {
|
||||||
self.db.mark_as_active(new_events.iter());
|
self.db.mark_as_active(new_events.iter());
|
||||||
|
|
||||||
let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect();
|
let new_events_vec = new_events.into_iter().map(at!(1)).collect();
|
||||||
futures.push(self.send_events(dest.clone(), new_events_vec));
|
futures.push(self.send_events(dest.clone(), new_events_vec));
|
||||||
} else {
|
} else {
|
||||||
statuses.remove(dest);
|
statuses.remove(dest);
|
||||||
|
|||||||
@@ -0,0 +1,149 @@
|
|||||||
|
use conduwuit::{Err, Result, implement, trace};
|
||||||
|
use conduwuit_database::{Deserialized, Json};
|
||||||
|
use ruma::{
|
||||||
|
DeviceId, OwnedDeviceId, UserId,
|
||||||
|
api::client::dehydrated_device::{
|
||||||
|
DehydratedDeviceData, put_dehydrated_device::unstable::Request,
|
||||||
|
},
|
||||||
|
serde::Raw,
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct DehydratedDevice {
|
||||||
|
/// Unique ID of the device.
|
||||||
|
pub device_id: OwnedDeviceId,
|
||||||
|
|
||||||
|
/// Contains serialized and encrypted private data.
|
||||||
|
pub device_data: Raw<DehydratedDeviceData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates or recreates the user's dehydrated device.
|
||||||
|
#[implement(super::Service)]
|
||||||
|
#[tracing::instrument(
|
||||||
|
level = "info",
|
||||||
|
skip_all,
|
||||||
|
fields(
|
||||||
|
%user_id,
|
||||||
|
device_id = %request.device_id,
|
||||||
|
display_name = ?request.initial_device_display_name,
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
pub async fn set_dehydrated_device(&self, user_id: &UserId, request: Request) -> Result {
|
||||||
|
assert!(
|
||||||
|
self.exists(user_id).await,
|
||||||
|
"Tried to create dehydrated device for non-existent user"
|
||||||
|
);
|
||||||
|
|
||||||
|
let existing_id = self.get_dehydrated_device_id(user_id).await;
|
||||||
|
|
||||||
|
if existing_id.is_err()
|
||||||
|
&& self
|
||||||
|
.get_device_metadata(user_id, &request.device_id)
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
return Err!("A hydrated device already exists with that ID.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok(existing_id) = existing_id {
|
||||||
|
self.remove_device(user_id, &existing_id).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.create_device(
|
||||||
|
user_id,
|
||||||
|
&request.device_id,
|
||||||
|
"",
|
||||||
|
request.initial_device_display_name.clone(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
trace!(device_data = ?request.device_data);
|
||||||
|
self.db.userid_dehydrateddevice.raw_put(
|
||||||
|
user_id,
|
||||||
|
Json(&DehydratedDevice {
|
||||||
|
device_id: request.device_id.clone(),
|
||||||
|
device_data: request.device_data,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
trace!(device_keys = ?request.device_keys);
|
||||||
|
self.add_device_keys(user_id, &request.device_id, &request.device_keys)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
trace!(one_time_keys = ?request.one_time_keys);
|
||||||
|
for (one_time_key_key, one_time_key_value) in &request.one_time_keys {
|
||||||
|
self.add_one_time_key(user_id, &request.device_id, one_time_key_key, one_time_key_value)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes a user's dehydrated device.
|
||||||
|
///
|
||||||
|
/// Calling this directly will remove the dehydrated data but leak the frontage
|
||||||
|
/// device. Thus this is called by the regular device interface such that the
|
||||||
|
/// dehydrated data will not leak instead.
|
||||||
|
///
|
||||||
|
/// If device_id is given, the user's dehydrated device must match or this is a
|
||||||
|
/// no-op, but an Err is still returned to indicate that. Otherwise returns the
|
||||||
|
/// removed dehydrated device_id.
|
||||||
|
#[implement(super::Service)]
|
||||||
|
#[tracing::instrument(
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
fields(
|
||||||
|
%user_id,
|
||||||
|
device_id = ?maybe_device_id,
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
pub(super) async fn remove_dehydrated_device(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
maybe_device_id: Option<&DeviceId>,
|
||||||
|
) -> Result<OwnedDeviceId> {
|
||||||
|
let Ok(device_id) = self.get_dehydrated_device_id(user_id).await else {
|
||||||
|
return Err!(Request(NotFound("No dehydrated device for this user.")));
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(maybe_device_id) = maybe_device_id {
|
||||||
|
if maybe_device_id != device_id {
|
||||||
|
return Err!(Request(NotFound("Not the user's dehydrated device.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.db.userid_dehydrateddevice.remove(user_id);
|
||||||
|
|
||||||
|
Ok(device_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the device_id of the user's dehydrated device.
|
||||||
|
#[implement(super::Service)]
|
||||||
|
#[tracing::instrument(
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
fields(%user_id)
|
||||||
|
)]
|
||||||
|
pub async fn get_dehydrated_device_id(&self, user_id: &UserId) -> Result<OwnedDeviceId> {
|
||||||
|
self.get_dehydrated_device(user_id)
|
||||||
|
.await
|
||||||
|
.map(|device| device.device_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the dehydrated device private data
|
||||||
|
#[implement(super::Service)]
|
||||||
|
#[tracing::instrument(
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
fields(%user_id),
|
||||||
|
ret,
|
||||||
|
)]
|
||||||
|
pub async fn get_dehydrated_device(&self, user_id: &UserId) -> Result<DehydratedDevice> {
|
||||||
|
self.db
|
||||||
|
.userid_dehydrateddevice
|
||||||
|
.get(user_id)
|
||||||
|
.await
|
||||||
|
.deserialized()
|
||||||
|
}
|
||||||
@@ -1,3 +1,5 @@
|
|||||||
|
pub(super) mod dehydrated_device;
|
||||||
|
|
||||||
#[cfg(feature = "ldap")]
|
#[cfg(feature = "ldap")]
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
|
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
|
||||||
@@ -5,7 +7,7 @@ use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
|
|||||||
#[cfg(feature = "ldap")]
|
#[cfg(feature = "ldap")]
|
||||||
use conduwuit::result::LogErr;
|
use conduwuit::result::LogErr;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Error, Result, Server, at, debug_warn, err, is_equal_to, trace,
|
Err, Error, Result, Server, debug_warn, err, is_equal_to, trace,
|
||||||
utils::{self, ReadyExt, stream::TryIgnore, string::Unquoted},
|
utils::{self, ReadyExt, stream::TryIgnore, string::Unquoted},
|
||||||
};
|
};
|
||||||
#[cfg(feature = "ldap")]
|
#[cfg(feature = "ldap")]
|
||||||
@@ -70,6 +72,7 @@ struct Data {
|
|||||||
userfilterid_filter: Arc<Map>,
|
userfilterid_filter: Arc<Map>,
|
||||||
userid_avatarurl: Arc<Map>,
|
userid_avatarurl: Arc<Map>,
|
||||||
userid_blurhash: Arc<Map>,
|
userid_blurhash: Arc<Map>,
|
||||||
|
userid_dehydrateddevice: Arc<Map>,
|
||||||
userid_devicelistversion: Arc<Map>,
|
userid_devicelistversion: Arc<Map>,
|
||||||
userid_displayname: Arc<Map>,
|
userid_displayname: Arc<Map>,
|
||||||
userid_lastonetimekeyupdate: Arc<Map>,
|
userid_lastonetimekeyupdate: Arc<Map>,
|
||||||
@@ -110,6 +113,7 @@ impl crate::Service for Service {
|
|||||||
userfilterid_filter: args.db["userfilterid_filter"].clone(),
|
userfilterid_filter: args.db["userfilterid_filter"].clone(),
|
||||||
userid_avatarurl: args.db["userid_avatarurl"].clone(),
|
userid_avatarurl: args.db["userid_avatarurl"].clone(),
|
||||||
userid_blurhash: args.db["userid_blurhash"].clone(),
|
userid_blurhash: args.db["userid_blurhash"].clone(),
|
||||||
|
userid_dehydrateddevice: args.db["userid_dehydrateddevice"].clone(),
|
||||||
userid_devicelistversion: args.db["userid_devicelistversion"].clone(),
|
userid_devicelistversion: args.db["userid_devicelistversion"].clone(),
|
||||||
userid_displayname: args.db["userid_displayname"].clone(),
|
userid_displayname: args.db["userid_displayname"].clone(),
|
||||||
userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
|
userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
|
||||||
@@ -480,6 +484,11 @@ impl Service {
|
|||||||
|
|
||||||
/// Removes a device from a user.
|
/// Removes a device from a user.
|
||||||
pub async fn remove_device(&self, user_id: &UserId, device_id: &DeviceId) {
|
pub async fn remove_device(&self, user_id: &UserId, device_id: &DeviceId) {
|
||||||
|
// Remove dehydrated device if this is the dehydrated device
|
||||||
|
let _: Result<_> = self
|
||||||
|
.remove_dehydrated_device(user_id, Some(device_id))
|
||||||
|
.await;
|
||||||
|
|
||||||
let userdeviceid = (user_id, device_id);
|
let userdeviceid = (user_id, device_id);
|
||||||
|
|
||||||
// Remove tokens
|
// Remove tokens
|
||||||
@@ -1003,7 +1012,7 @@ impl Service {
|
|||||||
device_id: &'a DeviceId,
|
device_id: &'a DeviceId,
|
||||||
since: Option<u64>,
|
since: Option<u64>,
|
||||||
to: Option<u64>,
|
to: Option<u64>,
|
||||||
) -> impl Stream<Item = Raw<AnyToDeviceEvent>> + Send + 'a {
|
) -> impl Stream<Item = (u64, Raw<AnyToDeviceEvent>)> + Send + 'a {
|
||||||
type Key<'a> = (&'a UserId, &'a DeviceId, u64);
|
type Key<'a> = (&'a UserId, &'a DeviceId, u64);
|
||||||
|
|
||||||
let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1)));
|
let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1)));
|
||||||
@@ -1017,7 +1026,7 @@ impl Service {
|
|||||||
&& device_id == *device_id_
|
&& device_id == *device_id_
|
||||||
&& to.is_none_or(|to| *count <= to)
|
&& to.is_none_or(|to| *count <= to)
|
||||||
})
|
})
|
||||||
.map(at!(1))
|
.map(|((_, _, count), event)| (count, event))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn remove_to_device_events<Until>(
|
pub async fn remove_to_device_events<Until>(
|
||||||
|
|||||||
Reference in New Issue
Block a user