Files
continuwuity/src/router/run.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

163 lines
4.4 KiB
Rust
Raw Normal View History

2024-12-14 21:58:01 -05:00
extern crate conduwuit_admin as admin;
extern crate conduwuit_core as conduwuit;
extern crate conduwuit_service as service;
2024-07-27 07:17:07 +00:00
use std::{
sync::{Arc, Weak, atomic::Ordering},
2024-07-27 07:17:07 +00:00
time::Duration,
};
2024-05-09 15:59:08 -07:00
use axum_server::Handle as ServerHandle;
use conduwuit::{Error, Result, Server, debug, debug_error, debug_info, error, info};
2025-01-28 20:02:29 +00:00
use futures::FutureExt;
2024-07-27 07:17:07 +00:00
use service::Services;
use tokio::{
sync::broadcast::{self, Sender},
task::JoinHandle,
};
2024-05-09 15:59:08 -07:00
use crate::serve;
2024-05-09 15:59:08 -07:00
/// Main loop base
2026-01-04 03:04:37 +00:00
#[tracing::instrument(skip_all, level = "info")]
2024-07-27 07:17:07 +00:00
pub(crate) async fn run(services: Arc<Services>) -> Result<()> {
let server = &services.server;
debug!("Start");
2024-05-09 15:59:08 -07:00
// Install the admin room callback here for now
2024-07-27 07:17:07 +00:00
admin::init(&services.admin).await;
2024-05-09 15:59:08 -07:00
// Setup shutdown/signal handling
let handle = ServerHandle::new();
2024-05-29 16:59:20 +00:00
let (tx, _) = broadcast::channel::<()>(1);
let sigs = server
.runtime()
.spawn(signal(server.clone(), tx.clone(), handle.clone()));
2024-05-09 15:59:08 -07:00
let mut listener =
server
.runtime()
.spawn(serve::serve(services.clone(), handle.clone(), tx.subscribe()));
// Run startup admin commands.
// This has to be done after the admin service is initialized otherwise it
// panics.
services.admin.startup_execute().await?;
// Print first-run banner if necessary. This needs to be done after the startup
// admin commands are run in case one of them created the first user.
services.firstrun.print_first_run_banner();
debug!("Running");
let res = tokio::select! {
res = &mut listener => res.map_err(Error::from).unwrap_or_else(Err),
2024-07-27 07:17:07 +00:00
res = services.poll() => handle_services_poll(server, res, listener).await,
};
2024-05-09 15:59:08 -07:00
// Join the signal handler before we leave.
sigs.abort();
_ = sigs.await;
// Remove the admin room callback
2024-07-27 07:17:07 +00:00
admin::fini(&services.admin).await;
2024-05-09 15:59:08 -07:00
debug_info!("Finish");
2024-05-29 16:59:20 +00:00
res
2024-05-09 15:59:08 -07:00
}
/// Async initializations
2026-01-04 03:04:37 +00:00
#[tracing::instrument(skip_all, level = "info")]
2024-07-27 07:17:07 +00:00
pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> {
2024-05-09 15:59:08 -07:00
debug!("Starting...");
2024-06-05 05:41:29 +00:00
2024-07-27 07:17:07 +00:00
let services = Services::build(server).await?.start().await?;
2024-05-09 15:59:08 -07:00
2025-01-10 10:25:07 -05:00
#[cfg(all(feature = "systemd", target_os = "linux"))]
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])
.expect("failed to notify systemd of ready state");
2024-05-09 15:59:08 -07:00
debug!("Started");
2024-07-27 07:17:07 +00:00
Ok(services)
2024-05-09 15:59:08 -07:00
}
/// Async destructions
2026-01-04 03:04:37 +00:00
#[tracing::instrument(skip_all, level = "info")]
2024-07-27 07:17:07 +00:00
pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
2024-05-09 15:59:08 -07:00
debug!("Shutting down...");
2025-04-02 04:12:24 +00:00
#[cfg(all(feature = "systemd", target_os = "linux"))]
sd_notify::notify(false, &[sd_notify::NotifyState::Stopping])
2025-04-02 04:12:24 +00:00
.expect("failed to notify systemd of stopping state");
2024-05-09 15:59:08 -07:00
// Wait for all completions before dropping or we'll lose them to the module
// unload and explode.
2024-07-27 07:17:07 +00:00
services.stop().await;
// Check that Services and Database will drop as expected, The complex of Arc's
// used for various components can easily lead to references being held
// somewhere improperly; this can hang shutdowns.
debug!("Cleaning up...");
let db = Arc::downgrade(&services.db);
2024-07-27 07:17:07 +00:00
if let Err(services) = Arc::try_unwrap(services) {
debug_error!(
"{} dangling references to Services after shutdown",
Arc::strong_count(&services)
);
}
2024-06-05 05:41:29 +00:00
if Weak::strong_count(&db) > 0 {
debug_error!(
"{} dangling references to Database after shutdown",
Weak::strong_count(&db)
);
}
2024-05-09 15:59:08 -07:00
info!("Shutdown complete.");
Ok(())
}
2026-01-04 03:04:37 +00:00
#[tracing::instrument(skip_all, level = "info")]
async fn signal(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) {
2025-01-28 20:02:29 +00:00
server
.clone()
.until_shutdown()
.then(move |()| handle_shutdown(server, tx, handle))
.await;
2024-06-10 06:02:17 +00:00
}
2024-05-09 15:59:08 -07:00
2025-01-28 20:02:29 +00:00
async fn handle_shutdown(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) {
2024-06-01 08:03:20 +00:00
if let Err(e) = tx.send(()) {
error!("failed sending shutdown transaction to channel: {e}");
}
2024-05-09 15:59:08 -07:00
2025-02-02 10:43:02 +00:00
let timeout = server.config.client_shutdown_timeout;
let timeout = Duration::from_secs(timeout);
debug!(
?timeout,
2026-01-04 03:04:37 +00:00
handle_active = %server.metrics.requests_handle_active.load(Ordering::Relaxed),
"Notifying for graceful shutdown"
);
handle.graceful_shutdown(Some(timeout));
2024-05-09 15:59:08 -07:00
}
async fn handle_services_poll(
server: &Arc<Server>,
result: Result<()>,
listener: JoinHandle<Result<()>>,
) -> Result<()> {
debug!("Service manager finished: {result:?}");
if server.running() {
if let Err(e) = server.shutdown() {
error!("Failed to send shutdown signal: {e}");
}
}
if let Err(e) = listener.await {
error!("Client listener task finished with error: {e}");
}
result
}