Compare commits

..

1 Commits

Author SHA1 Message Date
Jade Ellis 5bae8b4f6c feat: Deal with low screen widths in CSS 2025-05-21 02:54:03 +01:00
29 changed files with 673 additions and 661 deletions
-2
View File
@@ -80,7 +80,6 @@ jobs:
-D warnings -D warnings
- name: Show sccache stats - name: Show sccache stats
if: always()
run: sccache --show-stats run: sccache --show-stats
cargo-test: cargo-test:
@@ -138,5 +137,4 @@ jobs:
--no-fail-fast --no-fail-fast
- name: Show sccache stats - name: Show sccache stats
if: always()
run: sccache --show-stats run: sccache --show-stats
Generated
+278 -200
View File
File diff suppressed because it is too large Load Diff
-63
View File
@@ -1,63 +0,0 @@
# Security Policy for Continuwuity
This document outlines the security policy for Continuwuity. Our goal is to maintain a secure platform for all users, and we take security matters seriously.
## Supported Versions
We provide security updates for the following versions of Continuwuity:
| Version | Supported |
| -------------- |:----------------:|
| Latest release | ✅ |
| Main branch | ✅ |
| Older releases | ❌ |
We may backport fixes to the previous release at our discretion, but we don't guarantee this.
## Reporting a Vulnerability
### Responsible Disclosure
We appreciate the efforts of security researchers and the community in identifying and reporting vulnerabilities. To ensure that potential vulnerabilities are addressed properly, please follow these guidelines:
1. Contact members of the team over E2EE private message.
- [@jade:ellis.link](https://matrix.to/#/@jade:ellis.link)
- [@nex:nexy7574.co.uk](https://matrix.to/#/@nex:nexy7574.co.uk) <!-- ? -->
2. **Email the security team** directly at [security@continuwuity.org](mailto:security@continuwuity.org). This is not E2EE, so don't include sensitive details.
3. **Do not disclose the vulnerability publicly** until it has been addressed
4. **Provide detailed information** about the vulnerability, including:
- A clear description of the issue
- Steps to reproduce
- Potential impact
- Any possible mitigations
- Version(s) affected, including specific commits if possible
If you have any doubts about a potential security vulnerability, contact us via private channels first! We'd prefer that you bother us, instead of having a vulnerability disclosed without a fix.
### What to Expect
When you report a security vulnerability:
1. **Acknowledgment**: We will acknowledge receipt of your report.
2. **Assessment**: We will assess the vulnerability and determine its impact on our users
3. **Updates**: We will provide updates on our progress in addressing the vulnerability, and may request you help test mitigations
4. **Resolution**: Once resolved, we will notify you and discuss coordinated disclosure
5. **Credit**: We will recognize your contribution (unless you prefer to remain anonymous)
## Security Update Process
When security vulnerabilities are identified:
1. We will develop and test fixes in a private branch
2. Security updates will be released as soon as possible
3. Release notes will include information about the vulnerabilities, avoiding details that could facilitate exploitation where possible
4. Critical security updates may be backported to the previous stable release
## Additional Resources
- [Matrix Security Disclosure Policy](https://matrix.org/security-disclosure-policy/)
- [Continuwuity Documentation](https://continuwuity.org/introduction)
---
This security policy was last updated on May 25, 2025.
+5 -15
View File
@@ -1641,29 +1641,19 @@
# #
#server = #server =
# URL to a support page for the server, which will be served as part of # This item is undocumented. Please contribute documentation for it.
# the MSC1929 server support endpoint at /.well-known/matrix/support.
# Will be included alongside any contact information
# #
#support_page = #support_page =
# Role string for server support contacts, to be served as part of the # This item is undocumented. Please contribute documentation for it.
# MSC1929 server support endpoint at /.well-known/matrix/support.
# #
#support_role = "m.role.admin" #support_role =
# Email address for server support contacts, to be served as part of the # This item is undocumented. Please contribute documentation for it.
# MSC1929 server support endpoint.
# This will be used along with support_mxid if specified.
# #
#support_email = #support_email =
# Matrix ID for server support contacts, to be served as part of the # This item is undocumented. Please contribute documentation for it.
# MSC1929 server support endpoint.
# This will be used along with support_email if specified.
#
# If no email or mxid is specified, all of the server's admins will be
# listed.
# #
#support_mxid = #support_mxid =
-1
View File
@@ -20,4 +20,3 @@
- [Testing](development/testing.md) - [Testing](development/testing.md)
- [Hot Reloading ("Live" Development)](development/hot_reload.md) - [Hot Reloading ("Live" Development)](development/hot_reload.md)
- [Community (and Guidelines)](community.md) - [Community (and Guidelines)](community.md)
- [Security](security.md)
-1
View File
@@ -1 +0,0 @@
{{#include ../SECURITY.md}}
-1
View File
@@ -1243,7 +1243,6 @@ async fn join_room_by_id_helper_remote(
services.rooms.timeline.get_pdu(event_id).await.ok() services.rooms.timeline.get_pdu(event_id).await.ok()
}; };
debug!("running stateres check on send_join parsed PDU");
let auth_check = state_res::event_auth::auth_check( let auth_check = state_res::event_auth::auth_check(
&state_res::RoomVersion::new(&room_version_id)?, &state_res::RoomVersion::new(&room_version_id)?,
&parsed_join_pdu, &parsed_join_pdu,
+22 -42
View File
@@ -1,6 +1,5 @@
use axum::{Json, extract::State, response::IntoResponse}; use axum::{Json, extract::State, response::IntoResponse};
use conduwuit::{Error, Result}; use conduwuit::{Error, Result};
use futures::StreamExt;
use ruma::api::client::{ use ruma::api::client::{
discovery::{ discovery::{
discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo}, discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo},
@@ -18,7 +17,7 @@ pub(crate) async fn well_known_client(
State(services): State<crate::State>, State(services): State<crate::State>,
_body: Ruma<discover_homeserver::Request>, _body: Ruma<discover_homeserver::Request>,
) -> Result<discover_homeserver::Response> { ) -> Result<discover_homeserver::Response> {
let client_url = match services.config.well_known.client.as_ref() { let client_url = match services.server.config.well_known.client.as_ref() {
| Some(url) => url.to_string(), | Some(url) => url.to_string(),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")), | None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
}; };
@@ -34,63 +33,44 @@ pub(crate) async fn well_known_client(
/// # `GET /.well-known/matrix/support` /// # `GET /.well-known/matrix/support`
/// ///
/// Server support contact and support page of a homeserver's domain. /// Server support contact and support page of a homeserver's domain.
/// Implements MSC1929 for server discovery.
/// If no configuration is set, uses admin users as contacts.
pub(crate) async fn well_known_support( pub(crate) async fn well_known_support(
State(services): State<crate::State>, State(services): State<crate::State>,
_body: Ruma<discover_support::Request>, _body: Ruma<discover_support::Request>,
) -> Result<discover_support::Response> { ) -> Result<discover_support::Response> {
let support_page = services let support_page = services
.server
.config .config
.well_known .well_known
.support_page .support_page
.as_ref() .as_ref()
.map(ToString::to_string); .map(ToString::to_string);
let email_address = services.config.well_known.support_email.clone(); let role = services.server.config.well_known.support_role.clone();
let matrix_id = services.config.well_known.support_mxid.clone();
// support page or role must be either defined for this to be valid
if support_page.is_none() && role.is_none() {
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
}
let email_address = services.server.config.well_known.support_email.clone();
let matrix_id = services.server.config.well_known.support_mxid.clone();
// if a role is specified, an email address or matrix id is required
if role.is_some() && (email_address.is_none() && matrix_id.is_none()) {
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
}
// TODO: support defining multiple contacts in the config // TODO: support defining multiple contacts in the config
let mut contacts: Vec<Contact> = vec![]; let mut contacts: Vec<Contact> = vec![];
let role_value = services if let Some(role) = role {
.config let contact = Contact { role, email_address, matrix_id };
.well_known
.support_role
.clone()
.unwrap_or_else(|| "m.role.admin".to_owned().into());
// Add configured contact if at least one contact method is specified contacts.push(contact);
if email_address.is_some() || matrix_id.is_some() {
contacts.push(Contact {
role: role_value.clone(),
email_address: email_address.clone(),
matrix_id: matrix_id.clone(),
});
}
// Try to add admin users as contacts if no contacts are configured
if contacts.is_empty() {
if let Ok(admin_room) = services.admin.get_admin_room().await {
let admin_users = services.rooms.state_cache.room_members(&admin_room);
let mut stream = admin_users;
while let Some(user_id) = stream.next().await {
// Skip server user
if *user_id == services.globals.server_user {
break;
}
contacts.push(Contact {
role: role_value.clone(),
email_address: None,
matrix_id: Some(user_id.to_owned()),
});
}
}
} }
// support page or role+contacts must be either defined for this to be valid
if contacts.is_empty() && support_page.is_none() { if contacts.is_empty() && support_page.is_none() {
// No admin room, no configured contacts, and no support page
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")); return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
} }
@@ -104,9 +84,9 @@ pub(crate) async fn well_known_support(
pub(crate) async fn syncv3_client_server_json( pub(crate) async fn syncv3_client_server_json(
State(services): State<crate::State>, State(services): State<crate::State>,
) -> Result<impl IntoResponse> { ) -> Result<impl IntoResponse> {
let server_url = match services.config.well_known.client.as_ref() { let server_url = match services.server.config.well_known.client.as_ref() {
| Some(url) => url.to_string(), | Some(url) => url.to_string(),
| None => match services.config.well_known.server.as_ref() { | None => match services.server.config.well_known.server.as_ref() {
| Some(url) => url.to_string(), | Some(url) => url.to_string(),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")), | None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
}, },
+3 -3
View File
@@ -18,8 +18,8 @@ build = "build.rs"
[lib] [lib]
path = "mod.rs" path = "mod.rs"
crate-type = [ crate-type = [
"rlib", "rlib",
# "dylib", # "dylib",
] ]
[features] [features]
@@ -28,7 +28,7 @@ crate-type = [
[dependencies] [dependencies]
[build-dependencies] [build-dependencies]
built = { version = "0.8", features = [] } built = {version = "0.7", features = ["cargo-lock", "dependency-tree"]}
[lints] [lints]
workspace = true workspace = true
-1
View File
@@ -78,7 +78,6 @@ fn main() {
} }
// --- Rerun Triggers --- // --- Rerun Triggers ---
// TODO: The git rerun triggers seem to always run
// Rerun if the git HEAD changes // Rerun if the git HEAD changes
println!("cargo:rerun-if-changed=.git/HEAD"); println!("cargo:rerun-if-changed=.git/HEAD");
// Rerun if the ref pointed to by HEAD changes (e.g., new commit on branch) // Rerun if the ref pointed to by HEAD changes (e.g., new commit on branch)
+3 -9
View File
@@ -12,17 +12,11 @@ pub static VERSION_EXTRA: Option<&str> =
v v
} else if let v @ Some(_) = option_env!("CONDUWUIT_VERSION_EXTRA") { } else if let v @ Some(_) = option_env!("CONDUWUIT_VERSION_EXTRA") {
v v
} else if let v @ Some(_) = option_env!("CONDUIT_VERSION_EXTRA") {
v
} else { } else {
option_env!("CONDUIT_VERSION_EXTRA") GIT_COMMIT_HASH_SHORT
}; };
#[must_use]
pub fn version_tag() -> Option<&'static str> {
VERSION_EXTRA
.filter(|s| !s.is_empty())
.or(GIT_COMMIT_HASH_SHORT)
}
pub static GIT_REMOTE_WEB_URL: Option<&str> = option_env!("GIT_REMOTE_WEB_URL"); pub static GIT_REMOTE_WEB_URL: Option<&str> = option_env!("GIT_REMOTE_WEB_URL");
pub static GIT_REMOTE_COMMIT_URL: Option<&str> = option_env!("GIT_REMOTE_COMMIT_URL"); pub static GIT_REMOTE_COMMIT_URL: Option<&str> = option_env!("GIT_REMOTE_COMMIT_URL");
-4
View File
@@ -274,10 +274,6 @@ pub fn set_dirty_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Res
} }
} }
pub fn background_thread_enable(enable: bool) -> Result<bool> {
set::<u8>(&mallctl!("background_thread"), enable.into()).map(is_nonzero!())
}
#[inline] #[inline]
#[must_use] #[must_use]
pub fn is_affine_arena() -> bool { is_percpu_arena() || is_phycpu_arena() } pub fn is_affine_arena() -> bool { is_percpu_arena() || is_phycpu_arena() }
-16
View File
@@ -1897,28 +1897,12 @@ pub struct WellKnownConfig {
/// example: "matrix.example.com:443" /// example: "matrix.example.com:443"
pub server: Option<OwnedServerName>, pub server: Option<OwnedServerName>,
/// URL to a support page for the server, which will be served as part of
/// the MSC1929 server support endpoint at /.well-known/matrix/support.
/// Will be included alongside any contact information
pub support_page: Option<Url>, pub support_page: Option<Url>,
/// Role string for server support contacts, to be served as part of the
/// MSC1929 server support endpoint at /.well-known/matrix/support.
///
/// default: "m.role.admin"
pub support_role: Option<ContactRole>, pub support_role: Option<ContactRole>,
/// Email address for server support contacts, to be served as part of the
/// MSC1929 server support endpoint.
/// This will be used along with support_mxid if specified.
pub support_email: Option<String>, pub support_email: Option<String>,
/// Matrix ID for server support contacts, to be served as part of the
/// MSC1929 server support endpoint.
/// This will be used along with support_email if specified.
///
/// If no email or mxid is specified, all of the server's admins will be
/// listed.
pub support_mxid: Option<OwnedUserId>, pub support_mxid: Option<OwnedUserId>,
} }
+1 -1
View File
@@ -26,6 +26,6 @@ pub fn user_agent() -> &'static str { USER_AGENT.get_or_init(init_user_agent) }
fn init_user_agent() -> String { format!("{}/{}", name(), version()) } fn init_user_agent() -> String { format!("{}/{}", name(), version()) }
fn init_version() -> String { fn init_version() -> String {
conduwuit_build_metadata::version_tag() conduwuit_build_metadata::VERSION_EXTRA
.map_or(SEMANTIC.to_owned(), |extra| format!("{SEMANTIC} ({extra})")) .map_or(SEMANTIC.to_owned(), |extra| format!("{SEMANTIC} ({extra})"))
} }
+18 -8
View File
@@ -1,10 +1,18 @@
use std::{
borrow::Borrow,
fmt::{Debug, Display},
hash::Hash,
};
use ruma::{EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId, events::TimelineEventType}; use ruma::{EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId, events::TimelineEventType};
use serde_json::value::RawValue as RawJsonValue; use serde_json::value::RawValue as RawJsonValue;
/// Abstraction of a PDU so users can have their own PDU types. /// Abstraction of a PDU so users can have their own PDU types.
pub trait Event { pub trait Event {
type Id: Clone + Debug + Display + Eq + Ord + Hash + Send + Borrow<EventId>;
/// The `EventId` of this event. /// The `EventId` of this event.
fn event_id(&self) -> &EventId; fn event_id(&self) -> &Self::Id;
/// The `RoomId` of this event. /// The `RoomId` of this event.
fn room_id(&self) -> &RoomId; fn room_id(&self) -> &RoomId;
@@ -26,18 +34,20 @@ pub trait Event {
/// The events before this event. /// The events before this event.
// Requires GATs to avoid boxing (and TAIT for making it convenient). // Requires GATs to avoid boxing (and TAIT for making it convenient).
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_; fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_;
/// All the authenticating events for this event. /// All the authenticating events for this event.
// Requires GATs to avoid boxing (and TAIT for making it convenient). // Requires GATs to avoid boxing (and TAIT for making it convenient).
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_; fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_;
/// If this event is a redaction event this is the event it redacts. /// If this event is a redaction event this is the event it redacts.
fn redacts(&self) -> Option<&EventId>; fn redacts(&self) -> Option<&Self::Id>;
} }
impl<T: Event> Event for &T { impl<T: Event> Event for &T {
fn event_id(&self) -> &EventId { (*self).event_id() } type Id = T::Id;
fn event_id(&self) -> &Self::Id { (*self).event_id() }
fn room_id(&self) -> &RoomId { (*self).room_id() } fn room_id(&self) -> &RoomId { (*self).room_id() }
@@ -51,13 +61,13 @@ impl<T: Event> Event for &T {
fn state_key(&self) -> Option<&str> { (*self).state_key() } fn state_key(&self) -> Option<&str> { (*self).state_key() }
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ { fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
(*self).prev_events() (*self).prev_events()
} }
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ { fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
(*self).auth_events() (*self).auth_events()
} }
fn redacts(&self) -> Option<&EventId> { (*self).redacts() } fn redacts(&self) -> Option<&Self::Id> { (*self).redacts() }
} }
+8 -6
View File
@@ -79,7 +79,9 @@ impl Pdu {
} }
impl Event for Pdu { impl Event for Pdu {
fn event_id(&self) -> &EventId { &self.event_id } type Id = OwnedEventId;
fn event_id(&self) -> &Self::Id { &self.event_id }
fn room_id(&self) -> &RoomId { &self.room_id } fn room_id(&self) -> &RoomId { &self.room_id }
@@ -95,15 +97,15 @@ impl Event for Pdu {
fn state_key(&self) -> Option<&str> { self.state_key.as_deref() } fn state_key(&self) -> Option<&str> { self.state_key.as_deref() }
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ { fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
self.prev_events.iter().map(AsRef::as_ref) self.prev_events.iter()
} }
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ { fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
self.auth_events.iter().map(AsRef::as_ref) self.auth_events.iter()
} }
fn redacts(&self) -> Option<&EventId> { self.redacts.as_deref() } fn redacts(&self) -> Option<&Self::Id> { self.redacts.as_ref() }
} }
/// Prevent derived equality which wouldn't limit itself to event_id /// Prevent derived equality which wouldn't limit itself to event_id
+50 -19
View File
@@ -1,8 +1,8 @@
use ruma::{ use ruma::{
events::{ events::{
AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent, AnySyncStateEvent, AnyEphemeralRoomEvent, AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent,
AnySyncTimelineEvent, AnyTimelineEvent, StateEvent, room::member::RoomMemberEventContent, AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, StateEvent,
space::child::HierarchySpaceChildEvent, room::member::RoomMemberEventContent, space::child::HierarchySpaceChildEvent,
}, },
serde::Raw, serde::Raw,
}; };
@@ -10,6 +10,41 @@ use serde_json::{json, value::Value as JsonValue};
use crate::implement; use crate::implement;
/// This only works for events that are also AnyRoomEvents.
#[must_use]
#[implement(super::Pdu)]
pub fn into_any_event(self) -> Raw<AnyEphemeralRoomEvent> {
serde_json::from_value(self.into_any_event_value()).expect("Raw::from_value always works")
}
/// This only works for events that are also AnyRoomEvents.
#[implement(super::Pdu)]
#[must_use]
#[inline]
pub fn into_any_event_value(self) -> JsonValue {
let (redacts, content) = self.copy_redacts();
let mut json = json!({
"content": content,
"type": self.kind,
"event_id": self.event_id,
"sender": self.sender,
"origin_server_ts": self.origin_server_ts,
"room_id": self.room_id,
});
if let Some(unsigned) = &self.unsigned {
json["unsigned"] = json!(unsigned);
}
if let Some(state_key) = &self.state_key {
json["state_key"] = json!(state_key);
}
if let Some(redacts) = &redacts {
json["redacts"] = json!(redacts);
}
json
}
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
#[inline] #[inline]
@@ -18,8 +53,7 @@ pub fn into_room_event(self) -> Raw<AnyTimelineEvent> { self.to_room_event() }
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn to_room_event(&self) -> Raw<AnyTimelineEvent> { pub fn to_room_event(&self) -> Raw<AnyTimelineEvent> {
let value = self.to_room_event_value(); serde_json::from_value(self.to_room_event_value()).expect("Raw::from_value always works")
serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@@ -57,8 +91,8 @@ pub fn into_message_like_event(self) -> Raw<AnyMessageLikeEvent> { self.to_messa
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> { pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> {
let value = self.to_message_like_event_value(); serde_json::from_value(self.to_message_like_event_value())
serde_json::from_value(value).expect("Failed to serialize Event value") .expect("Raw::from_value always works")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@@ -96,8 +130,7 @@ pub fn into_sync_room_event(self) -> Raw<AnySyncTimelineEvent> { self.to_sync_ro
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> { pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
let value = self.to_sync_room_event_value(); serde_json::from_value(self.to_sync_room_event_value()).expect("Raw::from_value always works")
serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@@ -129,8 +162,7 @@ pub fn to_sync_room_event_value(&self) -> JsonValue {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn into_state_event(self) -> Raw<AnyStateEvent> { pub fn into_state_event(self) -> Raw<AnyStateEvent> {
let value = self.into_state_event_value(); serde_json::from_value(self.into_state_event_value()).expect("Raw::from_value always works")
serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@@ -157,8 +189,8 @@ pub fn into_state_event_value(self) -> JsonValue {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn into_sync_state_event(self) -> Raw<AnySyncStateEvent> { pub fn into_sync_state_event(self) -> Raw<AnySyncStateEvent> {
let value = self.into_sync_state_event_value(); serde_json::from_value(self.into_sync_state_event_value())
serde_json::from_value(value).expect("Failed to serialize Event value") .expect("Raw::from_value always works")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@@ -191,8 +223,8 @@ pub fn into_stripped_state_event(self) -> Raw<AnyStrippedStateEvent> {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn to_stripped_state_event(&self) -> Raw<AnyStrippedStateEvent> { pub fn to_stripped_state_event(&self) -> Raw<AnyStrippedStateEvent> {
let value = self.to_stripped_state_event_value(); serde_json::from_value(self.to_stripped_state_event_value())
serde_json::from_value(value).expect("Failed to serialize Event value") .expect("Raw::from_value always works")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@@ -210,8 +242,8 @@ pub fn to_stripped_state_event_value(&self) -> JsonValue {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn into_stripped_spacechild_state_event(self) -> Raw<HierarchySpaceChildEvent> { pub fn into_stripped_spacechild_state_event(self) -> Raw<HierarchySpaceChildEvent> {
let value = self.into_stripped_spacechild_state_event_value(); serde_json::from_value(self.into_stripped_spacechild_state_event_value())
serde_json::from_value(value).expect("Failed to serialize Event value") .expect("Raw::from_value always works")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@@ -230,8 +262,7 @@ pub fn into_stripped_spacechild_state_event_value(self) -> JsonValue {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn into_member_event(self) -> Raw<StateEvent<RoomMemberEventContent>> { pub fn into_member_event(self) -> Raw<StateEvent<RoomMemberEventContent>> {
let value = self.into_member_event_value(); serde_json::from_value(self.into_member_event_value()).expect("Raw::from_value always works")
serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
+21 -21
View File
@@ -52,6 +52,7 @@ fn lexico_topo_sort(c: &mut test::Bencher) {
#[cfg(conduwuit_bench)] #[cfg(conduwuit_bench)]
#[cfg_attr(conduwuit_bench, bench)] #[cfg_attr(conduwuit_bench, bench)]
fn resolution_shallow_auth_chain(c: &mut test::Bencher) { fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
let parallel_fetches = 32;
let mut store = TestStore(hashmap! {}); let mut store = TestStore(hashmap! {});
// build up the DAG // build up the DAG
@@ -77,6 +78,7 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
&auth_chain_sets, &auth_chain_sets,
&fetch, &fetch,
&exists, &exists,
parallel_fetches,
) )
.await .await
{ {
@@ -89,6 +91,7 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
#[cfg(conduwuit_bench)] #[cfg(conduwuit_bench)]
#[cfg_attr(conduwuit_bench, bench)] #[cfg_attr(conduwuit_bench, bench)]
fn resolve_deeper_event_set(c: &mut test::Bencher) { fn resolve_deeper_event_set(c: &mut test::Bencher) {
let parallel_fetches = 32;
let mut inner = INITIAL_EVENTS(); let mut inner = INITIAL_EVENTS();
let ban = BAN_STATE_SET(); let ban = BAN_STATE_SET();
@@ -150,6 +153,7 @@ fn resolve_deeper_event_set(c: &mut test::Bencher) {
&auth_chain_sets, &auth_chain_sets,
&fetch, &fetch,
&exists, &exists,
parallel_fetches,
) )
.await .await
{ {
@@ -186,11 +190,7 @@ impl<E: Event + Clone> TestStore<E> {
} }
/// Returns a Vec of the related auth events to the given `event`. /// Returns a Vec of the related auth events to the given `event`.
fn auth_event_ids( fn auth_event_ids(&self, room_id: &RoomId, event_ids: Vec<E::Id>) -> Result<HashSet<E::Id>> {
&self,
room_id: &RoomId,
event_ids: Vec<OwnedEventId>,
) -> Result<HashSet<OwnedEventId>> {
let mut result = HashSet::new(); let mut result = HashSet::new();
let mut stack = event_ids; let mut stack = event_ids;
@@ -216,8 +216,8 @@ impl<E: Event + Clone> TestStore<E> {
fn auth_chain_diff( fn auth_chain_diff(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
event_ids: Vec<Vec<OwnedEventId>>, event_ids: Vec<Vec<E::Id>>,
) -> Result<Vec<OwnedEventId>> { ) -> Result<Vec<E::Id>> {
let mut auth_chain_sets = vec![]; let mut auth_chain_sets = vec![];
for ids in event_ids { for ids in event_ids {
// TODO state store `auth_event_ids` returns self in the event ids list // TODO state store `auth_event_ids` returns self in the event ids list
@@ -238,7 +238,7 @@ impl<E: Event + Clone> TestStore<E> {
Ok(auth_chain_sets Ok(auth_chain_sets
.into_iter() .into_iter()
.flatten() .flatten()
.filter(|id| !common.contains(id)) .filter(|id| !common.contains(id.borrow()))
.collect()) .collect())
} else { } else {
Ok(vec![]) Ok(vec![])
@@ -565,7 +565,7 @@ impl EventTypeExt for &TimelineEventType {
mod event { mod event {
use ruma::{ use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
events::{TimelineEventType, pdu::Pdu}, events::{TimelineEventType, pdu::Pdu},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -574,7 +574,9 @@ mod event {
use super::Event; use super::Event;
impl Event for PduEvent { impl Event for PduEvent {
fn event_id(&self) -> &EventId { &self.event_id } type Id = OwnedEventId;
fn event_id(&self) -> &Self::Id { &self.event_id }
fn room_id(&self) -> &RoomId { fn room_id(&self) -> &RoomId {
match &self.rest { match &self.rest {
@@ -630,30 +632,28 @@ mod event {
} }
} }
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> { fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => | Pdu::RoomV1Pdu(ev) => Box::new(ev.prev_events.iter().map(|(id, _)| id)),
Box::new(ev.prev_events.iter().map(|(id, _)| id.as_ref())), | Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter()),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter().map(AsRef::as_ref)),
#[cfg(not(feature = "unstable-exhaustive-types"))] #[cfg(not(feature = "unstable-exhaustive-types"))]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
} }
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> { fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => | Pdu::RoomV1Pdu(ev) => Box::new(ev.auth_events.iter().map(|(id, _)| id)),
Box::new(ev.auth_events.iter().map(|(id, _)| id.as_ref())), | Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter()),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter().map(AsRef::as_ref)),
#[cfg(not(feature = "unstable-exhaustive-types"))] #[cfg(not(feature = "unstable-exhaustive-types"))]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
} }
fn redacts(&self) -> Option<&EventId> { fn redacts(&self) -> Option<&Self::Id> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_deref(), | Pdu::RoomV1Pdu(ev) => ev.redacts.as_ref(),
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_deref(), | Pdu::RoomV3Pdu(ev) => ev.redacts.as_ref(),
#[cfg(not(feature = "unstable-exhaustive-types"))] #[cfg(not(feature = "unstable-exhaustive-types"))]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
+8 -31
View File
@@ -13,7 +13,6 @@ use ruma::{
power_levels::RoomPowerLevelsEventContent, power_levels::RoomPowerLevelsEventContent,
third_party_invite::RoomThirdPartyInviteEventContent, third_party_invite::RoomThirdPartyInviteEventContent,
}, },
EventId,
int, int,
serde::{Base64, Raw}, serde::{Base64, Raw},
}; };
@@ -22,6 +21,7 @@ use serde::{
de::{Error as _, IgnoredAny}, de::{Error as _, IgnoredAny},
}; };
use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue}; use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue};
use super::{ use super::{
Error, Event, Result, StateEventType, StateKey, TimelineEventType, Error, Event, Result, StateEventType, StateKey, TimelineEventType,
power_levels::{ power_levels::{
@@ -133,7 +133,7 @@ pub fn auth_types_for_event(
level = "debug", level = "debug",
skip_all, skip_all,
fields( fields(
event_id = incoming_event.event_id().as_str(), event_id = incoming_event.event_id().borrow().as_str()
) )
)] )]
pub async fn auth_check<F, Fut, Fetched, Incoming>( pub async fn auth_check<F, Fut, Fetched, Incoming>(
@@ -217,9 +217,8 @@ where
} }
/* /*
// TODO: In the past this code was commented as it caused problems with Synapse. This is no // TODO: In the past this code caused problems federating with synapse, maybe this has been
// longer the case. This needs to be implemented. // resolved already. Needs testing.
// See also: https://github.com/ruma/ruma/pull/2064
// //
// 2. Reject if auth_events // 2. Reject if auth_events
// a. auth_events cannot have duplicate keys since it's a BTree // a. auth_events cannot have duplicate keys since it's a BTree
@@ -251,38 +250,16 @@ where
let room_create_event = match room_create_event { let room_create_event = match room_create_event {
| None => { | None => {
error!( warn!("no m.room.create event in auth chain");
create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
"no m.room.create event found for {} ({})!",
incoming_event.event_id().as_str(),
incoming_event.room_id().as_str()
);
return Ok(false); return Ok(false);
}, },
| Some(e) => e, | Some(e) => e,
}; };
// just re-check 1.2 to work around a bug
let Some(room_id_server_name) = incoming_event.room_id().server_name() else {
warn!("room ID has no servername");
return Ok(false);
};
if room_id_server_name != room_create_event.sender().server_name() {
warn!(
"servername of room ID origin ({}) does not match servername of m.room.create \
sender ({})",
room_id_server_name,
room_create_event.sender().server_name()
);
return Ok(false);
}
// 3. If event does not have m.room.create in auth_events reject // 3. If event does not have m.room.create in auth_events reject
if !incoming_event if !incoming_event
.auth_events() .auth_events()
.any(|id| id == room_create_event.event_id()) .any(|id| id.borrow() == room_create_event.event_id().borrow())
{ {
warn!("no m.room.create event in auth events"); warn!("no m.room.create event in auth events");
return Ok(false); return Ok(false);
@@ -1044,11 +1021,11 @@ fn check_redaction(
// If the domain of the event_id of the event being redacted is the same as the // If the domain of the event_id of the event being redacted is the same as the
// domain of the event_id of the m.room.redaction, allow // domain of the event_id of the m.room.redaction, allow
if redaction_event.event_id().server_name() if redaction_event.event_id().borrow().server_name()
== redaction_event == redaction_event
.redacts() .redacts()
.as_ref() .as_ref()
.and_then(|&id| id.server_name()) .and_then(|&id| id.borrow().server_name())
{ {
debug!("redaction event allowed via room version 1 rules"); debug!("redaction event allowed via room version 1 rules");
return Ok(true); return Ok(true);
+122 -97
View File
@@ -20,7 +20,7 @@ use std::{
use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future}; use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
use ruma::{ use ruma::{
EventId, Int, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomVersionId, EventId, Int, MilliSecondsSinceUnixEpoch, RoomVersionId,
events::{ events::{
StateEventType, TimelineEventType, StateEventType, TimelineEventType,
room::member::{MembershipState, RoomMemberEventContent}, room::member::{MembershipState, RoomMemberEventContent},
@@ -39,7 +39,9 @@ use crate::{
debug, debug_error, debug, debug_error,
matrix::{event::Event, pdu::StateKey}, matrix::{event::Event, pdu::StateKey},
trace, trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, WidebandExt}, utils::stream::{
BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryReadyExt, WidebandExt,
},
warn, warn,
}; };
@@ -67,6 +69,9 @@ type Result<T, E = Error> = crate::Result<T, E>;
/// * `event_fetch` - Any event not found in the `event_map` will defer to this /// * `event_fetch` - Any event not found in the `event_map` will defer to this
/// closure to find the event. /// closure to find the event.
/// ///
/// * `parallel_fetches` - The number of asynchronous fetch requests in-flight
/// for any given operation.
///
/// ## Invariants /// ## Invariants
/// ///
/// The caller of `resolve` must ensure that all the events are from the same /// The caller of `resolve` must ensure that all the events are from the same
@@ -77,19 +82,21 @@ type Result<T, E = Error> = crate::Result<T, E>;
pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>( pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>(
room_version: &RoomVersionId, room_version: &RoomVersionId,
state_sets: Sets, state_sets: Sets,
auth_chain_sets: &'a [HashSet<OwnedEventId, Hasher>], auth_chain_sets: &'a [HashSet<E::Id, Hasher>],
event_fetch: &Fetch, event_fetch: &Fetch,
event_exists: &Exists, event_exists: &Exists,
) -> Result<StateMap<OwnedEventId>> parallel_fetches: usize,
) -> Result<StateMap<E::Id>>
where where
Fetch: Fn(OwnedEventId) -> FetchFut + Sync, Fetch: Fn(E::Id) -> FetchFut + Sync,
FetchFut: Future<Output = Option<E>> + Send, FetchFut: Future<Output = Option<E>> + Send,
Exists: Fn(OwnedEventId) -> ExistsFut + Sync, Exists: Fn(E::Id) -> ExistsFut + Sync,
ExistsFut: Future<Output = bool> + Send, ExistsFut: Future<Output = bool> + Send,
Sets: IntoIterator<IntoIter = SetIter> + Send, Sets: IntoIterator<IntoIter = SetIter> + Send,
SetIter: Iterator<Item = &'a StateMap<OwnedEventId>> + Clone + Send, SetIter: Iterator<Item = &'a StateMap<E::Id>> + Clone + Send,
Hasher: BuildHasher + Send + Sync, Hasher: BuildHasher + Send + Sync,
E: Event + Clone + Send + Sync, E: Event + Clone + Send + Sync,
E::Id: Borrow<EventId> + Send + Sync,
for<'b> &'b E: Send, for<'b> &'b E: Send,
{ {
debug!("State resolution starting"); debug!("State resolution starting");
@@ -140,8 +147,13 @@ where
// Sort the control events based on power_level/clock/event_id and // Sort the control events based on power_level/clock/event_id and
// outgoing/incoming edges // outgoing/incoming edges
let sorted_control_levels = let sorted_control_levels = reverse_topological_power_sort(
reverse_topological_power_sort(control_events, &all_conflicted, &event_fetch).await?; control_events,
&all_conflicted,
&event_fetch,
parallel_fetches,
)
.await?;
debug!(count = sorted_control_levels.len(), "power events"); debug!(count = sorted_control_levels.len(), "power events");
trace!(list = ?sorted_control_levels, "sorted power events"); trace!(list = ?sorted_control_levels, "sorted power events");
@@ -150,7 +162,7 @@ where
// Sequentially auth check each control event. // Sequentially auth check each control event.
let resolved_control = iterative_auth_check( let resolved_control = iterative_auth_check(
&room_version, &room_version,
sorted_control_levels.iter().stream().map(AsRef::as_ref), sorted_control_levels.iter().stream(),
clean.clone(), clean.clone(),
&event_fetch, &event_fetch,
) )
@@ -167,7 +179,7 @@ where
// that failed auth // that failed auth
let events_to_resolve: Vec<_> = all_conflicted let events_to_resolve: Vec<_> = all_conflicted
.iter() .iter()
.filter(|&id| !deduped_power_ev.contains(id)) .filter(|&id| !deduped_power_ev.contains(id.borrow()))
.cloned() .cloned()
.collect(); .collect();
@@ -187,7 +199,7 @@ where
let mut resolved_state = iterative_auth_check( let mut resolved_state = iterative_auth_check(
&room_version, &room_version,
sorted_left_events.iter().stream().map(AsRef::as_ref), sorted_left_events.iter().stream(),
resolved_control, // The control events are added to the final resolved state resolved_control, // The control events are added to the final resolved state
&event_fetch, &event_fetch,
) )
@@ -280,14 +292,16 @@ where
/// earlier (further back in time) origin server timestamp. /// earlier (further back in time) origin server timestamp.
#[tracing::instrument(level = "debug", skip_all)] #[tracing::instrument(level = "debug", skip_all)]
async fn reverse_topological_power_sort<E, F, Fut>( async fn reverse_topological_power_sort<E, F, Fut>(
events_to_sort: Vec<OwnedEventId>, events_to_sort: Vec<E::Id>,
auth_diff: &HashSet<OwnedEventId>, auth_diff: &HashSet<E::Id>,
fetch_event: &F, fetch_event: &F,
) -> Result<Vec<OwnedEventId>> parallel_fetches: usize,
) -> Result<Vec<E::Id>>
where where
F: Fn(OwnedEventId) -> Fut + Sync, F: Fn(E::Id) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync, E: Event + Send + Sync,
E::Id: Borrow<EventId> + Send + Sync,
{ {
debug!("reverse topological sort of power events"); debug!("reverse topological sort of power events");
@@ -297,36 +311,35 @@ where
} }
// This is used in the `key_fn` passed to the lexico_topo_sort fn // This is used in the `key_fn` passed to the lexico_topo_sort fn
let event_to_pl: HashMap<_, _> = graph let event_to_pl = graph
.keys() .keys()
.cloned()
.stream() .stream()
.broad_filter_map(async |event_id| { .map(|event_id| {
let pl = get_power_level_for_sender(&event_id, fetch_event) get_power_level_for_sender(event_id.clone(), fetch_event)
.await .map(move |res| res.map(|pl| (event_id, pl)))
.ok()?;
Some((event_id, pl))
}) })
.inspect(|(event_id, pl)| { .buffer_unordered(parallel_fetches)
.ready_try_fold(HashMap::new(), |mut event_to_pl, (event_id, pl)| {
debug!( debug!(
event_id = event_id.as_str(), event_id = event_id.borrow().as_str(),
power_level = i64::from(*pl), power_level = i64::from(pl),
"found the power level of an event's sender", "found the power level of an event's sender",
); );
event_to_pl.insert(event_id.clone(), pl);
Ok(event_to_pl)
}) })
.collect()
.boxed() .boxed()
.await; .await?;
let fetcher = async |event_id: OwnedEventId| { let event_to_pl = &event_to_pl;
let fetcher = |event_id: E::Id| async move {
let pl = *event_to_pl let pl = *event_to_pl
.get(&event_id) .get(event_id.borrow())
.ok_or_else(|| Error::NotFound(String::new()))?; .ok_or_else(|| Error::NotFound(String::new()))?;
let ev = fetch_event(event_id) let ev = fetch_event(event_id)
.await .await
.ok_or_else(|| Error::NotFound(String::new()))?; .ok_or_else(|| Error::NotFound(String::new()))?;
Ok((pl, ev.origin_server_ts())) Ok((pl, ev.origin_server_ts()))
}; };
@@ -463,17 +476,18 @@ where
/// the eventId at the eventId's generation (we walk backwards to `EventId`s /// the eventId at the eventId's generation (we walk backwards to `EventId`s
/// most recent previous power level event). /// most recent previous power level event).
async fn get_power_level_for_sender<E, F, Fut>( async fn get_power_level_for_sender<E, F, Fut>(
event_id: &EventId, event_id: E::Id,
fetch_event: &F, fetch_event: &F,
) -> serde_json::Result<Int> ) -> serde_json::Result<Int>
where where
F: Fn(OwnedEventId) -> Fut + Sync, F: Fn(E::Id) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send, E: Event + Send,
E::Id: Borrow<EventId> + Send,
{ {
debug!("fetch event ({event_id}) senders power level"); debug!("fetch event ({event_id}) senders power level");
let event = fetch_event(event_id.to_owned()).await; let event = fetch_event(event_id).await;
let auth_events = event.as_ref().map(Event::auth_events); let auth_events = event.as_ref().map(Event::auth_events);
@@ -481,7 +495,7 @@ where
.into_iter() .into_iter()
.flatten() .flatten()
.stream() .stream()
.broadn_filter_map(5, |aid| fetch_event(aid.to_owned())) .broadn_filter_map(5, |aid| fetch_event(aid.clone()))
.ready_find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, "")) .ready_find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, ""))
.await; .await;
@@ -514,13 +528,14 @@ where
async fn iterative_auth_check<'a, E, F, Fut, S>( async fn iterative_auth_check<'a, E, F, Fut, S>(
room_version: &RoomVersion, room_version: &RoomVersion,
events_to_check: S, events_to_check: S,
unconflicted_state: StateMap<OwnedEventId>, unconflicted_state: StateMap<E::Id>,
fetch_event: &F, fetch_event: &F,
) -> Result<StateMap<OwnedEventId>> ) -> Result<StateMap<E::Id>>
where where
F: Fn(OwnedEventId) -> Fut + Sync, F: Fn(E::Id) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
S: Stream<Item = &'a EventId> + Send + 'a, E::Id: Borrow<EventId> + Clone + Eq + Ord + Send + Sync + 'a,
S: Stream<Item = &'a E::Id> + Send + 'a,
E: Event + Clone + Send + Sync, E: Event + Clone + Send + Sync,
{ {
debug!("starting iterative auth check"); debug!("starting iterative auth check");
@@ -528,7 +543,7 @@ where
let events_to_check: Vec<_> = events_to_check let events_to_check: Vec<_> = events_to_check
.map(Result::Ok) .map(Result::Ok)
.broad_and_then(async |event_id| { .broad_and_then(async |event_id| {
fetch_event(event_id.to_owned()) fetch_event(event_id.clone())
.await .await
.ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}"))) .ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}")))
}) })
@@ -536,16 +551,16 @@ where
.boxed() .boxed()
.await?; .await?;
let auth_event_ids: HashSet<OwnedEventId> = events_to_check let auth_event_ids: HashSet<E::Id> = events_to_check
.iter() .iter()
.flat_map(|event: &E| event.auth_events().map(ToOwned::to_owned)) .flat_map(|event: &E| event.auth_events().map(Clone::clone))
.collect(); .collect();
let auth_events: HashMap<OwnedEventId, E> = auth_event_ids let auth_events: HashMap<E::Id, E> = auth_event_ids
.into_iter() .into_iter()
.stream() .stream()
.broad_filter_map(fetch_event) .broad_filter_map(fetch_event)
.map(|auth_event| (auth_event.event_id().to_owned(), auth_event)) .map(|auth_event| (auth_event.event_id().clone(), auth_event))
.collect() .collect()
.boxed() .boxed()
.await; .await;
@@ -566,7 +581,7 @@ where
let mut auth_state = StateMap::new(); let mut auth_state = StateMap::new();
for aid in event.auth_events() { for aid in event.auth_events() {
if let Some(ev) = auth_events.get(aid) { if let Some(ev) = auth_events.get(aid.borrow()) {
//TODO: synapse checks "rejected_reason" which is most likely related to //TODO: synapse checks "rejected_reason" which is most likely related to
// soft-failing // soft-failing
auth_state.insert( auth_state.insert(
@@ -577,7 +592,7 @@ where
ev.clone(), ev.clone(),
); );
} else { } else {
warn!(event_id = aid.as_str(), "missing auth event"); warn!(event_id = aid.borrow().as_str(), "missing auth event");
} }
} }
@@ -586,7 +601,7 @@ where
.stream() .stream()
.ready_filter_map(|key| Some((key, resolved_state.get(key)?))) .ready_filter_map(|key| Some((key, resolved_state.get(key)?)))
.filter_map(|(key, ev_id)| async move { .filter_map(|(key, ev_id)| async move {
if let Some(event) = auth_events.get(ev_id) { if let Some(event) = auth_events.get(ev_id.borrow()) {
Some((key, event.clone())) Some((key, event.clone()))
} else { } else {
Some((key, fetch_event(ev_id.clone()).await?)) Some((key, fetch_event(ev_id.clone()).await?))
@@ -609,7 +624,7 @@ where
let fetch_state = |ty: &StateEventType, key: &str| { let fetch_state = |ty: &StateEventType, key: &str| {
future::ready(auth_state.get(&ty.with_state_key(key))) future::ready(auth_state.get(&ty.with_state_key(key)))
}; };
debug!("running auth check on {:?}", event.event_id());
let auth_result = let auth_result =
auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await; auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await;
@@ -618,7 +633,7 @@ where
// add event to resolved state map // add event to resolved state map
resolved_state.insert( resolved_state.insert(
event.event_type().with_state_key(state_key), event.event_type().with_state_key(state_key),
event.event_id().to_owned(), event.event_id().clone(),
); );
}, },
| Ok(false) => { | Ok(false) => {
@@ -645,14 +660,15 @@ where
/// level as a parent) will be marked as depth 1. depth 1 is "older" than depth /// level as a parent) will be marked as depth 1. depth 1 is "older" than depth
/// 0. /// 0.
async fn mainline_sort<E, F, Fut>( async fn mainline_sort<E, F, Fut>(
to_sort: &[OwnedEventId], to_sort: &[E::Id],
resolved_power_level: Option<OwnedEventId>, resolved_power_level: Option<E::Id>,
fetch_event: &F, fetch_event: &F,
) -> Result<Vec<OwnedEventId>> ) -> Result<Vec<E::Id>>
where where
F: Fn(OwnedEventId) -> Fut + Sync, F: Fn(E::Id) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Clone + Send + Sync, E: Event + Clone + Send + Sync,
E::Id: Borrow<EventId> + Clone + Send + Sync,
{ {
debug!("mainline sort of events"); debug!("mainline sort of events");
@@ -672,7 +688,7 @@ where
pl = None; pl = None;
for aid in event.auth_events() { for aid in event.auth_events() {
let ev = fetch_event(aid.to_owned()) let ev = fetch_event(aid.clone())
.await .await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?; .ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
@@ -718,29 +734,26 @@ where
/// that has an associated mainline depth. /// that has an associated mainline depth.
async fn get_mainline_depth<E, F, Fut>( async fn get_mainline_depth<E, F, Fut>(
mut event: Option<E>, mut event: Option<E>,
mainline_map: &HashMap<OwnedEventId, usize>, mainline_map: &HashMap<E::Id, usize>,
fetch_event: &F, fetch_event: &F,
) -> Result<usize> ) -> Result<usize>
where where
F: Fn(OwnedEventId) -> Fut + Sync, F: Fn(E::Id) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync, E: Event + Send + Sync,
E::Id: Borrow<EventId> + Send + Sync,
{ {
let mut room_id = None;
while let Some(sort_ev) = event { while let Some(sort_ev) = event {
trace!(event_id = sort_ev.event_id().as_str(), "mainline"); debug!(event_id = sort_ev.event_id().borrow().as_str(), "mainline");
if room_id.is_none() {
room_id = Some(sort_ev.room_id().to_owned());
}
let id = sort_ev.event_id(); let id = sort_ev.event_id();
if let Some(depth) = mainline_map.get(id) { if let Some(depth) = mainline_map.get(id.borrow()) {
return Ok(*depth); return Ok(*depth);
} }
event = None; event = None;
for aid in sort_ev.auth_events() { for aid in sort_ev.auth_events() {
let aev = fetch_event(aid.to_owned()) let aev = fetch_event(aid.clone())
.await .await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?; .ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
@@ -750,19 +763,20 @@ where
} }
} }
} }
warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth"); // Did not find a power level event so we default to zero
Ok(0) Ok(0)
} }
async fn add_event_and_auth_chain_to_graph<E, F, Fut>( async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
graph: &mut HashMap<OwnedEventId, HashSet<OwnedEventId>>, graph: &mut HashMap<E::Id, HashSet<E::Id>>,
event_id: OwnedEventId, event_id: E::Id,
auth_diff: &HashSet<OwnedEventId>, auth_diff: &HashSet<E::Id>,
fetch_event: &F, fetch_event: &F,
) where ) where
F: Fn(OwnedEventId) -> Fut + Sync, F: Fn(E::Id) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync, E: Event + Send + Sync,
E::Id: Borrow<EventId> + Clone + Send + Sync,
{ {
let mut state = vec![event_id]; let mut state = vec![event_id];
while let Some(eid) = state.pop() { while let Some(eid) = state.pop() {
@@ -772,27 +786,26 @@ async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
// Prefer the store to event as the store filters dedups the events // Prefer the store to event as the store filters dedups the events
for aid in auth_events { for aid in auth_events {
if auth_diff.contains(aid) { if auth_diff.contains(aid.borrow()) {
if !graph.contains_key(aid) { if !graph.contains_key(aid.borrow()) {
state.push(aid.to_owned()); state.push(aid.to_owned());
} }
graph // We just inserted this at the start of the while loop
.get_mut(&eid) graph.get_mut(eid.borrow()).unwrap().insert(aid.to_owned());
.expect("We just inserted this at the start of the while loop")
.insert(aid.to_owned());
} }
} }
} }
} }
async fn is_power_event_id<E, F, Fut>(event_id: &EventId, fetch: &F) -> bool async fn is_power_event_id<E, F, Fut>(event_id: &E::Id, fetch: &F) -> bool
where where
F: Fn(OwnedEventId) -> Fut + Sync, F: Fn(E::Id) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send, E: Event + Send,
E::Id: Borrow<EventId> + Send + Sync,
{ {
match fetch(event_id.to_owned()).await.as_ref() { match fetch(event_id.clone()).await.as_ref() {
| Some(state) => is_power_event(state), | Some(state) => is_power_event(state),
| _ => false, | _ => false,
} }
@@ -896,13 +909,13 @@ mod tests {
let fetcher = |id| ready(events.get(&id).cloned()); let fetcher = |id| ready(events.get(&id).cloned());
let sorted_power_events = let sorted_power_events =
super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher) super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher, 1)
.await .await
.unwrap(); .unwrap();
let resolved_power = super::iterative_auth_check( let resolved_power = super::iterative_auth_check(
&RoomVersion::V6, &RoomVersion::V6,
sorted_power_events.iter().map(AsRef::as_ref).stream(), sorted_power_events.iter().stream(),
HashMap::new(), // unconflicted events HashMap::new(), // unconflicted events
&fetcher, &fetcher,
) )
@@ -1287,7 +1300,7 @@ mod tests {
let ev_map = store.0.clone(); let ev_map = store.0.clone();
let fetcher = |id| ready(ev_map.get(&id).cloned()); let fetcher = |id| ready(ev_map.get(&id).cloned());
let exists = |id: OwnedEventId| ready(ev_map.get(&*id).is_some()); let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&*id).is_some());
let state_sets = [state_at_bob, state_at_charlie]; let state_sets = [state_at_bob, state_at_charlie];
let auth_chain: Vec<_> = state_sets let auth_chain: Vec<_> = state_sets
@@ -1299,13 +1312,19 @@ mod tests {
}) })
.collect(); .collect();
let resolved = let resolved = match super::resolve(
match super::resolve(&RoomVersionId::V2, &state_sets, &auth_chain, &fetcher, &exists) &RoomVersionId::V2,
.await &state_sets,
{ &auth_chain,
| Ok(state) => state, &fetcher,
| Err(e) => panic!("{e}"), &exists,
}; 1,
)
.await
{
| Ok(state) => state,
| Err(e) => panic!("{e}"),
};
assert_eq!(expected, resolved); assert_eq!(expected, resolved);
} }
@@ -1410,15 +1429,21 @@ mod tests {
}) })
.collect(); .collect();
let fetcher = |id: OwnedEventId| ready(ev_map.get(&id).cloned()); let fetcher = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).cloned());
let exists = |id: OwnedEventId| ready(ev_map.get(&id).is_some()); let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).is_some());
let resolved = let resolved = match super::resolve(
match super::resolve(&RoomVersionId::V6, &state_sets, &auth_chain, &fetcher, &exists) &RoomVersionId::V6,
.await &state_sets,
{ &auth_chain,
| Ok(state) => state, &fetcher,
| Err(e) => panic!("{e}"), &exists,
}; 1,
)
.await
{
| Ok(state) => state,
| Err(e) => panic!("{e}"),
};
debug!( debug!(
resolved = ?resolved resolved = ?resolved
+26 -20
View File
@@ -133,11 +133,17 @@ pub(crate) async fn do_check(
.collect(); .collect();
let event_map = &event_map; let event_map = &event_map;
let fetch = |id: OwnedEventId| ready(event_map.get(&id).cloned()); let fetch = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).cloned());
let exists = |id: OwnedEventId| ready(event_map.get(&id).is_some()); let exists = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).is_some());
let resolved = let resolved = super::resolve(
super::resolve(&RoomVersionId::V6, state_sets, &auth_chain_sets, &fetch, &exists) &RoomVersionId::V6,
.await; state_sets,
&auth_chain_sets,
&fetch,
&exists,
1,
)
.await;
match resolved { match resolved {
| Ok(state) => state, | Ok(state) => state,
@@ -241,8 +247,8 @@ impl<E: Event + Clone> TestStore<E> {
pub(crate) fn auth_event_ids( pub(crate) fn auth_event_ids(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
event_ids: Vec<OwnedEventId>, event_ids: Vec<E::Id>,
) -> Result<HashSet<OwnedEventId>> { ) -> Result<HashSet<E::Id>> {
let mut result = HashSet::new(); let mut result = HashSet::new();
let mut stack = event_ids; let mut stack = event_ids;
@@ -578,7 +584,7 @@ pub(crate) fn INITIAL_EDGES() -> Vec<OwnedEventId> {
pub(crate) mod event { pub(crate) mod event {
use ruma::{ use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
events::{TimelineEventType, pdu::Pdu}, events::{TimelineEventType, pdu::Pdu},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -587,7 +593,9 @@ pub(crate) mod event {
use crate::Event; use crate::Event;
impl Event for PduEvent { impl Event for PduEvent {
fn event_id(&self) -> &EventId { &self.event_id } type Id = OwnedEventId;
fn event_id(&self) -> &Self::Id { &self.event_id }
fn room_id(&self) -> &RoomId { fn room_id(&self) -> &RoomId {
match &self.rest { match &self.rest {
@@ -644,31 +652,29 @@ pub(crate) mod event {
} }
#[allow(refining_impl_trait)] #[allow(refining_impl_trait)]
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> { fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => | Pdu::RoomV1Pdu(ev) => Box::new(ev.prev_events.iter().map(|(id, _)| id)),
Box::new(ev.prev_events.iter().map(|(id, _)| id.as_ref())), | Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter()),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter().map(AsRef::as_ref)),
#[allow(unreachable_patterns)] #[allow(unreachable_patterns)]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
} }
#[allow(refining_impl_trait)] #[allow(refining_impl_trait)]
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> { fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => | Pdu::RoomV1Pdu(ev) => Box::new(ev.auth_events.iter().map(|(id, _)| id)),
Box::new(ev.auth_events.iter().map(|(id, _)| id.as_ref())), | Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter()),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter().map(AsRef::as_ref)),
#[allow(unreachable_patterns)] #[allow(unreachable_patterns)]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
} }
fn redacts(&self) -> Option<&EventId> { fn redacts(&self) -> Option<&Self::Id> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_deref(), | Pdu::RoomV1Pdu(ev) => ev.redacts.as_ref(),
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_deref(), | Pdu::RoomV3Pdu(ev) => ev.redacts.as_ref(),
#[allow(unreachable_patterns)] #[allow(unreachable_patterns)]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
+7 -12
View File
@@ -98,7 +98,12 @@ pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
Level::INFO Level::INFO
}; };
wait_shutdown(server, runtime); debug!(
timeout = ?SHUTDOWN_TIMEOUT,
"Waiting for runtime..."
);
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
let runtime_metrics = server.server.metrics.runtime_interval().unwrap_or_default(); let runtime_metrics = server.server.metrics.runtime_interval().unwrap_or_default();
event!(LEVEL, ?runtime_metrics, "Final runtime metrics"); event!(LEVEL, ?runtime_metrics, "Final runtime metrics");
@@ -106,23 +111,13 @@ pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
#[cfg(not(tokio_unstable))] #[cfg(not(tokio_unstable))]
#[tracing::instrument(name = "stop", level = "info", skip_all)] #[tracing::instrument(name = "stop", level = "info", skip_all)]
pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) { pub(super) fn shutdown(_server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
wait_shutdown(server, runtime);
}
fn wait_shutdown(_server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
debug!( debug!(
timeout = ?SHUTDOWN_TIMEOUT, timeout = ?SHUTDOWN_TIMEOUT,
"Waiting for runtime..." "Waiting for runtime..."
); );
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT); runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
// Join any jemalloc threads so they don't appear in use at exit.
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
conduwuit_core::alloc::je::background_thread_enable(false)
.log_debug_err()
.ok();
} }
#[tracing::instrument( #[tracing::instrument(
@@ -76,7 +76,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
// 5. Reject "due to auth events" if can't get all the auth events or some of // 5. Reject "due to auth events" if can't get all the auth events or some of
// the auth events are also rejected "due to auth events" // the auth events are also rejected "due to auth events"
// NOTE: Step 5 is not applied anymore because it failed too often // NOTE: Step 5 is not applied anymore because it failed too often
debug!("Fetching auth events for {}", incoming_pdu.event_id); debug!("Fetching auth events");
Box::pin(self.fetch_and_handle_outliers( Box::pin(self.fetch_and_handle_outliers(
origin, origin,
&incoming_pdu.auth_events, &incoming_pdu.auth_events,
@@ -88,12 +88,12 @@ pub(super) async fn handle_outlier_pdu<'a>(
// 6. Reject "due to auth events" if the event doesn't pass auth based on the // 6. Reject "due to auth events" if the event doesn't pass auth based on the
// auth events // auth events
debug!("Checking {} based on auth events", incoming_pdu.event_id); debug!("Checking based on auth events");
// Build map of auth events // Build map of auth events
let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len()); let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len());
for id in &incoming_pdu.auth_events { for id in &incoming_pdu.auth_events {
let Ok(auth_event) = self.services.timeline.get_pdu(id).await else { let Ok(auth_event) = self.services.timeline.get_pdu(id).await else {
warn!("Could not find auth event {id} for {}", incoming_pdu.event_id); warn!("Could not find auth event {id}");
continue; continue;
}; };
@@ -119,7 +119,10 @@ pub(super) async fn handle_outlier_pdu<'a>(
} }
// The original create event must be in the auth events // The original create event must be in the auth events
if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) { if !matches!(
auth_events.get(&(StateEventType::RoomCreate, String::new().into())),
Some(_) | None
) {
return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
} }
@@ -128,7 +131,6 @@ pub(super) async fn handle_outlier_pdu<'a>(
ready(auth_events.get(&key)) ready(auth_events.get(&key))
}; };
debug!("running auth check to handle outlier pdu {:?}", incoming_pdu.event_id);
let auth_check = state_res::event_auth::auth_check( let auth_check = state_res::event_auth::auth_check(
&to_room_version(&room_version_id), &to_room_version(&room_version_id),
&incoming_pdu, &incoming_pdu,
@@ -8,7 +8,7 @@ use conduwuit::{
Error, Result, err, implement, Error, Result, err, implement,
state_res::{self, StateMap}, state_res::{self, StateMap},
trace, trace,
utils::stream::{IterStream, ReadyExt, TryWidebandExt, WidebandExt}, utils::stream::{IterStream, ReadyExt, TryWidebandExt, WidebandExt, automatic_width},
}; };
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join}; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join};
use ruma::{OwnedEventId, RoomId, RoomVersionId}; use ruma::{OwnedEventId, RoomId, RoomVersionId};
@@ -112,7 +112,14 @@ where
{ {
let event_fetch = |event_id| self.event_fetch(event_id); let event_fetch = |event_id| self.event_fetch(event_id);
let event_exists = |event_id| self.event_exists(event_id); let event_exists = |event_id| self.event_exists(event_id);
state_res::resolve(room_version, state_sets, auth_chain_sets, &event_fetch, &event_exists) state_res::resolve(
.map_err(|e| err!(error!("State resolution failed: {e:?}"))) room_version,
.await state_sets,
auth_chain_sets,
&event_fetch,
&event_exists,
automatic_width(),
)
.map_err(|e| err!(error!("State resolution failed: {e:?}")))
.await
} }
@@ -1,6 +1,12 @@
use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant}; use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant};
use conduwuit::{Err, Result, debug, debug_info, err, implement, matrix::{EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, warn, info}; use conduwuit::{
Err, Result, debug, debug_info, err, implement,
matrix::{EventTypeExt, PduEvent, StateKey, state_res},
trace,
utils::stream::{BroadbandExt, ReadyExt},
warn,
};
use futures::{FutureExt, StreamExt, future::ready}; use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
@@ -38,7 +44,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
return Err!(Request(InvalidParam("Event has been soft failed"))); return Err!(Request(InvalidParam("Event has been soft failed")));
} }
debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id); debug!("Upgrading to timeline pdu");
let timer = Instant::now(); let timer = Instant::now();
let room_version_id = get_room_version_id(create_event)?; let room_version_id = get_room_version_id(create_event)?;
@@ -46,7 +52,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
// backwards extremities doing all the checks in this list starting at 1. // backwards extremities doing all the checks in this list starting at 1.
// These are not timeline events. // These are not timeline events.
debug!("Resolving state at event {}", incoming_pdu.event_id); debug!("Resolving state at event");
let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 { let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 {
self.state_at_incoming_degree_one(&incoming_pdu).await? self.state_at_incoming_degree_one(&incoming_pdu).await?
} else { } else {
@@ -64,7 +70,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
state_at_incoming_event.expect("we always set this to some above"); state_at_incoming_event.expect("we always set this to some above");
let room_version = to_room_version(&room_version_id); let room_version = to_room_version(&room_version_id);
debug!("Performing auth check to upgrade {}", incoming_pdu.event_id); debug!("Performing auth check");
// 11. Check the auth of the event passes based on the state of the event // 11. Check the auth of the event passes based on the state of the event
let state_fetch_state = &state_at_incoming_event; let state_fetch_state = &state_at_incoming_event;
let state_fetch = |k: StateEventType, s: StateKey| async move { let state_fetch = |k: StateEventType, s: StateKey| async move {
@@ -74,7 +80,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
self.services.timeline.get_pdu(event_id).await.ok() self.services.timeline.get_pdu(event_id).await.ok()
}; };
debug!("running auth check on {}", incoming_pdu.event_id);
let auth_check = state_res::event_auth::auth_check( let auth_check = state_res::event_auth::auth_check(
&room_version, &room_version,
&incoming_pdu, &incoming_pdu,
@@ -88,7 +93,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
return Err!(Request(Forbidden("Event has failed auth check with state at the event."))); return Err!(Request(Forbidden("Event has failed auth check with state at the event.")));
} }
debug!("Gathering auth events for {}", incoming_pdu.event_id); debug!("Gathering auth events");
let auth_events = self let auth_events = self
.services .services
.state .state
@@ -106,7 +111,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
ready(auth_events.get(&key).cloned()) ready(auth_events.get(&key).cloned())
}; };
debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id);
let auth_check = state_res::event_auth::auth_check( let auth_check = state_res::event_auth::auth_check(
&room_version, &room_version,
&incoming_pdu, &incoming_pdu,
@@ -117,7 +121,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
// Soft fail check before doing state res // Soft fail check before doing state res
debug!("Performing soft-fail check on {}", incoming_pdu.event_id); debug!("Performing soft-fail check");
let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) {
| (false, _) => true, | (false, _) => true,
| (true, None) => false, | (true, None) => false,
@@ -214,8 +218,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
// 14. Check if the event passes auth based on the "current state" of the room, // 14. Check if the event passes auth based on the "current state" of the room,
// if not soft fail it // if not soft fail it
if soft_fail { if soft_fail {
info!("Soft failing event {}", incoming_pdu.event_id); debug!("Soft failing event");
assert!(extremities.is_empty(), "soft_fail extremities empty");
let extremities = extremities.iter().map(Borrow::borrow); let extremities = extremities.iter().map(Borrow::borrow);
self.services self.services
+3 -19
View File
@@ -698,20 +698,6 @@ impl Service {
.await .await
.saturating_add(uint!(1)); .saturating_add(uint!(1));
if state_key.is_none() {
if prev_events.is_empty() {
warn!("Timeline event had zero prev_events, something broke.");
return Err!(Request(Unknown("Timeline event had zero prev_events.")));
}
if depth.le(&uint!(2)) {
warn!(
"Had unsafe depth of {depth} in {room_id} when creating non-state event. \
Bad!"
);
return Err!(Request(Unknown("Unsafe depth for non-state event.")));
}
};
let mut unsigned = unsigned.unwrap_or_default(); let mut unsigned = unsigned.unwrap_or_default();
if let Some(state_key) = &state_key { if let Some(state_key) = &state_key {
@@ -771,7 +757,6 @@ impl Service {
ready(auth_events.get(&key)) ready(auth_events.get(&key))
}; };
debug!("running auth check on new {} event by {} in {}", pdu.kind, pdu.sender, pdu.room_id);
let auth_check = state_res::auth_check( let auth_check = state_res::auth_check(
&room_version, &room_version,
&pdu, &pdu,
@@ -976,9 +961,8 @@ impl Service {
state_lock: &'a RoomMutexGuard, state_lock: &'a RoomMutexGuard,
) -> Result<Option<RawPduId>> ) -> Result<Option<RawPduId>>
where where
Leaves: Iterator<Item = &'a EventId> + Send + Clone + 'a, Leaves: Iterator<Item = &'a EventId> + Send + 'a,
{ {
assert!(new_room_leaves.clone().count() > 0, "extremities are empty");
// We append to state before appending the pdu, so we don't have a moment in // We append to state before appending the pdu, so we don't have a moment in
// time with the pdu without it's state. This is okay because append_pdu can't // time with the pdu without it's state. This is okay because append_pdu can't
// fail. // fail.
@@ -1158,7 +1142,7 @@ impl Service {
.boxed(); .boxed();
while let Some(ref backfill_server) = servers.next().await { while let Some(ref backfill_server) = servers.next().await {
info!("Asking {backfill_server} for backfill in {:?}", room_id.to_owned()); info!("Asking {backfill_server} for backfill");
let response = self let response = self
.services .services
.sending .sending
@@ -1186,7 +1170,7 @@ impl Service {
} }
} }
warn!("No servers could backfill, but backfill was needed in room {room_id}"); info!("No servers could backfill, but backfill was needed in room {room_id}");
Ok(()) Ok(())
} }
+64 -47
View File
@@ -1,68 +1,85 @@
:root { :root {
color-scheme: light; color-scheme: light;
--font-stack: sans-serif; --font-stack: sans-serif;
--background-color: #fff; --background-color: #fff;
--text-color: #000; --text-color: #000;
--bg: oklch(0.76 0.0854 317.27); --bg: oklch(0.76 0.0854 317.27);
--panel-bg: oklch(0.91 0.042 317.27); --panel-bg: oklch(0.91 0.042 317.27);
--name-lightness: 0.45; --name-lightness: 0.45;
@media (prefers-color-scheme: dark) { @media (prefers-color-scheme: dark) {
color-scheme: dark; color-scheme: dark;
--text-color: #fff; --text-color: #fff;
--bg: oklch(0.15 0.042 317.27); --bg: oklch(0.15 0.042 317.27);
--panel-bg: oklch(0.24 0.03 317.27); --panel-bg: oklch(0.24 0.03 317.27);
--name-lightness: 0.8; --name-lightness: 0.8;
} }
--c1: oklch(0.44 0.177 353.06); --c1: oklch(0.44 0.177 353.06);
--c2: oklch(0.59 0.158 150.88); --c2: oklch(0.59 0.158 150.88);
--normal-font-size: 1rem; --normal-font-size: 1rem;
--small-font-size: 0.8rem; --small-font-size: 0.8rem;
} }
body { body {
color: var(--text-color); color: var(--text-color);
font-family: var(--font-stack); font-family: var(--font-stack);
margin: 0; margin: 0;
padding: 0; padding: 0;
display: grid; display: grid;
place-items: center; place-items: center;
min-height: 100vh; min-height: 100vh;
} }
html { html {
background-color: var(--bg); background-color: var(--bg);
background-image: linear-gradient( background-image: linear-gradient(
70deg, 70deg,
oklch(from var(--bg) l + 0.2 c h), oklch(from var(--bg) l + 0.2 c h),
oklch(from var(--bg) l - 0.2 c h) oklch(from var(--bg) l - 0.2 c h)
); );
font-size: 16px; font-size: 16px;
} }
.panel { .panel {
width: min(clamp(24rem, 12rem + 40vw, 48rem), 100vw); width: min(clamp(24rem, 12rem + 40vw, 48rem), calc(100vw - 3rem));
border-radius: 15px; border-radius: 15px;
background-color: var(--panel-bg); background-color: var(--panel-bg);
padding-inline: 1.5rem; padding-inline: 1.5rem;
padding-block: 1rem; padding-block: 1rem;
box-shadow: 0 0.25em 0.375em hsla(0, 0%, 0%, 0.1); box-shadow: 0 0.25em 0.375em hsla(0, 0%, 0%, 0.1);
}
@media (max-width: 24rem) {
.panel {
padding-inline: 0.25rem;
width: calc(100vw - 0.5rem);
border-radius: 0;
margin-block-start: 0.2rem;
}
main {
height: 100%;
}
}
footer {
padding-inline: 0.25rem;
height: max(fit-content, 2rem);
} }
.project-name { .project-name {
text-decoration: none; text-decoration: none;
background: linear-gradient( background: linear-gradient(
130deg, 130deg,
oklch(from var(--c1) var(--name-lightness) c h), oklch(from var(--c1) var(--name-lightness) c h),
oklch(from var(--c2) var(--name-lightness) c h) oklch(from var(--c2) var(--name-lightness) c h)
); );
background-clip: text; background-clip: text;
color: transparent; color: transparent;
filter: brightness(1.2); filter: brightness(1.2);
} }
+1 -1
View File
@@ -6,7 +6,7 @@ use axum::{
response::{Html, IntoResponse, Response}, response::{Html, IntoResponse, Response},
routing::get, routing::get,
}; };
use conduwuit_build_metadata::{GIT_REMOTE_COMMIT_URL, GIT_REMOTE_WEB_URL, version_tag}; use conduwuit_build_metadata::{GIT_REMOTE_COMMIT_URL, GIT_REMOTE_WEB_URL, VERSION_EXTRA};
use conduwuit_service::state; use conduwuit_service::state;
pub fn build() -> Router<state::State> { pub fn build() -> Router<state::State> {
+1 -1
View File
@@ -18,7 +18,7 @@
{%~ block footer ~%} {%~ block footer ~%}
<footer> <footer>
<p>Powered by <a href="https://continuwuity.org">Continuwuity</a> <p>Powered by <a href="https://continuwuity.org">Continuwuity</a>
{%~ if let Some(version_info) = self::version_tag() ~%} {%~ if let Some(version_info) = VERSION_EXTRA ~%}
{%~ if let Some(url) = GIT_REMOTE_COMMIT_URL.or(GIT_REMOTE_WEB_URL) ~%} {%~ if let Some(url) = GIT_REMOTE_COMMIT_URL.or(GIT_REMOTE_WEB_URL) ~%}
(<a href="{{ url }}">{{ version_info }}</a>) (<a href="{{ url }}">{{ version_info }}</a>)
{%~ else ~%} {%~ else ~%}