refactor: Ruma upstreaming, half-baked edition

Co-authored-by: Jade Ellis <jade@ellis.link>
This commit is contained in:
Ginger
2026-03-29 12:25:42 -04:00
parent 1cc9dbf2a4
commit 204bc1367e
141 changed files with 2715 additions and 2279 deletions
+10 -4
View File
@@ -1,9 +1,11 @@
use std::{fmt::Debug, mem};
use std::{borrow::Cow, fmt::Debug, mem};
use bytes::BytesMut;
use conduwuit::{Err, Result, debug_error, err, utils, utils::response::LimitReadExt, warn};
use reqwest::Client;
use ruma::api::{IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken};
use ruma::api::{IncomingResponse, MatrixVersion, OutgoingRequest, auth_scheme::{AppserviceToken, SendAccessToken}, path_builder::VersionHistory};
use crate::SUPPORTED_VERSIONS;
/// Sends a request to an antispam service
pub(crate) async fn send_antispam_request<T>(
@@ -13,11 +15,15 @@ pub(crate) async fn send_antispam_request<T>(
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
T: OutgoingRequest::<Authentication = AppserviceToken, PathBuilder = VersionHistory> + Debug + Send,
{
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_15];
let http_request = request
.try_into_http_request::<BytesMut>(base_url, SendAccessToken::Always(secret), &VERSIONS)?
.try_into_http_request::<BytesMut>(
base_url,
SendAccessToken::Always(secret),
Cow::Borrowed(&SUPPORTED_VERSIONS),
)?
.map(BytesMut::freeze);
let reqwest_request = reqwest::Request::try_from(http_request)?;
+3 -5
View File
@@ -5,7 +5,7 @@ use conduwuit::{
Err, Result, debug_error, err, implement, trace, utils, utils::response::LimitReadExt, warn,
};
use ruma::api::{
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
IncomingResponse, MatrixVersion, OutgoingRequest, appservice::Registration, auth_scheme::{AccessToken, SendAccessToken}, path_builder::SinglePath,
};
/// Sends a request to an appservice
@@ -19,10 +19,8 @@ pub async fn send_appservice_request<T>(
request: T,
) -> Result<Option<T::IncomingResponse>>
where
T: OutgoingRequest + Debug + Send,
T: OutgoingRequest<Authentication = AccessToken, PathBuilder = SinglePath> + Debug + Send,
{
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_7];
let Some(dest) = registration.url else {
return Ok(None);
};
@@ -38,7 +36,7 @@ where
.try_into_http_request::<BytesMut>(
&dest,
SendAccessToken::Appservice(hs_token),
&VERSIONS,
(),
)
.map_err(|e| {
err!(BadServerResponse(
+1 -1
View File
@@ -245,7 +245,7 @@ fn parse_servercurrentevent(key: &[u8], value: &[u8]) -> Result<(Destination, Se
})?;
(
Destination::Federation(OwnedServerName::parse(&server).map_err(|_| {
Destination::Federation(ServerName::parse(&server).map_err(|_| {
Error::bad_database("Invalid server string in server_currenttransaction")
})?),
if value.is_empty() {
+30 -15
View File
@@ -19,7 +19,7 @@ use conduwuit::{
warn,
};
use futures::{FutureExt, Stream, StreamExt};
use ruma::{RoomId, ServerName, UserId, api::OutgoingRequest};
use ruma::{OwnedServerName, RoomId, ServerName, UserId, api::{OutgoingRequest, auth_scheme::NoAuthentication, federation::authentication::ServerSignatures, path_builder::PathBuilder}};
use tokio::{task, task::JoinSet};
use self::data::Data;
@@ -28,8 +28,8 @@ pub use self::{
sender::{EDU_LIMIT, PDU_LIMIT},
};
use crate::{
Dep, account_data, client, federation, globals, presence, pusher, rooms,
rooms::timeline::RawPduId, users,
Dep, account_data, client, federation::{self, FederationPathBuilderInput}, globals, presence, pusher, rooms::{self, timeline::RawPduId},
users,
};
pub struct Service {
@@ -187,9 +187,9 @@ impl Service {
}
#[tracing::instrument(skip(self, servers, pdu_id), level = "debug")]
pub async fn send_pdu_servers<'a, S>(&self, servers: S, pdu_id: &RawPduId) -> Result
pub async fn send_pdu_servers<S>(&self, servers: S, pdu_id: &RawPduId) -> Result
where
S: Stream<Item = &'a ServerName> + Send + 'a,
S: Stream<Item = OwnedServerName> + Send,
{
let requests = servers
.map(|server| {
@@ -233,14 +233,14 @@ impl Service {
}
#[tracing::instrument(skip(self, servers, serialized), level = "debug")]
pub async fn send_edu_servers<'a, S>(&self, servers: S, serialized: EduBuf) -> Result
pub async fn send_edu_servers<S>(&self, servers: S, serialized: EduBuf) -> Result
where
S: Stream<Item = &'a ServerName> + Send + 'a,
S: Stream<Item = OwnedServerName> + Send,
{
let requests = servers
.map(|server| {
(
Destination::Federation(server.to_owned()),
Destination::Federation(server),
SendingEvent::Edu(serialized.clone()),
)
})
@@ -269,12 +269,11 @@ impl Service {
}
#[tracing::instrument(skip(self, servers), level = "debug")]
pub async fn flush_servers<'a, S>(&self, servers: S) -> Result<()>
pub async fn flush_servers<S>(&self, servers: S) -> Result<()>
where
S: Stream<Item = &'a ServerName> + Send + 'a,
S: Stream<Item = OwnedServerName> + Send,
{
servers
.map(ToOwned::to_owned)
.map(Destination::Federation)
.map(Ok)
.ready_try_for_each(|dest| {
@@ -289,26 +288,26 @@ impl Service {
/// Sends a request to a federation server
#[inline]
pub async fn send_federation_request<T>(
pub async fn send_federation_request<'i, T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
T: OutgoingRequest::<Authentication = ServerSignatures, PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>> + Debug + Send,
{
self.services.federation.execute(dest, request).await
}
/// Like send_federation_request() but with a very large timeout
#[inline]
pub async fn send_synapse_request<T>(
pub async fn send_synapse_request<'i, T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
T: OutgoingRequest::<Authentication = ServerSignatures, PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>> + Debug + Send,
{
self.services
.federation
@@ -316,6 +315,22 @@ impl Service {
.await
}
/// Send an unauthenticated federation request with no X-Matrix header.
#[inline]
pub async fn send_unauthenticated_request<'i, T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest::<Authentication = NoAuthentication, PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>> + Debug + Send,
{
self.services
.federation
.execute_unauthenticated(dest, request)
.await
}
/// Clean up queued sending event data
///
/// Used after we remove an appservice registration or a user deletes a push
+22 -48
View File
@@ -422,7 +422,7 @@ impl Service {
let keys_changed = self
.services
.users
.room_keys_changed(room_id, Some(since.0), None)
.room_keys_changed(&room_id, Some(since.0), None)
.ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id));
pin_mut!(keys_changed);
@@ -432,21 +432,13 @@ impl Service {
}
max_edu_count.fetch_max(count, Ordering::Relaxed);
if !device_list_changes.insert(user_id.into()) {
if !device_list_changes.insert(user_id.clone()) {
continue;
}
// Empty prev id forces synapse to resync; because synapse resyncs,
// we can just insert placeholder data
let edu = Edu::DeviceListUpdate(DeviceListUpdateContent {
user_id: user_id.into(),
device_id: device_id!("placeholder").to_owned(),
device_display_name: Some("Placeholder".to_owned()),
stream_id: uint!(1),
prev_id: Vec::new(),
deleted: None,
keys: None,
});
let edu = Edu::DeviceListUpdate(DeviceListUpdateContent::new(user_id, device_id!("placeholder").to_owned(), uint!(1)));
let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &edu)
@@ -479,7 +471,6 @@ impl Service {
.services
.state_cache
.server_rooms(server_name)
.map(ToOwned::to_owned)
.broad_filter_map(|room_id| async move {
let receipt_map = self
.select_edus_receipts_room(&room_id, since, max_edu_count, &mut num)
@@ -498,7 +489,7 @@ impl Service {
return None;
}
let receipt_content = Edu::Receipt(ReceiptContent { receipts });
let receipt_content = Edu::Receipt(ReceiptContent::new(receipts));
let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &receipt_content)
@@ -556,10 +547,7 @@ impl Service {
.remove(&user_id)
.expect("our read receipts always have the user here");
let receipt_data = ReceiptData {
data: receipt,
event_ids: vec![event_id.clone()],
};
let receipt_data = ReceiptData::new(receipt, vec![event_id.clone()]);
if read.insert(user_id, receipt_data).is_none() {
*num = num.saturating_add(1);
@@ -569,7 +557,7 @@ impl Service {
}
}
ReceiptMap { read }
ReceiptMap::new(read)
}
/// Look for presence
@@ -617,16 +605,9 @@ impl Service {
continue;
};
let update = PresenceUpdate {
user_id: user_id.into(),
presence: presence_event.content.presence,
currently_active: presence_event.content.currently_active.unwrap_or(false),
status_msg: presence_event.content.status_msg,
last_active_ago: presence_event
.content
.last_active_ago
.unwrap_or_else(|| uint!(0)),
};
let mut update = PresenceUpdate::new(user_id.to_owned(), presence_event.content.presence, presence_event.content.last_active_ago.unwrap_or_else(|| uint!(0)));
update.currently_active = presence_event.content.currently_active.unwrap_or_default();
update.status_msg = presence_event.content.status_msg;
presence_updates.insert(user_id.into(), update);
if presence_updates.len() >= SELECT_PRESENCE_LIMIT {
@@ -638,9 +619,7 @@ impl Service {
return None;
}
let presence_content = Edu::Presence(PresenceContent {
push: presence_updates.into_values().collect(),
});
let presence_content = Edu::Presence(PresenceContent::new(presence_updates.into_values().collect()));
let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &presence_content)
@@ -686,7 +665,7 @@ impl Service {
.filter(|event| matches!(event, SendingEvent::Pdu(_)))
.count(),
);
let mut edu_jsons: Vec<EphemeralData> = Vec::with_capacity(
let mut edu_jsons: Vec<Raw<EphemeralData>> = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Edu(_)))
@@ -702,7 +681,7 @@ impl Service {
| SendingEvent::Edu(edu) =>
if appservice.receive_ephemeral {
if let Ok(edu) = serde_json::from_slice(edu) {
edu_jsons.push(edu);
edu_jsons.push(Raw::from_json(edu));
}
},
| SendingEvent::Flush => {}, // flush only; no new content
@@ -720,15 +699,14 @@ impl Service {
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
let mut request = ruma::api::appservice::event::push_events::v1::Request::new(txn_id.into(), pdu_jsons);
request.ephemeral = edu_jsons;
request.to_device = Vec::new(); // TODO
match self
.send_appservice_request(
appservice,
ruma::api::appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: txn_id.into(),
ephemeral: edu_jsons,
to_device: Vec::new(), // TODO
},
request,
)
.await
{
@@ -851,18 +829,14 @@ impl Service {
let txn_hash = calculate_hash(preimage);
let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
let request = send_transaction_message::v1::Request {
transaction_id: txn_id.into(),
origin: self.server.name.clone(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdus,
edus,
};
let mut request = send_transaction_message::v1::Request::new(txn_id.into(), self.server.name.clone(), MilliSecondsSinceUnixEpoch::now());
request.pdus = pdus;
request.edus = edus;
let result = self
.services
.federation
.execute_on(&self.services.client.sender, &server, request)
.execute_signed(&self.services.client.sender, &server, request)
.await;
for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
@@ -900,7 +874,7 @@ impl Service {
.get("room_id")
.and_then(|val| RoomId::parse(val.as_str()?).ok())
{
match self.services.state.get_room_version(room_id).await {
match self.services.state.get_room_version(&room_id).await {
| Ok(room_version_id) => match room_version_id {
| RoomVersionId::V1 | RoomVersionId::V2 => {},
| _ => _ = pdu_json.remove("event_id"),