mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
fix: This is some bullshit I tell you
This commit is contained in:
@@ -22,7 +22,7 @@ use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||
use lettre::message::Mailbox;
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
|
||||
OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, UInt,
|
||||
OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, ServerName, UInt,
|
||||
api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw,
|
||||
};
|
||||
use service::rooms::{
|
||||
@@ -1138,3 +1138,32 @@ pub(super) async fn send_test_email(&self) -> Result {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn walk_missing_events(
|
||||
&self,
|
||||
latest: OwnedEventId,
|
||||
earliest: OwnedEventId,
|
||||
via: String,
|
||||
) -> Result {
|
||||
let earliest_pdu = self.services.rooms.timeline.get_pdu(&earliest).await?;
|
||||
let events = self
|
||||
.services
|
||||
.rooms
|
||||
.event_handler
|
||||
.backfill_missing_events(
|
||||
earliest_pdu.room_id_or_hash(),
|
||||
HashSet::from_iter(vec![latest]),
|
||||
vec![earliest],
|
||||
ServerName::parse(via)?,
|
||||
)
|
||||
.await?;
|
||||
self.write_str(&format!("Found {} events:\n\n```\n", events.len()))
|
||||
.await?;
|
||||
for (event_id, event) in events {
|
||||
self.write_str(&format!("{event_id}: {:?}\n\n", serde_json::to_string(&event)))
|
||||
.await?;
|
||||
}
|
||||
self.write_str("\n```").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -32,6 +32,12 @@ pub enum DebugCommand {
|
||||
event_id: OwnedEventId,
|
||||
},
|
||||
|
||||
WalkMissingEvents {
|
||||
latest: OwnedEventId,
|
||||
earliest: OwnedEventId,
|
||||
via: String,
|
||||
},
|
||||
|
||||
/// Parse and print a PDU from a JSON
|
||||
///
|
||||
/// The PDU event is only checked for validity and is not added to the
|
||||
|
||||
@@ -47,7 +47,6 @@ use service::transactions::{
|
||||
FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse,
|
||||
};
|
||||
use tokio::sync::watch::{Receiver, Sender};
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::Ruma;
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::{
|
||||
cmp::min,
|
||||
collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
|
||||
time::Instant,
|
||||
};
|
||||
@@ -9,7 +8,6 @@ use conduwuit::{
|
||||
matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort,
|
||||
trace, utils::continue_exponential_backoff_secs, warn,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
|
||||
OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt,
|
||||
@@ -98,37 +96,33 @@ impl super::Service {
|
||||
///
|
||||
/// This function does not persist the events. The caller is responsible for
|
||||
/// passing them through handle_incoming_pdu.
|
||||
pub(super) async fn backfill_missing_events(
|
||||
pub async fn backfill_missing_events(
|
||||
&self,
|
||||
room_id: OwnedRoomId,
|
||||
head: Vec<OwnedEventId>,
|
||||
head: HashSet<OwnedEventId>,
|
||||
tail: Vec<OwnedEventId>,
|
||||
via: OwnedServerName,
|
||||
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
|
||||
if head.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
}
|
||||
let tail = self
|
||||
.services
|
||||
.state
|
||||
.get_forward_extremities(&room_id)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
// TODO: min_depth is probably necessary to avoid fetching the entire room
|
||||
// history if there are very long gaps
|
||||
let mut latest_events: HashSet<OwnedEventId> = HashSet::from_iter(head.clone());
|
||||
let mut latest_events = head.clone();
|
||||
let mut loop_count: u64 = 3;
|
||||
// Start with a base number of 3 so that we fetch 10, 16, 25, 36, etc
|
||||
// instead of 1, 2, 4, 9, so on.
|
||||
// Start with 3 so that we fetch 9, 16, 25, 36, 49, 64, 81, 100 events.
|
||||
// This gives steady growth to the server's typical limit of 100. It's unlikely
|
||||
// we'll end up close to that.
|
||||
let mut backfilled_events = HashMap::with_capacity(10);
|
||||
|
||||
while !latest_events.is_empty() {
|
||||
// TODO: holy clone()
|
||||
let frontier_before = latest_events.clone();
|
||||
let todo: Vec<OwnedEventId> = latest_events.clone().into_iter().collect();
|
||||
let mut request =
|
||||
get_missing_events::v1::Request::new(room_id.clone(), tail.clone(), todo.clone());
|
||||
let limit = min(loop_count.saturating_pow(2), 100);
|
||||
request.limit = limit
|
||||
.try_into()
|
||||
.expect("limit cannot be greater than 100, which fits into UInt");
|
||||
let limit = loop_count.saturating_pow(2).min(100);
|
||||
request.limit = limit.try_into().expect("limit must fit into UInt");
|
||||
|
||||
debug_info!(
|
||||
backfilled=%backfilled_events.len(),
|
||||
@@ -148,6 +142,7 @@ impl super::Service {
|
||||
.send_federation_request(&via, request)
|
||||
.await?;
|
||||
loop_count = loop_count.saturating_add(1);
|
||||
trace!(?response, "get_missing_events response");
|
||||
|
||||
// Some buggy servers (including old continuwuity) may return the same events
|
||||
// multiple times, which can cause this to be an infinite loop.
|
||||
@@ -160,6 +155,7 @@ impl super::Service {
|
||||
debug_info!("No more missing events found");
|
||||
break;
|
||||
}
|
||||
|
||||
for event in response.events {
|
||||
trace!("Parsing incoming event from backfill");
|
||||
let (incoming_room_id, event_id, pdu_json) =
|
||||
@@ -178,52 +174,41 @@ impl super::Service {
|
||||
debug!("Skipping known event {event_id}");
|
||||
continue;
|
||||
}
|
||||
if backfilled_events.contains_key(&event_id) {
|
||||
let retransmitted = backfilled_events.contains_key(&event_id);
|
||||
if retransmitted {
|
||||
debug_warn!(%via, %event_id, "Remote retransmitted event");
|
||||
continue;
|
||||
}
|
||||
// TODO: Should this be scoped to the GME session? We might end up incorrectly
|
||||
// assuming we've caught up if we do this
|
||||
if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await {
|
||||
debug!(%via, %event_id, "Already seen event in database");
|
||||
backfilled_events.insert(event_id.clone(), pdu);
|
||||
} else {
|
||||
unseen = unseen.saturating_add(1);
|
||||
if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await {
|
||||
debug!(%via, %event_id, "Already seen event in database");
|
||||
backfilled_events.insert(event_id.clone(), pdu);
|
||||
} else {
|
||||
unseen = unseen.saturating_add(1);
|
||||
}
|
||||
}
|
||||
let parsed = PduEvent::from_id_val(&event_id, pdu_json)
|
||||
.map_err(|e| err!(BadServerResponse("Unable to parse {event_id}: {e}")))?;
|
||||
for prev_event_id in parsed.prev_events() {
|
||||
// Verify that we have all of this event's prev_events. If we don't, add it to
|
||||
// the work queue
|
||||
if !(backfilled_events.contains_key(prev_event_id)
|
||||
|| self.services.timeline.pdu_exists(prev_event_id).await)
|
||||
{
|
||||
latest_events.insert(prev_event_id.to_owned());
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
backfilled_events.insert(event_id.clone(), parsed);
|
||||
}
|
||||
for event_id in todo {
|
||||
latest_events.remove(&event_id);
|
||||
if let hash_map::Entry::Vacant(e) = backfilled_events.entry(event_id.clone()) {
|
||||
let evt = self.services.timeline.get_pdu(&event_id).await?;
|
||||
e.insert(evt);
|
||||
if !retransmitted {
|
||||
backfilled_events.insert(event_id.clone(), parsed);
|
||||
}
|
||||
}
|
||||
latest_events.retain(|event_id| !backfilled_events.contains_key(event_id));
|
||||
debug!(
|
||||
count=%chunk_len,
|
||||
new=%unseen,
|
||||
remaining=%latest_events.len(),
|
||||
"Got missing events"
|
||||
);
|
||||
if unseen == 0 {
|
||||
debug_warn!("Didn't see any new events, breaking cycle");
|
||||
break;
|
||||
} else if chunk_len < usize::try_from(limit)? {
|
||||
debug!(
|
||||
"Got less than the limit number of events, assuming there's no more to fetch"
|
||||
let frontier_changed = latest_events != frontier_before;
|
||||
if unseen == 0 && !frontier_changed {
|
||||
debug_warn!(
|
||||
"Didn't see any new events and the frontier did not change, breaking cycle"
|
||||
);
|
||||
break;
|
||||
}
|
||||
@@ -243,6 +228,7 @@ impl super::Service {
|
||||
/// b. Look at outlier pdu tree
|
||||
/// c. Ask origin server over federation
|
||||
/// d. TODO: Ask other servers over federation?
|
||||
#[deprecated]
|
||||
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
||||
&self,
|
||||
origin: &'a ServerName,
|
||||
|
||||
@@ -1,11 +1,18 @@
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use conduwuit::{Event, PduEvent, debug, debug_info, error, trace};
|
||||
use ruma::{OwnedEventId, RoomId, ServerName};
|
||||
use conduwuit::{
|
||||
Event, PduEvent, debug, debug_info,
|
||||
result::DebugInspect,
|
||||
utils::{BoolExt, IterStream, stream::BroadbandExt},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use ruma::{RoomId, ServerName};
|
||||
|
||||
use crate::rooms::event_handler::build_local_dag;
|
||||
|
||||
impl super::Service {
|
||||
/// Fetches any missing prev_events for this event and persists them before
|
||||
/// returning.
|
||||
pub(super) async fn fetch_prevs(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
@@ -13,106 +20,72 @@ impl super::Service {
|
||||
incoming_pdu: &PduEvent,
|
||||
origin: &ServerName,
|
||||
) -> conduwuit::Result<()> {
|
||||
let mut queue: VecDeque<OwnedEventId> = VecDeque::new();
|
||||
queue.push_back(incoming_pdu.event_id().to_owned());
|
||||
let missing = incoming_pdu
|
||||
.prev_events()
|
||||
.stream()
|
||||
.broad_filter_map(|event_id| async move {
|
||||
self.services
|
||||
.timeline
|
||||
.get_non_outlier_pdu_json(event_id)
|
||||
.await
|
||||
.inspect(|_| debug!("Found prev_event {event_id} locally."))
|
||||
.inspect_err(|e| debug!(%e, "Could not find prev_event {event_id} locally."))
|
||||
.is_ok()
|
||||
.or(|| event_id.to_owned())
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
if missing.is_empty() {
|
||||
debug!(event_id=%incoming_pdu.event_id(), "No missing prev events.");
|
||||
return Ok(());
|
||||
}
|
||||
debug!(%room_id, event_id=%incoming_pdu.event_id(), ?missing, "Fetching previous events");
|
||||
let tail = self
|
||||
.services
|
||||
.state
|
||||
.get_forward_extremities(room_id)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
while let Some(event_id) = queue.pop_front() {
|
||||
debug!(event_id=%incoming_pdu.event_id, "Fetching any missing prev_events");
|
||||
let mut missing_prev_events: HashSet<OwnedEventId> =
|
||||
incoming_pdu.prev_events().map(ToOwned::to_owned).collect();
|
||||
for pid in missing_prev_events.clone() {
|
||||
if self.services.timeline.pdu_exists(&pid).await {
|
||||
trace!("Found prev event {pid} for outlier event {event_id} locally");
|
||||
missing_prev_events.remove(&pid);
|
||||
} else {
|
||||
debug_info!(
|
||||
"Could not find prev event {pid} for outlier event {event_id} locally, \
|
||||
will fetch over federation"
|
||||
);
|
||||
}
|
||||
}
|
||||
if !missing_prev_events.is_empty() {
|
||||
debug_info!(
|
||||
"Fetching {} missing prev events for outlier event {event_id}",
|
||||
missing_prev_events.len()
|
||||
);
|
||||
let backfilled = self
|
||||
.backfill_missing_events(
|
||||
room_id.to_owned(),
|
||||
vec![event_id.clone()],
|
||||
origin.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
debug_info!("Fetched {} missing events for {event_id}", backfilled.len());
|
||||
let mapped = backfilled
|
||||
.iter()
|
||||
.map(|(eid, evt)| {
|
||||
let mut obj = evt.to_canonical_object();
|
||||
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
|
||||
(eid.clone(), obj)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
let local_dag = if mapped.len() == 1 {
|
||||
mapped.keys().map(ToOwned::to_owned).collect()
|
||||
} else {
|
||||
build_local_dag(&mapped).await?
|
||||
};
|
||||
debug_info!("Preparing to handle {} missing events", backfilled.len());
|
||||
for prev_event_id in local_dag {
|
||||
let obj = mapped
|
||||
.get(&prev_event_id)
|
||||
.expect("We should have this event in memory");
|
||||
debug_info!("Handling prev event {prev_event_id}");
|
||||
match self
|
||||
.handle_outlier_pdu(
|
||||
origin,
|
||||
create_event,
|
||||
&prev_event_id,
|
||||
room_id,
|
||||
obj.clone(),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(_) => {
|
||||
debug!("Successfully handled {prev_event_id} as an outlier");
|
||||
missing_prev_events.remove(&prev_event_id);
|
||||
},
|
||||
| Err(e) =>
|
||||
error!(error=?e, %prev_event_id, %event_id, "Failed to handle prev event"),
|
||||
}
|
||||
debug_info!("Finished handling prev");
|
||||
}
|
||||
}
|
||||
let outlier = self.services.timeline.get_pdu(&event_id).await;
|
||||
if missing_prev_events.is_empty()
|
||||
&& let Ok(pdu) = outlier
|
||||
{
|
||||
// promote any prevs first
|
||||
for prev_event_id in pdu.prev_events() {
|
||||
debug_info!("Promoting prev event {prev_event_id} to timeline");
|
||||
let prev_pdu = self.services.timeline.get_pdu(&event_id).await?;
|
||||
let val = prev_pdu.to_canonical_object();
|
||||
self.upgrade_outlier_to_timeline_pdu(
|
||||
prev_pdu,
|
||||
val,
|
||||
create_event,
|
||||
origin,
|
||||
room_id,
|
||||
)
|
||||
.await?;
|
||||
debug_info!("Finished prev promoting {prev_event_id} to timeline");
|
||||
}
|
||||
debug_info!("Promoting event {event_id} to timeline");
|
||||
let val = pdu.to_canonical_object();
|
||||
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
|
||||
.await?;
|
||||
debug_info!("Finished promoting {event_id} to timeline");
|
||||
} else {
|
||||
debug!(?missing_prev_events, ok=%outlier.is_ok(), "Not promoting {event_id}");
|
||||
}
|
||||
let backfilled = self
|
||||
.backfill_missing_events(
|
||||
room_id.to_owned(),
|
||||
HashSet::from_iter(vec![incoming_pdu.event_id().to_owned()]),
|
||||
tail,
|
||||
origin.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
debug_info!("Fetched {} missing events", backfilled.len());
|
||||
|
||||
// Persist all fetched events
|
||||
let mapped = backfilled
|
||||
.iter()
|
||||
.map(|(eid, evt)| {
|
||||
let mut obj = evt.to_canonical_object();
|
||||
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
|
||||
(eid.clone(), obj)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let to_persist = if mapped.len() <= 1 {
|
||||
mapped.keys().map(ToOwned::to_owned).collect()
|
||||
} else {
|
||||
build_local_dag(&mapped).await?
|
||||
};
|
||||
|
||||
for event_id in to_persist {
|
||||
debug_info!("Persisting fetched prev event {event_id}");
|
||||
let pdu_event = backfilled.get(&event_id).cloned().unwrap_or_else(|| {
|
||||
panic!("Event {event_id} was in backfill response but not in map")
|
||||
});
|
||||
let obj = pdu_event.to_canonical_object();
|
||||
self.upgrade_outlier_to_timeline_pdu(pdu_event, obj, create_event, origin, room_id)
|
||||
.await
|
||||
.debug_inspect(|_| debug_info!("Persisted fetched prev event {event_id}"))?;
|
||||
}
|
||||
|
||||
// NOTE because i keep forgetting: the caller persists incoming_pdu.
|
||||
// we only care about its prev events
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
|
||||
trace, warn,
|
||||
};
|
||||
use futures::future::ready;
|
||||
use futures::{StreamExt, future::ready};
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
||||
events::StateEventType,
|
||||
@@ -118,10 +118,17 @@ where
|
||||
"Fetching {} missing auth events for outlier event {event_id}",
|
||||
missing_auth_events.len()
|
||||
);
|
||||
let tail = self
|
||||
.services
|
||||
.state
|
||||
.get_forward_extremities(room_id)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
let backfilled = self
|
||||
.backfill_missing_events(
|
||||
room_id.to_owned(),
|
||||
vec![event_id.to_owned()],
|
||||
HashSet::from_iter(vec![event_id.to_owned()]),
|
||||
tail,
|
||||
origin.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::{
|
||||
};
|
||||
|
||||
use conduwuit::{
|
||||
Result, debug, err, error, implement,
|
||||
Result, debug, debug_error, err, error, implement,
|
||||
matrix::{Event, StateMap},
|
||||
trace,
|
||||
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
|
||||
@@ -37,6 +37,7 @@ where
|
||||
.pdu_shortstatehash(prev_event)
|
||||
.await
|
||||
else {
|
||||
trace!("No shortstatehash for {prev_event}, cannot calculate one-degree state.");
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
@@ -99,6 +100,7 @@ where
|
||||
.map_ok(move |sstatehash| (sstatehash, prev_event))
|
||||
})
|
||||
.try_collect::<HashMap<_, _>>()
|
||||
.inspect_err(|e| debug_error!("failed to calculate N-degree short state hashes: {e}"))
|
||||
.await
|
||||
else {
|
||||
return Ok(None);
|
||||
|
||||
@@ -173,6 +173,10 @@ impl Service {
|
||||
self.db.get_non_outlier_pdu_json(event_id).await
|
||||
}
|
||||
|
||||
pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> bool {
|
||||
self.db.non_outlier_pdu_exists(event_id).await.is_ok()
|
||||
}
|
||||
|
||||
/// Returns the pdu's id.
|
||||
#[inline]
|
||||
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
|
||||
|
||||
Reference in New Issue
Block a user