mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bc79c84f65 | |||
| 60dd6baffd | |||
| 99a10998b4 | |||
| 05c6b5df75 | |||
| 74db426c6b | |||
| 344d68dabc | |||
| d3ee9c407a |
+17
-2
@@ -340,7 +340,9 @@
|
|||||||
# this to be high to account for extremely large room joins, slow
|
# this to be high to account for extremely large room joins, slow
|
||||||
# homeservers, your own resources etc.
|
# homeservers, your own resources etc.
|
||||||
#
|
#
|
||||||
#federation_timeout = 300
|
# Joins have 6x the timeout.
|
||||||
|
#
|
||||||
|
#federation_timeout = 60
|
||||||
|
|
||||||
# MSC4284 Policy server request timeout (seconds). Generally policy
|
# MSC4284 Policy server request timeout (seconds). Generally policy
|
||||||
# servers should respond near instantly, however may slow down under
|
# servers should respond near instantly, however may slow down under
|
||||||
@@ -389,7 +391,15 @@
|
|||||||
#
|
#
|
||||||
#appservice_idle_timeout = 300
|
#appservice_idle_timeout = 300
|
||||||
|
|
||||||
# Notification gateway pusher idle connection pool timeout.
|
# Notification gateway pusher request connection timeout (seconds).
|
||||||
|
#
|
||||||
|
#pusher_conn_timeout = 15
|
||||||
|
|
||||||
|
# Notification gateway pusher total request timeout (seconds).
|
||||||
|
#
|
||||||
|
#pusher_timeout = 60
|
||||||
|
|
||||||
|
# Notification gateway pusher idle connection pool timeout (seconds).
|
||||||
#
|
#
|
||||||
#pusher_idle_timeout = 15
|
#pusher_idle_timeout = 15
|
||||||
|
|
||||||
@@ -1446,6 +1456,11 @@
|
|||||||
#
|
#
|
||||||
#url_preview_max_spider_size = 256000
|
#url_preview_max_spider_size = 256000
|
||||||
|
|
||||||
|
# Total request timeout for URL previews (seconds). This includes
|
||||||
|
# connection, request, and response body reading time.
|
||||||
|
#
|
||||||
|
#url_preview_timeout = 120
|
||||||
|
|
||||||
# Option to decide whether you would like to run the domain allowlist
|
# Option to decide whether you would like to run the domain allowlist
|
||||||
# checks (contains and explicit) on the root domain or not. Does not apply
|
# checks (contains and explicit) on the root domain or not. Does not apply
|
||||||
# to URL contains allowlist. Defaults to false.
|
# to URL contains allowlist. Defaults to false.
|
||||||
|
|||||||
@@ -13,8 +13,8 @@ OCI images for Continuwuity are available in the registries listed below.
|
|||||||
| --------------- | --------------------------------------------------------------- | -----------------------|
|
| --------------- | --------------------------------------------------------------- | -----------------------|
|
||||||
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:latest](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest) | Latest tagged image. |
|
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:latest](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest) | Latest tagged image. |
|
||||||
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:main](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main) | Main branch image. |
|
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:main](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main) | Main branch image. |
|
||||||
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:latest-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest-maxperf) | Performance optimised version. |
|
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:latest-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
|
||||||
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:main-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main-maxperf) | Performance optimised version. |
|
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:main-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
|
||||||
|
|
||||||
Use
|
Use
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use conduwuit::{
|
|||||||
};
|
};
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
OwnedServerName, RoomId, UserId,
|
RoomId, UserId,
|
||||||
api::{client::membership::invite_user, federation::membership::create_invite},
|
api::{client::membership::invite_user, federation::membership::create_invite},
|
||||||
events::{
|
events::{
|
||||||
invite_permission_config::FilterLevel,
|
invite_permission_config::FilterLevel,
|
||||||
@@ -203,19 +203,10 @@ pub(crate) async fn invite_helper(
|
|||||||
))));
|
))));
|
||||||
}
|
}
|
||||||
|
|
||||||
let origin: OwnedServerName = serde_json::from_value(serde_json::to_value(
|
|
||||||
value
|
|
||||||
.get("origin")
|
|
||||||
.ok_or_else(|| err!(Request(BadJson("Event missing origin field."))))?,
|
|
||||||
)?)
|
|
||||||
.map_err(|e| {
|
|
||||||
err!(Request(BadJson(warn!("Origin field in event is not a valid server name: {e}"))))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let pdu_id = services
|
let pdu_id = services
|
||||||
.rooms
|
.rooms
|
||||||
.event_handler
|
.event_handler
|
||||||
.handle_incoming_pdu(&origin, room_id, &event_id, value, true)
|
.handle_incoming_pdu(recipient_user.server_name(), room_id, &event_id, value, true)
|
||||||
.boxed()
|
.boxed()
|
||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| {
|
.ok_or_else(|| {
|
||||||
|
|||||||
+30
-3
@@ -435,7 +435,9 @@ pub struct Config {
|
|||||||
/// this to be high to account for extremely large room joins, slow
|
/// this to be high to account for extremely large room joins, slow
|
||||||
/// homeservers, your own resources etc.
|
/// homeservers, your own resources etc.
|
||||||
///
|
///
|
||||||
/// default: 300
|
/// Joins have 6x the timeout.
|
||||||
|
///
|
||||||
|
/// default: 60
|
||||||
#[serde(default = "default_federation_timeout")]
|
#[serde(default = "default_federation_timeout")]
|
||||||
pub federation_timeout: u64,
|
pub federation_timeout: u64,
|
||||||
|
|
||||||
@@ -501,7 +503,19 @@ pub struct Config {
|
|||||||
#[serde(default = "default_appservice_idle_timeout")]
|
#[serde(default = "default_appservice_idle_timeout")]
|
||||||
pub appservice_idle_timeout: u64,
|
pub appservice_idle_timeout: u64,
|
||||||
|
|
||||||
/// Notification gateway pusher idle connection pool timeout.
|
/// Notification gateway pusher request connection timeout (seconds).
|
||||||
|
///
|
||||||
|
/// default: 15
|
||||||
|
#[serde(default = "default_pusher_conn_timeout")]
|
||||||
|
pub pusher_conn_timeout: u64,
|
||||||
|
|
||||||
|
/// Notification gateway pusher total request timeout (seconds).
|
||||||
|
///
|
||||||
|
/// default: 60
|
||||||
|
#[serde(default = "default_pusher_timeout")]
|
||||||
|
pub pusher_timeout: u64,
|
||||||
|
|
||||||
|
/// Notification gateway pusher idle connection pool timeout (seconds).
|
||||||
///
|
///
|
||||||
/// default: 15
|
/// default: 15
|
||||||
#[serde(default = "default_pusher_idle_timeout")]
|
#[serde(default = "default_pusher_idle_timeout")]
|
||||||
@@ -1663,6 +1677,13 @@ pub struct Config {
|
|||||||
#[serde(default = "default_url_preview_max_spider_size")]
|
#[serde(default = "default_url_preview_max_spider_size")]
|
||||||
pub url_preview_max_spider_size: usize,
|
pub url_preview_max_spider_size: usize,
|
||||||
|
|
||||||
|
/// Total request timeout for URL previews (seconds). This includes
|
||||||
|
/// connection, request, and response body reading time.
|
||||||
|
///
|
||||||
|
/// default: 120
|
||||||
|
#[serde(default = "default_url_preview_timeout")]
|
||||||
|
pub url_preview_timeout: u64,
|
||||||
|
|
||||||
/// Option to decide whether you would like to run the domain allowlist
|
/// Option to decide whether you would like to run the domain allowlist
|
||||||
/// checks (contains and explicit) on the root domain or not. Does not apply
|
/// checks (contains and explicit) on the root domain or not. Does not apply
|
||||||
/// to URL contains allowlist. Defaults to false.
|
/// to URL contains allowlist. Defaults to false.
|
||||||
@@ -2455,7 +2476,7 @@ fn default_well_known_timeout() -> u64 { 10 }
|
|||||||
|
|
||||||
fn default_federation_conn_timeout() -> u64 { 10 }
|
fn default_federation_conn_timeout() -> u64 { 10 }
|
||||||
|
|
||||||
fn default_federation_timeout() -> u64 { 25 }
|
fn default_federation_timeout() -> u64 { 60 }
|
||||||
|
|
||||||
fn default_policy_server_request_timeout() -> u64 { 10 }
|
fn default_policy_server_request_timeout() -> u64 { 10 }
|
||||||
|
|
||||||
@@ -2473,6 +2494,10 @@ fn default_appservice_timeout() -> u64 { 35 }
|
|||||||
|
|
||||||
fn default_appservice_idle_timeout() -> u64 { 300 }
|
fn default_appservice_idle_timeout() -> u64 { 300 }
|
||||||
|
|
||||||
|
fn default_pusher_conn_timeout() -> u64 { 15 }
|
||||||
|
|
||||||
|
fn default_pusher_timeout() -> u64 { 60 }
|
||||||
|
|
||||||
fn default_pusher_idle_timeout() -> u64 { 15 }
|
fn default_pusher_idle_timeout() -> u64 { 15 }
|
||||||
|
|
||||||
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
|
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
|
||||||
@@ -2600,6 +2625,8 @@ fn default_url_preview_max_spider_size() -> usize {
|
|||||||
256_000 // 256KB
|
256_000 // 256KB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_url_preview_timeout() -> u64 { 120 }
|
||||||
|
|
||||||
fn default_new_user_displayname_suffix() -> String { "🏳️⚧️".to_owned() }
|
fn default_new_user_displayname_suffix() -> String { "🏳️⚧️".to_owned() }
|
||||||
|
|
||||||
fn default_sentry_endpoint() -> Option<Url> { None }
|
fn default_sentry_endpoint() -> Option<Url> { None }
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ pub use self::{
|
|||||||
room_version::RoomVersion,
|
room_version::RoomVersion,
|
||||||
};
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
debug, debug_error, err,
|
debug, err, error as log_error,
|
||||||
matrix::{Event, StateKey},
|
matrix::{Event, StateKey},
|
||||||
state_res::room_version::StateResolutionVersion,
|
state_res::room_version::StateResolutionVersion,
|
||||||
trace,
|
trace,
|
||||||
@@ -173,7 +173,8 @@ where
|
|||||||
initial_state,
|
initial_state,
|
||||||
&event_fetch,
|
&event_fetch,
|
||||||
)
|
)
|
||||||
.await?;
|
.await
|
||||||
|
.inspect_err(|_| log_error!("failed to calculate control events"))?;
|
||||||
|
|
||||||
debug!(count = resolved_control.len(), "resolved power events");
|
debug!(count = resolved_control.len(), "resolved power events");
|
||||||
trace!(map = ?resolved_control, "resolved power events");
|
trace!(map = ?resolved_control, "resolved power events");
|
||||||
@@ -213,7 +214,8 @@ where
|
|||||||
resolved_control, // The control events are added to the final resolved state
|
resolved_control, // The control events are added to the final resolved state
|
||||||
&event_fetch,
|
&event_fetch,
|
||||||
)
|
)
|
||||||
.await?;
|
.await
|
||||||
|
.inspect_err(|_| log_error!("failed to resolve left over state"))?;
|
||||||
|
|
||||||
// Ensure unconflicting state is in the final state
|
// Ensure unconflicting state is in the final state
|
||||||
resolved_state.extend(unconflicted);
|
resolved_state.extend(unconflicted);
|
||||||
@@ -400,13 +402,17 @@ where
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let fetcher = async |event_id: OwnedEventId| {
|
let fetcher = async |event_id: OwnedEventId| {
|
||||||
let pl = *event_to_pl
|
let pl = *event_to_pl.get(&event_id).ok_or_else(|| {
|
||||||
.get(&event_id)
|
Error::NotFound(format!(
|
||||||
.ok_or_else(|| Error::NotFound(String::new()))?;
|
"event unexpectedly missing from power level map: {event_id}",
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
let ev = fetch_event(event_id)
|
let ev = fetch_event(event_id.clone()).await.ok_or_else(|| {
|
||||||
.await
|
Error::NotFound(format!(
|
||||||
.ok_or_else(|| Error::NotFound(String::new()))?;
|
"event found in power level map but not in room graph: {event_id}",
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
Ok((pl, ev.origin_server_ts()))
|
Ok((pl, ev.origin_server_ts()))
|
||||||
};
|
};
|
||||||
@@ -758,7 +764,10 @@ where
|
|||||||
warn!("event {} failed the authentication check", event.event_id());
|
warn!("event {} failed the authentication check", event.event_id());
|
||||||
},
|
},
|
||||||
| Err(e) => {
|
| Err(e) => {
|
||||||
debug_error!("event {} failed the authentication check: {e}", event.event_id());
|
log_error!(
|
||||||
|
"failed to run authentication check on event {}: {e}",
|
||||||
|
event.event_id()
|
||||||
|
);
|
||||||
return Err(e);
|
return Err(e);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ impl crate::Service for Service {
|
|||||||
})?
|
})?
|
||||||
.local_address(url_preview_bind_addr)
|
.local_address(url_preview_bind_addr)
|
||||||
.dns_resolver(resolver.resolver.clone())
|
.dns_resolver(resolver.resolver.clone())
|
||||||
|
.timeout(Duration::from_secs(config.url_preview_timeout))
|
||||||
.redirect(redirect::Policy::limited(3))
|
.redirect(redirect::Policy::limited(3))
|
||||||
.build()?,
|
.build()?,
|
||||||
|
|
||||||
@@ -68,6 +69,11 @@ impl crate::Service for Service {
|
|||||||
.dns_resolver(resolver.resolver.hooked.clone())
|
.dns_resolver(resolver.resolver.hooked.clone())
|
||||||
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
||||||
.read_timeout(Duration::from_secs(config.federation_timeout))
|
.read_timeout(Duration::from_secs(config.federation_timeout))
|
||||||
|
.timeout(Duration::from_secs(
|
||||||
|
config
|
||||||
|
.federation_timeout
|
||||||
|
.saturating_add(config.federation_conn_timeout),
|
||||||
|
))
|
||||||
.pool_max_idle_per_host(config.federation_idle_per_host.into())
|
.pool_max_idle_per_host(config.federation_idle_per_host.into())
|
||||||
.pool_idle_timeout(Duration::from_secs(config.federation_idle_timeout))
|
.pool_idle_timeout(Duration::from_secs(config.federation_idle_timeout))
|
||||||
.redirect(redirect::Policy::limited(3))
|
.redirect(redirect::Policy::limited(3))
|
||||||
@@ -76,7 +82,13 @@ impl crate::Service for Service {
|
|||||||
synapse: base(config)?
|
synapse: base(config)?
|
||||||
.dns_resolver(resolver.resolver.hooked.clone())
|
.dns_resolver(resolver.resolver.hooked.clone())
|
||||||
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
||||||
.read_timeout(Duration::from_secs(305))
|
.read_timeout(Duration::from_secs(config.federation_timeout.saturating_mul(6)))
|
||||||
|
.timeout(Duration::from_secs(
|
||||||
|
config
|
||||||
|
.federation_timeout
|
||||||
|
.saturating_mul(6)
|
||||||
|
.saturating_add(config.federation_conn_timeout),
|
||||||
|
))
|
||||||
.pool_max_idle_per_host(0)
|
.pool_max_idle_per_host(0)
|
||||||
.redirect(redirect::Policy::limited(3))
|
.redirect(redirect::Policy::limited(3))
|
||||||
.build()?,
|
.build()?,
|
||||||
@@ -103,6 +115,8 @@ impl crate::Service for Service {
|
|||||||
|
|
||||||
pusher: base(config)?
|
pusher: base(config)?
|
||||||
.dns_resolver(resolver.resolver.clone())
|
.dns_resolver(resolver.resolver.clone())
|
||||||
|
.connect_timeout(Duration::from_secs(config.pusher_conn_timeout))
|
||||||
|
.timeout(Duration::from_secs(config.pusher_timeout))
|
||||||
.pool_max_idle_per_host(1)
|
.pool_max_idle_per_host(1)
|
||||||
.pool_idle_timeout(Duration::from_secs(config.pusher_idle_timeout))
|
.pool_idle_timeout(Duration::from_secs(config.pusher_idle_timeout))
|
||||||
.redirect(redirect::Policy::limited(2))
|
.redirect(redirect::Policy::limited(2))
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use std::{fmt::Debug, mem};
|
|||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
|
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
|
||||||
error::inspect_debug_log, implement, trace, utils::string::EMPTY,
|
error::inspect_debug_log, implement, trace,
|
||||||
};
|
};
|
||||||
use http::{HeaderValue, header::AUTHORIZATION};
|
use http::{HeaderValue, header::AUTHORIZATION};
|
||||||
use ipaddress::IPAddress;
|
use ipaddress::IPAddress;
|
||||||
@@ -90,7 +90,9 @@ where
|
|||||||
|
|
||||||
debug!(%method, %url, "Sending request");
|
debug!(%method, %url, "Sending request");
|
||||||
match client.execute(request).await {
|
match client.execute(request).await {
|
||||||
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
|
| Ok(response) =>
|
||||||
|
self.handle_response::<T>(dest, actual, &method, &url, response)
|
||||||
|
.await,
|
||||||
| Err(error) =>
|
| Err(error) =>
|
||||||
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
|
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
|
||||||
}
|
}
|
||||||
@@ -119,7 +121,9 @@ fn validate_url(&self, url: &Url) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[implement(super::Service)]
|
||||||
async fn handle_response<T>(
|
async fn handle_response<T>(
|
||||||
|
&self,
|
||||||
dest: &ServerName,
|
dest: &ServerName,
|
||||||
actual: &ActualDest,
|
actual: &ActualDest,
|
||||||
method: &Method,
|
method: &Method,
|
||||||
@@ -162,7 +166,6 @@ async fn into_http_response(
|
|||||||
.expect("http::response::Builder is usable"),
|
.expect("http::response::Builder is usable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
// TODO: handle timeout
|
|
||||||
trace!("Waiting for response body...");
|
trace!("Waiting for response body...");
|
||||||
let body = response
|
let body = response
|
||||||
.bytes()
|
.bytes()
|
||||||
@@ -286,10 +289,13 @@ where
|
|||||||
T: OutgoingRequest + Send,
|
T: OutgoingRequest + Send,
|
||||||
{
|
{
|
||||||
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_11];
|
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_11];
|
||||||
const SATIR: SendAccessToken<'_> = SendAccessToken::IfRequired(EMPTY);
|
|
||||||
|
|
||||||
let http_request = request
|
let http_request = request
|
||||||
.try_into_http_request::<Vec<u8>>(actual.string().as_str(), SATIR, &VERSIONS)
|
.try_into_http_request::<Vec<u8>>(
|
||||||
|
actual.string().as_str(),
|
||||||
|
SendAccessToken::None,
|
||||||
|
&VERSIONS,
|
||||||
|
)
|
||||||
.map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))?;
|
.map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))?;
|
||||||
|
|
||||||
Ok(http_request)
|
Ok(http_request)
|
||||||
|
|||||||
@@ -198,7 +198,7 @@ impl Service {
|
|||||||
trace!("Push gateway destination: {dest}");
|
trace!("Push gateway destination: {dest}");
|
||||||
|
|
||||||
let http_request = request
|
let http_request = request
|
||||||
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::IfRequired(""), &VERSIONS)
|
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::None, &VERSIONS)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
err!(BadServerResponse(warn!(
|
err!(BadServerResponse(warn!(
|
||||||
"Failed to find destination {dest} for push gateway: {e}"
|
"Failed to find destination {dest} for push gateway: {e}"
|
||||||
@@ -245,7 +245,7 @@ impl Service {
|
|||||||
.expect("http::response::Builder is usable"),
|
.expect("http::response::Builder is usable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let body = response.bytes().await?; // TODO: handle timeout
|
let body = response.bytes().await?;
|
||||||
|
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
|
debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
|
||||||
@@ -288,7 +288,7 @@ impl Service {
|
|||||||
let mut notify = None;
|
let mut notify = None;
|
||||||
let mut tweaks = Vec::new();
|
let mut tweaks = Vec::new();
|
||||||
if event.room_id().is_none() {
|
if event.room_id().is_none() {
|
||||||
// TODO(hydra): does this matter?
|
// This only affects v12+ create events
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,7 @@
|
|||||||
use std::{fmt::Debug, mem};
|
use std::{fmt::Debug, mem};
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use conduwuit::{Err, Result, debug_error, err, trace, utils, warn};
|
use conduwuit::{Err, Result, debug_error, err, implement, trace, utils, warn};
|
||||||
use reqwest::Client;
|
|
||||||
use ruma::api::{
|
use ruma::api::{
|
||||||
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
|
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
|
||||||
};
|
};
|
||||||
@@ -11,8 +10,9 @@ use ruma::api::{
|
|||||||
///
|
///
|
||||||
/// Only returns Ok(None) if there is no url specified in the appservice
|
/// Only returns Ok(None) if there is no url specified in the appservice
|
||||||
/// registration file
|
/// registration file
|
||||||
pub(crate) async fn send_request<T>(
|
#[implement(super::Service)]
|
||||||
client: &Client,
|
pub async fn send_appservice_request<T>(
|
||||||
|
&self,
|
||||||
registration: Registration,
|
registration: Registration,
|
||||||
request: T,
|
request: T,
|
||||||
) -> Result<Option<T::IncomingResponse>>
|
) -> Result<Option<T::IncomingResponse>>
|
||||||
@@ -35,7 +35,7 @@ where
|
|||||||
let mut http_request = request
|
let mut http_request = request
|
||||||
.try_into_http_request::<BytesMut>(
|
.try_into_http_request::<BytesMut>(
|
||||||
&dest,
|
&dest,
|
||||||
SendAccessToken::IfRequired(hs_token),
|
SendAccessToken::Appservice(hs_token),
|
||||||
&VERSIONS,
|
&VERSIONS,
|
||||||
)
|
)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
@@ -58,6 +58,8 @@ where
|
|||||||
|
|
||||||
let reqwest_request = reqwest::Request::try_from(http_request)?;
|
let reqwest_request = reqwest::Request::try_from(http_request)?;
|
||||||
|
|
||||||
|
let client = &self.services.client.appservice;
|
||||||
|
|
||||||
let mut response = client.execute(reqwest_request).await.map_err(|e| {
|
let mut response = client.execute(reqwest_request).await.map_err(|e| {
|
||||||
warn!("Could not send request to appservice \"{}\" at {dest}: {e:?}", registration.id);
|
warn!("Could not send request to appservice \"{}\" at {dest}: {e:?}", registration.id);
|
||||||
e
|
e
|
||||||
@@ -75,7 +77,7 @@ where
|
|||||||
.expect("http::response::Builder is usable"),
|
.expect("http::response::Builder is usable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let body = response.bytes().await?; // TODO: handle timeout
|
let body = response.bytes().await?;
|
||||||
|
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
|
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
|
||||||
|
|||||||
@@ -19,10 +19,7 @@ use conduwuit::{
|
|||||||
warn,
|
warn,
|
||||||
};
|
};
|
||||||
use futures::{FutureExt, Stream, StreamExt};
|
use futures::{FutureExt, Stream, StreamExt};
|
||||||
use ruma::{
|
use ruma::{RoomId, ServerName, UserId, api::OutgoingRequest};
|
||||||
RoomId, ServerName, UserId,
|
|
||||||
api::{OutgoingRequest, appservice::Registration},
|
|
||||||
};
|
|
||||||
use tokio::{task, task::JoinSet};
|
use tokio::{task, task::JoinSet};
|
||||||
|
|
||||||
use self::data::Data;
|
use self::data::Data;
|
||||||
@@ -319,22 +316,6 @@ impl Service {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a request to an appservice
|
|
||||||
///
|
|
||||||
/// Only returns None if there is no url specified in the appservice
|
|
||||||
/// registration file
|
|
||||||
pub async fn send_appservice_request<T>(
|
|
||||||
&self,
|
|
||||||
registration: Registration,
|
|
||||||
request: T,
|
|
||||||
) -> Result<Option<T::IncomingResponse>>
|
|
||||||
where
|
|
||||||
T: OutgoingRequest + Debug + Send,
|
|
||||||
{
|
|
||||||
let client = &self.services.client.appservice;
|
|
||||||
appservice::send_request(client, registration, request).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Clean up queued sending event data
|
/// Clean up queued sending event data
|
||||||
///
|
///
|
||||||
/// Used after we remove an appservice registration or a user deletes a push
|
/// Used after we remove an appservice registration or a user deletes a push
|
||||||
|
|||||||
@@ -50,9 +50,7 @@ use ruma::{
|
|||||||
};
|
};
|
||||||
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
|
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
|
||||||
|
|
||||||
use super::{
|
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
|
||||||
Destination, EduBuf, EduVec, Msg, SendingEvent, Service, appservice, data::QueueItem,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum TransactionStatus {
|
enum TransactionStatus {
|
||||||
@@ -720,18 +718,18 @@ impl Service {
|
|||||||
|
|
||||||
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
||||||
// transaction");
|
// transaction");
|
||||||
let client = &self.services.client.appservice;
|
|
||||||
match appservice::send_request(
|
match self
|
||||||
client,
|
.send_appservice_request(
|
||||||
appservice,
|
appservice,
|
||||||
ruma::api::appservice::event::push_events::v1::Request {
|
ruma::api::appservice::event::push_events::v1::Request {
|
||||||
events: pdu_jsons,
|
events: pdu_jsons,
|
||||||
txn_id: txn_id.into(),
|
txn_id: txn_id.into(),
|
||||||
ephemeral: edu_jsons,
|
ephemeral: edu_jsons,
|
||||||
to_device: Vec::new(), // TODO
|
to_device: Vec::new(), // TODO
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
| Ok(_) => Ok(Destination::Appservice(id)),
|
| Ok(_) => Ok(Destination::Appservice(id)),
|
||||||
| Err(e) => Err((Destination::Appservice(id), e)),
|
| Err(e) => Err((Destination::Appservice(id), e)),
|
||||||
|
|||||||
Reference in New Issue
Block a user