Files
continuwuity/src/service/services.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

243 lines
6.8 KiB
Rust
Raw Normal View History

2024-07-18 06:37:47 +00:00
use std::{
any::Any,
collections::BTreeMap,
sync::{Arc, RwLock},
};
2024-05-09 15:59:08 -07:00
use conduwuit::{Result, Server, debug, debug_info, info, trace, utils::stream::IterStream};
2024-06-28 22:51:39 +00:00
use database::Database;
use futures::{Stream, StreamExt, TryStreamExt};
2024-07-13 07:01:45 +00:00
use tokio::sync::Mutex;
2024-05-09 15:59:08 -07:00
use crate::{
account_data, admin, announcements, appservice, client, config, emergency, federation,
globals, key_backups,
2024-07-13 07:01:45 +00:00
manager::Manager,
media, moderation, presence, pusher, resolver, rooms, sending, server_keys, service,
2024-07-04 03:26:19 +00:00
service::{Args, Map, Service},
sync, transaction_ids, uiaa, users,
2024-05-09 15:59:08 -07:00
};
pub struct Services {
2024-07-04 03:26:19 +00:00
pub account_data: Arc<account_data::Service>,
2024-05-09 15:59:08 -07:00
pub admin: Arc<admin::Service>,
2024-07-18 06:37:47 +00:00
pub appservice: Arc<appservice::Service>,
2025-01-28 20:55:28 +00:00
pub config: Arc<config::Service>,
2024-07-18 06:37:47 +00:00
pub client: Arc<client::Service>,
pub emergency: Arc<emergency::Service>,
pub globals: Arc<globals::Service>,
2024-07-04 03:26:19 +00:00
pub key_backups: Arc<key_backups::Service>,
pub media: Arc<media::Service>,
2024-07-18 06:37:47 +00:00
pub presence: Arc<presence::Service>,
pub pusher: Arc<pusher::Service>,
pub resolver: Arc<resolver::Service>,
pub rooms: rooms::Service,
pub federation: Arc<federation::Service>,
2024-05-09 15:59:08 -07:00
pub sending: Arc<sending::Service>,
pub server_keys: Arc<server_keys::Service>,
2024-08-08 17:18:30 +00:00
pub sync: Arc<sync::Service>,
2024-07-18 06:37:47 +00:00
pub transaction_ids: Arc<transaction_ids::Service>,
pub uiaa: Arc<uiaa::Service>,
pub users: Arc<users::Service>,
pub moderation: Arc<moderation::Service>,
pub announcements: Arc<announcements::Service>,
2024-07-04 03:26:19 +00:00
2024-07-13 07:01:45 +00:00
manager: Mutex<Option<Arc<Manager>>>,
pub(crate) service: Arc<Map>,
2024-05-09 15:59:08 -07:00
pub server: Arc<Server>,
2024-06-28 22:51:39 +00:00
pub db: Arc<Database>,
2024-05-09 15:59:08 -07:00
}
impl Services {
#[allow(clippy::cognitive_complexity)]
2024-07-27 07:17:07 +00:00
pub async fn build(server: Arc<Server>) -> Result<Arc<Self>> {
let db = Database::open(&server).await?;
2024-07-18 06:37:47 +00:00
let service: Arc<Map> = Arc::new(RwLock::new(BTreeMap::new()));
2024-07-04 03:26:19 +00:00
macro_rules! build {
2024-07-18 06:37:47 +00:00
($tyname:ty) => {{
let built = <$tyname>::build(Args {
db: &db,
server: &server,
service: &service,
})?;
add_service(&service, built.clone(), built.clone());
built
}};
2024-07-04 03:26:19 +00:00
}
2024-07-27 07:17:07 +00:00
Ok(Arc::new(Self {
2024-07-18 06:37:47 +00:00
account_data: build!(account_data::Service),
admin: build!(admin::Service),
appservice: build!(appservice::Service),
2024-07-16 22:00:54 +00:00
resolver: build!(resolver::Service),
2024-07-16 22:29:42 +00:00
client: build!(client::Service),
2025-01-28 20:55:28 +00:00
config: build!(config::Service),
2024-07-18 06:37:47 +00:00
emergency: build!(emergency::Service),
globals: build!(globals::Service),
key_backups: build!(key_backups::Service),
media: build!(media::Service),
presence: build!(presence::Service),
pusher: build!(pusher::Service),
2024-05-09 15:59:08 -07:00
rooms: rooms::Service {
2024-07-04 03:26:19 +00:00
alias: build!(rooms::alias::Service),
auth_chain: build!(rooms::auth_chain::Service),
directory: build!(rooms::directory::Service),
event_handler: build!(rooms::event_handler::Service),
lazy_loading: build!(rooms::lazy_loading::Service),
metadata: build!(rooms::metadata::Service),
outlier: build!(rooms::outlier::Service),
pdu_metadata: build!(rooms::pdu_metadata::Service),
read_receipt: build!(rooms::read_receipt::Service),
search: build!(rooms::search::Service),
short: build!(rooms::short::Service),
2024-07-18 06:37:47 +00:00
spaces: build!(rooms::spaces::Service),
2024-07-04 03:26:19 +00:00
state: build!(rooms::state::Service),
state_accessor: build!(rooms::state_accessor::Service),
state_cache: build!(rooms::state_cache::Service),
state_compressor: build!(rooms::state_compressor::Service),
threads: build!(rooms::threads::Service),
2024-07-18 06:37:47 +00:00
timeline: build!(rooms::timeline::Service),
2024-07-04 03:26:19 +00:00
typing: build!(rooms::typing::Service),
user: build!(rooms::user::Service),
2024-05-09 15:59:08 -07:00
},
federation: build!(federation::Service),
2024-07-18 06:37:47 +00:00
sending: build!(sending::Service),
server_keys: build!(server_keys::Service),
2024-08-08 17:18:30 +00:00
sync: build!(sync::Service),
2024-07-04 03:26:19 +00:00
transaction_ids: build!(transaction_ids::Service),
uiaa: build!(uiaa::Service),
2024-07-18 06:37:47 +00:00
users: build!(users::Service),
moderation: build!(moderation::Service),
announcements: build!(announcements::Service),
2024-07-18 06:37:47 +00:00
manager: Mutex::new(None),
2024-07-04 03:26:19 +00:00
service,
2024-05-09 15:59:08 -07:00
server,
db,
2024-07-27 07:17:07 +00:00
}))
2024-05-09 15:59:08 -07:00
}
2024-07-27 07:17:07 +00:00
pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
debug_info!("Starting services...");
2024-10-27 00:30:30 +00:00
self.admin.set_services(Some(Arc::clone(self)).as_ref());
2024-11-01 22:16:14 +00:00
super::migrations::migrations(self).await?;
2024-07-13 07:01:45 +00:00
self.manager
.lock()
.await
.insert(Manager::new(self))
.clone()
.start()
.await?;
2024-05-09 15:59:08 -07:00
2025-01-10 23:51:08 -05:00
// reset dormant online/away statuses to offline, and set the server user as
// online
if self.server.config.allow_local_presence && !self.db.is_read_only() {
2025-01-10 23:51:08 -05:00
self.presence.unset_all_presence().await;
_ = self
.presence
.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Online)
.await;
}
debug_info!("Services startup complete.");
2024-07-27 07:17:07 +00:00
Ok(Arc::clone(self))
}
2024-07-27 07:17:07 +00:00
pub async fn stop(&self) {
info!("Shutting down services...");
// set the server user as offline
if self.server.config.allow_local_presence && !self.db.is_read_only() {
_ = self
.presence
.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Offline)
.await;
}
2024-07-13 08:07:49 +00:00
self.interrupt();
2024-07-13 07:01:45 +00:00
if let Some(manager) = self.manager.lock().await.as_ref() {
manager.stop().await;
}
2024-10-27 00:30:30 +00:00
self.admin.set_services(None);
2024-07-27 07:17:07 +00:00
debug_info!("Services shutdown complete.");
2024-05-09 15:59:08 -07:00
}
pub async fn poll(&self) -> Result<()> {
2024-07-13 07:01:45 +00:00
if let Some(manager) = self.manager.lock().await.as_ref() {
return manager.poll().await;
}
Ok(())
}
2024-07-04 03:26:19 +00:00
pub async fn clear_cache(&self) {
self.services()
.for_each(|service| async move {
service.clear_cache().await;
})
.await;
2024-05-09 15:59:08 -07:00
}
pub async fn memory_usage(&self) -> Result<String> {
self.services()
.map(Ok)
.try_fold(String::new(), |mut out, service| async move {
service.memory_usage(&mut out).await?;
Ok(out)
})
.await
}
fn interrupt(&self) {
debug!("Interrupting services...");
2024-07-18 06:37:47 +00:00
for (name, (service, ..)) in self.service.read().expect("locked for reading").iter() {
2024-07-28 09:03:17 +00:00
if let Some(service) = service.upgrade() {
trace!("Interrupting {name}");
service.interrupt();
}
2024-05-09 15:59:08 -07:00
}
}
2024-07-16 21:33:28 +00:00
/// Iterate from snapshot of the services map
fn services(&self) -> impl Stream<Item = Arc<dyn Service>> + Send {
self.service
.read()
.expect("locked for reading")
.values()
.filter_map(|val| val.0.upgrade())
.collect::<Vec<_>>()
.into_iter()
.stream()
}
#[inline]
2024-08-30 00:26:23 +00:00
pub fn try_get<T>(&self, name: &str) -> Result<Arc<T>>
2024-08-04 06:32:19 +00:00
where
2024-08-30 00:26:23 +00:00
T: Any + Send + Sync + Sized,
2024-08-04 06:32:19 +00:00
{
service::try_get::<T>(&self.service, name)
}
#[inline]
2024-08-30 00:26:23 +00:00
pub fn get<T>(&self, name: &str) -> Option<Arc<T>>
2024-08-04 06:32:19 +00:00
where
2024-08-30 00:26:23 +00:00
T: Any + Send + Sync + Sized,
2024-08-04 06:32:19 +00:00
{
2024-07-28 09:03:17 +00:00
service::get::<T>(&self.service, name)
}
2024-07-18 06:37:47 +00:00
}
2024-07-28 09:03:17 +00:00
#[allow(clippy::needless_pass_by_value)]
2024-07-18 06:37:47 +00:00
fn add_service(map: &Arc<Map>, s: Arc<dyn Service>, a: Arc<dyn Any + Send + Sync>) {
let name = s.name();
let len = map.read().expect("locked for reading").len();
trace!("built service #{len}: {name:?}");
map.write()
.expect("locked for writing")
2024-07-28 09:03:17 +00:00
.insert(name.to_owned(), (Arc::downgrade(&s), Arc::downgrade(&a)));
}