Compare commits

...

8 Commits

Author SHA1 Message Date
nexy7574 daaea3f766 feat(async-media): Partial implementation of async media 2025-08-06 01:29:41 +01:00
nexy7574 e4a43b1a5b fix(policy-server): Call the PS later in the PDU creation process
This avoids accidentally sending partially built PDUs to the policy server,
which may cause issues with some implementations
2025-08-02 00:19:33 +01:00
Jade Ellis 5775e0ad9d docs: Make traefik router names consistent 2025-07-30 19:55:48 +01:00
Jade Ellis 238cc627e3 docs: Set traefik labels 2025-07-30 19:33:53 +01:00
Jade Ellis b1516209c4 chore: Update funding file 2025-07-30 19:23:38 +01:00
Jade Ellis 0589884109 docs: Fix documentation link in README
Closes https://forgejo.ellis.link/continuwuation/continuwuity/issues/913
2025-07-28 19:28:34 +01:00
Jade Ellis 4a83df5b57 chore: Fix link 2025-07-25 17:35:18 +01:00
Jade Ellis aa08edc55f chore: Release announcement 2025-07-25 17:30:31 +01:00
9 changed files with 195 additions and 31 deletions
+2 -3
View File
@@ -1,5 +1,4 @@
github: [JadedBlueEyes]
# Doesn't support an array, so we can only list nex
ko_fi: nexy7574
github: [JadedBlueEyes, nexy7574]
custom:
- https://ko-fi.com/nexy7574
- https://ko-fi.com/JadedBlueEyes
+1 -1
View File
@@ -57,7 +57,7 @@ Continuwuity aims to:
### Can I try it out?
Check out the [documentation](introduction) for installation instructions.
Check out the [documentation](https://continuwuity.org) for installation instructions.
There are currently no open registration Continuwuity instances available.
@@ -12,6 +12,15 @@ services:
#- ./continuwuity.toml:/etc/continuwuity.toml
networks:
- proxy
labels:
- "traefik.enable=true"
- "traefik.http.routers.continuwuity.rule=(Host(`matrix.example.com`) || (Host(`example.com`) && PathPrefix(`/.well-known/matrix`)))"
- "traefik.http.routers.continuwuity.entrypoints=websecure" # your HTTPS entry point
- "traefik.http.routers.continuwuity.tls=true"
- "traefik.http.routers.continuwuity.service=continuwuity"
- "traefik.http.services.continuwuity.loadbalancer.server.port=6167"
# possibly, depending on your config:
# - "traefik.http.routers.continuwuity.tls.certresolver=letsencrypt"
environment:
CONTINUWUITY_SERVER_NAME: your.server.name.example # EDIT THIS
CONTINUWUITY_DATABASE_PATH: /var/lib/continuwuity
@@ -12,6 +12,14 @@ services:
#- ./continuwuity.toml:/etc/continuwuity.toml
networks:
- proxy
labels:
- "traefik.enable=true"
- "traefik.http.routers.continuwuity.rule=(Host(`matrix.example.com`) || (Host(`example.com`) && PathPrefix(`/.well-known/matrix`)))"
- "traefik.http.routers.continuwuity.entrypoints=websecure"
- "traefik.http.routers.continuwuity.tls.certresolver=letsencrypt"
- "traefik.http.services.continuwuity.loadbalancer.server.port=6167"
# Uncomment and adjust the following if you want to use middleware
# - "traefik.http.routers.continuwuity.middlewares=secureHeaders@file"
environment:
CONTINUWUITY_SERVER_NAME: your.server.name.example # EDIT THIS
CONTINUWUITY_TRUSTED_SERVERS: '["matrix.org"]'
+2 -2
View File
@@ -6,8 +6,8 @@
"message": "Welcome to Continuwuity! Important announcements about the project will appear here."
},
{
"id": 2,
"message": "🎉 Continuwuity v0.5.0-rc.6 is now available! This release includes improved knock-restricted room handling, automatic support contact configuration, and a new HTML landing page. Check [the release notes for full details](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.0-rc.6) and upgrade instructions."
"id": 3,
"message": "_taps microphone_ The Continuwuity 0.5.0-rc.7 release is now available, and it's better than ever! **177 commits**, **35 pull requests**, **11 contributors,** and a lot of new stuff!\n\nFor highlights, we've got:\n\n* 🕵️ Full Policy Server support to fight spam!\n* 🚀 Smarter room & space upgrades.\n* 🚫 User suspension tools for better moderation.\n* 🤖 reCaptcha support for safer open registration.\n* 🔍 Ability to disable read receipts & typing indicators.\n* ⚡ Sweeping performance improvements!\n\nGet the [full changelog and downloads on our Forgejo](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.0-rc.7) - and make sure you're in the [Announcements room](https://matrix.to/#/!releases:continuwuity.org/$hN9z6L2_dTAlPxFLAoXVfo_g8DyYXu4cpvWsSrWhmB0) to get stuff like this sooner."
}
]
}
+116 -6
View File
@@ -3,13 +3,15 @@ use std::time::Duration;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, err,
Err, Error, Result, debug, debug_info, err, error,
utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize},
warn,
};
use conduwuit_service::{
Services,
media::{CACHE_CONTROL_IMMUTABLE, CORP_CROSS_ORIGIN, Dim, FileMeta, MXC_LENGTH},
};
use http::StatusCode;
use reqwest::Url;
use ruma::{
Mxc, UserId,
@@ -18,7 +20,8 @@ use ruma::{
get_content, get_content_as_filename, get_content_thumbnail, get_media_config,
get_media_preview,
},
media::create_content,
error::ErrorKind,
media::{create_content, create_content_async, create_mxc_uri},
},
};
@@ -83,6 +86,83 @@ pub(crate) async fn create_content_route(
})
}
/// # `POST /_matrix/media/v1/create`
///
/// Creates a new MXC URI to later be populated.
#[tracing::instrument(
name = "media_create_async",
level = "debug",
skip_all,
fields(%client),
)]
pub(crate) async fn create_async_mxc_uri_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<create_mxc_uri::v1::Request>,
) -> Result<create_mxc_uri::v1::Response> {
let user = body.sender_user();
if services.users.is_suspended(user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let ref mxc = Mxc {
server_name: services.globals.server_name(),
media_id: &utils::random_string(MXC_LENGTH),
};
services.media.create_async(mxc, Some(user)).await?;
// TODO: add expiring MXC URIs to prevent exhaustion of MXC IDs
Ok(create_mxc_uri::v1::Response {
content_uri: mxc.to_string().into(),
unused_expires_at: None,
})
}
/// # `PUT /_matrix/media/v3/upload/{serverName}/{mediaId}`
///
/// Permanently save media in the server, using an existing MXC URI.
#[tracing::instrument(
name = "media_async_upload",
level = "debug",
skip_all,
fields(%client),
)]
pub(crate) async fn upload_async_media_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<create_content_async::v3::Request>,
) -> Result<create_content_async::v3::Response> {
let user = body.sender_user();
if services.users.is_suspended(user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let ref mxc = Mxc {
server_name: &body.server_name,
media_id: &body.media_id,
};
if !services.globals.server_is_ours(&body.server_name) {
return Err!(Request(Forbidden("Media uri does not belong to us.")));
}
if !services.media.exists(mxc).await {
return Err!(Request(NotFound("Media uri does not exist.")));
} else if services.media.is_populated(mxc).await {
return Err(Error::Request(
ErrorKind::CannotOverwriteMedia,
"Media uri is already populated.".into(),
StatusCode::CONFLICT,
));
}
let filename = body.filename.as_deref();
let content_type = body.content_type.as_deref();
let content_disposition = make_content_disposition(None, content_type, filename);
services
.media
.create(mxc, Some(user), Some(&content_disposition), content_type, &body.file)
.await?;
Ok(create_content_async::v3::Response {})
}
/// # `GET /_matrix/client/v1/media/thumbnail/{serverName}/{mediaId}`
///
/// Load media thumbnail from our server or over federation.
@@ -313,18 +393,48 @@ async fn fetch_thumbnail_meta(
.await
}
async fn wait_for_population(
services: &Services,
mxc: &Mxc<'_>,
timeout_ms: Duration,
) -> Result<FileMeta> {
async fn inner(services: &Services, mxc: &Mxc<'_>) -> Result<FileMeta> {
if !services.media.exists(mxc).await {
return Err!(Request(NotFound("Media not found.")));
}
loop {
if let Ok(Some(filemeta)) = services.media.get(mxc).await {
return Ok(filemeta);
}
// TODO(async-media): A notify/send mechanism would be better than polling.
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
tokio::time::timeout(
timeout_ms
.checked_sub(Duration::from_millis(500))
.unwrap_or(Duration::from_millis(0)),
inner(services, mxc),
)
.await
.map_err(|_| err!(Request(NotYetUploaded("Media was not ready in time."))))?
}
async fn fetch_file_meta(
services: &Services,
mxc: &Mxc<'_>,
user: &UserId,
timeout_ms: Duration,
) -> Result<FileMeta> {
if let Some(filemeta) = services.media.get(mxc).await? {
return Ok(filemeta);
if services.globals.server_is_ours(mxc.server_name) {
let result = wait_for_population(services, mxc, timeout_ms).await;
if let Ok(filemeta) = result {
return Ok(filemeta);
}
return Err!(Request(NotFound("Local media not found.")));
}
if services.globals.server_is_ours(mxc.server_name) {
return Err!(Request(NotFound("Local media not found.")));
if let Some(filemeta) = services.media.get(mxc).await? {
return Ok(filemeta);
}
services
+2
View File
@@ -154,6 +154,8 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.ruma_route(&client::turn_server_route)
.ruma_route(&client::send_event_to_device_route)
.ruma_route(&client::create_content_route)
.ruma_route(&client::create_async_mxc_uri_route)
.ruma_route(&client::upload_async_media_route)
.ruma_route(&client::get_content_thumbnail_route)
.ruma_route(&client::get_content_route)
.ruma_route(&client::get_content_as_filename_route)
+36
View File
@@ -105,6 +105,15 @@ impl Service {
Ok(())
}
/// Creates an MXC key but no associated file.
/// Used for async media. Must be later overwritten with `create`
pub async fn create_async(&self, mxc: &Mxc<'_>, user: Option<&UserId>) -> Result {
// effectively just reserves the MXC ID in the database
self.db
.create_file_metadata(mxc, user, &Dim::default(), None, None)?;
Ok(())
}
/// Deletes a file in the database and from the media directory via an MXC
pub async fn delete(&self, mxc: &Mxc<'_>) -> Result<()> {
match self.db.search_mxc_metadata_prefix(mxc).await {
@@ -179,6 +188,33 @@ impl Service {
}
}
/// Checks that an MXC URI exists in our media database.
pub async fn exists(&self, mxc: &Mxc<'_>) -> bool {
self.db.search_mxc_metadata_prefix(mxc).await.is_ok()
}
/// Checks that an MXC URI exists *and* has a file associated with it in our
/// media database.
pub async fn is_populated(&self, mxc: &Mxc<'_>) -> bool {
match self.db.search_mxc_metadata_prefix(mxc).await {
| Ok(keys) => {
if keys.is_empty() {
return false;
}
for key in keys {
let path = self.get_media_file(&key);
if fs::metadata(path).await.is_ok() {
return true;
}
}
false
},
| _ => false,
}
}
/// Gets all the MXC URIs in our media database
pub async fn get_all_mxcs(&self) -> Result<Vec<OwnedMxcUri>> {
let all_keys = self.db.get_all_media_keys().await;
+19 -19
View File
@@ -165,25 +165,6 @@ pub async fn create_hash_and_sign_event(
return Err!(Request(Forbidden("Event is not authorized.")));
}
// Check with the policy server
match self
.services
.event_handler
.ask_policy_server(&pdu, room_id)
.await
{
| Ok(true) => {},
| Ok(false) => {
return Err!(Request(Forbidden(debug_warn!(
"Policy server marked this event as spam"
))));
},
| Err(e) => {
// fail open
warn!("Failed to check event with policy server: {e}");
},
}
// Hash and sign
let mut pdu_json = utils::to_canonical_object(&pdu).map_err(|e| {
err!(Request(BadJson(warn!("Failed to convert PDU to canonical JSON: {e}"))))
@@ -222,6 +203,25 @@ pub async fn create_hash_and_sign_event(
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
// Check with the policy server
match self
.services
.event_handler
.ask_policy_server(&pdu, room_id)
.await
{
| Ok(true) => {},
| Ok(false) => {
return Err!(Request(Forbidden(debug_warn!(
"Policy server marked this event as spam"
))));
},
| Err(e) => {
// fail open
warn!("Failed to check event with policy server: {e}");
},
}
// Generate short event id
let _shorteventid = self
.services