Files
continuwuity/src/api/server/backfill.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

103 lines
2.3 KiB
Rust
Raw Normal View History

2024-08-08 17:18:30 +00:00
use std::cmp;
2024-07-16 08:05:25 +00:00
use axum::extract::State;
2024-12-14 21:58:01 -05:00
use conduwuit::{
2025-09-17 20:46:03 +00:00
Event, PduCount, Result,
result::LogErr,
utils::{IterStream, ReadyExt, stream::TryTools},
2024-06-05 04:32:58 +00:00
};
use futures::{FutureExt, StreamExt, TryStreamExt};
use ruma::{MilliSecondsSinceUnixEpoch, api::federation::backfill::get_backfill};
2024-06-05 04:32:58 +00:00
use super::AccessCheck;
2024-07-03 21:05:24 +00:00
use crate::Ruma;
2024-06-05 04:32:58 +00:00
/// Upper bound applied to the `limit` a remote server requests for a single
/// backfill call; larger requests are clamped down to this value.
///
/// Arbitrary number, but synapse's is 100 and we can handle lots of these
/// anyways.
const LIMIT_MAX: usize = 150;
/// Fallback page size used when the requested `limit` cannot be converted to
/// `usize`.
///
/// No spec defined number, but we can handle a lot of these.
const LIMIT_DEFAULT: usize = 50;
2024-06-05 04:32:58 +00:00
/// # `GET /_matrix/federation/v1/backfill/<room_id>`
///
/// Retrieves events from before the sender joined the room, if the room's
/// history visibility allows.
2024-07-16 08:05:25 +00:00
pub(crate) async fn get_backfill_route(
State(services): State<crate::State>,
ref body: Ruma<get_backfill::v1::Request>,
2024-07-16 08:05:25 +00:00
) -> Result<get_backfill::v1::Response> {
AccessCheck {
services: &services,
origin: body.origin(),
room_id: &body.room_id,
event_id: None,
2024-06-05 04:32:58 +00:00
}
.check()
.await?;
2024-06-05 04:32:58 +00:00
let limit = body
.limit
.try_into()
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
2024-06-05 04:32:58 +00:00
2024-11-08 05:49:28 +00:00
let from = body
.v
.iter()
.stream()
.filter_map(|event_id| {
2024-08-08 17:18:30 +00:00
services
.rooms
.timeline
2024-11-08 05:49:28 +00:00
.get_pdu_count(event_id)
.map(Result::ok)
2024-06-05 04:32:58 +00:00
})
2024-11-08 05:49:28 +00:00
.ready_fold(PduCount::min(), cmp::max)
2024-08-08 17:18:30 +00:00
.await;
2024-06-05 04:32:58 +00:00
Ok(get_backfill::v1::Response {
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
2024-11-08 05:49:28 +00:00
origin: services.globals.server_name().to_owned(),
pdus: services
.rooms
.timeline
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
.try_take(limit)
.try_filter_map(|(_, pdu)| async move {
Ok(services
2024-11-08 05:49:28 +00:00
.rooms
.state_accessor
2025-09-17 20:46:03 +00:00
.server_can_see_event(body.origin(), &pdu.room_id_or_hash(), &pdu.event_id)
2024-11-08 05:49:28 +00:00
.await
.then_some(pdu))
2024-11-08 05:49:28 +00:00
})
.and_then(async |mut pdu| {
// Strip the transaction ID, as that is private
pdu.remove_transaction_id().log_err().ok();
// Add age, as this is specified
pdu.add_age().log_err().ok();
// It's not clear if we should strip or add any more data, leave as is.
// In particular: Redaction?
Ok(pdu)
})
.try_filter_map(|pdu| async move {
Ok(services
2024-11-08 05:49:28 +00:00
.rooms
.timeline
.get_pdu_json(&pdu.event_id)
.await
.ok())
})
.and_then(|pdu| {
services
.sending
.convert_to_outgoing_federation_event(pdu)
.map(Ok)
2024-11-08 05:49:28 +00:00
})
.try_collect()
.await?,
2024-06-05 04:32:58 +00:00
})
}