Compare commits

...

10 Commits

Author SHA1 Message Date
timedout 93b9007e1d fix: Unreliability in kicks & bans 2026-02-14 15:23:22 +00:00
timedout 2919c8e636 chore: Add double-lock flag 2026-02-14 14:24:13 +00:00
timedout 5ad653ab86 chore: Revert leaked changes to leave.rs 2026-02-14 14:17:23 +00:00
nexy7574 c9d9ed0a90 feat: Port room takeover and shutdown commands from continuwuity.rocks 2026-02-14 14:12:52 +00:00
arxari d9520f9382 Change the federation testing site in the docs to a more verbose one
The new site is easy to use at a glance but provides more advanced info if needed

Nexxy approved https://matrix.to/#/#offtopic:continuwuity.org/$rHSywj-s3v9onrROBcwDCHnnOpPVFbu0-Xgrh9A4btw
2026-02-12 20:13:47 +00:00
arxari 40bb5366bb Change the federation testing site to a more verbose one
The new site is easy to use at a glance but provides more advanced info if needed

Nexxy approved https://matrix.to/#/#offtopic:continuwuity.org/$rHSywj-s3v9onrROBcwDCHnnOpPVFbu0-Xgrh9A4btw
2026-02-12 20:11:20 +00:00
timedout f82bd77073 style: Fix clippy issues 2026-02-12 19:10:13 +00:00
timedout 7d84ba5ff2 fix: Don't include latest_events in output 2026-02-12 17:37:29 +00:00
timedout 69a8937584 fix: Complement runner 2026-02-12 17:23:39 +00:00
timedout b2ec13d342 fix: Redo the get_missing_events federation route 2026-02-12 16:48:12 +00:00
6 changed files with 691 additions and 51 deletions
+1 -1
View File
@@ -1,9 +1,9 @@
# Local build and dev artifacts # Local build and dev artifacts
target/ target/
!target/debug/conduwuit
# Docker files # Docker files
Dockerfile* Dockerfile*
docker/
# IDE files # IDE files
.vscode .vscode
+3 -3
View File
@@ -2,9 +2,9 @@ FROM ubuntu:latest
EXPOSE 8008 EXPOSE 8008
EXPOSE 8448 EXPOSE 8448
RUN apt-get update && apt-get install -y ca-certificates liburing2 && rm -rf /var/lib/apt/lists/* RUN apt-get update && apt-get install -y ca-certificates liburing2 && rm -rf /var/lib/apt/lists/*
RUN mkdir -p /etc/continuwuity /var/lib/continuwuity RUN mkdir -p /etc/continuwuity /var/lib/continuwuity /usr/local/bin/
COPY docker/complement-entrypoint.sh /usr/local/bin/complement-entrypoint.sh COPY complement/complement-entrypoint.sh /usr/local/bin/complement-entrypoint.sh
COPY docker/complement.config.toml /etc/continuwuity/config.toml COPY complement/complement.config.toml /etc/continuwuity/config.toml
COPY target/debug/conduwuit /usr/local/bin/conduwuit COPY target/debug/conduwuit /usr/local/bin/conduwuit
RUN chmod +x /usr/local/bin/conduwuit /usr/local/bin/complement-entrypoint.sh RUN chmod +x /usr/local/bin/conduwuit /usr/local/bin/complement-entrypoint.sh
#HEALTHCHECK --interval=30s --timeout=5s CMD curl --fail http://localhost:8008/_continuwuity/server_version || exit 1 #HEALTHCHECK --interval=30s --timeout=5s CMD curl --fail http://localhost:8008/_continuwuity/server_version || exit 1
+1 -1
View File
@@ -269,7 +269,7 @@ curl https://your.server.name:8448/_matrix/federation/v1/version
``` ```
- To check if your server can communicate with other homeservers, use the - To check if your server can communicate with other homeservers, use the
[Matrix Federation Tester](https://federationtester.matrix.org/). If you can [Matrix Federation Tester](https://federationtester.mtrnord.blog/). If you can
register but cannot join federated rooms, check your configuration and verify register but cannot join federated rooms, check your configuration and verify
that port 8448 is open and forwarded correctly. that port 8448 is open and forwarded correctly.
+622 -2
View File
@@ -1,12 +1,29 @@
use std::collections::BTreeMap;
use api::client::leave_room; use api::client::leave_room;
use clap::Subcommand; use clap::Subcommand;
use conduwuit::{ use conduwuit::{
Err, Result, debug, info, Err, Result, RoomVersion, debug, info,
utils::{IterStream, ReadyExt}, utils::{IterStream, ReadyExt},
warn, warn,
}; };
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId}; use ruma::{
Int, OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId,
events::{
StateEventType,
room::{
create::RoomCreateEventContent,
history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
join_rules::{JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent},
power_levels::RoomPowerLevelsEventContent,
tombstone::RoomTombstoneEventContent,
},
},
exports::serde::Deserialize,
};
use serde_json::json;
use crate::{admin_command, admin_command_dispatch, get_room_info}; use crate::{admin_command, admin_command_dispatch, get_room_info};
@@ -43,6 +60,59 @@ pub enum RoomModerationCommand {
/// information /// information
no_details: bool, no_details: bool,
}, },
/// - Take over a room by puppeting a local user into giving you a higher
/// power level
Takeover {
/// Whether to force joining the room if no local users are in the room
#[arg(long)]
force: bool,
/// The room in the format of `!roomid:example.com` or a room alias in
/// the format of `#roomalias:example.com`
room: OwnedRoomOrAliasId,
},
/// - Shut down a room, as much is possible. **This is immediate and
/// irreversible**.
///
/// This command requires that you have a local user in the room with at
/// least a moderator power level. It will first attempt to raise power
/// levels so that nobody can use the room further, then remove the
/// canonical alias event, sets the history visibility to `joined`,
/// sets the join rules to `org.continuwuity.shutdown` (preventing anyone
/// from joining even with an invite), and then bans or kicks all users,
/// setting the MSC4293 "redact events" flag on those users if possible.
/// Finally, it will send a room tombstone event, which will effectively
/// make the room unusable on most clients even if the room state resets.
///
/// This effectively will make the room unusable, unjoinable, and removes
/// everyone from it. This is as close to a "shutdown" as you can get with
/// federation.
ShutdownRoom {
/// If no local users with a power level are joined to the room, setting
/// this flag will attempt one, and will join the user with the
/// highest power level to the room to perform the shutdown.
///
/// If this flag is not set, and no local users can perform the
/// shutdown, no further attempt will be made.
#[arg(long)]
force: bool,
/// Whether to use MSC4293 fields to indicate that all messages in the
/// room should be redacted. This will make it more difficult for
/// clients that implement MSC4293 (like Element) to render the room
/// in the event users manage to rejoin.
#[arg(long)]
redact: bool,
///
#[arg(long)]
yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process:
bool,
/// The room in the format of `!roomid:example.com` or a room alias in
/// the format of `#roomalias:example.com`
room: OwnedRoomOrAliasId,
},
} }
#[admin_command] #[admin_command]
@@ -468,3 +538,553 @@ async fn list_banned_rooms(&self, no_details: bool) -> Result {
self.write_str(&format!("Rooms Banned ({num}):\n```\n{body}\n```",)) self.write_str(&format!("Rooms Banned ({num}):\n```\n{body}\n```",))
.await .await
} }
#[admin_command]
async fn takeover(&self, force: bool, room: OwnedRoomOrAliasId) -> Result {
let room_id = if room.is_room_id() {
let room_id = match RoomId::parse(&room) {
| Ok(room_id) => room_id,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
room_id.to_owned()
} else if room.is_room_alias_id() {
let room_alias = match RoomAliasId::parse(&room) {
| Ok(room_alias) => room_alias,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
| Ok((room_id, servers)) => {
debug!(
?room_id,
?servers,
"Got federation response fetching room ID for room {room}"
);
room_id
},
| Err(e) => {
return Err!("Failed to resolve room alias {room} to a room ID: {e}");
},
}
} else {
return Err!(
"Room specified is not a room ID or room alias. Please note that this requires a \
full room ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`)",
);
};
let room_version =
RoomVersion::new(&self.services.rooms.state.get_room_version(&room_id).await?)?;
let Ok(create_content) = self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomCreateEventContent>(
&room_id,
&StateEventType::RoomCreate,
"",
)
.await
else {
return Err!("Failed to get room create event");
};
let mut power_levels = match self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomPowerLevelsEventContent>(
&room_id,
&StateEventType::RoomPowerLevels,
"",
)
.await
{
| Ok(content) => content,
| Err(e) => {
return Err!("Failed to get power levels for room {room_id}: {e}");
},
};
let local_creators = if room_version.explicitly_privilege_room_creators
&& create_content.additional_creators.is_some()
{
create_content
.additional_creators
.clone()
.unwrap()
.into_iter()
.filter(|user_id| self.services.globals.user_is_local(user_id))
.collect::<Vec<_>>()
} else {
vec![]
};
let local_users = power_levels
.users
.iter()
.filter(|(user_id, _)| self.services.globals.user_is_local(user_id))
.map(|(user_id, level)| (user_id.clone(), *level))
.collect::<BTreeMap<_, _>>();
let min_pl = power_levels
.events
.get(&StateEventType::RoomPowerLevels.into())
.copied()
.unwrap_or(power_levels.state_default);
let mut ordered_users = local_users
.iter()
.chain(local_creators.iter().map(|user_id| (user_id, &Int::MAX)))
.map(|(user_id, level)| {
if local_creators.contains(user_id) {
(user_id, Int::MAX)
} else {
(user_id, *level)
}
})
.filter(|(user_id, level)| *level >= min_pl || local_creators.contains(*user_id))
.collect::<Vec<_>>();
ordered_users.sort_by_key(|(_, level)| level.saturating_mul(Int::from(-1)));
for (user_id, powerlevel) in ordered_users {
if !self
.services
.rooms
.state_cache
.is_joined(user_id.as_ref(), &room_id)
.await
{
if !force {
continue;
}
info!("Joining {user_id} to room {room_id} to perform takeover");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(user_id.as_str()),
&RoomMemberEventContent::new(MembershipState::Join),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to join {user_id} to room {room_id} to perform takeover: {e}");
drop(lock);
continue;
}
drop(lock);
}
info!("Promoting you to power level {powerlevel} in room {room_id} via {user_id}");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
power_levels
.users
.insert(self.sender.expect("you should exist").to_owned(), powerlevel);
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(String::new(), &power_levels),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to promote you to power level {powerlevel} in room {room_id} via \
{user_id}: {e}"
);
drop(lock);
continue;
}
return self
.write_str(&format!(
"Successfully promoted you to power level {powerlevel} in room {room_id} via \
{user_id}"
))
.await;
}
self.write_str("Failed to promote you, no local users with sufficient power level found.")
.await
}
#[admin_command]
async fn shutdown_room(
&self,
force: bool,
redact: bool,
yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process: bool,
room: OwnedRoomOrAliasId,
) -> Result {
let room_id = if room.is_room_id() {
let room_id = match RoomId::parse(&room) {
| Ok(room_id) => room_id,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
room_id.to_owned()
} else if room.is_room_alias_id() {
let room_alias = match RoomAliasId::parse(&room) {
| Ok(room_alias) => room_alias,
| Err(e) => {
return Err!(
"Failed to parse room ID {room}. Please note that this requires a full room \
ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`): {e}"
);
},
};
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
| Ok((room_id, servers)) => {
debug!(
?room_id,
?servers,
"Got federation response fetching room ID for room {room}"
);
room_id
},
| Err(e) => {
return Err!("Failed to resolve room alias {room} to a room ID: {e}");
},
}
} else {
return Err!(
"Room specified is not a room ID or room alias. Please note that this requires a \
full room ID (`!awIh6gGInaS5wLQJwa:example.com`) or a room alias \
(`#roomalias:example.com`)",
);
};
if !yes_i_am_sure_i_want_to_irreversibly_shutdown_this_room_destroying_it_in_the_process {
return Err!(
"This command is irreversible and will immediately shutdown the room, making it \
completely unusable if successful. If you are sure you want to do this, add the \
flag --yes-i-am-sure-i-want-to-irreversibly-shutdown-this-room-destroying-it-in-the-process \
to your command."
);
}
let mut power_levels: RoomPowerLevelsEventContent = match self
.services
.rooms
.state_accessor
.room_state_get_content(&room_id, &StateEventType::RoomPowerLevels, "")
.await
.map_err(|e| Err!("Failed to get power levels for room {room_id}: {e}"))
{
| Ok(content) => content,
| Err(e) => {
return e;
},
};
let mut joined_users = self
.services
.rooms
.state_cache
.room_members(&room_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await;
let room_version =
RoomVersion::new(&self.services.rooms.state.get_room_version(&room_id).await?)?;
let Ok(create_content) = self
.services
.rooms
.state_accessor
.room_state_get_content::<RoomCreateEventContent>(
&room_id,
&StateEventType::RoomCreate,
"",
)
.await
else {
return Err!("Failed to get room create event");
};
let local_creators = if room_version.explicitly_privilege_room_creators
&& create_content.additional_creators.is_some()
{
create_content
.additional_creators
.unwrap()
.into_iter()
.filter(|user_id| self.services.globals.user_is_local(user_id))
.collect::<Vec<_>>()
} else {
vec![]
};
let local_users = power_levels
.users
.iter()
.filter(|(user_id, _)| self.services.globals.user_is_local(user_id))
.map(|(user_id, level)| (user_id.clone(), *level))
.collect::<BTreeMap<_, _>>();
let mut ordered_users = local_users
.iter()
.chain(local_creators.iter().map(|user_id| (user_id, &Int::MAX)))
.map(|(user_id, level)| {
if local_creators.contains(user_id) {
(user_id, Int::MAX)
} else {
(user_id, *level)
}
})
.collect::<Vec<_>>();
ordered_users.sort_by_key(|(_, level)| level.saturating_mul(Int::from(-1)));
let mut changed_join_rules = false;
let mut changed_history_visibility = false;
let mut changed_power_levels = false;
let mut sent_tombstone = false;
let mut removed_ok: u32 = 0;
for (user_id, powerlevel) in ordered_users {
if !self
.services
.rooms
.state_cache
.is_joined(user_id.as_ref(), &room_id)
.await
{
if !force {
continue;
}
info!("Joining {user_id} to room {room_id} to perform shutdown");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(user_id.as_str()),
&RoomMemberEventContent::new(MembershipState::Join),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to join {user_id} to room {room_id} to perform shutdown: {e}");
drop(lock);
continue;
}
drop(lock);
}
if !changed_power_levels {
info!("Raising minimum power levels to {powerlevel} via {user_id}");
power_levels.events_default = power_levels.events_default.max(powerlevel);
power_levels.state_default = power_levels.state_default.max(powerlevel);
if power_levels.users_default < powerlevel {
power_levels.users_default = Int::MIN;
}
power_levels.kick = power_levels.kick.max(powerlevel);
power_levels.ban = power_levels.ban.max(powerlevel);
for (event_type, event_pl) in power_levels.events.clone() {
power_levels
.events
.insert(event_type, event_pl.max(powerlevel));
}
for (user, user_pl) in power_levels.users.clone() {
if user_pl < powerlevel {
power_levels.users.remove(&user);
}
}
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(String::new(), &power_levels.clone()),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to raise power levels to {powerlevel} in room {room_id} via \
{user_id}: {e}"
);
} else {
changed_power_levels = true;
}
drop(lock);
}
if !changed_join_rules {
info!("Setting room to private via {user_id}");
// NOTE: Setting the room to `private` soft-bricks it, as new joins with this
// join rule can actually be authorised.
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomJoinRulesEventContent::new(
JoinRule::deserialize(json!("\"org.continuwuity.shutdown\""))
.expect("valid fixed json"),
),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to set room to private in room {room_id} via {user_id}: {e}");
} else {
changed_join_rules = true;
}
drop(lock);
}
if !changed_history_visibility {
info!("Setting history visibility to joined via {user_id}");
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomHistoryVisibilityEventContent::new(HistoryVisibility::Joined),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!(
"Failed to set history visibility to joined in room {room_id} via \
{user_id}: {e}"
);
} else {
changed_history_visibility = true;
}
drop(lock);
}
info!("Removing {} users in {room_id} via {user_id}", joined_users.len());
let lock = self.services.rooms.state.mutex.lock(&room_id).await;
for remove_user in &joined_users.clone() {
if remove_user == user_id || self.services.admin.user_is_admin(user_id).await {
continue;
}
let user_pl = power_levels
.users
.get(remove_user)
.copied()
.unwrap_or(power_levels.users_default);
let new_membership = if power_levels.ban <= powerlevel && user_pl < powerlevel {
MembershipState::Ban
} else {
MembershipState::Leave
};
debug!("Removing {remove_user} via {user_id}");
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::from(remove_user.as_str()),
&RoomMemberEventContent {
membership: new_membership.clone(),
redact_events: if redact { Some(true) } else { None },
..RoomMemberEventContent::new(new_membership.clone())
},
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to remove {remove_user} via {user_id}: {e}");
continue;
}
removed_ok = removed_ok.saturating_add(1);
if self.services.globals.user_is_local(remove_user) {
self.services
.rooms
.state_cache
.forget(&room_id, remove_user);
}
joined_users.retain(|u| u != remove_user);
}
if !sent_tombstone {
info!("Sending tombstone event for {room_id} via {user_id}");
if let Err(e) = self
.services
.rooms
.timeline
.build_and_append_pdu(
conduwuit::pdu::Builder::state(
String::new(),
&RoomTombstoneEventContent::new(
format!("Room {room_id} has been shut down"),
room_id.clone(),
),
),
user_id,
Some(&room_id),
&lock,
)
.await
{
warn!("Failed to send tombstone event for {room_id} via {user_id}: {e}");
} else {
sent_tombstone = true;
}
}
}
self.write_str(&format!(
"Room shutdown complete, removed {removed_ok} users, changed join rules: \
{changed_join_rules}.\nConsider banning the room with `ban-room`.",
))
.await
}
+63 -43
View File
@@ -1,6 +1,9 @@
use std::collections::{HashSet, VecDeque};
use axum::extract::State; use axum::extract::State;
use conduwuit::{Err, Result, debug, debug_error, info, utils::to_canonical_object}; use conduwuit::{Err, Event, Result, debug, info, trace, utils::to_canonical_object, warn};
use ruma::api::federation::event::get_missing_events; use ruma::{OwnedEventId, api::federation::event::get_missing_events};
use serde_json::{json, value::RawValue};
use super::AccessCheck; use super::AccessCheck;
use crate::Ruma; use crate::Ruma;
@@ -45,59 +48,76 @@ pub(crate) async fn get_missing_events_route(
.unwrap_or(LIMIT_DEFAULT) .unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX); .min(LIMIT_MAX);
let mut queued_events = body.latest_events.clone(); let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
// the vec will never have more entries the limit
let mut events = Vec::with_capacity(limit);
let mut i: usize = 0; let mut queue: VecDeque<OwnedEventId> = VecDeque::from(body.latest_events.clone());
while i < queued_events.len() && events.len() < limit { let mut results: Vec<Box<RawValue>> = Vec::with_capacity(limit);
let Ok(pdu) = services.rooms.timeline.get_pdu(&queued_events[i]).await else { let mut seen: HashSet<OwnedEventId> = HashSet::from_iter(body.earliest_events.clone());
debug!(
body.origin = body.origin.as_ref().map(tracing::field::display), while let Some(next_event_id) = queue.pop_front() {
"Event {} does not exist locally, skipping", &queued_events[i] if seen.contains(&next_event_id) {
); trace!(%next_event_id, "already seen event, skipping");
i = i.saturating_add(1);
continue; continue;
}
if results.len() >= limit {
debug!(%next_event_id, "reached limit of events to return, breaking");
break;
}
let mut pdu = match services.rooms.timeline.get_pdu(&next_event_id).await {
| Ok(pdu) => pdu,
| Err(e) => {
warn!("could not find event {next_event_id} while walking missing events: {e}");
continue;
},
}; };
if pdu.room_id_or_hash() != body.room_id {
if body.earliest_events.contains(&queued_events[i]) { return Err!(Request(Unknown(
i = i.saturating_add(1); "Event {next_event_id} is not in room {}",
continue; body.room_id
)));
} }
if !services if !services
.rooms .rooms
.state_accessor .state_accessor
.server_can_see_event(body.origin(), &body.room_id, &queued_events[i]) .server_can_see_event(body.origin(), &body.room_id, pdu.event_id())
.await .await
{ {
debug!( debug!(%next_event_id, origin = %body.origin(), "redacting event origin cannot see");
body.origin = body.origin.as_ref().map(tracing::field::display), pdu.redact(&room_version, json!({}))?;
"Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id
);
i = i.saturating_add(1);
continue;
} }
i = i.saturating_add(1); trace!(
let Ok(event) = to_canonical_object(&pdu) else { %next_event_id,
debug_error!( prev_events = ?pdu.prev_events().collect::<Vec<_>>(),
body.origin = body.origin.as_ref().map(tracing::field::display), "adding event to results and queueing prev events"
"Failed to convert PDU in database to canonical JSON: {pdu:?}" );
); queue.extend(pdu.prev_events.clone());
continue; seen.insert(next_event_id.clone());
}; if body.latest_events.contains(&next_event_id) {
continue; // Don't include latest_events in results,
let prev_events = pdu.prev_events.iter().map(ToOwned::to_owned); // but do include their prev_events in the queue
}
let event = services results.push(
.sending services
.convert_to_outgoing_federation_event(event) .sending
.await; .convert_to_outgoing_federation_event(to_canonical_object(pdu)?)
.await,
queued_events.extend(prev_events); );
events.push(event); trace!(
%next_event_id,
queue_len = queue.len(),
seen_len = seen.len(),
results_len = results.len(),
"event added to results"
);
} }
Ok(get_missing_events::v1::Response { events }) if !queue.is_empty() {
debug!("limit reached before queue was empty");
}
results.reverse(); // return oldest first
Ok(get_missing_events::v1::Response { events: results })
} }
+1 -1
View File
@@ -9,7 +9,7 @@
<li>Read the <a href="https://continuwuity.org/introduction">documentation</a></li> <li>Read the <a href="https://continuwuity.org/introduction">documentation</a></li>
<li>Join the <a href="https://matrix.to/#/#continuwuity:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">Continuwuity Matrix room</a> or <a href="https://matrix.to/#/#space:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">space</a></li> <li>Join the <a href="https://matrix.to/#/#continuwuity:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">Continuwuity Matrix room</a> or <a href="https://matrix.to/#/#space:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org">space</a></li>
<li>Log in with a <a href="https://matrix.org/ecosystem/clients/">client</a></li> <li>Log in with a <a href="https://matrix.org/ecosystem/clients/">client</a></li>
<li>Ensure <a href="https://federationtester.matrix.org/#{{ server_name }}">federation</a> works</li> <li>Ensure <a href="https://federationtester.mtrnord.blog/?serverName={{ server_name }}">federation</a> works</li>
</ul> </ul>
</div> </div>