Skip to content

Commit 62603da

Browse files
authored
Merge pull request #1181 from drmingdrmer/vtree
LGTM
2 parents a8b1682 + 8a9ae22 commit 62603da

File tree

6 files changed

+959
-0
lines changed

6 files changed

+959
-0
lines changed

fusestore/store/src/meta_service/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pub mod raft_types;
1616
pub mod raftmeta;
1717
pub mod sled_serde;
1818
pub mod sled_tree;
19+
pub mod sled_vartype_tree;
20+
pub mod sledkv;
1921
pub mod snapshot;
2022
pub mod state_machine;
2123

@@ -33,9 +35,11 @@ pub use raft_types::NodeId;
3335
pub use raft_types::Term;
3436
pub use raftmeta::MetaNode;
3537
pub use raftmeta::MetaStore;
38+
pub use sled_serde::SledOrderedSerde;
3639
pub use sled_serde::SledSerde;
3740
pub use sled_tree::SledTree;
3841
pub use sled_tree::SledValueToKey;
42+
pub use sled_vartype_tree::SledVarTypeTree;
3943
pub use snapshot::Snapshot;
4044
pub use state_machine::Node;
4145
pub use state_machine::Slot;
@@ -67,4 +71,6 @@ mod sled_serde_test;
6771
#[cfg(test)]
6872
mod sled_tree_test;
6973
#[cfg(test)]
74+
mod sled_vartype_tree_test;
75+
#[cfg(test)]
7076
mod state_machine_test;

fusestore/store/src/meta_service/raft_types.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22
//
33
// SPDX-License-Identifier: Apache-2.0.
44

5+
use async_raft::LogId;
56
pub use async_raft::NodeId;
67
use byteorder::BigEndian;
78
use byteorder::ByteOrder;
9+
use common_exception::ErrorCode;
10+
use sled::IVec;
811

912
use crate::meta_service::sled_serde::SledOrderedSerde;
13+
use crate::meta_service::SledSerde;
1014

1115
pub type LogIndex = u64;
1216
pub type Term = u64;
@@ -21,3 +25,26 @@ impl SledOrderedSerde for u64 {
2125
BigEndian::read_u64(buf)
2226
}
2327
}
28+
29+
/// For LogId to be able to stored in sled::Tree as a key.
30+
impl SledOrderedSerde for String {
31+
fn ser(&self) -> Result<IVec, ErrorCode> {
32+
Ok(IVec::from(self.as_str()))
33+
}
34+
35+
fn de<V: AsRef<[u8]>>(v: V) -> Result<Self, ErrorCode>
36+
where Self: Sized {
37+
Ok(String::from_utf8(v.as_ref().to_vec())?)
38+
}
39+
40+
fn order_preserved_serialize(&self, _buf: &mut [u8]) {
41+
todo!()
42+
}
43+
44+
fn order_preserved_deserialize(_buf: &[u8]) -> Self {
45+
todo!()
46+
}
47+
}
48+
49+
/// For LogId to be able to stored in sled::Tree as a value.
50+
impl SledSerde for LogId {}
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
// Copyright 2020-2021 The Datafuse Authors.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0.
4+
5+
use std::fmt::Display;
6+
use std::ops::Bound;
7+
use std::ops::RangeBounds;
8+
9+
use common_exception::ErrorCode;
10+
use common_exception::ToErrorCode;
11+
12+
use crate::meta_service::sledkv::SledKV;
13+
use crate::meta_service::SledValueToKey;
14+
15+
/// SledVarTypeTree is a wrapper of sled::Tree that provides access of more than one key-value
16+
/// types.
17+
/// A `SledKVType` defines a key-value type to be stored.
18+
/// The key type `K` must be serializable with order preserved, i.e. impl trait `SledOrderedSerde`.
19+
/// The value type `V` can be any serialize impl, i.e. for most cases, to impl trait `SledSerde`.
20+
pub struct SledVarTypeTree {
21+
pub name: String,
22+
pub(crate) tree: sled::Tree,
23+
}
24+
25+
impl SledVarTypeTree {
26+
/// Open SledVarTypeTree
27+
pub async fn open<N: AsRef<[u8]> + Display>(
28+
db: &sled::Db,
29+
tree_name: N,
30+
) -> common_exception::Result<Self> {
31+
let t = db
32+
.open_tree(&tree_name)
33+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
34+
format!("open tree: {}", tree_name)
35+
})?;
36+
37+
let rl = SledVarTypeTree {
38+
name: format!("{}", tree_name),
39+
tree: t,
40+
};
41+
Ok(rl)
42+
}
43+
44+
/// Return true if the tree contains the key.
45+
pub fn contains_key<KV: SledKV>(&self, key: &KV::K) -> common_exception::Result<bool>
46+
where KV: SledKV {
47+
let got = self
48+
.tree
49+
.contains_key(KV::serialize_key(key)?)
50+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
51+
format!("contains_key: {}:{}", self.name, key)
52+
})?;
53+
54+
Ok(got)
55+
}
56+
57+
/// Retrieve the value of key.
58+
pub fn get<KV: SledKV>(&self, key: &KV::K) -> common_exception::Result<Option<KV::V>>
59+
where KV: SledKV {
60+
let got = self
61+
.tree
62+
.get(KV::serialize_key(key)?)
63+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
64+
format!("get: {}:{}", self.name, key)
65+
})?;
66+
67+
let v = match got {
68+
None => None,
69+
Some(v) => Some(KV::deserialize_value(v)?),
70+
};
71+
72+
Ok(v)
73+
}
74+
75+
/// Retrieve the last key value pair.
76+
pub fn last<KV>(&self) -> common_exception::Result<Option<(KV::K, KV::V)>>
77+
where KV: SledKV {
78+
// TODO(xp): last should be limited to the value range
79+
80+
let range = (
81+
Bound::Unbounded,
82+
KV::serialize_bound(Bound::Unbounded, "right")?,
83+
);
84+
85+
let mut it = self.tree.range(range).rev();
86+
let last = it.next();
87+
let last = match last {
88+
None => {
89+
return Ok(None);
90+
}
91+
Some(res) => res,
92+
};
93+
94+
let last = last.map_err_to_code(ErrorCode::MetaStoreDamaged, || "last")?;
95+
96+
let (k, v) = last;
97+
let key = KV::deserialize_key(k)?;
98+
let value = KV::deserialize_value(v)?;
99+
Ok(Some((key, value)))
100+
}
101+
102+
/// Delete kvs that are in `range`.
103+
pub async fn range_delete<KV, R>(&self, range: R, flush: bool) -> common_exception::Result<()>
104+
where
105+
KV: SledKV,
106+
R: RangeBounds<KV::K>,
107+
{
108+
let mut batch = sled::Batch::default();
109+
110+
// Convert K range into sled::IVec range
111+
let sled_range = KV::serialize_range(&range)?;
112+
113+
let range_mes = self.range_message::<KV, _>(&range);
114+
115+
for item in self.tree.range(sled_range) {
116+
let (k, _) = item.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
117+
format!("range_delete: {}", range_mes,)
118+
})?;
119+
batch.remove(k);
120+
}
121+
122+
self.tree
123+
.apply_batch(batch)
124+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
125+
format!("batch delete: {}", range_mes,)
126+
})?;
127+
128+
if flush {
129+
self.tree
130+
.flush_async()
131+
.await
132+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
133+
format!("flush range delete: {}", range_mes,)
134+
})?;
135+
}
136+
137+
Ok(())
138+
}
139+
140+
/// Get keys in `range`
141+
pub fn range_keys<KV, R>(&self, range: R) -> common_exception::Result<Vec<KV::K>>
142+
where
143+
KV: SledKV,
144+
R: RangeBounds<KV::K>,
145+
{
146+
let mut res = vec![];
147+
148+
let range_mes = self.range_message::<KV, _>(&range);
149+
150+
// Convert K range into sled::IVec range
151+
let range = KV::serialize_range(&range)?;
152+
for item in self.tree.range(range) {
153+
let (k, _) = item.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
154+
format!("range_get: {}", range_mes,)
155+
})?;
156+
157+
let key = KV::deserialize_key(k)?;
158+
res.push(key);
159+
}
160+
161+
Ok(res)
162+
}
163+
164+
/// Get values of key in `range`
165+
pub fn range_get<KV, R>(&self, range: R) -> common_exception::Result<Vec<KV::V>>
166+
where
167+
KV: SledKV,
168+
R: RangeBounds<KV::K>,
169+
{
170+
let mut res = vec![];
171+
172+
let range_mes = self.range_message::<KV, _>(&range);
173+
174+
// Convert K range into sled::IVec range
175+
let range = KV::serialize_range(&range)?;
176+
177+
for item in self.tree.range(range) {
178+
let (_, v) = item.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
179+
format!("range_get: {}", range_mes,)
180+
})?;
181+
182+
let ent = KV::deserialize_value(v)?;
183+
res.push(ent);
184+
}
185+
186+
Ok(res)
187+
}
188+
189+
/// Append many key-values into SledVarTypeTree.
190+
pub async fn append<KV>(&self, kvs: &[(KV::K, KV::V)]) -> common_exception::Result<()>
191+
where KV: SledKV {
192+
let mut batch = sled::Batch::default();
193+
194+
for (key, value) in kvs.iter() {
195+
let k = KV::serialize_key(key)?;
196+
let v = KV::serialize_value(value)?;
197+
198+
batch.insert(k, v);
199+
}
200+
201+
self.tree
202+
.apply_batch(batch)
203+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || "batch append")?;
204+
205+
self.tree
206+
.flush_async()
207+
.await
208+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || "flush append")?;
209+
210+
Ok(())
211+
}
212+
213+
/// Append many values into SledVarTypeTree.
214+
/// This could be used in cases the key is included in value and a value should impl trait `IntoKey` to retrieve the key from a value.
215+
pub async fn append_values<KV>(&self, values: &[KV::V]) -> common_exception::Result<()>
216+
where
217+
KV: SledKV,
218+
KV::V: SledValueToKey<KV::K>,
219+
{
220+
let mut batch = sled::Batch::default();
221+
222+
for value in values.iter() {
223+
let key: KV::K = value.to_key();
224+
225+
let k = KV::serialize_key(&key)?;
226+
let v = KV::serialize_value(value)?;
227+
228+
batch.insert(k, v);
229+
}
230+
231+
self.tree
232+
.apply_batch(batch)
233+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || "batch append_values")?;
234+
235+
self.tree
236+
.flush_async()
237+
.await
238+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || "flush append_values")?;
239+
240+
Ok(())
241+
}
242+
243+
/// Insert a single kv.
244+
/// Returns the last value if it is set.
245+
pub async fn insert<KV>(
246+
&self,
247+
key: &KV::K,
248+
value: &KV::V,
249+
) -> common_exception::Result<Option<KV::V>>
250+
where
251+
KV: SledKV,
252+
{
253+
let k = KV::serialize_key(key)?;
254+
let v = KV::serialize_value(value)?;
255+
256+
let prev = self
257+
.tree
258+
.insert(k, v)
259+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
260+
format!("insert_value {}", key)
261+
})?;
262+
263+
let prev = match prev {
264+
None => None,
265+
Some(x) => Some(KV::deserialize_value(x)?),
266+
};
267+
268+
self.tree
269+
.flush_async()
270+
.await
271+
.map_err_to_code(ErrorCode::MetaStoreDamaged, || {
272+
format!("flush insert_value {}", key)
273+
})?;
274+
275+
Ok(prev)
276+
}
277+
278+
/// Insert a single kv, Retrieve the key from value.
279+
pub async fn insert_value<KV>(&self, value: &KV::V) -> common_exception::Result<Option<KV::V>>
280+
where
281+
KV: SledKV,
282+
KV::V: SledValueToKey<KV::K>,
283+
{
284+
let key = value.to_key();
285+
self.insert::<KV>(&key, value).await
286+
}
287+
288+
/// Build a string describing the range for a range operation.
289+
fn range_message<KV, R>(&self, range: &R) -> String
290+
where
291+
KV: SledKV,
292+
R: RangeBounds<KV::K>,
293+
{
294+
format!(
295+
"{}:{}/[{:?}, {:?}]",
296+
self.name,
297+
KV::NAME,
298+
range.start_bound(),
299+
range.end_bound()
300+
)
301+
}
302+
}

0 commit comments

Comments
 (0)