diff --git a/src/abstract.rs b/src/abstract.rs index d9685605..1954e657 100644 --- a/src/abstract.rs +++ b/src/abstract.rs @@ -78,10 +78,14 @@ pub trait AbstractTree { /// Drops segments that are fully contained in a given range. /// + /// Accepts any `RangeBounds`, including unbounded or exclusive endpoints. + /// If the normalized lower bound is greater than the upper bound, the + /// method returns without performing any work. + /// /// # Errors /// - /// Will return `Err` if an IO error occurs. - fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()>; + /// Will return `Err` only if an IO error occurs during compaction. + fn drop_range, R: RangeBounds>(&self, range: R) -> crate::Result<()>; /// Performs major compaction, blocking the caller until it's done. /// diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index d9d525aa..8c0db395 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -432,8 +432,8 @@ impl AbstractTree for BlobTree { self.index.tombstone_count() } - fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()> { - self.index.drop_range(key_range) + fn drop_range, R: RangeBounds>(&self, range: R) -> crate::Result<()> { + self.index.drop_range(range) } fn ingest( diff --git a/src/compaction/drop_range.rs b/src/compaction/drop_range.rs index 3e71c8a0..7374804a 100644 --- a/src/compaction/drop_range.rs +++ b/src/compaction/drop_range.rs @@ -3,24 +3,68 @@ // (found in the LICENSE-* files in the repository) use super::{Choice, CompactionStrategy}; -use crate::{config::Config, level_manifest::LevelManifest, KeyRange}; +use crate::{ + config::Config, level_manifest::LevelManifest, slice::Slice, version::run::Ranged, KeyRange, +}; use crate::{HashSet, Segment}; +use std::ops::{Bound, RangeBounds}; + +#[derive(Clone, Debug)] +pub struct OwnedBounds { + pub start: Bound, + pub end: Bound, +} + +impl RangeBounds for OwnedBounds { + fn start_bound(&self) -> Bound<&Slice> { + match &self.start { + Bound::Unbounded => Bound::Unbounded, + Bound::Included(key) => Bound::Included(key), + Bound::Excluded(key) => Bound::Excluded(key), + } + } + + fn end_bound(&self) -> Bound<&Slice> { + match &self.end { + Bound::Unbounded => Bound::Unbounded, + Bound::Included(key) => Bound::Included(key), + Bound::Excluded(key) => Bound::Excluded(key), + } + } +} + +impl OwnedBounds { + #[must_use] + pub fn contains(&self, range: &KeyRange) -> bool { + let lower_ok = match &self.start { + Bound::Unbounded => true, + Bound::Included(key) => key.as_ref() <= range.min().as_ref(), + Bound::Excluded(key) => key.as_ref() < range.min().as_ref(), + }; + + if !lower_ok { + return false; + } + + match &self.end { + Bound::Unbounded => true, + Bound::Included(key) => key.as_ref() >= range.max().as_ref(), + Bound::Excluded(key) => key.as_ref() > range.max().as_ref(), + } + } +} /// Drops all segments that are **contained** in a key range pub struct Strategy { - key_range: KeyRange, + bounds: OwnedBounds, } impl Strategy { /// Configures a new `DropRange` compaction strategy. - /// - /// # Panics - /// - /// Panics, if `target_size` is below 1024 bytes. #[must_use] #[allow(dead_code)] - pub fn new(key_range: KeyRange) -> Self { - Self { key_range } + pub fn new(bounds: OwnedBounds) -> Self { + Self { bounds } } } @@ -34,7 +78,13 @@ impl CompactionStrategy for Strategy { .current_version() .iter_levels() .flat_map(|lvl| lvl.iter()) - .flat_map(|run| run.get_contained(&self.key_range)) + .flat_map(|run| { + run.range_overlap_indexes(&self.bounds) + .and_then(|(lo, hi)| run.get(lo..=hi)) + .unwrap_or_default() + .iter() + .filter(|x| self.bounds.contains(x.key_range())) + }) .map(Segment::id) .collect(); diff --git a/src/tree/mod.rs b/src/tree/mod.rs index ec583088..38935ab9 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -7,7 +7,7 @@ pub mod inner; use crate::{ coding::{Decode, Encode}, - compaction::CompactionStrategy, + compaction::{drop_range::OwnedBounds, CompactionStrategy}, config::Config, file::BLOBS_FOLDER, format_version::FormatVersion, @@ -16,6 +16,7 @@ use crate::{ manifest::Manifest, memtable::Memtable, segment::Segment, + slice::Slice, value::InternalValue, vlog::BlobFile, AbstractTree, Cache, DescriptorTable, KvPair, SegmentId, SeqNo, SequenceNumberCounter, UserKey, @@ -24,7 +25,7 @@ use crate::{ use inner::{MemtableId, SealedMemtables, TreeId, TreeInner}; use std::{ io::Cursor, - ops::RangeBounds, + ops::{Bound, RangeBounds}, path::Path, sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, }; @@ -168,9 +169,14 @@ impl AbstractTree for Tree { Ok(()) } - // TODO: change API to RangeBounds - fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()> { - let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(key_range)); + fn drop_range, R: RangeBounds>(&self, range: R) -> crate::Result<()> { + let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range)?; + + if is_empty { + return Ok(()); + } + + let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds)); // IMPORTANT: Write lock so we can be the only compaction going on let _lock = self @@ -549,6 +555,46 @@ impl AbstractTree for Tree { } impl Tree { + /// Normalizes a user-provided range into owned `Bound` values. + /// + /// Returns a tuple containing: + /// - the `OwnedBounds` that mirror the original bounds semantics (including + /// inclusive/exclusive markers and unbounded endpoints), and + /// - a `bool` flag indicating whether the normalized range is logically + /// empty (e.g., when the lower bound is greater than the upper bound). + /// + /// Callers can use the flag to detect empty ranges and skip further work + /// while still having access to the normalized bounds for non-empty cases. + fn range_bounds_to_owned_bounds, R: RangeBounds>( + range: &R, + ) -> crate::Result<(OwnedBounds, bool)> { + use Bound::{Excluded, Included, Unbounded}; + + let start = match range.start_bound() { + Included(key) => Included(Slice::from(key.as_ref())), + Excluded(key) => Excluded(Slice::from(key.as_ref())), + Unbounded => Unbounded, + }; + + let end = match range.end_bound() { + Included(key) => Included(Slice::from(key.as_ref())), + Excluded(key) => Excluded(Slice::from(key.as_ref())), + Unbounded => Unbounded, + }; + + let is_empty = if let (Included(lo), Included(hi)) + | (Included(lo), Excluded(hi)) + | (Excluded(lo), Included(hi)) + | (Excluded(lo), Excluded(hi)) = (&start, &end) + { + lo.as_ref() > hi.as_ref() + } else { + false + }; + + Ok((OwnedBounds { start, end }, is_empty)) + } + /// Opens an LSM-tree in the given directory. /// /// Will recover previous state if the folder was previously diff --git a/tests/tree_drop_range.rs b/tests/tree_drop_range.rs index a4f295b6..11cffb65 100644 --- a/tests/tree_drop_range.rs +++ b/tests/tree_drop_range.rs @@ -1,31 +1,210 @@ -use lsm_tree::{AbstractTree, Config, KeyRange, SeqNo, UserKey}; -use test_log::test; +use lsm_tree::{AbstractTree, Config, SeqNo, Tree}; +use std::ops::Bound::{Excluded, Included, Unbounded}; + +fn populate_segments(tree: &Tree) -> lsm_tree::Result<()> { + for key in 'a'..='e' { + tree.insert([key as u8], "", 0); + tree.flush_active_memtable(0)?; + } + Ok(()) +} + +#[test] +fn tree_drop_range_basic() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; + + populate_segments(&tree)?; + + assert_eq!(1, tree.l0_run_count()); + assert_eq!(5, tree.segment_count()); + + tree.drop_range("a"..="c")?; + + assert!(!tree.contains_key("a", SeqNo::MAX)?); + assert!(!tree.contains_key("b", SeqNo::MAX)?); + assert!(!tree.contains_key("c", SeqNo::MAX)?); + assert!(tree.contains_key("d", SeqNo::MAX)?); + assert!(tree.contains_key("e", SeqNo::MAX)?); + + assert_eq!(1, tree.l0_run_count()); + assert_eq!(2, tree.segment_count()); + + Ok(()) +} + +#[test] +fn tree_drop_range_partial_segment_overlap_kept() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; + + for key in ['a', 'b', 'c', 'd', 'e'] { + tree.insert([key as u8], "", 0); + } + tree.flush_active_memtable(0)?; + + assert_eq!(1, tree.l0_run_count()); + assert_eq!(1, tree.segment_count()); + + tree.drop_range("b".."d")?; + + for key in ['a', 'b', 'c', 'd', 'e'] { + assert!(tree.contains_key([key as u8], SeqNo::MAX)?); + } + + assert_eq!(1, tree.l0_run_count()); + assert_eq!(1, tree.segment_count()); + + Ok(()) +} + +#[test] +fn tree_drop_range_upper_exclusive() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; + + populate_segments(&tree)?; + + tree.drop_range("a".."d")?; + + assert!(!tree.contains_key("a", SeqNo::MAX)?); + assert!(!tree.contains_key("b", SeqNo::MAX)?); + assert!(!tree.contains_key("c", SeqNo::MAX)?); + assert!(tree.contains_key("d", SeqNo::MAX)?); + assert!(tree.contains_key("e", SeqNo::MAX)?); + + Ok(()) +} + +#[test] +fn tree_drop_range_lower_exclusive() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; + + populate_segments(&tree)?; + + tree.drop_range::<&str, _>((Excluded("a"), Included("c")))?; + + assert!(tree.contains_key("a", SeqNo::MAX)?); + assert!(!tree.contains_key("b", SeqNo::MAX)?); + assert!(!tree.contains_key("c", SeqNo::MAX)?); + assert!(tree.contains_key("d", SeqNo::MAX)?); + + Ok(()) +} + +#[test] +fn tree_drop_range_unbounded_lower_inclusive_upper() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; + + populate_segments(&tree)?; + + tree.drop_range::<&str, _>((Unbounded, Included("c")))?; + + assert!(!tree.contains_key("a", SeqNo::MAX)?); + assert!(!tree.contains_key("b", SeqNo::MAX)?); + assert!(!tree.contains_key("c", SeqNo::MAX)?); + assert!(tree.contains_key("d", SeqNo::MAX)?); + assert!(tree.contains_key("e", SeqNo::MAX)?); + + Ok(()) +} + +#[test] +fn tree_drop_range_unbounded_lower_exclusive_upper() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; + + populate_segments(&tree)?; + + tree.drop_range::<&str, _>((Unbounded, Excluded("d")))?; + + assert!(!tree.contains_key("a", SeqNo::MAX)?); + assert!(!tree.contains_key("b", SeqNo::MAX)?); + assert!(!tree.contains_key("c", SeqNo::MAX)?); + assert!(tree.contains_key("d", SeqNo::MAX)?); + + Ok(()) +} + +#[test] +fn tree_drop_range_exclusive_empty_interval() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; + + populate_segments(&tree)?; + + tree.drop_range::<&str, _>((Excluded("b"), Excluded("b")))?; + + assert!(tree.contains_key("a", SeqNo::MAX)?); + assert!(tree.contains_key("b", SeqNo::MAX)?); + assert!(tree.contains_key("c", SeqNo::MAX)?); + + Ok(()) +} + +#[test] +fn tree_drop_range_empty_tree() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; + + tree.drop_range("a"..="c")?; + + assert_eq!(0, tree.l0_run_count()); + assert_eq!(0, tree.segment_count()); + + Ok(()) +} + +#[test] +fn tree_drop_range_unbounded_upper() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; + + populate_segments(&tree)?; + + tree.drop_range("c"..)?; + + assert!(tree.contains_key("a", SeqNo::MAX)?); + assert!(tree.contains_key("b", SeqNo::MAX)?); + assert!(!tree.contains_key("c", SeqNo::MAX)?); + assert!(!tree.contains_key("d", SeqNo::MAX)?); + assert!(!tree.contains_key("e", SeqNo::MAX)?); + + Ok(()) +} #[test] -fn tree_drop_range() -> lsm_tree::Result<()> { +fn tree_drop_range_clear_all() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; - { - let tree = Config::new(&folder).open()?; + populate_segments(&tree)?; - for key in 'a'..='e' { - tree.insert([key as u8], "", 0); - tree.flush_active_memtable(0)?; - } + tree.drop_range::<&str, _>(..)?; - assert_eq!(1, tree.l0_run_count()); - assert_eq!(5, tree.segment_count()); + assert_eq!(0, tree.l0_run_count()); + assert_eq!(0, tree.segment_count()); + assert!(!tree.contains_key("a", SeqNo::MAX)?); + assert!(!tree.contains_key("e", SeqNo::MAX)?); + + Ok(()) +} + +#[test] +fn tree_drop_range_inverted_bounds_is_noop() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder).open()?; - tree.drop_range(KeyRange::new((UserKey::from("a"), UserKey::from("c"))))?; + populate_segments(&tree)?; - assert!(!tree.contains_key("a", SeqNo::MAX)?); - assert!(!tree.contains_key("b", SeqNo::MAX)?); - assert!(!tree.contains_key("c", SeqNo::MAX)?); - assert!(tree.contains_key("d", SeqNo::MAX)?); - assert!(tree.contains_key("e", SeqNo::MAX)?); + tree.drop_range("c".."a")?; + tree.drop_range("c"..="a")?; - assert_eq!(1, tree.l0_run_count()); - assert_eq!(2, tree.segment_count()); + // All keys remain because the range is treated as empty. + for key in 'a'..='e' { + assert!(tree.contains_key([key as u8], SeqNo::MAX)?); } Ok(())