Files
continuwuity/src/database/rocksdb/mod.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

276 lines
7.8 KiB
Rust
Raw Normal View History

2024-05-09 15:59:08 -07:00
// no_link to prevent double-inclusion of librocksdb.a here and with
// libconduit_core.so
#[no_link]
extern crate rust_rocksdb;
use std::{
collections::HashMap,
2024-05-23 01:27:04 -04:00
fmt::Write,
sync::{atomic::AtomicU32, Arc},
};
2021-10-16 15:19:25 +02:00
2024-03-20 00:03:07 -04:00
use chrono::{DateTime, Utc};
use rust_rocksdb::{
2024-03-19 04:37:35 -07:00
backup::{BackupEngine, BackupEngineOptions},
2024-05-09 15:59:08 -07:00
perf::get_memory_usage_stats,
Cache, ColumnFamilyDescriptor, DBCommon, DBWithThreadMode as Db, Env, MultiThreaded, Options,
};
use tracing::{debug, error, info, warn};
2024-05-09 15:59:08 -07:00
use crate::{watchers::Watchers, Config, KeyValueDatabaseEngine, KvTree, Result};
2024-04-06 08:48:41 -07:00
pub(crate) mod kvtree;
pub(crate) mod opts;
use kvtree::RocksDbEngineTree;
use opts::{cf_options, db_options};
use super::watchers;
2024-01-14 22:39:08 -05:00
pub(crate) struct Engine {
2024-05-09 15:59:08 -07:00
config: Config,
row_cache: Cache,
col_cache: HashMap<String, Cache>,
opts: Options,
env: Env,
2024-05-09 15:59:08 -07:00
old_cfs: Vec<String>,
rocks: Db<MultiThreaded>,
corks: AtomicU32,
2021-10-16 15:19:25 +02:00
}
2022-09-06 23:15:09 +02:00
impl KeyValueDatabaseEngine for Arc<Engine> {
2022-01-09 16:44:44 +01:00
fn open(config: &Config) -> Result<Self> {
2024-03-14 12:36:56 -07:00
let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
2024-05-04 09:45:37 -04:00
#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
let row_cache_capacity_bytes = (cache_capacity_bytes * 0.50) as usize;
2024-05-04 09:45:37 -04:00
#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
let col_cache_capacity_bytes = (cache_capacity_bytes * 0.50) as usize;
let mut col_cache = HashMap::new();
col_cache.insert("primary".to_owned(), Cache::new_lru_cache(col_cache_capacity_bytes));
2024-03-14 12:36:56 -07:00
let mut db_env = Env::new()?;
let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes);
let db_opts = db_options(config, &mut db_env, &row_cache, col_cache.get("primary").expect("cache"));
2024-03-05 19:48:54 -05:00
let load_time = std::time::Instant::now();
if config.rocksdb_repair {
warn!("Starting database repair. This may take a long time...");
if let Err(e) = Db::<MultiThreaded>::repair(&db_opts, &config.database_path) {
error!("Repair failed: {:?}", e);
}
}
2024-04-06 08:48:41 -07:00
debug!("Listing column families in database");
let cfs = Db::<MultiThreaded>::list_cf(&db_opts, &config.database_path).unwrap_or_default();
2024-03-05 19:48:54 -05:00
2024-04-06 08:48:41 -07:00
debug!("Opening {} column family descriptors in database", cfs.len());
let cfds = cfs
.iter()
.map(|name| ColumnFamilyDescriptor::new(name, cf_options(config, name, db_opts.clone(), &mut col_cache)))
.collect::<Vec<_>>();
2024-04-06 08:48:41 -07:00
debug!("Opening database...");
let db = if config.rocksdb_read_only {
Db::<MultiThreaded>::open_cf_for_read_only(&db_opts, &config.database_path, cfs.clone(), false)?
} else {
Db::<MultiThreaded>::open_cf_descriptors(&db_opts, &config.database_path, cfds)?
};
info!(
"Opened database at sequence number {} in {:?}",
db.latest_sequence_number(),
load_time.elapsed()
);
2024-06-09 10:23:06 +00:00
Ok(Self::new(Engine {
2024-05-09 15:59:08 -07:00
config: config.clone(),
2024-03-14 12:36:56 -07:00
row_cache,
col_cache,
opts: db_opts,
env: db_env,
2024-05-09 15:59:08 -07:00
old_cfs: cfs,
rocks: db,
corks: AtomicU32::new(0),
2021-10-16 15:19:25 +02:00
}))
}
2024-03-05 19:48:54 -05:00
2022-09-06 23:15:09 +02:00
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
2021-10-16 15:19:25 +02:00
if !self.old_cfs.contains(&name.to_owned()) {
// Create if it didn't exist
debug!("Creating new column family in database: {}", name);
2024-05-23 01:27:04 -04:00
// TODO: the workaround for this needs to be extended to rocksdb caches, but i
// dont know that code to safely do that
#[allow(clippy::let_underscore_must_use)]
#[allow(clippy::let_underscore_untyped)] // attributes on expressions are experimental
let _ = self.rocks.create_cf(name, &self.opts);
2021-10-16 15:19:25 +02:00
}
2024-03-05 19:48:54 -05:00
2021-10-16 15:19:25 +02:00
Ok(Arc::new(RocksDbEngineTree {
name,
2024-06-09 10:23:06 +00:00
db: Self::clone(self),
2021-10-16 15:19:25 +02:00
watchers: Watchers::default(),
}))
}
2024-03-05 19:48:54 -05:00
2022-01-09 16:44:44 +01:00
fn flush(&self) -> Result<()> {
DBCommon::flush_wal(&self.rocks, false)?;
2024-03-06 18:02:19 -05:00
2021-10-16 15:19:25 +02:00
Ok(())
}
2024-03-05 19:48:54 -05:00
2024-03-19 09:26:46 -07:00
fn sync(&self) -> Result<()> {
DBCommon::flush_wal(&self.rocks, true)?;
2024-03-19 09:26:46 -07:00
Ok(())
}
/// Reports whether the cork counter is currently above zero.
fn corked(&self) -> bool {
    use std::sync::atomic::Ordering::Relaxed;

    let outstanding = self.corks.load(Relaxed);
    outstanding > 0
}
/// Increments the cork counter by one. Always returns `Ok(())`.
fn cork(&self) -> Result<()> {
    use std::sync::atomic::Ordering::Relaxed;

    let _previous = self.corks.fetch_add(1, Relaxed);
    Ok(())
}
/// Decrements the cork counter by one. Always returns `Ok(())`.
///
/// NOTE(review): `fetch_sub` wraps on underflow, so an `uncork` without a
/// matching `cork` would leave the counter at `u32::MAX` — confirm callers
/// always pair the two.
fn uncork(&self) -> Result<()> {
    use std::sync::atomic::Ordering::Relaxed;

    let _previous = self.corks.fetch_sub(1, Relaxed);
    Ok(())
}
2024-05-04 09:45:37 -04:00
#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
fn memory_usage(&self) -> Result<String> {
let mut res = String::new();
2024-05-09 15:59:08 -07:00
let stats = get_memory_usage_stats(Some(&[&self.rocks]), Some(&[&self.row_cache]))?;
2024-05-23 01:27:04 -04:00
writeln!(
res,
"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow cache: {:.2} MiB",
stats.mem_table_total as f64 / 1024.0 / 1024.0,
stats.mem_table_unflushed as f64 / 1024.0 / 1024.0,
stats.mem_table_readers_total as f64 / 1024.0 / 1024.0,
self.row_cache.get_usage() as f64 / 1024.0 / 1024.0,
)
.expect("should be able to write to string buffer");
for (name, cache) in &self.col_cache {
2024-05-23 01:27:04 -04:00
writeln!(res, "{} cache: {:.2} MiB", name, cache.get_usage() as f64 / 1024.0 / 1024.0,)
.expect("should be able to write to string buffer");
}
Ok(res)
}
2024-03-05 19:48:54 -05:00
2024-03-06 18:02:19 -05:00
/// Flushes the database using default `FlushOptions`.
fn cleanup(&self) -> Result<()> {
    debug!("Running flush_opt");
    let flush_options = rust_rocksdb::FlushOptions::default();
    self.rocks.flush_opt(&flush_options)?;
    Ok(())
}
2024-03-19 04:37:35 -07:00
fn backup(&self) -> Result<(), Box<dyn std::error::Error>> {
let path = self.config.database_backup_path.as_ref();
2024-03-20 00:23:46 -04:00
if path.is_none() || path.is_some_and(|path| path.as_os_str().is_empty()) {
2024-03-19 04:37:35 -07:00
return Ok(());
}
2024-03-20 00:03:07 -04:00
let options = BackupEngineOptions::new(path.unwrap())?;
2024-03-19 04:37:35 -07:00
let mut engine = BackupEngine::open(&options, &self.env)?;
2024-06-09 05:15:27 +00:00
if self.config.database_backups_to_keep > 0 {
if let Err(e) = engine.create_new_backup_flush(&self.rocks, true) {
return Err(Box::new(e));
2024-03-19 04:37:35 -07:00
}
let engine_info = engine.get_backup_info();
let info = &engine_info.last().unwrap();
info!(
"Created database backup #{} using {} bytes in {} files",
info.backup_id, info.size, info.num_files,
);
2024-06-09 05:15:27 +00:00
}
2024-03-19 04:37:35 -07:00
if self.config.database_backups_to_keep >= 0 {
let keep = u32::try_from(self.config.database_backups_to_keep)?;
2024-03-20 00:03:07 -04:00
if let Err(e) = engine.purge_old_backups(keep.try_into()?) {
error!("Failed to purge old backup: {:?}", e.to_string());
2024-03-19 04:37:35 -07:00
}
}
2024-06-09 05:15:27 +00:00
Ok(())
2024-03-19 04:37:35 -07:00
}
fn backup_list(&self) -> Result<String> {
let path = self.config.database_backup_path.as_ref();
2024-03-20 00:23:46 -04:00
if path.is_none() || path.is_some_and(|path| path.as_os_str().is_empty()) {
return Ok(
"Configure database_backup_path to enable backups, or the path specified is not valid".to_owned(),
);
2024-03-19 04:37:35 -07:00
}
let mut res = String::new();
2024-03-20 00:03:07 -04:00
let options = BackupEngineOptions::new(path.unwrap())?;
2024-03-19 04:37:35 -07:00
let engine = BackupEngine::open(&options, &self.env)?;
for info in engine.get_backup_info() {
2024-05-23 01:27:04 -04:00
writeln!(
res,
"#{} {}: {} bytes, {} files",
info.backup_id,
DateTime::<Utc>::from_timestamp(info.timestamp, 0)
.unwrap_or_default()
.to_rfc2822(),
info.size,
info.num_files,
2024-03-20 00:03:07 -04:00
)
2024-05-23 01:27:04 -04:00
.expect("should be able to write to string buffer");
2024-03-19 04:37:35 -07:00
}
Ok(res)
}
2024-03-22 19:37:36 -07:00
fn file_list(&self) -> Result<String> {
match self.rocks.live_files() {
Err(e) => Ok(String::from(e)),
Ok(files) => {
let mut res = String::new();
for file in files {
2024-05-23 01:27:04 -04:00
writeln!(
res,
"<code>L{} {:<13} {:7}+ {:4}- {:9}</code> {}<br>",
file.level, file.name, file.num_entries, file.num_deletions, file.size, file.column_family_name,
)
.expect("should be able to writeln to string buffer");
2024-03-22 19:37:36 -07:00
}
Ok(res)
},
}
}
2024-02-25 22:16:08 -05:00
/// Currently a no-op: the body is empty, so nothing is cleared for this engine.
// TODO: figure out if this is needed for rocksdb
#[allow(dead_code)]
fn clear_caches(&self) {}
2021-10-16 15:19:25 +02:00
}
2024-05-09 15:59:08 -07:00
impl Drop for Engine {
fn drop(&mut self) {
const BLOCKING: bool = true;
2024-06-02 00:22:48 +00:00
debug!("Waiting for background tasks to finish...");
2024-05-09 15:59:08 -07:00
self.rocks.cancel_all_background_work(BLOCKING);
debug!("Shutting down background threads");
self.env.set_high_priority_background_threads(0);
self.env.set_low_priority_background_threads(0);
self.env.set_bottom_priority_background_threads(0);
self.env.set_background_threads(0);
debug!("Joining background threads...");
self.env.join_all_threads();
}
}