mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
53 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 77ae79396f | |||
| cdc53b3421 | |||
| 0b667ae4fd | |||
| 83baf9b524 | |||
| 4f198fb4ef | |||
| 1631c0afa4 | |||
| 862684af28 | |||
| 7345c241a9 | |||
| 6a8b988b36 | |||
| f1d6536793 | |||
| cf8d8e4ea6 | |||
| 393d341f07 | |||
| ba55dffa0e | |||
| f3115e14ab | |||
| b3fa4705ef | |||
| 53b06a7918 | |||
| fafc1d3fd1 | |||
| dbc74272c3 | |||
| f11caac05e | |||
| e581face44 | |||
| 037ba41adb | |||
| 941c8f7d52 | |||
| 7dae118af9 | |||
| 07dfc5528d | |||
| 3f4749a796 | |||
| be8d72fafc | |||
| 0008709481 | |||
| ee51d4357f | |||
| 8ffc6d4f15 | |||
| 93efe89a1f | |||
| 16f37d21ff | |||
| 800ac8d1f1 | |||
| 872f5bf077 | |||
| 992217d644 | |||
| 4fb4397a9f | |||
| 61b6947e88 | |||
| 876d3faec4 | |||
| 9cc0cc69f7 | |||
| 5513bb4dff | |||
| 693e327004 | |||
| 3e6571a2b8 | |||
| f0f10f8f3e | |||
| a4f2b55a8a | |||
| 213a361c53 | |||
| 1c21e4af6e | |||
| fceaaedc04 | |||
| 0eff173c0b | |||
| 72bf8e5927 | |||
| 3491f653a5 | |||
| e820dd7aed | |||
| c92b7239a8 | |||
| 2940bc69c1 | |||
| 502919b248 |
@@ -54,7 +54,7 @@ runs:
|
|||||||
run: mv /tmp/binaries/sbin/conduwuit /tmp/binaries/conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
|
run: mv /tmp/binaries/sbin/conduwuit /tmp/binaries/conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
|
||||||
|
|
||||||
- name: Upload binary artifact
|
- name: Upload binary artifact
|
||||||
uses: forgejo/upload-artifact@v4
|
uses: forgejo/upload-artifact@v3
|
||||||
with:
|
with:
|
||||||
name: conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
|
name: conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
|
||||||
path: /tmp/binaries/conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
|
path: /tmp/binaries/conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
|
||||||
@@ -62,7 +62,7 @@ runs:
|
|||||||
|
|
||||||
- name: Upload digest
|
- name: Upload digest
|
||||||
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
|
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
|
||||||
uses: forgejo/upload-artifact@v4
|
uses: forgejo/upload-artifact@v3
|
||||||
with:
|
with:
|
||||||
name: digests${{ inputs.digest_suffix }}-${{ inputs.slug }}${{ inputs.cpu_suffix }}
|
name: digests${{ inputs.digest_suffix }}-${{ inputs.slug }}${{ inputs.cpu_suffix }}
|
||||||
path: /tmp/digests/*
|
path: /tmp/digests/*
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ jobs:
|
|||||||
uses: actions/checkout@v6
|
uses: actions/checkout@v6
|
||||||
with:
|
with:
|
||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
|
ref: ${{ github.ref_name }}
|
||||||
|
|
||||||
- name: Cache Cargo registry
|
- name: Cache Cargo registry
|
||||||
uses: actions/cache@v4
|
uses: actions/cache@v4
|
||||||
@@ -126,7 +127,7 @@ jobs:
|
|||||||
[ -f /etc/conduwuit/conduwuit.toml ] && echo "✅ Config file installed"
|
[ -f /etc/conduwuit/conduwuit.toml ] && echo "✅ Config file installed"
|
||||||
|
|
||||||
- name: Upload deb artifact
|
- name: Upload deb artifact
|
||||||
uses: actions/upload-artifact@v5
|
uses: actions/upload-artifact@v3
|
||||||
with:
|
with:
|
||||||
name: continuwuity-${{ steps.debian-version.outputs.distribution }}
|
name: continuwuity-${{ steps.debian-version.outputs.distribution }}
|
||||||
path: ${{ steps.cargo-deb.outputs.path }}
|
path: ${{ steps.cargo-deb.outputs.path }}
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ jobs:
|
|||||||
uses: actions/checkout@v6
|
uses: actions/checkout@v6
|
||||||
with:
|
with:
|
||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
|
ref: ${{ github.ref_name }}
|
||||||
|
|
||||||
|
|
||||||
- name: Cache DNF packages
|
- name: Cache DNF packages
|
||||||
@@ -238,13 +239,13 @@ jobs:
|
|||||||
cp $BIN_RPM upload-bin/
|
cp $BIN_RPM upload-bin/
|
||||||
|
|
||||||
- name: Upload binary RPM
|
- name: Upload binary RPM
|
||||||
uses: actions/upload-artifact@v5
|
uses: actions/upload-artifact@v3
|
||||||
with:
|
with:
|
||||||
name: continuwuity
|
name: continuwuity
|
||||||
path: upload-bin/
|
path: upload-bin/
|
||||||
|
|
||||||
- name: Upload debug RPM artifact
|
- name: Upload debug RPM artifact
|
||||||
uses: actions/upload-artifact@v5
|
uses: actions/upload-artifact@v3
|
||||||
with:
|
with:
|
||||||
name: continuwuity-debug
|
name: continuwuity-debug
|
||||||
path: artifacts/*debuginfo*.rpm
|
path: artifacts/*debuginfo*.rpm
|
||||||
|
|||||||
@@ -109,7 +109,7 @@ jobs:
|
|||||||
cat ./element-web/webapp/config.json
|
cat ./element-web/webapp/config.json
|
||||||
|
|
||||||
- name: 📤 Upload Artifact
|
- name: 📤 Upload Artifact
|
||||||
uses: forgejo/upload-artifact@v4
|
uses: forgejo/upload-artifact@v3
|
||||||
with:
|
with:
|
||||||
name: element-web
|
name: element-web
|
||||||
path: ./element-web/webapp/
|
path: ./element-web/webapp/
|
||||||
|
|||||||
@@ -134,7 +134,7 @@ You can also [view the file on Foregejo](https://forgejo.ellis.link/continuwuati
|
|||||||
## Creating the Continuwuity configuration file
|
## Creating the Continuwuity configuration file
|
||||||
|
|
||||||
Now you need to create the Continuwuity configuration file in
|
Now you need to create the Continuwuity configuration file in
|
||||||
`/etc/continuwuity/continuwuity.toml`. You can find an example configuration at
|
`/etc/conduwuit/conduwuit.toml`. You can find an example configuration at
|
||||||
[conduwuit-example.toml](../reference/config.mdx).
|
[conduwuit-example.toml](../reference/config.mdx).
|
||||||
|
|
||||||
**Please take a moment to read the config. You need to change at least the
|
**Please take a moment to read the config. You need to change at least the
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
{"m.homeserver":{"base_url": "https://matrix.continuwuity.org"},"org.matrix.msc3575.proxy":{"url": "https://matrix.continuwuity.org"}}
|
{"m.homeserver":{"base_url": "https://matrix.continuwuity.org"},"org.matrix.msc3575.proxy":{"url": "https://matrix.continuwuity.org"},"org.matrix.msc4143.rtc_foci":[{"type":"livekit","livekit_service_url":"https://livekit.ellis.link"}]}
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ use service::{
|
|||||||
rooms::{
|
rooms::{
|
||||||
state::RoomMutexGuard,
|
state::RoomMutexGuard,
|
||||||
state_compressor::{CompressedState, HashSetCompressStateEvent},
|
state_compressor::{CompressedState, HashSetCompressStateEvent},
|
||||||
|
timeline::pdu_fits,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -573,6 +574,13 @@ async fn join_room_by_id_helper_remote(
|
|||||||
return state;
|
return state;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
if !pdu_fits(&mut value.clone()) {
|
||||||
|
warn!(
|
||||||
|
"dropping incoming PDU {event_id} in room {room_id} from room join because \
|
||||||
|
it exceeds 65535 bytes or is otherwise too large."
|
||||||
|
);
|
||||||
|
return state;
|
||||||
|
}
|
||||||
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
|
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
|
||||||
if let Some(state_key) = &pdu.state_key {
|
if let Some(state_key) = &pdu.state_key {
|
||||||
let shortstatekey = services
|
let shortstatekey = services
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Result, at,
|
Err, Result, at,
|
||||||
matrix::{
|
matrix::{
|
||||||
@@ -70,6 +71,7 @@ const LIMIT_DEFAULT: usize = 10;
|
|||||||
/// where the user was joined, depending on `history_visibility`)
|
/// where the user was joined, depending on `history_visibility`)
|
||||||
pub(crate) async fn get_message_events_route(
|
pub(crate) async fn get_message_events_route(
|
||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client_ip): InsecureClientIp,
|
||||||
body: Ruma<get_message_events::v3::Request>,
|
body: Ruma<get_message_events::v3::Request>,
|
||||||
) -> Result<get_message_events::v3::Response> {
|
) -> Result<get_message_events::v3::Response> {
|
||||||
debug_assert!(IGNORED_MESSAGE_TYPES.is_sorted(), "IGNORED_MESSAGE_TYPES is not sorted");
|
debug_assert!(IGNORED_MESSAGE_TYPES.is_sorted(), "IGNORED_MESSAGE_TYPES is not sorted");
|
||||||
@@ -78,6 +80,11 @@ pub(crate) async fn get_message_events_route(
|
|||||||
let room_id = &body.room_id;
|
let room_id = &body.room_id;
|
||||||
let filter = &body.filter;
|
let filter = &body.filter;
|
||||||
|
|
||||||
|
services
|
||||||
|
.users
|
||||||
|
.update_device_last_seen(sender_user, sender_device, client_ip)
|
||||||
|
.await;
|
||||||
|
|
||||||
if !services.rooms.metadata.exists(room_id).await {
|
if !services.rooms.metadata.exists(room_id).await {
|
||||||
return Err!(Request(Forbidden("Room does not exist to this server")));
|
return Err!(Request(Forbidden("Room does not exist to this server")));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{Err, PduCount, Result, err};
|
use conduwuit::{Err, PduCount, Result, err};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
MilliSecondsSinceUnixEpoch,
|
MilliSecondsSinceUnixEpoch,
|
||||||
@@ -118,9 +119,14 @@ pub(crate) async fn set_read_marker_route(
|
|||||||
/// Sets private read marker and public read receipt EDU.
|
/// Sets private read marker and public read receipt EDU.
|
||||||
pub(crate) async fn create_receipt_route(
|
pub(crate) async fn create_receipt_route(
|
||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client_ip): InsecureClientIp,
|
||||||
body: Ruma<create_receipt::v3::Request>,
|
body: Ruma<create_receipt::v3::Request>,
|
||||||
) -> Result<create_receipt::v3::Response> {
|
) -> Result<create_receipt::v3::Response> {
|
||||||
let sender_user = body.sender_user();
|
let sender_user = body.sender_user();
|
||||||
|
services
|
||||||
|
.users
|
||||||
|
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
|
||||||
|
.await;
|
||||||
|
|
||||||
if matches!(
|
if matches!(
|
||||||
&body.receipt_type,
|
&body.receipt_type,
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{Err, Result, matrix::pdu::PduBuilder};
|
use conduwuit::{Err, Result, matrix::pdu::PduBuilder};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::client::redact::redact_event, events::room::redaction::RoomRedactionEventContent,
|
api::client::redact::redact_event, events::room::redaction::RoomRedactionEventContent,
|
||||||
@@ -13,9 +14,14 @@ use crate::Ruma;
|
|||||||
/// - TODO: Handle txn id
|
/// - TODO: Handle txn id
|
||||||
pub(crate) async fn redact_event_route(
|
pub(crate) async fn redact_event_route(
|
||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client_ip): InsecureClientIp,
|
||||||
body: Ruma<redact_event::v3::Request>,
|
body: Ruma<redact_event::v3::Request>,
|
||||||
) -> Result<redact_event::v3::Response> {
|
) -> Result<redact_event::v3::Response> {
|
||||||
let sender_user = body.sender_user();
|
let sender_user = body.sender_user();
|
||||||
|
services
|
||||||
|
.users
|
||||||
|
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
|
||||||
|
.await;
|
||||||
let body = &body.body;
|
let body = &body.body;
|
||||||
if services.users.is_suspended(sender_user).await? {
|
if services.users.is_suspended(sender_user).await? {
|
||||||
// TODO: Users can redact their own messages while suspended
|
// TODO: Users can redact their own messages while suspended
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{Err, Result, err, matrix::pdu::PduBuilder, utils};
|
use conduwuit::{Err, Result, err, matrix::pdu::PduBuilder, utils};
|
||||||
use ruma::{api::client::message::send_message_event, events::MessageLikeEventType};
|
use ruma::{api::client::message::send_message_event, events::MessageLikeEventType};
|
||||||
use serde_json::from_str;
|
use serde_json::from_str;
|
||||||
@@ -18,6 +19,7 @@ use crate::Ruma;
|
|||||||
/// allowed
|
/// allowed
|
||||||
pub(crate) async fn send_message_event_route(
|
pub(crate) async fn send_message_event_route(
|
||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client_ip): InsecureClientIp,
|
||||||
body: Ruma<send_message_event::v3::Request>,
|
body: Ruma<send_message_event::v3::Request>,
|
||||||
) -> Result<send_message_event::v3::Response> {
|
) -> Result<send_message_event::v3::Response> {
|
||||||
let sender_user = body.sender_user();
|
let sender_user = body.sender_user();
|
||||||
@@ -27,6 +29,11 @@ pub(crate) async fn send_message_event_route(
|
|||||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
services
|
||||||
|
.users
|
||||||
|
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
|
||||||
|
.await;
|
||||||
|
|
||||||
// Forbid m.room.encrypted if encryption is disabled
|
// Forbid m.room.encrypted if encryption is disabled
|
||||||
if MessageLikeEventType::RoomEncrypted == body.event_type && !services.config.allow_encryption
|
if MessageLikeEventType::RoomEncrypted == body.event_type && !services.config.allow_encryption
|
||||||
{
|
{
|
||||||
|
|||||||
+10
-3
@@ -1,4 +1,5 @@
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Result, err,
|
Err, Result, err,
|
||||||
matrix::{Event, pdu::PduBuilder},
|
matrix::{Event, pdu::PduBuilder},
|
||||||
@@ -7,7 +8,7 @@ use conduwuit::{
|
|||||||
use conduwuit_service::Services;
|
use conduwuit_service::Services;
|
||||||
use futures::{FutureExt, TryStreamExt};
|
use futures::{FutureExt, TryStreamExt};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
OwnedEventId, RoomId, UserId,
|
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
|
||||||
api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
|
api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
|
||||||
events::{
|
events::{
|
||||||
AnyStateEventContent, StateEventType,
|
AnyStateEventContent, StateEventType,
|
||||||
@@ -30,9 +31,14 @@ use crate::{Ruma, RumaResponse};
|
|||||||
/// Sends a state event into the room.
|
/// Sends a state event into the room.
|
||||||
pub(crate) async fn send_state_event_for_key_route(
|
pub(crate) async fn send_state_event_for_key_route(
|
||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(ip): InsecureClientIp,
|
||||||
body: Ruma<send_state_event::v3::Request>,
|
body: Ruma<send_state_event::v3::Request>,
|
||||||
) -> Result<send_state_event::v3::Response> {
|
) -> Result<send_state_event::v3::Response> {
|
||||||
let sender_user = body.sender_user();
|
let sender_user = body.sender_user();
|
||||||
|
services
|
||||||
|
.users
|
||||||
|
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
|
||||||
|
.await;
|
||||||
|
|
||||||
if services.users.is_suspended(sender_user).await? {
|
if services.users.is_suspended(sender_user).await? {
|
||||||
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||||
@@ -61,9 +67,10 @@ pub(crate) async fn send_state_event_for_key_route(
|
|||||||
/// Sends a state event into the room.
|
/// Sends a state event into the room.
|
||||||
pub(crate) async fn send_state_event_for_empty_key_route(
|
pub(crate) async fn send_state_event_for_empty_key_route(
|
||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(ip): InsecureClientIp,
|
||||||
body: Ruma<send_state_event::v3::Request>,
|
body: Ruma<send_state_event::v3::Request>,
|
||||||
) -> Result<RumaResponse<send_state_event::v3::Response>> {
|
) -> Result<RumaResponse<send_state_event::v3::Response>> {
|
||||||
send_state_event_for_key_route(State(services), body)
|
send_state_event_for_key_route(State(services), InsecureClientIp(ip), body)
|
||||||
.boxed()
|
.boxed()
|
||||||
.await
|
.await
|
||||||
.map(RumaResponse)
|
.map(RumaResponse)
|
||||||
@@ -185,7 +192,7 @@ async fn send_state_event_for_key_helper(
|
|||||||
event_type: &StateEventType,
|
event_type: &StateEventType,
|
||||||
json: &Raw<AnyStateEventContent>,
|
json: &Raw<AnyStateEventContent>,
|
||||||
state_key: &str,
|
state_key: &str,
|
||||||
timestamp: Option<ruma::MilliSecondsSinceUnixEpoch>,
|
timestamp: Option<MilliSecondsSinceUnixEpoch>,
|
||||||
) -> Result<OwnedEventId> {
|
) -> Result<OwnedEventId> {
|
||||||
allowed_to_send_state_event(services, room_id, event_type, state_key, json).await?;
|
allowed_to_send_state_event(services, room_id, event_type, state_key, json).await?;
|
||||||
let state_lock = services.rooms.state.mutex.lock(room_id).await;
|
let state_lock = services.rooms.state.mutex.lock(room_id).await;
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Result, extract_variant,
|
Result, extract_variant,
|
||||||
utils::{
|
utils::{
|
||||||
@@ -180,6 +181,7 @@ type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
|
|||||||
)]
|
)]
|
||||||
pub(crate) async fn sync_events_route(
|
pub(crate) async fn sync_events_route(
|
||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client_ip): InsecureClientIp,
|
||||||
body: Ruma<sync_events::v3::Request>,
|
body: Ruma<sync_events::v3::Request>,
|
||||||
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
|
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
|
||||||
let (sender_user, sender_device) = body.sender();
|
let (sender_user, sender_device) = body.sender();
|
||||||
@@ -192,6 +194,12 @@ pub(crate) async fn sync_events_route(
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Increment the "device last active" metadata
|
||||||
|
services
|
||||||
|
.users
|
||||||
|
.update_device_last_seen(sender_user, Some(sender_device), client_ip)
|
||||||
|
.await;
|
||||||
|
|
||||||
// Setup watchers, so if there's no response, we can wait for them
|
// Setup watchers, so if there's no response, we can wait for them
|
||||||
let watcher = services.sync.watch(sender_user, sender_device);
|
let watcher = services.sync.watch(sender_user, sender_device);
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Error, Result, at, error, extract_variant, is_equal_to,
|
Err, Error, Result, at, error, extract_variant, is_equal_to,
|
||||||
matrix::{Event, TypeStateKey, pdu::PduCount},
|
matrix::{Event, TypeStateKey, pdu::PduCount},
|
||||||
@@ -61,11 +62,18 @@ type KnownRooms = BTreeMap<String, BTreeMap<OwnedRoomId, u64>>;
|
|||||||
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
|
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
|
||||||
pub(crate) async fn sync_events_v5_route(
|
pub(crate) async fn sync_events_v5_route(
|
||||||
State(ref services): State<crate::State>,
|
State(ref services): State<crate::State>,
|
||||||
|
InsecureClientIp(client_ip): InsecureClientIp,
|
||||||
body: Ruma<sync_events::v5::Request>,
|
body: Ruma<sync_events::v5::Request>,
|
||||||
) -> Result<sync_events::v5::Response> {
|
) -> Result<sync_events::v5::Response> {
|
||||||
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
|
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
|
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
|
services
|
||||||
|
.users
|
||||||
|
.update_device_last_seen(sender_user, Some(sender_device), client_ip)
|
||||||
|
.await;
|
||||||
|
|
||||||
let mut body = body.body;
|
let mut body = body.body;
|
||||||
|
|
||||||
// Setup watchers, so if there's no response, we can wait for them
|
// Setup watchers, so if there's no response, we can wait for them
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{Err, Result, utils, utils::math::Tried};
|
use conduwuit::{Err, Result, utils, utils::math::Tried};
|
||||||
use ruma::api::client::typing::create_typing_event;
|
use ruma::api::client::typing::create_typing_event;
|
||||||
|
|
||||||
@@ -9,10 +10,15 @@ use crate::Ruma;
|
|||||||
/// Sets the typing state of the sender user.
|
/// Sets the typing state of the sender user.
|
||||||
pub(crate) async fn create_typing_event_route(
|
pub(crate) async fn create_typing_event_route(
|
||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(ip): InsecureClientIp,
|
||||||
body: Ruma<create_typing_event::v3::Request>,
|
body: Ruma<create_typing_event::v3::Request>,
|
||||||
) -> Result<create_typing_event::v3::Response> {
|
) -> Result<create_typing_event::v3::Response> {
|
||||||
use create_typing_event::v3::Typing;
|
use create_typing_event::v3::Typing;
|
||||||
let sender_user = body.sender_user();
|
let sender_user = body.sender_user();
|
||||||
|
services
|
||||||
|
.users
|
||||||
|
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
|
||||||
|
.await;
|
||||||
|
|
||||||
if sender_user != body.user_id && body.appservice_info.is_none() {
|
if sender_user != body.user_id && body.appservice_info.is_none() {
|
||||||
return Err!(Request(Forbidden("You cannot update typing status of other users.")));
|
return Err!(Request(Forbidden("You cannot update typing status of other users.")));
|
||||||
|
|||||||
@@ -1224,12 +1224,6 @@ pub struct Config {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub rocksdb_repair: bool,
|
pub rocksdb_repair: bool,
|
||||||
|
|
||||||
#[serde(default)]
|
|
||||||
pub rocksdb_read_only: bool,
|
|
||||||
|
|
||||||
#[serde(default)]
|
|
||||||
pub rocksdb_secondary: bool,
|
|
||||||
|
|
||||||
/// Enables idle CPU priority for compaction thread. This is not enabled by
|
/// Enables idle CPU priority for compaction thread. This is not enabled by
|
||||||
/// default to prevent compaction from falling too far behind on busy
|
/// default to prevent compaction from falling too far behind on busy
|
||||||
/// systems.
|
/// systems.
|
||||||
|
|||||||
@@ -177,11 +177,6 @@ where
|
|||||||
|
|
||||||
// [synapse] do_sig_check check the event has valid signatures for member events
|
// [synapse] do_sig_check check the event has valid signatures for member events
|
||||||
|
|
||||||
// TODO do_size_check is false when called by `iterative_auth_check`
|
|
||||||
// do_size_check is also mostly accomplished by ruma with the exception of
|
|
||||||
// checking event_type, state_key, and json are below a certain size (255 and
|
|
||||||
// 65_536 respectively)
|
|
||||||
|
|
||||||
let sender = incoming_event.sender();
|
let sender = incoming_event.sender();
|
||||||
|
|
||||||
// Implementation of https://spec.matrix.org/latest/rooms/v1/#authorization-rules
|
// Implementation of https://spec.matrix.org/latest/rooms/v1/#authorization-rules
|
||||||
|
|||||||
+2
-12
@@ -19,7 +19,7 @@ use std::{
|
|||||||
|
|
||||||
use conduwuit::{Err, Result, debug, info, warn};
|
use conduwuit::{Err, Result, debug, info, warn};
|
||||||
use rocksdb::{
|
use rocksdb::{
|
||||||
AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded,
|
AsColumnFamilyRef, BoundColumnFamily, DBCommon, MultiThreaded, OptimisticTransactionDB,
|
||||||
WaitForCompactOptions,
|
WaitForCompactOptions,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -33,13 +33,11 @@ pub struct Engine {
|
|||||||
pub(crate) db: Db,
|
pub(crate) db: Db,
|
||||||
pub(crate) pool: Arc<Pool>,
|
pub(crate) pool: Arc<Pool>,
|
||||||
pub(crate) ctx: Arc<Context>,
|
pub(crate) ctx: Arc<Context>,
|
||||||
pub(super) read_only: bool,
|
|
||||||
pub(super) secondary: bool,
|
|
||||||
pub(crate) checksums: bool,
|
pub(crate) checksums: bool,
|
||||||
corks: AtomicU32,
|
corks: AtomicU32,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
|
pub(crate) type Db = OptimisticTransactionDB<MultiThreaded>;
|
||||||
|
|
||||||
impl Engine {
|
impl Engine {
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
@@ -129,14 +127,6 @@ impl Engine {
|
|||||||
|
|
||||||
sequence
|
sequence
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
#[must_use]
|
|
||||||
pub fn is_read_only(&self) -> bool { self.secondary || self.read_only }
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
#[must_use]
|
|
||||||
pub fn is_secondary(&self) -> bool { self.secondary }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Engine {
|
impl Drop for Engine {
|
||||||
|
|||||||
@@ -12,9 +12,8 @@ pub fn backup(&self) -> Result {
|
|||||||
let mut engine = self.backup_engine()?;
|
let mut engine = self.backup_engine()?;
|
||||||
let config = &self.ctx.server.config;
|
let config = &self.ctx.server.config;
|
||||||
if config.database_backups_to_keep > 0 {
|
if config.database_backups_to_keep > 0 {
|
||||||
let flush = !self.is_read_only();
|
|
||||||
engine
|
engine
|
||||||
.create_new_backup_flush(&self.db, flush)
|
.create_new_backup_flush(&self.db, true)
|
||||||
.map_err(map_err)?;
|
.map_err(map_err)?;
|
||||||
|
|
||||||
let engine_info = engine.get_backup_info();
|
let engine_info = engine.get_backup_info();
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
|
|
||||||
use conduwuit::{Result, implement};
|
use conduwuit::{Result, implement};
|
||||||
use rocksdb::perf::get_memory_usage_stats;
|
use rocksdb::perf::MemoryUsageBuilder;
|
||||||
|
|
||||||
use super::Engine;
|
use super::Engine;
|
||||||
use crate::or_else;
|
use crate::or_else;
|
||||||
@@ -9,16 +9,21 @@ use crate::or_else;
|
|||||||
#[implement(Engine)]
|
#[implement(Engine)]
|
||||||
pub fn memory_usage(&self) -> Result<String> {
|
pub fn memory_usage(&self) -> Result<String> {
|
||||||
let mut res = String::new();
|
let mut res = String::new();
|
||||||
let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()]))
|
|
||||||
.or_else(or_else)?;
|
let mut builder = MemoryUsageBuilder::new().or_else(or_else)?;
|
||||||
|
builder.add_db(&self.db);
|
||||||
|
builder.add_cache(&self.ctx.row_cache.lock());
|
||||||
|
|
||||||
|
let usage = builder.build().or_else(or_else)?;
|
||||||
|
|
||||||
let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0;
|
let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0;
|
||||||
writeln!(
|
writeln!(
|
||||||
res,
|
res,
|
||||||
"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow \
|
"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow \
|
||||||
cache: {:.2} MiB",
|
cache: {:.2} MiB",
|
||||||
mibs(stats.mem_table_total),
|
mibs(usage.approximate_mem_table_total()),
|
||||||
mibs(stats.mem_table_unflushed),
|
mibs(usage.approximate_mem_table_unflushed()),
|
||||||
mibs(stats.mem_table_readers_total),
|
mibs(usage.approximate_mem_table_readers_total()),
|
||||||
mibs(u64::try_from(self.ctx.row_cache.lock().get_usage())?),
|
mibs(u64::try_from(self.ctx.row_cache.lock().get_usage())?),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
|||||||
@@ -35,14 +35,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
|||||||
}
|
}
|
||||||
|
|
||||||
debug!("Opening database...");
|
debug!("Opening database...");
|
||||||
let db = if config.rocksdb_read_only {
|
let db = Db::open_cf_descriptors(&db_opts, path, cfds).or_else(or_else)?;
|
||||||
Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
|
|
||||||
} else if config.rocksdb_secondary {
|
|
||||||
Db::open_cf_descriptors_as_secondary(&db_opts, path, path, cfds)
|
|
||||||
} else {
|
|
||||||
Db::open_cf_descriptors(&db_opts, path, cfds)
|
|
||||||
}
|
|
||||||
.or_else(or_else)?;
|
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
columns = num_cfds,
|
columns = num_cfds,
|
||||||
@@ -55,8 +48,6 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
|||||||
db,
|
db,
|
||||||
pool: ctx.pool.clone(),
|
pool: ctx.pool.clone(),
|
||||||
ctx: ctx.clone(),
|
ctx: ctx.clone(),
|
||||||
read_only: config.rocksdb_read_only,
|
|
||||||
secondary: config.rocksdb_secondary,
|
|
||||||
checksums: config.rocksdb_checksums,
|
checksums: config.rocksdb_checksums,
|
||||||
corks: AtomicU32::new(0),
|
corks: AtomicU32::new(0),
|
||||||
}))
|
}))
|
||||||
|
|||||||
@@ -219,7 +219,7 @@ where
|
|||||||
K: AsRef<[u8]> + Sized + Debug + 'a,
|
K: AsRef<[u8]> + Sized + Debug + 'a,
|
||||||
V: AsRef<[u8]> + Sized + 'a,
|
V: AsRef<[u8]> + Sized + 'a,
|
||||||
{
|
{
|
||||||
let mut batch = WriteBatchWithTransaction::<false>::default();
|
let mut batch = WriteBatchWithTransaction::<true>::default();
|
||||||
for (key, val) in iter {
|
for (key, val) in iter {
|
||||||
batch.put_cf(&self.cf(), key.as_ref(), val.as_ref());
|
batch.put_cf(&self.cf(), key.as_ref(), val.as_ref());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -77,14 +77,6 @@ impl Database {
|
|||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn keys(&self) -> impl Iterator<Item = &MapsKey> + Send + '_ { self.maps.keys() }
|
pub fn keys(&self) -> impl Iterator<Item = &MapsKey> + Send + '_ { self.maps.keys() }
|
||||||
|
|
||||||
#[inline]
|
|
||||||
#[must_use]
|
|
||||||
pub fn is_read_only(&self) -> bool { self.db.is_read_only() }
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
#[must_use]
|
|
||||||
pub fn is_secondary(&self) -> bool { self.db.is_secondary() }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Index<&str> for Database {
|
impl Index<&str> for Database {
|
||||||
|
|||||||
@@ -37,10 +37,6 @@ impl crate::Service for Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn worker(self: Arc<Self>) -> Result {
|
async fn worker(self: Arc<Self>) -> Result {
|
||||||
if self.services.globals.is_read_only() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.services.config.ldap.enable {
|
if self.services.config.ldap.enable {
|
||||||
warn!("emergency password feature not available with LDAP enabled.");
|
warn!("emergency password feature not available with LDAP enabled.");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
|||||||
@@ -31,12 +31,13 @@ impl crate::Service for Service {
|
|||||||
|
|
||||||
let turn_secret = config.turn_secret_file.as_ref().map_or_else(
|
let turn_secret = config.turn_secret_file.as_ref().map_or_else(
|
||||||
|| config.turn_secret.clone(),
|
|| config.turn_secret.clone(),
|
||||||
|path| {
|
|path| match std::fs::read_to_string(path) {
|
||||||
std::fs::read_to_string(path).unwrap_or_else(|e| {
|
| Ok(secret) => secret.trim().to_owned(),
|
||||||
|
| Err(e) => {
|
||||||
error!("Failed to read the TURN secret file: {e}");
|
error!("Failed to read the TURN secret file: {e}");
|
||||||
|
|
||||||
config.turn_secret.clone()
|
config.turn_secret.clone()
|
||||||
})
|
},
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -49,7 +50,7 @@ impl crate::Service for Service {
|
|||||||
return config.registration_token.clone();
|
return config.registration_token.clone();
|
||||||
};
|
};
|
||||||
|
|
||||||
Some(token)
|
Some(token.trim().to_owned())
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -170,7 +171,4 @@ impl Service {
|
|||||||
pub fn server_is_ours(&self, server_name: &ServerName) -> bool {
|
pub fn server_is_ours(&self, server_name: &ServerName) -> bool {
|
||||||
server_name == self.server_name()
|
server_name == self.server_name()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub fn is_read_only(&self) -> bool { self.db.db.is_read_only() }
|
|
||||||
}
|
}
|
||||||
|
|||||||
+23
-15
@@ -590,6 +590,10 @@ async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result
|
|||||||
|
|
||||||
const FIXED_CORRUPT_MSC4133_FIELDS_MARKER: &[u8] = b"fix_corrupt_msc4133_fields";
|
const FIXED_CORRUPT_MSC4133_FIELDS_MARKER: &[u8] = b"fix_corrupt_msc4133_fields";
|
||||||
async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
|
async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
|
||||||
|
// Due to an old bug, some conduwuit databases have `us.cloke.msc4175.tz` user
|
||||||
|
// profile fields with raw strings instead of quoted JSON ones.
|
||||||
|
// This migration fixes that.
|
||||||
|
|
||||||
use serde_json::{Value, from_slice};
|
use serde_json::{Value, from_slice};
|
||||||
type KeyVal<'a> = ((OwnedUserId, String), &'a [u8]);
|
type KeyVal<'a> = ((OwnedUserId, String), &'a [u8]);
|
||||||
|
|
||||||
@@ -606,24 +610,28 @@ async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
|
|||||||
async |(mut total, mut fixed),
|
async |(mut total, mut fixed),
|
||||||
((user, key), value): KeyVal<'_>|
|
((user, key), value): KeyVal<'_>|
|
||||||
-> Result<(usize, usize)> {
|
-> Result<(usize, usize)> {
|
||||||
if let Err(error) = from_slice::<Value>(value) {
|
match from_slice::<Value>(value) {
|
||||||
// Due to an old bug, some conduwuit databases have `us.cloke.msc4175.tz` user
|
// corrupted timezone field
|
||||||
// profile fields with raw strings instead of quoted JSON ones.
|
| Err(_) if key == "us.cloke.msc4175.tz" => {
|
||||||
// This migration fixes that.
|
let new_value = Value::String(String::from_utf8(value.to_vec())?);
|
||||||
let new_value = if key == "us.cloke.msc4175.tz" {
|
|
||||||
Value::String(String::from_utf8(value.to_vec())?)
|
|
||||||
} else {
|
|
||||||
return Err!(
|
|
||||||
"failed to deserialize msc4133 key {} of user {}: {}",
|
|
||||||
key,
|
|
||||||
user,
|
|
||||||
error
|
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|
||||||
useridprofilekey_value.put((user, key), Json(new_value));
|
useridprofilekey_value.put((user, key), Json(new_value));
|
||||||
fixed = fixed.saturating_add(1);
|
fixed = fixed.saturating_add(1);
|
||||||
|
},
|
||||||
|
// corrupted value for some other key
|
||||||
|
| Err(error) => {
|
||||||
|
warn!(
|
||||||
|
"deleting MSC4133 key {} for user {} due to deserialization \
|
||||||
|
failure: {}",
|
||||||
|
key, user, error
|
||||||
|
);
|
||||||
|
useridprofilekey_value.del((user, key));
|
||||||
|
},
|
||||||
|
// other key with no issues
|
||||||
|
| Ok(_) => {
|
||||||
|
// do nothing
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
total = total.saturating_add(1);
|
total = total.saturating_add(1);
|
||||||
|
|
||||||
Ok((total, fixed))
|
Ok((total, fixed))
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ use futures::{
|
|||||||
use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType};
|
use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
use crate::rooms::timeline::RawPduId;
|
use crate::rooms::timeline::{RawPduId, pdu_fits};
|
||||||
|
|
||||||
/// When receiving an event one needs to:
|
/// When receiving an event one needs to:
|
||||||
/// 0. Check the server is in the room
|
/// 0. Check the server is in the room
|
||||||
@@ -62,6 +62,13 @@ pub async fn handle_incoming_pdu<'a>(
|
|||||||
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
|
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
|
||||||
return Ok(Some(pdu_id));
|
return Ok(Some(pdu_id));
|
||||||
}
|
}
|
||||||
|
if !pdu_fits(&mut value.clone()) {
|
||||||
|
warn!(
|
||||||
|
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
|
||||||
|
exceeds 65535 bytes or is otherwise too large."
|
||||||
|
);
|
||||||
|
return Err!(Request(TooLarge("PDU is too large")));
|
||||||
|
}
|
||||||
|
|
||||||
// 1.1 Check the server is in the room
|
// 1.1 Check the server is in the room
|
||||||
let meta_exists = self.services.metadata.exists(room_id).map(Ok);
|
let meta_exists = self.services.metadata.exists(room_id).map(Ok);
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
use std::collections::{BTreeMap, HashMap, hash_map};
|
use std::collections::{BTreeMap, HashMap, hash_map};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, trace,
|
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
|
||||||
|
trace, warn,
|
||||||
};
|
};
|
||||||
use futures::future::ready;
|
use futures::future::ready;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
@@ -10,6 +11,7 @@ use ruma::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use super::{check_room_id, get_room_version_id, to_room_version};
|
use super::{check_room_id, get_room_version_id, to_room_version};
|
||||||
|
use crate::rooms::timeline::pdu_fits;
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
@@ -25,6 +27,13 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
|||||||
where
|
where
|
||||||
Pdu: Event + Send + Sync,
|
Pdu: Event + Send + Sync,
|
||||||
{
|
{
|
||||||
|
if !pdu_fits(&mut value.clone()) {
|
||||||
|
warn!(
|
||||||
|
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
|
||||||
|
exceeds 65535 bytes or is otherwise too large."
|
||||||
|
);
|
||||||
|
return Err!(Request(TooLarge("PDU is too large")));
|
||||||
|
}
|
||||||
// 1. Remove unsigned field
|
// 1. Remove unsigned field
|
||||||
value.remove("unsigned");
|
value.remove("unsigned");
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,40 @@ use serde_json::value::{RawValue, to_raw_value};
|
|||||||
|
|
||||||
use super::RoomMutexGuard;
|
use super::RoomMutexGuard;
|
||||||
|
|
||||||
|
pub fn pdu_fits(owned_obj: &mut CanonicalJsonObject) -> bool {
|
||||||
|
// room IDs, event IDs, senders, types, and state keys must all be <= 255 bytes
|
||||||
|
if let Some(CanonicalJsonValue::String(room_id)) = owned_obj.get("room_id") {
|
||||||
|
if room_id.len() > 255 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(CanonicalJsonValue::String(event_id)) = owned_obj.get("event_id") {
|
||||||
|
if event_id.len() > 255 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(CanonicalJsonValue::String(sender)) = owned_obj.get("sender") {
|
||||||
|
if sender.len() > 255 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(CanonicalJsonValue::String(kind)) = owned_obj.get("type") {
|
||||||
|
if kind.len() > 255 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(CanonicalJsonValue::String(state_key)) = owned_obj.get("state_key") {
|
||||||
|
if state_key.len() > 255 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Now check the full PDU size
|
||||||
|
match serde_json::to_string(owned_obj) {
|
||||||
|
| Ok(s) => s.len() <= 65535,
|
||||||
|
| Err(_) => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
pub async fn create_hash_and_sign_event(
|
pub async fn create_hash_and_sign_event(
|
||||||
&self,
|
&self,
|
||||||
@@ -148,19 +182,6 @@ pub async fn create_hash_and_sign_event(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if event_type != TimelineEventType::RoomCreate && prev_events.is_empty() {
|
|
||||||
// return Err!(Request(Unknown("Event incorrectly had zero prev_events.")));
|
|
||||||
// }
|
|
||||||
// if state_key.is_none() && depth.lt(&uint!(2)) {
|
|
||||||
// // The first two events in a room are always m.room.create and
|
|
||||||
// m.room.member, // so any other events with that same depth are illegal.
|
|
||||||
// warn!(
|
|
||||||
// "Had unsafe depth {depth} when creating non-state event in {}. Cowardly
|
|
||||||
// aborting", room_id.expect("room_id is Some here").as_str()
|
|
||||||
// );
|
|
||||||
// return Err!(Request(Unknown("Unsafe depth for non-state event.")));
|
|
||||||
// }
|
|
||||||
|
|
||||||
let mut pdu = PduEvent {
|
let mut pdu = PduEvent {
|
||||||
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
|
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
|
||||||
room_id: room_id.map(ToOwned::to_owned),
|
room_id: room_id.map(ToOwned::to_owned),
|
||||||
@@ -269,8 +290,16 @@ pub async fn create_hash_and_sign_event(
|
|||||||
}
|
}
|
||||||
// Generate event id
|
// Generate event id
|
||||||
pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?;
|
pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?;
|
||||||
// Check with the policy server
|
|
||||||
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
|
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
|
||||||
|
// Verify that the *full* PDU isn't over 64KiB.
|
||||||
|
// Ruma only validates that it's under 64KiB before signing and hashing.
|
||||||
|
// Has to be cloned to prevent mutating pdu_json itself :(
|
||||||
|
if !pdu_fits(&mut pdu_json.clone()) {
|
||||||
|
// feckin huge PDU mate
|
||||||
|
return Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)")));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check with the policy server
|
||||||
if room_id.is_some() {
|
if room_id.is_some() {
|
||||||
trace!(
|
trace!(
|
||||||
"Checking event in room {} with policy server",
|
"Checking event in room {} with policy server",
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ use ruma::{
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
use self::data::Data;
|
use self::data::Data;
|
||||||
pub use self::data::PdusIterItem;
|
pub use self::{create::pdu_fits, data::PdusIterItem};
|
||||||
use crate::{
|
use crate::{
|
||||||
Dep, account_data, admin, appservice, globals, pusher, rooms, sending, server_keys, users,
|
Dep, account_data, admin, appservice, globals, pusher, rooms, sending, server_keys, users,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -130,7 +130,7 @@ impl Services {
|
|||||||
|
|
||||||
// reset dormant online/away statuses to offline, and set the server user as
|
// reset dormant online/away statuses to offline, and set the server user as
|
||||||
// online
|
// online
|
||||||
if self.server.config.allow_local_presence && !self.db.is_read_only() {
|
if self.server.config.allow_local_presence {
|
||||||
self.presence.unset_all_presence().await;
|
self.presence.unset_all_presence().await;
|
||||||
_ = self
|
_ = self
|
||||||
.presence
|
.presence
|
||||||
@@ -146,7 +146,7 @@ impl Services {
|
|||||||
info!("Shutting down services...");
|
info!("Shutting down services...");
|
||||||
|
|
||||||
// set the server user as offline
|
// set the server user as offline
|
||||||
if self.server.config.allow_local_presence && !self.db.is_read_only() {
|
if self.server.config.allow_local_presence {
|
||||||
_ = self
|
_ = self
|
||||||
.presence
|
.presence
|
||||||
.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Offline)
|
.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Offline)
|
||||||
|
|||||||
+30
-7
@@ -11,7 +11,7 @@ use database::{Deserialized, Json, Map};
|
|||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedUserId, UserId,
|
CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedUserId, UserId,
|
||||||
api::client::{
|
api::client::{
|
||||||
error::ErrorKind,
|
error::{ErrorKind, StandardErrorBody},
|
||||||
uiaa::{AuthData, AuthType, Password, UiaaInfo, UserIdentifier},
|
uiaa::{AuthData, AuthType, Password, UiaaInfo, UserIdentifier},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
@@ -104,6 +104,7 @@ pub fn create(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
|
#[allow(clippy::useless_let_if_seq)]
|
||||||
pub async fn try_auth(
|
pub async fn try_auth(
|
||||||
&self,
|
&self,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
@@ -163,16 +164,38 @@ pub async fn try_auth(
|
|||||||
let user_id = user_id_from_username;
|
let user_id = user_id_from_username;
|
||||||
|
|
||||||
// Check if password is correct
|
// Check if password is correct
|
||||||
|
let mut password_verified = false;
|
||||||
|
|
||||||
|
// First try local password hash verification
|
||||||
if let Ok(hash) = self.services.users.password_hash(&user_id).await {
|
if let Ok(hash) = self.services.users.password_hash(&user_id).await {
|
||||||
let hash_matches = hash::verify_password(password, &hash).is_ok();
|
password_verified = hash::verify_password(password, &hash).is_ok();
|
||||||
if !hash_matches {
|
}
|
||||||
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
|
|
||||||
|
// If local password verification failed, try LDAP authentication
|
||||||
|
#[cfg(feature = "ldap")]
|
||||||
|
if !password_verified && self.services.config.ldap.enable {
|
||||||
|
// Search for user in LDAP to get their DN
|
||||||
|
if let Ok(dns) = self.services.users.search_ldap(&user_id).await {
|
||||||
|
if let Some((user_dn, _is_admin)) = dns.first() {
|
||||||
|
// Try to authenticate with LDAP
|
||||||
|
password_verified = self
|
||||||
|
.services
|
||||||
|
.users
|
||||||
|
.auth_ldap(user_dn, password)
|
||||||
|
.await
|
||||||
|
.is_ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !password_verified {
|
||||||
|
uiaainfo.auth_error = Some(StandardErrorBody {
|
||||||
kind: ErrorKind::forbidden(),
|
kind: ErrorKind::forbidden(),
|
||||||
message: "Invalid username or password.".to_owned(),
|
message: "Invalid username or password.".to_owned(),
|
||||||
});
|
});
|
||||||
|
|
||||||
return Ok((false, uiaainfo));
|
return Ok((false, uiaainfo));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Password was correct! Let's add it to `completed`
|
// Password was correct! Let's add it to `completed`
|
||||||
uiaainfo.completed.push(AuthType::Password);
|
uiaainfo.completed.push(AuthType::Password);
|
||||||
@@ -197,7 +220,7 @@ pub async fn try_auth(
|
|||||||
},
|
},
|
||||||
| Err(e) => {
|
| Err(e) => {
|
||||||
error!("ReCaptcha verification failed: {e:?}");
|
error!("ReCaptcha verification failed: {e:?}");
|
||||||
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
|
uiaainfo.auth_error = Some(StandardErrorBody {
|
||||||
kind: ErrorKind::forbidden(),
|
kind: ErrorKind::forbidden(),
|
||||||
message: "ReCaptcha verification failed.".to_owned(),
|
message: "ReCaptcha verification failed.".to_owned(),
|
||||||
});
|
});
|
||||||
@@ -210,7 +233,7 @@ pub async fn try_auth(
|
|||||||
if tokens.contains(t.token.trim()) {
|
if tokens.contains(t.token.trim()) {
|
||||||
uiaainfo.completed.push(AuthType::RegistrationToken);
|
uiaainfo.completed.push(AuthType::RegistrationToken);
|
||||||
} else {
|
} else {
|
||||||
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
|
uiaainfo.auth_error = Some(StandardErrorBody {
|
||||||
kind: ErrorKind::forbidden(),
|
kind: ErrorKind::forbidden(),
|
||||||
message: "Invalid registration token.".to_owned(),
|
message: "Invalid registration token.".to_owned(),
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
#[cfg(feature = "ldap")]
|
#[cfg(feature = "ldap")]
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::{collections::BTreeMap, mem, sync::Arc};
|
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
|
||||||
|
|
||||||
#[cfg(feature = "ldap")]
|
#[cfg(feature = "ldap")]
|
||||||
use conduwuit::result::LogErr;
|
use conduwuit::result::LogErr;
|
||||||
@@ -25,6 +25,7 @@ use ruma::{
|
|||||||
invite_permission_config::{FilterLevel, InvitePermissionConfigEvent},
|
invite_permission_config::{FilterLevel, InvitePermissionConfigEvent},
|
||||||
},
|
},
|
||||||
serde::Raw,
|
serde::Raw,
|
||||||
|
uint,
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
@@ -980,6 +981,7 @@ impl Service {
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Updates device metadata and increments the device list version.
|
||||||
pub async fn update_device_metadata(
|
pub async fn update_device_metadata(
|
||||||
&self,
|
&self,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
@@ -987,13 +989,51 @@ impl Service {
|
|||||||
device: &Device,
|
device: &Device,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
increment(&self.db.userid_devicelistversion, user_id.as_bytes());
|
increment(&self.db.userid_devicelistversion, user_id.as_bytes());
|
||||||
|
self.update_device_metadata_no_increment(user_id, device_id, device)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updates device metadata without incrementing the device list version.
|
||||||
|
// This is namely used for updating the last_seen_ip and last_seen_ts values,
|
||||||
|
// as those do not need a device list version bump due to them not being
|
||||||
|
// relevant to other consumers.
|
||||||
|
pub async fn update_device_metadata_no_increment(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
device_id: &DeviceId,
|
||||||
|
device: &Device,
|
||||||
|
) -> Result<()> {
|
||||||
let key = (user_id, device_id);
|
let key = (user_id, device_id);
|
||||||
self.db.userdeviceid_metadata.put(key, Json(device));
|
self.db.userdeviceid_metadata.put(key, Json(device));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn update_device_last_seen(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
device_id: Option<&DeviceId>,
|
||||||
|
ip: IpAddr,
|
||||||
|
) {
|
||||||
|
let now = MilliSecondsSinceUnixEpoch::now();
|
||||||
|
if let Some(device_id) = device_id {
|
||||||
|
if let Ok(mut device) = self.get_device_metadata(user_id, device_id).await {
|
||||||
|
device.last_seen_ip = Some(ip.to_string());
|
||||||
|
// If the last update was less than 10 seconds ago, don't update the timestamp
|
||||||
|
if let Some(prev) = device.last_seen_ts {
|
||||||
|
if now.get().saturating_sub(prev.get()) < uint!(10_000) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
device.last_seen_ts = Some(now);
|
||||||
|
|
||||||
|
self.update_device_metadata_no_increment(user_id, device_id, &device)
|
||||||
|
.await
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Get device metadata.
|
/// Get device metadata.
|
||||||
pub async fn get_device_metadata(
|
pub async fn get_device_metadata(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
Reference in New Issue
Block a user