Compare commits

..

3 Commits

Author SHA1 Message Date
Ginger 931b9735e9 chore: Formatting fixes 2026-02-09 21:28:00 -05:00
Ginger 8add53f8ab feat: Implement a migration to fix busted local invites 2026-02-09 21:01:29 -05:00
Ginger 50d75bfc8f fix: Properly set stripped state for local invites 2026-02-09 16:25:26 -05:00
13 changed files with 56 additions and 748 deletions
+1 -1
View File
@@ -1,9 +1,9 @@
# Local build and dev artifacts
target/
!target/debug/conduwuit
# Docker files
Dockerfile*
docker/
# IDE files
.vscode
+1 -1
View File
@@ -23,7 +23,7 @@ repos:
- id: check-added-large-files
- repo: https://github.com/crate-ci/typos
rev: v1.43.4
rev: v1.43.3
hooks:
- id: typos
- id: typos
-1
View File
@@ -1 +0,0 @@
Introduce a resolver command to allow flushing a server from the cache or to flush the complete cache. Contributed by @Omar007
+3 -3
View File
@@ -2,9 +2,9 @@ FROM ubuntu:latest
EXPOSE 8008
EXPOSE 8448
RUN apt-get update && apt-get install -y ca-certificates liburing2 && rm -rf /var/lib/apt/lists/*
RUN mkdir -p /etc/continuwuity /var/lib/continuwuity /usr/local/bin/
COPY complement/complement-entrypoint.sh /usr/local/bin/complement-entrypoint.sh
COPY complement/complement.config.toml /etc/continuwuity/config.toml
RUN mkdir -p /etc/continuwuity /var/lib/continuwuity
COPY docker/complement-entrypoint.sh /usr/local/bin/complement-entrypoint.sh
COPY docker/complement.config.toml /etc/continuwuity/config.toml
COPY target/debug/conduwuit /usr/local/bin/conduwuit
RUN chmod +x /usr/local/bin/conduwuit /usr/local/bin/complement-entrypoint.sh
#HEALTHCHECK --interval=30s --timeout=5s CMD curl --fail http://localhost:8008/_continuwuity/server_version || exit 1
+1 -1
View File
@@ -269,7 +269,7 @@ curl https://your.server.name:8448/_matrix/federation/v1/version
```
- To check if your server can communicate with other homeservers, use the
[Matrix Federation Tester](https://federationtester.mtrnord.blog/). If you can
[Matrix Federation Tester](https://federationtester.matrix.org/). If you can
register but cannot join federated rooms, check your configuration and verify
that port 8448 is open and forwarded correctly.
@@ -6,10 +6,10 @@
"message": "Welcome to Continuwuity! Important announcements about the project will appear here."
},
{
"id": 9,
"id": 8,
"mention_room": false,
"date": "2026-02-09",
"message": "Yesterday we released [v0.5.4](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.4). Bugfixes, performance improvements and more moderation features! There's also a security fix, so please update as soon as possible. Don't forget to join [our announcements channel](https://matrix.to/#/!jIdNjSM5X-V5JVx2h2kAhUZIIQ08GyzPL55NFZAH1vM/%2489TY9CqRg4-ff1MGo3Ulc5r5X4pakfdzT-99RD8Docc?via=ellis.link&via=explodie.org&via=matrix.org) to get important information sooner <3 "
"date": "2026-01-12",
"message": "Hey everyone!\n\nJust letting you know we've released [v0.5.3](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.3) - this one is a bit of a hotfix for an issue with inviting and allowing others to join rooms.\n\nIf you appreceate the round-the-clock work we've been doing to keep your servers secure over this holiday period, we'd really appreciate your support - you can sponsor individuals on our team using the 'sponsor' button at the top of [our GitHub repository](https://github.com/continuwuity/continuwuity). If you can't do that, even a star helps - spreading the word and advocating for our project helps keep it going.\n\nHave a lovely rest of your year \\\n[Jade \\(she/her\\)](https://matrix.to/#/%40jade%3Aellis.link) \n🩵"
}
]
}
-13
View File
@@ -112,19 +112,6 @@ Query the destinations cache
Query the overrides cache
### `!admin query resolver flush-cache`
Flush a given server from the resolver caches or flush them completely
* Examples:
* Flush a specific server:
`!admin query resolver flush-cache matrix.example.com`
* Flush all resolver caches completely:
`!admin query resolver flush-cache --all`
## `!admin query pusher`
pusher service
-10
View File
@@ -20,16 +20,6 @@ log into the server account (`@conduit`) from a web client
## General potential issues
### Configuration not working as expected
Sometimes you can make a mistake in your configuration that
means things don't get passed to Continuwuity correctly.
This is particularly easy to do with environment variables.
To check what configuration Continuwuity actually sees, you can
use the `!admin server show-config` command in your admin room.
Beware that this prints out any secrets in your configuration,
so you might want to delete the result afterwards!
### Potential DNS issues when using Docker
Docker's DNS setup for containers in a non-default network intercepts queries to
+1 -25
View File
@@ -1,5 +1,5 @@
use clap::Subcommand;
use conduwuit::{Err, Result, utils::time};
use conduwuit::{Result, utils::time};
use futures::StreamExt;
use ruma::OwnedServerName;
@@ -7,7 +7,6 @@ use crate::{admin_command, admin_command_dispatch};
#[admin_command_dispatch]
#[derive(Debug, Subcommand)]
#[allow(clippy::enum_variant_names)]
/// Resolver service and caches
pub enum ResolverCommand {
/// Query the destinations cache
@@ -19,14 +18,6 @@ pub enum ResolverCommand {
OverridesCache {
name: Option<String>,
},
/// Flush a specific server from the resolver caches or everything
FlushCache {
name: Option<OwnedServerName>,
#[arg(short, long)]
all: bool,
},
}
#[admin_command]
@@ -78,18 +69,3 @@ async fn overrides_cache(&self, server_name: Option<String>) -> Result {
Ok(())
}
#[admin_command]
async fn flush_cache(&self, name: Option<OwnedServerName>, all: bool) -> Result {
if all {
self.services.resolver.cache.clear().await;
writeln!(self, "Resolver caches cleared!").await
} else if let Some(name) = name {
self.services.resolver.cache.del_destination(&name);
self.services.resolver.cache.del_override(&name);
self.write_str(&format!("Cleared {name} from resolver caches!"))
.await
} else {
Err!("Missing name. Supply a name or use --all to flush the whole cache.")
}
}
+2 -622
View File
@@ -1,29 +1,12 @@
use std::collections::BTreeMap;
use api::client::leave_room;
use clap::Subcommand;
use conduwuit::{
Err, Result, RoomVersion, debug, info,
Err, Result, debug, info,
utils::{IterStream, ReadyExt},
warn,
};
use futures::{FutureExt, StreamExt};
use ruma::{
Int, OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId,
events::{
StateEventType,
room::{
create::RoomCreateEventContent,
history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
join_rules::{JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent},
power_levels::RoomPowerLevelsEventContent,
tombstone::RoomTombstoneEventContent,
},
},
exports::serde::Deserialize,
};
use serde_json::json;
use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId};
use crate::{admin_command, admin_command_dispatch, get_room_info};
@@ -60,59 +43,6 @@ pub enum RoomModerationCommand {
/// information
no_details: bool,
},
/// - Take over a room by puppeting a local user into giving you a higher
/// power level
Takeover {
/// Whether to force joining the room if no local users are in the room
#[arg(long)]
force: bool,
/// The room in the format of `!roomid:example.com` or a room alias in
/// the format of `#roomalias:example.com`
room: OwnedRoomOrAliasId,
},
/// - Shut down a room, as much is possible. **This is immediate and
/// irreversible**.
///
/// This command requires that you have a local user in the room with at
/// least a moderator power level. It will first attempt to raise power
/// levels so that nobody can use the room further, then remove the
/// canonical alias event, sets the history visibility to `joined`,
/// sets the join rules to `org.continuwuity.shutdown` (preventing anyone
/// from joining even with an invite), and then bans or kicks all users,
/// setting the MSC4293 "redact events" flag on those users if possible.
/// Finally, it will send a room tombstone event, which will effectively
/// make the room unusable on most clients even if the room state resets.
///
/// This effectively will make the room unusable, unjoinable, and removes
/// everyone from it. This is as close to a "shutdown" as you can get with
/// federation.
ShutdownRoom {
/// If no local users with a power level are joined to the room, setting
/// this flag will attempt one, and will join the user with the
/// highest power level to the room to perform the shutdown.
///
/// If this flag is not set, and no local users can perform the
/// shutdown, no further attempt will be made.
#[arg(long)]
force: bool,
/// Whether to use MSC4293 fields to indicate that all messages in the
/// room should be redacted. This will make it more difficult for
/// clients that implement MSC4293 (like Element) to render the room
/// in the event users manage to rejoin.
#[arg(long)]
redact: bool,
///
#[arg(long)]
yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process:
bool,
/// The room in the format of `!roomid:example.com` or a room alias in
/// the format of `#roomalias:example.com`
room: OwnedRoomOrAliasId,
},
}
#[admin_command]
@@ -538,553 +468,3 @@ async fn list_banned_rooms(&self, no_details: bool) -> Result {
self.write_str(&format!("Rooms Banned ({num}):\n```\n{body}\n```",))
.await
}
#[admin_command]
async fn takeover(&self, force: bool, room: OwnedRoomOrAliasId) -> Result {
let room_id = if room.is_room_id() {
let room_id = match RoomId::parse(&room) {
| Ok(room_id) => room_id,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
room_id.to_owned()
} else if room.is_room_alias_id() {
let room_alias = match RoomAliasId::parse(&room) {
| Ok(room_alias) => room_alias,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
| Ok((room_id, servers)) => {
debug!(
?room_id,
?servers,
"Got federation response fetching room ID for room {room}"
);
room_id
},
| Err(e) => {
return Err!("Failed to resolve room alias {room} to a room ID: {e}");
},
}
} else {
return Err!(
"Room specified is not a room ID or room alias. Please note that this requires a \
full room ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`)",
);
};
let room_version =
RoomVersion::new(&self.services.rooms.state.get_room_version(&room_id).await?)?;
let Ok(create_content) = self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomCreateEventContent>(
&room_id,
&StateEventType::RoomCreate,
"",
)
.await
else {
return Err!("Failed to get room create event");
};
let mut power_levels = match self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomPowerLevelsEventContent>(
&room_id,
&StateEventType::RoomPowerLevels,
"",
)
.await
{
| Ok(content) => content,
| Err(e) => {
return Err!("Failed to get power levels for room {room_id}: {e}");
},
};
let local_creators = if room_version.explicitly_privilege_room_creators
&& create_content.additional_creators.is_some()
{
create_content
.additional_creators
.clone()
.unwrap()
.into_iter()
.filter(|user_id| self.services.globals.user_is_local(user_id))
.collect::<Vec<_>>()
} else {
vec![]
};
let local_users = power_levels
.users
.iter()
.filter(|(user_id, _)| self.services.globals.user_is_local(user_id))
.map(|(user_id, level)| (user_id.clone(), *level))
.collect::<BTreeMap<_, _>>();
let min_pl = power_levels
.events
.get(&StateEventType::RoomPowerLevels.into())
.copied()
.unwrap_or(power_levels.state_default);
let mut ordered_users = local_users
.iter()
.chain(local_creators.iter().map(|user_id| (user_id, &Int::MAX)))
.map(|(user_id, level)| {
if local_creators.contains(user_id) {
(user_id, Int::MAX)
} else {
(user_id, *level)
}
})
.filter(|(user_id, level)| *level >= min_pl || local_creators.contains(*user_id))
.collect::<Vec<_>>();
ordered_users.sort_by_key(|(_, level)| level.saturating_mul(Int::from(-1)));
for (user_id, powerlevel) in ordered_users {
if !self
.services
.rooms
.state_cache
.is_joined(user_id.as_ref(), &room_id)
.await
{
if !force {
continue;
}
info!("Joining {user_id} to room {room_id} to perform takeover");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(user_id.as_str()),
&RoomMemberEventContent::new(MembershipState::Join),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to join {user_id} to room {room_id} to perform takeover: {e}");
drop(lock);
continue;
}
drop(lock);
}
info!("Promoting you to power level {powerlevel} in room {room_id} via {user_id}");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
power_levels
.users
.insert(self.sender.expect("you should exist").to_owned(), powerlevel);
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(String::new(), &power_levels),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to promote you to power level {powerlevel} in room {room_id} via \
{user_id}: {e}"
);
drop(lock);
continue;
}
return self
.write_str(&format!(
"Successfully promoted you to power level {powerlevel} in room {room_id} via \
{user_id}"
))
.await;
}
self.write_str("Failed to promote you, no local users with sufficient power level found.")
.await
}
#[admin_command]
async fn shutdown_room(
&self,
force: bool,
redact: bool,
yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process: bool,
room: OwnedRoomOrAliasId,
) -> Result {
let room_id = if room.is_room_id() {
let room_id = match RoomId::parse(&room) {
| Ok(room_id) => room_id,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
room_id.to_owned()
} else if room.is_room_alias_id() {
let room_alias = match RoomAliasId::parse(&room) {
| Ok(room_alias) => room_alias,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
| Ok((room_id, servers)) => {
debug!(
?room_id,
?servers,
"Got federation response fetching room ID for room {room}"
);
room_id
},
| Err(e) => {
return Err!("Failed to resolve room alias {room} to a room ID: {e}");
},
}
} else {
return Err!(
"Room specified is not a room ID or room alias. Please note that this requires a \
full room ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`)",
);
};
if !yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process {
return Err!(
"This command is irreversible and will immediately shutdown the room, making it \
completely unusable if successful. If you are sure you want to do this, add the \
flag --yes-i-am-sure-i-want-to-irreversibly-shutdown-this-room-destroying-it-in-the-process \
to your command."
);
}
let mut power_levels: RoomPowerLevelsEventContent = match self
.services
.rooms
.state_accessor
.room_state_get_content(&room_id, &StateEventType::RoomPowerLevels, "")
.await
.map_err(|e| Err!("Failed to get power levels for room {room_id}: {e}"))
{
| Ok(content) => content,
| Err(e) => {
return e;
},
};
let mut joined_users = self
.services
.rooms
.state_cache
.room_members(&room_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await;
let room_version =
RoomVersion::new(&self.services.rooms.state.get_room_version(&room_id).await?)?;
let Ok(create_content) = self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomCreateEventContent>(
&room_id,
&StateEventType::RoomCreate,
"",
)
.await
else {
return Err!("Failed to get room create event");
};
let local_creators = if room_version.explicitly_privilege_room_creators
&& create_content.additional_creators.is_some()
{
create_content
.additional_creators
.unwrap()
.into_iter()
.filter(|user_id| self.services.globals.user_is_local(user_id))
.collect::<Vec<_>>()
} else {
vec![]
};
let local_users = power_levels
.users
.iter()
.filter(|(user_id, _)| self.services.globals.user_is_local(user_id))
.map(|(user_id, level)| (user_id.clone(), *level))
.collect::<BTreeMap<_, _>>();
let mut ordered_users = local_users
.iter()
.chain(local_creators.iter().map(|user_id| (user_id, &Int::MAX)))
.map(|(user_id, level)| {
if local_creators.contains(user_id) {
(user_id, Int::MAX)
} else {
(user_id, *level)
}
})
.collect::<Vec<_>>();
ordered_users.sort_by_key(|(_, level)| level.saturating_mul(Int::from(-1)));
let mut changed_join_rules = false;
let mut changed_history_visibility = false;
let mut changed_power_levels = false;
let mut sent_tombstone = false;
let mut removed_ok: u32 = 0;
for (user_id, powerlevel) in ordered_users {
if !self
.services
.rooms
.state_cache
.is_joined(user_id.as_ref(), &room_id)
.await
{
if !force {
continue;
}
info!("Joining {user_id} to room {room_id} to perform shutdown");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(user_id.as_str()),
&RoomMemberEventContent::new(MembershipState::Join),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to join {user_id} to room {room_id} to perform shutdown: {e}");
drop(lock);
continue;
}
drop(lock);
}
if !changed_power_levels {
info!("Raising minimum power levels to {powerlevel} via {user_id}");
power_levels.events_default = power_levels.events_default.max(powerlevel);
power_levels.state_default = power_levels.state_default.max(powerlevel);
if power_levels.users_default < powerlevel {
power_levels.users_default = Int::MIN;
}
power_levels.kick = power_levels.kick.max(powerlevel);
power_levels.ban = power_levels.ban.max(powerlevel);
for (event_type, event_pl) in power_levels.events.clone() {
power_levels
.events
.insert(event_type, event_pl.max(powerlevel));
}
for (user, user_pl) in power_levels.users.clone() {
if user_pl < powerlevel {
power_levels.users.remove(&user);
}
}
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(String::new(), &power_levels.clone()),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to raise power levels to {powerlevel} in room {room_id} via \
{user_id}: {e}"
);
} else {
changed_power_levels = true;
}
drop(lock);
}
if !changed_join_rules {
info!("Setting room to private via {user_id}");
// NOTE: Setting the room to `private` soft-bricks it, as new joins with this
// join rule can actually be authorised.
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomJoinRulesEventContent::new(
JoinRule::deserialize(json!("\"org.continuwuity.shutdown\""))
.expect("valid fixed json"),
),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to set room to private in room {room_id} via {user_id}: {e}");
} else {
changed_join_rules = true;
}
drop(lock);
}
if !changed_history_visibility {
info!("Setting history visibility to joined via {user_id}");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomHistoryVisibilityEventContent::new(HistoryVisibility::Joined),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to set history visibility to joined in room {room_id} via \
{user_id}: {e}"
);
} else {
changed_history_visibility = true;
}
drop(lock);
}
info!("Removing {} users in {room_id} via {user_id}", joined_users.len());
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
for remove_user in &joined_users.clone() {
if remove_user == user_id || self.services.admin.user_is_admin(user_id).await {
continue;
}
let user_pl = power_levels
.users
.get(remove_user)
.copied()
.unwrap_or(power_levels.users_default);
let new_membership = if power_levels.ban <= powerlevel && user_pl < powerlevel {
MembershipState::Ban
} else {
MembershipState::Leave
};
debug!("Removing {remove_user} via {user_id}");
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(remove_user.as_str()),
&RoomMemberEventContent {
membership: new_membership.clone(),
redact_events: if redact { Some(true) } else { None },
..RoomMemberEventContent::new(new_membership.clone())
},
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to remove {remove_user} via {user_id}: {e}");
continue;
}
removed_ok = removed_ok.saturating_add(1);
if self.services.globals.user_is_local(remove_user) {
self.services
.rooms
.state_cache
.forget(&room_id, remove_user);
}
joined_users.retain(|u| u != remove_user);
}
if !sent_tombstone {
info!("Sending tombstone event for {room_id} via {user_id}");
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomTombstoneEventContent::new(
format!("Room {room_id} has been shut down"),
room_id.clone(),
),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to send tombstone event for {room_id} via {user_id}: {e}");
} else {
sent_tombstone = true;
}
}
}
self.write_str(&format!(
"Room shutdown complete, removed {removed_ok} users, changed join rules: \
{changed_join_rules}.\nConsider banning the room with `ban-room`.",
))
.await
}
+43 -63
View File
@@ -1,9 +1,6 @@
use std::collections::{HashSet, VecDeque};
use axum::extract::State;
use conduwuit::{Err, Event, Result, debug, info, trace, utils::to_canonical_object, warn};
use ruma::{OwnedEventId, api::federation::event::get_missing_events};
use serde_json::{json, value::RawValue};
use conduwuit::{Err, Result, debug, debug_error, info, utils::to_canonical_object};
use ruma::api::federation::event::get_missing_events;
use super::AccessCheck;
use crate::Ruma;
@@ -48,76 +45,59 @@ pub(crate) async fn get_missing_events_route(
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
let mut queued_events = body.latest_events.clone();
// the vec will never have more entries the limit
let mut events = Vec::with_capacity(limit);
let mut queue: VecDeque<OwnedEventId> = VecDeque::from(body.latest_events.clone());
let mut results: Vec<Box<RawValue>> = Vec::with_capacity(limit);
let mut seen: HashSet<OwnedEventId> = HashSet::from_iter(body.earliest_events.clone());
while let Some(next_event_id) = queue.pop_front() {
if seen.contains(&next_event_id) {
trace!(%next_event_id, "already seen event, skipping");
let mut i: usize = 0;
while i < queued_events.len() && events.len() < limit {
let Ok(pdu) = services.rooms.timeline.get_pdu(&queued_events[i]).await else {
debug!(
body.origin = body.origin.as_ref().map(tracing::field::display),
"Event {} does not exist locally, skipping", &queued_events[i]
);
i = i.saturating_add(1);
continue;
}
if results.len() >= limit {
debug!(%next_event_id, "reached limit of events to return, breaking");
break;
}
let mut pdu = match services.rooms.timeline.get_pdu(&next_event_id).await {
| Ok(pdu) => pdu,
| Err(e) => {
warn!("could not find event {next_event_id} while walking missing events: {e}");
continue;
},
};
if pdu.room_id_or_hash() != body.room_id {
return Err!(Request(Unknown(
"Event {next_event_id} is not in room {}",
body.room_id
)));
if body.earliest_events.contains(&queued_events[i]) {
i = i.saturating_add(1);
continue;
}
if !services
.rooms
.state_accessor
.server_can_see_event(body.origin(), &body.room_id, pdu.event_id())
.server_can_see_event(body.origin(), &body.room_id, &queued_events[i])
.await
{
debug!(%next_event_id, origin = %body.origin(), "redacting event origin cannot see");
pdu.redact(&room_version, json!({}))?;
debug!(
body.origin = body.origin.as_ref().map(tracing::field::display),
"Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id
);
i = i.saturating_add(1);
continue;
}
trace!(
%next_event_id,
prev_events = ?pdu.prev_events().collect::<Vec<_>>(),
"adding event to results and queueing prev events"
);
queue.extend(pdu.prev_events.clone());
seen.insert(next_event_id.clone());
if body.latest_events.contains(&next_event_id) {
continue; // Don't include latest_events in results,
// but do include their prev_events in the queue
}
results.push(
services
.sending
.convert_to_outgoing_federation_event(to_canonical_object(pdu)?)
.await,
);
trace!(
%next_event_id,
queue_len = queue.len(),
seen_len = seen.len(),
results_len = results.len(),
"event added to results"
);
i = i.saturating_add(1);
let Ok(event) = to_canonical_object(&pdu) else {
debug_error!(
body.origin = body.origin.as_ref().map(tracing::field::display),
"Failed to convert PDU in database to canonical JSON: {pdu:?}"
);
continue;
};
let prev_events = pdu.prev_events.iter().map(ToOwned::to_owned);
let event = services
.sending
.convert_to_outgoing_federation_event(event)
.await;
queued_events.extend(prev_events);
events.push(event);
}
if !queue.is_empty() {
debug!("limit reached before queue was empty");
}
results.reverse(); // return oldest first
Ok(get_missing_events::v1::Response { events: results })
Ok(get_missing_events::v1::Response { events })
}
-4
View File
@@ -406,10 +406,6 @@ impl Service {
/// Checks whether a given user is an admin of this server
pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
if self.services.globals.server_user == user_id {
return true;
}
if self
.services
.server
+1 -1
View File
@@ -9,7 +9,7 @@
<li>Read the <a href="https://continuwuity.org/introduction">documentation</a></li>
<li>Join the <a href="https://matrix.to/#/#continuwuity:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">Continuwuity Matrix room</a> or <a href="https://matrix.to/#/#space:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">space</a></li>
<li>Log in with a <a href="https://matrix.org/ecosystem/clients/">client</a></li>
<li>Ensure <a href="https://federationtester.mtrnord.blog/?serverName={{ server_name }}">federation</a> works</li>
<li>Ensure <a href="https://federationtester.matrix.org/#{{ server_name }}">federation</a> works</li>
</ul>
</div>