Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@

use crate::{
blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType,
iter_guard::IterGuardImpl, level_manifest::LevelManifest, segment::Segment,
tree::inner::MemtableId, vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue,
KvPair, Memtable, SegmentId, SeqNo, SequenceNumberCounter, Tree, TreeId, UserKey, UserValue,
iter_guard::IterGuardImpl, segment::Segment, tree::inner::MemtableId, version::Version,
vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SegmentId,
SeqNo, SequenceNumberCounter, Tree, TreeId, UserKey, UserValue,
};
use enum_dispatch::enum_dispatch;
use std::{
ops::RangeBounds,
sync::{Arc, RwLock, RwLockWriteGuard},
};
use std::{ops::RangeBounds, sync::Arc};

pub type RangeItem = crate::Result<KvPair>;

Expand All @@ -30,7 +27,7 @@ pub trait AbstractTree {
fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>>;

#[doc(hidden)]
fn manifest(&self) -> &Arc<RwLock<LevelManifest>>;
fn current_version(&self) -> Version;

/// Synchronously flushes the active memtable to a disk segment.
///
Expand Down Expand Up @@ -143,10 +140,6 @@ pub trait AbstractTree {
#[cfg(feature = "metrics")]
fn metrics(&self) -> &Arc<crate::Metrics>;

// TODO:?
/* #[doc(hidden)]
fn verify(&self) -> crate::Result<usize>; */

/// Synchronously flushes a memtable to a disk segment.
///
/// This method will not make the segment immediately available,
Expand Down Expand Up @@ -176,9 +169,6 @@ pub trait AbstractTree {
seqno_threshold: SeqNo,
) -> crate::Result<()>;

/// Write-locks the active memtable for exclusive access
fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc<Memtable>>;

/// Clears the active memtable atomically.
fn clear_active_memtable(&self);

Expand Down
95 changes: 33 additions & 62 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,20 @@ use crate::{
compaction::stream::CompactionStream,
file::{fsync_directory, BLOBS_FOLDER},
iter_guard::{IterGuard, IterGuardImpl},
level_manifest::LevelManifest,
r#abstract::{AbstractTree, RangeItem},
segment::Segment,
tree::inner::MemtableId,
value::InternalValue,
vlog::{Accessor, BlobFile, BlobFileId, BlobFileWriter, ValueHandle},
version::Version,
vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
Config, Memtable, SegmentId, SeqNo, SequenceNumberCounter, UserKey, UserValue,
};
use handle::BlobIndirection;
use std::{
collections::BTreeMap,
io::Cursor,
ops::RangeBounds,
path::PathBuf,
sync::{Arc, RwLock},
};
use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};

pub struct Guard<'a> {
blob_tree: &'a BlobTree,
vlog: Arc<BTreeMap<BlobFileId, BlobFile>>,
version: Version,
kv: crate::Result<InternalValue>,
}

Expand All @@ -42,26 +36,30 @@ impl IterGuard for Guard<'_> {
}

fn size(self) -> crate::Result<u32> {
let mut cursor = Cursor::new(self.kv?.value);
Ok(BlobIndirection::decode_from(&mut cursor)?.size)
let kv = self.kv?;

if kv.key.value_type.is_indirection() {
let mut cursor = Cursor::new(kv.value);
Ok(BlobIndirection::decode_from(&mut cursor)?.size)
} else {
// NOTE: We know that values are u32 max length
#[allow(clippy::cast_possible_truncation)]
Ok(kv.value.len() as u32)
}
}

fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
resolve_value_handle(self.blob_tree, &self.vlog, self.kv?)
resolve_value_handle(self.blob_tree, &self.version, self.kv?)
}
}

fn resolve_value_handle(
tree: &BlobTree,
vlog: &BTreeMap<BlobFileId, BlobFile>,
item: InternalValue,
) -> RangeItem {
fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue) -> RangeItem {
if item.key.value_type.is_indirection() {
let mut cursor = Cursor::new(item.value);
let vptr = BlobIndirection::decode_from(&mut cursor)?;

// Resolve indirection using value log
match Accessor::new(vlog).get(
match Accessor::new(&version.value_log).get(
tree.id(),
&tree.blobs_folder,
&item.key.user_key,
Expand All @@ -75,10 +73,10 @@ fn resolve_value_handle(
}
Ok(None) => {
panic!(
"value handle ({:?} => {:?}) did not match any blob - this is a bug",
String::from_utf8_lossy(&item.key.user_key),
vptr.vhandle,
)
"value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
item.key.user_key, vptr.vhandle,
version.id(),
);
}
Err(e) => Err(e),
}
Expand Down Expand Up @@ -112,9 +110,6 @@ impl BlobTree {
fsync_directory(&blobs_folder)?;

let blob_file_id_to_continue_with = index
.manifest()
.read()
.expect("lock is poisoned")
.current_version()
.value_log
.values()
Expand Down Expand Up @@ -148,8 +143,8 @@ impl AbstractTree for BlobTree {
self.index.get_internal_entry(key, seqno)
}

fn manifest(&self) -> &Arc<RwLock<LevelManifest>> {
self.index.manifest()
fn current_version(&self) -> Version {
self.index.current_version()
}

fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
Expand Down Expand Up @@ -191,20 +186,15 @@ impl AbstractTree for BlobTree {

let range = prefix_to_range(prefix.as_ref());

let version = self
.manifest()
.read()
.expect("lock is poisoned")
.current_version()
.clone();
let version = self.current_version();

Box::new(
self.index
.create_internal_range(&range, seqno, index)
.map(move |kv| {
IterGuardImpl::Blob(Guard {
blob_tree: self,
vlog: version.value_log.clone(), // TODO: PERF: ugly Arc clone
version: version.clone(), // TODO: PERF: ugly Arc clone
kv,
})
}),
Expand All @@ -217,12 +207,7 @@ impl AbstractTree for BlobTree {
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
let version = self
.manifest()
.read()
.expect("lock is poisoned")
.current_version()
.clone();
let version = self.current_version();

// TODO: PERF: ugly Arc clone
Box::new(
Expand All @@ -231,7 +216,7 @@ impl AbstractTree for BlobTree {
.map(move |kv| {
IterGuardImpl::Blob(Guard {
blob_tree: self,
vlog: version.value_log.clone(), // TODO: PERF: ugly Arc clone
version: version.clone(), // TODO: PERF: ugly Arc clone
kv,
})
}),
Expand Down Expand Up @@ -335,11 +320,7 @@ impl AbstractTree for BlobTree {
}

fn blob_file_count(&self) -> usize {
self.manifest()
.read()
.expect("lock is poisoned")
.current_version()
.blob_file_count()
self.current_version().blob_file_count()
}

// NOTE: We skip reading from the value log
Expand All @@ -363,12 +344,7 @@ impl AbstractTree for BlobTree {
}

fn stale_blob_bytes(&self) -> u64 {
self.manifest()
.read()
.expect("lock is poisoned")
.current_version()
.gc_stats()
.stale_bytes()
self.current_version().gc_stats().stale_bytes()
}

fn filter_size(&self) -> usize {
Expand Down Expand Up @@ -520,10 +496,6 @@ impl AbstractTree for BlobTree {
.register_segments(segments, blob_files, frag_map, seqno_threshold)
}

fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
self.index.lock_active_memtable()
}

fn set_active_memtable(&self, memtable: Memtable) {
self.index.set_active_memtable(memtable);
}
Expand Down Expand Up @@ -595,8 +567,7 @@ impl AbstractTree for BlobTree {
}

fn disk_space(&self) -> u64 {
let lock = self.manifest().read().expect("lock is poisoned");
let version = lock.current_version();
let version = self.current_version();
let vlog = crate::vlog::Accessor::new(&version.value_log);
self.index.disk_space() + vlog.disk_space()
}
Expand Down Expand Up @@ -628,9 +599,9 @@ impl AbstractTree for BlobTree {
return Ok(None);
};

let lock = self.manifest().read().expect("lock is poisoned");
let version = lock.current_version();
let (_, v) = resolve_value_handle(self, &version.value_log, item)?;
let version = self.current_version();
let (_, v) = resolve_value_handle(self, &version, item)?;

Ok(Some(v))
}

Expand Down
13 changes: 6 additions & 7 deletions src/compaction/drop_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy};
use crate::{
config::Config, level_manifest::LevelManifest, slice::Slice, version::run::Ranged, KeyRange,
};
use crate::compaction::state::CompactionState;
use crate::version::Version;
use crate::{config::Config, slice::Slice, version::run::Ranged, KeyRange};
use crate::{HashSet, Segment};
use std::ops::{Bound, RangeBounds};

Expand Down Expand Up @@ -73,9 +73,8 @@ impl CompactionStrategy for Strategy {
"DropRangeCompaction"
}

fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
let segment_ids: HashSet<_> = levels
.current_version()
fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
let segment_ids: HashSet<_> = version
.iter_levels()
.flat_map(|lvl| lvl.iter())
.flat_map(|run| {
Expand All @@ -93,7 +92,7 @@ impl CompactionStrategy for Strategy {
// But just as a fail-safe...
let some_hidden = segment_ids
.iter()
.any(|&id| levels.hidden_set().is_hidden(id));
.any(|&id| state.hidden_set().is_hidden(id));

if some_hidden {
Choice::DoNothing
Expand Down
11 changes: 8 additions & 3 deletions src/compaction/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy};
use crate::{config::Config, level_manifest::LevelManifest, HashSet};
use crate::{compaction::state::CompactionState, config::Config, version::Version, HashSet};

/// FIFO-style compaction
///
Expand Down Expand Up @@ -45,13 +45,18 @@ impl CompactionStrategy for Strategy {
}

// TODO: TTL
fn choose(&self, levels: &LevelManifest, _config: &Config) -> Choice {
fn choose(&self, version: &Version, _config: &Config, state: &CompactionState) -> Choice {
// NOTE: We always have at least one level
#[allow(clippy::expect_used)]
let first_level = levels.as_slice().first().expect("should have first level");
let first_level = version.l0();

assert!(first_level.is_disjoint(), "L0 needs to be disjoint");

assert!(
!version.level_is_busy(0, state.hidden_set()),
"FIFO compaction never compacts",
);

let l0_size = first_level.size();

if l0_size > self.limit {
Expand Down
Loading