use rocksdb caches for a few of the lru_caches

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-04-08 01:29:52 -07:00 committed by June
parent fc44ba6ab3
commit 345be5ba5e
10 changed files with 133 additions and 192 deletions

View file

@@ -1,9 +1,12 @@
use std::sync::{atomic::AtomicU32, Arc};
use std::{
collections::HashMap,
sync::{atomic::AtomicU32, Arc},
};
use chrono::{DateTime, Utc};
use rust_rocksdb::{
backup::{BackupEngine, BackupEngineOptions},
DBWithThreadMode as Db, MultiThreaded,
Cache, ColumnFamilyDescriptor, DBCommon, DBWithThreadMode as Db, Env, MultiThreaded, Options,
};
use tracing::{debug, error, info, warn};
@@ -20,11 +23,11 @@ use super::watchers;
pub(crate) struct Engine {
rocks: Db<MultiThreaded>,
row_cache: rust_rocksdb::Cache,
col_cache: rust_rocksdb::Cache,
row_cache: Cache,
col_cache: HashMap<String, Cache>,
old_cfs: Vec<String>,
opts: rust_rocksdb::Options,
env: rust_rocksdb::Env,
opts: Options,
env: Env,
config: Config,
corks: AtomicU32,
}
@@ -32,14 +35,17 @@ pub(crate) struct Engine {
impl KeyValueDatabaseEngine for Arc<Engine> {
fn open(config: &Config) -> Result<Self> {
let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
let row_cache_capacity_bytes = (cache_capacity_bytes * 0.25) as usize;
let col_cache_capacity_bytes = (cache_capacity_bytes * 0.75) as usize;
let row_cache_capacity_bytes = (cache_capacity_bytes * 0.50) as usize;
let col_cache_capacity_bytes = (cache_capacity_bytes * 0.50) as usize;
let db_env = rust_rocksdb::Env::new()?;
let row_cache = rust_rocksdb::Cache::new_lru_cache(row_cache_capacity_bytes);
let col_cache = rust_rocksdb::Cache::new_lru_cache(col_cache_capacity_bytes);
let db_opts = db_options(config, &db_env, &row_cache, &col_cache);
let mut col_cache = HashMap::new();
col_cache.insert("primary".to_owned(), Cache::new_lru_cache(col_cache_capacity_bytes));
let db_env = Env::new()?;
let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes);
let db_opts = db_options(config, &db_env, &row_cache, col_cache.get("primary").expect("cache"));
let load_time = std::time::Instant::now();
if config.rocksdb_repair {
warn!("Starting database repair. This may take a long time...");
if let Err(e) = Db::<MultiThreaded>::repair(&db_opts, &config.database_path) {
@@ -53,7 +59,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
debug!("Opening {} column family descriptors in database", cfs.len());
let cfds = cfs
.iter()
.map(|name| rust_rocksdb::ColumnFamilyDescriptor::new(name, cf_options(name, db_opts.clone(), config)))
.map(|name| ColumnFamilyDescriptor::new(name, cf_options(config, name, db_opts.clone(), &mut col_cache)))
.collect::<Vec<_>>();
debug!("Opening database...");
@@ -63,7 +69,11 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
Db::<MultiThreaded>::open_cf_descriptors(&db_opts, &config.database_path, cfds)?
};
info!("Opened database at sequence number {}", db.latest_sequence_number());
info!(
"Opened database at sequence number {} in {:?}",
db.latest_sequence_number(),
load_time.elapsed()
);
Ok(Arc::new(Engine {
rocks: db,
row_cache,
@@ -91,13 +101,13 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
}
fn flush(&self) -> Result<()> {
rust_rocksdb::DBCommon::flush_wal(&self.rocks, false)?;
DBCommon::flush_wal(&self.rocks, false)?;
Ok(())
}
fn sync(&self) -> Result<()> {
rust_rocksdb::DBCommon::flush_wal(&self.rocks, true)?;
DBCommon::flush_wal(&self.rocks, true)?;
Ok(())
}
@@ -119,31 +129,34 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
}
fn memory_usage(&self) -> Result<String> {
let stats = rust_rocksdb::perf::get_memory_usage_stats(
Some(&[&self.rocks]),
Some(&[&self.row_cache, &self.col_cache]),
)?;
Ok(format!(
"Approximate memory usage of all the mem-tables: {:.3} MB\nApproximate memory usage of un-flushed \
mem-tables: {:.3} MB\nApproximate memory usage of all the table readers: {:.3} MB\nApproximate memory \
usage by cache: {:.3} MB\nApproximate memory usage by row cache: {:.3} MB pinned: {:.3} MB\nApproximate \
memory usage by column cache: {:.3} MB pinned: {:.3} MB\n",
stats.mem_table_total as f64 / 1024.0 / 1024.0,
stats.mem_table_unflushed as f64 / 1024.0 / 1024.0,
stats.mem_table_readers_total as f64 / 1024.0 / 1024.0,
stats.cache_total as f64 / 1024.0 / 1024.0,
self.row_cache.get_usage() as f64 / 1024.0 / 1024.0,
self.row_cache.get_pinned_usage() as f64 / 1024.0 / 1024.0,
self.col_cache.get_usage() as f64 / 1024.0 / 1024.0,
self.col_cache.get_pinned_usage() as f64 / 1024.0 / 1024.0,
))
let mut res = String::new();
let stats = rust_rocksdb::perf::get_memory_usage_stats(Some(&[&self.rocks]), Some(&[&self.row_cache]))?;
_ = std::fmt::write(
&mut res,
format_args!(
"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow cache: {:.2} MiB\n",
stats.mem_table_total as f64 / 1024.0 / 1024.0,
stats.mem_table_unflushed as f64 / 1024.0 / 1024.0,
stats.mem_table_readers_total as f64 / 1024.0 / 1024.0,
self.row_cache.get_usage() as f64 / 1024.0 / 1024.0,
),
);
for (name, cache) in &self.col_cache {
_ = std::fmt::write(
&mut res,
format_args!("{} cache: {:.2} MiB\n", name, cache.get_usage() as f64 / 1024.0 / 1024.0,),
);
}
Ok(res)
}
fn cleanup(&self) -> Result<()> {
debug!("Running flush_opt");
let flushoptions = rust_rocksdb::FlushOptions::default();
rust_rocksdb::DBCommon::flush_opt(&self.rocks, &flushoptions)?;
DBCommon::flush_opt(&self.rocks, &flushoptions)?;
Ok(())
}

View file

@@ -1,5 +1,7 @@
#![allow(dead_code)]
use std::collections::HashMap;
use rust_rocksdb::{
BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Env, LogLevel, Options,
UniversalCompactOptions, UniversalCompactionStopStyle,
@@ -43,6 +45,7 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache, col_cach
// Blocks
let mut table_opts = table_options(config);
table_opts.set_block_cache(col_cache);
opts.set_block_based_table_factory(&table_opts);
opts.set_row_cache(row_cache);
// Buffers
@@ -75,7 +78,6 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache, col_cach
4_u8..=u8::MAX => unimplemented!(),
});
opts.set_block_based_table_factory(&table_opts);
opts.set_env(env);
opts
}
@@ -83,7 +85,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache, col_cach
/// Adjust options for the specific column by name. Provide the result of
/// db_options() as the argument to this function and use the return value in
/// the arguments to open the specific column.
pub(crate) fn cf_options(name: &str, mut opts: Options, config: &Config) -> Options {
pub(crate) fn cf_options(cfg: &Config, name: &str, mut opts: Options, cache: &mut HashMap<String, Cache>) -> Options {
// Columns with non-default compaction options
match name {
"backupid_algorithm"
| "backupid_etag"
@@ -94,7 +97,52 @@ pub(crate) fn cf_options(name: &str, mut opts: Options, config: &Config) -> Opti
| "shortstatekey_statekey"
| "shortstatehash_statediff"
| "userdevicetxnid_response"
| "userfilterid_filter" => set_for_sequential_small_uc(&mut opts, config),
| "userfilterid_filter" => set_for_sequential_small_uc(&mut opts, cfg),
&_ => {},
}
// Columns with non-default table/cache configs
match name {
"shorteventid_eventid" => set_table_with_new_cache(
&mut opts,
cfg,
cache,
name,
cache_size(cfg, cfg.shorteventid_cache_capacity, 64),
),
"eventid_shorteventid" => set_table_with_new_cache(
&mut opts,
cfg,
cache,
name,
cache_size(cfg, cfg.eventidshort_cache_capacity, 64),
),
"shorteventid_authchain" => {
set_table_with_new_cache(&mut opts, cfg, cache, name, cache_size(cfg, cfg.auth_chain_cache_capacity, 192));
},
"shortstatekey_statekey" => set_table_with_new_cache(
&mut opts,
cfg,
cache,
name,
cache_size(cfg, cfg.shortstatekey_cache_capacity, 1024),
),
"statekey_shortstatekey" => set_table_with_new_cache(
&mut opts,
cfg,
cache,
name,
cache_size(cfg, cfg.statekeyshort_cache_capacity, 1024),
),
"pduid_pdu" => set_table_with_new_cache(&mut opts, cfg, cache, name, cfg.pdu_cache_capacity as usize * 1536),
"eventid_outlierpdu" => set_table_with_shared_cache(&mut opts, cfg, cache, name, "pduid_pdu"),
&_ => {},
}
@@ -220,6 +268,31 @@ fn uc_options(_config: &Config) -> UniversalCompactOptions {
opts
}
/// Give this column its own dedicated block cache.
///
/// Allocates a fresh LRU cache of `size` bytes, registers it in the shared
/// cache map under the column's own `name`, and then routes the column's
/// table options to it through the shared-cache path (sharing with itself).
fn set_table_with_new_cache(
	opts: &mut Options, config: &Config, cache: &mut HashMap<String, Cache>, name: &str, size: usize,
) {
	let dedicated = Cache::new_lru_cache(size);
	cache.insert(name.to_owned(), dedicated);
	// Reuse the shared-cache plumbing: the cache we just inserted is looked
	// up again by the same name.
	set_table_with_shared_cache(opts, config, cache, name, name);
}
/// Attach this column's block-based table options to an already-registered
/// block cache identified by `cache_name`.
///
/// Panics (via `expect`) if no cache was previously inserted under
/// `cache_name` — callers must register the cache first.
fn set_table_with_shared_cache(
	opts: &mut Options, config: &Config, cache: &HashMap<String, Cache>, _name: &str, cache_name: &str,
) {
	let shared = cache
		.get(cache_name)
		.expect("existing cache to share with this column");
	let mut table_opts = table_options(config);
	table_opts.set_block_cache(shared);
	opts.set_block_based_table_factory(&table_opts);
}
/// Compute a cache capacity in bytes from a configured entry count.
///
/// Scales `base_size` (an entry count) by the user-tunable
/// `conduit_cache_capacity_modifier`, then multiplies by `entity_size`
/// (approximate bytes per entry).
///
/// NOTE(review): the f64 -> usize cast saturates (NaN/negative become 0) —
/// assumes the modifier is validated as a sane positive value elsewhere;
/// confirm against config loading.
fn cache_size(config: &Config, base_size: u32, entity_size: usize) -> usize {
	let scaled_entries = (f64::from(base_size) * config.conduit_cache_capacity_modifier) as usize;
	scaled_entries * entity_size
}
fn table_options(_config: &Config) -> BlockBasedOptions {
let mut opts = BlockBasedOptions::default();