2021-06-08 18:10:00 +02:00
pub mod abstraction ;
2022-09-06 23:15:09 +02:00
pub mod key_value ;
2021-06-08 18:10:00 +02:00
2023-02-20 22:59:45 +01:00
use crate ::{
service ::rooms ::timeline ::PduCount , services , utils , Config , Error , PduEvent , Result , Services ,
SERVICES ,
};
2022-10-10 14:09:11 +02:00
use abstraction ::{ KeyValueDatabaseEngine , KvTree };
2020-03-30 13:46:18 +02:00
use directories ::ProjectDirs ;
2021-06-30 09:52:01 +02:00
use lru_cache ::LruCache ;
2022-04-07 12:11:55 +00:00
use ruma ::{
events ::{
2022-10-21 12:27:11 +00:00
push_rules ::{ PushRulesEvent , PushRulesEventContent },
room ::message ::RoomMessageEventContent ,
2022-10-05 12:45:54 +02:00
GlobalAccountDataEvent , GlobalAccountDataEventType , StateEventType ,
2022-04-07 12:11:55 +00:00
},
push ::Ruleset ,
2022-10-10 14:09:11 +02:00
CanonicalJsonValue , EventId , OwnedDeviceId , OwnedEventId , OwnedRoomId , OwnedUserId , RoomId ,
UserId ,
2022-04-07 12:11:55 +00:00
};
2023-07-29 20:01:38 +02:00
use serde ::Deserialize ;
2021-06-08 20:54:36 +04:30
use std ::{
2021-08-01 15:14:54 +02:00
collections ::{ BTreeMap , HashMap , HashSet },
2021-06-08 20:54:36 +04:30
fs ::{ self , remove_dir_all },
io ::Write ,
2021-08-01 15:14:54 +02:00
mem ::size_of ,
2021-07-14 07:07:08 +00:00
path ::Path ,
2021-07-18 20:43:39 +02:00
sync ::{ Arc , Mutex , RwLock },
2023-07-29 20:01:38 +02:00
time ::Duration ,
2021-06-08 20:54:36 +04:30
};
2023-07-29 20:01:38 +02:00
use tokio ::time ::interval ;
2022-10-08 13:04:55 +02:00
2022-02-03 20:52:41 +02:00
use tracing ::{ debug , error , info , warn };
2022-06-25 16:12:23 +02:00
/// Key-value backed storage handle for the server.
///
/// Bundles the storage engine (`_db`), one `KvTree` handle per logical table, and a
/// handful of in-memory maps/caches guarded by `Mutex`/`RwLock`. Every field is
/// populated in `load_or_create`; the tree fields are obtained there via `open_tree`.
///
/// Field names appear to follow a `<key>_<value>` convention; the trailing comments
/// on individual fields spell out how composite keys are assembled.
pub struct KeyValueDatabase {
// Handle to the underlying engine; `load_or_create` stores a clone of the engine it
// opened here and later uses it to open legacy trees during migrations.
_db : Arc < dyn KeyValueDatabaseEngine > ,
2022-09-06 23:15:09 +02:00
//pub globals: globals::Globals,
pub ( super ) global : Arc < dyn KvTree > ,
pub ( super ) server_signingkeys : Arc < dyn KvTree > ,
//pub users: users::Users,
pub ( super ) userid_password : Arc < dyn KvTree > ,
pub ( super ) userid_displayname : Arc < dyn KvTree > ,
pub ( super ) userid_avatarurl : Arc < dyn KvTree > ,
pub ( super ) userid_blurhash : Arc < dyn KvTree > ,
pub ( super ) userdeviceid_token : Arc < dyn KvTree > ,
pub ( super ) userdeviceid_metadata : Arc < dyn KvTree > , // This is also used to check if a device exists
pub ( super ) userid_devicelistversion : Arc < dyn KvTree > , // DevicelistVersion = u64
pub ( super ) token_userdeviceid : Arc < dyn KvTree > ,
pub ( super ) onetimekeyid_onetimekeys : Arc < dyn KvTree > , // OneTimeKeyId = UserId + DeviceKeyId
pub ( super ) userid_lastonetimekeyupdate : Arc < dyn KvTree > , // LastOneTimeKeyUpdate = Count
pub ( super ) keychangeid_userid : Arc < dyn KvTree > , // KeyChangeId = UserId/RoomId + Count
pub ( super ) keyid_key : Arc < dyn KvTree > , // KeyId = UserId + KeyId (depends on key type)
pub ( super ) userid_masterkeyid : Arc < dyn KvTree > ,
pub ( super ) userid_selfsigningkeyid : Arc < dyn KvTree > ,
pub ( super ) userid_usersigningkeyid : Arc < dyn KvTree > ,
pub ( super ) userfilterid_filter : Arc < dyn KvTree > , // UserFilterId = UserId + FilterId
pub ( super ) todeviceid_events : Arc < dyn KvTree > , // ToDeviceId = UserId + DeviceId + Count
//pub uiaa: uiaa::Uiaa,
pub ( super ) userdevicesessionid_uiaainfo : Arc < dyn KvTree > , // User-interactive authentication
// Not a tree: an in-memory map keyed by (user, device, session string) that starts
// out empty in `load_or_create`.
pub ( super ) userdevicesessionid_uiaarequest :
2022-10-09 17:25:06 +02:00
RwLock < BTreeMap < ( OwnedUserId , OwnedDeviceId , String ), CanonicalJsonValue >> ,
2022-09-06 23:15:09 +02:00
//pub edus: RoomEdus,
pub ( super ) readreceiptid_readreceipt : Arc < dyn KvTree > , // ReadReceiptId = RoomId + Count + UserId
pub ( super ) roomuserid_privateread : Arc < dyn KvTree > , // RoomUserId = Room + User, PrivateRead = Count
pub ( super ) roomuserid_lastprivatereadupdate : Arc < dyn KvTree > , // LastPrivateReadUpdate = Count
2022-10-05 20:34:31 +02:00
pub ( super ) typingid_userid : Arc < dyn KvTree > , // TypingId = RoomId + TimeoutTime + Count
2022-09-06 23:15:09 +02:00
pub ( super ) roomid_lasttypingupdate : Arc < dyn KvTree > , // LastRoomTypingUpdate = Count
2022-10-05 20:34:31 +02:00
pub ( super ) presenceid_presence : Arc < dyn KvTree > , // PresenceId = RoomId + Count + UserId
2022-09-06 23:15:09 +02:00
pub ( super ) userid_lastpresenceupdate : Arc < dyn KvTree > , // LastPresenceUpdate = Count
//pub rooms: rooms::Rooms,
pub ( super ) pduid_pdu : Arc < dyn KvTree > , // PduId = ShortRoomId + Count
pub ( super ) eventid_pduid : Arc < dyn KvTree > ,
pub ( super ) roomid_pduleaves : Arc < dyn KvTree > ,
pub ( super ) alias_roomid : Arc < dyn KvTree > ,
pub ( super ) aliasid_alias : Arc < dyn KvTree > , // AliasId = RoomId + Count
pub ( super ) publicroomids : Arc < dyn KvTree > ,
2023-06-25 19:31:40 +02:00
pub ( super ) threadid_userids : Arc < dyn KvTree > , // ThreadId = RoomId + Count
2022-09-06 23:15:09 +02:00
pub ( super ) tokenids : Arc < dyn KvTree > , // TokenId = ShortRoomId + Token + PduIdCount
/// Participating servers in a room.
pub ( super ) roomserverids : Arc < dyn KvTree > , // RoomServerId = RoomId + ServerName
pub ( super ) serverroomids : Arc < dyn KvTree > , // ServerRoomId = ServerName + RoomId
pub ( super ) userroomid_joined : Arc < dyn KvTree > ,
pub ( super ) roomuserid_joined : Arc < dyn KvTree > ,
pub ( super ) roomid_joinedcount : Arc < dyn KvTree > ,
pub ( super ) roomid_invitedcount : Arc < dyn KvTree > ,
pub ( super ) roomuseroncejoinedids : Arc < dyn KvTree > ,
pub ( super ) userroomid_invitestate : Arc < dyn KvTree > , // InviteState = Vec<Raw<Pdu>>
pub ( super ) roomuserid_invitecount : Arc < dyn KvTree > , // InviteCount = Count
pub ( super ) userroomid_leftstate : Arc < dyn KvTree > ,
pub ( super ) roomuserid_leftcount : Arc < dyn KvTree > ,
pub ( super ) disabledroomids : Arc < dyn KvTree > , // Rooms where incoming federation handling is disabled
pub ( super ) lazyloadedids : Arc < dyn KvTree > , // LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId
pub ( super ) userroomid_notificationcount : Arc < dyn KvTree > , // NotifyCount = u64
pub ( super ) userroomid_highlightcount : Arc < dyn KvTree > , // HighlightCount = u64
2022-10-30 20:36:14 +01:00
// NOTE(review): `load_or_create` opens this field with the tree name
// "userroomid_highlightcount", i.e. the same name as the field above. That looks like
// a copy/paste slip; confirm, and consider a migration before renaming, since existing
// databases would already hold this data under that tree.
pub ( super ) roomuserid_lastnotificationread : Arc < dyn KvTree > , // LastNotificationRead = u64
2022-09-06 23:15:09 +02:00
/// Remember the current state hash of a room.
pub ( super ) roomid_shortstatehash : Arc < dyn KvTree > ,
pub ( super ) roomsynctoken_shortstatehash : Arc < dyn KvTree > ,
/// Remember the state hash at events in the past.
pub ( super ) shorteventid_shortstatehash : Arc < dyn KvTree > ,
/// StateKey = EventType + StateKey, ShortStateKey = Count
pub ( super ) statekey_shortstatekey : Arc < dyn KvTree > ,
pub ( super ) shortstatekey_statekey : Arc < dyn KvTree > ,
pub ( super ) roomid_shortroomid : Arc < dyn KvTree > ,
pub ( super ) shorteventid_eventid : Arc < dyn KvTree > ,
pub ( super ) eventid_shorteventid : Arc < dyn KvTree > ,
pub ( super ) statehash_shortstatehash : Arc < dyn KvTree > ,
pub ( super ) shortstatehash_statediff : Arc < dyn KvTree > , // StateDiff = parent (or 0) + (shortstatekey+shorteventid++) + 0_u64 + (shortstatekey+shorteventid--)
pub ( super ) shorteventid_authchain : Arc < dyn KvTree > ,
/// RoomId + EventId -> outlier PDU.
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
pub ( super ) eventid_outlierpdu : Arc < dyn KvTree > ,
pub ( super ) softfailedeventids : Arc < dyn KvTree > ,
2023-06-25 19:31:40 +02:00
/// ShortEventId + ShortEventId -> ().
2023-06-26 12:38:51 +02:00
pub ( super ) tofrom_relation : Arc < dyn KvTree > ,
2022-09-06 23:15:09 +02:00
/// RoomId + EventId -> Parent PDU EventId.
pub ( super ) referencedevents : Arc < dyn KvTree > ,
//pub account_data: account_data::AccountData,
pub ( super ) roomuserdataid_accountdata : Arc < dyn KvTree > , // RoomUserDataId = Room + User + Count + Type
pub ( super ) roomusertype_roomuserdataid : Arc < dyn KvTree > , // RoomUserType = Room + User + Type
//pub media: media::Media,
pub ( super ) mediaid_file : Arc < dyn KvTree > , // MediaId = MXC + WidthHeight + ContentDisposition + ContentType
//pub key_backups: key_backups::KeyBackups,
pub ( super ) backupid_algorithm : Arc < dyn KvTree > , // BackupId = UserId + Version(Count)
pub ( super ) backupid_etag : Arc < dyn KvTree > , // BackupId = UserId + Version(Count)
pub ( super ) backupkeyid_backup : Arc < dyn KvTree > , // BackupKeyId = UserId + Version + RoomId + SessionId
//pub transaction_ids: transaction_ids::TransactionIds,
pub ( super ) userdevicetxnid_response : Arc < dyn KvTree > , // Response can be empty (/sendToDevice) or the event id (/send)
//pub sending: sending::Sending,
pub ( super ) servername_educount : Arc < dyn KvTree > , // EduCount: Count of last EDU sync
pub ( super ) servernameevent_data : Arc < dyn KvTree > , // ServernameEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content
pub ( super ) servercurrentevent_data : Arc < dyn KvTree > , // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content
//pub appservice: appservice::Appservice,
pub ( super ) id_appserviceregistrations : Arc < dyn KvTree > ,
//pub pusher: pusher::PushData,
pub ( super ) senderkey_pusher : Arc < dyn KvTree > ,
2022-10-05 12:45:54 +02:00
// Everything below is held in memory rather than in a `KvTree`. All of it is created
// empty in `load_or_create`; the LRU capacities are derived from the config there.
pub ( super ) cached_registrations : Arc < RwLock < HashMap < String , serde_yaml ::Value >>> ,
2022-10-09 17:25:06 +02:00
pub ( super ) pdu_cache : Mutex < LruCache < OwnedEventId , Arc < PduEvent >>> ,
2022-10-05 12:45:54 +02:00
pub ( super ) shorteventid_cache : Mutex < LruCache < u64 , Arc < EventId >>> ,
pub ( super ) auth_chain_cache : Mutex < LruCache < Vec < u64 > , Arc < HashSet < u64 >>>> ,
2022-10-09 17:25:06 +02:00
pub ( super ) eventidshort_cache : Mutex < LruCache < OwnedEventId , u64 >> ,
2022-10-05 12:45:54 +02:00
pub ( super ) statekeyshort_cache : Mutex < LruCache < ( StateEventType , String ), u64 >> ,
pub ( super ) shortstatekey_cache : Mutex < LruCache < u64 , ( StateEventType , String ) >> ,
2022-10-09 17:25:06 +02:00
pub ( super ) our_real_users_cache : RwLock < HashMap < OwnedRoomId , Arc < HashSet < OwnedUserId >>>> ,
pub ( super ) appservice_in_room_cache : RwLock < HashMap < OwnedRoomId , HashMap < String , bool >>> ,
2023-02-20 22:59:45 +01:00
pub ( super ) lasttimelinecount_cache : Mutex < HashMap < OwnedRoomId , PduCount >> ,
2020-03-30 13:46:18 +02:00
}
2022-06-25 16:12:23 +02:00
impl KeyValueDatabase {
2020-04-10 13:36:57 +02:00
/// Tries to remove the old database but ignores all errors.
2020-06-09 15:13:17 +02:00
pub fn try_remove ( server_name : & str ) -> Result < () > {
2020-04-11 20:03:22 +02:00
let mut path = ProjectDirs ::from ( "xyz" , "koesters" , "conduit" )
2020-11-15 12:17:21 +01:00
. ok_or_else ( || Error ::bad_config ( "The OS didn't return a valid home directory path." )) ?
2020-04-10 13:36:57 +02:00
. data_dir ()
. to_path_buf ();
2020-05-06 15:36:44 +02:00
path . push ( server_name );
2020-04-10 13:36:57 +02:00
let _ = remove_dir_all ( path );
2020-06-09 15:13:17 +02:00
Ok (())
2020-04-10 13:36:57 +02:00
}
2022-01-09 16:44:44 +01:00
fn check_db_setup ( config : & Config ) -> Result < () > {
let path = Path ::new ( & config . database_path );
let sled_exists = path . join ( "db" ). exists ();
let sqlite_exists = path . join ( "conduit.db" ). exists ();
let rocksdb_exists = path . join ( "IDENTITY" ). exists ();
let mut count = 0 ;
if sled_exists {
count += 1 ;
}
if sqlite_exists {
count += 1 ;
}
if rocksdb_exists {
count += 1 ;
}
if count > 1 {
warn! ( "Multiple databases at database_path detected" );
return Ok (());
}
2022-01-20 00:10:39 +01:00
if sled_exists && config . database_backend != "sled" {
return Err ( Error ::bad_config (
"Found sled at database_path, but is not specified in config." ,
));
2022-01-09 16:44:44 +01:00
}
2022-01-20 00:10:39 +01:00
if sqlite_exists && config . database_backend != "sqlite" {
return Err ( Error ::bad_config (
"Found sqlite at database_path, but is not specified in config." ,
));
2022-01-09 16:44:44 +01:00
}
2022-01-20 00:10:39 +01:00
if rocksdb_exists && config . database_backend != "rocksdb" {
return Err ( Error ::bad_config (
"Found rocksdb at database_path, but is not specified in config." ,
));
2021-07-14 07:07:08 +00:00
}
Ok (())
}
2020-03-30 13:46:18 +02:00
/// Load an existing database or create a new one.
2022-10-05 18:36:12 +02:00
pub async fn load_or_create ( config : Config ) -> Result < () > {
Self ::check_db_setup ( & config ) ? ;
2021-07-14 07:07:08 +00:00
2021-09-07 19:41:14 +01:00
if ! Path ::new ( & config . database_path ). exists () {
std ::fs ::create_dir_all ( & config . database_path )
. map_err ( | _ | Error ::BadConfig ( "Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself." )) ? ;
}
2022-06-25 16:12:23 +02:00
let builder : Arc < dyn KeyValueDatabaseEngine > = match &* config . database_backend {
2022-01-09 16:44:44 +01:00
"sqlite" => {
#[cfg(not(feature = "sqlite" ))]
return Err ( Error ::BadConfig ( "Database backend not found." ));
#[cfg(feature = "sqlite" )]
2022-10-05 18:36:12 +02:00
Arc ::new ( Arc ::< abstraction ::sqlite ::Engine > ::open ( & config ) ? )
2022-01-09 16:44:44 +01:00
}
"rocksdb" => {
#[cfg(not(feature = "rocksdb" ))]
return Err ( Error ::BadConfig ( "Database backend not found." ));
#[cfg(feature = "rocksdb" )]
2022-10-05 18:36:12 +02:00
Arc ::new ( Arc ::< abstraction ::rocksdb ::Engine > ::open ( & config ) ? )
2022-01-09 16:44:44 +01:00
}
2021-06-18 00:38:32 +01:00
"persy" => {
#[cfg(not(feature = "persy" ))]
return Err ( Error ::BadConfig ( "Database backend not found." ));
#[cfg(feature = "persy" )]
2022-10-05 18:36:12 +02:00
Arc ::new ( Arc ::< abstraction ::persy ::Engine > ::open ( & config ) ? )
2021-06-18 00:38:32 +01:00
}
2022-01-09 16:44:44 +01:00
_ => {
return Err ( Error ::BadConfig ( "Database backend not found." ));
}
};
2020-10-21 21:43:59 +02:00
2023-08-09 18:27:30 +02:00
if config . registration_token == Some ( String ::new ()) {
return Err ( Error ::bad_config ( "Registration token is empty" ));
}
2021-05-22 10:34:19 +02:00
if config . max_request_size < 1024 {
2022-12-18 07:47:18 +01:00
error! ( ? config . max_request_size , "Max request size is less than 1KB. Please increase it." );
2021-05-22 10:34:19 +02:00
}
2020-03-30 13:46:18 +02:00
2022-10-08 13:02:52 +02:00
let db_raw = Box ::new ( Self {
2021-07-14 07:07:08 +00:00
_db : builder . clone (),
2022-10-05 20:34:31 +02:00
userid_password : builder . open_tree ( "userid_password" ) ? ,
userid_displayname : builder . open_tree ( "userid_displayname" ) ? ,
userid_avatarurl : builder . open_tree ( "userid_avatarurl" ) ? ,
userid_blurhash : builder . open_tree ( "userid_blurhash" ) ? ,
userdeviceid_token : builder . open_tree ( "userdeviceid_token" ) ? ,
userdeviceid_metadata : builder . open_tree ( "userdeviceid_metadata" ) ? ,
userid_devicelistversion : builder . open_tree ( "userid_devicelistversion" ) ? ,
token_userdeviceid : builder . open_tree ( "token_userdeviceid" ) ? ,
onetimekeyid_onetimekeys : builder . open_tree ( "onetimekeyid_onetimekeys" ) ? ,
userid_lastonetimekeyupdate : builder . open_tree ( "userid_lastonetimekeyupdate" ) ? ,
keychangeid_userid : builder . open_tree ( "keychangeid_userid" ) ? ,
keyid_key : builder . open_tree ( "keyid_key" ) ? ,
userid_masterkeyid : builder . open_tree ( "userid_masterkeyid" ) ? ,
userid_selfsigningkeyid : builder . open_tree ( "userid_selfsigningkeyid" ) ? ,
userid_usersigningkeyid : builder . open_tree ( "userid_usersigningkeyid" ) ? ,
userfilterid_filter : builder . open_tree ( "userfilterid_filter" ) ? ,
todeviceid_events : builder . open_tree ( "todeviceid_events" ) ? ,
userdevicesessionid_uiaainfo : builder . open_tree ( "userdevicesessionid_uiaainfo" ) ? ,
userdevicesessionid_uiaarequest : RwLock ::new ( BTreeMap ::new ()),
readreceiptid_readreceipt : builder . open_tree ( "readreceiptid_readreceipt" ) ? ,
roomuserid_privateread : builder . open_tree ( "roomuserid_privateread" ) ? , // "Private" read receipt
roomuserid_lastprivatereadupdate : builder
. open_tree ( "roomuserid_lastprivatereadupdate" ) ? ,
typingid_userid : builder . open_tree ( "typingid_userid" ) ? ,
roomid_lasttypingupdate : builder . open_tree ( "roomid_lasttypingupdate" ) ? ,
presenceid_presence : builder . open_tree ( "presenceid_presence" ) ? ,
userid_lastpresenceupdate : builder . open_tree ( "userid_lastpresenceupdate" ) ? ,
pduid_pdu : builder . open_tree ( "pduid_pdu" ) ? ,
eventid_pduid : builder . open_tree ( "eventid_pduid" ) ? ,
roomid_pduleaves : builder . open_tree ( "roomid_pduleaves" ) ? ,
alias_roomid : builder . open_tree ( "alias_roomid" ) ? ,
aliasid_alias : builder . open_tree ( "aliasid_alias" ) ? ,
publicroomids : builder . open_tree ( "publicroomids" ) ? ,
2023-06-25 19:31:40 +02:00
threadid_userids : builder . open_tree ( "threadid_userids" ) ? ,
2022-10-05 20:34:31 +02:00
tokenids : builder . open_tree ( "tokenids" ) ? ,
roomserverids : builder . open_tree ( "roomserverids" ) ? ,
serverroomids : builder . open_tree ( "serverroomids" ) ? ,
userroomid_joined : builder . open_tree ( "userroomid_joined" ) ? ,
roomuserid_joined : builder . open_tree ( "roomuserid_joined" ) ? ,
roomid_joinedcount : builder . open_tree ( "roomid_joinedcount" ) ? ,
roomid_invitedcount : builder . open_tree ( "roomid_invitedcount" ) ? ,
roomuseroncejoinedids : builder . open_tree ( "roomuseroncejoinedids" ) ? ,
userroomid_invitestate : builder . open_tree ( "userroomid_invitestate" ) ? ,
roomuserid_invitecount : builder . open_tree ( "roomuserid_invitecount" ) ? ,
userroomid_leftstate : builder . open_tree ( "userroomid_leftstate" ) ? ,
roomuserid_leftcount : builder . open_tree ( "roomuserid_leftcount" ) ? ,
disabledroomids : builder . open_tree ( "disabledroomids" ) ? ,
lazyloadedids : builder . open_tree ( "lazyloadedids" ) ? ,
userroomid_notificationcount : builder . open_tree ( "userroomid_notificationcount" ) ? ,
userroomid_highlightcount : builder . open_tree ( "userroomid_highlightcount" ) ? ,
2022-10-30 20:36:14 +01:00
roomuserid_lastnotificationread : builder . open_tree ( "userroomid_highlightcount" ) ? ,
2022-10-05 20:34:31 +02:00
statekey_shortstatekey : builder . open_tree ( "statekey_shortstatekey" ) ? ,
shortstatekey_statekey : builder . open_tree ( "shortstatekey_statekey" ) ? ,
shorteventid_authchain : builder . open_tree ( "shorteventid_authchain" ) ? ,
roomid_shortroomid : builder . open_tree ( "roomid_shortroomid" ) ? ,
shortstatehash_statediff : builder . open_tree ( "shortstatehash_statediff" ) ? ,
eventid_shorteventid : builder . open_tree ( "eventid_shorteventid" ) ? ,
shorteventid_eventid : builder . open_tree ( "shorteventid_eventid" ) ? ,
shorteventid_shortstatehash : builder . open_tree ( "shorteventid_shortstatehash" ) ? ,
roomid_shortstatehash : builder . open_tree ( "roomid_shortstatehash" ) ? ,
roomsynctoken_shortstatehash : builder . open_tree ( "roomsynctoken_shortstatehash" ) ? ,
statehash_shortstatehash : builder . open_tree ( "statehash_shortstatehash" ) ? ,
eventid_outlierpdu : builder . open_tree ( "eventid_outlierpdu" ) ? ,
softfailedeventids : builder . open_tree ( "softfailedeventids" ) ? ,
2023-06-26 12:38:51 +02:00
tofrom_relation : builder . open_tree ( "tofrom_relation" ) ? ,
2022-10-05 20:34:31 +02:00
referencedevents : builder . open_tree ( "referencedevents" ) ? ,
roomuserdataid_accountdata : builder . open_tree ( "roomuserdataid_accountdata" ) ? ,
roomusertype_roomuserdataid : builder . open_tree ( "roomusertype_roomuserdataid" ) ? ,
mediaid_file : builder . open_tree ( "mediaid_file" ) ? ,
backupid_algorithm : builder . open_tree ( "backupid_algorithm" ) ? ,
backupid_etag : builder . open_tree ( "backupid_etag" ) ? ,
backupkeyid_backup : builder . open_tree ( "backupkeyid_backup" ) ? ,
userdevicetxnid_response : builder . open_tree ( "userdevicetxnid_response" ) ? ,
servername_educount : builder . open_tree ( "servername_educount" ) ? ,
servernameevent_data : builder . open_tree ( "servernameevent_data" ) ? ,
servercurrentevent_data : builder . open_tree ( "servercurrentevent_data" ) ? ,
id_appserviceregistrations : builder . open_tree ( "id_appserviceregistrations" ) ? ,
senderkey_pusher : builder . open_tree ( "senderkey_pusher" ) ? ,
global : builder . open_tree ( "global" ) ? ,
server_signingkeys : builder . open_tree ( "server_signingkeys" ) ? ,
cached_registrations : Arc ::new ( RwLock ::new ( HashMap ::new ())),
pdu_cache : Mutex ::new ( LruCache ::new (
config
. pdu_cache_capacity
. try_into ()
. expect ( "pdu cache capacity fits into usize" ),
)),
auth_chain_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
)),
shorteventid_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
)),
eventidshort_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
)),
shortstatekey_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
)),
statekeyshort_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
)),
our_real_users_cache : RwLock ::new ( HashMap ::new ()),
appservice_in_room_cache : RwLock ::new ( HashMap ::new ()),
lasttimelinecount_cache : Mutex ::new ( HashMap ::new ()),
2022-10-05 12:45:54 +02:00
});
2022-10-08 13:02:52 +02:00
let db = Box ::leak ( db_raw );
let services_raw = Box ::new ( Services ::build ( db , config ) ? );
2022-10-05 12:45:54 +02:00
// This is the first and only time we initialize the SERVICES static
2022-10-05 15:33:57 +02:00
* SERVICES . write (). unwrap () = Some ( Box ::leak ( services_raw ));
2022-10-05 12:45:54 +02:00
2022-02-03 20:52:41 +02:00
// Matrix resource ownership is based on the server name; changing it
// requires recreating the database from scratch.
2022-10-05 12:45:54 +02:00
if services (). users . count () ? > 0 {
2022-02-03 20:52:41 +02:00
let conduit_user =
2022-10-05 12:45:54 +02:00
UserId ::parse_with_server_name ( "conduit" , services (). globals . server_name ())
2022-02-03 20:52:41 +02:00
. expect ( "@conduit:server_name is valid" );
2022-10-05 12:45:54 +02:00
if ! services (). users . exists ( & conduit_user ) ? {
2022-02-03 20:52:41 +02:00
error! (
"The {} server user does not exist, and the database is not new." ,
conduit_user
);
return Err ( Error ::bad_database (
"Cannot reuse an existing database after changing the server name, please delete the old one first."
));
}
}
// If the database has any data, perform data migrations before starting
2023-02-26 17:57:44 +01:00
let latest_database_version = 13 ;
2022-02-03 20:52:41 +02:00
2022-10-05 12:45:54 +02:00
if services (). users . count () ? > 0 {
2021-07-14 07:07:08 +00:00
// MIGRATIONS
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 1 {
for ( roomserverid , _ ) in db . roomserverids . iter () {
2021-07-14 07:07:08 +00:00
let mut parts = roomserverid . split ( |& b | b == 0xff );
let room_id = parts . next (). expect ( "split always returns one element" );
let servername = match parts . next () {
Some ( s ) => s ,
None => {
error! ( "Migration: Invalid roomserverid in db." );
continue ;
}
};
let mut serverroomid = servername . to_vec ();
serverroomid . push ( 0xff );
serverroomid . extend_from_slice ( room_id );
2020-11-09 12:21:04 +01:00
2022-10-05 12:45:54 +02:00
db . serverroomids . insert ( & serverroomid , & []) ? ;
2021-07-14 07:07:08 +00:00
}
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 1 ) ? ;
2021-05-17 10:25:27 +02:00
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 0 -> 1 finished" );
2021-05-17 10:25:27 +02:00
}
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 2 {
2021-07-14 07:07:08 +00:00
// We accidentally inserted hashed versions of "" into the db instead of just ""
2022-10-05 12:45:54 +02:00
for ( userid , password ) in db . userid_password . iter () {
2021-07-14 07:07:08 +00:00
let password = utils ::string_from_bytes ( & password );
2021-05-17 10:25:27 +02:00
2021-07-14 07:07:08 +00:00
let empty_hashed_password = password . map_or ( false , | password | {
argon2 ::verify_encoded ( & password , b "" ). unwrap_or ( false )
});
2021-05-17 10:25:27 +02:00
2021-07-14 07:07:08 +00:00
if empty_hashed_password {
2022-10-05 12:45:54 +02:00
db . userid_password . insert ( & userid , b "" ) ? ;
2021-07-14 07:07:08 +00:00
}
}
2021-05-30 21:55:43 +02:00
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 2 ) ? ;
2021-06-08 20:53:24 +04:30
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 1 -> 2 finished" );
2021-05-30 21:55:43 +02:00
}
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 3 {
2021-07-14 07:07:08 +00:00
// Move media to filesystem
2022-10-05 12:45:54 +02:00
for ( key , content ) in db . mediaid_file . iter () {
2021-07-14 12:31:38 +02:00
if content . is_empty () {
2021-07-14 07:07:08 +00:00
continue ;
}
2021-05-30 21:55:43 +02:00
2022-10-05 12:45:54 +02:00
let path = services (). globals . get_media_file ( & key );
2021-07-14 07:07:08 +00:00
let mut file = fs ::File ::create ( path ) ? ;
file . write_all ( & content ) ? ;
2022-10-05 12:45:54 +02:00
db . mediaid_file . insert ( & key , & []) ? ;
2021-06-08 20:53:24 +04:30
}
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 3 ) ? ;
2021-06-08 20:53:24 +04:30
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 2 -> 3 finished" );
2021-07-14 07:07:08 +00:00
}
2021-06-12 18:40:33 +02:00
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 4 {
// Add federated users to the user database as deactivated
for our_user in services (). users . iter () {
2021-07-14 07:07:08 +00:00
let our_user = our_user ? ;
2022-10-05 12:45:54 +02:00
if services (). users . is_deactivated ( & our_user ) ? {
2021-07-14 07:07:08 +00:00
continue ;
}
2022-10-05 12:45:54 +02:00
for room in services (). rooms . state_cache . rooms_joined ( & our_user ) {
for user in services (). rooms . state_cache . room_members ( & room ? ) {
2021-07-14 07:07:08 +00:00
let user = user ? ;
2022-10-05 12:45:54 +02:00
if user . server_name () != services (). globals . server_name () {
2022-12-18 07:47:18 +01:00
info! ( ? user , "Migration: creating user" );
2022-10-05 12:45:54 +02:00
services (). users . create ( & user , None ) ? ;
2021-07-14 07:07:08 +00:00
}
2021-06-12 18:40:33 +02:00
}
}
}
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 4 ) ? ;
2021-06-12 18:40:33 +02:00
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 3 -> 4 finished" );
2021-07-14 07:07:08 +00:00
}
2021-07-30 12:11:06 +02:00
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 5 {
2021-07-30 12:11:06 +02:00
// Upgrade user data store
2022-10-05 12:45:54 +02:00
for ( roomuserdataid , _ ) in db . roomuserdataid_accountdata . iter () {
2021-07-30 12:11:06 +02:00
let mut parts = roomuserdataid . split ( |& b | b == 0xff );
let room_id = parts . next (). unwrap ();
2021-07-30 18:05:26 +02:00
let user_id = parts . next (). unwrap ();
2021-07-30 12:11:06 +02:00
let event_type = roomuserdataid . rsplit ( |& b | b == 0xff ). next (). unwrap ();
let mut key = room_id . to_vec ();
key . push ( 0xff );
key . extend_from_slice ( user_id );
key . push ( 0xff );
key . extend_from_slice ( event_type );
2022-10-05 12:45:54 +02:00
db . roomusertype_roomuserdataid
2021-07-30 12:11:06 +02:00
. insert ( & key , & roomuserdataid ) ? ;
}
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 5 ) ? ;
2021-07-30 12:11:06 +02:00
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 4 -> 5 finished" );
2021-07-30 12:11:06 +02:00
}
2021-08-04 21:15:01 +02:00
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 6 {
2021-08-04 21:15:01 +02:00
// Set room member count
2022-10-05 12:45:54 +02:00
for ( roomid , _ ) in db . roomid_shortstatehash . iter () {
2021-11-27 00:30:28 +01:00
let string = utils ::string_from_bytes ( & roomid ). unwrap ();
let room_id = <& RoomId > ::try_from ( string . as_str ()). unwrap ();
2022-10-05 12:45:54 +02:00
services (). rooms . state_cache . update_joined_count ( room_id ) ? ;
2021-08-04 21:15:01 +02:00
}
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 6 ) ? ;
2021-08-04 21:15:01 +02:00
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 5 -> 6 finished" );
2021-08-04 21:15:01 +02:00
}
2021-08-01 15:14:54 +02:00
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 7 {
2021-08-01 15:14:54 +02:00
// Upgrade state store
2022-10-09 17:25:06 +02:00
let mut last_roomstates : HashMap < OwnedRoomId , u64 > = HashMap ::new ();
2021-08-12 23:04:00 +02:00
let mut current_sstatehash : Option < u64 > = None ;
2021-08-01 15:14:54 +02:00
let mut current_room = None ;
let mut current_state = HashSet ::new ();
let mut counter = 0 ;
2021-08-12 23:04:00 +02:00
let mut handle_state =
| current_sstatehash : u64 ,
current_room : & RoomId ,
current_state : HashSet < _ > ,
last_roomstates : & mut HashMap < _ , _ >| {
counter += 1 ;
let last_roomsstatehash = last_roomstates . get ( current_room );
let states_parents = last_roomsstatehash . map_or_else (
|| Ok ( Vec ::new ()),
|& last_roomsstatehash | {
2022-10-05 20:34:31 +02:00
services ()
. rooms
. state_compressor
2022-10-09 21:56:56 +02:00
. load_shortstatehash_info ( last_roomsstatehash )
2021-08-12 23:04:00 +02:00
},
) ? ;
let ( statediffnew , statediffremoved ) =
if let Some ( parent_stateinfo ) = states_parents . last () {
let statediffnew = current_state
. difference ( & parent_stateinfo . 1 )
2021-10-13 10:24:39 +02:00
. copied ()
2021-08-12 23:04:00 +02:00
. collect ::< HashSet < _ >> ();
let statediffremoved = parent_stateinfo
. 1
. difference ( & current_state )
2021-10-13 10:24:39 +02:00
. copied ()
2021-08-12 23:04:00 +02:00
. collect ::< HashSet < _ >> ();
( statediffnew , statediffremoved )
} else {
( current_state , HashSet ::new ())
};
2022-10-05 12:45:54 +02:00
services (). rooms . state_compressor . save_state_from_diff (
2022-10-09 21:56:56 +02:00
current_sstatehash ,
2023-06-27 13:06:55 +02:00
Arc ::new ( statediffnew ),
Arc ::new ( statediffremoved ),
2021-08-12 23:04:00 +02:00
2 , // every state change is 2 event changes on average
states_parents ,
) ? ;
/*
2022-10-05 12:45:54 +02:00
let mut tmp = services().rooms.load_shortstatehash_info(¤t_sstatehash)?;
2021-08-12 23:04:00 +02:00
let state = tmp.pop().unwrap();
println!(
"{}\t{}{:?}: {:?} + {:?} - {:?}",
current_room,
" ".repeat(tmp.len()),
utils::u64_from_bytes(¤t_sstatehash).unwrap(),
tmp.last().map(|b| utils::u64_from_bytes(&b.0).unwrap()),
state
.2
.iter()
.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
.collect::<Vec<_>>(),
state
.3
.iter()
.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
.collect::<Vec<_>>()
);
*/
Ok ::< _ , Error > (())
};
2021-08-01 15:14:54 +02:00
for ( k , seventid ) in db . _db . open_tree ( "stateid_shorteventid" ) ? . iter () {
2021-08-12 23:04:00 +02:00
let sstatehash = utils ::u64_from_bytes ( & k [ 0 .. size_of ::< u64 > ()])
. expect ( "number of bytes is correct" );
2021-08-01 15:14:54 +02:00
let sstatekey = k [ size_of ::< u64 > () .. ]. to_vec ();
2021-08-12 23:04:00 +02:00
if Some ( sstatehash ) != current_sstatehash {
if let Some ( current_sstatehash ) = current_sstatehash {
handle_state (
current_sstatehash ,
2021-11-26 20:36:40 +01:00
current_room . as_deref (). unwrap (),
2021-08-12 23:04:00 +02:00
current_state ,
& mut last_roomstates ,
2021-08-01 15:14:54 +02:00
) ? ;
2021-08-12 23:04:00 +02:00
last_roomstates
. insert ( current_room . clone (). unwrap (), current_sstatehash );
2021-08-01 15:14:54 +02:00
}
current_state = HashSet ::new ();
2021-08-12 23:04:00 +02:00
current_sstatehash = Some ( sstatehash );
2021-08-01 15:14:54 +02:00
2022-10-05 20:34:31 +02:00
let event_id = db . shorteventid_eventid . get ( & seventid ). unwrap (). unwrap ();
2021-11-27 00:30:28 +01:00
let string = utils ::string_from_bytes ( & event_id ). unwrap ();
let event_id = <& EventId > ::try_from ( string . as_str ()). unwrap ();
2022-10-05 20:34:31 +02:00
let pdu = services ()
. rooms
. timeline
. get_pdu ( event_id )
. unwrap ()
. unwrap ();
2021-08-01 15:14:54 +02:00
if Some ( & pdu . room_id ) != current_room . as_ref () {
current_room = Some ( pdu . room_id . clone ());
}
}
let mut val = sstatekey ;
val . extend_from_slice ( & seventid );
2021-08-12 23:04:00 +02:00
current_state . insert ( val . try_into (). expect ( "size is correct" ));
}
if let Some ( current_sstatehash ) = current_sstatehash {
handle_state (
current_sstatehash ,
2021-11-26 20:36:40 +01:00
current_room . as_deref (). unwrap (),
2021-08-12 23:04:00 +02:00
current_state ,
& mut last_roomstates ,
) ? ;
2021-08-01 15:14:54 +02:00
}
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 7 ) ? ;
2021-08-01 15:14:54 +02:00
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 6 -> 7 finished" );
2021-08-01 15:14:54 +02:00
}
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 8 {
2021-08-01 15:14:54 +02:00
// Generate short room ids for all rooms
2022-10-05 12:45:54 +02:00
for ( room_id , _ ) in db . roomid_shortstatehash . iter () {
let shortroomid = services (). globals . next_count () ? . to_be_bytes ();
db . roomid_shortroomid . insert ( & room_id , & shortroomid ) ? ;
2022-02-03 20:52:41 +02:00
info! ( "Migration: 8" );
2021-08-01 15:14:54 +02:00
}
// Update pduids db layout
2022-10-05 12:45:54 +02:00
let mut batch = db . pduid_pdu . iter (). filter_map ( | ( key , v ) | {
2021-08-02 22:32:28 +02:00
if ! key . starts_with ( b "!" ) {
return None ;
}
2021-08-01 15:14:54 +02:00
let mut parts = key . splitn ( 2 , |& b | b == 0xff );
let room_id = parts . next (). unwrap ();
let count = parts . next (). unwrap ();
2021-08-02 22:32:28 +02:00
let short_room_id = db
. roomid_shortroomid
2021-09-13 19:45:56 +02:00
. get ( room_id )
2021-08-02 22:32:28 +02:00
. unwrap ()
. expect ( "shortroomid should exist" );
2021-08-01 15:14:54 +02:00
let mut new_key = short_room_id ;
new_key . extend_from_slice ( count );
2021-08-02 22:32:28 +02:00
Some (( new_key , v ))
});
2022-10-05 12:45:54 +02:00
db . pduid_pdu . insert_batch ( & mut batch ) ? ;
2021-08-02 22:32:28 +02:00
2022-10-05 12:45:54 +02:00
let mut batch2 = db . eventid_pduid . iter (). filter_map ( | ( k , value ) | {
2021-08-12 23:04:00 +02:00
if ! value . starts_with ( b "!" ) {
return None ;
2021-08-02 22:32:28 +02:00
}
2021-08-12 23:04:00 +02:00
let mut parts = value . splitn ( 2 , |& b | b == 0xff );
let room_id = parts . next (). unwrap ();
let count = parts . next (). unwrap ();
let short_room_id = db
. roomid_shortroomid
2021-09-13 19:45:56 +02:00
. get ( room_id )
2021-08-12 23:04:00 +02:00
. unwrap ()
. expect ( "shortroomid should exist" );
let mut new_value = short_room_id ;
new_value . extend_from_slice ( count );
Some (( k , new_value ))
});
2022-10-05 12:45:54 +02:00
db . eventid_pduid . insert_batch ( & mut batch2 ) ? ;
2021-08-01 15:14:54 +02:00
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 8 ) ? ;
2021-08-02 22:32:28 +02:00
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 7 -> 8 finished" );
2021-08-02 22:32:28 +02:00
}
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 9 {
2021-08-01 15:14:54 +02:00
// Update tokenids db layout
2021-08-31 21:20:03 +02:00
let mut iter = db
2021-08-21 14:24:10 +02:00
. tokenids
. iter ()
. filter_map ( | ( key , _ ) | {
if ! key . starts_with ( b "!" ) {
return None ;
}
let mut parts = key . splitn ( 4 , |& b | b == 0xff );
let room_id = parts . next (). unwrap ();
let word = parts . next (). unwrap ();
let _pdu_id_room = parts . next (). unwrap ();
let pdu_id_count = parts . next (). unwrap ();
2021-08-01 15:14:54 +02:00
2021-08-21 14:24:10 +02:00
let short_room_id = db
. roomid_shortroomid
2021-09-13 19:45:56 +02:00
. get ( room_id )
2021-08-21 14:24:10 +02:00
. unwrap ()
. expect ( "shortroomid should exist" );
let mut new_key = short_room_id ;
new_key . extend_from_slice ( word );
new_key . push ( 0xff );
new_key . extend_from_slice ( pdu_id_count );
Some (( new_key , Vec ::new ()))
})
2021-08-31 21:20:03 +02:00
. peekable ();
2021-08-02 22:32:28 +02:00
2021-08-21 14:22:21 +02:00
while iter . peek (). is_some () {
2022-10-05 20:34:31 +02:00
db . tokenids . insert_batch ( & mut iter . by_ref (). take ( 1000 )) ? ;
2022-12-18 07:47:18 +01:00
debug! ( "Inserted smaller batch" );
2021-08-21 14:22:21 +02:00
}
2021-08-02 22:32:28 +02:00
2022-02-03 20:52:41 +02:00
info! ( "Deleting starts" );
2021-08-21 14:22:21 +02:00
2021-10-13 11:51:30 +02:00
let batch2 : Vec < _ > = db
2021-08-21 14:24:10 +02:00
. tokenids
. iter ()
. filter_map ( | ( key , _ ) | {
if key . starts_with ( b "!" ) {
Some ( key )
} else {
None
}
})
2021-10-13 11:51:30 +02:00
. collect ();
2021-08-21 14:22:21 +02:00
for key in batch2 {
2022-10-05 12:45:54 +02:00
db . tokenids . remove ( & key ) ? ;
2021-08-01 15:14:54 +02:00
}
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 9 ) ? ;
2021-08-01 15:14:54 +02:00
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 8 -> 9 finished" );
2021-08-01 15:14:54 +02:00
}
2021-08-24 19:10:31 +02:00
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 10 {
2021-08-24 19:10:31 +02:00
// Add other direction for shortstatekeys
2022-10-05 12:45:54 +02:00
for ( statekey , shortstatekey ) in db . statekey_shortstatekey . iter () {
db . shortstatekey_statekey
2021-08-24 19:10:31 +02:00
. insert ( & shortstatekey , & statekey ) ? ;
}
2021-08-25 17:40:10 +02:00
// Force E2EE device list updates so we can send them over federation
2022-10-05 12:45:54 +02:00
for user_id in services (). users . iter (). filter_map ( | r | r . ok ()) {
2022-10-05 20:34:31 +02:00
services (). users . mark_device_key_update ( & user_id ) ? ;
2021-08-25 17:40:10 +02:00
}
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 10 ) ? ;
2021-08-24 19:10:31 +02:00
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 9 -> 10 finished" );
2021-08-24 19:10:31 +02:00
}
2021-12-14 17:55:28 +01:00
2022-10-05 12:45:54 +02:00
if services (). globals . database_version () ? < 11 {
2021-12-14 17:55:28 +01:00
db . _db
. open_tree ( "userdevicesessionid_uiaarequest" ) ?
. clear () ? ;
2022-10-05 12:45:54 +02:00
services (). globals . bump_database_version ( 11 ) ? ;
2021-12-14 17:55:28 +01:00
2022-02-03 20:52:41 +02:00
warn! ( "Migration: 10 -> 11 finished" );
2021-12-14 17:55:28 +01:00
}
2021-06-12 18:40:33 +02:00
2022-10-21 12:27:11 +00:00
if services (). globals . database_version () ? < 12 {
2023-02-07 15:26:34 +01:00
for username in services (). users . list_local_users () ? {
let user = match UserId ::parse_with_server_name (
username . clone (),
services (). globals . server_name (),
) {
Ok ( u ) => u ,
Err ( e ) => {
warn! ( "Invalid username {username}: {e}" );
continue ;
}
};
2022-10-21 12:27:11 +00:00
let raw_rules_list = services ()
. account_data
. get (
None ,
& user ,
GlobalAccountDataEventType ::PushRules . to_string (). into (),
)
. unwrap ()
. expect ( "Username is invalid" );
let mut account_data =
serde_json ::from_str ::< PushRulesEvent > ( raw_rules_list . get ()). unwrap ();
let rules_list = & mut account_data . content . global ;
//content rule
{
let content_rule_transformation =
[ ".m.rules.contains_user_name" , ".m.rule.contains_user_name" ];
let rule = rules_list . content . get ( content_rule_transformation [ 0 ]);
if rule . is_some () {
let mut rule = rule . unwrap (). clone ();
2022-12-21 10:42:12 +01:00
rule . rule_id = content_rule_transformation [ 1 ]. to_owned ();
2022-10-21 12:27:11 +00:00
rules_list . content . remove ( content_rule_transformation [ 0 ]);
rules_list . content . insert ( rule );
}
}
//underride rules
{
let underride_rule_transformation = [
[ ".m.rules.call" , ".m.rule.call" ],
[ ".m.rules.room_one_to_one" , ".m.rule.room_one_to_one" ],
[
".m.rules.encrypted_room_one_to_one" ,
".m.rule.encrypted_room_one_to_one" ,
],
[ ".m.rules.message" , ".m.rule.message" ],
[ ".m.rules.encrypted" , ".m.rule.encrypted" ],
];
for transformation in underride_rule_transformation {
let rule = rules_list . underride . get ( transformation [ 0 ]);
2022-11-03 13:12:53 +00:00
if let Some ( rule ) = rule {
let mut rule = rule . clone ();
2022-12-21 10:42:12 +01:00
rule . rule_id = transformation [ 1 ]. to_owned ();
2022-10-21 12:27:11 +00:00
rules_list . underride . remove ( transformation [ 0 ]);
rules_list . underride . insert ( rule );
}
}
}
services (). account_data . update (
None ,
& user ,
GlobalAccountDataEventType ::PushRules . to_string (). into (),
& serde_json ::to_value ( account_data ). expect ( "to json value always works" ),
) ? ;
}
services (). globals . bump_database_version ( 12 ) ? ;
warn! ( "Migration: 11 -> 12 finished" );
}
2023-02-26 17:57:44 +01:00
// This migration can be reused as-is anytime the server-default rules are updated.
if services (). globals . database_version () ? < 13 {
for username in services (). users . list_local_users () ? {
let user = match UserId ::parse_with_server_name (
username . clone (),
services (). globals . server_name (),
) {
Ok ( u ) => u ,
Err ( e ) => {
warn! ( "Invalid username {username}: {e}" );
continue ;
}
};
let raw_rules_list = services ()
. account_data
. get (
None ,
& user ,
GlobalAccountDataEventType ::PushRules . to_string (). into (),
)
. unwrap ()
. expect ( "Username is invalid" );
let mut account_data =
serde_json ::from_str ::< PushRulesEvent > ( raw_rules_list . get ()). unwrap ();
let user_default_rules = ruma ::push ::Ruleset ::server_default ( & user );
account_data
. content
. global
. update_with_server_default ( user_default_rules );
services (). account_data . update (
None ,
& user ,
GlobalAccountDataEventType ::PushRules . to_string (). into (),
& serde_json ::to_value ( account_data ). expect ( "to json value always works" ),
) ? ;
}
services (). globals . bump_database_version ( 13 ) ? ;
warn! ( "Migration: 12 -> 13 finished" );
}
2022-10-21 12:27:11 +00:00
assert_eq! (
services (). globals . database_version (). unwrap (),
latest_database_version
);
2022-02-03 20:52:41 +02:00
info! (
"Loaded {} database with version {}" ,
2022-10-05 20:34:31 +02:00
services (). globals . config . database_backend ,
latest_database_version
2022-02-03 20:52:41 +02:00
);
} else {
2022-10-05 12:45:54 +02:00
services ()
2022-02-03 20:52:41 +02:00
. globals
. bump_database_version ( latest_database_version ) ? ;
// Create the admin room and server user on first run
2022-10-05 12:45:54 +02:00
services (). admin . create_admin_room (). await ? ;
2022-02-03 20:52:41 +02:00
warn! (
"Created new {} database with version {}" ,
2022-10-05 20:34:31 +02:00
services (). globals . config . database_backend ,
latest_database_version
2022-02-03 20:52:41 +02:00
);
}
2021-07-14 07:07:08 +00:00
2021-05-12 20:04:28 +02:00
// This data is probably outdated
2022-10-05 12:45:54 +02:00
db . presenceid_presence . clear () ? ;
2021-07-14 07:07:08 +00:00
2022-10-08 13:57:01 +02:00
services (). admin . start_handler ();
2022-04-07 12:11:55 +00:00
// Set emergency access for the conduit user
2022-10-05 12:45:54 +02:00
match set_emergency_access () {
2022-04-07 12:11:55 +00:00
Ok ( pwd_set ) => {
if pwd_set {
warn! ( "The Conduit account emergency password is set! Please unset it as soon as you finish admin account recovery!" );
2022-10-05 12:45:54 +02:00
services (). admin . send_message ( RoomMessageEventContent ::text_plain ( "The Conduit account emergency password is set! Please unset it as soon as you finish admin account recovery!" ));
2022-04-07 12:11:55 +00:00
}
}
Err ( e ) => {
error! (
"Could not set the configured emergency password for the conduit user: {}" ,
e
)
}
};
2022-10-08 13:57:01 +02:00
services (). sending . start_handler ();
2022-10-05 18:36:12 +02:00
Self ::start_cleanup_task (). await ;
2023-07-29 20:01:38 +02:00
if services (). globals . allow_check_for_updates () {
Self ::start_check_for_updates_task ();
}
2021-07-14 07:07:08 +00:00
2022-10-05 12:45:54 +02:00
Ok (())
2020-03-30 13:46:18 +02:00
}
2020-07-27 17:36:54 +02:00
2021-07-29 08:36:01 +02:00
#[tracing::instrument(skip(self))]
2021-08-02 10:13:34 +02:00
pub fn flush ( & self ) -> Result < () > {
2021-07-14 07:07:08 +00:00
let start = std ::time ::Instant ::now ();
let res = self . _db . flush ();
2021-07-29 08:36:01 +02:00
debug! ( "flush: took {:?}" , start . elapsed ());
2021-07-14 07:07:08 +00:00
res
}
2022-10-05 18:36:12 +02:00
#[tracing::instrument]
2023-07-29 20:01:38 +02:00
pub fn start_check_for_updates_task () {
tokio ::spawn ( async move {
let timer_interval = Duration ::from_secs ( 60 * 60 );
let mut i = interval ( timer_interval );
loop {
i . tick (). await ;
let _ = Self ::try_handle_updates (). await ;
}
});
}
async fn try_handle_updates () -> Result < () > {
let response = services ()
. globals
. default_client ()
. get ( "https://conduit.rs/check-for-updates/stable" )
. send ()
. await ? ;
#[derive(Deserialize)]
struct CheckForUpdatesResponseEntry {
id : u64 ,
date : String ,
message : String ,
}
#[derive(Deserialize)]
struct CheckForUpdatesResponse {
updates : Vec < CheckForUpdatesResponseEntry > ,
}
2021-07-15 18:09:10 +02:00
2023-07-29 20:01:38 +02:00
let response = serde_json ::from_str ::< CheckForUpdatesResponse > ( & response . text (). await ? )
. map_err ( | _ | Error ::BadServerResponse ( "Bad version check response" )) ? ;
let mut last_update_id = services (). globals . last_check_for_updates_id () ? ;
for update in response . updates {
last_update_id = last_update_id . max ( update . id );
if update . id > services (). globals . last_check_for_updates_id () ? {
println! ( " {} " , update . message );
services ()
. admin
. send_message ( RoomMessageEventContent ::text_plain ( format! (
"@room: The following is a message from the Conduit developers. It was sent on ' {} ': \n\n {} " ,
update . date , update . message
)))
}
}
services ()
. globals
. update_check_for_updates_id ( last_update_id ) ? ;
Ok (())
}
#[tracing::instrument]
pub async fn start_cleanup_task () {
2021-07-15 18:09:10 +02:00
#[cfg(unix)]
use tokio ::signal ::unix ::{ signal , SignalKind };
2021-07-14 07:07:08 +00:00
2021-08-01 16:59:52 +02:00
use std ::time ::{ Duration , Instant };
2021-07-14 07:07:08 +00:00
2022-10-05 20:34:31 +02:00
let timer_interval =
Duration ::from_secs ( services (). globals . config . cleanup_second_interval as u64 );
2021-07-14 07:07:08 +00:00
tokio ::spawn ( async move {
let mut i = interval ( timer_interval );
2021-07-15 18:09:10 +02:00
#[cfg(unix)]
2021-07-14 07:07:08 +00:00
let mut s = signal ( SignalKind ::hangup ()). unwrap ();
loop {
2021-07-15 18:09:10 +02:00
#[cfg(unix)]
tokio ::select! {
2021-08-01 16:59:52 +02:00
_ = i . tick () => {
2022-12-18 07:47:18 +01:00
debug! ( "cleanup: Timer ticked" );
2021-07-14 07:07:08 +00:00
}
_ = s . recv () => {
2022-12-18 07:47:18 +01:00
debug! ( "cleanup: Received SIGHUP" );
2021-07-14 07:07:08 +00:00
}
};
2021-07-15 18:09:10 +02:00
#[cfg(not(unix))]
2021-08-01 16:59:52 +02:00
{
2021-07-15 18:09:10 +02:00
i . tick (). await ;
2022-12-18 07:47:18 +01:00
debug! ( "cleanup: Timer ticked" )
2021-07-15 18:09:10 +02:00
}
2021-08-01 16:59:52 +02:00
let start = Instant ::now ();
2022-10-05 15:33:57 +02:00
if let Err ( e ) = services (). globals . cleanup () {
2022-01-09 16:44:44 +01:00
error! ( "cleanup: Errored: {}" , e );
2021-07-14 07:07:08 +00:00
} else {
2022-12-18 07:47:18 +01:00
debug! ( "cleanup: Finished in {:?}" , start . elapsed ());
2021-07-14 07:07:08 +00:00
}
}
});
}
}
2022-04-07 12:11:55 +00:00
/// Sets the emergency password and push rules for the @conduit account in case emergency password is set
2022-10-05 12:45:54 +02:00
fn set_emergency_access () -> Result < bool > {
let conduit_user = UserId ::parse_with_server_name ( "conduit" , services (). globals . server_name ())
2022-04-07 12:11:55 +00:00
. expect ( "@conduit:server_name is a valid UserId" );
2022-10-05 20:34:31 +02:00
services (). users . set_password (
& conduit_user ,
services (). globals . emergency_password (). as_deref (),
) ? ;
2022-04-07 12:11:55 +00:00
2022-10-05 12:45:54 +02:00
let ( ruleset , res ) = match services (). globals . emergency_password () {
2022-04-07 12:11:55 +00:00
Some ( _ ) => ( Ruleset ::server_default ( & conduit_user ), Ok ( true )),
None => ( Ruleset ::new (), Ok ( false )),
};
2022-10-05 12:45:54 +02:00
services (). account_data . update (
2022-04-07 12:11:55 +00:00
None ,
& conduit_user ,
2022-01-18 16:53:25 +01:00
GlobalAccountDataEventType ::PushRules . to_string (). into (),
2022-10-05 15:33:57 +02:00
& serde_json ::to_value ( & GlobalAccountDataEvent {
2022-04-07 12:11:55 +00:00
content : PushRulesEventContent { global : ruleset },
2022-10-05 20:34:31 +02:00
})
. expect ( "to json value always works" ),
2022-04-07 12:11:55 +00:00
) ? ;
res
}