Files
continuwuity/src/api/client/search.rs
T
2026-04-28 09:16:57 -04:00

225 lines
5.0 KiB
Rust

use std::collections::BTreeMap;
use axum::extract::State;
use conduwuit::{
Err, Result, at, debug_warn, is_true,
matrix::Event,
result::FlatOk,
utils::{IterStream, stream::ReadyExt},
};
use conduwuit_service::{Services, rooms::search::RoomQuery};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use ruma::{
OwnedRoomId, RoomId, UInt, UserId,
api::client::search::search_events::{
self,
v3::{
Criteria, EventContextResult, ResultCategories, ResultGroupMapsByGroupingKey,
ResultRoomEvents, SearchResult,
},
},
assign,
events::AnyStateEvent,
serde::Raw,
};
use search_events::v3::{Request, Response};
use crate::Ruma;
type RoomStates = BTreeMap<OwnedRoomId, RoomState>;
type RoomState = Vec<Raw<AnyStateEvent>>;
const LIMIT_DEFAULT: usize = 10;
const LIMIT_MAX: usize = 100;
const BATCH_MAX: usize = 20;
/// # `POST /_matrix/client/r0/search`
///
/// Searches rooms for messages.
///
/// - Only works if the user is currently joined to the room (TODO: Respect
/// history visibility)
pub(crate) async fn search_events_route(
State(services): State<crate::State>,
body: Ruma<Request>,
) -> Result<Response> {
let sender_user = body.sender_user();
let next_batch = body.next_batch.as_deref();
let mut result_categories = ResultCategories::new();
if let Some(criteria) = &body.search_categories.room_events {
result_categories.room_events =
category_room_events(&services, sender_user, next_batch, criteria).await?;
}
Ok(Response::new(result_categories))
}
#[allow(clippy::map_unwrap_or)]
async fn category_room_events(
services: &Services,
sender_user: &UserId,
next_batch: Option<&str>,
criteria: &Criteria,
) -> Result<ResultRoomEvents> {
let filter = &criteria.filter;
let limit: usize = filter
.limit
.map(TryInto::try_into)
.flat_ok()
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
let next_batch: usize = next_batch
.map(str::parse)
.transpose()?
.unwrap_or(0)
.min(limit.saturating_mul(BATCH_MAX));
let rooms = filter
.rooms
.clone()
.map(IntoIterator::into_iter)
.map(IterStream::stream)
.map(StreamExt::boxed)
.unwrap_or_else(|| services.rooms.state_cache.rooms_joined(sender_user).boxed());
let results: Vec<_> = rooms
.filter_map(|room_id| async move {
check_room_visible(services, sender_user, &room_id, criteria)
.await
.is_ok()
.then_some(room_id)
})
.filter_map(|room_id| async move {
let query = RoomQuery {
room_id: &room_id,
user_id: Some(sender_user),
criteria,
skip: next_batch,
limit,
};
let (count, results) = services
.rooms
.search
.search_pdus(&query, sender_user)
.await
.ok()?;
results
.collect::<Vec<_>>()
.map(|results| (room_id.clone(), count, results))
.map(Some)
.await
})
.collect()
.await;
let total: UInt = results
.iter()
.fold(0, |a: usize, (_, count, _)| a.saturating_add(*count))
.try_into()
.expect("total results should fit into a UInt");
let state: RoomStates = results
.iter()
.stream()
.ready_filter(|_| criteria.include_state.is_some_and(is_true!()))
.filter_map(|(room_id, ..)| async move {
procure_room_state(services, room_id)
.map_ok(|state| (room_id.clone(), state))
.await
.ok()
})
.collect()
.await;
let results: Vec<SearchResult> = results
.into_iter()
.map(at!(2))
.flatten()
.stream()
.then(|mut pdu| async {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations to search result: {e}");
}
pdu
})
.map(Event::into_format)
.map(|result| {
assign!(SearchResult::new(), {
rank: None,
result: Some(result),
context: EventContextResult::default() // TODO
})
})
.collect()
.await;
let highlights = criteria
.search_term
.split_terminator(|c: char| !c.is_alphanumeric())
.map(str::to_lowercase)
.collect();
let next_batch = (results.len() >= limit)
.then_some(next_batch.saturating_add(results.len()))
.as_ref()
.map(ToString::to_string);
Ok(assign!(ResultRoomEvents::new(), {
count: Some(total),
next_batch,
results,
state,
highlights,
groups: ResultGroupMapsByGroupingKey::default(), // TODO
}))
}
async fn procure_room_state(services: &Services, room_id: &RoomId) -> Result<RoomState> {
let state = services
.rooms
.state_accessor
.room_state_full_pdus(room_id)
.map_ok(Event::into_format)
.try_collect()
.await?;
Ok(state)
}
async fn check_room_visible(
services: &Services,
user_id: &UserId,
room_id: &RoomId,
search: &Criteria,
) -> Result {
let check_visible = search.filter.rooms.is_some();
let check_state = check_visible && search.include_state.is_some_and(is_true!());
let is_joined =
!check_visible || services.rooms.state_cache.is_joined(user_id, room_id).await;
let state_visible = !check_state
|| services
.rooms
.state_accessor
.user_can_see_state_events(user_id, room_id)
.await;
if !is_joined || !state_visible {
return Err!(Request(Forbidden("You don't have permission to view {room_id:?}")));
}
Ok(())
}