Compare commits

...

21 Commits

Author SHA1 Message Date
timedout 93b9007e1d fix: Unreliability in kicks & bans 2026-02-14 15:23:22 +00:00
timedout 2919c8e636 chore: Add double-lock flag 2026-02-14 14:24:13 +00:00
timedout 5ad653ab86 chore: Revert leaked changes to leave.rs 2026-02-14 14:17:23 +00:00
nexy7574 c9d9ed0a90 feat: Port room takeover and shutdown commands from continuwuity.rocks 2026-02-14 14:12:52 +00:00
arxari d9520f9382 Change the federation testing site in the docs to a more verbose one
The new site is easy to use at a glance but provides more advanced info if needed

Nexxy approved https://matrix.to/#/#offtopic:continuwuity.org/$rHSywj-s3v9onrROBcwDCHnnOpPVFbu0-Xgrh9A4btw
2026-02-12 20:13:47 +00:00
arxari 40bb5366bb Change the federation testing site to a more verbose one
The new site is easy to use at a glance but provides more advanced info if needed

Nexxy approved https://matrix.to/#/#offtopic:continuwuity.org/$rHSywj-s3v9onrROBcwDCHnnOpPVFbu0-Xgrh9A4btw
2026-02-12 20:11:20 +00:00
timedout f82bd77073 style: Fix clippy issues 2026-02-12 19:10:13 +00:00
timedout 7d84ba5ff2 fix: Don't include latest_events in output 2026-02-12 17:37:29 +00:00
timedout 69a8937584 fix: Complement runner 2026-02-12 17:23:39 +00:00
timedout b2ec13d342 fix: Redo the get_missing_events federation route 2026-02-12 16:48:12 +00:00
Jade Ellis 4e55e1ea90 docs: Add note about checking the contents of configuration 2026-02-11 16:56:07 +00:00
ginger f5f3108d5f chore: Formatting 2026-02-10 22:56:11 +00:00
chri-k d1e1ee6156 fix: always treat server_user as an admin 2026-02-10 22:56:11 +00:00
Omar Pakker ae16a45515 chore: Add towncrier news fragment 2026-02-10 23:07:38 +01:00
Omar Pakker 077bda23a6 feat(admin): Add resolver cache flush command
This command allows an admin to flush a specific server
from the resolver caches or flush the whole cache.
2026-02-10 23:07:32 +01:00
Renovate Bot a2bf0c1223 chore(deps): update pre-commit hook crate-ci/typos to v1.43.4 2026-02-10 05:02:40 +00:00
Ginger b9b1ff87f2 chore: Formatting fixes 2026-02-10 02:29:11 +00:00
Ginger 3c0146d437 feat: Implement a migration to fix busted local invites 2026-02-10 02:29:11 +00:00
Ginger 7485d4aa91 fix: Properly set stripped state for local invites 2026-02-10 02:29:11 +00:00
Jade Ellis 39bdb4c5a2 chore: Announcement for v0.5.4 2026-02-09 20:48:47 +00:00
Renovate Bot 55fb3b8848 chore(deps): update pre-commit hook crate-ci/typos to v1.43.3 2026-02-09 15:26:52 +00:00
18 changed files with 811 additions and 66 deletions
+1 -1
View File
@@ -1,9 +1,9 @@
# Local build and dev artifacts
target/
!target/debug/conduwuit
# Docker files
Dockerfile*
docker/
# IDE files
.vscode
+1 -1
View File
@@ -23,7 +23,7 @@ repos:
- id: check-added-large-files
- repo: https://github.com/crate-ci/typos
rev: v1.43.2
rev: v1.43.4
hooks:
- id: typos
- id: typos
+1
View File
@@ -0,0 +1 @@
Fixed invites sent to other users in the same homeserver not being properly sent down sync. Users with missing or broken invites should clear their client caches after updating to make them appear.
+1
View File
@@ -0,0 +1 @@
Introduce a resolver command to allow flushing a server from the cache or to flush the complete cache. Contributed by @Omar007
+3 -3
View File
@@ -2,9 +2,9 @@ FROM ubuntu:latest
EXPOSE 8008
EXPOSE 8448
RUN apt-get update && apt-get install -y ca-certificates liburing2 && rm -rf /var/lib/apt/lists/*
RUN mkdir -p /etc/continuwuity /var/lib/continuwuity
COPY docker/complement-entrypoint.sh /usr/local/bin/complement-entrypoint.sh
COPY docker/complement.config.toml /etc/continuwuity/config.toml
RUN mkdir -p /etc/continuwuity /var/lib/continuwuity /usr/local/bin/
COPY complement/complement-entrypoint.sh /usr/local/bin/complement-entrypoint.sh
COPY complement/complement.config.toml /etc/continuwuity/config.toml
COPY target/debug/conduwuit /usr/local/bin/conduwuit
RUN chmod +x /usr/local/bin/conduwuit /usr/local/bin/complement-entrypoint.sh
#HEALTHCHECK --interval=30s --timeout=5s CMD curl --fail http://localhost:8008/_continuwuity/server_version || exit 1
+1 -1
View File
@@ -269,7 +269,7 @@ curl https://your.server.name:8448/_matrix/federation/v1/version
```
- To check if your server can communicate with other homeservers, use the
[Matrix Federation Tester](https://federationtester.matrix.org/). If you can
[Matrix Federation Tester](https://federationtester.mtrnord.blog/). If you can
register but cannot join federated rooms, check your configuration and verify
that port 8448 is open and forwarded correctly.
@@ -6,10 +6,10 @@
"message": "Welcome to Continuwuity! Important announcements about the project will appear here."
},
{
"id": 8,
"id": 9,
"mention_room": false,
"date": "2026-01-12",
"message": "Hey everyone!\n\nJust letting you know we've released [v0.5.3](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.3) - this one is a bit of a hotfix for an issue with inviting and allowing others to join rooms.\n\nIf you appreceate the round-the-clock work we've been doing to keep your servers secure over this holiday period, we'd really appreciate your support - you can sponsor individuals on our team using the 'sponsor' button at the top of [our GitHub repository](https://github.com/continuwuity/continuwuity). If you can't do that, even a star helps - spreading the word and advocating for our project helps keep it going.\n\nHave a lovely rest of your year \\\n[Jade \\(she/her\\)](https://matrix.to/#/%40jade%3Aellis.link) \n🩵"
"date": "2026-02-09",
"message": "Yesterday we released [v0.5.4](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.4). Bugfixes, performance improvements and more moderation features! There's also a security fix, so please update as soon as possible. Don't forget to join [our announcements channel](https://matrix.to/#/!jIdNjSM5X-V5JVx2h2kAhUZIIQ08GyzPL55NFZAH1vM/%2489TY9CqRg4-ff1MGo3Ulc5r5X4pakfdzT-99RD8Docc?via=ellis.link&via=explodie.org&via=matrix.org) to get important information sooner <3 "
}
]
}
+13
View File
@@ -112,6 +112,19 @@ Query the destinations cache
Query the overrides cache
### `!admin query resolver flush-cache`
Flush a given server from the resolver caches or flush them completely
* Examples:
* Flush a specific server:
`!admin query resolver flush-cache matrix.example.com`
* Flush all resolver caches completely:
`!admin query resolver flush-cache --all`
## `!admin query pusher`
pusher service
+10
View File
@@ -20,6 +20,16 @@ log into the server account (`@conduit`) from a web client
## General potential issues
### Configuration not working as expected
Sometimes you can make a mistake in your configuration that
means things don't get passed to Continuwuity correctly.
This is particularly easy to do with environment variables.
To check what configuration Continuwuity actually sees, you can
use the `!admin server show-config` command in your admin room.
Beware that this prints out any secrets in your configuration,
so you might want to delete the result afterwards!
### Potential DNS issues when using Docker
Docker's DNS setup for containers in a non-default network intercepts queries to
+25 -1
View File
@@ -1,5 +1,5 @@
use clap::Subcommand;
use conduwuit::{Result, utils::time};
use conduwuit::{Err, Result, utils::time};
use futures::StreamExt;
use ruma::OwnedServerName;
@@ -7,6 +7,7 @@ use crate::{admin_command, admin_command_dispatch};
#[admin_command_dispatch]
#[derive(Debug, Subcommand)]
#[allow(clippy::enum_variant_names)]
/// Resolver service and caches
pub enum ResolverCommand {
/// Query the destinations cache
@@ -18,6 +19,14 @@ pub enum ResolverCommand {
OverridesCache {
name: Option<String>,
},
/// Flush a specific server from the resolver caches or everything
FlushCache {
name: Option<OwnedServerName>,
#[arg(short, long)]
all: bool,
},
}
#[admin_command]
@@ -69,3 +78,18 @@ async fn overrides_cache(&self, server_name: Option<String>) -> Result {
Ok(())
}
#[admin_command]
async fn flush_cache(&self, name: Option<OwnedServerName>, all: bool) -> Result {
if all {
self.services.resolver.cache.clear().await;
writeln!(self, "Resolver caches cleared!").await
} else if let Some(name) = name {
self.services.resolver.cache.del_destination(&name);
self.services.resolver.cache.del_override(&name);
self.write_str(&format!("Cleared {name} from resolver caches!"))
.await
} else {
Err!("Missing name. Supply a name or use --all to flush the whole cache.")
}
}
+622 -2
View File
@@ -1,12 +1,29 @@
use std::collections::BTreeMap;
use api::client::leave_room;
use clap::Subcommand;
use conduwuit::{
Err, Result, debug, info,
Err, Result, RoomVersion, debug, info,
utils::{IterStream, ReadyExt},
warn,
};
use futures::{FutureExt, StreamExt};
use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId};
use ruma::{
Int, OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId,
events::{
StateEventType,
room::{
create::RoomCreateEventContent,
history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
join_rules::{JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent},
power_levels::RoomPowerLevelsEventContent,
tombstone::RoomTombstoneEventContent,
},
},
exports::serde::Deserialize,
};
use serde_json::json;
use crate::{admin_command, admin_command_dispatch, get_room_info};
@@ -43,6 +60,59 @@ pub enum RoomModerationCommand {
/// information
no_details: bool,
},
/// - Take over a room by puppeting a local user into giving you a higher
/// power level
Takeover {
/// Whether to force joining the room if no local users are in the room
#[arg(long)]
force: bool,
/// The room in the format of `!roomid:example.com` or a room alias in
/// the format of `#roomalias:example.com`
room: OwnedRoomOrAliasId,
},
/// - Shut down a room, as much is possible. **This is immediate and
/// irreversible**.
///
/// This command requires that you have a local user in the room with at
/// least a moderator power level. It will first attempt to raise power
/// levels so that nobody can use the room further, then remove the
/// canonical alias event, sets the history visibility to `joined`,
/// sets the join rules to `org.continuwuity.shutdown` (preventing anyone
/// from joining even with an invite), and then bans or kicks all users,
/// setting the MSC4293 "redact events" flag on those users if possible.
/// Finally, it will send a room tombstone event, which will effectively
/// make the room unusable on most clients even if the room state resets.
///
/// This effectively will make the room unusable, unjoinable, and removes
/// everyone from it. This is as close to a "shutdown" as you can get with
/// federation.
ShutdownRoom {
/// If no local users with a power level are joined to the room, setting
/// this flag will attempt one, and will join the user with the
/// highest power level to the room to perform the shutdown.
///
/// If this flag is not set, and no local users can perform the
/// shutdown, no further attempt will be made.
#[arg(long)]
force: bool,
/// Whether to use MSC4293 fields to indicate that all messages in the
/// room should be redacted. This will make it more difficult for
/// clients that implement MSC4293 (like Element) to render the room
/// in the event users manage to rejoin.
#[arg(long)]
redact: bool,
///
#[arg(long)]
yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process:
bool,
/// The room in the format of `!roomid:example.com` or a room alias in
/// the format of `#roomalias:example.com`
room: OwnedRoomOrAliasId,
},
}
#[admin_command]
@@ -468,3 +538,553 @@ async fn list_banned_rooms(&self, no_details: bool) -> Result {
self.write_str(&format!("Rooms Banned ({num}):\n```\n{body}\n```",))
.await
}
#[admin_command]
async fn takeover(&self, force: bool, room: OwnedRoomOrAliasId) -> Result {
let room_id = if room.is_room_id() {
let room_id = match RoomId::parse(&room) {
| Ok(room_id) => room_id,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
room_id.to_owned()
} else if room.is_room_alias_id() {
let room_alias = match RoomAliasId::parse(&room) {
| Ok(room_alias) => room_alias,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
| Ok((room_id, servers)) => {
debug!(
?room_id,
?servers,
"Got federation response fetching room ID for room {room}"
);
room_id
},
| Err(e) => {
return Err!("Failed to resolve room alias {room} to a room ID: {e}");
},
}
} else {
return Err!(
"Room specified is not a room ID or room alias. Please note that this requires a \
full room ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`)",
);
};
let room_version =
RoomVersion::new(&self.services.rooms.state.get_room_version(&room_id).await?)?;
let Ok(create_content) = self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomCreateEventContent>(
&room_id,
&StateEventType::RoomCreate,
"",
)
.await
else {
return Err!("Failed to get room create event");
};
let mut power_levels = match self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomPowerLevelsEventContent>(
&room_id,
&StateEventType::RoomPowerLevels,
"",
)
.await
{
| Ok(content) => content,
| Err(e) => {
return Err!("Failed to get power levels for room {room_id}: {e}");
},
};
let local_creators = if room_version.explicitly_privilege_room_creators
&& create_content.additional_creators.is_some()
{
create_content
.additional_creators
.clone()
.unwrap()
.into_iter()
.filter(|user_id| self.services.globals.user_is_local(user_id))
.collect::<Vec<_>>()
} else {
vec![]
};
let local_users = power_levels
.users
.iter()
.filter(|(user_id, _)| self.services.globals.user_is_local(user_id))
.map(|(user_id, level)| (user_id.clone(), *level))
.collect::<BTreeMap<_, _>>();
let min_pl = power_levels
.events
.get(&StateEventType::RoomPowerLevels.into())
.copied()
.unwrap_or(power_levels.state_default);
let mut ordered_users = local_users
.iter()
.chain(local_creators.iter().map(|user_id| (user_id, &Int::MAX)))
.map(|(user_id, level)| {
if local_creators.contains(user_id) {
(user_id, Int::MAX)
} else {
(user_id, *level)
}
})
.filter(|(user_id, level)| *level >= min_pl || local_creators.contains(*user_id))
.collect::<Vec<_>>();
ordered_users.sort_by_key(|(_, level)| level.saturating_mul(Int::from(-1)));
for (user_id, powerlevel) in ordered_users {
if !self
.services
.rooms
.state_cache
.is_joined(user_id.as_ref(), &room_id)
.await
{
if !force {
continue;
}
info!("Joining {user_id} to room {room_id} to perform takeover");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(user_id.as_str()),
&RoomMemberEventContent::new(MembershipState::Join),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to join {user_id} to room {room_id} to perform takeover: {e}");
drop(lock);
continue;
}
drop(lock);
}
info!("Promoting you to power level {powerlevel} in room {room_id} via {user_id}");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
power_levels
.users
.insert(self.sender.expect("you should exist").to_owned(), powerlevel);
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(String::new(), &power_levels),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to promote you to power level {powerlevel} in room {room_id} via \
{user_id}: {e}"
);
drop(lock);
continue;
}
return self
.write_str(&format!(
"Successfully promoted you to power level {powerlevel} in room {room_id} via \
{user_id}"
))
.await;
}
self.write_str("Failed to promote you, no local users with sufficient power level found.")
.await
}
#[admin_command]
async fn shutdown_room(
&self,
force: bool,
redact: bool,
yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process: bool,
room: OwnedRoomOrAliasId,
) -> Result {
let room_id = if room.is_room_id() {
let room_id = match RoomId::parse(&room) {
| Ok(room_id) => room_id,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
room_id.to_owned()
} else if room.is_room_alias_id() {
let room_alias = match RoomAliasId::parse(&room) {
| Ok(room_alias) => room_alias,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
| Ok((room_id, servers)) => {
debug!(
?room_id,
?servers,
"Got federation response fetching room ID for room {room}"
);
room_id
},
| Err(e) => {
return Err!("Failed to resolve room alias {room} to a room ID: {e}");
},
}
} else {
return Err!(
"Room specified is not a room ID or room alias. Please note that this requires a \
full room ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`)",
);
};
if !yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process {
return Err!(
"This command is irreversible and will immediately shutdown the room, making it \
completely unusable if successful. If you are sure you want to do this, add the \
flag --yes-i-am-sure-i-want-to-irreversibly-shutdown-this-room-destroying-it-in-the-process \
to your command."
);
}
let mut power_levels: RoomPowerLevelsEventContent = match self
.services
.rooms
.state_accessor
.room_state_get_content(&room_id, &StateEventType::RoomPowerLevels, "")
.await
.map_err(|e| Err!("Failed to get power levels for room {room_id}: {e}"))
{
| Ok(content) => content,
| Err(e) => {
return e;
},
};
let mut joined_users = self
.services
.rooms
.state_cache
.room_members(&room_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await;
let room_version =
RoomVersion::new(&self.services.rooms.state.get_room_version(&room_id).await?)?;
let Ok(create_content) = self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomCreateEventContent>(
&room_id,
&StateEventType::RoomCreate,
"",
)
.await
else {
return Err!("Failed to get room create event");
};
let local_creators = if room_version.explicitly_privilege_room_creators
&& create_content.additional_creators.is_some()
{
create_content
.additional_creators
.unwrap()
.into_iter()
.filter(|user_id| self.services.globals.user_is_local(user_id))
.collect::<Vec<_>>()
} else {
vec![]
};
let local_users = power_levels
.users
.iter()
.filter(|(user_id, _)| self.services.globals.user_is_local(user_id))
.map(|(user_id, level)| (user_id.clone(), *level))
.collect::<BTreeMap<_, _>>();
let mut ordered_users = local_users
.iter()
.chain(local_creators.iter().map(|user_id| (user_id, &Int::MAX)))
.map(|(user_id, level)| {
if local_creators.contains(user_id) {
(user_id, Int::MAX)
} else {
(user_id, *level)
}
})
.collect::<Vec<_>>();
ordered_users.sort_by_key(|(_, level)| level.saturating_mul(Int::from(-1)));
let mut changed_join_rules = false;
let mut changed_history_visibility = false;
let mut changed_power_levels = false;
let mut sent_tombstone = false;
let mut removed_ok: u32 = 0;
for (user_id, powerlevel) in ordered_users {
if !self
.services
.rooms
.state_cache
.is_joined(user_id.as_ref(), &room_id)
.await
{
if !force {
continue;
}
info!("Joining {user_id} to room {room_id} to perform shutdown");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(user_id.as_str()),
&RoomMemberEventContent::new(MembershipState::Join),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to join {user_id} to room {room_id} to perform shutdown: {e}");
drop(lock);
continue;
}
drop(lock);
}
if !changed_power_levels {
info!("Raising minimum power levels to {powerlevel} via {user_id}");
power_levels.events_default = power_levels.events_default.max(powerlevel);
power_levels.state_default = power_levels.state_default.max(powerlevel);
if power_levels.users_default < powerlevel {
power_levels.users_default = Int::MIN;
}
power_levels.kick = power_levels.kick.max(powerlevel);
power_levels.ban = power_levels.ban.max(powerlevel);
for (event_type, event_pl) in power_levels.events.clone() {
power_levels
.events
.insert(event_type, event_pl.max(powerlevel));
}
for (user, user_pl) in power_levels.users.clone() {
if user_pl < powerlevel {
power_levels.users.remove(&user);
}
}
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(String::new(), &power_levels.clone()),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to raise power levels to {powerlevel} in room {room_id} via \
{user_id}: {e}"
);
} else {
changed_power_levels = true;
}
drop(lock);
}
if !changed_join_rules {
info!("Setting room to private via {user_id}");
// NOTE: Setting the room to `private` soft-bricks it, as new joins with this
// join rule can actually be authorised.
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomJoinRulesEventContent::new(
JoinRule::deserialize(json!("\"org.continuwuity.shutdown\""))
.expect("valid fixed json"),
),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to set room to private in room {room_id} via {user_id}: {e}");
} else {
changed_join_rules = true;
}
drop(lock);
}
if !changed_history_visibility {
info!("Setting history visibility to joined via {user_id}");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomHistoryVisibilityEventContent::new(HistoryVisibility::Joined),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to set history visibility to joined in room {room_id} via \
{user_id}: {e}"
);
} else {
changed_history_visibility = true;
}
drop(lock);
}
info!("Removing {} users in {room_id} via {user_id}", joined_users.len());
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
for remove_user in &joined_users.clone() {
if remove_user == user_id || self.services.admin.user_is_admin(user_id).await {
continue;
}
let user_pl = power_levels
.users
.get(remove_user)
.copied()
.unwrap_or(power_levels.users_default);
let new_membership = if power_levels.ban <= powerlevel && user_pl < powerlevel {
MembershipState::Ban
} else {
MembershipState::Leave
};
debug!("Removing {remove_user} via {user_id}");
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(remove_user.as_str()),
&RoomMemberEventContent {
membership: new_membership.clone(),
redact_events: if redact { Some(true) } else { None },
..RoomMemberEventContent::new(new_membership.clone())
},
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to remove {remove_user} via {user_id}: {e}");
continue;
}
removed_ok = removed_ok.saturating_add(1);
if self.services.globals.user_is_local(remove_user) {
self.services
.rooms
.state_cache
.forget(&room_id, remove_user);
}
joined_users.retain(|u| u != remove_user);
}
if !sent_tombstone {
info!("Sending tombstone event for {room_id} via {user_id}");
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomTombstoneEventContent::new(
format!("Room {room_id} has been shut down"),
room_id.clone(),
),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to send tombstone event for {room_id} via {user_id}: {e}");
} else {
sent_tombstone = true;
}
}
}
self.write_str(&format!(
"Room shutdown complete, removed {removed_ok} users, changed join rules: \
{changed_join_rules}.\nConsider banning the room with `ban-room`.",
))
.await
}
+63 -43
View File
@@ -1,6 +1,9 @@
use std::collections::{HashSet, VecDeque};
use axum::extract::State;
use conduwuit::{Err, Result, debug, debug_error, info, utils::to_canonical_object};
use ruma::api::federation::event::get_missing_events;
use conduwuit::{Err, Event, Result, debug, info, trace, utils::to_canonical_object, warn};
use ruma::{OwnedEventId, api::federation::event::get_missing_events};
use serde_json::{json, value::RawValue};
use super::AccessCheck;
use crate::Ruma;
@@ -45,59 +48,76 @@ pub(crate) async fn get_missing_events_route(
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
let mut queued_events = body.latest_events.clone();
// the vec will never have more entries the limit
let mut events = Vec::with_capacity(limit);
let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
let mut i: usize = 0;
while i < queued_events.len() && events.len() < limit {
let Ok(pdu) = services.rooms.timeline.get_pdu(&queued_events[i]).await else {
debug!(
body.origin = body.origin.as_ref().map(tracing::field::display),
"Event {} does not exist locally, skipping", &queued_events[i]
);
i = i.saturating_add(1);
let mut queue: VecDeque<OwnedEventId> = VecDeque::from(body.latest_events.clone());
let mut results: Vec<Box<RawValue>> = Vec::with_capacity(limit);
let mut seen: HashSet<OwnedEventId> = HashSet::from_iter(body.earliest_events.clone());
while let Some(next_event_id) = queue.pop_front() {
if seen.contains(&next_event_id) {
trace!(%next_event_id, "already seen event, skipping");
continue;
}
if results.len() >= limit {
debug!(%next_event_id, "reached limit of events to return, breaking");
break;
}
let mut pdu = match services.rooms.timeline.get_pdu(&next_event_id).await {
| Ok(pdu) => pdu,
| Err(e) => {
warn!("could not find event {next_event_id} while walking missing events: {e}");
continue;
},
};
if body.earliest_events.contains(&queued_events[i]) {
i = i.saturating_add(1);
continue;
if pdu.room_id_or_hash() != body.room_id {
return Err!(Request(Unknown(
"Event {next_event_id} is not in room {}",
body.room_id
)));
}
if !services
.rooms
.state_accessor
.server_can_see_event(body.origin(), &body.room_id, &queued_events[i])
.server_can_see_event(body.origin(), &body.room_id, pdu.event_id())
.await
{
debug!(
body.origin = body.origin.as_ref().map(tracing::field::display),
"Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id
);
i = i.saturating_add(1);
continue;
debug!(%next_event_id, origin = %body.origin(), "redacting event origin cannot see");
pdu.redact(&room_version, json!({}))?;
}
i = i.saturating_add(1);
let Ok(event) = to_canonical_object(&pdu) else {
debug_error!(
body.origin = body.origin.as_ref().map(tracing::field::display),
"Failed to convert PDU in database to canonical JSON: {pdu:?}"
);
continue;
};
let prev_events = pdu.prev_events.iter().map(ToOwned::to_owned);
let event = services
.sending
.convert_to_outgoing_federation_event(event)
.await;
queued_events.extend(prev_events);
events.push(event);
trace!(
%next_event_id,
prev_events = ?pdu.prev_events().collect::<Vec<_>>(),
"adding event to results and queueing prev events"
);
queue.extend(pdu.prev_events.clone());
seen.insert(next_event_id.clone());
if body.latest_events.contains(&next_event_id) {
continue; // Don't include latest_events in results,
// but do include their prev_events in the queue
}
results.push(
services
.sending
.convert_to_outgoing_federation_event(to_canonical_object(pdu)?)
.await,
);
trace!(
%next_event_id,
queue_len = queue.len(),
seen_len = seen.len(),
results_len = results.len(),
"event added to results"
);
}
Ok(get_missing_events::v1::Response { events })
if !queue.is_empty() {
debug!("limit reached before queue was empty");
}
results.reverse(); // return oldest first
Ok(get_missing_events::v1::Response { events: results })
}
+4
View File
@@ -406,6 +406,10 @@ impl Service {
/// Checks whether a given user is an admin of this server
pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
if self.services.globals.server_user == user_id {
return true;
}
if self
.services
.server
+56 -4
View File
@@ -1,7 +1,7 @@
use std::{cmp, collections::HashMap};
use std::{cmp, collections::HashMap, future::ready};
use conduwuit::{
Err, Pdu, Result, debug, debug_info, debug_warn, error, info,
Err, Event, Pdu, Result, debug, debug_info, debug_warn, error, info,
result::NotFound,
utils::{
IterStream, ReadyExt,
@@ -15,8 +15,9 @@ use itertools::Itertools;
use ruma::{
OwnedRoomId, OwnedUserId, RoomId, UserId,
events::{
GlobalAccountDataEventType, StateEventType, push_rules::PushRulesEvent,
room::member::MembershipState,
AnyStrippedStateEvent, GlobalAccountDataEventType, StateEventType,
push_rules::PushRulesEvent,
room::member::{MembershipState, RoomMemberEventContent},
},
push::Ruleset,
serde::Raw,
@@ -162,6 +163,14 @@ async fn migrate(services: &Services) -> Result<()> {
populate_userroomid_leftstate_table(services).await?;
}
if db["global"]
.get(FIXED_LOCAL_INVITE_STATE_MARKER)
.await
.is_not_found()
{
fix_local_invite_state(services).await?;
}
assert_eq!(
services.globals.db.database_version().await,
DATABASE_VERSION,
@@ -721,3 +730,46 @@ async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
db.db.sort()?;
Ok(())
}
const FIXED_LOCAL_INVITE_STATE_MARKER: &str = "fix_local_invite_state";
async fn fix_local_invite_state(services: &Services) -> Result {
// Clean up the effects of !1249 by caching stripped state for invites
type KeyVal<'a> = (Key<'a>, Raw<Vec<AnyStrippedStateEvent>>);
type Key<'a> = (&'a UserId, &'a RoomId);
let db = &services.db;
let cork = db.cork_and_sync();
let userroomid_invitestate = services.db["userroomid_invitestate"].clone();
// for each user invited to a room
let fixed = userroomid_invitestate.stream()
// if they're a local user on this homeserver
.try_filter(|((user_id, _), _): &KeyVal<'_>| ready(services.globals.user_is_local(user_id)))
.and_then(async |((user_id, room_id), stripped_state): KeyVal<'_>| Ok::<_, conduwuit::Error>((user_id.to_owned(), room_id.to_owned(), stripped_state.deserialize()?)))
.try_fold(0_usize, async |mut fixed, (user_id, room_id, stripped_state)| {
// and their invite state is None
if stripped_state.is_empty()
// and they are actually invited to the room
&& let Ok(membership_event) = services.rooms.state_accessor.room_state_get(&room_id, &StateEventType::RoomMember, user_id.as_str()).await
&& membership_event.get_content::<RoomMemberEventContent>().is_ok_and(|content| content.membership == MembershipState::Invite)
// and the invite was sent by a local user
&& services.globals.user_is_local(&membership_event.sender) {
// build and save stripped state for their invite in the database
let stripped_state = services.rooms.state.summary_stripped(&membership_event, &room_id).await;
userroomid_invitestate.put((&user_id, &room_id), Json(stripped_state));
fixed = fixed.saturating_add(1);
}
Ok(fixed)
})
.await?;
drop(cork);
info!(?fixed, "Fixed local invite state cache entries.");
db["global"].insert(FIXED_LOCAL_INVITE_STATE_MARKER, []);
db.db.sort()?;
Ok(())
}
@@ -1,7 +1,7 @@
use std::borrow::Borrow;
use conduwuit::{
Result, err, implement,
Pdu, Result, err, implement,
matrix::{Event, StateKey},
};
use futures::{Stream, StreamExt, TryFutureExt};
@@ -84,7 +84,7 @@ pub async fn room_state_get(
room_id: &RoomId,
event_type: &StateEventType,
state_key: &str,
) -> Result<impl Event> {
) -> Result<Pdu> {
self.services
.state
.get_room_shortstatehash(room_id)
+2
View File
@@ -30,6 +30,7 @@ struct Services {
config: Dep<config::Service>,
globals: Dep<globals::Service>,
metadata: Dep<rooms::metadata::Service>,
state: Dep<rooms::state::Service>,
state_accessor: Dep<rooms::state_accessor::Service>,
users: Dep<users::Service>,
}
@@ -64,6 +65,7 @@ impl crate::Service for Service {
config: args.depend::<config::Service>("config"),
globals: args.depend::<globals::Service>("globals"),
metadata: args.depend::<rooms::metadata::Service>("rooms::metadata"),
state: args.depend::<rooms::state::Service>("rooms::state"),
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
users: args.depend::<users::Service>("users"),
+2 -4
View File
@@ -118,10 +118,8 @@ pub async fn update_membership(
self.mark_as_joined(user_id, room_id);
},
| MembershipState::Invite => {
// TODO: make sure that passing None for `last_state` is correct behavior.
// the call from `append_pdu` used to use `services.state.summary_stripped`
// to fill that parameter.
self.mark_as_invited(user_id, room_id, pdu.sender(), None, None)
let last_state = self.services.state.summary_stripped(pdu, room_id).await;
self.mark_as_invited(user_id, room_id, pdu.sender(), Some(last_state), None)
.await?;
},
| MembershipState::Leave | MembershipState::Ban => {
+1 -1
View File
@@ -9,7 +9,7 @@
<li>Read the <a href="https://continuwuity.org/introduction">documentation</a></li>
<li>Join the <a href="https://matrix.to/#/#continuwuity:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">Continuwuity Matrix room</a> or <a href="https://matrix.to/#/#space:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">space</a></li>
<li>Log in with a <a href="https://matrix.org/ecosystem/clients/">client</a></li>
<li>Ensure <a href="https://federationtester.matrix.org/#{{ server_name }}">federation</a> works</li>
<li>Ensure <a href="https://federationtester.mtrnord.blog/?serverName={{ server_name }}">federation</a> works</li>
</ul>
</div>