fix: Apply timeouts in more places

This commit is contained in:
Jade Ellis
2026-01-05 18:09:18 +00:00
parent da956b1a2a
commit d3ee9c407a
8 changed files with 77 additions and 46 deletions
+7 -5
View File
@@ -1,8 +1,7 @@
use std::{fmt::Debug, mem};
use bytes::BytesMut;
use conduwuit::{Err, Result, debug_error, err, trace, utils, warn};
use reqwest::Client;
use conduwuit::{Err, Result, debug_error, err, implement, trace, utils, warn};
use ruma::api::{
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
};
@@ -11,8 +10,9 @@ use ruma::api::{
///
/// Only returns Ok(None) if there is no url specified in the appservice
/// registration file
pub(crate) async fn send_request<T>(
client: &Client,
#[implement(super::Service)]
pub async fn send_appservice_request<T>(
&self,
registration: Registration,
request: T,
) -> Result<Option<T::IncomingResponse>>
@@ -58,6 +58,8 @@ where
let reqwest_request = reqwest::Request::try_from(http_request)?;
let client = &self.services.client.appservice;
let mut response = client.execute(reqwest_request).await.map_err(|e| {
warn!("Could not send request to appservice \"{}\" at {dest}: {e:?}", registration.id);
e
@@ -75,7 +77,7 @@ where
.expect("http::response::Builder is usable"),
);
let body = response.bytes().await?; // TODO: handle timeout
let body = response.bytes().await?;
if !status.is_success() {
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
+1 -20
View File
@@ -19,10 +19,7 @@ use conduwuit::{
warn,
};
use futures::{FutureExt, Stream, StreamExt};
use ruma::{
RoomId, ServerName, UserId,
api::{OutgoingRequest, appservice::Registration},
};
use ruma::{RoomId, ServerName, UserId, api::OutgoingRequest};
use tokio::{task, task::JoinSet};
use self::data::Data;
@@ -319,22 +316,6 @@ impl Service {
.await
}
/// Sends a request to an appservice
///
/// Only returns None if there is no url specified in the appservice
/// registration file
pub async fn send_appservice_request<T>(
&self,
registration: Registration,
request: T,
) -> Result<Option<T::IncomingResponse>>
where
T: OutgoingRequest + Debug + Send,
{
let client = &self.services.client.appservice;
appservice::send_request(client, registration, request).await
}
/// Clean up queued sending event data
///
/// Used after we remove an appservice registration or a user deletes a push
+13 -15
View File
@@ -50,9 +50,7 @@ use ruma::{
};
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
use super::{
Destination, EduBuf, EduVec, Msg, SendingEvent, Service, appservice, data::QueueItem,
};
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
#[derive(Debug)]
enum TransactionStatus {
@@ -720,18 +718,18 @@ impl Service {
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
let client = &self.services.client.appservice;
match appservice::send_request(
client,
appservice,
ruma::api::appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: txn_id.into(),
ephemeral: edu_jsons,
to_device: Vec::new(), // TODO
},
)
.await
match self
.send_appservice_request(
appservice,
ruma::api::appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: txn_id.into(),
ephemeral: edu_jsons,
to_device: Vec::new(), // TODO
},
)
.await
{
| Ok(_) => Ok(Destination::Appservice(id)),
| Err(e) => Err((Destination::Appservice(id), e)),