Files
continuwuity/src/service/transaction_ids/mod.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

164 lines
4.8 KiB
Rust
Raw Normal View History

use std::{collections::HashMap, sync::Arc};
2024-05-09 15:59:08 -07:00
use async_trait::async_trait;
2026-02-21 16:58:45 +00:00
use conduwuit::{Error, Result, SyncRwLock, debug, debug_warn, warn};
2024-08-08 17:18:30 +00:00
use database::{Handle, Map};
use ruma::{
DeviceId, OwnedServerName, OwnedTransactionId, TransactionId, UserId,
api::{
client::error::ErrorKind::LimitExceeded,
federation::transactions::send_transaction_message,
},
};
use tokio::sync::watch::{Receiver, Sender};
pub type TxnKey = (OwnedServerName, OwnedTransactionId);
pub type WrappedTransactionResponse = Option<send_transaction_message::v1::Response>;
pub type ActiveTransactionsMap = HashMap<TxnKey, Receiver<WrappedTransactionResponse>>;
2021-06-08 18:10:00 +02:00
2024-05-09 15:59:08 -07:00
pub struct Service {
2024-08-08 17:18:30 +00:00
db: Data,
servername_txnid_response_cache:
Arc<SyncRwLock<HashMap<TxnKey, send_transaction_message::v1::Response>>>,
servername_txnid_active: Arc<SyncRwLock<ActiveTransactionsMap>>,
max_active_txns: usize,
2024-08-08 17:18:30 +00:00
}
struct Data {
userdevicetxnid_response: Arc<Map>,
2020-08-25 13:24:38 +02:00
}
#[async_trait]
2024-07-04 03:26:19 +00:00
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
2024-08-08 17:18:30 +00:00
db: Data {
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
},
servername_txnid_response_cache: Arc::new(SyncRwLock::new(HashMap::new())),
servername_txnid_active: Arc::new(SyncRwLock::new(HashMap::new())),
max_active_txns: args
.depend::<crate::config::Service>("config")
.max_concurrent_inbound_transactions,
2024-07-04 03:26:19 +00:00
}))
2024-05-27 03:17:20 +00:00
}
async fn clear_cache(&self) {
let mut state = self.servername_txnid_response_cache.write();
state.clear();
}
2024-07-04 03:26:19 +00:00
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[must_use]
pub fn txn_active_handle_count(&self) -> usize {
let state = self.servername_txnid_active.read();
state.len()
}
pub fn add_client_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
data: &[u8],
) {
let mut key = user_id.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
key.push(0xFF);
key.extend_from_slice(txn_id.as_bytes());
2024-08-08 17:18:30 +00:00
self.db.userdevicetxnid_response.insert(&key, data);
}
pub async fn get_client_txn(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
) -> Result<Handle<'_>> {
let key = (user_id, device_id, txn_id);
self.db.userdevicetxnid_response.qry(&key).await
}
/// Fetches a receiver channel for the given transaction, if any exists.
/// If the given txn is not active, None is returned.
#[must_use]
pub fn get_active_federation_txn(
&self,
key: &TxnKey,
) -> Option<Receiver<WrappedTransactionResponse>> {
let state = self.servername_txnid_active.read();
state.get(key).cloned()
}
/// Starts a new inbound transaction handler, returning the appropriate
/// sender to broadcast the response via.
///
/// If the given key is already active, a rate-limited response is returned.
pub fn start_federation_txn(
&self,
key: TxnKey,
) -> Result<Sender<WrappedTransactionResponse>> {
let mut state = self.servername_txnid_active.write();
if state.get(&key).is_some() {
2026-02-21 16:58:45 +00:00
debug!(
origin = ?key.0,
id = ?key.1,
"Origin re-sent already running transaction"
);
Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Transaction is already being handled",
))
} else if state.keys().any(|k| k.0 == key.0) {
2026-02-21 16:58:45 +00:00
debug_warn!(
origin = ?key.0,
"Got concurrent transaction request from an origin with an active transaction"
);
Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Still processing another transaction from this origin",
))
} else if state.len() >= self.max_active_txns {
2026-02-21 16:58:45 +00:00
warn!(
active = state.len(),
max = self.max_active_txns,
"Server is overloaded, dropping incoming transaction"
);
Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Server is overloaded, try again later",
))
} else {
let (tx, rx) = tokio::sync::watch::channel(None);
state.insert(key, rx);
Ok(tx)
}
}
/// Finishes a transaction, removing it from the active txns registry.
pub fn finish_federation_txn(&self, key: &TxnKey) {
let mut state = self.servername_txnid_active.write();
state.remove(key);
}
/// Gets a cached transaction response, if the given key has a value.
#[must_use]
pub fn get_cached_txn(&self, key: &TxnKey) -> Option<send_transaction_message::v1::Response> {
let state = self.servername_txnid_response_cache.read();
state.get(key).cloned()
}
/// Sets a cached transaction response. The existing key will be overwritten
/// if it exists.
pub fn set_cached_txn(&self, key: TxnKey, response: send_transaction_message::v1::Response) {
let mut state = self.servername_txnid_response_cache.write();
// TODO: time-to-live?
state.insert(key, response);
}
2020-08-25 13:24:38 +02:00
}