2026-02-20 21:28:23 +00:00
|
|
|
use std::{collections::HashMap, sync::Arc};
|
2024-05-09 15:59:08 -07:00
|
|
|
|
2026-02-21 00:35:48 +00:00
|
|
|
use async_trait::async_trait;
|
|
|
|
|
use conduwuit::{Error, Result, SyncRwLock};
|
2024-08-08 17:18:30 +00:00
|
|
|
use database::{Handle, Map};
|
2026-02-20 21:28:23 +00:00
|
|
|
use ruma::{
|
|
|
|
|
DeviceId, OwnedServerName, OwnedTransactionId, TransactionId, UserId,
|
2026-02-21 00:35:48 +00:00
|
|
|
api::{
|
|
|
|
|
client::error::ErrorKind::LimitExceeded,
|
|
|
|
|
federation::transactions::send_transaction_message,
|
|
|
|
|
},
|
2026-02-20 21:28:23 +00:00
|
|
|
};
|
|
|
|
|
use tokio::sync::watch::{Receiver, Sender};
|
|
|
|
|
|
|
|
|
|
pub type TxnKey = (OwnedServerName, OwnedTransactionId);
|
2026-02-21 00:35:48 +00:00
|
|
|
pub type WrappedTransactionResponse = Option<send_transaction_message::v1::Response>;
|
|
|
|
|
pub type ActiveTransactionsMap = HashMap<TxnKey, Receiver<WrappedTransactionResponse>>;
|
2021-06-08 18:10:00 +02:00
|
|
|
|
2024-05-09 15:59:08 -07:00
|
|
|
pub struct Service {
|
2024-08-08 17:18:30 +00:00
|
|
|
db: Data,
|
2026-02-21 00:35:48 +00:00
|
|
|
servername_txnid_response_cache:
|
2026-02-20 21:28:23 +00:00
|
|
|
Arc<SyncRwLock<HashMap<TxnKey, send_transaction_message::v1::Response>>>,
|
2026-02-21 00:35:48 +00:00
|
|
|
servername_txnid_active: Arc<SyncRwLock<ActiveTransactionsMap>>,
|
|
|
|
|
max_active_txns: usize,
|
2024-08-08 17:18:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct Data {
|
|
|
|
|
userdevicetxnid_response: Arc<Map>,
|
2020-08-25 13:24:38 +02:00
|
|
|
}
|
|
|
|
|
|
2026-02-21 00:35:48 +00:00
|
|
|
#[async_trait]
|
2024-07-04 03:26:19 +00:00
|
|
|
impl crate::Service for Service {
|
|
|
|
|
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
|
|
|
|
Ok(Arc::new(Self {
|
2024-08-08 17:18:30 +00:00
|
|
|
db: Data {
|
|
|
|
|
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
|
|
|
|
|
},
|
2026-02-20 21:28:23 +00:00
|
|
|
servername_txnid_response_cache: Arc::new(SyncRwLock::new(HashMap::new())),
|
|
|
|
|
servername_txnid_active: Arc::new(SyncRwLock::new(HashMap::new())),
|
2026-02-21 15:20:40 +00:00
|
|
|
max_active_txns: args
|
|
|
|
|
.depend::<crate::config::Service>("config")
|
|
|
|
|
.max_concurrent_inbound_transactions,
|
2024-07-04 03:26:19 +00:00
|
|
|
}))
|
2024-05-27 03:17:20 +00:00
|
|
|
}
|
|
|
|
|
|
2026-02-21 00:35:48 +00:00
|
|
|
async fn clear_cache(&self) {
|
|
|
|
|
let mut state = self.servername_txnid_response_cache.write();
|
|
|
|
|
state.clear();
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-04 03:26:19 +00:00
|
|
|
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-20 21:28:23 +00:00
|
|
|
impl Service {
|
2026-02-20 21:31:45 +00:00
|
|
|
pub fn add_client_txnid(
|
2026-02-20 21:28:23 +00:00
|
|
|
&self,
|
|
|
|
|
user_id: &UserId,
|
|
|
|
|
device_id: Option<&DeviceId>,
|
|
|
|
|
txn_id: &TransactionId,
|
|
|
|
|
data: &[u8],
|
|
|
|
|
) {
|
|
|
|
|
let mut key = user_id.as_bytes().to_vec();
|
|
|
|
|
key.push(0xFF);
|
|
|
|
|
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
|
|
|
|
|
key.push(0xFF);
|
|
|
|
|
key.extend_from_slice(txn_id.as_bytes());
|
2024-08-08 17:18:30 +00:00
|
|
|
|
2026-02-20 21:28:23 +00:00
|
|
|
self.db.userdevicetxnid_response.insert(&key, data);
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-20 21:31:45 +00:00
|
|
|
pub async fn get_client_txn(
|
2026-02-20 21:28:23 +00:00
|
|
|
&self,
|
|
|
|
|
user_id: &UserId,
|
|
|
|
|
device_id: Option<&DeviceId>,
|
|
|
|
|
txn_id: &TransactionId,
|
|
|
|
|
) -> Result<Handle<'_>> {
|
|
|
|
|
let key = (user_id, device_id, txn_id);
|
|
|
|
|
self.db.userdevicetxnid_response.qry(&key).await
|
|
|
|
|
}
|
2026-02-21 00:35:48 +00:00
|
|
|
|
|
|
|
|
/// Fetches a receiver channel for the given transaction, if any exists.
|
|
|
|
|
/// If the given txn is not active, None is returned.
|
|
|
|
|
#[must_use]
|
|
|
|
|
pub fn get_active_federation_txn(
|
|
|
|
|
&self,
|
|
|
|
|
key: &TxnKey,
|
|
|
|
|
) -> Option<Receiver<WrappedTransactionResponse>> {
|
|
|
|
|
let state = self.servername_txnid_active.read();
|
|
|
|
|
state.get(key).cloned()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Starts a new inbound transaction handler, returning the appropriate
|
|
|
|
|
/// sender to broadcast the response via.
|
|
|
|
|
///
|
|
|
|
|
/// If the given key is already active, a rate-limited response is returned.
|
|
|
|
|
pub fn start_federation_txn(
|
|
|
|
|
&self,
|
|
|
|
|
key: TxnKey,
|
|
|
|
|
) -> Result<Sender<WrappedTransactionResponse>> {
|
|
|
|
|
let mut state = self.servername_txnid_active.write();
|
|
|
|
|
if state.get(&key).is_some() {
|
|
|
|
|
Err(Error::BadRequest(
|
|
|
|
|
LimitExceeded { retry_after: None },
|
|
|
|
|
"Transaction is already being handled",
|
|
|
|
|
))
|
|
|
|
|
} else if state.keys().any(|k| k.0 == key.0) {
|
|
|
|
|
Err(Error::BadRequest(
|
|
|
|
|
LimitExceeded { retry_after: None },
|
|
|
|
|
"Still processing another transaction from this origin",
|
|
|
|
|
))
|
|
|
|
|
} else if state.len() >= self.max_active_txns {
|
|
|
|
|
Err(Error::BadRequest(
|
|
|
|
|
LimitExceeded { retry_after: None },
|
|
|
|
|
"Server is overloaded, try again later",
|
|
|
|
|
))
|
|
|
|
|
} else {
|
|
|
|
|
let (tx, rx) = tokio::sync::watch::channel(None);
|
|
|
|
|
state.insert(key, rx);
|
|
|
|
|
Ok(tx)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Finishes a transaction, removing it from the active txns registry.
|
|
|
|
|
pub fn finish_federation_txn(&self, key: &TxnKey) {
|
|
|
|
|
let mut state = self.servername_txnid_active.write();
|
|
|
|
|
state.remove(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Gets a cached transaction response, if the given key has a value.
|
|
|
|
|
#[must_use]
|
|
|
|
|
pub fn get_cached_txn(&self, key: &TxnKey) -> Option<send_transaction_message::v1::Response> {
|
|
|
|
|
let state = self.servername_txnid_response_cache.read();
|
|
|
|
|
state.get(key).cloned()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Sets a cached transaction response. The existing key will be overwritten
|
|
|
|
|
/// if it exists.
|
|
|
|
|
pub fn set_cached_txn(&self, key: TxnKey, response: send_transaction_message::v1::Response) {
|
|
|
|
|
let mut state = self.servername_txnid_response_cache.write();
|
|
|
|
|
// TODO: time-to-live?
|
|
|
|
|
state.insert(key, response);
|
|
|
|
|
}
|
2020-08-25 13:24:38 +02:00
|
|
|
}
|