refactor: Ruma upstraming, bake a little more

This commit is contained in:
Jade Ellis
2026-04-07 14:40:10 +01:00
committed by Ginger
parent 204bc1367e
commit a4e64383b7
115 changed files with 1907 additions and 1504 deletions
+8 -2
View File
@@ -3,7 +3,11 @@ use std::{borrow::Cow, fmt::Debug, mem};
use bytes::BytesMut;
use conduwuit::{Err, Result, debug_error, err, utils, utils::response::LimitReadExt, warn};
use reqwest::Client;
use ruma::api::{IncomingResponse, MatrixVersion, OutgoingRequest, auth_scheme::{AppserviceToken, SendAccessToken}, path_builder::VersionHistory};
use ruma::api::{
IncomingResponse, MatrixVersion, OutgoingRequest,
auth_scheme::{AppserviceToken, SendAccessToken},
path_builder::VersionHistory,
};
use crate::SUPPORTED_VERSIONS;
@@ -15,7 +19,9 @@ pub(crate) async fn send_antispam_request<T>(
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest::<Authentication = AppserviceToken, PathBuilder = VersionHistory> + Debug + Send,
T: OutgoingRequest<Authentication = AppserviceToken, PathBuilder = VersionHistory>
+ Debug
+ Send,
{
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_15];
let http_request = request
+5 -6
View File
@@ -5,7 +5,10 @@ use conduwuit::{
Err, Result, debug_error, err, implement, trace, utils, utils::response::LimitReadExt, warn,
};
use ruma::api::{
IncomingResponse, MatrixVersion, OutgoingRequest, appservice::Registration, auth_scheme::{AccessToken, SendAccessToken}, path_builder::SinglePath,
IncomingResponse, MatrixVersion, OutgoingRequest,
appservice::Registration,
auth_scheme::{AccessToken, SendAccessToken},
path_builder::SinglePath,
};
/// Sends a request to an appservice
@@ -33,11 +36,7 @@ where
let hs_token = registration.hs_token.as_str();
let mut http_request = request
.try_into_http_request::<BytesMut>(
&dest,
SendAccessToken::Appservice(hs_token),
(),
)
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::Appservice(hs_token), ())
.map_err(|e| {
err!(BadServerResponse(
warn!(appservice = %registration.id, "Failed to find destination {dest}: {e:?}")
+47 -9
View File
@@ -19,7 +19,13 @@ use conduwuit::{
warn,
};
use futures::{FutureExt, Stream, StreamExt};
use ruma::{OwnedServerName, RoomId, ServerName, UserId, api::{OutgoingRequest, auth_scheme::NoAuthentication, federation::authentication::ServerSignatures, path_builder::PathBuilder}};
use ruma::{
OwnedServerName, RoomId, ServerName, UserId,
api::{
OutgoingRequest, auth_scheme::{NoAuthentication, NoAccessToken, SendAccessToken},
federation::authentication::ServerSignatures, path_builder::PathBuilder,
},
};
use tokio::{task, task::JoinSet};
use self::data::Data;
@@ -28,7 +34,10 @@ pub use self::{
sender::{EDU_LIMIT, PDU_LIMIT},
};
use crate::{
Dep, account_data, client, federation::{self, FederationPathBuilderInput}, globals, presence, pusher, rooms::{self, timeline::RawPduId},
Dep, account_data, client,
federation::{self, FederationPathBuilderInput},
globals, presence, pusher,
rooms::{self, timeline::RawPduId},
users,
};
@@ -239,10 +248,7 @@ impl Service {
{
let requests = servers
.map(|server| {
(
Destination::Federation(server),
SendingEvent::Edu(serialized.clone()),
)
(Destination::Federation(server), SendingEvent::Edu(serialized.clone()))
})
.collect::<Vec<_>>()
.await;
@@ -294,7 +300,11 @@ impl Service {
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest::<Authentication = ServerSignatures, PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>> + Debug + Send,
T: OutgoingRequest<
Authentication = ServerSignatures,
PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>,
> + Debug
+ Send,
{
self.services.federation.execute(dest, request).await
}
@@ -307,7 +317,11 @@ impl Service {
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest::<Authentication = ServerSignatures, PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>> + Debug + Send,
T: OutgoingRequest<
Authentication = ServerSignatures,
PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>,
> + Debug
+ Send,
{
self.services
.federation
@@ -323,13 +337,37 @@ impl Service {
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest::<Authentication = NoAuthentication, PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>> + Debug + Send,
T: OutgoingRequest<
Authentication = NoAuthentication,
PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>,
> + Debug
+ Send,
{
self.services
.federation
.execute_unauthenticated(dest, request)
.await
}
/// Send an unauthenticated federation request with no X-Matrix header.
#[inline]
pub async fn send_legacy_media_request<'i, T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest<
Authentication = NoAccessToken,
PathBuilder: PathBuilder<Input<'i>: FederationPathBuilderInput>,
> + Debug
+ Send,
{
self.services
.federation
.execute_on(&self.services.client.federation, dest, request, SendAccessToken::None)
.await
}
/// Clean up queued sending event data
///
+23 -12
View File
@@ -438,7 +438,11 @@ impl Service {
// Empty prev id forces synapse to resync; because synapse resyncs,
// we can just insert placeholder data
let edu = Edu::DeviceListUpdate(DeviceListUpdateContent::new(user_id, device_id!("placeholder").to_owned(), uint!(1)));
let edu = Edu::DeviceListUpdate(DeviceListUpdateContent::new(
user_id,
device_id!("placeholder").to_owned(),
uint!(1),
));
let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &edu)
@@ -605,7 +609,14 @@ impl Service {
continue;
};
let mut update = PresenceUpdate::new(user_id.to_owned(), presence_event.content.presence, presence_event.content.last_active_ago.unwrap_or_else(|| uint!(0)));
let mut update = PresenceUpdate::new(
user_id.to_owned(),
presence_event.content.presence,
presence_event
.content
.last_active_ago
.unwrap_or_else(|| uint!(0)),
);
update.currently_active = presence_event.content.currently_active.unwrap_or_default();
update.status_msg = presence_event.content.status_msg;
@@ -619,7 +630,8 @@ impl Service {
return None;
}
let presence_content = Edu::Presence(PresenceContent::new(presence_updates.into_values().collect()));
let presence_content =
Edu::Presence(PresenceContent::new(presence_updates.into_values().collect()));
let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &presence_content)
@@ -699,17 +711,12 @@ impl Service {
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
let mut request = ruma::api::appservice::event::push_events::v1::Request::new(txn_id.into(), pdu_jsons);
let mut request =
ruma::api::appservice::event::push_events::v1::Request::new(txn_id.into(), pdu_jsons);
request.ephemeral = edu_jsons;
request.to_device = Vec::new(); // TODO
match self
.send_appservice_request(
appservice,
request,
)
.await
{
match self.send_appservice_request(appservice, request).await {
| Ok(_) => Ok(Destination::Appservice(id)),
| Err(e) => Err((Destination::Appservice(id), e)),
}
@@ -829,7 +836,11 @@ impl Service {
let txn_hash = calculate_hash(preimage);
let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
let mut request = send_transaction_message::v1::Request::new(txn_id.into(), self.server.name.clone(), MilliSecondsSinceUnixEpoch::now());
let mut request = send_transaction_message::v1::Request::new(
txn_id.into(),
self.server.name.clone(),
MilliSecondsSinceUnixEpoch::now(),
);
request.pdus = pdus;
request.edus = edus;