diff --git a/src/abstract.rs b/src/abstract.rs index 666e96da..65cee716 100644 --- a/src/abstract.rs +++ b/src/abstract.rs @@ -4,15 +4,12 @@ use crate::{ blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType, - iter_guard::IterGuardImpl, level_manifest::LevelManifest, segment::Segment, - tree::inner::MemtableId, vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, - KvPair, Memtable, SegmentId, SeqNo, SequenceNumberCounter, Tree, TreeId, UserKey, UserValue, + iter_guard::IterGuardImpl, segment::Segment, tree::inner::MemtableId, version::Version, + vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SegmentId, + SeqNo, SequenceNumberCounter, Tree, TreeId, UserKey, UserValue, }; use enum_dispatch::enum_dispatch; -use std::{ - ops::RangeBounds, - sync::{Arc, RwLock, RwLockWriteGuard}, -}; +use std::{ops::RangeBounds, sync::Arc}; pub type RangeItem = crate::Result; @@ -30,7 +27,7 @@ pub trait AbstractTree { fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result>; #[doc(hidden)] - fn manifest(&self) -> &Arc>; + fn current_version(&self) -> Version; /// Synchronously flushes the active memtable to a disk segment. /// @@ -143,10 +140,6 @@ pub trait AbstractTree { #[cfg(feature = "metrics")] fn metrics(&self) -> &Arc; - // TODO:? - /* #[doc(hidden)] - fn verify(&self) -> crate::Result; */ - /// Synchronously flushes a memtable to a disk segment. /// /// This method will not make the segment immediately available, @@ -176,9 +169,6 @@ pub trait AbstractTree { seqno_threshold: SeqNo, ) -> crate::Result<()>; - /// Write-locks the active memtable for exclusive access - fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc>; - /// Clears the active memtable atomically. fn clear_active_memtable(&self); diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 248c91a4..2222815d 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -13,26 +13,20 @@ use crate::{ compaction::stream::CompactionStream, file::{fsync_directory, BLOBS_FOLDER}, iter_guard::{IterGuard, IterGuardImpl}, - level_manifest::LevelManifest, r#abstract::{AbstractTree, RangeItem}, segment::Segment, tree::inner::MemtableId, value::InternalValue, - vlog::{Accessor, BlobFile, BlobFileId, BlobFileWriter, ValueHandle}, + version::Version, + vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle}, Config, Memtable, SegmentId, SeqNo, SequenceNumberCounter, UserKey, UserValue, }; use handle::BlobIndirection; -use std::{ - collections::BTreeMap, - io::Cursor, - ops::RangeBounds, - path::PathBuf, - sync::{Arc, RwLock}, -}; +use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc}; pub struct Guard<'a> { blob_tree: &'a BlobTree, - vlog: Arc>, + version: Version, kv: crate::Result, } @@ -42,26 +36,30 @@ impl IterGuard for Guard<'_> { } fn size(self) -> crate::Result { - let mut cursor = Cursor::new(self.kv?.value); - Ok(BlobIndirection::decode_from(&mut cursor)?.size) + let kv = self.kv?; + + if kv.key.value_type.is_indirection() { + let mut cursor = Cursor::new(kv.value); + Ok(BlobIndirection::decode_from(&mut cursor)?.size) + } else { + // NOTE: We know that values are u32 max length + #[allow(clippy::cast_possible_truncation)] + Ok(kv.value.len() as u32) + } } fn into_inner(self) -> crate::Result<(UserKey, UserValue)> { - resolve_value_handle(self.blob_tree, &self.vlog, self.kv?) + resolve_value_handle(self.blob_tree, &self.version, self.kv?) } } -fn resolve_value_handle( - tree: &BlobTree, - vlog: &BTreeMap, - item: InternalValue, -) -> RangeItem { +fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue) -> RangeItem { if item.key.value_type.is_indirection() { let mut cursor = Cursor::new(item.value); let vptr = BlobIndirection::decode_from(&mut cursor)?; // Resolve indirection using value log - match Accessor::new(vlog).get( + match Accessor::new(&version.value_log).get( tree.id(), &tree.blobs_folder, &item.key.user_key, @@ -75,10 +73,10 @@ fn resolve_value_handle( } Ok(None) => { panic!( - "value handle ({:?} => {:?}) did not match any blob - this is a bug", - String::from_utf8_lossy(&item.key.user_key), - vptr.vhandle, - ) + "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}", + item.key.user_key, vptr.vhandle, + version.id(), + ); } Err(e) => Err(e), } @@ -112,9 +110,6 @@ impl BlobTree { fsync_directory(&blobs_folder)?; let blob_file_id_to_continue_with = index - .manifest() - .read() - .expect("lock is poisoned") .current_version() .value_log .values() @@ -148,8 +143,8 @@ impl AbstractTree for BlobTree { self.index.get_internal_entry(key, seqno) } - fn manifest(&self) -> &Arc> { - self.index.manifest() + fn current_version(&self) -> Version { + self.index.current_version() } fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result> { @@ -191,12 +186,7 @@ impl AbstractTree for BlobTree { let range = prefix_to_range(prefix.as_ref()); - let version = self - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .clone(); + let version = self.current_version(); Box::new( self.index @@ -204,7 +194,7 @@ impl AbstractTree for BlobTree { .map(move |kv| { IterGuardImpl::Blob(Guard { blob_tree: self, - vlog: version.value_log.clone(), // TODO: PERF: ugly Arc clone + version: version.clone(), // TODO: PERF: ugly Arc clone kv, }) }), @@ -217,12 +207,7 @@ impl AbstractTree for BlobTree { seqno: SeqNo, index: Option>, ) -> Box> + '_> { - let version = self - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .clone(); + let version = self.current_version(); // TODO: PERF: ugly Arc clone Box::new( @@ -231,7 +216,7 @@ impl AbstractTree for BlobTree { .map(move |kv| { IterGuardImpl::Blob(Guard { blob_tree: self, - vlog: version.value_log.clone(), // TODO: PERF: ugly Arc clone + version: version.clone(), // TODO: PERF: ugly Arc clone kv, }) }), @@ -335,11 +320,7 @@ impl AbstractTree for BlobTree { } fn blob_file_count(&self) -> usize { - self.manifest() - .read() - .expect("lock is poisoned") - .current_version() - .blob_file_count() + self.current_version().blob_file_count() } // NOTE: We skip reading from the value log @@ -363,12 +344,7 @@ impl AbstractTree for BlobTree { } fn stale_blob_bytes(&self) -> u64 { - self.manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .stale_bytes() + self.current_version().gc_stats().stale_bytes() } fn filter_size(&self) -> usize { @@ -520,10 +496,6 @@ impl AbstractTree for BlobTree { .register_segments(segments, blob_files, frag_map, seqno_threshold) } - fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc> { - self.index.lock_active_memtable() - } - fn set_active_memtable(&self, memtable: Memtable) { self.index.set_active_memtable(memtable); } @@ -595,8 +567,7 @@ impl AbstractTree for BlobTree { } fn disk_space(&self) -> u64 { - let lock = self.manifest().read().expect("lock is poisoned"); - let version = lock.current_version(); + let version = self.current_version(); let vlog = crate::vlog::Accessor::new(&version.value_log); self.index.disk_space() + vlog.disk_space() } @@ -628,9 +599,9 @@ impl AbstractTree for BlobTree { return Ok(None); }; - let lock = self.manifest().read().expect("lock is poisoned"); - let version = lock.current_version(); - let (_, v) = resolve_value_handle(self, &version.value_log, item)?; + let version = self.current_version(); + let (_, v) = resolve_value_handle(self, &version, item)?; + Ok(Some(v)) } diff --git a/src/compaction/drop_range.rs b/src/compaction/drop_range.rs index 7374804a..d1c40f97 100644 --- a/src/compaction/drop_range.rs +++ b/src/compaction/drop_range.rs @@ -3,9 +3,9 @@ // (found in the LICENSE-* files in the repository) use super::{Choice, CompactionStrategy}; -use crate::{ - config::Config, level_manifest::LevelManifest, slice::Slice, version::run::Ranged, KeyRange, -}; +use crate::compaction::state::CompactionState; +use crate::version::Version; +use crate::{config::Config, slice::Slice, version::run::Ranged, KeyRange}; use crate::{HashSet, Segment}; use std::ops::{Bound, RangeBounds}; @@ -73,9 +73,8 @@ impl CompactionStrategy for Strategy { "DropRangeCompaction" } - fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice { - let segment_ids: HashSet<_> = levels - .current_version() + fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice { + let segment_ids: HashSet<_> = version .iter_levels() .flat_map(|lvl| lvl.iter()) .flat_map(|run| { @@ -93,7 +92,7 @@ impl CompactionStrategy for Strategy { // But just as a fail-safe... let some_hidden = segment_ids .iter() - .any(|&id| levels.hidden_set().is_hidden(id)); + .any(|&id| state.hidden_set().is_hidden(id)); if some_hidden { Choice::DoNothing diff --git a/src/compaction/fifo.rs b/src/compaction/fifo.rs index 0d6cc78b..de2d873f 100644 --- a/src/compaction/fifo.rs +++ b/src/compaction/fifo.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) use super::{Choice, CompactionStrategy}; -use crate::{config::Config, level_manifest::LevelManifest, HashSet}; +use crate::{compaction::state::CompactionState, config::Config, version::Version, HashSet}; /// FIFO-style compaction /// @@ -45,13 +45,18 @@ impl CompactionStrategy for Strategy { } // TODO: TTL - fn choose(&self, levels: &LevelManifest, _config: &Config) -> Choice { + fn choose(&self, version: &Version, _config: &Config, state: &CompactionState) -> Choice { // NOTE: We always have at least one level #[allow(clippy::expect_used)] - let first_level = levels.as_slice().first().expect("should have first level"); + let first_level = version.l0(); assert!(first_level.is_disjoint(), "L0 needs to be disjoint"); + assert!( + !version.level_is_busy(0, state.hidden_set()), + "FIFO compaction never compacts", + ); + let l0_size = first_level.size(); if l0_size > self.limit { diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs index c3d9e4f4..c03f8d32 100644 --- a/src/compaction/flavour.rs +++ b/src/compaction/flavour.rs @@ -4,11 +4,12 @@ use std::time::Instant; use crate::blob_tree::handle::BlobIndirection; use crate::blob_tree::FragmentationMap; use crate::coding::{Decode, Encode}; +use crate::compaction::state::CompactionState; use crate::compaction::worker::Options; use crate::compaction::Input as CompactionPayload; use crate::file::SEGMENTS_FOLDER; -use crate::level_manifest::LevelManifest; use crate::segment::multi_writer::MultiWriter; +use crate::tree::inner::SuperVersion; use crate::version::Version; use crate::vlog::{BlobFileId, BlobFileMergeScanner, BlobFileWriter}; use crate::{BlobFile, HashSet, InternalValue, Segment}; @@ -81,9 +82,11 @@ pub(super) fn prepare_table_writer( pub(super) trait CompactionFlavour { fn write(&mut self, item: InternalValue) -> crate::Result<()>; + #[warn(clippy::too_many_arguments)] fn finish( self: Box, - levels: &mut LevelManifest, + super_version: &mut SuperVersion, + state: &mut CompactionState, opts: &Options, payload: &CompactionPayload, dst_lvl: usize, @@ -214,7 +217,8 @@ impl CompactionFlavour for RelocatingCompaction { fn finish( mut self: Box, - levels: &mut LevelManifest, + super_version: &mut SuperVersion, + state: &mut CompactionState, opts: &Options, payload: &CompactionPayload, dst_lvl: usize, @@ -232,14 +236,15 @@ impl CompactionFlavour for RelocatingCompaction { let mut blob_file_ids_to_drop = self.rewriting_blob_file_ids; - for blob_file in levels.current_version().value_log.values() { - if blob_file.is_dead(levels.current_version().gc_stats()) { + for blob_file in super_version.version.value_log.values() { + if blob_file.is_dead(super_version.version.gc_stats()) { blob_file_ids_to_drop.insert(blob_file.id()); self.rewriting_blob_files.push(blob_file.clone()); } } - levels.atomic_swap( + state.upgrade_version( + super_version, |current| { Ok(current.with_merge( &payload.segment_ids.iter().copied().collect::>(), @@ -333,7 +338,8 @@ impl CompactionFlavour for StandardCompaction { fn finish( mut self: Box, - levels: &mut LevelManifest, + super_version: &mut SuperVersion, + state: &mut CompactionState, opts: &Options, payload: &CompactionPayload, dst_lvl: usize, @@ -347,13 +353,14 @@ impl CompactionFlavour for StandardCompaction { let mut blob_files_to_drop = Vec::default(); - for blob_file in levels.current_version().value_log.values() { - if blob_file.is_dead(levels.current_version().gc_stats()) { + for blob_file in super_version.version.value_log.values() { + if blob_file.is_dead(super_version.version.gc_stats()) { blob_files_to_drop.push(blob_file.clone()); } } - levels.atomic_swap( + state.upgrade_version( + super_version, |current| { Ok(current.with_merge( &payload.segment_ids.iter().copied().collect::>(), diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index ae50df5a..d40f428d 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -4,11 +4,11 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput}; use crate::{ + compaction::state::{hidden_set::HiddenSet, CompactionState}, config::Config, - level_manifest::{hidden_set::HiddenSet, LevelManifest}, segment::Segment, slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt}, - version::{run::Ranged, Run}, + version::{run::Ranged, Run, Version}, HashSet, KeyRange, SegmentId, }; @@ -189,27 +189,25 @@ impl CompactionStrategy for Strategy { } #[allow(clippy::too_many_lines)] - fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice { - assert!(levels.as_slice().len() == 7, "should have exactly 7 levels"); + fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice { + assert!(version.level_count() == 7, "should have exactly 7 levels"); // Find the level that corresponds to L1 #[allow(clippy::map_unwrap_or)] - let mut canonical_l1_idx = levels - .as_slice() - .iter() + let mut canonical_l1_idx = version + .iter_levels() .enumerate() .skip(1) .find(|(_, lvl)| !lvl.is_empty()) .map(|(idx, _)| idx) - .unwrap_or_else(|| usize::from(levels.last_level_index())); + .unwrap_or_else(|| version.level_count() - 1); // Number of levels we have to shift to get from the actual level idx to the canonical let mut level_shift = canonical_l1_idx - 1; - if canonical_l1_idx > 1 && levels.as_slice().iter().skip(1).any(|lvl| !lvl.is_empty()) { - let need_new_l1 = levels - .as_slice() - .iter() + if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) { + let need_new_l1 = version + .iter_levels() .enumerate() .skip(1) .filter(|(_, lvl)| !lvl.is_empty()) @@ -219,7 +217,7 @@ impl CompactionStrategy for Strategy { .flat_map(|x| x.iter()) // NOTE: Take bytes that are already being compacted into account, // otherwise we may be overcompensating - .filter(|x| !levels.hidden_set().is_hidden(x.id())) + .filter(|x| !state.hidden_set().is_hidden(x.id())) .map(Segment::file_size) .sum::(); @@ -243,7 +241,7 @@ impl CompactionStrategy for Strategy { // NOTE: We always have at least one level #[allow(clippy::expect_used)] - let first_level = levels.as_slice().first().expect("first level should exist"); + let first_level = version.l0(); // TODO: use run_count instead? but be careful because of version free list GC thingy if first_level.segment_count() >= usize::from(self.l0_threshold) { @@ -252,7 +250,7 @@ impl CompactionStrategy for Strategy { } // Score L1+ - for (idx, level) in levels.as_slice().iter().enumerate().skip(1) { + for (idx, level) in version.iter_levels().enumerate().skip(1) { if level.is_empty() { continue; } @@ -262,7 +260,7 @@ impl CompactionStrategy for Strategy { .flat_map(|x| x.iter()) // NOTE: Take bytes that are already being compacted into account, // otherwise we may be overcompensating - .filter(|x| !levels.hidden_set().is_hidden(x.id())) + .filter(|x| !state.hidden_set().is_hidden(x.id())) .map(Segment::file_size) .sum::(); @@ -277,9 +275,8 @@ impl CompactionStrategy for Strategy { ); // NOTE: Force a trivial move - if levels - .as_slice() - .get(idx + 1) + if version + .level(idx + 1) .is_some_and(|next_level| next_level.is_empty()) { scores[idx] = (99.99, 999); @@ -313,15 +310,17 @@ impl CompactionStrategy for Strategy { // We choose L0->L1 compaction if level_idx_with_highest_score == 0 { - let Some(first_level) = levels.current_version().level(0) else { + let Some(first_level) = version.level(0) else { return Choice::DoNothing; }; - if levels.level_is_busy(0) || levels.level_is_busy(canonical_l1_idx) { + if version.level_is_busy(0, state.hidden_set()) + || version.level_is_busy(canonical_l1_idx, state.hidden_set()) + { return Choice::DoNothing; } - let Some(target_level) = &levels.current_version().level(canonical_l1_idx) else { + let Some(target_level) = &version.level(canonical_l1_idx) else { return Choice::DoNothing; }; @@ -365,11 +364,11 @@ impl CompactionStrategy for Strategy { let next_level_index = curr_level_index + 1; - let Some(level) = levels.current_version().level(level_idx_with_highest_score) else { + let Some(level) = version.level(level_idx_with_highest_score) else { return Choice::DoNothing; }; - let Some(next_level) = levels.current_version().level(next_level_index as usize) else { + let Some(next_level) = version.level(next_level_index as usize) else { return Choice::DoNothing; }; @@ -379,7 +378,7 @@ impl CompactionStrategy for Strategy { let Some((segment_ids, can_trivial_move)) = pick_minimal_compaction( level.first_run().expect("should have exactly one run"), next_level.first_run().map(std::ops::Deref::deref), - levels.hidden_set(), + state.hidden_set(), overshoot_bytes, u64::from(self.target_size), ) else { diff --git a/src/compaction/major.rs b/src/compaction/major.rs index 6cb79e14..580900cc 100644 --- a/src/compaction/major.rs +++ b/src/compaction/major.rs @@ -3,7 +3,9 @@ // (found in the LICENSE-* files in the repository) use super::{Choice, CompactionStrategy, Input as CompactionInput}; -use crate::{config::Config, level_manifest::LevelManifest, segment::Segment, HashSet}; +use crate::{ + compaction::state::CompactionState, config::Config, segment::Segment, version::Version, HashSet, +}; /// Compacts all segments into the last level pub struct Strategy { @@ -37,23 +39,25 @@ impl CompactionStrategy for Strategy { "MajorCompaction" } - fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice { - let segment_ids: HashSet<_> = levels.iter().map(Segment::id).collect(); + fn choose(&self, version: &Version, cfg: &Config, state: &CompactionState) -> Choice { + let segment_ids: HashSet<_> = version.iter_segments().map(Segment::id).collect(); // NOTE: This should generally not occur because of the // tree-level major compaction lock // But just as a fail-safe... let some_hidden = segment_ids .iter() - .any(|&id| levels.hidden_set().is_hidden(id)); + .any(|&id| state.hidden_set().is_hidden(id)); if some_hidden { Choice::DoNothing } else { + let last_level_idx = cfg.level_count - 1; + Choice::Merge(CompactionInput { segment_ids, - dest_level: levels.last_level_index(), - canonical_level: levels.last_level_index(), + dest_level: last_level_idx, + canonical_level: last_level_idx, target_size: self.target_size, }) } diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index f69938e4..f8720d4b 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -12,6 +12,7 @@ mod flavour; pub(crate) mod major; pub(crate) mod movedown; pub(crate) mod pulldown; +pub(crate) mod state; pub(crate) mod stream; pub(crate) mod tiered; pub(crate) mod worker; @@ -20,7 +21,9 @@ pub use fifo::Strategy as Fifo; pub use leveled::Strategy as Leveled; pub use tiered::Strategy as SizeTiered; -use crate::{config::Config, level_manifest::LevelManifest, HashSet, SegmentId}; +use crate::{ + compaction::state::CompactionState, config::Config, version::Version, HashSet, SegmentId, +}; /// Alias for `Leveled` pub type Levelled = Leveled; @@ -83,5 +86,5 @@ pub trait CompactionStrategy { fn get_name(&self) -> &'static str; /// Decides on what to do based on the current state of the LSM-tree's levels - fn choose(&self, _: &LevelManifest, config: &Config) -> Choice; + fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice; } diff --git a/src/compaction/movedown.rs b/src/compaction/movedown.rs index 5a78b68c..e7f2ec76 100644 --- a/src/compaction/movedown.rs +++ b/src/compaction/movedown.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) use super::{Choice, CompactionStrategy, Input}; -use crate::{level_manifest::LevelManifest, segment::Segment, Config}; +use crate::{compaction::state::CompactionState, segment::Segment, version::Version, Config}; /// Moves down a level into the destination level. pub struct Strategy(pub u8, pub u8); @@ -14,12 +14,12 @@ impl CompactionStrategy for Strategy { } #[allow(clippy::expect_used)] - fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice { - if levels.level_is_busy(usize::from(self.0)) { + fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice { + if version.level_is_busy(usize::from(self.0), state.hidden_set()) { return Choice::DoNothing; } - let Some(level) = levels.as_slice().get(self.0 as usize) else { + let Some(level) = version.level(self.0.into()) else { return Choice::DoNothing; }; diff --git a/src/compaction/pulldown.rs b/src/compaction/pulldown.rs index d88ffd2a..af838be7 100644 --- a/src/compaction/pulldown.rs +++ b/src/compaction/pulldown.rs @@ -2,8 +2,8 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use super::{Choice, CompactionStrategy, Input}; -use crate::{level_manifest::LevelManifest, segment::Segment, Config, HashSet}; +use super::{Choice, CompactionStrategy}; +use crate::{compaction::state::CompactionState, version::Version, Config}; /// Pulls down and merges a level into the destination level. /// @@ -16,7 +16,7 @@ impl CompactionStrategy for Strategy { } #[allow(clippy::expect_used)] - fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice { + fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice { todo!() } } diff --git a/src/level_manifest/hidden_set.rs b/src/compaction/state/hidden_set.rs similarity index 76% rename from src/level_manifest/hidden_set.rs rename to src/compaction/state/hidden_set.rs index 90615e7a..1f07ae69 100644 --- a/src/level_manifest/hidden_set.rs +++ b/src/compaction/state/hidden_set.rs @@ -1,3 +1,7 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + use crate::SegmentId; /// The hidden set keeps track of which segments are currently being compacted @@ -33,4 +37,11 @@ impl HiddenSet { pub(crate) fn is_empty(&self) -> bool { self.set.is_empty() } + + pub(crate) fn should_decline_compaction>( + &self, + ids: T, + ) -> bool { + self.is_blocked(ids) + } } diff --git a/src/compaction/state/mod.rs b/src/compaction/state/mod.rs new file mode 100644 index 00000000..9bc304c1 --- /dev/null +++ b/src/compaction/state/mod.rs @@ -0,0 +1,203 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +pub mod hidden_set; + +use crate::{ + file::{fsync_directory, rewrite_atomic}, + tree::inner::SuperVersion, + version::Version, + SeqNo, +}; +use hidden_set::HiddenSet; +use std::{ + collections::VecDeque, + io::BufWriter, + path::{Path, PathBuf}, +}; + +pub fn persist_version(folder: &Path, version: &Version) -> crate::Result<()> { + log::trace!( + "Persisting version {} in {}", + version.id(), + folder.display(), + ); + + let path = folder.join(format!("v{}", version.id())); + let file = std::fs::File::create_new(path)?; + let writer = BufWriter::new(file); + let mut writer = sfa::Writer::into_writer(writer); + + version.encode_into(&mut writer)?; + + writer.finish().map_err(|e| match e { + sfa::Error::Io(e) => crate::Error::from(e), + _ => unreachable!(), + })?; + + // IMPORTANT: fsync folder on Unix + fsync_directory(folder)?; + + rewrite_atomic(&folder.join("current"), &version.id().to_le_bytes())?; + + Ok(()) +} + +pub struct CompactionState { + /// Path of tree folder. + folder: PathBuf, + + /// Set of segment IDs that are masked. + /// + /// While consuming segments (because of compaction) they will not appear in the list of segments + /// as to not cause conflicts between multiple compaction threads (compacting the same segments). + hidden_set: HiddenSet, + + /// Holds onto versions until they are safe to drop. + version_free_list: VecDeque, +} + +impl CompactionState { + pub fn new(folder: impl Into) -> Self { + Self { + folder: folder.into(), + hidden_set: HiddenSet::default(), + version_free_list: VecDeque::default(), + } + } + + pub fn create_new(folder: impl Into) -> crate::Result { + let folder = folder.into(); + + persist_version(&folder, &Version::new(0))?; + + Ok(Self::new(folder)) + } + + /// Modifies the level manifest atomically. + /// + /// The function accepts a transition function that receives the current version + /// and returns a new version. + /// + /// The function takes care of persisting the version changes on disk. + pub(crate) fn upgrade_version crate::Result>( + &mut self, + super_version: &mut SuperVersion, + f: F, + gc_watermark: SeqNo, + ) -> crate::Result<()> { + // NOTE: Copy-on-write... + // + // Create a copy of the levels we can operate on + // without mutating the current level manifest + // If persisting to disk fails, this way the level manifest + // is unchanged + let next_version = f(&super_version.version)?; + + persist_version(&self.folder, &next_version)?; + + let mut old_version = std::mem::replace(&mut super_version.version, next_version); + old_version.seqno_watermark = gc_watermark; + + self.push_old_version(old_version); + + Ok(()) + } + + fn push_old_version(&mut self, version: Version) { + self.version_free_list.push_back(version); + } + + pub fn version_free_list_len(&self) -> usize { + self.version_free_list.len() + } + + pub fn hidden_set(&self) -> &HiddenSet { + &self.hidden_set + } + + pub fn hidden_set_mut(&mut self) -> &mut HiddenSet { + &mut self.hidden_set + } + + pub(crate) fn maintenance(&mut self, gc_watermark: SeqNo) -> crate::Result<()> { + log::debug!("Running manifest GC"); + + loop { + let Some(head) = self.version_free_list.front() else { + break; + }; + + if head.seqno_watermark < gc_watermark { + let path = self.folder.join(format!("v{}", head.id())); + std::fs::remove_file(path)?; + self.version_free_list.pop_front(); + } else { + break; + } + } + + log::debug!("Manifest GC done"); + + Ok(()) + } +} + +#[cfg(test)] +#[allow(clippy::expect_used)] +mod tests { + use crate::AbstractTree; + use test_log::test; + + #[test] + fn level_manifest_atomicity() -> crate::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = crate::Config::new(folder).open()?; + + tree.insert("a", "a", 0); + tree.flush_active_memtable(0)?; + tree.insert("a", "a", 1); + tree.flush_active_memtable(0)?; + tree.insert("a", "a", 2); + tree.flush_active_memtable(0)?; + + assert_eq!(3, tree.approximate_len()); + + tree.major_compact(u64::MAX, 3)?; + + assert_eq!(1, tree.segment_count()); + + tree.insert("a", "a", 3); + tree.flush_active_memtable(0)?; + + let segment_count_before_major_compact = tree.segment_count(); + + let crate::AnyTree::Standard(tree) = tree else { + unreachable!(); + }; + + { + // NOTE: Purposefully change level manifest to have invalid path + // to force an I/O error + tree.compaction_state + .lock() + .expect("lock is poisoned") + .folder = "/invaliiid/asd".into(); + } + + assert!(tree.major_compact(u64::MAX, 4).is_err()); + + assert!(tree + .compaction_state + .lock() + .expect("lock is poisoned") + .hidden_set() + .is_empty()); + + assert_eq!(segment_count_before_major_compact, tree.segment_count()); + + Ok(()) + } +} diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index 638f78b4..0e9cb1f7 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -2,12 +2,12 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use super::{Choice, CompactionStrategy, Input as CompactionInput}; -use crate::{level_manifest::LevelManifest, segment::Segment, Config, HashSet}; +use super::{Choice, CompactionStrategy}; +use crate::{compaction::state::CompactionState, version::Version, Config}; -fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, base_size: u32) -> usize { - (ratio as usize).pow(u32::from(level_idx + 1)) * (base_size as usize) -} +// fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, base_size: u32) -> usize { +// (ratio as usize).pow(u32::from(level_idx + 1)) * (base_size as usize) +// } /// Size-tiered compaction strategy (STCS) /// @@ -54,8 +54,8 @@ impl CompactionStrategy for Strategy { "TieredStrategy" } - fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice { - todo!() + fn choose(&self, version: &Version, _config: &Config, state: &CompactionState) -> Choice { + unimplemented!() } } /* diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index bf04bf7a..576e295a 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -7,21 +7,21 @@ use crate::{ blob_tree::FragmentationMap, compaction::{ flavour::{RelocatingCompaction, StandardCompaction}, + state::CompactionState, stream::CompactionStream, Choice, }, file::BLOBS_FOLDER, - level_manifest::LevelManifest, merge::Merger, run_scanner::RunScanner, stop_signal::StopSignal, - tree::inner::TreeId, + tree::inner::{SuperVersion, TreeId}, + version::Version, vlog::{BlobFileMergeScanner, BlobFileScanner, BlobFileWriter}, - AbstractTree, BlobFile, Config, HashSet, InternalValue, SegmentId, SeqNo, - SequenceNumberCounter, + BlobFile, Config, HashSet, InternalValue, SegmentId, SeqNo, SequenceNumberCounter, }; use std::{ - sync::{atomic::AtomicU64, Arc, RwLock, RwLockWriteGuard}, + sync::{atomic::AtomicU64, Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}, time::Instant, }; @@ -41,8 +41,7 @@ pub struct Options { /// Configuration of tree. pub config: Config, - /// Levels manifest. - pub levels: Arc>, + pub super_version: Arc>, /// Compaction strategy to use. pub strategy: Arc, @@ -54,6 +53,8 @@ pub struct Options { /// Evicts items that are older than this seqno (MVCC GC). pub eviction_seqno: u64, + pub compaction_state: Arc>, + #[cfg(feature = "metrics")] pub metrics: Arc, } @@ -65,10 +66,13 @@ impl Options { segment_id_generator: tree.segment_id_counter.clone(), blob_file_id_generator: tree.blob_file_id_generator.clone(), config: tree.config.clone(), - levels: tree.manifest().clone(), + super_version: tree.super_version.clone(), stop_signal: tree.stop_signal.clone(), strategy, eviction_seqno: 0, + + compaction_state: tree.compaction_state.clone(), + #[cfg(feature = "metrics")] metrics: tree.metrics.clone(), } @@ -79,23 +83,27 @@ impl Options { /// /// This will block until the compactor is fully finished. pub fn do_compaction(opts: &Options) -> crate::Result<()> { - log::trace!("Acquiring levels manifest lock"); - let original_levels = opts.levels.write().expect("lock is poisoned"); + let compaction_state = opts.compaction_state.lock().expect("lock is poisoned"); + + let super_version = opts.super_version.read().expect("lock is poisoned"); let start = Instant::now(); log::trace!( "Consulting compaction strategy {:?}", opts.strategy.get_name(), ); - let choice = opts.strategy.choose(&original_levels, &opts.config); + let choice = opts + .strategy + .choose(&super_version.version, &opts.config, &compaction_state); log::debug!("Compaction choice: {choice:?} in {:?}", start.elapsed()); match choice { - Choice::Merge(payload) => merge_segments(original_levels, opts, &payload), - Choice::Move(payload) => move_segments(original_levels, opts, &payload), + Choice::Merge(payload) => merge_segments(compaction_state, super_version, opts, &payload), + Choice::Move(payload) => move_segments(compaction_state, super_version, opts, &payload), Choice::Drop(payload) => drop_segments( - original_levels, + compaction_state, + super_version, opts, &payload.into_iter().collect::>(), ), @@ -107,14 +115,14 @@ pub fn do_compaction(opts: &Options) -> crate::Result<()> { } fn create_compaction_stream<'a>( - levels: &LevelManifest, + version: &Version, to_compact: &[SegmentId], eviction_seqno: SeqNo, ) -> crate::Result>>>> { let mut readers: Vec> = vec![]; let mut found = 0; - for level in levels.current_version().iter_levels() { + for level in version.iter_levels() { if level.is_empty() { continue; } @@ -168,12 +176,20 @@ fn create_compaction_stream<'a>( } fn move_segments( - mut levels: RwLockWriteGuard<'_, LevelManifest>, + mut compaction_state: MutexGuard<'_, CompactionState>, + super_version: RwLockReadGuard<'_, SuperVersion>, opts: &Options, payload: &CompactionPayload, ) -> crate::Result<()> { + drop(super_version); + + let mut super_version = opts.super_version.write().expect("lock is poisoned"); + // Fail-safe for buggy compaction strategies - if levels.should_decline_compaction(payload.segment_ids.iter().copied()) { + if compaction_state + .hidden_set() + .should_decline_compaction(payload.segment_ids.iter().copied()) + { log::warn!( "Compaction task created by {:?} contained hidden segments, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md", opts.strategy.get_name(), @@ -183,12 +199,13 @@ fn move_segments( let segment_ids = payload.segment_ids.iter().copied().collect::>(); - levels.atomic_swap( + compaction_state.upgrade_version( + &mut super_version, |current| Ok(current.with_moved(&segment_ids, payload.dest_level as usize)), opts.eviction_seqno, )?; - if let Err(e) = levels.maintenance(opts.eviction_seqno) { + if let Err(e) = compaction_state.maintenance(opts.eviction_seqno) { log::error!("Manifest maintenance failed: {e:?}"); return Err(e); } @@ -198,7 +215,8 @@ fn move_segments( #[allow(clippy::too_many_lines)] fn merge_segments( - mut levels: RwLockWriteGuard<'_, LevelManifest>, + mut compaction_state: MutexGuard<'_, CompactionState>, + super_version: RwLockReadGuard<'_, SuperVersion>, opts: &Options, payload: &CompactionPayload, ) -> crate::Result<()> { @@ -208,7 +226,10 @@ fn merge_segments( } // Fail-safe for buggy compaction strategies - if levels.should_decline_compaction(payload.segment_ids.iter().copied()) { + if compaction_state + .hidden_set() + .should_decline_compaction(payload.segment_ids.iter().copied()) + { log::warn!( "Compaction task created by {:?} contained hidden segments, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md", opts.strategy.get_name(), @@ -219,7 +240,7 @@ fn merge_segments( let Some(segments) = payload .segment_ids .iter() - .map(|&id| levels.current_version().get_segment(id).cloned()) + .map(|&id| super_version.version.get_segment(id).cloned()) .collect::>>() else { log::warn!( @@ -232,7 +253,7 @@ fn merge_segments( let mut blob_frag_map = FragmentationMap::default(); let Some(mut merge_iter) = create_compaction_stream( - &levels, + &super_version.version, &payload.segment_ids.iter().copied().collect::>(), opts.eviction_seqno, )? @@ -244,14 +265,15 @@ fn merge_segments( }; let dst_lvl = payload.canonical_level.into(); - let last_level = levels.last_level_index(); + let last_level = opts.config.level_count - 1; // NOTE: Only evict tombstones when reaching the last level, // That way we don't resurrect data beneath the tombstone let is_last_level = payload.dest_level == last_level; - let table_writer = - super::flavour::prepare_table_writer(levels.current_version(), opts, payload)?; + let current_version = &super_version.version; + + let table_writer = super::flavour::prepare_table_writer(current_version, opts, payload)?; let start = Instant::now(); @@ -259,8 +281,6 @@ fn merge_segments( Some(blob_opts) => { merge_iter = merge_iter.with_expiration_callback(&mut blob_frag_map); - let version = levels.current_version(); - let blob_files_to_rewrite = { // TODO: 3.0.0 vvv if blob gc is disabled, skip this part vvv @@ -268,32 +288,38 @@ fn merge_segments( let mut linked_blob_files = payload .segment_ids .iter() - .map(|&id| version.get_segment(id).expect("table should exist")) + .map(|&id| current_version.get_segment(id).expect("table should exist")) .filter_map(|x| x.get_linked_blob_files().expect("handle error")) .flatten() .map(|blob_file_ref| { - version + current_version .value_log .get(&blob_file_ref.blob_file_id) .expect("blob file should exist") }) .filter(|blob_file| { - blob_file.is_stale(version.gc_stats(), blob_opts.staleness_threshold) + blob_file + .is_stale(current_version.gc_stats(), blob_opts.staleness_threshold) }) .filter(|blob_file| { - // NOTE: Dead blob files are dropped anyway during Version change commit - !blob_file.is_dead(version.gc_stats()) + // NOTE: Dead blob files are dropped anyway during current_version change commit + !blob_file.is_dead(current_version.gc_stats()) }) .collect::>() .into_iter() .collect::>(); linked_blob_files.sort_by_key(|a| a.id()); - // TODO: 3.0.0 ^- age cutoff + + let cutoff_point = { + let len = linked_blob_files.len() as f32; + (len * blob_opts.age_cutoff) as usize + }; + linked_blob_files.drain(cutoff_point..); // NOTE: If there is any table not part of our compaction input // that also points to the blob file, we cannot rewrite the blob file - for table in version.iter_segments() { + for table in current_version.iter_segments() { if payload.segment_ids.contains(&table.id()) { continue; } @@ -360,17 +386,25 @@ fn merge_segments( log::trace!("Blob file GC preparation done in {:?}", start.elapsed()); - levels.hide_segments(payload.segment_ids.iter().copied()); + drop(super_version); + + { + compaction_state + .hidden_set_mut() + .hide(payload.segment_ids.iter().copied()); + } - // IMPORTANT: Free lock so the compaction (which may go on for a while) - // does not block possible other compactions and writes/reads - drop(levels); + // IMPORTANT: Unlock exclusive compaction lock as we are now doing the actual (CPU-intensive) compaction + drop(compaction_state); for (idx, item) in merge_iter.enumerate() { let item = item.inspect_err(|_| { // IMPORTANT: We need to show tables again on error - let mut levels = opts.levels.write().expect("lock is poisoned"); - levels.show_segments(payload.segment_ids.iter().copied()); + let mut compaction_state = opts.compaction_state.lock().expect("lock is poisoned"); + + compaction_state + .hidden_set_mut() + .show(payload.segment_ids.iter().copied()); })?; // IMPORTANT: We can only drop tombstones when writing into last level @@ -380,8 +414,11 @@ fn merge_segments( compactor.write(item).inspect_err(|_| { // IMPORTANT: We need to show tables again on error - let mut levels = opts.levels.write().expect("lock is poisoned"); - levels.show_segments(payload.segment_ids.iter().copied()); + let mut compaction_state = opts.compaction_state.lock().expect("lock is poisoned"); + + compaction_state + .hidden_set_mut() + .show(payload.segment_ids.iter().copied()); })?; if idx % 1_000_000 == 0 && opts.stop_signal.is_stopped() { @@ -390,25 +427,39 @@ fn merge_segments( } } - // NOTE: Mind lock order L -> M -> S - log::trace!("Acquiring levels manifest write lock"); - let mut levels = opts.levels.write().expect("lock is poisoned"); - log::trace!("Acquired levels manifest write lock"); + let mut compaction_state = opts.compaction_state.lock().expect("lock is poisoned"); + + log::trace!("Acquiring super version write lock"); + let mut super_version = opts.super_version.write().expect("lock is poisoned"); + log::trace!("Acquired super version write lock"); compactor - .finish(&mut levels, opts, payload, dst_lvl, blob_frag_map) + .finish( + &mut super_version, + &mut compaction_state, + opts, + payload, + dst_lvl, + blob_frag_map, + ) .inspect_err(|_| { - // IMPORTANT: We need to show tables again on error - levels.show_segments(payload.segment_ids.iter().copied()); + compaction_state + .hidden_set_mut() + .show(payload.segment_ids.iter().copied()); })?; - levels.show_segments(payload.segment_ids.iter().copied()); + compaction_state + .hidden_set_mut() + .show(payload.segment_ids.iter().copied()); - levels.maintenance(opts.eviction_seqno).inspect_err(|e| { - log::error!("Manifest maintenance failed: {e:?}"); - })?; + compaction_state + .maintenance(opts.eviction_seqno) + .inspect_err(|e| { + log::error!("Manifest maintenance failed: {e:?}"); + })?; - drop(levels); + drop(super_version); + drop(compaction_state); log::trace!("Compaction successful"); @@ -416,12 +467,20 @@ fn merge_segments( } fn drop_segments( - mut levels: RwLockWriteGuard<'_, LevelManifest>, + mut compaction_state: MutexGuard<'_, CompactionState>, + super_version: RwLockReadGuard<'_, SuperVersion>, opts: &Options, ids_to_drop: &[SegmentId], ) -> crate::Result<()> { + drop(super_version); + + let mut super_version = opts.super_version.write().expect("lock is poisoned"); + // Fail-safe for buggy compaction strategies - if levels.should_decline_compaction(ids_to_drop.iter().copied()) { + if compaction_state + .hidden_set() + .should_decline_compaction(ids_to_drop.iter().copied()) + { log::warn!( "Compaction task created by {:?} contained hidden segments, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md", opts.strategy.get_name(), @@ -431,7 +490,7 @@ fn drop_segments( let Some(segments) = ids_to_drop .iter() - .map(|&id| levels.current_version().get_segment(id).cloned()) + .map(|&id| super_version.version.get_segment(id).cloned()) .collect::>>() else { log::warn!( @@ -443,7 +502,8 @@ fn drop_segments( // IMPORTANT: Write the manifest with the removed segments first // Otherwise the segment files are deleted, but are still referenced! - levels.atomic_swap( + compaction_state.upgrade_version( + &mut super_version, |current| current.with_dropped(ids_to_drop), opts.eviction_seqno, // TODO: make naming in code base eviction_seqno vs watermark vs threshold consistent )?; @@ -458,12 +518,13 @@ fn drop_segments( // TODO: fwiw also add all dead blob files // TODO: look if any blob files can be trivially deleted as well - if let Err(e) = levels.maintenance(opts.eviction_seqno) { + if let Err(e) = compaction_state.maintenance(opts.eviction_seqno) { log::error!("Manifest maintenance failed: {e:?}"); return Err(e); } - drop(levels); + drop(super_version); + drop(compaction_state); log::trace!("Dropped {} segments", ids_to_drop.len()); diff --git a/src/config/mod.rs b/src/config/mod.rs index 6b441857..7d9d05c3 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -58,7 +58,7 @@ impl TryFrom for TreeType { const DEFAULT_FILE_FOLDER: &str = ".lsm.data"; /// Options for key-value separation -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct KvSeparationOptions { /// What type of compression is used for blobs pub compression: CompressionType, @@ -73,7 +73,7 @@ pub struct KvSeparationOptions { pub(crate) staleness_threshold: f32, - pub(crate) age_cutoff: f32, // TODO: 3.0.0 + pub(crate) age_cutoff: f32, } impl Default for KvSeparationOptions { @@ -85,11 +85,11 @@ impl Default for KvSeparationOptions { #[cfg(not(feature="lz4"))] compression: CompressionType::None, - file_target_size: /* 256 MiB */ 256 * 1_024 * 1_024, + file_target_size: /* 64 MiB */ 64 * 1_024 * 1_024, separation_threshold: /* 1 KiB */ 1_024, - staleness_threshold: 0.25, - age_cutoff: 0.25, + staleness_threshold: 0.33, + age_cutoff: 0.20, } } } @@ -134,12 +134,21 @@ impl KvSeparationOptions { /// The staleness percentage determines how much a blob file needs to be fragmented to be /// picked up by the garbage collection. /// - /// Defaults to 25%. + /// Defaults to 33%. #[must_use] pub fn staleness_threshold(mut self, ratio: f32) -> Self { self.staleness_threshold = ratio; self } + + /// Sets the age cutoff threshold. + /// + /// Defaults to 20%. + #[must_use] + pub fn age_cutoff(mut self, ratio: f32) -> Self { + self.age_cutoff = ratio; + self + } } #[derive(Clone)] diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs deleted file mode 100644 index 54a0abf1..00000000 --- a/src/level_manifest/mod.rs +++ /dev/null @@ -1,479 +0,0 @@ -// Copyright (c) 2024-present, fjall-rs -// This source code is licensed under both the Apache 2.0 and MIT License -// (found in the LICENSE-* files in the repository) - -pub(crate) mod hidden_set; - -use crate::{ - coding::Decode, - file::{fsync_directory, rewrite_atomic}, - segment::Segment, - version::{Level, Run, Version, VersionId, DEFAULT_LEVEL_COUNT}, - vlog::BlobFileId, - BlobFile, SegmentId, SeqNo, -}; -use byteorder::{LittleEndian, ReadBytesExt}; -use hidden_set::HiddenSet; -use std::{ - collections::VecDeque, - io::BufWriter, - path::{Path, PathBuf}, - sync::Arc, -}; - -pub struct Recovery { - pub curr_version_id: VersionId, - pub segment_ids: Vec>>, - pub blob_file_ids: Vec, - pub gc_stats: crate::blob_tree::FragmentationMap, -} - -/// Represents the levels of a log-structured merge tree -pub struct LevelManifest { - /// Path of tree folder. - folder: PathBuf, - - /// Current version. - current: Version, - - /// Set of segment IDs that are masked. - /// - /// While consuming segments (because of compaction) they will not appear in the list of segments - /// as to not cause conflicts between multiple compaction threads (compacting the same segments). - hidden_set: HiddenSet, - - /// Holds onto versions until they are safe to drop. - pub(crate) version_free_list: VecDeque, -} - -impl std::fmt::Display for LevelManifest { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for (idx, level) in self.current.iter_levels().enumerate() { - writeln!( - f, - "{idx} [{}], r={}: ", - match (level.is_empty(), level.is_disjoint()) { - (true, _) => ".", - (false, true) => "D", - (false, false) => "_", - }, - level.len(), - )?; - - for run in level.iter() { - write!(f, " ")?; - - if run.len() >= 30 { - #[allow(clippy::indexing_slicing)] - for segment in run.iter().take(2) { - let id = segment.id(); - let is_hidden = self.hidden_set.is_hidden(id); - - write!( - f, - "{}{id}{}", - if is_hidden { "(" } else { "[" }, - if is_hidden { ")" } else { "]" }, - )?; - } - write!(f, " . . . ")?; - - #[allow(clippy::indexing_slicing)] - for segment in run.iter().rev().take(2).rev() { - let id = segment.id(); - let is_hidden = self.hidden_set.is_hidden(id); - - write!( - f, - "{}{id}{}", - if is_hidden { "(" } else { "[" }, - if is_hidden { ")" } else { "]" }, - )?; - } - - writeln!( - f, - " | # = {}, {} MiB", - run.len(), - run.iter().map(Segment::file_size).sum::() / 1_024 / 1_024, - )?; - } else { - for segment in run.iter() { - let id = segment.id(); - let is_hidden = self.hidden_set.is_hidden(id); - - write!( - f, - "{}{id}{}", - if is_hidden { "(" } else { "[" }, - if is_hidden { ")" } else { "]" }, - )?; - } - - writeln!( - f, - " | # = {}, {} MiB", - run.len(), - run.iter().map(Segment::file_size).sum::() / 1_024 / 1_024, - )?; - } - } - } - - Ok(()) - } -} - -impl LevelManifest { - #[must_use] - pub fn current_version(&self) -> &Version { - &self.current - } - - pub(crate) fn is_compacting(&self) -> bool { - !self.hidden_set.is_empty() - } - - pub(crate) fn create_new>(folder: P) -> crate::Result { - // assert!(level_count > 0, "level_count should be >= 1"); - - #[allow(unused_mut)] - let mut manifest = Self { - folder: folder.into(), - current: Version::new(0), - hidden_set: HiddenSet::default(), - version_free_list: VecDeque::default(), - }; - - Self::persist_version(&manifest.folder, &manifest.current)?; - - Ok(manifest) - } - - pub(crate) fn recover_ids(folder: &Path) -> crate::Result { - let curr_version_id = Self::get_current_version(folder)?; - let version_file_path = folder.join(format!("v{curr_version_id}")); - - log::info!( - "Recovering current manifest at {}", - version_file_path.display(), - ); - - let reader = sfa::Reader::new(&version_file_path)?; - let toc = reader.toc(); - - // // TODO: vvv move into Version::decode vvv - let mut levels = vec![]; - - { - let mut reader = toc - .section(b"tables") - .expect("tables should exist") - .buf_reader(&version_file_path)?; - - let level_count = reader.read_u8()?; - - for _ in 0..level_count { - let mut level = vec![]; - let run_count = reader.read_u8()?; - - for _ in 0..run_count { - let mut run = vec![]; - let segment_count = reader.read_u32::()?; - - for _ in 0..segment_count { - let id = reader.read_u64::()?; - run.push(id); - } - - level.push(run); - } - - levels.push(level); - } - } - - let blob_file_ids = { - let mut reader = toc - .section(b"blob_files") - .expect("blob_files should exist") - .buf_reader(&version_file_path)?; - - let blob_file_count = reader.read_u32::()?; - let mut blob_file_ids = Vec::with_capacity(blob_file_count as usize); - - for _ in 0..blob_file_count { - let id = reader.read_u64::()?; - blob_file_ids.push(id); - } - - blob_file_ids - }; - - let gc_stats = { - let mut reader = toc - .section(b"blob_gc_stats") - .expect("blob_gc_stats should exist") - .buf_reader(&version_file_path)?; - - crate::blob_tree::FragmentationMap::decode_from(&mut reader)? - }; - - Ok(Recovery { - curr_version_id, - segment_ids: levels, - blob_file_ids, - gc_stats, - }) - } - - pub fn get_current_version(folder: &Path) -> crate::Result { - std::fs::File::open(folder.join("current")) - .and_then(|mut f| f.read_u64::()) - .map_err(Into::into) - } - - pub(crate) fn recover>( - folder: P, - recovery: Recovery, - segments: &[Segment], - blob_files: &[BlobFile], - ) -> crate::Result { - let version_levels = recovery - .segment_ids - .iter() - .map(|level| { - let level_runs = level - .iter() - .map(|run| { - let run_segments = run - .iter() - .map(|segment_id| { - segments - .iter() - .find(|x| x.id() == *segment_id) - .cloned() - .ok_or(crate::Error::Unrecoverable) - }) - .collect::>>()?; - - Ok(Arc::new(Run::new(run_segments))) - }) - .collect::>>()?; - - Ok(Level::from_runs(level_runs)) - }) - .collect::>>()?; - - Ok(Self { - current: Version::from_levels( - recovery.curr_version_id, - version_levels, - blob_files.iter().cloned().map(|bf| (bf.id(), bf)).collect(), - recovery.gc_stats, - ), - folder: folder.into(), - hidden_set: HiddenSet::default(), - version_free_list: VecDeque::default(), // TODO: 3. create free list from versions that are N < CURRENT, or delete old versions eagerly... - }) - } - - fn persist_version(folder: &Path, version: &Version) -> crate::Result<()> { - log::trace!( - "Persisting version {} in {}", - version.id(), - folder.display(), - ); - - let path = folder.join(format!("v{}", version.id())); - let file = std::fs::File::create_new(path)?; - let writer = BufWriter::new(file); - let mut writer = sfa::Writer::into_writer(writer); - - version.encode_into(&mut writer)?; - - writer.finish().map_err(|e| match e { - sfa::Error::Io(e) => crate::Error::from(e), - _ => unreachable!(), - })?; - - // IMPORTANT: fsync folder on Unix - fsync_directory(folder)?; - - rewrite_atomic(&folder.join("current"), &version.id().to_le_bytes())?; - - Ok(()) - } - - /// Modifies the level manifest atomically. - /// - /// The function accepts a transition function that receives the current version - /// and returns a new version. - /// - /// The function takes care of persisting the version changes on disk. - pub(crate) fn atomic_swap crate::Result>( - &mut self, - f: F, - gc_watermark: SeqNo, - ) -> crate::Result<()> { - // NOTE: Copy-on-write... - // - // Create a copy of the levels we can operate on - // without mutating the current level manifest - // If persisting to disk fails, this way the level manifest - // is unchanged - let next_version = f(&self.current)?; - - Self::persist_version(&self.folder, &next_version)?; - - let mut old_version = std::mem::replace(&mut self.current, next_version); - old_version.seqno_watermark = gc_watermark; - - self.version_free_list.push_back(old_version); - - Ok(()) - } - - pub(crate) fn maintenance(&mut self, gc_watermark: SeqNo) -> crate::Result<()> { - log::debug!("Running manifest GC"); - - loop { - let Some(head) = self.version_free_list.front() else { - break; - }; - - if head.seqno_watermark < gc_watermark { - let path = self.folder.join(format!("v{}", head.id())); - std::fs::remove_file(path)?; - self.version_free_list.pop_front(); - } else { - break; - } - } - - log::debug!("Manifest GC done"); - - Ok(()) - } - - /// Returns `true` if there are no segments - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Returns the number of levels in the tree - #[must_use] - pub fn level_count(&self) -> u8 { - // NOTE: Level count is u8 - #[allow(clippy::cast_possible_truncation)] - { - self.current.level_count() as u8 - } - } - - /// Returns the number of levels in the tree. - #[must_use] - pub fn last_level_index(&self) -> u8 { - DEFAULT_LEVEL_COUNT - 1 - } - - /// Returns the number of segments, summed over all levels - #[must_use] - pub fn len(&self) -> usize { - self.current.segment_count() - } - - /// Returns the (compressed) size of all segments - #[must_use] - pub fn size(&self) -> u64 { - self.iter().map(Segment::file_size).sum() - } - - #[must_use] - pub fn level_is_busy(&self, idx: usize) -> bool { - self.current.level(idx).is_some_and(|level| { - level - .iter() - .flat_map(|run| run.iter()) - .any(|segment| self.hidden_set.is_hidden(segment.id())) - }) - } - - #[must_use] - pub fn as_slice(&self) -> &[Level] { - &self.current.levels - } - - pub fn iter(&self) -> impl Iterator { - self.current.iter_segments() - } - - pub(crate) fn should_decline_compaction>( - &self, - ids: T, - ) -> bool { - self.hidden_set().is_blocked(ids) - } - - pub(crate) fn hidden_set(&self) -> &HiddenSet { - &self.hidden_set - } - - pub(crate) fn hide_segments>(&mut self, keys: T) { - self.hidden_set.hide(keys); - } - - pub(crate) fn show_segments>(&mut self, keys: T) { - self.hidden_set.show(keys); - } -} - -#[cfg(test)] -#[allow(clippy::expect_used)] -mod tests { - use crate::AbstractTree; - use test_log::test; - - #[test] - fn level_manifest_atomicity() -> crate::Result<()> { - let folder = tempfile::tempdir()?; - - let tree = crate::Config::new(folder).open()?; - - tree.insert("a", "a", 0); - tree.flush_active_memtable(0)?; - tree.insert("a", "a", 1); - tree.flush_active_memtable(0)?; - tree.insert("a", "a", 2); - tree.flush_active_memtable(0)?; - - assert_eq!(3, tree.approximate_len()); - - tree.major_compact(u64::MAX, 3)?; - - assert_eq!(1, tree.segment_count()); - - tree.insert("a", "a", 3); - tree.flush_active_memtable(0)?; - - let segment_count_before_major_compact = tree.segment_count(); - - // NOTE: Purposefully change level manifest to have invalid path - // to force an I/O error - tree.manifest().write().expect("lock is poisoned").folder = "/invaliiid/asd".into(); - - assert!(tree.major_compact(u64::MAX, 4).is_err()); - - assert!(tree - .manifest() - .read() - .expect("lock is poisoned") - .hidden_set - .is_empty()); - - assert_eq!(segment_count_before_major_compact, tree.segment_count()); - - Ok(()) - } -} diff --git a/src/lib.rs b/src/lib.rs index cd2883eb..6328eb7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -178,9 +178,6 @@ mod iter_guard; mod key; mod key_range; -#[doc(hidden)] -pub mod level_manifest; - mod run_reader; mod run_scanner; diff --git a/src/multi_reader.rs b/src/multi_reader.rs index bf32552c..250813ab 100644 --- a/src/multi_reader.rs +++ b/src/multi_reader.rs @@ -76,9 +76,6 @@ mod tests { } let segments = tree - .manifest() - .read() - .expect("lock is poisoned") .current_version() .iter_segments() .cloned() diff --git a/src/range.rs b/src/range.rs index 7b5034dd..8b56d233 100644 --- a/src/range.rs +++ b/src/range.rs @@ -4,7 +4,6 @@ use crate::{ key::InternalKey, - level_manifest::LevelManifest, memtable::Memtable, merge::Merger, mvcc_stream::MvccStream, @@ -144,7 +143,7 @@ impl TreeIter { guard: IterState, range: R, seqno: SeqNo, - level_manifest: &LevelManifest, + version: &Version, ) -> Self { Self::new(guard, |lock| { let lo = match range.start_bound() { @@ -209,11 +208,7 @@ impl TreeIter { // }; #[allow(clippy::needless_continue)] - for run in level_manifest - .current_version() - .iter_levels() - .flat_map(|lvl| lvl.iter()) - { + for run in version.iter_levels().flat_map(|lvl| lvl.iter()) { match run.len() { 0 => continue, 1 => { diff --git a/src/run_reader.rs b/src/run_reader.rs index 21b9c443..a6a292e9 100644 --- a/src/run_reader.rs +++ b/src/run_reader.rs @@ -154,10 +154,8 @@ mod tests { } let segments = tree - .manifest() - .read() - .expect("lock is poisoned") - .iter() + .current_version() + .iter_segments() .cloned() .collect::>(); @@ -196,10 +194,8 @@ mod tests { } let segments = tree - .manifest() - .read() - .expect("lock is poisoned") - .iter() + .current_version() + .iter_segments() .cloned() .collect::>(); diff --git a/src/run_scanner.rs b/src/run_scanner.rs index 1405b8d9..a522323c 100644 --- a/src/run_scanner.rs +++ b/src/run_scanner.rs @@ -90,9 +90,6 @@ mod tests { } let segments = tree - .manifest() - .read() - .expect("lock is poisoned") .current_version() .iter_segments() .cloned() diff --git a/src/segment/index_block/block_handle.rs b/src/segment/index_block/block_handle.rs index 83f6ec4e..7b4c194c 100644 --- a/src/segment/index_block/block_handle.rs +++ b/src/segment/index_block/block_handle.rs @@ -263,6 +263,6 @@ impl Decodable for KeyedBlockHandle { offset: usize, base_key_offset: usize, ) -> Option { - todo!() + unimplemented!() } } diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 63d5d539..9523999c 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -141,12 +141,7 @@ impl<'a> Ingestion<'a> { self.tree .register_segments(&created_segments, None, None, 0)?; - let last_level_idx = self - .tree - .manifest - .read() - .expect("lock is poisoned") - .last_level_index(); + let last_level_idx = self.tree.config.level_count - 1; self.tree .compact(Arc::new(MoveDown(0, last_level_idx)), 0)?; diff --git a/src/tree/inner.rs b/src/tree/inner.rs index 52d16abb..79bb08b6 100644 --- a/src/tree/inner.rs +++ b/src/tree/inner.rs @@ -3,10 +3,15 @@ // (found in the LICENSE-* files in the repository) use crate::{ - config::Config, level_manifest::LevelManifest, memtable::Memtable, stop_signal::StopSignal, + compaction::state::{persist_version, CompactionState}, + config::Config, + memtable::Memtable, + stop_signal::StopSignal, + tree::sealed::SealedMemtables, + version::Version, SegmentId, SequenceNumberCounter, }; -use std::sync::{atomic::AtomicU64, Arc, RwLock}; +use std::sync::{atomic::AtomicU64, Arc, Mutex, RwLock}; #[cfg(feature = "metrics")] use crate::metrics::Metrics; @@ -21,37 +26,23 @@ pub type TreeId = u64; /// Memtable IDs map one-to-one to some segment. pub type MemtableId = u64; -/// Stores references to all sealed memtables -/// -/// Memtable IDs are monotonically increasing, so we don't really -/// need a search tree; also there are only a handful of them at most. -#[derive(Default)] -pub struct SealedMemtables(Vec<(MemtableId, Arc)>); - -impl SealedMemtables { - pub fn add(&mut self, id: MemtableId, memtable: Arc) { - self.0.push((id, memtable)); - } - - pub fn remove(&mut self, id_to_remove: MemtableId) { - self.0.retain(|(id, _)| *id != id_to_remove); - } - - pub fn iter(&self) -> impl DoubleEndedIterator)> { - self.0.iter() - } - - pub fn len(&self) -> usize { - self.0.len() - } -} - /// Hands out a unique (monotonically increasing) tree ID. pub fn get_next_tree_id() -> TreeId { static TREE_ID_COUNTER: AtomicU64 = AtomicU64::new(0); TREE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed) } +pub struct SuperVersion { + /// Active memtable that is being written to + pub(crate) active_memtable: Arc, + + /// Frozen memtables that are being flushed + pub(crate) sealed_memtables: Arc, + + /// Current tree version + pub(crate) version: Version, +} + #[allow(clippy::module_name_repetitions)] pub struct TreeInner { /// Unique tree ID @@ -65,14 +56,9 @@ pub struct TreeInner { /// Hands out a unique (monotonically increasing) blob file ID pub(crate) blob_file_id_generator: SequenceNumberCounter, - /// Active memtable that is being written to - pub(crate) active_memtable: Arc>>, + pub(crate) super_version: Arc>, - /// Frozen memtables that are being flushed - pub(crate) sealed_memtables: Arc>, - - /// Current tree version - pub(super) manifest: Arc>, + pub(crate) compaction_state: Arc>, /// Tree configuration pub config: Config, @@ -81,6 +67,10 @@ pub struct TreeInner { /// will interrupt the compaction and kill the worker. pub(crate) stop_signal: StopSignal, + /// Used by major compaction to be the exclusive compaction going on. + /// + /// Minor compactions use `major_compaction_lock.read()` instead, so they + /// can be concurrent next to each other. pub(crate) major_compaction_lock: RwLock<()>, #[doc(hidden)] @@ -90,18 +80,25 @@ pub struct TreeInner { impl TreeInner { pub(crate) fn create_new(config: Config) -> crate::Result { - let manifest = LevelManifest::create_new(&config.path)?; + let version = Version::new(0); + persist_version(&config.path, &version)?; + + let path = config.path.clone(); Ok(Self { id: get_next_tree_id(), segment_id_counter: Arc::new(AtomicU64::default()), blob_file_id_generator: SequenceNumberCounter::default(), config, - active_memtable: Arc::default(), - sealed_memtables: Arc::default(), - manifest: Arc::new(RwLock::new(manifest)), + super_version: Arc::new(RwLock::new(SuperVersion { + active_memtable: Arc::default(), + sealed_memtables: Arc::default(), + version, + })), stop_signal: StopSignal::default(), major_compaction_lock: RwLock::default(), + compaction_state: Arc::new(Mutex::new(CompactionState::new(path))), + #[cfg(feature = "metrics")] metrics: Metrics::default().into(), }) diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 5ba5a533..eb374341 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -4,31 +4,33 @@ pub mod ingest; pub mod inner; +mod sealed; use crate::{ blob_tree::FragmentationMap, coding::{Decode, Encode}, - compaction::{drop_range::OwnedBounds, CompactionStrategy}, + compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy}, config::Config, file::BLOBS_FOLDER, format_version::FormatVersion, iter_guard::{IterGuard, IterGuardImpl}, - level_manifest::LevelManifest, manifest::Manifest, memtable::Memtable, segment::Segment, slice::Slice, + tree::inner::SuperVersion, value::InternalValue, + version::{recovery::recover_ids, Version}, vlog::BlobFile, AbstractTree, Cache, DescriptorTable, KvPair, SegmentId, SeqNo, SequenceNumberCounter, TreeType, UserKey, UserValue, ValueType, }; -use inner::{MemtableId, SealedMemtables, TreeId, TreeInner}; +use inner::{MemtableId, TreeId, TreeInner}; use std::{ io::Cursor, ops::{Bound, RangeBounds}, path::Path, - sync::{atomic::AtomicU64, Arc, RwLock, RwLockWriteGuard}, + sync::{atomic::AtomicU64, Arc, Mutex, RwLock}, }; #[cfg(feature = "metrics")] @@ -84,27 +86,26 @@ impl AbstractTree for Tree { } fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result> { - // TODO: consolidate memtable & sealed behind single RwLock + #[allow(clippy::significant_drop_tightening)] + let version_lock = self.super_version.read().expect("lock is poisoned"); - let memtable_lock = self.active_memtable.read().expect("lock is poisoned"); - - if let Some(entry) = memtable_lock.get(key, seqno) { + if let Some(entry) = version_lock.active_memtable.get(key, seqno) { return Ok(ignore_tombstone_value(entry)); } - drop(memtable_lock); - // Now look in sealed memtables - if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) { + if let Some(entry) = + self.get_internal_entry_from_sealed_memtables(&version_lock, key, seqno) + { return Ok(ignore_tombstone_value(entry)); } // Now look in segments... this may involve disk I/O - self.get_internal_entry_from_segments(key, seqno) + self.get_internal_entry_from_segments(&version_lock, key, seqno) } - fn manifest(&self) -> &Arc> { - &self.manifest + fn current_version(&self) -> Version { + self.super_version.read().expect("poisoned").version.clone() } fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result> { @@ -130,11 +131,10 @@ impl AbstractTree for Tree { } fn version_free_list_len(&self) -> usize { - self.manifest - .read() + self.compaction_state + .lock() .expect("lock is poisoned") - .version_free_list - .len() + .version_free_list_len() } fn prefix>( @@ -163,10 +163,7 @@ impl AbstractTree for Tree { // TODO: doctest fn tombstone_count(&self) -> u64 { - self.manifest - .read() - .expect("lock is poisoned") - .current_version() + self.current_version() .iter_segments() .map(Segment::tombstone_count) .sum() @@ -181,16 +178,20 @@ impl AbstractTree for Tree { use crate::tree::ingest::Ingestion; use std::time::Instant; - // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading - let memtable_lock = self.lock_active_memtable(); + // // TODO: 3.0.0 ... hmmmm + // let global_lock = self.super_version.write().expect("lock is poisoned"); let seqno = seqno_generator.next(); // TODO: allow ingestion always, by flushing memtable - assert!( - memtable_lock.is_empty(), - "can only perform bulk ingestion with empty memtable", - ); + // assert!( + // global_lock.active_memtable.is_empty(), + // "can only perform bulk ingestion with empty memtable(s)", + // ); + // assert!( + // global_lock.sealed_memtables.len() == 0, + // "can only perform bulk ingestion with empty memtable(s)", + // ); let mut writer = Ingestion::new(self)?.with_seqno(seqno); @@ -258,10 +259,7 @@ impl AbstractTree for Tree { } fn l0_run_count(&self) -> usize { - self.manifest - .read() - .expect("lock is poisoned") - .current_version() + self.current_version() .level(0) .map(|x| x.run_count()) .unwrap_or_default() @@ -274,39 +272,31 @@ impl AbstractTree for Tree { } fn filter_size(&self) -> usize { - self.manifest - .read() - .expect("lock is poisoned") - .current_version() + self.current_version() .iter_segments() .map(Segment::filter_size) .sum() } fn pinned_filter_size(&self) -> usize { - self.manifest - .read() - .expect("lock is poisoned") - .current_version() + self.current_version() .iter_segments() .map(Segment::pinned_filter_size) .sum() } fn pinned_block_index_size(&self) -> usize { - self.manifest - .read() - .expect("lock is poisoned") - .current_version() + self.current_version() .iter_segments() .map(Segment::pinned_block_index_size) .sum() } fn sealed_memtable_count(&self) -> usize { - self.sealed_memtables + self.super_version .read() .expect("lock is poisoned") + .sealed_memtables .len() } @@ -402,17 +392,11 @@ impl AbstractTree for Tree { blob_files.map(<[BlobFile]>::len).unwrap_or_default(), ); - // NOTE: Mind lock order L -> M -> S - log::trace!("register: Acquiring levels manifest write lock"); - let mut manifest = self.manifest.write().expect("lock is poisoned"); - log::trace!("register: Acquired levels manifest write lock"); - - // NOTE: Mind lock order L -> M -> S - log::trace!("register: Acquiring sealed memtables write lock"); - let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned"); - log::trace!("register: Acquired sealed memtables write lock"); + let mut compaction_state = self.compaction_state.lock().expect("lock is poisoned"); + let mut super_version = self.super_version.write().expect("lock is poisoned"); - manifest.atomic_swap( + compaction_state.upgrade_version( + &mut super_version, |version| { Ok(version.with_new_l0_run( segments, @@ -425,28 +409,29 @@ impl AbstractTree for Tree { for segment in segments { log::trace!("releasing sealed memtable {}", segment.id()); - sealed_memtables.remove(segment.id()); + + super_version.sealed_memtables = + Arc::new(super_version.sealed_memtables.remove(segment.id())); } Ok(()) } - fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc> { - self.active_memtable.write().expect("lock is poisoned") - } - fn clear_active_memtable(&self) { - *self.active_memtable.write().expect("lock is poisoned") = Arc::new(Memtable::default()); + self.super_version + .write() + .expect("lock is poisoned") + .active_memtable = Arc::new(Memtable::default()); } fn set_active_memtable(&self, memtable: Memtable) { - let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned"); - *memtable_lock = Arc::new(memtable); + let mut version_lock = self.super_version.write().expect("lock is poisoned"); + version_lock.active_memtable = Arc::new(memtable); } fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc) { - let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned"); - memtable_lock.add(id, memtable); + let mut version_lock = self.super_version.write().expect("lock is poisoned"); + version_lock.sealed_memtables = Arc::new(version_lock.sealed_memtables.add(id, memtable)); } fn compact( @@ -477,9 +462,10 @@ impl AbstractTree for Tree { fn active_memtable_size(&self) -> u64 { use std::sync::atomic::Ordering::Acquire; - self.active_memtable + self.super_version .read() .expect("lock is poisoned") + .active_memtable .approximate_size .load(Acquire) } @@ -489,21 +475,22 @@ impl AbstractTree for Tree { } fn rotate_memtable(&self) -> Option<(MemtableId, Arc)> { - log::trace!("rotate: acquiring active memtable write lock"); - let mut active_memtable = self.lock_active_memtable(); + let mut version_lock = self.super_version.write().expect("lock is poisoned"); - log::trace!("rotate: acquiring sealed memtables write lock"); - let mut sealed_memtables = self.lock_sealed_memtables(); - - if active_memtable.is_empty() { + if version_lock.active_memtable.is_empty() { return None; } - let yanked_memtable = std::mem::take(&mut *active_memtable); + let yanked_memtable = std::mem::take(&mut version_lock.active_memtable); let yanked_memtable = yanked_memtable; let tmp_memtable_id = self.get_next_segment_id(); - sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone()); + + version_lock.sealed_memtables = Arc::new( + version_lock + .sealed_memtables + .add(tmp_memtable_id, yanked_memtable.clone()), + ); log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables"); @@ -511,37 +498,29 @@ impl AbstractTree for Tree { } fn segment_count(&self) -> usize { - self.manifest - .read() - .expect("lock is poisoned") - .current_version() - .segment_count() + self.current_version().segment_count() } fn level_segment_count(&self, idx: usize) -> Option { - self.manifest - .read() - .expect("lock is poisoned") - .current_version() - .level(idx) - .map(|x| x.segment_count()) + self.current_version().level(idx).map(|x| x.segment_count()) } #[allow(clippy::significant_drop_tightening)] fn approximate_len(&self) -> usize { - // NOTE: Mind lock order L -> M -> S - let manifest = self.manifest.read().expect("lock is poisoned"); - let memtable = self.active_memtable.read().expect("lock is poisoned"); - let sealed = self.sealed_memtables.read().expect("lock is poisoned"); + let version = self.super_version.read().expect("lock is poisoned"); - let segments_item_count = manifest + let segments_item_count = self .current_version() .iter_segments() .map(|x| x.metadata.item_count) .sum::(); - let memtable_count = memtable.len() as u64; - let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::() as u64; + let memtable_count = version.active_memtable.len() as u64; + let sealed_count = version + .sealed_memtables + .iter() + .map(|(_, mt)| mt.len()) + .sum::() as u64; (memtable_count + sealed_count + segments_item_count) .try_into() @@ -549,26 +528,19 @@ impl AbstractTree for Tree { } fn disk_space(&self) -> u64 { - self.manifest - .read() - .expect("lock is poisoned") - .current_version() + self.current_version() .iter_levels() .map(super::version::Level::size) .sum() } fn get_highest_memtable_seqno(&self) -> Option { - let active = self - .active_memtable - .read() - .expect("lock is poisoned") - .get_highest_seqno(); + let version = self.super_version.read().expect("lock is poisoned"); + + let active = version.active_memtable.get_highest_seqno(); - let sealed = self + let sealed = version .sealed_memtables - .read() - .expect("Lock is poisoned") .iter() .map(|(_, table)| table.get_highest_seqno()) .max() @@ -578,10 +550,7 @@ impl AbstractTree for Tree { } fn get_highest_persisted_seqno(&self) -> Option { - self.manifest - .read() - .expect("lock is poisoned") - .current_version() + self.current_version() .iter_segments() .map(Segment::get_highest_seqno) .max() @@ -718,45 +687,21 @@ impl Tree { #[doc(hidden)] #[must_use] pub fn is_compacting(&self) -> bool { - self.manifest - .read() + !self + .compaction_state + .lock() .expect("lock is poisoned") - .is_compacting() - } - - /// Write-locks the sealed memtables for exclusive access - fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> { - self.sealed_memtables.write().expect("lock is poisoned") - } - - // TODO: maybe not needed anyway - /// Used for [`BlobTree`] lookup - pub(crate) fn get_internal_entry_with_memtable( - &self, - memtable_lock: &Memtable, - key: &[u8], - seqno: SeqNo, - ) -> crate::Result> { - if let Some(entry) = memtable_lock.get(key, seqno) { - return Ok(ignore_tombstone_value(entry)); - } - - // Now look in sealed memtables - if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) { - return Ok(ignore_tombstone_value(entry)); - } - - self.get_internal_entry_from_segments(key, seqno) + .hidden_set() + .is_empty() } fn get_internal_entry_from_sealed_memtables( &self, + super_version: &SuperVersion, key: &[u8], seqno: SeqNo, ) -> Option { - let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned"); - - for (_, memtable) in memtable_lock.iter().rev() { + for (_, memtable) in super_version.sealed_memtables.iter().rev() { if let Some(entry) = memtable.get(key, seqno) { return Some(entry); } @@ -767,6 +712,7 @@ impl Tree { fn get_internal_entry_from_segments( &self, + super_version: &SuperVersion, key: &[u8], seqno: SeqNo, ) -> crate::Result> { @@ -774,9 +720,7 @@ impl Tree { // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/ let key_hash = crate::segment::filter::standard_bloom::Builder::get_hash(key); - let manifest = self.manifest.read().expect("lock is poisoned"); - - for level in manifest.current_version().iter_levels() { + for level in super_version.version.iter_levels() { for run in level.iter() { // NOTE: Based on benchmarking, binary search is only worth it with ~4 segments if run.len() >= 4 { @@ -854,24 +798,21 @@ impl Tree { let bounds: (Bound, Bound) = (lo, hi); - // NOTE: Mind lock order L -> M -> S - log::trace!("range read: acquiring read locks"); - - let manifest = self.manifest.read().expect("lock is poisoned"); + let super_version = self.super_version.write().expect("lock is poisoned"); let iter_state = { - let active = self.active_memtable.read().expect("lock is poisoned"); - let sealed = &self.sealed_memtables.read().expect("lock is poisoned"); + let active = &super_version.active_memtable; + let sealed = &super_version.sealed_memtables; IterState { active: active.clone(), sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(), ephemeral, - version: manifest.current_version().clone(), + version: super_version.version.clone(), } }; - TreeIter::create_range(iter_state, bounds, seqno, &manifest) + TreeIter::create_range(iter_state, bounds, seqno, &super_version.version) } #[doc(hidden)] @@ -907,8 +848,11 @@ impl Tree { #[doc(hidden)] #[must_use] pub fn append_entry(&self, value: InternalValue) -> (u64, u64) { - let memtable_lock = self.active_memtable.read().expect("lock is poisoned"); - memtable_lock.insert(value) + self.super_version + .read() + .expect("lock is poisoned") + .active_memtable + .insert(value) } /// Recovers previous state, by loading the level manifest and segments. @@ -938,7 +882,7 @@ impl Tree { #[cfg(feature = "metrics")] let metrics = Arc::new(Metrics::default()); - let levels = Self::recover_levels( + let version = Self::recover_levels( &config.path, tree_id, &config.cache, @@ -947,18 +891,28 @@ impl Tree { &metrics, )?; - let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default(); + let highest_segment_id = version + .iter_segments() + .map(Segment::id) + .max() + .unwrap_or_default(); + + let path = config.path.clone(); let inner = TreeInner { id: tree_id, segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)), blob_file_id_generator: SequenceNumberCounter::default(), - active_memtable: Arc::default(), - sealed_memtables: Arc::default(), - manifest: Arc::new(RwLock::new(levels)), + super_version: Arc::new(RwLock::new(SuperVersion { + active_memtable: Arc::default(), + sealed_memtables: Arc::default(), + version, + })), stop_signal: StopSignal::default(), config, major_compaction_lock: RwLock::default(), + compaction_state: Arc::new(Mutex::new(CompactionState::new(path))), + #[cfg(feature = "metrics")] metrics, }; @@ -1012,12 +966,12 @@ impl Tree { cache: &Arc, descriptor_table: &Arc, #[cfg(feature = "metrics")] metrics: &Arc, - ) -> crate::Result { + ) -> crate::Result { use crate::{file::fsync_directory, file::SEGMENTS_FOLDER, SegmentId}; let tree_path = tree_path.as_ref(); - let recovery = LevelManifest::recover_ids(tree_path)?; + let recovery = recover_ids(tree_path)?; let segment_id_map = { let mut result: crate::HashMap = @@ -1138,6 +1092,6 @@ impl Tree { &recovery.blob_file_ids, )?; - LevelManifest::recover(tree_path, recovery, &segments, &blob_files) + Version::from_recovery(recovery, &segments, &blob_files) } } diff --git a/src/tree/sealed.rs b/src/tree/sealed.rs new file mode 100644 index 00000000..fa48fbad --- /dev/null +++ b/src/tree/sealed.rs @@ -0,0 +1,33 @@ +use crate::{tree::inner::MemtableId, Memtable}; +use std::sync::Arc; + +/// Stores references to all sealed memtables +/// +/// Memtable IDs are monotonically increasing, so we don't really +/// need a search tree; also there are only a handful of them at most. +#[derive(Clone, Default)] +pub struct SealedMemtables(Vec<(MemtableId, Arc)>); + +impl SealedMemtables { + /// Copy-and-writes a new list with additional Memtable. + pub fn add(&self, id: MemtableId, memtable: Arc) -> Self { + let mut copy = self.clone(); + copy.0.push((id, memtable)); + copy + } + + /// Copy-and-writes a new list with the specified Memtable removed. + pub fn remove(&self, id_to_remove: MemtableId) -> Self { + let mut copy = self.clone(); + copy.0.retain(|(id, _)| *id != id_to_remove); + copy + } + + pub fn iter(&self) -> impl DoubleEndedIterator)> { + self.0.iter() + } + + pub fn len(&self) -> usize { + self.0.len() + } +} diff --git a/src/version/mod.rs b/src/version/mod.rs index 4f47f6c6..2c27b92c 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -3,18 +3,22 @@ // (found in the LICENSE-* files in the repository) mod optimize; +pub(crate) mod recovery; pub mod run; pub use run::Run; use crate::blob_tree::{FragmentationEntry, FragmentationMap}; use crate::coding::Encode; +use crate::compaction::state::hidden_set::HiddenSet; +use crate::version::recovery::Recovery; use crate::{ vlog::{BlobFile, BlobFileId}, HashSet, KeyRange, Segment, SegmentId, SeqNo, }; use optimize::optimize_runs; use run::Ranged; +use std::path::PathBuf; use std::{collections::BTreeMap, ops::Deref, sync::Arc}; pub const DEFAULT_LEVEL_COUNT: u8 = 7; @@ -142,7 +146,7 @@ pub struct VersionInner { id: VersionId, /// The individual LSM-tree levels which consist of runs of tables - pub(crate) levels: Vec, + levels: Vec, // TODO: 3.0.0 this should really be a newtype // NOTE: We purposefully use Arc<_> to avoid deep cloning the blob files again and again @@ -190,6 +194,20 @@ impl Version { &self.gc_stats } + pub fn l0(&self) -> &Level { + self.levels.first().expect("L0 should exist") + } + + #[must_use] + pub fn level_is_busy(&self, idx: usize, hidden_set: &HiddenSet) -> bool { + self.level(idx).is_some_and(|level| { + level + .iter() + .flat_map(|run: &Arc>| run.iter()) + .any(|segment| hidden_set.is_hidden(segment.id())) + }) + } + /// Creates a new empty version. pub fn new(id: VersionId) -> Self { let levels = (0..DEFAULT_LEVEL_COUNT).map(|_| Level::empty()).collect(); @@ -205,6 +223,45 @@ impl Version { } } + pub(crate) fn from_recovery( + recovery: Recovery, + segments: &[Segment], + blob_files: &[BlobFile], + ) -> crate::Result { + let version_levels = recovery + .segment_ids + .iter() + .map(|level| { + let level_runs = level + .iter() + .map(|run| { + let run_segments = run + .iter() + .map(|segment_id| { + segments + .iter() + .find(|x| x.id() == *segment_id) + .cloned() + .ok_or(crate::Error::Unrecoverable) + }) + .collect::>>()?; + + Ok(Arc::new(Run::new(run_segments))) + }) + .collect::>>()?; + + Ok(Level::from_runs(level_runs)) + }) + .collect::>>()?; + + Ok(Self::from_levels( + recovery.curr_version_id, + version_levels, + blob_files.iter().cloned().map(|bf| (bf.id(), bf)).collect(), + recovery.gc_stats, + )) + } + /// Creates a new pre-populated version. pub fn from_levels( id: VersionId, @@ -581,4 +638,80 @@ impl Version { Ok(()) } + + pub fn fmt(&self, f: &mut std::fmt::Formatter<'_>, hidden_set: &HiddenSet) -> std::fmt::Result { + for (idx, level) in self.iter_levels().enumerate() { + writeln!( + f, + "{idx} [{}], r={}: ", + match (level.is_empty(), level.is_disjoint()) { + (true, _) => ".", + (false, true) => "D", + (false, false) => "_", + }, + level.len(), + )?; + + for run in level.iter() { + write!(f, " ")?; + + if run.len() >= 30 { + #[allow(clippy::indexing_slicing)] + for segment in run.iter().take(2) { + let id = segment.id(); + let is_hidden = hidden_set.is_hidden(id); + + write!( + f, + "{}{id}{}", + if is_hidden { "(" } else { "[" }, + if is_hidden { ")" } else { "]" }, + )?; + } + write!(f, " . . . ")?; + + #[allow(clippy::indexing_slicing)] + for segment in run.iter().rev().take(2).rev() { + let id = segment.id(); + let is_hidden = hidden_set.is_hidden(id); + + write!( + f, + "{}{id}{}", + if is_hidden { "(" } else { "[" }, + if is_hidden { ")" } else { "]" }, + )?; + } + + writeln!( + f, + " | # = {}, {} MiB", + run.len(), + run.iter().map(Segment::file_size).sum::() / 1_024 / 1_024, + )?; + } else { + for segment in run.iter() { + let id = segment.id(); + let is_hidden = hidden_set.is_hidden(id); + + write!( + f, + "{}{id}{}", + if is_hidden { "(" } else { "[" }, + if is_hidden { ")" } else { "]" }, + )?; + } + + writeln!( + f, + " | # = {}, {} MiB", + run.len(), + run.iter().map(Segment::file_size).sum::() / 1_024 / 1_024, + )?; + } + } + } + + Ok(()) + } } diff --git a/src/version/optimize.rs b/src/version/optimize.rs index 2ab53184..c031f8b7 100644 --- a/src/version/optimize.rs +++ b/src/version/optimize.rs @@ -91,7 +91,6 @@ mod tests { } #[test] - #[ignore = "fix!!!"] fn optimize_runs_two_overlap_2() { let runs = vec![ Run::new(vec![s(0, "a", "z")]), diff --git a/src/version/recovery.rs b/src/version/recovery.rs new file mode 100644 index 00000000..2627ce8d --- /dev/null +++ b/src/version/recovery.rs @@ -0,0 +1,99 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +use crate::{coding::Decode, version::VersionId, vlog::BlobFileId, SegmentId}; +use byteorder::{LittleEndian, ReadBytesExt}; +use std::path::Path; + +pub fn get_current_version(folder: &std::path::Path) -> crate::Result { + use byteorder::{LittleEndian, ReadBytesExt}; + + std::fs::File::open(folder.join("current")) + .and_then(|mut f| f.read_u64::()) + .map_err(Into::into) +} + +pub struct Recovery { + pub curr_version_id: VersionId, + pub segment_ids: Vec>>, + pub blob_file_ids: Vec, + pub gc_stats: crate::blob_tree::FragmentationMap, +} + +pub fn recover_ids(folder: &Path) -> crate::Result { + let curr_version_id = get_current_version(folder)?; + let version_file_path = folder.join(format!("v{curr_version_id}")); + + log::info!( + "Recovering current manifest at {}", + version_file_path.display(), + ); + + let reader = sfa::Reader::new(&version_file_path)?; + let toc = reader.toc(); + + // // TODO: vvv move into Version::decode vvv + let mut levels = vec![]; + + { + let mut reader = toc + .section(b"tables") + .expect("tables should exist") + .buf_reader(&version_file_path)?; + + let level_count = reader.read_u8()?; + + for _ in 0..level_count { + let mut level = vec![]; + let run_count = reader.read_u8()?; + + for _ in 0..run_count { + let mut run = vec![]; + let segment_count = reader.read_u32::()?; + + for _ in 0..segment_count { + let id = reader.read_u64::()?; + run.push(id); + } + + level.push(run); + } + + levels.push(level); + } + } + + let blob_file_ids = { + let mut reader = toc + .section(b"blob_files") + .expect("blob_files should exist") + .buf_reader(&version_file_path)?; + + let blob_file_count = reader.read_u32::()?; + let mut blob_file_ids = Vec::with_capacity(blob_file_count as usize); + + for _ in 0..blob_file_count { + let id = reader.read_u64::()?; + blob_file_ids.push(id); + } + + blob_file_ids + }; + + let gc_stats = { + let mut reader = toc + .section(b"blob_gc_stats") + .expect("blob_gc_stats should exist") + .buf_reader(&version_file_path)?; + + crate::blob_tree::FragmentationMap::decode_from(&mut reader)? + }; + + Ok(Recovery { + curr_version_id, + segment_ids: levels, + blob_file_ids, + gc_stats, + }) +} diff --git a/tests/blob_compression.rs b/tests/blob_compression.rs index 2796a74f..752535ca 100644 --- a/tests/blob_compression.rs +++ b/tests/blob_compression.rs @@ -1,9 +1,9 @@ -use lsm_tree::{blob_tree::FragmentationEntry, AbstractTree, KvSeparationOptions, SeqNo}; -use test_log::test; - #[test] #[cfg(feature = "lz4")] fn blob_tree_compression() -> lsm_tree::Result<()> { + use lsm_tree::{blob_tree::FragmentationEntry, AbstractTree, KvSeparationOptions, SeqNo}; + use test_log::test; + let folder = tempfile::tempdir()?; let path = folder.path(); @@ -12,7 +12,8 @@ fn blob_tree_compression() -> lsm_tree::Result<()> { KvSeparationOptions::default() .compression(lsm_tree::CompressionType::Lz4) .separation_threshold(1) - .staleness_threshold(0.0000001), + .staleness_threshold(0.0000001) + .age_cutoff(1.0), )) .open()?; @@ -49,13 +50,7 @@ fn blob_tree_compression() -> lsm_tree::Result<()> { assert_eq!(1, tree.blob_file_count()); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!( &{ @@ -82,13 +77,7 @@ fn blob_tree_compression() -> lsm_tree::Result<()> { assert_eq!(1, tree.blob_file_count()); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!(&lsm_tree::HashMap::default(), &*gc_stats); } diff --git a/tests/blob_drop_after_flush._rs b/tests/blob_drop_after_flush._rs deleted file mode 100644 index fad0eaf3..00000000 --- a/tests/blob_drop_after_flush._rs +++ /dev/null @@ -1,51 +0,0 @@ -use lsm_tree::{config::CompressionPolicy, AbstractTree, Config, SeqNo}; -use std::time::Duration; -use test_log::test; - -// NOTE: This was a race condition in v2 that could drop a blob file -// before its corresponding segment was registered -// -// https://github.com/fjall-rs/lsm-tree/commit/a3a174ed9eff0755f671f793626d17f4ef3f5f57 -#[test] -#[ignore = "restore"] -fn blob_drop_after_flush() -> lsm_tree::Result<()> { - let folder = tempfile::tempdir()?; - - let tree = Config::new(&folder) - .data_block_compression_policy(CompressionPolicy::all(lsm_tree::CompressionType::None)) - .open_as_blob_tree()?; - - tree.insert("a", "neptune".repeat(10_000), 0); - let (id, memtable) = tree.rotate_memtable().unwrap(); - - let (segment, blob_file) = tree.flush_memtable(id, &memtable, 0).unwrap().unwrap(); - - // NOTE: Segment is now in-flight - - let gc_report = std::thread::spawn({ - let tree = tree.clone(); - - move || { - let report = tree.gc_scan_stats(1, 0)?; - Ok::<_, lsm_tree::Error>(report) - } - }); - - std::thread::sleep(Duration::from_secs(1)); - - let strategy = lsm_tree::gc::SpaceAmpStrategy::new(1.0); - tree.apply_gc_strategy(&strategy, 0)?; - - tree.register_segments(&[segment], Some(&[blob_file.unwrap()]), 0)?; - - assert_eq!( - "neptune".repeat(10_000).as_bytes(), - &*tree.get("a", SeqNo::MAX)?.unwrap(), - ); - - let report = gc_report.join().unwrap()?; - assert_eq!(0, report.stale_blobs); - assert_eq!(1, report.total_blobs); - - Ok(()) -} diff --git a/tests/blob_drop_range_gc_stats.rs b/tests/blob_drop_range_gc_stats.rs index ae8b7d73..fba6a2ab 100644 --- a/tests/blob_drop_range_gc_stats.rs +++ b/tests/blob_drop_range_gc_stats.rs @@ -30,13 +30,7 @@ fn blob_tree_drop_range_gc_stats() -> lsm_tree::Result<()> { assert_eq!(0, tree.blob_file_count()); assert_eq!(0, tree.segment_count()); - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); // "big":0 was dropped assert_eq!( diff --git a/tests/blob_gc._rs b/tests/blob_gc._rs deleted file mode 100644 index 6aa50d86..00000000 --- a/tests/blob_gc._rs +++ /dev/null @@ -1,148 +0,0 @@ -use lsm_tree::{AbstractTree, Config, SeqNo, SequenceNumberCounter}; -use test_log::test; - -#[test] -#[ignore] -fn blob_gc_1() -> lsm_tree::Result<()> { - let folder = tempfile::tempdir()?; - - let tree = Config::new(&folder).open_as_blob_tree()?; - - let seqno = SequenceNumberCounter::default(); - - tree.insert("a", "neptune".repeat(10_000), seqno.next()); - tree.insert("b", "neptune".repeat(10_000), seqno.next()); - tree.insert("c", "neptune".repeat(10_000), seqno.next()); - - tree.flush_active_memtable(0)?; - assert_eq!(1, tree.blob_file_count()); - - tree.gc_scan_stats(seqno.get(), 0)?; - - assert_eq!(1.0, tree.space_amp()); - - tree.insert("a", "a", seqno.next()); - tree.gc_scan_stats(seqno.get(), /* simulate some time has passed */ 1_000)?; - assert_eq!(1.5, tree.space_amp()); - - tree.insert("b", "b", seqno.next()); - tree.gc_scan_stats(seqno.get(), 1_000)?; - assert_eq!(3.0, tree.space_amp()); - - // NOTE: Everything is stale - tree.insert("c", "c", seqno.next()); - tree.gc_scan_stats(seqno.get(), 1_000)?; - assert_eq!(0.0, tree.space_amp()); - - tree.gc_drop_stale()?; - - assert_eq!(&*tree.get("a", SeqNo::MAX)?.unwrap(), b"a"); - assert_eq!(&*tree.get("b", SeqNo::MAX)?.unwrap(), b"b"); - assert_eq!(&*tree.get("c", SeqNo::MAX)?.unwrap(), b"c"); - assert_eq!(0, tree.blob_file_count()); - assert_eq!(0.0, tree.space_amp()); - - Ok(()) -} - -#[test] -#[ignore] -fn blob_gc_2() -> lsm_tree::Result<()> { - let folder = tempfile::tempdir()?; - - let tree = Config::new(&folder).open_as_blob_tree()?; - - let seqno = SequenceNumberCounter::default(); - - tree.insert("a", "neptune".repeat(10_000), seqno.next()); - tree.insert("b", "neptune".repeat(10_000), seqno.next()); - tree.insert("c", "neptune".repeat(10_000), seqno.next()); - - tree.flush_active_memtable(0)?; - assert_eq!(1, tree.blob_file_count()); - - tree.gc_scan_stats(seqno.get(), 0)?; - assert_eq!(1.0, tree.space_amp()); - - tree.insert("a", "a", seqno.next()); - tree.gc_scan_stats(seqno.get(), /* simulate some time has passed */ 1_000)?; - assert_eq!(1.5, tree.space_amp()); - - tree.insert("b", "b", seqno.next()); - tree.gc_scan_stats(seqno.get(), 1_000)?; - assert_eq!(3.0, tree.space_amp()); - - let strategy = lsm_tree::gc::SpaceAmpStrategy::new(1.0); - tree.apply_gc_strategy(&strategy, seqno.next())?; - - assert_eq!(&*tree.get("a", SeqNo::MAX)?.unwrap(), b"a"); - assert_eq!(&*tree.get("b", SeqNo::MAX)?.unwrap(), b"b"); - assert_eq!( - &*tree.get("c", SeqNo::MAX)?.unwrap(), - "neptune".repeat(10_000).as_bytes() - ); - assert_eq!(1, tree.blob_file_count()); - assert_eq!(1.0, tree.space_amp()); - - tree.insert("c", "c", seqno.next()); - - tree.gc_scan_stats(seqno.get(), 1_000)?; - - let strategy = lsm_tree::gc::SpaceAmpStrategy::new(1.0); - tree.apply_gc_strategy(&strategy, seqno.next())?; - assert_eq!(0, tree.blob_file_count()); - - Ok(()) -} - -#[test] -#[ignore] -fn blob_gc_3() -> lsm_tree::Result<()> { - let folder = tempfile::tempdir()?; - - let tree = Config::new(&folder).open_as_blob_tree()?; - - let seqno = SequenceNumberCounter::default(); - - tree.insert("a", "neptune".repeat(10_000), seqno.next()); - tree.insert("b", "neptune".repeat(10_000), seqno.next()); - tree.insert("c", "neptune".repeat(10_000), seqno.next()); - - tree.flush_active_memtable(0)?; - assert_eq!(1, tree.blob_file_count()); - - tree.gc_scan_stats(seqno.get(), 0)?; - assert_eq!(1.0, tree.space_amp()); - - tree.remove("a", seqno.next()); - - tree.gc_scan_stats(seqno.get(), /* simulate some time has passed */ 1_000)?; - assert_eq!(1.5, tree.space_amp()); - - tree.remove("b", seqno.next()); - tree.gc_scan_stats(seqno.get(), 1_000)?; - assert_eq!(3.0, tree.space_amp()); - - let strategy = lsm_tree::gc::SpaceAmpStrategy::new(1.0); - tree.apply_gc_strategy(&strategy, seqno.next())?; - - assert!(tree.get("a", SeqNo::MAX)?.is_none()); - assert!(tree.get("b", SeqNo::MAX)?.is_none()); - assert_eq!( - &*tree.get("c", SeqNo::MAX)?.unwrap(), - "neptune".repeat(10_000).as_bytes() - ); - assert_eq!(1, tree.blob_file_count()); - assert_eq!(1.0, tree.space_amp()); - - tree.remove("c", seqno.next()); - assert!(tree.get("c", SeqNo::MAX)?.is_none()); - - tree.gc_scan_stats(seqno.get(), 1_000)?; - - let strategy = lsm_tree::gc::SpaceAmpStrategy::new(1.0); - tree.apply_gc_strategy(&strategy, seqno.next())?; - assert_eq!(0, tree.blob_file_count()); - - Ok(()) -} diff --git a/tests/blob_gc_watermark._rs b/tests/blob_gc_watermark.rs similarity index 69% rename from tests/blob_gc_watermark._rs rename to tests/blob_gc_watermark.rs index 8c4b4f46..38d4f56a 100644 --- a/tests/blob_gc_watermark._rs +++ b/tests/blob_gc_watermark.rs @@ -1,4 +1,7 @@ -use lsm_tree::{config::CompressionPolicy, AbstractTree, Config, SeqNo, SequenceNumberCounter}; +use lsm_tree::{ + config::CompressionPolicy, AbstractTree, Config, KvSeparationOptions, SeqNo, + SequenceNumberCounter, +}; use test_log::test; // NOTE: This was a logic/MVCC error in v2 that could drop @@ -6,13 +9,17 @@ use test_log::test; // // https://github.com/fjall-rs/lsm-tree/commit/79c6ead4b955051cbb4835913e21d08b8aeafba1 #[test] -#[ignore] fn blob_gc_seqno_watermark() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; let tree = Config::new(&folder) .data_block_compression_policy(CompressionPolicy::all(lsm_tree::CompressionType::None)) - .open_as_blob_tree()?; + .with_kv_separation(Some( + KvSeparationOptions::default() + .staleness_threshold(0.01) + .age_cutoff(1.0), + )) + .open()?; let seqno = SequenceNumberCounter::default(); tree.insert("a", "neptune".repeat(10_000), seqno.next()); @@ -58,18 +65,18 @@ fn blob_gc_seqno_watermark() -> lsm_tree::Result<()> { b"neptune3".repeat(10_000), ); - let report = tree.gc_scan_stats(seqno.get() + 1, 0)?; - assert_eq!(2, report.stale_blobs); - - let strategy = lsm_tree::gc::SpaceAmpStrategy::new(1.0); - tree.apply_gc_strategy(&strategy, 0)?; + tree.major_compact(u64::MAX, 0)?; + tree.major_compact(u64::MAX, 0)?; // IMPORTANT: We cannot drop any blobs yet // because the watermark is too low // // This would previously fail - let report = tree.gc_scan_stats(seqno.get() + 1, 0)?; - assert_eq!(2, report.stale_blobs); + + { + let gc_stats = tree.current_version().gc_stats().clone(); + assert_eq!(&lsm_tree::HashMap::default(), &*gc_stats); + } assert_eq!( &*tree.get("a", snapshot_seqno)?.unwrap(), @@ -80,5 +87,19 @@ fn blob_gc_seqno_watermark() -> lsm_tree::Result<()> { b"neptune3".repeat(10_000), ); + tree.major_compact(u64::MAX, 1_000)?; + + { + let gc_stats = tree.current_version().gc_stats().clone(); + assert!(!gc_stats.is_empty()); + } + + tree.major_compact(u64::MAX, 1_000)?; + + { + let gc_stats = tree.current_version().gc_stats().clone(); + assert_eq!(&lsm_tree::HashMap::default(), &*gc_stats); + } + Ok(()) } diff --git a/tests/blob_major_compact_drop_dead_files.rs b/tests/blob_major_compact_drop_dead_files.rs index c8e2414a..f25e447d 100644 --- a/tests/blob_major_compact_drop_dead_files.rs +++ b/tests/blob_major_compact_drop_dead_files.rs @@ -45,13 +45,7 @@ fn blob_tree_major_compact_drop_dead_files() -> lsm_tree::Result<()> { assert_eq!(&*value, new_big_value); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!(&lsm_tree::HashMap::default(), &*gc_stats); } @@ -64,13 +58,7 @@ fn blob_tree_major_compact_drop_dead_files() -> lsm_tree::Result<()> { assert_eq!(&*value, new_big_value); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!( &{ @@ -90,13 +78,7 @@ fn blob_tree_major_compact_drop_dead_files() -> lsm_tree::Result<()> { assert_eq!(1, tree.blob_file_count()); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!(&lsm_tree::HashMap::default(), &*gc_stats); } diff --git a/tests/blob_major_compact_gc_stats.rs b/tests/blob_major_compact_gc_stats.rs index e01f5eb4..c9d2a16d 100644 --- a/tests/blob_major_compact_gc_stats.rs +++ b/tests/blob_major_compact_gc_stats.rs @@ -37,13 +37,7 @@ fn blob_tree_major_compact_gc_stats() -> lsm_tree::Result<()> { assert_eq!(1, tree.segment_count()); assert_eq!(2, tree.blob_file_count()); - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); // "big":0 is expired assert_eq!( @@ -95,10 +89,7 @@ fn blob_tree_major_compact_gc_stats_tombstone() -> lsm_tree::Result<()> { bytes: 2 * big_value.len() as u64, len: 2, }]), - tree.manifest() - .read() - .expect("lock is poisoned") - .current_version() + tree.current_version() .iter_segments() .nth(1) .unwrap() @@ -111,13 +102,7 @@ fn blob_tree_major_compact_gc_stats_tombstone() -> lsm_tree::Result<()> { assert_eq!(1, tree.segment_count()); assert_eq!(1, tree.blob_file_count()); - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); // "big":0 is expired assert_eq!( @@ -135,10 +120,7 @@ fn blob_tree_major_compact_gc_stats_tombstone() -> lsm_tree::Result<()> { bytes: big_value.len() as u64, len: 1, }]), - tree.manifest() - .read() - .expect("lock is poisoned") - .current_version() + tree.current_version() .iter_segments() .next() .unwrap() diff --git a/tests/blob_major_compact_relink.rs b/tests/blob_major_compact_relink.rs index c917dff7..d4b3d9d9 100644 --- a/tests/blob_major_compact_relink.rs +++ b/tests/blob_major_compact_relink.rs @@ -30,10 +30,7 @@ fn blob_tree_major_compact_relink() -> lsm_tree::Result<()> { bytes: big_value.len() as u64, len: 1, }]), - tree.manifest() - .read() - .expect("lock is poisoned") - .current_version() + tree.current_version() .iter_segments() .next() .unwrap() @@ -52,10 +49,7 @@ fn blob_tree_major_compact_relink() -> lsm_tree::Result<()> { bytes: big_value.len() as u64, len: 1, }]), - tree.manifest() - .read() - .expect("lock is poisoned") - .current_version() + tree.current_version() .iter_segments() .next() .unwrap() diff --git a/tests/blob_major_compact_relocation.rs b/tests/blob_major_compact_relocation.rs index 9cd32c8b..a6d07221 100644 --- a/tests/blob_major_compact_relocation.rs +++ b/tests/blob_major_compact_relocation.rs @@ -1,4 +1,4 @@ -use lsm_tree::{blob_tree::FragmentationEntry, AbstractTree, SeqNo}; +use lsm_tree::{blob_tree::FragmentationEntry, AbstractTree, KvSeparationOptions, SeqNo}; use test_log::test; #[test] @@ -11,7 +11,7 @@ fn blob_tree_major_compact_relocation_simple() -> lsm_tree::Result<()> { { let tree = lsm_tree::Config::new(path) - .with_kv_separation(Some(Default::default())) + .with_kv_separation(Some(KvSeparationOptions::default().age_cutoff(1.0))) .open()?; assert!(tree.get("big", SeqNo::MAX)?.is_none()); @@ -53,13 +53,7 @@ fn blob_tree_major_compact_relocation_simple() -> lsm_tree::Result<()> { assert_eq!(&*value, b"smol"); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); // "big":0 is expired assert_eq!( @@ -77,13 +71,7 @@ fn blob_tree_major_compact_relocation_simple() -> lsm_tree::Result<()> { assert_eq!(2, tree.blob_file_count()); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!(&lsm_tree::HashMap::default(), &*gc_stats); } @@ -109,7 +97,7 @@ fn blob_tree_major_compact_relocation_repeated_key() -> lsm_tree::Result<()> { { let tree = lsm_tree::Config::new(path) - .with_kv_separation(Some(Default::default())) + .with_kv_separation(Some(KvSeparationOptions::default().age_cutoff(1.0))) .open()?; assert!(tree.get("big", SeqNo::MAX)?.is_none()); @@ -167,13 +155,7 @@ fn blob_tree_major_compact_relocation_repeated_key() -> lsm_tree::Result<()> { assert_eq!(&*value, big_value); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!( &{ @@ -190,13 +172,7 @@ fn blob_tree_major_compact_relocation_repeated_key() -> lsm_tree::Result<()> { assert_eq!(1, tree.blob_file_count()); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!(&lsm_tree::HashMap::default(), &*gc_stats); } @@ -225,7 +201,7 @@ fn blob_tree_major_compact_relocation_interleaved() -> lsm_tree::Result<()> { { let tree = lsm_tree::Config::new(path) - .with_kv_separation(Some(Default::default())) + .with_kv_separation(Some(KvSeparationOptions::default().age_cutoff(1.0))) .open()?; assert!(tree.get("big", SeqNo::MAX)?.is_none()); @@ -282,13 +258,7 @@ fn blob_tree_major_compact_relocation_interleaved() -> lsm_tree::Result<()> { let value = tree.get("e", SeqNo::MAX)?.expect("should exist"); assert_eq!(&*value, b"smol"); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!( &{ @@ -305,13 +275,7 @@ fn blob_tree_major_compact_relocation_interleaved() -> lsm_tree::Result<()> { assert_eq!(1, tree.blob_file_count()); { - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); assert_eq!(&lsm_tree::HashMap::default(), &*gc_stats); } diff --git a/tests/blob_recover_gc_stats.rs b/tests/blob_recover_gc_stats.rs index e0a9695b..558532ca 100644 --- a/tests/blob_recover_gc_stats.rs +++ b/tests/blob_recover_gc_stats.rs @@ -32,13 +32,7 @@ fn blob_tree_recover_gc_stats() -> lsm_tree::Result<()> { tree.major_compact(64_000_000, 1_000)?; - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); // "big":0 is expired assert_eq!( @@ -56,13 +50,7 @@ fn blob_tree_recover_gc_stats() -> lsm_tree::Result<()> { .with_kv_separation(Some(Default::default())) .open()?; - let gc_stats = tree - .manifest() - .read() - .expect("lock is poisoned") - .current_version() - .gc_stats() - .clone(); + let gc_stats = tree.current_version().gc_stats().clone(); // "big":0 is still expired assert_eq!( diff --git a/tests/blob_sep_threshold.rs b/tests/blob_sep_threshold.rs index ab643d91..fb537053 100644 --- a/tests/blob_sep_threshold.rs +++ b/tests/blob_sep_threshold.rs @@ -2,7 +2,6 @@ use lsm_tree::{AbstractTree, KvSeparationOptions, SeqNo}; use test_log::test; #[test] -#[ignore] fn blob_tree_separation_threshold() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; let path = folder.path(); diff --git a/tests/blob_simple.rs b/tests/blob_simple.rs index 8557ed1d..d51eb03f 100644 --- a/tests/blob_simple.rs +++ b/tests/blob_simple.rs @@ -63,47 +63,3 @@ fn blob_tree_simple_flush_read() -> lsm_tree::Result<()> { Ok(()) } - -#[cfg(feature = "lz4")] -#[test] -#[ignore = "wip"] -fn blob_tree_simple_compressed() -> lsm_tree::Result<()> { - todo!() - - // let folder = tempfile::tempdir()?; - // let path = folder.path(); - - // let tree = lsm_tree::Config::new(path) - // .compression(lsm_tree::CompressionType::Lz4) - // .open_as_blob_tree()?; - - // let big_value = b"neptune!".repeat(128_000); - - // assert!(tree.get("big", SeqNo::MAX)?.is_none()); - // tree.insert("big", &big_value, 0); - // tree.insert("smol", "small value", 0); - - // let value = tree.get("big", SeqNo::MAX)?.expect("should exist"); - // assert_eq!(&*value, big_value); - - // tree.flush_active_memtable(0)?; - - // let value = tree.get("big", SeqNo::MAX)?.expect("should exist"); - // assert_eq!(&*value, big_value); - - // let value = tree.get("smol", SeqNo::MAX)?.expect("should exist"); - // assert_eq!(&*value, b"small value"); - - // let new_big_value = b"winter!".repeat(128_000); - // tree.insert("big", &new_big_value, 1); - - // let value = tree.get("big", SeqNo::MAX)?.expect("should exist"); - // assert_eq!(&*value, new_big_value); - - // tree.flush_active_memtable(0)?; - - // let value = tree.get("big", SeqNo::MAX)?.expect("should exist"); - // assert_eq!(&*value, new_big_value); - - // Ok(()) -} diff --git a/tests/blob_tombstone._rs b/tests/blob_tombstone._rs deleted file mode 100644 index 6f767808..00000000 --- a/tests/blob_tombstone._rs +++ /dev/null @@ -1,43 +0,0 @@ -use lsm_tree::{AbstractTree, SeqNo}; -use test_log::test; - -#[test] -#[ignore] -fn blob_tree_tombstone() -> lsm_tree::Result<()> { - let folder = tempfile::tempdir()?; - let path = folder.path(); - - let tree = lsm_tree::Config::new(path).open_as_blob_tree()?; - - let big_value = b"neptune!".repeat(128_000); - - tree.insert("a", &big_value, 0); - tree.insert("b", &big_value, 0); - tree.insert("c", &big_value, 0); - assert_eq!(3, tree.len(SeqNo::MAX, None)?); - - tree.flush_active_memtable(0)?; - assert_eq!(3, tree.len(SeqNo::MAX, None)?); - - tree.remove("b", 1); - assert_eq!(2, tree.len(SeqNo::MAX, None)?); - - tree.flush_active_memtable(0)?; - assert_eq!(2, tree.len(SeqNo::MAX, None)?); - - assert_eq!(&*tree.get("a", SeqNo::MAX)?.unwrap(), big_value); - assert!(tree.get("b", SeqNo::MAX)?.is_none()); - assert_eq!(&*tree.get("c", SeqNo::MAX)?.unwrap(), big_value); - - tree.gc_scan_stats(2, 0)?; - - let strategy = lsm_tree::gc::StaleThresholdStrategy::new(0.01); - tree.apply_gc_strategy(&strategy, 2)?; - assert_eq!(2, tree.len(SeqNo::MAX, None)?); - - assert_eq!(&*tree.get("a", SeqNo::MAX)?.unwrap(), big_value); - assert!(tree.get("b", SeqNo::MAX)?.is_none()); - assert_eq!(&*tree.get("c", SeqNo::MAX)?.unwrap(), big_value); - - Ok(()) -} diff --git a/tests/blob_tree_flush._rs b/tests/blob_tree_flush._rs deleted file mode 100644 index 1fa4792c..00000000 --- a/tests/blob_tree_flush._rs +++ /dev/null @@ -1,36 +0,0 @@ -use lsm_tree::{AbstractTree, Config, SequenceNumberCounter}; -use test_log::test; - -#[test] -#[ignore] -fn blob_gc_flush_tombstone() -> lsm_tree::Result<()> { - let folder = tempfile::tempdir()?; - - let tree = Config::new(&folder).open_as_blob_tree()?; - - let seqno = SequenceNumberCounter::default(); - - tree.insert("a", "neptune".repeat(10_000), seqno.next()); - tree.insert("b", "neptune".repeat(10_000), seqno.next()); - tree.flush_active_memtable(0)?; - - tree.remove("b", seqno.next()); - - tree.gc_scan_stats(seqno.get(), /* simulate some time has passed */ 1_000)?; - assert_eq!(2.0, tree.space_amp()); - - let strategy = lsm_tree::gc::SpaceAmpStrategy::new(1.0); - tree.apply_gc_strategy(&strategy, seqno.next())?; - assert_eq!(1, tree.blob_file_count()); - - tree.gc_scan_stats(seqno.get(), 1_000)?; - assert_eq!(1.0, tree.space_amp()); - - tree.flush_active_memtable(0)?; - assert_eq!(1, tree.blob_file_count()); - - tree.gc_scan_stats(seqno.get(), 1_000)?; - assert_eq!(1.0, tree.space_amp()); - - Ok(()) -} diff --git a/tests/experimental_blob_tree_guarded_size.rs b/tests/blob_tree_guarded_size.rs similarity index 83% rename from tests/experimental_blob_tree_guarded_size.rs rename to tests/blob_tree_guarded_size.rs index df7dbaff..634301cc 100644 --- a/tests/experimental_blob_tree_guarded_size.rs +++ b/tests/blob_tree_guarded_size.rs @@ -2,8 +2,7 @@ use lsm_tree::{AbstractTree, Config, Guard, SeqNo}; use test_log::test; #[test] -#[ignore = "restore"] -fn experimental_blob_tree_guarded_size() -> lsm_tree::Result<()> { +fn blob_tree_guarded_size() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; let tree = Config::new(folder) diff --git a/tests/blob_tree_reload_blob.rs b/tests/blob_tree_reload_blob.rs index 784eb115..7ed5ccd4 100644 --- a/tests/blob_tree_reload_blob.rs +++ b/tests/blob_tree_reload_blob.rs @@ -4,7 +4,6 @@ use test_log::test; const ITEM_COUNT: usize = 10_000; #[test] -#[ignore] fn blob_tree_reload_empty() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; @@ -68,7 +67,6 @@ fn blob_tree_reload_empty() -> lsm_tree::Result<()> { } #[test] -#[ignore] fn blob_tree_reload() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; diff --git a/tests/compaction_readers_grouping.rs b/tests/compaction_readers_grouping.rs index dd7d5ea0..79c71b28 100644 --- a/tests/compaction_readers_grouping.rs +++ b/tests/compaction_readers_grouping.rs @@ -2,6 +2,7 @@ use lsm_tree::{AbstractTree, Config, SeqNo, SequenceNumberCounter}; use std::sync::Arc; use test_log::test; +/// NOTE: Fix: https://github.com/fjall-rs/lsm-tree/commit/66a974ae6748646a40df475c291e04cf1dfbaece #[test] #[ignore] fn compaction_readers_grouping() -> lsm_tree::Result<()> { @@ -37,36 +38,24 @@ fn compaction_readers_grouping() -> lsm_tree::Result<()> { tree.compact(Arc::new(lsm_tree::compaction::PullDown(2, 3)), 0)?; assert!(!tree - .manifest() - .read() - .expect("asdasd") .current_version() .level(0) .expect("level should exist") .is_empty()); assert!(tree - .manifest() - .read() - .expect("asdasd") .current_version() .level(1) .expect("level should exist") .is_empty()); assert!(tree - .manifest() - .read() - .expect("asdasd") .current_version() .level(2) .expect("level should exist") .is_empty()); assert!(!tree - .manifest() - .read() - .expect("asdasd") .current_version() .level(3) .expect("level should exist") diff --git a/tests/multi_trees.rs b/tests/multi_trees.rs index 47dff0ec..5f94c245 100644 --- a/tests/multi_trees.rs +++ b/tests/multi_trees.rs @@ -19,9 +19,6 @@ fn tree_multi_segment_ids() -> lsm_tree::Result<()> { assert_eq!( 0, tree0 - .manifest() - .read() - .expect("lock is poisoned") .current_version() .level(0) .expect("level should exist") @@ -46,9 +43,6 @@ fn tree_multi_segment_ids() -> lsm_tree::Result<()> { assert_eq!( 0, tree1 - .manifest() - .read() - .expect("lock is poisoned") .current_version() .level(0) .expect("level should exist") diff --git a/tests/mvcc_slab.rs b/tests/mvcc_slab.rs index 51ce4638..77aecbf3 100644 --- a/tests/mvcc_slab.rs +++ b/tests/mvcc_slab.rs @@ -21,10 +21,9 @@ fn segment_reader_mvcc_slab() -> lsm_tree::Result<()> { tree.flush_active_memtable(0)?; - let level_manifest = tree.manifest().read().expect("lock is poisoned"); + let version = tree.current_version(); - let segment = level_manifest - .current_version() + let segment = version .level(0) .expect("level should exist") .first() @@ -59,10 +58,9 @@ fn segment_reader_mvcc_slab_blob() -> lsm_tree::Result<()> { tree.flush_active_memtable(0)?; - let level_manifest = tree.manifest().read().expect("lock is poisoned"); + let version = tree.current_version(); - let segment = level_manifest - .current_version() + let segment = version .level(0) .expect("level should exist") .first() diff --git a/tests/open_files.rs b/tests/open_files.rs deleted file mode 100644 index e4466387..00000000 --- a/tests/open_files.rs +++ /dev/null @@ -1,26 +0,0 @@ -use lsm_tree::{AbstractTree, Cache, Config, SeqNo}; -use std::sync::Arc; -use test_log::test; - -#[test] -#[ignore = "this is a sanity check test, but the data it writes is impossible, so the range scan first_key_value is doing is crashing as of 2.1.1 lol"] -fn open_file_limit() -> lsm_tree::Result<()> { - std::fs::create_dir_all(".test_open_files")?; - let folder = tempfile::tempdir_in(".test_open_files")?; - - let tree = Config::new(folder) - .use_cache(Arc::new(Cache::with_capacity_bytes(0))) - .open()?; - - for _ in 0..2_048 { - let key = 0u64.to_be_bytes(); - tree.insert(key, key, 0); - tree.flush_active_memtable(0)?; - } - - for _ in 0..5 { - assert!(tree.first_key_value(SeqNo::MAX, None)?.is_some()); - } - - Ok(()) -} diff --git a/tests/tree_disjoint_point_read.rs b/tests/tree_disjoint_point_read.rs index a369c24b..8d799263 100644 --- a/tests/tree_disjoint_point_read.rs +++ b/tests/tree_disjoint_point_read.rs @@ -84,16 +84,7 @@ fn tree_disjoint_point_read_multiple_levels() -> lsm_tree::Result<()> { tree.flush_active_memtable(0)?; tree.compact(Arc::new(lsm_tree::compaction::SizeTiered::new(10, 8)), 1)?; - assert_eq!( - 1, - tree.manifest() - .read() - .expect("asdasd") - .current_version() - .level(1) - .unwrap() - .len() - ); + assert_eq!(1, tree.current_version().level(1).unwrap().len()); tree.insert("e", "e", 0); tree.flush_active_memtable(0)?; @@ -140,16 +131,7 @@ fn tree_disjoint_point_read_multiple_levels_blob() -> lsm_tree::Result<()> { tree.flush_active_memtable(0)?; tree.compact(Arc::new(lsm_tree::compaction::SizeTiered::new(10, 8)), 1)?; - assert_eq!( - 1, - tree.manifest() - .read() - .expect("asdasd") - .current_version() - .level(1) - .unwrap() - .len() - ); + assert_eq!(1, tree.current_version().level(1).unwrap().len()); tree.insert("e", "e", 0); tree.flush_active_memtable(0)?; diff --git a/tests/tree_flush_eviction.rs b/tests/tree_flush_eviction.rs index a1ed93c7..95ea31f4 100644 --- a/tests/tree_flush_eviction.rs +++ b/tests/tree_flush_eviction.rs @@ -84,10 +84,7 @@ fn tree_flush_eviction_4() -> lsm_tree::Result<()> { assert_eq!(1, tree.len(SeqNo::MAX, None)?); assert_eq!( 1, - tree.manifest() - .read() - .expect("lock is poisoned") - .current_version() + tree.current_version() .level(0) .expect("should exist") .first() @@ -104,10 +101,7 @@ fn tree_flush_eviction_4() -> lsm_tree::Result<()> { assert_eq!(1, tree.len(SeqNo::MAX, None)?); assert_eq!( 0, - tree.manifest() - .read() - .expect("lock is poisoned") - .current_version() + tree.current_version() .level(6) .expect("should exist") .first() diff --git a/tests/experimental_tree_guarded_range.rs b/tests/tree_guarded_range.rs similarity index 91% rename from tests/experimental_tree_guarded_range.rs rename to tests/tree_guarded_range.rs index 0b18937d..a359828e 100644 --- a/tests/experimental_tree_guarded_range.rs +++ b/tests/tree_guarded_range.rs @@ -2,7 +2,7 @@ use lsm_tree::{AbstractTree, Config, Guard, SeqNo}; use test_log::test; #[test] -fn experimental_tree_guarded_range() -> lsm_tree::Result<()> { +fn tree_guarded_range() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; let tree = Config::new(folder).open()?; @@ -32,8 +32,7 @@ fn experimental_tree_guarded_range() -> lsm_tree::Result<()> { } #[test] -#[ignore = "restore"] -fn experimental_blob_tree_guarded_range() -> lsm_tree::Result<()> { +fn blob_tree_guarded_range() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; let tree = Config::new(folder) diff --git a/tests/tree_range.rs b/tests/tree_range.rs index 8375e24c..21c18128 100644 --- a/tests/tree_range.rs +++ b/tests/tree_range.rs @@ -62,7 +62,6 @@ fn tree_range_count() -> lsm_tree::Result<()> { } #[test] -#[ignore = "restore"] fn blob_tree_range_count() -> lsm_tree::Result<()> { use std::ops::Bound::{self, Excluded, Unbounded};