2020-12-19 16:00:11 +01:00
use std ::{
2021-08-24 19:10:31 +02:00
collections ::{ BTreeMap , HashMap , HashSet },
2021-04-12 12:40:16 +02:00
convert ::{ TryFrom , TryInto },
2021-02-11 13:16:14 +01:00
fmt ::Debug ,
2020-12-19 16:00:11 +01:00
sync ::Arc ,
2021-05-20 23:46:52 +02:00
time ::{ Duration , Instant },
2020-12-19 16:00:11 +01:00
};
2020-09-15 16:13:54 +02:00
2021-03-15 09:48:19 +01:00
use crate ::{
appservice_server , database ::pusher , server_server , utils , Database , Error , PduEvent , Result ,
};
2020-09-23 15:23:29 +02:00
use federation ::transactions ::send_transaction_message ;
2021-03-02 14:36:48 +01:00
use ring ::digest ;
2021-06-08 18:10:00 +02:00
use rocket ::futures ::{
channel ::mpsc ,
stream ::{ FuturesUnordered , StreamExt },
};
2020-12-08 10:33:44 +01:00
use ruma ::{
2021-05-17 10:25:27 +02:00
api ::{
appservice ,
federation ::{
self ,
2021-08-24 19:10:31 +02:00
transactions ::edu ::{
DeviceListUpdateContent , Edu , ReceiptContent , ReceiptData , ReceiptMap ,
},
2021-05-17 10:25:27 +02:00
},
OutgoingRequest ,
},
2021-08-24 19:10:31 +02:00
device_id ,
2021-05-17 10:25:27 +02:00
events ::{ push_rules , AnySyncEphemeralRoomEvent , EventType },
2021-05-20 23:46:52 +02:00
push ,
receipt ::ReceiptType ,
2021-08-24 19:10:31 +02:00
uint , MilliSecondsSinceUnixEpoch , ServerName , UInt , UserId ,
2020-12-08 10:33:44 +01:00
};
2021-07-14 07:07:08 +00:00
use tokio ::{
select ,
sync ::{ RwLock , Semaphore },
};
2021-07-29 08:36:01 +02:00
use tracing ::{ error , warn };
2021-01-26 21:54:35 -05:00
2021-06-08 18:10:00 +02:00
use super ::abstraction ::Tree ;
2021-01-26 21:54:35 -05:00
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OutgoingKind {
Appservice ( Box < ServerName > ),
2021-03-22 14:04:11 +01:00
Push ( Vec < u8 > , Vec < u8 > ), // user and pushkey
2021-01-26 21:54:35 -05:00
Normal ( Box < ServerName > ),
}
2021-05-12 20:04:28 +02:00
impl OutgoingKind {
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(self))]
2021-05-12 20:04:28 +02:00
pub fn get_prefix ( & self ) -> Vec < u8 > {
let mut prefix = match self {
OutgoingKind ::Appservice ( server ) => {
let mut p = b "+" . to_vec ();
p . extend_from_slice ( server . as_bytes ());
p
}
OutgoingKind ::Push ( user , pushkey ) => {
let mut p = b "$" . to_vec ();
p . extend_from_slice ( & user );
p . push ( 0xff );
p . extend_from_slice ( & pushkey );
p
}
OutgoingKind ::Normal ( server ) => {
let mut p = Vec ::new ();
p . extend_from_slice ( server . as_bytes ());
p
}
};
prefix . push ( 0xff );
prefix
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SendingEventType {
Pdu ( Vec < u8 > ),
Edu ( Vec < u8 > ),
}
2020-09-15 16:13:54 +02:00
pub struct Sending {
/// The state for a given state hash.
2021-06-08 18:10:00 +02:00
pub ( super ) servername_educount : Arc < dyn Tree > , // EduCount: Count of last EDU sync
2021-07-29 20:17:47 +02:00
pub ( super ) servernameevent_data : Arc < dyn Tree > , // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / * (for edus), Data = EDU content
pub ( super ) servercurrentevent_data : Arc < dyn Tree > , // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / * (for edus), Data = EDU content
2020-12-19 16:00:11 +01:00
pub ( super ) maximum_requests : Arc < Semaphore > ,
2021-07-29 20:17:47 +02:00
pub sender : mpsc ::UnboundedSender < ( Vec < u8 > , Vec < u8 > ) > ,
2020-09-15 16:13:54 +02:00
}
2021-04-24 18:01:05 +02:00
enum TransactionStatus {
Running ,
Failed ( u32 , Instant ), // number of times failed, time of last failure
Retrying ( u32 ), // number of times failed
}
2020-09-15 16:13:54 +02:00
impl Sending {
2021-07-14 07:07:08 +00:00
pub fn start_handler (
& self ,
db : Arc < RwLock < Database >> ,
2021-07-29 20:17:47 +02:00
mut receiver : mpsc ::UnboundedReceiver < ( Vec < u8 > , Vec < u8 > ) > ,
2021-07-14 07:07:08 +00:00
) {
2020-09-15 16:13:54 +02:00
tokio ::spawn ( async move {
let mut futures = FuturesUnordered ::new ();
2020-09-23 15:23:29 +02:00
2021-04-24 18:01:05 +02:00
let mut current_transaction_status = HashMap ::< Vec < u8 > , TransactionStatus > ::new ();
2020-10-21 16:08:54 +02:00
2021-06-08 18:10:00 +02:00
// Retry requests we could not finish yet
2021-05-12 20:04:28 +02:00
let mut initial_transactions = HashMap ::< OutgoingKind , Vec < SendingEventType >> ::new ();
2021-07-14 07:07:08 +00:00
let guard = db . read (). await ;
2021-07-29 20:17:47 +02:00
for ( key , outgoing_kind , event ) in guard
. sending
. servercurrentevent_data
. iter ()
. filter_map ( | ( key , v ) | {
Self ::parse_servercurrentevent ( & key , v )
. ok ()
. map ( | ( k , e ) | ( key , k , e ))
})
2020-10-21 16:08:54 +02:00
{
2021-04-24 18:01:05 +02:00
let entry = initial_transactions
. entry ( outgoing_kind . clone ())
2021-02-26 13:24:07 +01:00
. or_insert_with ( Vec ::new );
if entry . len () > 30 {
2021-04-24 18:01:05 +02:00
warn! (
2021-05-12 20:04:28 +02:00
"Dropping some current events: {:?} {:?} {:?}" ,
key , outgoing_kind , event
2021-04-24 18:01:05 +02:00
);
2021-07-29 20:17:47 +02:00
guard . sending . servercurrentevent_data . remove ( & key ). unwrap ();
2021-02-26 13:24:07 +01:00
continue ;
}
2021-05-12 20:04:28 +02:00
entry . push ( event );
2020-10-21 16:08:54 +02:00
}
2021-07-14 07:07:08 +00:00
drop ( guard );
2021-05-12 20:04:28 +02:00
for ( outgoing_kind , events ) in initial_transactions {
current_transaction_status
. insert ( outgoing_kind . get_prefix (), TransactionStatus ::Running );
2021-07-14 07:07:08 +00:00
futures . push ( Self ::handle_events (
outgoing_kind . clone (),
events ,
Arc ::clone ( & db ),
));
2020-10-21 16:08:54 +02:00
}
2020-09-15 16:13:54 +02:00
loop {
select! {
2020-12-08 10:33:44 +01:00
Some ( response ) = futures . next () => {
match response {
2021-01-26 21:54:35 -05:00
Ok ( outgoing_kind ) => {
2021-07-14 07:07:08 +00:00
let guard = db . read (). await ;
2021-05-12 20:04:28 +02:00
let prefix = outgoing_kind . get_prefix ();
2021-07-29 20:17:47 +02:00
for ( key , _ ) in guard . sending . servercurrentevent_data
2021-06-08 18:10:00 +02:00
. scan_prefix ( prefix . clone ())
2020-10-21 16:08:54 +02:00
{
2021-07-29 20:17:47 +02:00
guard . sending . servercurrentevent_data . remove ( & key ). unwrap ();
2020-10-21 16:08:54 +02:00
}
// Find events that have been added since starting the last request
2021-07-29 20:17:47 +02:00
let new_events = guard . sending . servernameevent_data
2021-06-08 18:10:00 +02:00
. scan_prefix ( prefix . clone ())
2021-07-29 20:17:47 +02:00
. filter_map ( | ( k , v ) | {
Self ::parse_servercurrentevent ( & k , v ). ok (). map ( | ev | ( ev , k ))
2020-11-03 21:20:35 +01:00
})
2021-02-26 13:24:07 +01:00
. take ( 30 )
2020-11-03 21:20:35 +01:00
. collect ::< Vec < _ >> ();
2020-10-21 16:08:54 +02:00
2021-05-12 20:04:28 +02:00
// TODO: find edus
if ! new_events . is_empty () {
// Insert pdus we found
2021-07-29 20:17:47 +02:00
for ( e , key ) in & new_events {
let value = if let SendingEventType ::Edu ( value ) = & e . 1 { &** value } else { & [] };
guard . sending . servercurrentevent_data . insert ( & key , value ). unwrap ();
guard . sending . servernameevent_data . remove ( & key ). unwrap ();
2020-10-21 16:08:54 +02:00
}
2021-07-14 07:07:08 +00:00
drop ( guard );
2021-01-26 21:54:35 -05:00
futures . push (
2021-05-12 20:04:28 +02:00
Self ::handle_events (
2021-01-26 21:54:35 -05:00
outgoing_kind . clone (),
2021-07-20 23:36:03 +02:00
new_events . into_iter (). map ( | ( event , _ ) | event . 1 ). collect (),
2021-07-14 07:07:08 +00:00
Arc ::clone ( & db ),
2021-01-26 21:54:35 -05:00
)
);
2020-10-21 16:08:54 +02:00
} else {
2021-04-24 18:01:05 +02:00
current_transaction_status . remove ( & prefix );
2020-10-21 16:08:54 +02:00
}
2020-09-23 15:23:29 +02:00
}
2021-03-25 23:55:40 +01:00
Err (( outgoing_kind , _ )) => {
2021-05-12 20:04:28 +02:00
current_transaction_status . entry ( outgoing_kind . get_prefix ()). and_modify ( | e | * e = match e {
2021-04-24 18:01:05 +02:00
TransactionStatus ::Running => TransactionStatus ::Failed ( 1 , Instant ::now ()),
TransactionStatus ::Retrying ( n ) => TransactionStatus ::Failed ( * n + 1 , Instant ::now ()),
TransactionStatus ::Failed ( _ , _ ) => {
error! ( "Request that was not even running failed?!" );
return
2020-12-19 16:00:11 +01:00
},
});
2020-09-23 15:23:29 +02:00
}
};
},
2021-07-29 20:17:47 +02:00
Some (( key , value )) = receiver . next () => {
if let Ok (( outgoing_kind , event )) = Self ::parse_servercurrentevent ( & key , value ) {
2021-07-14 07:07:08 +00:00
let guard = db . read (). await ;
2021-06-08 18:10:00 +02:00
if let Ok ( Some ( events )) = Self ::select_events (
& outgoing_kind ,
vec! [( event , key )],
& mut current_transaction_status ,
2021-07-14 07:07:08 +00:00
& guard
2021-06-08 18:10:00 +02:00
) {
2021-07-14 07:07:08 +00:00
futures . push ( Self ::handle_events ( outgoing_kind , events , Arc ::clone ( & db )));
2020-09-15 16:13:54 +02:00
}
2020-09-23 15:23:29 +02:00
}
}
2020-09-15 16:13:54 +02:00
}
}
});
}
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(outgoing_kind, new_events, current_transaction_status, db))]
2021-05-12 20:04:28 +02:00
fn select_events (
outgoing_kind : & OutgoingKind ,
2021-06-08 18:10:00 +02:00
new_events : Vec < ( SendingEventType , Vec < u8 > ) > , // Events we want to send: event and full key
2021-05-12 20:04:28 +02:00
current_transaction_status : & mut HashMap < Vec < u8 > , TransactionStatus > ,
2021-05-17 10:25:27 +02:00
db : & Database ,
2021-06-08 18:10:00 +02:00
) -> Result < Option < Vec < SendingEventType >>> {
2021-05-12 20:04:28 +02:00
let mut retry = false ;
let mut allow = true ;
let prefix = outgoing_kind . get_prefix ();
let entry = current_transaction_status . entry ( prefix . clone ());
entry
. and_modify ( | e | match e {
TransactionStatus ::Running | TransactionStatus ::Retrying ( _ ) => {
allow = false ; // already running
}
TransactionStatus ::Failed ( tries , time ) => {
// Fail if a request has failed recently (exponential backoff)
let mut min_elapsed_duration = Duration ::from_secs ( 30 ) * ( * tries ) * ( * tries );
if min_elapsed_duration > Duration ::from_secs ( 60 * 60 * 24 ) {
min_elapsed_duration = Duration ::from_secs ( 60 * 60 * 24 );
}
if time . elapsed () < min_elapsed_duration {
allow = false ;
} else {
retry = true ;
* e = TransactionStatus ::Retrying ( * tries );
}
}
})
. or_insert ( TransactionStatus ::Running );
if ! allow {
2021-06-08 18:10:00 +02:00
return Ok ( None );
2021-05-12 20:04:28 +02:00
}
let mut events = Vec ::new ();
if retry {
// We retry the previous transaction
2021-07-29 20:17:47 +02:00
for ( key , value ) in db . sending . servercurrentevent_data . scan_prefix ( prefix ) {
if let Ok (( _ , e )) = Self ::parse_servercurrentevent ( & key , value ) {
2021-05-12 20:04:28 +02:00
events . push ( e );
}
}
} else {
for ( e , full_key ) in new_events {
2021-07-29 20:17:47 +02:00
let value = if let SendingEventType ::Edu ( value ) = & e {
&** value
} else {
& [][ .. ]
};
db . sending
. servercurrentevent_data
. insert ( & full_key , value ) ? ;
2021-05-12 20:04:28 +02:00
// If it was a PDU we have to unqueue it
// TODO: don't try to unqueue EDUs
2021-07-29 20:17:47 +02:00
db . sending . servernameevent_data . remove ( & full_key ) ? ;
2021-05-12 20:04:28 +02:00
events . push ( e );
}
2021-05-17 10:25:27 +02:00
2021-05-20 23:46:52 +02:00
if let OutgoingKind ::Normal ( server_name ) = outgoing_kind {
if let Ok (( select_edus , last_count )) = Self ::select_edus ( db , server_name ) {
2021-07-20 21:17:15 +02:00
events . extend ( select_edus . into_iter (). map ( SendingEventType ::Edu ));
2021-05-20 23:46:52 +02:00
db . sending
. servername_educount
2021-06-08 18:10:00 +02:00
. insert ( server_name . as_bytes (), & last_count . to_be_bytes ()) ? ;
2021-05-17 10:25:27 +02:00
}
}
2021-05-12 20:04:28 +02:00
}
2021-06-08 18:10:00 +02:00
Ok ( Some ( events ))
2021-05-12 20:04:28 +02:00
}
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(db, server))]
2021-07-20 21:17:15 +02:00
pub fn select_edus ( db : & Database , server : & ServerName ) -> Result < ( Vec < Vec < u8 >> , u64 ) > {
2021-05-17 10:25:27 +02:00
// u64: count of last edu
let since = db
. sending
. servername_educount
. get ( server . as_bytes ()) ?
. map_or ( Ok ( 0 ), | bytes | {
utils ::u64_from_bytes ( & bytes )
. map_err ( | _ | Error ::bad_database ( "Invalid u64 in servername_educount." ))
}) ? ;
let mut events = Vec ::new ();
let mut max_edu_count = since ;
2021-08-24 19:10:31 +02:00
let mut device_list_changes = HashSet ::new ();
2021-05-17 10:25:27 +02:00
'outer : for room_id in db . rooms . server_rooms ( server ) {
let room_id = room_id ? ;
2021-08-24 19:10:31 +02:00
// Look for device list updates in this room
device_list_changes . extend (
db . users
. keys_changed ( & room_id . to_string (), since , None )
. filter_map ( | r | r . ok ())
. filter ( | user_id | user_id . server_name () == db . globals . server_name ()),
);
// Look for read receipts in this room
2021-06-08 18:10:00 +02:00
for r in db . rooms . edus . readreceipts_since ( & room_id , since ) {
2021-05-17 10:25:27 +02:00
let ( user_id , count , read_receipt ) = r ? ;
if count > max_edu_count {
max_edu_count = count ;
}
if user_id . server_name () != db . globals . server_name () {
continue ;
}
let event =
serde_json ::from_str ::< AnySyncEphemeralRoomEvent > ( & read_receipt . json (). get ())
. map_err ( | _ | Error ::bad_database ( "Invalid edu event in read_receipts." )) ? ;
let federation_event = match event {
AnySyncEphemeralRoomEvent ::Receipt ( r ) => {
let mut read = BTreeMap ::new ();
2021-05-20 23:46:52 +02:00
let ( event_id , mut receipt ) = r
2021-05-17 10:25:27 +02:00
. content
. 0
. into_iter ()
. next ()
. expect ( "we only use one event per read receipt" );
let receipt = receipt
2021-05-20 23:46:52 +02:00
. remove ( & ReceiptType ::Read )
2021-05-17 10:25:27 +02:00
. expect ( "our read receipts always set this" )
. remove ( & user_id )
. expect ( "our read receipts always have the user here" );
read . insert (
user_id ,
ReceiptData {
data : receipt . clone (),
event_ids : vec ! [ event_id . clone ()],
},
);
let receipt_map = ReceiptMap { read };
let mut receipts = BTreeMap ::new ();
receipts . insert ( room_id . clone (), receipt_map );
Edu ::Receipt ( ReceiptContent { receipts })
}
_ => {
Error ::bad_database ( "Invalid event type in read_receipts" );
continue ;
}
};
2021-07-20 21:17:15 +02:00
events . push ( serde_json ::to_vec ( & federation_event ). expect ( "json can be serialized" ));
2021-05-17 10:25:27 +02:00
if events . len () >= 20 {
break 'outer ;
}
}
}
2021-08-24 19:10:31 +02:00
for user_id in device_list_changes {
// Empty prev id forces synapse to resync: https://github.com/matrix-org/synapse/blob/98aec1cc9da2bd6b8e34ffb282c85abf9b8b42ca/synapse/handlers/device.py#L767
// Because synapse resyncs, we can just insert dummy data
let edu = Edu ::DeviceListUpdate ( DeviceListUpdateContent {
user_id ,
device_id : device_id ! ( "dummy" ),
device_display_name : "Dummy" . to_owned (),
stream_id : uint ! ( 1 ),
prev_id : Vec ::new (),
deleted : None ,
keys : None ,
});
events . push ( serde_json ::to_vec ( & edu ). expect ( "json can be serialized" ));
}
2021-05-17 10:25:27 +02:00
Ok (( events , max_edu_count ))
}
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(self, pdu_id, senderkey))]
2021-07-14 07:07:08 +00:00
pub fn send_push_pdu ( & self , pdu_id : & [ u8 ], senderkey : Vec < u8 > ) -> Result < () > {
2021-01-26 21:54:35 -05:00
let mut key = b "$" . to_vec ();
2021-03-22 14:04:11 +01:00
key . extend_from_slice ( & senderkey );
2021-01-26 21:54:35 -05:00
key . push ( 0xff );
key . extend_from_slice ( pdu_id );
2021-07-29 20:17:47 +02:00
self . servernameevent_data . insert ( & key , & []) ? ;
self . sender . unbounded_send (( key , vec! [])). unwrap ();
2021-01-26 21:54:35 -05:00
Ok (())
}
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(self, server, pdu_id))]
2020-11-08 14:45:52 -05:00
pub fn send_pdu ( & self , server : & ServerName , pdu_id : & [ u8 ]) -> Result < () > {
2020-09-15 16:13:54 +02:00
let mut key = server . as_bytes (). to_vec ();
key . push ( 0xff );
key . extend_from_slice ( pdu_id );
2021-07-29 20:17:47 +02:00
self . servernameevent_data . insert ( & key , & []) ? ;
self . sender . unbounded_send (( key , vec! [])). unwrap ();
2021-07-20 21:17:15 +02:00
Ok (())
}
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(self, server, serialized))]
2021-07-29 20:17:47 +02:00
pub fn send_reliable_edu ( & self , server : & ServerName , serialized : Vec < u8 > ) -> Result < () > {
2021-07-20 21:17:15 +02:00
let mut key = server . as_bytes (). to_vec ();
key . push ( 0xff );
key . push ( b '*' );
2021-07-29 20:17:47 +02:00
self . servernameevent_data . insert ( & key , & serialized ) ? ;
self . sender . unbounded_send (( key , serialized )). unwrap ();
2020-09-15 16:13:54 +02:00
Ok (())
}
2020-09-23 15:23:29 +02:00
2021-02-28 12:41:03 +01:00
#[tracing::instrument(skip(self))]
2020-12-08 10:33:44 +01:00
pub fn send_pdu_appservice ( & self , appservice_id : & str , pdu_id : & [ u8 ]) -> Result < () > {
2021-01-05 09:21:41 -05:00
let mut key = b "+" . to_vec ();
2020-12-08 10:33:44 +01:00
key . extend_from_slice ( appservice_id . as_bytes ());
key . push ( 0xff );
key . extend_from_slice ( pdu_id );
2021-07-29 20:17:47 +02:00
self . servernameevent_data . insert ( & key , & []) ? ;
self . sender . unbounded_send (( key , vec! [])). unwrap ();
2020-12-08 10:33:44 +01:00
Ok (())
}
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(keys))]
2021-05-12 20:04:28 +02:00
fn calculate_hash ( keys : & [ & [ u8 ]]) -> Vec < u8 > {
2021-03-02 14:36:48 +01:00
// We only hash the pdu's event ids, not the whole pdu
let bytes = keys . join ( & 0xff );
let hash = digest ::digest ( & digest ::SHA256 , & bytes );
hash . as_ref (). to_owned ()
}
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(db, events, kind))]
2021-05-12 20:04:28 +02:00
async fn handle_events (
2021-01-26 21:54:35 -05:00
kind : OutgoingKind ,
2021-05-12 20:04:28 +02:00
events : Vec < SendingEventType > ,
2021-07-14 07:07:08 +00:00
db : Arc < RwLock < Database >> ,
2021-01-26 21:54:35 -05:00
) -> std ::result ::Result < OutgoingKind , ( OutgoingKind , Error ) > {
2021-07-14 07:07:08 +00:00
let db = db . read (). await ;
2021-03-16 18:00:26 +01:00
match & kind {
2021-01-26 21:54:35 -05:00
OutgoingKind ::Appservice ( server ) => {
2021-05-12 20:04:28 +02:00
let mut pdu_jsons = Vec ::new ();
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) => {
pdu_jsons . push ( db . rooms
. get_pdu_from_id ( & pdu_id )
. map_err ( | e | ( kind . clone (), e )) ?
2021-01-26 21:54:35 -05:00
. ok_or_else ( || {
(
2021-05-12 20:04:28 +02:00
kind . clone (),
2021-01-26 21:54:35 -05:00
Error ::bad_database (
2021-07-29 20:17:47 +02:00
"[Appservice] Event in servernameevent_data not found in db." ,
2021-01-26 21:54:35 -05:00
),
)
}) ?
2021-05-20 23:46:52 +02:00
. to_room_event ())
2021-05-12 20:04:28 +02:00
}
SendingEventType ::Edu ( _ ) => {
// Appservices don't need EDUs (?)
}
}
}
2021-03-15 09:48:19 +01:00
let permit = db . sending . maximum_requests . acquire (). await ;
2021-03-22 14:04:11 +01:00
2021-03-15 09:48:19 +01:00
let response = appservice_server ::send_request (
2021-01-29 10:14:09 -05:00
& db . globals ,
db . appservice
2021-01-26 21:54:35 -05:00
. get_registration ( server . as_str ())
. unwrap ()
. unwrap (), // TODO: handle error
appservice ::event ::push_events ::v1 ::Request {
events : & pdu_jsons ,
2021-03-15 09:48:19 +01:00
txn_id : & base64 ::encode_config (
2021-05-12 20:04:28 +02:00
Self ::calculate_hash (
& events
. iter ()
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) => &** b ,
})
. collect ::< Vec < _ >> (),
),
2021-03-15 09:48:19 +01:00
base64 ::URL_SAFE_NO_PAD ,
),
2021-01-26 21:54:35 -05:00
},
)
. await
2021-03-15 09:48:19 +01:00
. map ( | _response | kind . clone ())
. map_err ( | e | ( kind , e ));
2021-02-26 13:24:07 +01:00
2021-03-15 09:48:19 +01:00
drop ( permit );
2021-02-26 13:24:07 +01:00
2021-03-15 09:48:19 +01:00
response
2021-01-26 21:54:35 -05:00
}
2021-03-22 14:04:11 +01:00
OutgoingKind ::Push ( user , pushkey ) => {
2021-05-12 20:04:28 +02:00
let mut pdus = Vec ::new ();
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) => {
pdus . push (
db . rooms
. get_pdu_from_id ( & pdu_id )
. map_err ( | e | ( kind . clone (), e )) ?
. ok_or_else ( || {
(
kind . clone (),
Error ::bad_database (
2021-07-29 20:17:47 +02:00
"[Push] Event in servernamevent_datas not found in db." ,
2021-05-12 20:04:28 +02:00
),
)
}) ? ,
);
}
SendingEventType ::Edu ( _ ) => {
// Push gateways don't need EDUs (?)
}
}
}
2021-01-29 10:14:09 -05:00
2021-03-16 18:00:26 +01:00
for pdu in pdus {
2021-01-29 10:14:09 -05:00
// Redacted events are not notification targets (we don't send push for them)
if pdu . unsigned . get ( "redacted_because" ). is_some () {
continue ;
}
2021-03-24 11:52:10 +01:00
let userid =
UserId ::try_from ( utils ::string_from_bytes ( user ). map_err ( | _ | {
(
2021-05-12 20:04:28 +02:00
kind . clone (),
2021-03-24 11:52:10 +01:00
Error ::bad_database ( "Invalid push user string in db." ),
)
}) ? )
. map_err ( | _ | {
(
2021-05-12 20:04:28 +02:00
kind . clone (),
2021-03-24 11:52:10 +01:00
Error ::bad_database ( "Invalid push user id in db." ),
)
}) ? ;
2021-03-22 14:04:11 +01:00
let mut senderkey = user . clone ();
senderkey . push ( 0xff );
senderkey . extend_from_slice ( pushkey );
let pusher = match db
. pusher
. get_pusher ( & senderkey )
. map_err ( | e | ( OutgoingKind ::Push ( user . clone (), pushkey . clone ()), e )) ?
{
Some ( pusher ) => pusher ,
None => continue ,
};
let rules_for_user = db
. account_data
. get ::< push_rules ::PushRulesEvent > ( None , & userid , EventType ::PushRules )
2021-04-12 12:40:16 +02:00
. unwrap_or_default ()
2021-03-22 14:04:11 +01:00
. map ( | ev | ev . content . global )
2021-04-05 21:25:10 +02:00
. unwrap_or_else ( || push ::Ruleset ::server_default ( & userid ));
2021-03-22 14:04:11 +01:00
2021-04-12 12:40:16 +02:00
let unread : UInt = db
2021-03-22 14:04:11 +01:00
. rooms
2021-04-12 12:40:16 +02:00
. notification_count ( & userid , & pdu . room_id )
. map_err ( | e | ( kind . clone (), e )) ?
. try_into ()
. expect ( "notifiation count can't go that high" );
2021-03-22 14:04:11 +01:00
let permit = db . sending . maximum_requests . acquire (). await ;
let _response = pusher ::send_push_notice (
& userid ,
unread ,
& pusher ,
rules_for_user ,
& pdu ,
2021-07-14 07:07:08 +00:00
& db ,
2021-03-22 14:04:11 +01:00
)
. await
. map ( | _response | kind . clone ())
. map_err ( | e | ( kind . clone (), e ));
drop ( permit );
2021-01-26 21:54:35 -05:00
}
2021-03-22 14:04:11 +01:00
Ok ( OutgoingKind ::Push ( user . clone (), pushkey . clone ()))
2021-01-26 21:54:35 -05:00
}
OutgoingKind ::Normal ( server ) => {
2021-05-12 20:04:28 +02:00
let mut edu_jsons = Vec ::new ();
let mut pdu_jsons = Vec ::new ();
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) => {
2021-01-26 21:54:35 -05:00
// TODO: check room version and remove event_id if needed
2021-07-30 18:05:26 +02:00
let raw = PduEvent ::convert_to_outgoing_federation_event (
db . rooms
. get_pdu_json_from_id ( & pdu_id )
. map_err ( | e | ( OutgoingKind ::Normal ( server . clone ()), e )) ?
. ok_or_else ( || {
(
OutgoingKind ::Normal ( server . clone ()),
Error ::bad_database (
"[Normal] Event in servernamevent_datas not found in db." ,
),
)
}) ? ,
);
pdu_jsons . push ( raw );
2021-05-12 20:04:28 +02:00
}
SendingEventType ::Edu ( edu ) => {
2021-07-30 18:05:26 +02:00
if let Ok ( raw ) = serde_json ::from_slice ( edu ) {
edu_jsons . push ( raw );
}
2021-05-12 20:04:28 +02:00
}
}
}
2021-01-26 21:54:35 -05:00
2021-03-15 09:48:19 +01:00
let permit = db . sending . maximum_requests . acquire (). await ;
let response = server_server ::send_request (
2021-01-29 10:14:09 -05:00
& db . globals ,
2021-01-26 21:54:35 -05:00
&* server ,
send_transaction_message ::v1 ::Request {
2021-01-29 10:14:09 -05:00
origin : db . globals . server_name (),
2021-01-26 21:54:35 -05:00
pdus : & pdu_jsons ,
2021-05-12 20:04:28 +02:00
edus : & edu_jsons ,
2021-05-20 23:46:52 +02:00
origin_server_ts : MilliSecondsSinceUnixEpoch ::now (),
2021-03-15 09:48:19 +01:00
transaction_id : & base64 ::encode_config (
2021-05-12 20:04:28 +02:00
Self ::calculate_hash (
& events
. iter ()
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) => &** b ,
})
. collect ::< Vec < _ >> (),
),
2021-03-15 09:48:19 +01:00
base64 ::URL_SAFE_NO_PAD ,
),
2021-01-26 21:54:35 -05:00
},
)
. await
2021-03-15 09:48:19 +01:00
. map ( | response | {
2021-03-26 11:10:45 +01:00
for pdu in response . pdus {
if pdu . 1. is_err () {
warn! ( "Failed to send to {}: {:?}" , server , pdu );
}
}
2021-03-15 09:48:19 +01:00
kind . clone ()
2020-12-08 10:33:44 +01:00
})
2021-03-15 09:48:19 +01:00
. map_err ( | e | ( kind , e ));
2021-02-26 13:24:07 +01:00
2021-03-15 09:48:19 +01:00
drop ( permit );
2021-02-26 13:24:07 +01:00
2021-03-15 09:48:19 +01:00
response
2021-01-26 21:54:35 -05:00
}
2020-12-08 10:33:44 +01:00
}
2020-09-23 15:23:29 +02:00
}
2020-12-19 16:00:11 +01:00
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(key))]
2021-07-29 20:17:47 +02:00
fn parse_servercurrentevent (
key : & [ u8 ],
value : Vec < u8 > ,
) -> Result < ( OutgoingKind , SendingEventType ) > {
2021-03-22 14:04:11 +01:00
// Appservices start with a plus
Ok ::< _ , Error > ( if key . starts_with ( b "+" ) {
let mut parts = key [ 1 .. ]. splitn ( 2 , |& b | b == 0xff );
2020-12-19 16:00:11 +01:00
2021-03-22 14:04:11 +01:00
let server = parts . next (). expect ( "splitn always returns one element" );
2021-05-12 20:04:28 +02:00
let event = parts
2021-03-22 14:04:11 +01:00
. next ()
. ok_or_else ( || Error ::bad_database ( "Invalid bytes in servercurrentpdus." )) ? ;
let server = utils ::string_from_bytes ( & server ). map_err ( | _ | {
Error ::bad_database ( "Invalid server bytes in server_currenttransaction" )
}) ? ;
2020-12-19 16:00:11 +01:00
2021-01-26 21:54:35 -05:00
(
OutgoingKind ::Appservice ( Box ::< ServerName > ::try_from ( server ). map_err ( | _ | {
Error ::bad_database ( "Invalid server string in server_currenttransaction" )
}) ? ),
2021-05-12 20:04:28 +02:00
if event . starts_with ( b "*" ) {
2021-07-30 18:05:26 +02:00
SendingEventType ::Edu ( value )
2021-05-12 20:04:28 +02:00
} else {
SendingEventType ::Pdu ( event . to_vec ())
},
2021-01-26 21:54:35 -05:00
)
2021-03-22 14:04:11 +01:00
} else if key . starts_with ( b "$" ) {
let mut parts = key [ 1 .. ]. splitn ( 3 , |& b | b == 0xff );
let user = parts . next (). expect ( "splitn always returns one element" );
let pushkey = parts
. next ()
. ok_or_else ( || Error ::bad_database ( "Invalid bytes in servercurrentpdus." )) ? ;
2021-05-12 20:04:28 +02:00
let event = parts
2021-03-22 14:04:11 +01:00
. next ()
. ok_or_else ( || Error ::bad_database ( "Invalid bytes in servercurrentpdus." )) ? ;
2021-01-26 21:54:35 -05:00
(
2021-03-22 14:04:11 +01:00
OutgoingKind ::Push ( user . to_vec (), pushkey . to_vec ()),
2021-05-12 20:04:28 +02:00
if event . starts_with ( b "*" ) {
2021-07-30 18:05:26 +02:00
SendingEventType ::Edu ( value )
2021-05-12 20:04:28 +02:00
} else {
SendingEventType ::Pdu ( event . to_vec ())
},
2021-01-26 21:54:35 -05:00
)
2020-12-19 16:00:11 +01:00
} else {
2021-03-22 14:04:11 +01:00
let mut parts = key . splitn ( 2 , |& b | b == 0xff );
let server = parts . next (). expect ( "splitn always returns one element" );
2021-05-12 20:04:28 +02:00
let event = parts
2021-03-22 14:04:11 +01:00
. next ()
. ok_or_else ( || Error ::bad_database ( "Invalid bytes in servercurrentpdus." )) ? ;
let server = utils ::string_from_bytes ( & server ). map_err ( | _ | {
Error ::bad_database ( "Invalid server bytes in server_currenttransaction" )
}) ? ;
2021-01-26 21:54:35 -05:00
(
OutgoingKind ::Normal ( Box ::< ServerName > ::try_from ( server ). map_err ( | _ | {
Error ::bad_database ( "Invalid server string in server_currenttransaction" )
}) ? ),
2021-05-12 20:04:28 +02:00
if event . starts_with ( b "*" ) {
SendingEventType ::Edu ( event [ 1 .. ]. to_vec ())
} else {
SendingEventType ::Pdu ( event . to_vec ())
},
2021-01-26 21:54:35 -05:00
)
})
2020-12-19 16:00:11 +01:00
}
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(self, globals, destination, request))]
2020-12-19 16:00:11 +01:00
pub async fn send_federation_request < T : OutgoingRequest > (
& self ,
globals : & crate ::database ::globals ::Globals ,
2021-01-14 14:39:56 -05:00
destination : & ServerName ,
2020-12-19 16:00:11 +01:00
request : T ,
) -> Result < T ::IncomingResponse >
where
T : Debug ,
{
let permit = self . maximum_requests . acquire (). await ;
let response = server_server ::send_request ( globals , destination , request ). await ;
drop ( permit );
response
}
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(self, globals, registration, request))]
2020-12-19 16:00:11 +01:00
pub async fn send_appservice_request < T : OutgoingRequest > (
& self ,
globals : & crate ::database ::globals ::Globals ,
registration : serde_yaml ::Value ,
request : T ,
) -> Result < T ::IncomingResponse >
where
T : Debug ,
{
let permit = self . maximum_requests . acquire (). await ;
let response = appservice_server ::send_request ( globals , registration , request ). await ;
drop ( permit );
response
}
2020-09-15 16:13:54 +02:00
}