mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2026-05-26 20:49:55 +00:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f2e588b294 | |||
| 7ec81c25f1 | |||
| 61f61826bb | |||
| 27d6604d14 | |||
| 1c7bd2f6fa | |||
| 56d7099011 | |||
| bc426e1bfc | |||
| 6c61b3ec5b | |||
| 9d9d1170b6 | |||
| 7be20abcad | |||
| 078275964c | |||
| bf200ad12d | |||
| 41e628892d | |||
| 44851ee6a2 | |||
| a7e6e6e83f |
@@ -23,7 +23,7 @@ repos:
|
|||||||
- id: check-added-large-files
|
- id: check-added-large-files
|
||||||
|
|
||||||
- repo: https://github.com/crate-ci/typos
|
- repo: https://github.com/crate-ci/typos
|
||||||
rev: v1.40.0
|
rev: v1.41.0
|
||||||
hooks:
|
hooks:
|
||||||
- id: typos
|
- id: typos
|
||||||
- id: typos
|
- id: typos
|
||||||
@@ -31,7 +31,7 @@ repos:
|
|||||||
stages: [commit-msg]
|
stages: [commit-msg]
|
||||||
|
|
||||||
- repo: https://github.com/crate-ci/committed
|
- repo: https://github.com/crate-ci/committed
|
||||||
rev: v1.1.8
|
rev: v1.1.9
|
||||||
hooks:
|
hooks:
|
||||||
- id: committed
|
- id: committed
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1 @@
|
|||||||
|
Fixed unreliable room summary fetching and improved error messages. Contributed by @nex.
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
Client requested timeout parameter is now applied to e2ee key lookups and claims. Related federation requests are now
|
||||||
|
also concurrent. Contributed by @nex.
|
||||||
+14
-1
@@ -389,7 +389,15 @@
|
|||||||
#
|
#
|
||||||
#appservice_idle_timeout = 300
|
#appservice_idle_timeout = 300
|
||||||
|
|
||||||
# Notification gateway pusher idle connection pool timeout.
|
# Notification gateway pusher request connection timeout (seconds).
|
||||||
|
#
|
||||||
|
#pusher_conn_timeout = 15
|
||||||
|
|
||||||
|
# Notification gateway pusher total request timeout (seconds).
|
||||||
|
#
|
||||||
|
#pusher_timeout = 60
|
||||||
|
|
||||||
|
# Notification gateway pusher idle connection pool timeout (seconds).
|
||||||
#
|
#
|
||||||
#pusher_idle_timeout = 15
|
#pusher_idle_timeout = 15
|
||||||
|
|
||||||
@@ -1455,6 +1463,11 @@
|
|||||||
#
|
#
|
||||||
#url_preview_max_spider_size = 256000
|
#url_preview_max_spider_size = 256000
|
||||||
|
|
||||||
|
# Total request timeout for URL previews (seconds). This includes
|
||||||
|
# connection, request, and response body reading time.
|
||||||
|
#
|
||||||
|
#url_preview_timeout = 120
|
||||||
|
|
||||||
# Option to decide whether you would like to run the domain allowlist
|
# Option to decide whether you would like to run the domain allowlist
|
||||||
# checks (contains and explicit) on the root domain or not. Does not apply
|
# checks (contains and explicit) on the root domain or not. Does not apply
|
||||||
# to URL contains allowlist. Defaults to false.
|
# to URL contains allowlist. Defaults to false.
|
||||||
|
|||||||
@@ -465,7 +465,7 @@ pub(super) async fn force_join_list_of_local_users(
|
|||||||
|
|
||||||
if server_admins.is_empty() {
|
if server_admins.is_empty() {
|
||||||
return Err!("There are no admins set for this server.");
|
return Err!("There are no admins set for this server.");
|
||||||
};
|
}
|
||||||
|
|
||||||
let (room_id, servers) = self
|
let (room_id, servers) = self
|
||||||
.services
|
.services
|
||||||
@@ -580,7 +580,7 @@ pub(super) async fn force_join_all_local_users(
|
|||||||
|
|
||||||
if server_admins.is_empty() {
|
if server_admins.is_empty() {
|
||||||
return Err!("There are no admins set for this server.");
|
return Err!("There are no admins set for this server.");
|
||||||
};
|
}
|
||||||
|
|
||||||
let (room_id, servers) = self
|
let (room_id, servers) = self
|
||||||
.services
|
.services
|
||||||
|
|||||||
+57
-28
@@ -1,7 +1,15 @@
|
|||||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
use std::{
|
||||||
|
collections::{BTreeMap, HashMap, HashSet},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{Err, Error, Result, debug, debug_warn, err, result::NotFound, utils};
|
use conduwuit::{
|
||||||
|
Err, Error, Result, debug, debug_warn, err,
|
||||||
|
result::NotFound,
|
||||||
|
utils,
|
||||||
|
utils::{IterStream, stream::WidebandExt},
|
||||||
|
};
|
||||||
use conduwuit_service::{Services, users::parse_master_key};
|
use conduwuit_service::{Services, users::parse_master_key};
|
||||||
use futures::{StreamExt, stream::FuturesUnordered};
|
use futures::{StreamExt, stream::FuturesUnordered};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
@@ -134,6 +142,7 @@ pub(crate) async fn get_keys_route(
|
|||||||
&body.device_keys,
|
&body.device_keys,
|
||||||
|u| u == sender_user,
|
|u| u == sender_user,
|
||||||
true, // Always allow local users to see device names of other local users
|
true, // Always allow local users to see device names of other local users
|
||||||
|
body.timeout.unwrap_or(Duration::from_secs(10)),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -145,7 +154,12 @@ pub(crate) async fn claim_keys_route(
|
|||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
body: Ruma<claim_keys::v3::Request>,
|
body: Ruma<claim_keys::v3::Request>,
|
||||||
) -> Result<claim_keys::v3::Response> {
|
) -> Result<claim_keys::v3::Response> {
|
||||||
claim_keys_helper(&services, &body.one_time_keys).await
|
claim_keys_helper(
|
||||||
|
&services,
|
||||||
|
&body.one_time_keys,
|
||||||
|
body.timeout.unwrap_or(Duration::from_secs(10)),
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # `POST /_matrix/client/r0/keys/device_signing/upload`
|
/// # `POST /_matrix/client/r0/keys/device_signing/upload`
|
||||||
@@ -421,6 +435,7 @@ pub(crate) async fn get_keys_helper<F>(
|
|||||||
device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
|
device_keys_input: &BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
|
||||||
allowed_signatures: F,
|
allowed_signatures: F,
|
||||||
include_display_names: bool,
|
include_display_names: bool,
|
||||||
|
timeout: Duration,
|
||||||
) -> Result<get_keys::v3::Response>
|
) -> Result<get_keys::v3::Response>
|
||||||
where
|
where
|
||||||
F: Fn(&UserId) -> bool + Send + Sync,
|
F: Fn(&UserId) -> bool + Send + Sync,
|
||||||
@@ -512,9 +527,10 @@ where
|
|||||||
|
|
||||||
let mut failures = BTreeMap::new();
|
let mut failures = BTreeMap::new();
|
||||||
|
|
||||||
let mut futures: FuturesUnordered<_> = get_over_federation
|
let futures = get_over_federation
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(server, vec)| async move {
|
.stream()
|
||||||
|
.wide_filter_map(|(server, vec)| async move {
|
||||||
let mut device_keys_input_fed = BTreeMap::new();
|
let mut device_keys_input_fed = BTreeMap::new();
|
||||||
for (user_id, keys) in vec {
|
for (user_id, keys) in vec {
|
||||||
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
|
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
|
||||||
@@ -522,17 +538,22 @@ where
|
|||||||
|
|
||||||
let request =
|
let request =
|
||||||
federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed };
|
federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed };
|
||||||
|
let response = tokio::time::timeout(
|
||||||
|
timeout,
|
||||||
|
services.sending.send_federation_request(server, request),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
// Need to flatten the Result<Result<V, E>, E> into Result<V, E>
|
||||||
|
.map_err(|_| err!(Request(Unknown("Timeout when getting keys over federation."))))
|
||||||
|
.and_then(|res| res);
|
||||||
|
|
||||||
let response = services
|
Some((server, response))
|
||||||
.sending
|
|
||||||
.send_federation_request(server, request)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
(server, response)
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect::<FuturesUnordered<_>>()
|
||||||
|
.await
|
||||||
|
.into_iter();
|
||||||
|
|
||||||
while let Some((server, response)) = futures.next().await {
|
for (server, response) in futures {
|
||||||
match response {
|
match response {
|
||||||
| Ok(response) => {
|
| Ok(response) => {
|
||||||
for (user, master_key) in response.master_keys {
|
for (user, master_key) in response.master_keys {
|
||||||
@@ -564,8 +585,8 @@ where
|
|||||||
self_signing_keys.extend(response.self_signing_keys);
|
self_signing_keys.extend(response.self_signing_keys);
|
||||||
device_keys.extend(response.device_keys);
|
device_keys.extend(response.device_keys);
|
||||||
},
|
},
|
||||||
| _ => {
|
| Err(e) => {
|
||||||
failures.insert(server.to_string(), json!({}));
|
failures.insert(server.to_string(), json!({ "error": e.to_string() }));
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -608,6 +629,7 @@ fn add_unsigned_device_display_name(
|
|||||||
pub(crate) async fn claim_keys_helper(
|
pub(crate) async fn claim_keys_helper(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>,
|
one_time_keys_input: &BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, OneTimeKeyAlgorithm>>,
|
||||||
|
timeout: Duration,
|
||||||
) -> Result<claim_keys::v3::Response> {
|
) -> Result<claim_keys::v3::Response> {
|
||||||
let mut one_time_keys = BTreeMap::new();
|
let mut one_time_keys = BTreeMap::new();
|
||||||
|
|
||||||
@@ -638,32 +660,39 @@ pub(crate) async fn claim_keys_helper(
|
|||||||
|
|
||||||
let mut failures = BTreeMap::new();
|
let mut failures = BTreeMap::new();
|
||||||
|
|
||||||
let mut futures: FuturesUnordered<_> = get_over_federation
|
let futures = get_over_federation
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(server, vec)| async move {
|
.stream()
|
||||||
|
.wide_filter_map(|(server, vec)| async move {
|
||||||
let mut one_time_keys_input_fed = BTreeMap::new();
|
let mut one_time_keys_input_fed = BTreeMap::new();
|
||||||
for (user_id, keys) in vec {
|
for (user_id, keys) in vec {
|
||||||
one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
|
one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
|
||||||
}
|
}
|
||||||
(
|
let response = tokio::time::timeout(
|
||||||
server,
|
timeout,
|
||||||
services
|
services.sending.send_federation_request(
|
||||||
.sending
|
server,
|
||||||
.send_federation_request(server, federation::keys::claim_keys::v1::Request {
|
federation::keys::claim_keys::v1::Request {
|
||||||
one_time_keys: one_time_keys_input_fed,
|
one_time_keys: one_time_keys_input_fed,
|
||||||
})
|
},
|
||||||
.await,
|
),
|
||||||
)
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| err!(Request(Unknown("Timeout when claiming keys over federation."))))
|
||||||
|
.and_then(|res| res);
|
||||||
|
Some((server, response))
|
||||||
})
|
})
|
||||||
.collect();
|
.collect::<FuturesUnordered<_>>()
|
||||||
|
.await
|
||||||
|
.into_iter();
|
||||||
|
|
||||||
while let Some((server, response)) = futures.next().await {
|
for (server, response) in futures {
|
||||||
match response {
|
match response {
|
||||||
| Ok(keys) => {
|
| Ok(keys) => {
|
||||||
one_time_keys.extend(keys.one_time_keys);
|
one_time_keys.extend(keys.one_time_keys);
|
||||||
},
|
},
|
||||||
| Err(_e) => {
|
| Err(e) => {
|
||||||
failures.insert(server.to_string(), json!({}));
|
failures.insert(server.to_string(), json!({"error": e.to_string()}));
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +1,11 @@
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use axum_client_ip::InsecureClientIp;
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Result, debug_warn, trace,
|
Err, Result, debug, debug_warn, info, trace,
|
||||||
utils::{IterStream, future::TryExtExt},
|
utils::{IterStream, future::TryExtExt},
|
||||||
};
|
};
|
||||||
use futures::{
|
use futures::{
|
||||||
FutureExt, StreamExt,
|
FutureExt, StreamExt, TryFutureExt,
|
||||||
future::{OptionFuture, join3},
|
future::{OptionFuture, join3},
|
||||||
stream::FuturesUnordered,
|
stream::FuturesUnordered,
|
||||||
};
|
};
|
||||||
@@ -79,9 +79,15 @@ async fn room_summary_response(
|
|||||||
.server_in_room(services.globals.server_name(), room_id)
|
.server_in_room(services.globals.server_name(), room_id)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
return local_room_summary_response(services, room_id, sender_user)
|
match local_room_summary_response(services, room_id, sender_user)
|
||||||
.boxed()
|
.boxed()
|
||||||
.await;
|
.await
|
||||||
|
{
|
||||||
|
| Ok(response) => return Ok(response),
|
||||||
|
| Err(e) => {
|
||||||
|
debug_warn!("Failed to get local room summary: {e:?}, falling back to remote");
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let room =
|
let room =
|
||||||
@@ -111,26 +117,27 @@ async fn local_room_summary_response(
|
|||||||
sender_user: Option<&UserId>,
|
sender_user: Option<&UserId>,
|
||||||
) -> Result<get_summary::msc3266::Response> {
|
) -> Result<get_summary::msc3266::Response> {
|
||||||
trace!(?sender_user, "Sending local room summary response for {room_id:?}");
|
trace!(?sender_user, "Sending local room summary response for {room_id:?}");
|
||||||
let join_rule = services.rooms.state_accessor.get_join_rules(room_id);
|
let (join_rule, world_readable, guest_can_join) = join3(
|
||||||
|
services.rooms.state_accessor.get_join_rules(room_id),
|
||||||
let world_readable = services.rooms.state_accessor.is_world_readable(room_id);
|
services.rooms.state_accessor.is_world_readable(room_id),
|
||||||
|
services.rooms.state_accessor.guest_can_join(room_id),
|
||||||
let guest_can_join = services.rooms.state_accessor.guest_can_join(room_id);
|
|
||||||
|
|
||||||
let (join_rule, world_readable, guest_can_join) =
|
|
||||||
join3(join_rule, world_readable, guest_can_join).await;
|
|
||||||
|
|
||||||
trace!("{join_rule:?}, {world_readable:?}, {guest_can_join:?}");
|
|
||||||
user_can_see_summary(
|
|
||||||
services,
|
|
||||||
room_id,
|
|
||||||
&join_rule.clone().into(),
|
|
||||||
guest_can_join,
|
|
||||||
world_readable,
|
|
||||||
join_rule.allowed_rooms(),
|
|
||||||
sender_user,
|
|
||||||
)
|
)
|
||||||
.await?;
|
.await;
|
||||||
|
|
||||||
|
// Synapse allows server admins to bypass visibility checks.
|
||||||
|
// That seems neat so we'll copy that behaviour.
|
||||||
|
if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await {
|
||||||
|
user_can_see_summary(
|
||||||
|
services,
|
||||||
|
room_id,
|
||||||
|
&join_rule.clone().into(),
|
||||||
|
guest_can_join,
|
||||||
|
world_readable,
|
||||||
|
join_rule.allowed_rooms(),
|
||||||
|
sender_user,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
let canonical_alias = services
|
let canonical_alias = services
|
||||||
.rooms
|
.rooms
|
||||||
@@ -231,15 +238,27 @@ async fn remote_room_summary_hierarchy_response(
|
|||||||
"Federaton of room {room_id} is currently disabled on this server."
|
"Federaton of room {room_id} is currently disabled on this server."
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
if servers.is_empty() {
|
||||||
|
return Err!(Request(MissingParam(
|
||||||
|
"No servers were provided to fetch the room over federation"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
let request = get_hierarchy::v1::Request::new(room_id.to_owned());
|
let request = get_hierarchy::v1::Request::new(room_id.to_owned());
|
||||||
|
|
||||||
let mut requests: FuturesUnordered<_> = servers
|
let mut requests: FuturesUnordered<_> = servers
|
||||||
.iter()
|
.iter()
|
||||||
.map(|server| {
|
.map(|server| {
|
||||||
|
info!("Fetching room summary for {room_id} from server {server}");
|
||||||
services
|
services
|
||||||
.sending
|
.sending
|
||||||
.send_federation_request(server, request.clone())
|
.send_federation_request(server, request.clone())
|
||||||
|
.inspect_ok(move |v| {
|
||||||
|
debug!("Fetched room summary for {room_id} from server {server}: {v:?}");
|
||||||
|
})
|
||||||
|
.inspect_err(move |e| {
|
||||||
|
info!("Failed to fetch room summary for {room_id} from server {server}: {e}");
|
||||||
|
})
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
@@ -255,23 +274,23 @@ async fn remote_room_summary_hierarchy_response(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
return user_can_see_summary(
|
if sender_user.is_none() || !services.users.is_admin(sender_user.unwrap()).await {
|
||||||
services,
|
return user_can_see_summary(
|
||||||
room_id,
|
services,
|
||||||
&room.join_rule,
|
room_id,
|
||||||
room.guest_can_join,
|
&room.join_rule,
|
||||||
room.world_readable,
|
room.guest_can_join,
|
||||||
room.allowed_room_ids.iter().map(AsRef::as_ref),
|
room.world_readable,
|
||||||
sender_user,
|
room.allowed_room_ids.iter().map(AsRef::as_ref),
|
||||||
)
|
sender_user,
|
||||||
.await
|
)
|
||||||
.map(|()| room);
|
.await
|
||||||
|
.map(|()| room);
|
||||||
|
}
|
||||||
|
return Ok(room);
|
||||||
}
|
}
|
||||||
|
|
||||||
Err!(Request(NotFound(
|
Err!(Request(NotFound("Room not found or is not accessible")))
|
||||||
"Room is unknown to this server and was unable to fetch over federation with the \
|
|
||||||
provided servers available"
|
|
||||||
)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn user_can_see_summary<'a, I>(
|
async fn user_can_see_summary<'a, I>(
|
||||||
@@ -311,21 +330,14 @@ where
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
Err!(Request(Forbidden(
|
Err!(Request(Forbidden("Room is not accessible")))
|
||||||
"Room is not world readable, not publicly accessible/joinable, restricted room \
|
|
||||||
conditions not met, and guest access is forbidden. Not allowed to see details \
|
|
||||||
of this room."
|
|
||||||
)))
|
|
||||||
},
|
},
|
||||||
| None => {
|
| None => {
|
||||||
if is_public_room || world_readable {
|
if is_public_room || world_readable {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
Err!(Request(Forbidden(
|
Err!(Request(Forbidden("Room is not accessible")))
|
||||||
"Room is not world readable or publicly accessible/joinable, authentication is \
|
|
||||||
required"
|
|
||||||
)))
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ pub(crate) async fn well_known_support(
|
|||||||
if contacts.is_empty() {
|
if contacts.is_empty() {
|
||||||
let admin_users = services.admin.get_admins().await;
|
let admin_users = services.admin.get_admins().await;
|
||||||
|
|
||||||
for user_id in admin_users.iter() {
|
for user_id in &admin_users {
|
||||||
if *user_id == services.globals.server_user {
|
if *user_id == services.globals.server_user {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{Error, Result};
|
use conduwuit::{Error, Result};
|
||||||
use futures::{FutureExt, StreamExt, TryFutureExt};
|
use futures::{FutureExt, StreamExt, TryFutureExt};
|
||||||
@@ -96,6 +98,7 @@ pub(crate) async fn get_keys_route(
|
|||||||
&body.device_keys,
|
&body.device_keys,
|
||||||
|u| Some(u.server_name()) == body.origin.as_deref(),
|
|u| Some(u.server_name()) == body.origin.as_deref(),
|
||||||
services.globals.allow_device_name_federation(),
|
services.globals.allow_device_name_federation(),
|
||||||
|
Duration::from_secs(0),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -124,7 +127,8 @@ pub(crate) async fn claim_keys_route(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = claim_keys_helper(&services, &body.one_time_keys).await?;
|
let result =
|
||||||
|
claim_keys_helper(&services, &body.one_time_keys, Duration::from_secs(0)).await?;
|
||||||
|
|
||||||
Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys })
|
Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys })
|
||||||
}
|
}
|
||||||
|
|||||||
+26
-1
@@ -500,7 +500,19 @@ pub struct Config {
|
|||||||
#[serde(default = "default_appservice_idle_timeout")]
|
#[serde(default = "default_appservice_idle_timeout")]
|
||||||
pub appservice_idle_timeout: u64,
|
pub appservice_idle_timeout: u64,
|
||||||
|
|
||||||
/// Notification gateway pusher idle connection pool timeout.
|
/// Notification gateway pusher request connection timeout (seconds).
|
||||||
|
///
|
||||||
|
/// default: 15
|
||||||
|
#[serde(default = "default_pusher_conn_timeout")]
|
||||||
|
pub pusher_conn_timeout: u64,
|
||||||
|
|
||||||
|
/// Notification gateway pusher total request timeout (seconds).
|
||||||
|
///
|
||||||
|
/// default: 60
|
||||||
|
#[serde(default = "default_pusher_timeout")]
|
||||||
|
pub pusher_timeout: u64,
|
||||||
|
|
||||||
|
/// Notification gateway pusher idle connection pool timeout (seconds).
|
||||||
///
|
///
|
||||||
/// default: 15
|
/// default: 15
|
||||||
#[serde(default = "default_pusher_idle_timeout")]
|
#[serde(default = "default_pusher_idle_timeout")]
|
||||||
@@ -1670,6 +1682,13 @@ pub struct Config {
|
|||||||
#[serde(default = "default_url_preview_max_spider_size")]
|
#[serde(default = "default_url_preview_max_spider_size")]
|
||||||
pub url_preview_max_spider_size: usize,
|
pub url_preview_max_spider_size: usize,
|
||||||
|
|
||||||
|
/// Total request timeout for URL previews (seconds). This includes
|
||||||
|
/// connection, request, and response body reading time.
|
||||||
|
///
|
||||||
|
/// default: 120
|
||||||
|
#[serde(default = "default_url_preview_timeout")]
|
||||||
|
pub url_preview_timeout: u64,
|
||||||
|
|
||||||
/// Option to decide whether you would like to run the domain allowlist
|
/// Option to decide whether you would like to run the domain allowlist
|
||||||
/// checks (contains and explicit) on the root domain or not. Does not apply
|
/// checks (contains and explicit) on the root domain or not. Does not apply
|
||||||
/// to URL contains allowlist. Defaults to false.
|
/// to URL contains allowlist. Defaults to false.
|
||||||
@@ -2424,6 +2443,10 @@ fn default_appservice_timeout() -> u64 { 35 }
|
|||||||
|
|
||||||
fn default_appservice_idle_timeout() -> u64 { 300 }
|
fn default_appservice_idle_timeout() -> u64 { 300 }
|
||||||
|
|
||||||
|
fn default_pusher_conn_timeout() -> u64 { 15 }
|
||||||
|
|
||||||
|
fn default_pusher_timeout() -> u64 { 60 }
|
||||||
|
|
||||||
fn default_pusher_idle_timeout() -> u64 { 15 }
|
fn default_pusher_idle_timeout() -> u64 { 15 }
|
||||||
|
|
||||||
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
|
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
|
||||||
@@ -2551,6 +2574,8 @@ fn default_url_preview_max_spider_size() -> usize {
|
|||||||
256_000 // 256KB
|
256_000 // 256KB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_url_preview_timeout() -> u64 { 120 }
|
||||||
|
|
||||||
fn default_new_user_displayname_suffix() -> String { "🏳️⚧️".to_owned() }
|
fn default_new_user_displayname_suffix() -> String { "🏳️⚧️".to_owned() }
|
||||||
|
|
||||||
fn default_sentry_endpoint() -> Option<Url> { None }
|
fn default_sentry_endpoint() -> Option<Url> { None }
|
||||||
|
|||||||
@@ -188,7 +188,7 @@ pub async fn revoke_admin(&self, user_id: &UserId) -> Result {
|
|||||||
warn!(
|
warn!(
|
||||||
"Revoking the admin status of {user_id} will not work correctly as they are within \
|
"Revoking the admin status of {user_id} will not work correctly as they are within \
|
||||||
the admins_list config."
|
the admins_list config."
|
||||||
)
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let Ok(room_id) = self.get_admin_room().await else {
|
let Ok(room_id) = self.get_admin_room().await else {
|
||||||
|
|||||||
@@ -406,16 +406,23 @@ impl Service {
|
|||||||
|
|
||||||
/// Checks whether a given user is an admin of this server
|
/// Checks whether a given user is an admin of this server
|
||||||
pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
|
pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
|
||||||
if self.services.server.config.admins_list.contains(user_id) {
|
if self
|
||||||
|
.services
|
||||||
|
.server
|
||||||
|
.config
|
||||||
|
.admins_list
|
||||||
|
.contains(&user_id.to_owned())
|
||||||
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.services.server.config.admins_from_room {
|
if self.services.server.config.admins_from_room {
|
||||||
if let Ok(admin_room) = self.get_admin_room().await {
|
if let Ok(admin_room) = self.get_admin_room().await {
|
||||||
self.services
|
return self
|
||||||
|
.services
|
||||||
.state_cache
|
.state_cache
|
||||||
.is_joined(user_id, &admin_room)
|
.is_joined(user_id, &admin_room)
|
||||||
.await
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ impl crate::Service for Service {
|
|||||||
})?
|
})?
|
||||||
.local_address(url_preview_bind_addr)
|
.local_address(url_preview_bind_addr)
|
||||||
.dns_resolver(resolver.resolver.clone())
|
.dns_resolver(resolver.resolver.clone())
|
||||||
|
.timeout(Duration::from_secs(config.url_preview_timeout))
|
||||||
.redirect(redirect::Policy::limited(3))
|
.redirect(redirect::Policy::limited(3))
|
||||||
.build()?,
|
.build()?,
|
||||||
|
|
||||||
@@ -68,6 +69,11 @@ impl crate::Service for Service {
|
|||||||
.dns_resolver(resolver.resolver.hooked.clone())
|
.dns_resolver(resolver.resolver.hooked.clone())
|
||||||
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
||||||
.read_timeout(Duration::from_secs(config.federation_timeout))
|
.read_timeout(Duration::from_secs(config.federation_timeout))
|
||||||
|
.timeout(Duration::from_secs(
|
||||||
|
config
|
||||||
|
.federation_timeout
|
||||||
|
.saturating_add(config.federation_conn_timeout),
|
||||||
|
))
|
||||||
.pool_max_idle_per_host(config.federation_idle_per_host.into())
|
.pool_max_idle_per_host(config.federation_idle_per_host.into())
|
||||||
.pool_idle_timeout(Duration::from_secs(config.federation_idle_timeout))
|
.pool_idle_timeout(Duration::from_secs(config.federation_idle_timeout))
|
||||||
.redirect(redirect::Policy::limited(3))
|
.redirect(redirect::Policy::limited(3))
|
||||||
@@ -77,6 +83,7 @@ impl crate::Service for Service {
|
|||||||
.dns_resolver(resolver.resolver.hooked.clone())
|
.dns_resolver(resolver.resolver.hooked.clone())
|
||||||
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
.connect_timeout(Duration::from_secs(config.federation_conn_timeout))
|
||||||
.read_timeout(Duration::from_secs(305))
|
.read_timeout(Duration::from_secs(305))
|
||||||
|
.timeout(Duration::from_secs(120))
|
||||||
.pool_max_idle_per_host(0)
|
.pool_max_idle_per_host(0)
|
||||||
.redirect(redirect::Policy::limited(3))
|
.redirect(redirect::Policy::limited(3))
|
||||||
.build()?,
|
.build()?,
|
||||||
@@ -103,6 +110,8 @@ impl crate::Service for Service {
|
|||||||
|
|
||||||
pusher: base(config)?
|
pusher: base(config)?
|
||||||
.dns_resolver(resolver.resolver.clone())
|
.dns_resolver(resolver.resolver.clone())
|
||||||
|
.connect_timeout(Duration::from_secs(config.pusher_conn_timeout))
|
||||||
|
.timeout(Duration::from_secs(config.pusher_timeout))
|
||||||
.pool_max_idle_per_host(1)
|
.pool_max_idle_per_host(1)
|
||||||
.pool_idle_timeout(Duration::from_secs(config.pusher_idle_timeout))
|
.pool_idle_timeout(Duration::from_secs(config.pusher_idle_timeout))
|
||||||
.redirect(redirect::Policy::limited(2))
|
.redirect(redirect::Policy::limited(2))
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use std::{fmt::Debug, mem};
|
|||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
|
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
|
||||||
error::inspect_debug_log, implement, trace, utils::string::EMPTY,
|
error::inspect_debug_log, implement, trace,
|
||||||
};
|
};
|
||||||
use http::{HeaderValue, header::AUTHORIZATION};
|
use http::{HeaderValue, header::AUTHORIZATION};
|
||||||
use ipaddress::IPAddress;
|
use ipaddress::IPAddress;
|
||||||
@@ -90,7 +90,9 @@ where
|
|||||||
|
|
||||||
debug!(?method, ?url, "Sending request");
|
debug!(?method, ?url, "Sending request");
|
||||||
match client.execute(request).await {
|
match client.execute(request).await {
|
||||||
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
|
| Ok(response) =>
|
||||||
|
self.handle_response::<T>(dest, actual, &method, &url, response)
|
||||||
|
.await,
|
||||||
| Err(error) =>
|
| Err(error) =>
|
||||||
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
|
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
|
||||||
}
|
}
|
||||||
@@ -119,7 +121,9 @@ fn validate_url(&self, url: &Url) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[implement(super::Service)]
|
||||||
async fn handle_response<T>(
|
async fn handle_response<T>(
|
||||||
|
&self,
|
||||||
dest: &ServerName,
|
dest: &ServerName,
|
||||||
actual: &ActualDest,
|
actual: &ActualDest,
|
||||||
method: &Method,
|
method: &Method,
|
||||||
@@ -162,7 +166,6 @@ async fn into_http_response(
|
|||||||
.expect("http::response::Builder is usable"),
|
.expect("http::response::Builder is usable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
// TODO: handle timeout
|
|
||||||
trace!("Waiting for response body...");
|
trace!("Waiting for response body...");
|
||||||
let body = response
|
let body = response
|
||||||
.bytes()
|
.bytes()
|
||||||
@@ -286,10 +289,13 @@ where
|
|||||||
T: OutgoingRequest + Send,
|
T: OutgoingRequest + Send,
|
||||||
{
|
{
|
||||||
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_11];
|
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_11];
|
||||||
const SATIR: SendAccessToken<'_> = SendAccessToken::IfRequired(EMPTY);
|
|
||||||
|
|
||||||
let http_request = request
|
let http_request = request
|
||||||
.try_into_http_request::<Vec<u8>>(actual.string().as_str(), SATIR, &VERSIONS)
|
.try_into_http_request::<Vec<u8>>(
|
||||||
|
actual.string().as_str(),
|
||||||
|
SendAccessToken::None,
|
||||||
|
&VERSIONS,
|
||||||
|
)
|
||||||
.map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))?;
|
.map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))?;
|
||||||
|
|
||||||
Ok(http_request)
|
Ok(http_request)
|
||||||
|
|||||||
+18
-18
@@ -198,7 +198,7 @@ impl Service {
|
|||||||
trace!("Push gateway destination: {dest}");
|
trace!("Push gateway destination: {dest}");
|
||||||
|
|
||||||
let http_request = request
|
let http_request = request
|
||||||
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::IfRequired(""), &VERSIONS)
|
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::None, &VERSIONS)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
err!(BadServerResponse(warn!(
|
err!(BadServerResponse(warn!(
|
||||||
"Failed to find destination {dest} for push gateway: {e}"
|
"Failed to find destination {dest} for push gateway: {e}"
|
||||||
@@ -245,7 +245,7 @@ impl Service {
|
|||||||
.expect("http::response::Builder is usable"),
|
.expect("http::response::Builder is usable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let body = response.bytes().await?; // TODO: handle timeout
|
let body = response.bytes().await?;
|
||||||
|
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
|
debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
|
||||||
@@ -288,7 +288,7 @@ impl Service {
|
|||||||
let mut notify = None;
|
let mut notify = None;
|
||||||
let mut tweaks = Vec::new();
|
let mut tweaks = Vec::new();
|
||||||
if event.room_id().is_none() {
|
if event.room_id().is_none() {
|
||||||
// TODO(hydra): does this matter?
|
// This only affects v12+ create events
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -427,20 +427,20 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let d = vec![device];
|
let d = vec![device];
|
||||||
let mut notifi = Notification::new(d);
|
let mut notify = Notification::new(d);
|
||||||
|
|
||||||
notifi.event_id = Some(event.event_id().to_owned());
|
notify.event_id = Some(event.event_id().to_owned());
|
||||||
notifi.room_id = Some(event.room_id().unwrap().to_owned());
|
notify.room_id = Some(event.room_id().unwrap().to_owned());
|
||||||
if http
|
if http
|
||||||
.data
|
.data
|
||||||
.get("org.matrix.msc4076.disable_badge_count")
|
.get("org.matrix.msc4076.disable_badge_count")
|
||||||
.is_none() && http.data.get("disable_badge_count").is_none()
|
.is_none() && http.data.get("disable_badge_count").is_none()
|
||||||
{
|
{
|
||||||
notifi.counts = NotificationCounts::new(unread, uint!(0));
|
notify.counts = NotificationCounts::new(unread, uint!(0));
|
||||||
} else {
|
} else {
|
||||||
// counts will not be serialised if it's the default (0, 0)
|
// counts will not be serialised if it's the default (0, 0)
|
||||||
// skip_serializing_if = "NotificationCounts::is_default"
|
// skip_serializing_if = "NotificationCounts::is_default"
|
||||||
notifi.counts = NotificationCounts::default();
|
notify.counts = NotificationCounts::default();
|
||||||
}
|
}
|
||||||
|
|
||||||
if !event_id_only {
|
if !event_id_only {
|
||||||
@@ -449,30 +449,30 @@ impl Service {
|
|||||||
.iter()
|
.iter()
|
||||||
.any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
|
.any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
|
||||||
{
|
{
|
||||||
notifi.prio = NotificationPriority::High;
|
notify.prio = NotificationPriority::High;
|
||||||
} else {
|
} else {
|
||||||
notifi.prio = NotificationPriority::Low;
|
notify.prio = NotificationPriority::Low;
|
||||||
}
|
}
|
||||||
notifi.sender = Some(event.sender().to_owned());
|
notify.sender = Some(event.sender().to_owned());
|
||||||
notifi.event_type = Some(event.kind().to_owned());
|
notify.event_type = Some(event.kind().to_owned());
|
||||||
notifi.content = serde_json::value::to_raw_value(event.content()).ok();
|
notify.content = serde_json::value::to_raw_value(event.content()).ok();
|
||||||
|
|
||||||
if *event.kind() == TimelineEventType::RoomMember {
|
if *event.kind() == TimelineEventType::RoomMember {
|
||||||
notifi.user_is_target =
|
notify.user_is_target =
|
||||||
event.state_key() == Some(event.sender().as_str());
|
event.state_key() == Some(event.sender().as_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
notifi.sender_display_name =
|
notify.sender_display_name =
|
||||||
self.services.users.displayname(event.sender()).await.ok();
|
self.services.users.displayname(event.sender()).await.ok();
|
||||||
|
|
||||||
notifi.room_name = self
|
notify.room_name = self
|
||||||
.services
|
.services
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.get_name(event.room_id().unwrap())
|
.get_name(event.room_id().unwrap())
|
||||||
.await
|
.await
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
notifi.room_alias = self
|
notify.room_alias = self
|
||||||
.services
|
.services
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.get_canonical_alias(event.room_id().unwrap())
|
.get_canonical_alias(event.room_id().unwrap())
|
||||||
@@ -480,7 +480,7 @@ impl Service {
|
|||||||
.ok();
|
.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
self.send_request(&http.url, send_event_notification::v1::Request::new(notifi))
|
self.send_request(&http.url, send_event_notification::v1::Request::new(notify))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -1,8 +1,7 @@
|
|||||||
use std::{fmt::Debug, mem};
|
use std::{fmt::Debug, mem};
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use conduwuit::{Err, Result, debug_error, err, trace, utils, warn};
|
use conduwuit::{Err, Result, debug_error, err, implement, trace, utils, warn};
|
||||||
use reqwest::Client;
|
|
||||||
use ruma::api::{
|
use ruma::api::{
|
||||||
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
|
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
|
||||||
};
|
};
|
||||||
@@ -11,8 +10,9 @@ use ruma::api::{
|
|||||||
///
|
///
|
||||||
/// Only returns Ok(None) if there is no url specified in the appservice
|
/// Only returns Ok(None) if there is no url specified in the appservice
|
||||||
/// registration file
|
/// registration file
|
||||||
pub(crate) async fn send_request<T>(
|
#[implement(super::Service)]
|
||||||
client: &Client,
|
pub async fn send_appservice_request<T>(
|
||||||
|
&self,
|
||||||
registration: Registration,
|
registration: Registration,
|
||||||
request: T,
|
request: T,
|
||||||
) -> Result<Option<T::IncomingResponse>>
|
) -> Result<Option<T::IncomingResponse>>
|
||||||
@@ -32,10 +32,10 @@ where
|
|||||||
trace!("Appservice URL \"{dest}\", Appservice ID: {}", registration.id);
|
trace!("Appservice URL \"{dest}\", Appservice ID: {}", registration.id);
|
||||||
|
|
||||||
let hs_token = registration.hs_token.as_str();
|
let hs_token = registration.hs_token.as_str();
|
||||||
let mut http_request = request
|
let http_request = request
|
||||||
.try_into_http_request::<BytesMut>(
|
.try_into_http_request::<BytesMut>(
|
||||||
&dest,
|
&dest,
|
||||||
SendAccessToken::IfRequired(hs_token),
|
SendAccessToken::Appservice(hs_token),
|
||||||
&VERSIONS,
|
&VERSIONS,
|
||||||
)
|
)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
@@ -45,19 +45,10 @@ where
|
|||||||
})?
|
})?
|
||||||
.map(BytesMut::freeze);
|
.map(BytesMut::freeze);
|
||||||
|
|
||||||
let mut parts = http_request.uri().clone().into_parts();
|
|
||||||
let old_path_and_query = parts.path_and_query.unwrap().as_str().to_owned();
|
|
||||||
let symbol = if old_path_and_query.contains('?') { "&" } else { "?" };
|
|
||||||
|
|
||||||
parts.path_and_query = Some(
|
|
||||||
(old_path_and_query + symbol + "access_token=" + hs_token)
|
|
||||||
.parse()
|
|
||||||
.unwrap(),
|
|
||||||
);
|
|
||||||
*http_request.uri_mut() = parts.try_into().expect("our manipulation is always valid");
|
|
||||||
|
|
||||||
let reqwest_request = reqwest::Request::try_from(http_request)?;
|
let reqwest_request = reqwest::Request::try_from(http_request)?;
|
||||||
|
|
||||||
|
let client = &self.services.client.appservice;
|
||||||
|
|
||||||
let mut response = client.execute(reqwest_request).await.map_err(|e| {
|
let mut response = client.execute(reqwest_request).await.map_err(|e| {
|
||||||
warn!("Could not send request to appservice \"{}\" at {dest}: {e:?}", registration.id);
|
warn!("Could not send request to appservice \"{}\" at {dest}: {e:?}", registration.id);
|
||||||
e
|
e
|
||||||
@@ -75,7 +66,7 @@ where
|
|||||||
.expect("http::response::Builder is usable"),
|
.expect("http::response::Builder is usable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let body = response.bytes().await?; // TODO: handle timeout
|
let body = response.bytes().await?;
|
||||||
|
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
|
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
|
||||||
|
|||||||
@@ -18,10 +18,7 @@ use conduwuit::{
|
|||||||
warn,
|
warn,
|
||||||
};
|
};
|
||||||
use futures::{FutureExt, Stream, StreamExt};
|
use futures::{FutureExt, Stream, StreamExt};
|
||||||
use ruma::{
|
use ruma::{RoomId, ServerName, UserId, api::OutgoingRequest};
|
||||||
RoomId, ServerName, UserId,
|
|
||||||
api::{OutgoingRequest, appservice::Registration},
|
|
||||||
};
|
|
||||||
use tokio::{task, task::JoinSet};
|
use tokio::{task, task::JoinSet};
|
||||||
|
|
||||||
use self::data::Data;
|
use self::data::Data;
|
||||||
@@ -318,22 +315,6 @@ impl Service {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a request to an appservice
|
|
||||||
///
|
|
||||||
/// Only returns None if there is no url specified in the appservice
|
|
||||||
/// registration file
|
|
||||||
pub async fn send_appservice_request<T>(
|
|
||||||
&self,
|
|
||||||
registration: Registration,
|
|
||||||
request: T,
|
|
||||||
) -> Result<Option<T::IncomingResponse>>
|
|
||||||
where
|
|
||||||
T: OutgoingRequest + Debug + Send,
|
|
||||||
{
|
|
||||||
let client = &self.services.client.appservice;
|
|
||||||
appservice::send_request(client, registration, request).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Clean up queued sending event data
|
/// Clean up queued sending event data
|
||||||
///
|
///
|
||||||
/// Used after we remove an appservice registration or a user deletes a push
|
/// Used after we remove an appservice registration or a user deletes a push
|
||||||
|
|||||||
@@ -50,9 +50,7 @@ use ruma::{
|
|||||||
};
|
};
|
||||||
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
|
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
|
||||||
|
|
||||||
use super::{
|
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
|
||||||
Destination, EduBuf, EduVec, Msg, SendingEvent, Service, appservice, data::QueueItem,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum TransactionStatus {
|
enum TransactionStatus {
|
||||||
@@ -720,18 +718,18 @@ impl Service {
|
|||||||
|
|
||||||
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
||||||
// transaction");
|
// transaction");
|
||||||
let client = &self.services.client.appservice;
|
|
||||||
match appservice::send_request(
|
match self
|
||||||
client,
|
.send_appservice_request(
|
||||||
appservice,
|
appservice,
|
||||||
ruma::api::appservice::event::push_events::v1::Request {
|
ruma::api::appservice::event::push_events::v1::Request {
|
||||||
events: pdu_jsons,
|
events: pdu_jsons,
|
||||||
txn_id: txn_id.into(),
|
txn_id: txn_id.into(),
|
||||||
ephemeral: edu_jsons,
|
ephemeral: edu_jsons,
|
||||||
to_device: Vec::new(), // TODO
|
to_device: Vec::new(), // TODO
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
| Ok(_) => Ok(Destination::Appservice(id)),
|
| Ok(_) => Ok(Destination::Appservice(id)),
|
||||||
| Err(e) => Err((Destination::Appservice(id), e)),
|
| Err(e) => Err((Destination::Appservice(id), e)),
|
||||||
|
|||||||
Reference in New Issue
Block a user