2022-10-08 13:02:52 +02:00
mod data ;
2020-12-19 16:00:11 +01:00
use std ::{
2024-03-05 19:48:54 -05:00
collections ::{ BTreeMap , HashMap , HashSet },
fmt ::Debug ,
sync ::Arc ,
time ::{ Duration , Instant },
2020-12-19 16:00:11 +01:00
};
2020-09-15 16:13:54 +02:00
2024-03-05 19:48:54 -05:00
use base64 ::{ engine ::general_purpose , Engine as _ };
pub use data ::Data ;
2020-09-23 15:23:29 +02:00
use federation ::transactions ::send_transaction_message ;
2022-01-20 11:51:31 +01:00
use futures_util ::{ stream ::FuturesUnordered , StreamExt };
2024-03-05 19:48:54 -05:00
use ipaddress ::IPAddress ;
2020-12-08 10:33:44 +01:00
use ruma ::{
2024-03-05 19:48:54 -05:00
api ::{
appservice ::{ self , Registration },
federation ::{
self ,
transactions ::edu ::{
DeviceListUpdateContent , Edu , PresenceContent , PresenceUpdate , ReceiptContent , ReceiptData , ReceiptMap ,
},
},
OutgoingRequest ,
},
device_id ,
events ::{ push_rules ::PushRulesEvent , receipt ::ReceiptType , AnySyncEphemeralRoomEvent , GlobalAccountDataEventType },
push , uint , MilliSecondsSinceUnixEpoch , OwnedServerName , OwnedUserId , ServerName , UInt , UserId ,
2020-12-08 10:33:44 +01:00
};
2021-07-14 07:07:08 +00:00
use tokio ::{
2024-03-05 19:48:54 -05:00
select ,
sync ::{ mpsc , Mutex , Semaphore },
2021-07-14 07:07:08 +00:00
};
2024-01-21 22:59:06 -05:00
use tracing ::{ debug , error , info , warn };
2021-01-26 21:54:35 -05:00
2024-03-05 19:48:54 -05:00
use crate ::{
api ::{ appservice_server , server_server },
services ,
utils ::calculate_hash ,
Config , Error , PduEvent , Result ,
};
2021-01-26 21:54:35 -05:00
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OutgoingKind {
2024-03-05 19:48:54 -05:00
Appservice ( String ),
Push ( OwnedUserId , String ), // user and pushkey
Normal ( OwnedServerName ),
2021-01-26 21:54:35 -05:00
}
2021-05-12 20:04:28 +02:00
impl OutgoingKind {
2024-03-05 19:48:54 -05:00
#[tracing::instrument(skip(self))]
pub fn get_prefix ( & self ) -> Vec < u8 > {
let mut prefix = match self {
OutgoingKind ::Appservice ( server ) => {
let mut p = b "+" . to_vec ();
p . extend_from_slice ( server . as_bytes ());
p
},
OutgoingKind ::Push ( user , pushkey ) => {
let mut p = b "$" . to_vec ();
p . extend_from_slice ( user . as_bytes ());
p . push ( 0xFF );
p . extend_from_slice ( pushkey . as_bytes ());
p
},
OutgoingKind ::Normal ( server ) => {
let mut p = Vec ::new ();
p . extend_from_slice ( server . as_bytes ());
p
},
};
prefix . push ( 0xFF );
prefix
}
2021-05-12 20:04:28 +02:00
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
2024-03-02 20:55:02 -05:00
#[allow(clippy::module_name_repetitions)]
2021-05-12 20:04:28 +02:00
pub enum SendingEventType {
2024-03-05 19:48:54 -05:00
Pdu ( Vec < u8 > ), // pduid
Edu ( Vec < u8 > ), // pdu json
2021-05-12 20:04:28 +02:00
}
2022-09-07 13:25:51 +02:00
pub struct Service {
2024-03-05 19:48:54 -05:00
db : & 'static dyn Data ,
2022-10-08 13:02:52 +02:00
2024-03-05 19:48:54 -05:00
/// The state for a given state hash.
pub ( super ) maximum_requests : Arc < Semaphore > ,
pub sender : mpsc ::UnboundedSender < ( OutgoingKind , SendingEventType , Vec < u8 > ) > ,
receiver : Mutex < mpsc ::UnboundedReceiver < ( OutgoingKind , SendingEventType , Vec < u8 > ) >> ,
2020-09-15 16:13:54 +02:00
}
2021-04-24 18:01:05 +02:00
enum TransactionStatus {
2024-03-05 19:48:54 -05:00
Running ,
Failed ( u32 , Instant ), // number of times failed, time of last failure
Retrying ( u32 ), // number of times failed
2021-04-24 18:01:05 +02:00
}
2022-09-07 13:25:51 +02:00
impl Service {
2024-03-05 19:48:54 -05:00
pub fn build ( db : & 'static dyn Data , config : & Config ) -> Arc < Self > {
let ( sender , receiver ) = mpsc ::unbounded_channel ();
Arc ::new ( Self {
db ,
sender ,
receiver : Mutex ::new ( receiver ),
maximum_requests : Arc ::new ( Semaphore ::new ( config . max_concurrent_requests as usize )),
})
}
pub fn start_handler ( self : & Arc < Self > ) {
let self2 = Arc ::clone ( self );
tokio ::spawn ( async move {
self2 . handler (). await . unwrap ();
});
}
async fn handler ( & self ) -> Result < () > {
let mut receiver = self . receiver . lock (). await ;
let mut futures = FuturesUnordered ::new ();
let mut current_transaction_status = HashMap ::< OutgoingKind , TransactionStatus > ::new ();
// Retry requests we could not finish yet
let mut initial_transactions = HashMap ::< OutgoingKind , Vec < SendingEventType >> ::new ();
for ( key , outgoing_kind , event ) in self . db . active_requests (). filter_map ( std ::result ::Result ::ok ) {
let entry = initial_transactions . entry ( outgoing_kind . clone ()). or_default ();
if entry . len () > 30 {
warn! ( "Dropping some current events: {:?} {:?} {:?}" , key , outgoing_kind , event );
self . db . delete_active_request ( key ) ? ;
continue ;
}
entry . push ( event );
}
for ( outgoing_kind , events ) in initial_transactions {
current_transaction_status . insert ( outgoing_kind . clone (), TransactionStatus ::Running );
futures . push ( Self ::handle_events ( outgoing_kind . clone (), events ));
}
loop {
select! {
Some ( response ) = futures . next () => {
match response {
Ok ( outgoing_kind ) => {
self . db . delete_all_active_requests_for ( & outgoing_kind ) ? ;
// Find events that have been added since starting the last request
let new_events = self . db . queued_requests ( & outgoing_kind ). filter_map ( std ::result ::Result ::ok ). take ( 30 ). collect ::< Vec < _ >> ();
if ! new_events . is_empty () {
// Insert pdus we found
self . db . mark_as_active ( & new_events ) ? ;
futures . push (
Self ::handle_events (
outgoing_kind . clone (),
new_events . into_iter (). map ( | ( event , _ ) | event ). collect (),
)
);
} else {
current_transaction_status . remove ( & outgoing_kind );
}
}
Err (( outgoing_kind , _ )) => {
current_transaction_status . entry ( outgoing_kind ). and_modify ( | e | * e = match e {
TransactionStatus ::Running => TransactionStatus ::Failed ( 1 , Instant ::now ()),
TransactionStatus ::Retrying ( n ) => TransactionStatus ::Failed ( * n + 1 , Instant ::now ()),
TransactionStatus ::Failed ( _ , _ ) => {
error! ( "Request that was not even running failed?!" );
return
},
});
}
};
},
Some (( outgoing_kind , event , key )) = receiver . recv () => {
if let Ok ( Some ( events )) = self . select_events (
& outgoing_kind ,
vec! [( event , key )],
& mut current_transaction_status ,
) {
futures . push ( Self ::handle_events ( outgoing_kind , events ));
}
}
}
}
}
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
fn select_events (
& self ,
outgoing_kind : & OutgoingKind ,
new_events : Vec < ( SendingEventType , Vec < u8 > ) > , // Events we want to send: event and full key
current_transaction_status : & mut HashMap < OutgoingKind , TransactionStatus > ,
) -> Result < Option < Vec < SendingEventType >>> {
let mut retry = false ;
let mut allow = true ;
let entry = current_transaction_status . entry ( outgoing_kind . clone ());
entry
. and_modify ( | e | match e {
TransactionStatus ::Running | TransactionStatus ::Retrying ( _ ) => {
allow = false ; // already running
},
TransactionStatus ::Failed ( tries , time ) => {
// Fail if a request has failed recently (exponential backoff)
let mut min_elapsed_duration = Duration ::from_secs ( 5 * 60 ) * ( * tries ) * ( * tries );
if min_elapsed_duration > Duration ::from_secs ( 60 * 60 * 24 ) {
min_elapsed_duration = Duration ::from_secs ( 60 * 60 * 24 );
}
if time . elapsed () < min_elapsed_duration {
allow = false ;
} else {
retry = true ;
* e = TransactionStatus ::Retrying ( * tries );
}
},
})
. or_insert ( TransactionStatus ::Running );
if ! allow {
return Ok ( None );
}
let mut events = Vec ::new ();
if retry {
// We retry the previous transaction
for ( _ , e ) in self . db . active_requests_for ( outgoing_kind ). filter_map ( std ::result ::Result ::ok ) {
events . push ( e );
}
} else {
self . db . mark_as_active ( & new_events ) ? ;
for ( e , _ ) in new_events {
events . push ( e );
}
if let OutgoingKind ::Normal ( server_name ) = outgoing_kind {
if let Ok (( select_edus , last_count )) = self . select_edus ( server_name ) {
events . extend ( select_edus . into_iter (). map ( SendingEventType ::Edu ));
self . db . set_latest_educount ( server_name , last_count ) ? ;
}
}
}
Ok ( Some ( events ))
}
#[tracing::instrument(skip(self, server_name))]
pub fn select_edus ( & self , server_name : & ServerName ) -> Result < ( Vec < Vec < u8 >> , u64 ) > {
// u64: count of last edu
let since = self . db . get_latest_educount ( server_name ) ? ;
let mut events = Vec ::new ();
let mut max_edu_count = since ;
let mut device_list_changes = HashSet ::new ();
'outer : for room_id in services (). rooms . state_cache . server_rooms ( server_name ) {
let room_id = room_id ? ;
// Look for device list updates in this room
device_list_changes . extend (
services ()
. users
. keys_changed ( room_id . as_ref (), since , None )
. filter_map ( std ::result ::Result ::ok )
. filter ( | user_id | user_id . server_name () == services (). globals . server_name ()),
);
if services (). globals . allow_outgoing_presence () {
// Look for presence updates in this room
let mut presence_updates = Vec ::new ();
for ( user_id , count , presence_event ) in services (). rooms . edus . presence . presence_since ( & room_id , since ) {
if count > max_edu_count {
max_edu_count = count ;
}
if user_id . server_name () != services (). globals . server_name () {
continue ;
}
presence_updates . push ( PresenceUpdate {
user_id ,
presence : presence_event . content . presence ,
currently_active : presence_event . content . currently_active . unwrap_or ( false ),
last_active_ago : presence_event . content . last_active_ago . unwrap_or_else ( || uint! ( 0 )),
status_msg : presence_event . content . status_msg ,
});
}
let presence_content = Edu ::Presence ( PresenceContent ::new ( presence_updates ));
events . push ( serde_json ::to_vec ( & presence_content ). expect ( "PresenceEvent can be serialized" ));
}
// Look for read receipts in this room
for r in services (). rooms . edus . read_receipt . readreceipts_since ( & room_id , since ) {
let ( user_id , count , read_receipt ) = r ? ;
if count > max_edu_count {
max_edu_count = count ;
}
if user_id . server_name () != services (). globals . server_name () {
continue ;
}
let event : AnySyncEphemeralRoomEvent = serde_json ::from_str ( read_receipt . json (). get ())
. map_err ( | _ | Error ::bad_database ( "Invalid edu event in read_receipts." )) ? ;
let federation_event = match event {
AnySyncEphemeralRoomEvent ::Receipt ( r ) => {
let mut read = BTreeMap ::new ();
let ( event_id , mut receipt ) =
r . content . 0. into_iter (). next (). expect ( "we only use one event per read receipt" );
let receipt = receipt
. remove ( & ReceiptType ::Read )
. expect ( "our read receipts always set this" )
. remove ( & user_id )
. expect ( "our read receipts always have the user here" );
read . insert (
user_id ,
ReceiptData {
data : receipt . clone (),
event_ids : vec ! [ event_id . clone ()],
},
);
let receipt_map = ReceiptMap {
read ,
};
let mut receipts = BTreeMap ::new ();
receipts . insert ( room_id . clone (), receipt_map );
Edu ::Receipt ( ReceiptContent {
receipts ,
})
},
_ => {
Error ::bad_database ( "Invalid event type in read_receipts" );
continue ;
},
};
events . push ( serde_json ::to_vec ( & federation_event ). expect ( "json can be serialized" ));
if events . len () >= 20 {
break 'outer ;
}
}
}
for user_id in device_list_changes {
// Empty prev id forces synapse to resync: https://github.com/matrix-org/synapse/blob/98aec1cc9da2bd6b8e34ffb282c85abf9b8b42ca/synapse/handlers/device.py#L767
// Because synapse resyncs, we can just insert dummy data
let edu = Edu ::DeviceListUpdate ( DeviceListUpdateContent {
user_id ,
device_id : device_id ! ( "dummy" ). to_owned (),
device_display_name : Some ( "Dummy" . to_owned ()),
stream_id : uint ! ( 1 ),
prev_id : Vec ::new (),
deleted : None ,
keys : None ,
});
events . push ( serde_json ::to_vec ( & edu ). expect ( "json can be serialized" ));
}
Ok (( events , max_edu_count ))
}
#[tracing::instrument(skip(self, pdu_id, user, pushkey))]
pub fn send_push_pdu ( & self , pdu_id : & [ u8 ], user : & UserId , pushkey : String ) -> Result < () > {
let outgoing_kind = OutgoingKind ::Push ( user . to_owned (), pushkey );
let event = SendingEventType ::Pdu ( pdu_id . to_owned ());
let keys = self . db . queue_requests ( & [( & outgoing_kind , event . clone ())]) ? ;
self . sender . send (( outgoing_kind , event , keys . into_iter (). next (). unwrap ())). unwrap ();
Ok (())
}
#[tracing::instrument(skip(self, servers, pdu_id))]
pub fn send_pdu < I : Iterator < Item = OwnedServerName >> ( & self , servers : I , pdu_id : & [ u8 ]) -> Result < () > {
let requests = servers
. into_iter ()
. map ( | server | ( OutgoingKind ::Normal ( server ), SendingEventType ::Pdu ( pdu_id . to_owned ())))
. collect ::< Vec < _ >> ();
let keys = self . db . queue_requests ( & requests . iter (). map ( | ( o , e ) | ( o , e . clone ())). collect ::< Vec < _ >> ()) ? ;
for (( outgoing_kind , event ), key ) in requests . into_iter (). zip ( keys ) {
self . sender . send (( outgoing_kind . clone (), event , key )). unwrap ();
}
Ok (())
}
#[tracing::instrument(skip(self, server, serialized))]
pub fn send_reliable_edu ( & self , server : & ServerName , serialized : Vec < u8 > , id : u64 ) -> Result < () > {
let outgoing_kind = OutgoingKind ::Normal ( server . to_owned ());
let event = SendingEventType ::Edu ( serialized );
let keys = self . db . queue_requests ( & [( & outgoing_kind , event . clone ())]) ? ;
self . sender . send (( outgoing_kind , event , keys . into_iter (). next (). unwrap ())). unwrap ();
Ok (())
}
#[tracing::instrument(skip(self))]
pub fn send_pdu_appservice ( & self , appservice_id : String , pdu_id : Vec < u8 > ) -> Result < () > {
let outgoing_kind = OutgoingKind ::Appservice ( appservice_id );
let event = SendingEventType ::Pdu ( pdu_id );
let keys = self . db . queue_requests ( & [( & outgoing_kind , event . clone ())]) ? ;
self . sender . send (( outgoing_kind , event , keys . into_iter (). next (). unwrap ())). unwrap ();
Ok (())
}
/// Cleanup event data
/// Used for instance after we remove an appservice registration
#[tracing::instrument(skip(self))]
pub fn cleanup_events ( & self , appservice_id : String ) -> Result < () > {
self . db . delete_all_requests_for ( & OutgoingKind ::Appservice ( appservice_id )) ? ;
Ok (())
}
#[tracing::instrument(skip(events, kind))]
async fn handle_events (
kind : OutgoingKind , events : Vec < SendingEventType > ,
) -> Result < OutgoingKind , ( OutgoingKind , Error ) > {
match & kind {
OutgoingKind ::Appservice ( id ) => {
let mut pdu_jsons = Vec ::new ();
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) => {
pdu_jsons . push (
services ()
. rooms
. timeline
. get_pdu_from_id ( pdu_id )
. map_err ( | e | ( kind . clone (), e )) ?
. ok_or_else ( || {
(
kind . clone (),
Error ::bad_database (
"[Appservice] Event in servernameevent_data not found in db." ,
),
)
}) ?
. to_room_event (),
);
},
SendingEventType ::Edu ( _ ) => {
// Appservices don't need EDUs (?)
},
}
}
let permit = services (). sending . maximum_requests . acquire (). await ;
let response = match appservice_server ::send_request (
services (). appservice . get_registration ( id ). map_err ( | e | ( kind . clone (), e )) ? . ok_or_else ( || {
(
kind . clone (),
Error ::bad_database ( "[Appservice] Could not load registration from db." ),
)
}) ? ,
appservice ::event ::push_events ::v1 ::Request {
events : pdu_jsons ,
txn_id : ( &* general_purpose ::URL_SAFE_NO_PAD . encode ( calculate_hash (
& events
. iter ()
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) => &** b ,
})
. collect ::< Vec < _ >> (),
)))
. into (),
},
)
. await
{
None => Ok ( kind . clone ()),
Some ( op_resp ) => op_resp . map ( | _response | kind . clone ()). map_err ( | e | ( kind . clone (), e )),
};
drop ( permit );
response
},
OutgoingKind ::Push ( userid , pushkey ) => {
let mut pdus = Vec ::new ();
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) => {
pdus . push (
services ()
. rooms
. timeline
. get_pdu_from_id ( pdu_id )
. map_err ( | e | ( kind . clone (), e )) ?
. ok_or_else ( || {
(
kind . clone (),
Error ::bad_database (
"[Push] Event in servernamevent_datas not found in db." ,
),
)
}) ? ,
);
},
SendingEventType ::Edu ( _ ) => {
// Push gateways don't need EDUs (?)
},
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if let Some ( unsigned ) = & pdu . unsigned {
if let Ok ( unsigned ) = serde_json ::from_str ::< serde_json ::Value > ( unsigned . get ()) {
if unsigned . get ( "redacted_because" ). is_some () {
continue ;
}
}
}
let Some ( pusher ) = services ()
. pusher
. get_pusher ( userid , pushkey )
. map_err ( | e | ( OutgoingKind ::Push ( userid . clone (), pushkey . clone ()), e )) ?
else {
continue ;
};
let rules_for_user = services ()
. account_data
. get ( None , userid , GlobalAccountDataEventType ::PushRules . to_string (). into ())
. unwrap_or_default ()
. and_then ( | event | serde_json ::from_str ::< PushRulesEvent > ( event . get ()). ok ())
. map_or_else ( || push ::Ruleset ::server_default ( userid ), | ev : PushRulesEvent | ev . content . global );
let unread : UInt = services ()
. rooms
. user
. notification_count ( userid , & pdu . room_id )
. map_err ( | e | ( kind . clone (), e )) ?
. try_into ()
. expect ( "notification count can't go that high" );
let permit = services (). sending . maximum_requests . acquire (). await ;
let _response = services ()
. pusher
. send_push_notice ( userid , unread , & pusher , rules_for_user , & pdu )
. await
. map ( | _response | kind . clone ())
. map_err ( | e | ( kind . clone (), e ));
drop ( permit );
}
Ok ( OutgoingKind ::Push ( userid . clone (), pushkey . clone ()))
},
OutgoingKind ::Normal ( server ) => {
let mut edu_jsons = Vec ::new ();
let mut pdu_jsons = Vec ::new ();
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) => {
// TODO: check room version and remove event_id if needed
let raw = PduEvent ::convert_to_outgoing_federation_event (
services ()
. rooms
. timeline
. get_pdu_json_from_id ( pdu_id )
. map_err ( | e | ( OutgoingKind ::Normal ( server . clone ()), e )) ?
. ok_or_else ( || {
error! ( "event not found: {server} {pdu_id:?}" );
(
OutgoingKind ::Normal ( server . clone ()),
Error ::bad_database (
"[Normal] Event in servernamevent_datas not found in db." ,
),
)
}) ? ,
);
pdu_jsons . push ( raw );
},
SendingEventType ::Edu ( edu ) => {
if let Ok ( raw ) = serde_json ::from_slice ( edu ) {
edu_jsons . push ( raw );
}
},
}
}
let permit = services (). sending . maximum_requests . acquire (). await ;
let response = server_server ::send_request (
server ,
send_transaction_message ::v1 ::Request {
origin : services (). globals . server_name (). to_owned (),
pdus : pdu_jsons ,
edus : edu_jsons ,
origin_server_ts : MilliSecondsSinceUnixEpoch ::now (),
transaction_id : ( &* general_purpose ::URL_SAFE_NO_PAD . encode ( calculate_hash (
& events
. iter ()
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) => &** b ,
})
. collect ::< Vec < _ >> (),
)))
. into (),
},
)
. await
. map ( | response | {
for pdu in response . pdus {
if pdu . 1. is_err () {
warn! ( "Failed to send to {}: {:?}" , server , pdu );
}
}
kind . clone ()
})
. map_err ( | e | ( kind , e ));
drop ( permit );
response
},
}
}
#[tracing::instrument(skip(self, destination, request))]
pub async fn send_federation_request < T > ( & self , destination : & ServerName , request : T ) -> Result < T ::IncomingResponse >
where
T : OutgoingRequest + Debug ,
{
2024-03-16 16:07:42 -04:00
if ! services (). globals . allow_federation () {
return Err ( Error ::bad_config ( "Federation is disabled." ));
}
2024-03-05 19:48:54 -05:00
if destination . is_ip_literal () || IPAddress ::is_valid ( destination . host ()) {
info! (
"Destination {} is an IP literal, checking against IP range denylist." ,
destination
);
let ip = IPAddress ::parse ( destination . host ()). map_err ( | e | {
warn! ( "Failed to parse IP literal from string: {}" , e );
Error ::BadServerResponse ( "Invalid IP address" )
}) ? ;
let cidr_ranges_s = services (). globals . ip_range_denylist (). to_vec ();
let mut cidr_ranges : Vec < IPAddress > = Vec ::new ();
for cidr in cidr_ranges_s {
cidr_ranges . push ( IPAddress ::parse ( cidr ). expect ( "we checked this at startup" ));
}
debug! ( "List of pushed CIDR ranges: {:?}" , cidr_ranges );
for cidr in cidr_ranges {
if cidr . includes ( & ip ) {
return Err ( Error ::BadServerResponse ( "Not allowed to send requests to this IP" ));
}
}
info! ( "IP literal {} is allowed." , destination );
}
debug! ( "Waiting for permit" );
let permit = self . maximum_requests . acquire (). await ;
debug! ( "Got permit" );
let response =
tokio ::time ::timeout ( Duration ::from_secs ( 5 * 60 ), server_server ::send_request ( destination , request ))
. await
. map_err ( | _ | {
warn! ( "Timeout after 300 seconds waiting for server response of {destination}" );
Error ::BadServerResponse ( "Timeout after 300 seconds waiting for server response" )
}) ? ;
drop ( permit );
response
}
/// Sends a request to an appservice
///
/// Only returns None if there is no url specified in the appservice
/// registration file
pub async fn send_appservice_request < T > (
& self , registration : Registration , request : T ,
) -> Option < Result < T ::IncomingResponse >>
where
T : OutgoingRequest + Debug ,
{
let permit = self . maximum_requests . acquire (). await ;
let response = appservice_server ::send_request ( registration , request ). await ;
drop ( permit );
response
}
2020-09-15 16:13:54 +02:00
}