Compare commits

..

10 Commits

Author SHA1 Message Date
timedout 93b9007e1d fix: Unreliability in kicks & bans 2026-02-14 15:23:22 +00:00
timedout 2919c8e636 chore: Add double-lock flag 2026-02-14 14:24:13 +00:00
timedout 5ad653ab86 chore: Revert leaked changes to leave.rs 2026-02-14 14:17:23 +00:00
nexy7574 c9d9ed0a90 feat: Port room takeover and shutdown commands from continuwuity.rocks 2026-02-14 14:12:52 +00:00
arxari d9520f9382 Change the federation testing site in the docs to a more verbose one
The new site is easy to use at a glance but provides more advanced info if needed

Nexxy approved https://matrix.to/#/#offtopic:continuwuity.org/$rHSywj-s3v9onrROBcwDCHnnOpPVFbu0-Xgrh9A4btw
2026-02-12 20:13:47 +00:00
arxari 40bb5366bb Change the federation testing site to a more verbose one
The new site is easy to use at a glance but provides more advanced info if needed

Nexxy approved https://matrix.to/#/#offtopic:continuwuity.org/$rHSywj-s3v9onrROBcwDCHnnOpPVFbu0-Xgrh9A4btw
2026-02-12 20:11:20 +00:00
timedout f82bd77073 style: Fix clippy issues 2026-02-12 19:10:13 +00:00
timedout 7d84ba5ff2 fix: Don't include latest_events in output 2026-02-12 17:37:29 +00:00
timedout 69a8937584 fix: Complement runner 2026-02-12 17:23:39 +00:00
timedout b2ec13d342 fix: Redo the get_missing_events federation route 2026-02-12 16:48:12 +00:00
7 changed files with 691 additions and 130 deletions
+1 -1
View File
@@ -1,9 +1,9 @@
# Local build and dev artifacts
target/
!target/debug/conduwuit
# Docker files
Dockerfile*
docker/
# IDE files
.vscode
+3 -3
View File
@@ -2,9 +2,9 @@ FROM ubuntu:latest
EXPOSE 8008
EXPOSE 8448
RUN apt-get update && apt-get install -y ca-certificates liburing2 && rm -rf /var/lib/apt/lists/*
RUN mkdir -p /etc/continuwuity /var/lib/continuwuity
COPY docker/complement-entrypoint.sh /usr/local/bin/complement-entrypoint.sh
COPY docker/complement.config.toml /etc/continuwuity/config.toml
RUN mkdir -p /etc/continuwuity /var/lib/continuwuity /usr/local/bin/
COPY complement/complement-entrypoint.sh /usr/local/bin/complement-entrypoint.sh
COPY complement/complement.config.toml /etc/continuwuity/config.toml
COPY target/debug/conduwuit /usr/local/bin/conduwuit
RUN chmod +x /usr/local/bin/conduwuit /usr/local/bin/complement-entrypoint.sh
#HEALTHCHECK --interval=30s --timeout=5s CMD curl --fail http://localhost:8008/_continuwuity/server_version || exit 1
+1 -1
View File
@@ -269,7 +269,7 @@ curl https://your.server.name:8448/_matrix/federation/v1/version
```
- To check if your server can communicate with other homeservers, use the
[Matrix Federation Tester](https://federationtester.matrix.org/). If you can
[Matrix Federation Tester](https://federationtester.mtrnord.blog/). If you can
register but cannot join federated rooms, check your configuration and verify
that port 8448 is open and forwarded correctly.
-79
View File
@@ -1,79 +0,0 @@
# A Quick-Start Guide to Matrix
### What is Matrix?
[Matrix](https://matrix.org) is an open, federated, and extensible network for decentralized communication. Think of it like email, but for instant messaging:
- You create an account with a provider (called a **homeserver**)
- You can talk to anyone on Matrix, regardless of which homeserver they use
- You can use different apps (called **clients**) to access your account
- Your direct messages are end-to-end encrypted by default
### What's Continuwuity?
Continuwuity is a homeserver implementation. It's the software you use to set up your own provider, that you control! It's designed to run on few resources, and be easy to set up and maintain compared to similar software.
### Join a Continuwuity-Powered Homeserver
The easiest way to try Matrix is to create an account on an existing homeserver.
:::tip Continuwuity Partnered Homeservers
These homeservers are vetted by the Continuwuity team and follow our [partnered server guidelines](./community/ops-guidelines.mdx).
:::
- **[continuwuity.rocks](https://continuwuity.rocks)** - A public demo server operated by the Continuwuity Team
- **[federated.nexus](https://federated.nexus)** - A community resource hosting multiple FOSS services, including Matrix
### Join a Friend
If you have a friend who runs their own Matrix homeserver, ask them if you can create an account there! If you don't know anybody, consider partnering up to run a server for your community.
### Join Another Public Homeserver
There are many public Matrix homeservers you can join, which of then run other software. Here are some resources:
- [Join Matrix Homeserver List](https://joinmatrix.org/servers/) - A curated list of public homeservers
- [matrix.org](https://matrix.org) - The flagship homeserver (note: very large and sometimes slower due to high usage)
:::info About choosing a homeserver
Your choice of homeserver is important but not permanent. While your Matrix ID (like `@username:homeserver.org`) will include your homeserver's name, you can communicate with anyone on Matrix regardless of their homeserver. Think of it like choosing an email provider - you can still email anyone, no matter which provider you use!
:::
## Registering an Account
To interact with a Matrix server, you use a client. There are many matrix clients to choose from - [here's a list of some of them](https://matrix.org/ecosystem/clients/) - but to keep things simple we'll use [Element](https://app.element.io/#/register) or [Cinny](https://app.cinny.in/register/continuwuity.rocks) for this guide - pick what you prefer. Cinny looks closer to Discord, while Element looks like Teams or Slack.
Once you've opened the client, click on Register / Create an Account, and edit the Homeserver to match the one you decided on. On Element, you might have to click the Edit button to do that.
Fill out the username and password that you'd like to set. If your server is invite-only you might get asked to enter an invite code - this should have been given to you by the person that invited you.
After registration, you'll have a Matrix ID that looks like `@username:homeserver.org`. This is your unique identifier across the entire Matrix network.
:::warning Important: Save your Security Key!
Matrix uses end-to-end encryption to keep your messages private. During setup, you'll be asked to save a **Security Key** or **Security Phrase**. Store this somewhere safe (like a password manager) - you'll need it to access your encrypted messages on new devices!
:::
## What's Next?
Now that you have a Matrix account, you can:
- **Join public rooms** - Explore communities and conversations. Try [#continuwuity:continuwuity.org](https://matrix.to/#/#continuwuity:continuwuity.org) to chat with us!
- **Start private chats** - Message friends directly or create group chats
- **Explore Spaces** - Spaces are collections of rooms, similar to Discord servers or Slack workspaces
- **Try different clients** - Check out [this list](https://matrix.org/ecosystem/clients/), or ask a friend what they prefer!
### Other guides
- [Matrix vs Discord Guide](https://joinmatrix.org/guide/matrix-vs-discord/) - Coming from Discord?
- [Matrix Chat Basics](https://matrix.org/docs/chat_basics/) - Official Matrix.org documentation
- [Join Matrix Guide](https://joinmatrix.org/guide/)
- [Matrix Features Guide](https://joinmatrix.org/guide/features/) - Deep dive into Matrix features
---
## Ready to Run Your Own Homeserver?
If you want to run your own Matrix homeserver and have some technical knowledge, Continuwuity is an excellent choice. [Check out our guide for getting started with Continuwuity.](./introduction.mdx)
+622 -2
View File
@@ -1,12 +1,29 @@
use std::collections::BTreeMap;
use api::client::leave_room;
use clap::Subcommand;
use conduwuit::{
Err, Result, debug, info,
Err, Result, RoomVersion, debug, info,
utils::{IterStream, ReadyExt},
warn,
};
use futures::{FutureExt, StreamExt};
use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId};
use ruma::{
Int, OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId,
events::{
StateEventType,
room::{
create::RoomCreateEventContent,
history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
join_rules::{JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent},
power_levels::RoomPowerLevelsEventContent,
tombstone::RoomTombstoneEventContent,
},
},
exports::serde::Deserialize,
};
use serde_json::json;
use crate::{admin_command, admin_command_dispatch, get_room_info};
@@ -43,6 +60,59 @@ pub enum RoomModerationCommand {
/// information
no_details: bool,
},
/// - Take over a room by puppeting a local user into giving you a higher
/// power level
Takeover {
/// Whether to force joining the room if no local users are in the room
#[arg(long)]
force: bool,
/// The room in the format of `!roomid:example.com` or a room alias in
/// the format of `#roomalias:example.com`
room: OwnedRoomOrAliasId,
},
/// - Shut down a room, as much is possible. **This is immediate and
/// irreversible**.
///
/// This command requires that you have a local user in the room with at
/// least a moderator power level. It will first attempt to raise power
/// levels so that nobody can use the room further, then remove the
/// canonical alias event, sets the history visibility to `joined`,
/// sets the join rules to `org.continuwuity.shutdown` (preventing anyone
/// from joining even with an invite), and then bans or kicks all users,
/// setting the MSC4293 "redact events" flag on those users if possible.
/// Finally, it will send a room tombstone event, which will effectively
/// make the room unusable on most clients even if the room state resets.
///
/// This effectively will make the room unusable, unjoinable, and removes
/// everyone from it. This is as close to a "shutdown" as you can get with
/// federation.
ShutdownRoom {
/// If no local users with a power level are joined to the room, setting
/// this flag will attempt one, and will join the user with the
/// highest power level to the room to perform the shutdown.
///
/// If this flag is not set, and no local users can perform the
/// shutdown, no further attempt will be made.
#[arg(long)]
force: bool,
/// Whether to use MSC4293 fields to indicate that all messages in the
/// room should be redacted. This will make it more difficult for
/// clients that implement MSC4293 (like Element) to render the room
/// in the event users manage to rejoin.
#[arg(long)]
redact: bool,
///
#[arg(long)]
yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process:
bool,
/// The room in the format of `!roomid:example.com` or a room alias in
/// the format of `#roomalias:example.com`
room: OwnedRoomOrAliasId,
},
}
#[admin_command]
@@ -468,3 +538,553 @@ async fn list_banned_rooms(&self, no_details: bool) -> Result {
self.write_str(&format!("Rooms Banned ({num}):\n```\n{body}\n```",))
.await
}
#[admin_command]
async fn takeover(&self, force: bool, room: OwnedRoomOrAliasId) -> Result {
let room_id = if room.is_room_id() {
let room_id = match RoomId::parse(&room) {
| Ok(room_id) => room_id,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
room_id.to_owned()
} else if room.is_room_alias_id() {
let room_alias = match RoomAliasId::parse(&room) {
| Ok(room_alias) => room_alias,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
| Ok((room_id, servers)) => {
debug!(
?room_id,
?servers,
"Got federation response fetching room ID for room {room}"
);
room_id
},
| Err(e) => {
return Err!("Failed to resolve room alias {room} to a room ID: {e}");
},
}
} else {
return Err!(
"Room specified is not a room ID or room alias. Please note that this requires a \
full room ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`)",
);
};
let room_version =
RoomVersion::new(&self.services.rooms.state.get_room_version(&room_id).await?)?;
let Ok(create_content) = self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomCreateEventContent>(
&room_id,
&StateEventType::RoomCreate,
"",
)
.await
else {
return Err!("Failed to get room create event");
};
let mut power_levels = match self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomPowerLevelsEventContent>(
&room_id,
&StateEventType::RoomPowerLevels,
"",
)
.await
{
| Ok(content) => content,
| Err(e) => {
return Err!("Failed to get power levels for room {room_id}: {e}");
},
};
let local_creators = if room_version.explicitly_privilege_room_creators
&& create_content.additional_creators.is_some()
{
create_content
.additional_creators
.clone()
.unwrap()
.into_iter()
.filter(|user_id| self.services.globals.user_is_local(user_id))
.collect::<Vec<_>>()
} else {
vec![]
};
let local_users = power_levels
.users
.iter()
.filter(|(user_id, _)| self.services.globals.user_is_local(user_id))
.map(|(user_id, level)| (user_id.clone(), *level))
.collect::<BTreeMap<_, _>>();
let min_pl = power_levels
.events
.get(&StateEventType::RoomPowerLevels.into())
.copied()
.unwrap_or(power_levels.state_default);
let mut ordered_users = local_users
.iter()
.chain(local_creators.iter().map(|user_id| (user_id, &Int::MAX)))
.map(|(user_id, level)| {
if local_creators.contains(user_id) {
(user_id, Int::MAX)
} else {
(user_id, *level)
}
})
.filter(|(user_id, level)| *level >= min_pl || local_creators.contains(*user_id))
.collect::<Vec<_>>();
ordered_users.sort_by_key(|(_, level)| level.saturating_mul(Int::from(-1)));
for (user_id, powerlevel) in ordered_users {
if !self
.services
.rooms
.state_cache
.is_joined(user_id.as_ref(), &room_id)
.await
{
if !force {
continue;
}
info!("Joining {user_id} to room {room_id} to perform takeover");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(user_id.as_str()),
&RoomMemberEventContent::new(MembershipState::Join),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to join {user_id} to room {room_id} to perform takeover: {e}");
drop(lock);
continue;
}
drop(lock);
}
info!("Promoting you to power level {powerlevel} in room {room_id} via {user_id}");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
power_levels
.users
.insert(self.sender.expect("you should exist").to_owned(), powerlevel);
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(String::new(), &power_levels),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to promote you to power level {powerlevel} in room {room_id} via \
{user_id}: {e}"
);
drop(lock);
continue;
}
return self
.write_str(&format!(
"Successfully promoted you to power level {powerlevel} in room {room_id} via \
{user_id}"
))
.await;
}
self.write_str("Failed to promote you, no local users with sufficient power level found.")
.await
}
#[admin_command]
async fn shutdown_room(
&self,
force: bool,
redact: bool,
yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process: bool,
room: OwnedRoomOrAliasId,
) -> Result {
let room_id = if room.is_room_id() {
let room_id = match RoomId::parse(&room) {
| Ok(room_id) => room_id,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
room_id.to_owned()
} else if room.is_room_alias_id() {
let room_alias = match RoomAliasId::parse(&room) {
| Ok(room_alias) => room_alias,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
| Ok((room_id, servers)) => {
debug!(
?room_id,
?servers,
"Got federation response fetching room ID for room {room}"
);
room_id
},
| Err(e) => {
return Err!("Failed to resolve room alias {room} to a room ID: {e}");
},
}
} else {
return Err!(
"Room specified is not a room ID or room alias. Please note that this requires a \
full room ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`)",
);
};
if !yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process {
return Err!(
"This command is irreversible and will immediately shutdown the room, making it \
completely unusable if successful. If you are sure you want to do this, add the \
flag --yes-i-am-sure-i-want-to-irreversibly-shutdown-this-room-destroying-it-in-the-process \
to your command."
);
}
let mut power_levels: RoomPowerLevelsEventContent = match self
.services
.rooms
.state_accessor
.room_state_get_content(&room_id, &StateEventType::RoomPowerLevels, "")
.await
.map_err(|e| Err!("Failed to get power levels for room {room_id}: {e}"))
{
| Ok(content) => content,
| Err(e) => {
return e;
},
};
let mut joined_users = self
.services
.rooms
.state_cache
.room_members(&room_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await;
let room_version =
RoomVersion::new(&self.services.rooms.state.get_room_version(&room_id).await?)?;
let Ok(create_content) = self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomCreateEventContent>(
&room_id,
&StateEventType::RoomCreate,
"",
)
.await
else {
return Err!("Failed to get room create event");
};
let local_creators = if room_version.explicitly_privilege_room_creators
&& create_content.additional_creators.is_some()
{
create_content
.additional_creators
.unwrap()
.into_iter()
.filter(|user_id| self.services.globals.user_is_local(user_id))
.collect::<Vec<_>>()
} else {
vec![]
};
let local_users = power_levels
.users
.iter()
.filter(|(user_id, _)| self.services.globals.user_is_local(user_id))
.map(|(user_id, level)| (user_id.clone(), *level))
.collect::<BTreeMap<_, _>>();
let mut ordered_users = local_users
.iter()
.chain(local_creators.iter().map(|user_id| (user_id, &Int::MAX)))
.map(|(user_id, level)| {
if local_creators.contains(user_id) {
(user_id, Int::MAX)
} else {
(user_id, *level)
}
})
.collect::<Vec<_>>();
ordered_users.sort_by_key(|(_, level)| level.saturating_mul(Int::from(-1)));
let mut changed_join_rules = false;
let mut changed_history_visibility = false;
let mut changed_power_levels = false;
let mut sent_tombstone = false;
let mut removed_ok: u32 = 0;
for (user_id, powerlevel) in ordered_users {
if !self
.services
.rooms
.state_cache
.is_joined(user_id.as_ref(), &room_id)
.await
{
if !force {
continue;
}
info!("Joining {user_id} to room {room_id} to perform shutdown");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(user_id.as_str()),
&RoomMemberEventContent::new(MembershipState::Join),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to join {user_id} to room {room_id} to perform shutdown: {e}");
drop(lock);
continue;
}
drop(lock);
}
if !changed_power_levels {
info!("Raising minimum power levels to {powerlevel} via {user_id}");
power_levels.events_default = power_levels.events_default.max(powerlevel);
power_levels.state_default = power_levels.state_default.max(powerlevel);
if power_levels.users_default < powerlevel {
power_levels.users_default = Int::MIN;
}
power_levels.kick = power_levels.kick.max(powerlevel);
power_levels.ban = power_levels.ban.max(powerlevel);
for (event_type, event_pl) in power_levels.events.clone() {
power_levels
.events
.insert(event_type, event_pl.max(powerlevel));
}
for (user, user_pl) in power_levels.users.clone() {
if user_pl < powerlevel {
power_levels.users.remove(&user);
}
}
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(String::new(), &power_levels.clone()),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to raise power levels to {powerlevel} in room {room_id} via \
{user_id}: {e}"
);
} else {
changed_power_levels = true;
}
drop(lock);
}
if !changed_join_rules {
info!("Setting room to private via {user_id}");
// NOTE: Setting the room to `private` soft-bricks it, as new joins with this
// join rule can actually be authorised.
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomJoinRulesEventContent::new(
JoinRule::deserialize(json!("\"org.continuwuity.shutdown\""))
.expect("valid fixed json"),
),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to set room to private in room {room_id} via {user_id}: {e}");
} else {
changed_join_rules = true;
}
drop(lock);
}
if !changed_history_visibility {
info!("Setting history visibility to joined via {user_id}");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomHistoryVisibilityEventContent::new(HistoryVisibility::Joined),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to set history visibility to joined in room {room_id} via \
{user_id}: {e}"
);
} else {
changed_history_visibility = true;
}
drop(lock);
}
info!("Removing {} users in {room_id} via {user_id}", joined_users.len());
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
for remove_user in &joined_users.clone() {
if remove_user == user_id || self.services.admin.user_is_admin(user_id).await {
continue;
}
let user_pl = power_levels
.users
.get(remove_user)
.copied()
.unwrap_or(power_levels.users_default);
let new_membership = if power_levels.ban <= powerlevel && user_pl < powerlevel {
MembershipState::Ban
} else {
MembershipState::Leave
};
debug!("Removing {remove_user} via {user_id}");
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(remove_user.as_str()),
&RoomMemberEventContent {
membership: new_membership.clone(),
redact_events: if redact { Some(true) } else { None },
..RoomMemberEventContent::new(new_membership.clone())
},
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to remove {remove_user} via {user_id}: {e}");
continue;
}
removed_ok = removed_ok.saturating_add(1);
if self.services.globals.user_is_local(remove_user) {
self.services
.rooms
.state_cache
.forget(&room_id, remove_user);
}
joined_users.retain(|u| u != remove_user);
}
if !sent_tombstone {
info!("Sending tombstone event for {room_id} via {user_id}");
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomTombstoneEventContent::new(
format!("Room {room_id} has been shut down"),
room_id.clone(),
),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to send tombstone event for {room_id} via {user_id}: {e}");
} else {
sent_tombstone = true;
}
}
}
self.write_str(&format!(
"Room shutdown complete, removed {removed_ok} users, changed join rules: \
{changed_join_rules}.\nConsider banning the room with `ban-room`.",
))
.await
}
+63 -43
View File
@@ -1,6 +1,9 @@
use std::collections::{HashSet, VecDeque};
use axum::extract::State;
use conduwuit::{Err, Result, debug, debug_error, info, utils::to_canonical_object};
use ruma::api::federation::event::get_missing_events;
use conduwuit::{Err, Event, Result, debug, info, trace, utils::to_canonical_object, warn};
use ruma::{OwnedEventId, api::federation::event::get_missing_events};
use serde_json::{json, value::RawValue};
use super::AccessCheck;
use crate::Ruma;
@@ -45,59 +48,76 @@ pub(crate) async fn get_missing_events_route(
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
let mut queued_events = body.latest_events.clone();
// the vec will never have more entries the limit
let mut events = Vec::with_capacity(limit);
let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
let mut i: usize = 0;
while i < queued_events.len() && events.len() < limit {
let Ok(pdu) = services.rooms.timeline.get_pdu(&queued_events[i]).await else {
debug!(
body.origin = body.origin.as_ref().map(tracing::field::display),
"Event {} does not exist locally, skipping", &queued_events[i]
);
i = i.saturating_add(1);
let mut queue: VecDeque<OwnedEventId> = VecDeque::from(body.latest_events.clone());
let mut results: Vec<Box<RawValue>> = Vec::with_capacity(limit);
let mut seen: HashSet<OwnedEventId> = HashSet::from_iter(body.earliest_events.clone());
while let Some(next_event_id) = queue.pop_front() {
if seen.contains(&next_event_id) {
trace!(%next_event_id, "already seen event, skipping");
continue;
}
if results.len() >= limit {
debug!(%next_event_id, "reached limit of events to return, breaking");
break;
}
let mut pdu = match services.rooms.timeline.get_pdu(&next_event_id).await {
| Ok(pdu) => pdu,
| Err(e) => {
warn!("could not find event {next_event_id} while walking missing events: {e}");
continue;
},
};
if body.earliest_events.contains(&queued_events[i]) {
i = i.saturating_add(1);
continue;
if pdu.room_id_or_hash() != body.room_id {
return Err!(Request(Unknown(
"Event {next_event_id} is not in room {}",
body.room_id
)));
}
if !services
.rooms
.state_accessor
.server_can_see_event(body.origin(), &body.room_id, &queued_events[i])
.server_can_see_event(body.origin(), &body.room_id, pdu.event_id())
.await
{
debug!(
body.origin = body.origin.as_ref().map(tracing::field::display),
"Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id
);
i = i.saturating_add(1);
continue;
debug!(%next_event_id, origin = %body.origin(), "redacting event origin cannot see");
pdu.redact(&room_version, json!({}))?;
}
i = i.saturating_add(1);
let Ok(event) = to_canonical_object(&pdu) else {
debug_error!(
body.origin = body.origin.as_ref().map(tracing::field::display),
"Failed to convert PDU in database to canonical JSON: {pdu:?}"
);
continue;
};
let prev_events = pdu.prev_events.iter().map(ToOwned::to_owned);
let event = services
.sending
.convert_to_outgoing_federation_event(event)
.await;
queued_events.extend(prev_events);
events.push(event);
trace!(
%next_event_id,
prev_events = ?pdu.prev_events().collect::<Vec<_>>(),
"adding event to results and queueing prev events"
);
queue.extend(pdu.prev_events.clone());
seen.insert(next_event_id.clone());
if body.latest_events.contains(&next_event_id) {
continue; // Don't include latest_events in results,
// but do include their prev_events in the queue
}
results.push(
services
.sending
.convert_to_outgoing_federation_event(to_canonical_object(pdu)?)
.await,
);
trace!(
%next_event_id,
queue_len = queue.len(),
seen_len = seen.len(),
results_len = results.len(),
"event added to results"
);
}
Ok(get_missing_events::v1::Response { events })
if !queue.is_empty() {
debug!("limit reached before queue was empty");
}
results.reverse(); // return oldest first
Ok(get_missing_events::v1::Response { events: results })
}
+1 -1
View File
@@ -9,7 +9,7 @@
<li>Read the <a href="https://continuwuity.org/introduction">documentation</a></li>
<li>Join the <a href="https://matrix.to/#/#continuwuity:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">Continuwuity Matrix room</a> or <a href="https://matrix.to/#/#space:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">space</a></li>
<li>Log in with a <a href="https://matrix.org/ecosystem/clients/">client</a></li>
<li>Ensure <a href="https://federationtester.matrix.org/#{{ server_name }}">federation</a> works</li>
<li>Ensure <a href="https://federationtester.mtrnord.blog/?serverName={{ server_name }}">federation</a> works</li>
</ul>
</div>