Files
continuwuity/src/service/presence/mod.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

236 lines
6.6 KiB
Rust
Raw Normal View History

mod data;
mod presence;
2024-04-01 20:48:40 -07:00
use std::{sync::Arc, time::Duration};
2022-09-06 23:15:09 +02:00
2024-07-04 03:26:19 +00:00
use async_trait::async_trait;
2024-12-14 21:58:01 -05:00
use conduwuit::{checked, debug, error, result::LogErr, Error, Result, Server};
2024-08-08 17:18:30 +00:00
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use loole::{Receiver, Sender};
use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId};
use tokio::time::sleep;
use self::{data::Data, presence::Presence};
2024-07-22 07:43:51 +00:00
use crate::{globals, users, Dep};
2024-05-09 15:59:08 -07:00
pub struct Service {
timer_channel: (Sender<TimerType>, Receiver<TimerType>),
2024-04-01 20:48:40 -07:00
timeout_remote_users: bool,
2024-07-07 04:46:16 +00:00
idle_timeout: u64,
offline_timeout: u64,
pub db: Data,
services: Services,
}
2024-07-18 06:37:47 +00:00
struct Services {
server: Arc<Server>,
2024-07-22 07:43:51 +00:00
globals: Dep<globals::Service>,
2024-07-18 06:37:47 +00:00
users: Dep<users::Service>,
}
/// Message sent over the timer channel: the user whose presence should be
/// re-evaluated and how long to wait before doing so.
type TimerType = (OwnedUserId, Duration);
2024-07-04 03:26:19 +00:00
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let config = &args.server.config;
2024-07-07 04:46:16 +00:00
let idle_timeout_s = config.presence_idle_timeout_s;
let offline_timeout_s = config.presence_offline_timeout_s;
2024-05-27 03:17:20 +00:00
Ok(Arc::new(Self {
timer_channel: loole::unbounded(),
2024-04-01 20:48:40 -07:00
timeout_remote_users: config.presence_timeout_remote_users,
2024-07-07 04:46:16 +00:00
idle_timeout: checked!(idle_timeout_s * 1_000)?,
offline_timeout: checked!(offline_timeout_s * 1_000)?,
db: Data::new(&args),
services: Services {
server: args.server.clone(),
globals: args.depend::<globals::Service>("globals"),
users: args.depend::<users::Service>("users"),
},
2024-05-27 03:17:20 +00:00
}))
2024-04-01 20:48:40 -07:00
}
async fn worker(self: Arc<Self>) -> Result<()> {
let receiver = self.timer_channel.1.clone();
let mut presence_timers = FuturesUnordered::new();
while !receiver.is_closed() {
tokio::select! {
2024-08-08 17:18:30 +00:00
Some(user_id) = presence_timers.next() => {
self.process_presence_timer(&user_id).await.log_err().ok();
},
event = receiver.recv_async() => match event {
Err(_) => break,
Ok((user_id, timeout)) => {
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
presence_timers.push(presence_timer(user_id, timeout));
},
},
2024-05-09 15:59:08 -07:00
}
}
Ok(())
2024-05-09 15:59:08 -07:00
}
2024-07-04 03:26:19 +00:00
fn interrupt(&self) {
let (timer_sender, _) = &self.timer_channel;
if !timer_sender.is_closed() {
timer_sender.close();
2024-05-09 15:59:08 -07:00
}
2024-04-01 20:48:40 -07:00
}
2024-07-04 03:26:19 +00:00
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
2024-04-01 20:48:40 -07:00
/// Returns the latest presence event for the given user.
#[inline]
2024-08-08 17:18:30 +00:00
pub async fn get_presence(&self, user_id: &UserId) -> Result<PresenceEvent> {
self.db
.get_presence(user_id)
.map_ok(|(_, presence)| presence)
.await
2020-07-28 15:58:50 +02:00
}
2024-03-05 19:48:54 -05:00
/// Pings the presence of the given user in the given room, setting the
/// specified state.
2024-08-08 17:18:30 +00:00
pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> {
const REFRESH_TIMEOUT: u64 = 60 * 1000;
2024-08-08 17:18:30 +00:00
let last_presence = self.db.get_presence(user_id).await;
2024-04-01 20:48:40 -07:00
let state_changed = match last_presence {
| Err(_) => true,
| Ok((_, ref presence)) => presence.content.presence != *new_state,
2024-04-01 20:48:40 -07:00
};
let last_last_active_ago = match last_presence {
| Err(_) => 0_u64,
| Ok((_, ref presence)) =>
presence.content.last_active_ago.unwrap_or_default().into(),
2024-04-01 20:48:40 -07:00
};
if !state_changed && last_last_active_ago < REFRESH_TIMEOUT {
return Ok(());
}
let status_msg = match last_presence {
| Ok((_, ref presence)) => presence.content.status_msg.clone(),
| Err(_) => Some(String::new()),
2024-04-01 20:48:40 -07:00
};
let last_active_ago = UInt::new(0);
let currently_active = *new_state == PresenceState::Online;
self.set_presence(user_id, new_state, Some(currently_active), last_active_ago, status_msg)
2024-08-08 17:18:30 +00:00
.await
2020-07-28 15:58:50 +02:00
}
2024-03-05 19:48:54 -05:00
/// Adds a presence event which will be saved until a new event replaces it.
2024-08-08 17:18:30 +00:00
pub async fn set_presence(
&self,
user_id: &UserId,
state: &PresenceState,
currently_active: Option<bool>,
last_active_ago: Option<UInt>,
2024-04-14 03:44:04 -07:00
status_msg: Option<String>,
) -> Result<()> {
2024-04-14 03:44:04 -07:00
let presence_state = match state.as_str() {
| "" => &PresenceState::Offline, // default an empty string to 'offline'
| &_ => state,
2024-04-14 03:44:04 -07:00
};
2024-03-25 17:05:11 -04:00
self.db
2024-08-08 17:18:30 +00:00
.set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg)
.await?;
2024-04-01 20:48:40 -07:00
if (self.timeout_remote_users || self.services.globals.user_is_local(user_id))
&& user_id != self.services.globals.server_user
{
2024-04-01 20:48:40 -07:00
let timeout = match presence_state {
| PresenceState::Online => self.services.server.config.presence_idle_timeout_s,
| _ => self.services.server.config.presence_offline_timeout_s,
2024-04-01 20:48:40 -07:00
};
self.timer_channel
.0
2024-04-01 20:48:40 -07:00
.send((user_id.to_owned(), Duration::from_secs(timeout)))
.map_err(|e| {
error!("Failed to add presence timer: {}", e);
Error::bad_database("Failed to add presence timer")
})?;
}
Ok(())
}
2024-03-05 19:48:54 -05:00
/// Removes the presence record for the given user from the database.
2024-04-22 23:48:57 -04:00
///
/// TODO: Why is this not used?
#[allow(dead_code)]
pub async fn remove_presence(&self, user_id: &UserId) {
self.db.remove_presence(user_id).await;
}
2024-03-05 19:48:54 -05:00
/// Returns the most recent presence updates that happened after the event
/// with id `since`.
pub fn presence_since(
&self,
since: u64,
) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
2024-04-01 20:48:40 -07:00
self.db.presence_since(since)
}
2024-03-05 19:48:54 -05:00
2024-08-08 17:18:30 +00:00
#[inline]
pub async fn from_json_bytes_to_event(
&self,
bytes: &[u8],
user_id: &UserId,
) -> Result<PresenceEvent> {
2024-07-18 06:37:47 +00:00
let presence = Presence::from_json_bytes(bytes)?;
2024-08-08 17:18:30 +00:00
let event = presence
.to_presence_event(user_id, &self.services.users)
.await;
Ok(event)
2024-07-18 06:37:47 +00:00
}
2024-08-08 17:18:30 +00:00
async fn process_presence_timer(&self, user_id: &OwnedUserId) -> Result<()> {
2024-07-07 04:46:16 +00:00
let mut presence_state = PresenceState::Offline;
let mut last_active_ago = None;
let mut status_msg = None;
2020-07-28 15:58:50 +02:00
2024-08-08 17:18:30 +00:00
let presence_event = self.get_presence(user_id).await;
2024-08-08 17:18:30 +00:00
if let Ok(presence_event) = presence_event {
2024-07-07 04:46:16 +00:00
presence_state = presence_event.content.presence;
last_active_ago = presence_event.content.last_active_ago;
status_msg = presence_event.content.status_msg;
}
2024-03-05 19:48:54 -05:00
2024-07-07 04:46:16 +00:00
let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
| (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout =>
Some(PresenceState::Unavailable),
| (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout =>
Some(PresenceState::Offline),
| _ => None,
2024-07-07 04:46:16 +00:00
};
2024-03-05 19:48:54 -05:00
2024-07-07 04:46:16 +00:00
debug!(
"Processed presence timer for user '{user_id}': Old state = {presence_state}, New \
state = {new_state:?}"
2024-07-07 04:46:16 +00:00
);
2024-03-05 19:48:54 -05:00
2024-07-07 04:46:16 +00:00
if let Some(new_state) = new_state {
2024-08-08 17:18:30 +00:00
self.set_presence(user_id, &new_state, Some(false), last_active_ago, status_msg)
.await?;
2024-07-07 04:46:16 +00:00
}
2024-03-05 19:48:54 -05:00
2024-07-07 04:46:16 +00:00
Ok(())
}
2024-07-07 04:46:16 +00:00
}
2024-03-05 19:48:54 -05:00
2024-07-07 04:46:16 +00:00
/// Sleeps for `timeout` and then yields `user_id` back, so the worker knows
/// whose presence timer has elapsed.
async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId {
sleep(timeout).await;
user_id
}