Files
continuwuity/src/router/run.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

139 lines
3.4 KiB
Rust
Raw Normal View History

2024-05-09 15:59:08 -07:00
use std::{sync::Arc, time::Duration};
use axum_server::Handle as ServerHandle;
use tokio::{
sync::broadcast::{self, Sender},
task::JoinHandle,
};
2024-05-09 15:59:08 -07:00
extern crate conduit_admin as admin;
extern crate conduit_core as conduit;
extern crate conduit_service as service;
use std::sync::atomic::Ordering;
use conduit::{debug, debug_info, error, info, trace, Error, Result, Server};
2024-05-09 15:59:08 -07:00
use crate::serve;
2024-05-09 15:59:08 -07:00
/// Main loop base
#[tracing::instrument(skip_all)]
pub(crate) async fn run(server: Arc<Server>) -> Result<()> {
debug!("Start");
2024-05-09 15:59:08 -07:00
// Install the admin room callback here for now
2024-07-05 01:44:43 +00:00
admin::init().await;
2024-05-09 15:59:08 -07:00
// Setup shutdown/signal handling
let handle = ServerHandle::new();
2024-05-29 16:59:20 +00:00
let (tx, _) = broadcast::channel::<()>(1);
let sigs = server
.runtime()
.spawn(signal(server.clone(), tx.clone(), handle.clone()));
2024-05-09 15:59:08 -07:00
let mut listener = server
.runtime()
.spawn(serve::serve(server.clone(), handle.clone(), tx.subscribe()));
// Focal point
debug!("Running");
let res = tokio::select! {
res = &mut listener => res.map_err(Error::from).unwrap_or_else(Err),
res = service::services().poll() => handle_services_poll(&server, res, listener).await,
};
2024-05-09 15:59:08 -07:00
// Join the signal handler before we leave.
sigs.abort();
_ = sigs.await;
// Remove the admin room callback
2024-07-05 01:44:43 +00:00
admin::fini().await;
2024-05-09 15:59:08 -07:00
debug_info!("Finish");
2024-05-29 16:59:20 +00:00
res
2024-05-09 15:59:08 -07:00
}
/// Async initializations
#[tracing::instrument(skip_all)]
pub(crate) async fn start(server: Arc<Server>) -> Result<()> {
2024-05-09 15:59:08 -07:00
debug!("Starting...");
2024-06-05 05:41:29 +00:00
service::start(&server).await?;
2024-05-09 15:59:08 -07:00
#[cfg(feature = "systemd")]
2024-05-23 01:27:04 -04:00
sd_notify::notify(true, &[sd_notify::NotifyState::Ready]).expect("failed to notify systemd of ready state");
2024-05-09 15:59:08 -07:00
debug!("Started");
Ok(())
}
/// Async destructions
#[tracing::instrument(skip_all)]
pub(crate) async fn stop(_server: Arc<Server>) -> Result<()> {
2024-05-09 15:59:08 -07:00
debug!("Shutting down...");
// Wait for all completions before dropping or we'll lose them to the module
// unload and explode.
service::stop().await;
2024-06-05 05:41:29 +00:00
2024-05-09 15:59:08 -07:00
debug!("Cleaning up...");
#[cfg(feature = "systemd")]
2024-05-23 01:27:04 -04:00
sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]).expect("failed to notify systemd of stopping state");
2024-05-09 15:59:08 -07:00
info!("Shutdown complete.");
Ok(())
}
#[tracing::instrument(skip_all)]
async fn signal(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) {
2024-06-10 06:02:17 +00:00
loop {
let sig: &'static str = server
.signal
.subscribe()
.recv()
.await
.expect("channel error");
if !server.running() {
handle_shutdown(&server, &tx, &handle, sig).await;
break;
}
2024-05-09 15:59:08 -07:00
}
2024-06-10 06:02:17 +00:00
}
2024-05-09 15:59:08 -07:00
2024-06-10 06:02:17 +00:00
async fn handle_shutdown(server: &Arc<Server>, tx: &Sender<()>, handle: &axum_server::Handle, sig: &str) {
debug!("Received signal {}", sig);
2024-06-01 08:03:20 +00:00
if let Err(e) = tx.send(()) {
error!("failed sending shutdown transaction to channel: {e}");
}
2024-05-09 15:59:08 -07:00
2024-06-25 05:05:02 +00:00
let pending = server.metrics.requests_spawn_active.load(Ordering::Relaxed);
if pending > 0 {
let timeout = Duration::from_secs(36);
trace!(pending, ?timeout, "Notifying for graceful shutdown");
handle.graceful_shutdown(Some(timeout));
} else {
debug!(pending, "Notifying for immediate shutdown");
handle.shutdown();
2024-05-09 15:59:08 -07:00
}
}
async fn handle_services_poll(
server: &Arc<Server>, result: Result<()>, listener: JoinHandle<Result<()>>,
) -> Result<()> {
debug!("Service manager finished: {result:?}");
if server.running() {
if let Err(e) = server.shutdown() {
error!("Failed to send shutdown signal: {e}");
}
}
if let Err(e) = listener.await {
error!("Client listener task finished with error: {e}");
}
result
}