Skip to content

Commit 73b1f2b

Browse files
authored
Adds Instrument Mode for InstrumentedObjectStore in datafusion-cli (#18000)
- Adds a mode type that allows changing the mode of an InstrumentedObjectStore in datafusion-cli
- Implements string parsing and u8 conversion for InstrumentedObjectStoreMode
- Adds tests to validate the trait implementations
1 parent a497074 commit 73b1f2b

File tree

2 files changed

+104
-14
lines changed

2 files changed

+104
-14
lines changed

datafusion-cli/src/main.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use datafusion::logical_expr::ExplainFormat;
3333
use datafusion::prelude::SessionContext;
3434
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
3535
use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
36-
use datafusion_cli::object_storage::instrumented::InstrumentedObjectStoreRegistry;
36+
use datafusion_cli::object_storage::instrumented::{
37+
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
38+
};
3739
use datafusion_cli::{
3840
exec,
3941
pool_type::PoolType,
@@ -208,9 +210,10 @@ async fn main_inner() -> Result<()> {
208210
rt_builder = rt_builder.with_disk_manager_builder(builder);
209211
}
210212

211-
let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
212-
DefaultObjectStoreRegistry::new(),
213-
)));
213+
let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(
214+
Arc::new(DefaultObjectStoreRegistry::new()),
215+
InstrumentedObjectStoreMode::default(),
216+
));
214217
rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone());
215218

216219
let runtime_env = rt_builder.build_arc()?;

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 97 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,36 +15,90 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{fmt, sync::Arc};
18+
use std::{
19+
fmt,
20+
str::FromStr,
21+
sync::{
22+
atomic::{AtomicU8, Ordering},
23+
Arc,
24+
},
25+
};
1926

2027
use async_trait::async_trait;
21-
use datafusion::execution::object_store::ObjectStoreRegistry;
28+
use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry};
2229
use futures::stream::BoxStream;
2330
use object_store::{
2431
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
2532
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
2633
};
2734
use url::Url;
2835

36+
/// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to collect
37+
/// profiling data. Collecting profiling data will have a small negative impact on both CPU and
38+
/// memory usage. The default is `Disabled`.
39+
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
40+
pub enum InstrumentedObjectStoreMode {
41+
/// Disable collection of profiling data
42+
#[default]
43+
Disabled,
44+
/// Enable collection of profiling data
45+
Enabled,
46+
}
47+
48+
// Displays the mode using its `Debug` representation, so the rendered text is whatever the
// derived `Debug` implementation on the enum produces for the variant.
impl fmt::Display for InstrumentedObjectStoreMode {
49+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50+
write!(f, "{:?}", self)
51+
}
52+
}
53+
54+
// Parses a mode from its textual name. The input is lowercased before matching, so the
// comparison is case-insensitive ("Enabled", "ENABLED" and "enabled" are all accepted).
impl FromStr for InstrumentedObjectStoreMode {
55+
type Err = DataFusionError;
56+
57+
// Returns `Disabled` for "disabled" and `Enabled` for "enabled"; any other input yields a
// `DataFusionError::Execution` whose message echoes the original (un-lowercased) string.
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
58+
match s.to_lowercase().as_str() {
59+
"disabled" => Ok(Self::Disabled),
60+
"enabled" => Ok(Self::Enabled),
61+
_ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
62+
}
63+
}
64+
}
65+
66+
// Infallible conversion from a raw `u8` to a mode. Only `1` maps to `Enabled`; every other
// value, including values that correspond to no variant, maps to `Disabled`.
impl From<u8> for InstrumentedObjectStoreMode {
67+
fn from(value: u8) -> Self {
68+
match value {
69+
1 => InstrumentedObjectStoreMode::Enabled,
70+
// Catch-all: unknown values fall back to `Disabled` rather than producing an error.
_ => InstrumentedObjectStoreMode::Disabled,
71+
}
72+
}
73+
}
74+
2975
/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
3076
/// inner [`ObjectStore`]
3177
#[derive(Debug)]
3278
struct InstrumentedObjectStore {
3379
/// The wrapped [`ObjectStore`]
inner: Arc<dyn ObjectStore>,
80+
/// The [`InstrumentedObjectStoreMode`] of this store, held as a `u8` in an atomic; it is
/// converted back to the enum with `From<u8>` when read
instrument_mode: AtomicU8,
3481
}
3582

3683
impl InstrumentedObjectStore {
3784
/// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`]
38-
fn new(object_store: Arc<dyn ObjectStore>) -> Self {
85+
fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self {
3986
Self {
4087
inner: object_store,
88+
instrument_mode,
4189
}
4290
}
4391
}
4492

4593
impl fmt::Display for InstrumentedObjectStore {
4694
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47-
write!(f, "Instrumented Object Store: {}", self.inner)
95+
let mode: InstrumentedObjectStoreMode =
96+
self.instrument_mode.load(Ordering::Relaxed).into();
97+
write!(
98+
f,
99+
"Instrumented Object Store: instrument_mode: {mode}, inner: {}",
100+
self.inner
101+
)
48102
}
49103
}
50104

@@ -100,13 +154,20 @@ impl ObjectStore for InstrumentedObjectStore {
100154
#[derive(Debug)]
101155
pub struct InstrumentedObjectStoreRegistry {
102156
inner: Arc<dyn ObjectStoreRegistry>,
157+
instrument_mode: InstrumentedObjectStoreMode,
103158
}
104159

105160
impl InstrumentedObjectStoreRegistry {
106161
/// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
107162
/// [`ObjectStoreRegistry`]
108-
pub fn new(registry: Arc<dyn ObjectStoreRegistry>) -> Self {
109-
Self { inner: registry }
163+
pub fn new(
164+
registry: Arc<dyn ObjectStoreRegistry>,
165+
default_mode: InstrumentedObjectStoreMode,
166+
) -> Self {
167+
Self {
168+
inner: registry,
169+
instrument_mode: default_mode,
170+
}
110171
}
111172
}
112173

@@ -116,7 +177,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
116177
url: &Url,
117178
store: Arc<dyn ObjectStore>,
118179
) -> Option<Arc<dyn ObjectStore>> {
119-
let instrumented = Arc::new(InstrumentedObjectStore::new(store));
180+
let mode = AtomicU8::new(self.instrument_mode as u8);
181+
let instrumented = Arc::new(InstrumentedObjectStore::new(store, mode));
120182
self.inner.register_store(url, instrumented)
121183
}
122184

@@ -131,11 +193,36 @@ mod tests {
131193

132194
use super::*;
133195

196+
// Exercises the `Default`, `FromStr` and `From<u8>` implementations of
// `InstrumentedObjectStoreMode`.
#[test]
197+
fn instrumented_mode() {
198+
// The default mode is `Disabled`.
assert!(matches!(
199+
InstrumentedObjectStoreMode::default(),
200+
InstrumentedObjectStoreMode::Disabled
201+
));
202+
203+
// Parsing ignores letter case; unrecognized names are rejected with an error.
assert!(matches!(
204+
"dIsABleD".parse().unwrap(),
205+
InstrumentedObjectStoreMode::Disabled
206+
));
207+
assert!(matches!(
208+
"EnABlEd".parse().unwrap(),
209+
InstrumentedObjectStoreMode::Enabled
210+
));
211+
assert!("does_not_exist"
212+
.parse::<InstrumentedObjectStoreMode>()
213+
.is_err());
214+
215+
// `u8` conversion: 1 is `Enabled`; 0 and out-of-range values such as 2 are `Disabled`.
assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled));
216+
assert!(matches!(1.into(), InstrumentedObjectStoreMode::Enabled));
217+
assert!(matches!(2.into(), InstrumentedObjectStoreMode::Disabled));
218+
}
219+
134220
#[test]
135221
fn instrumented_registry() {
136-
let reg = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
137-
DefaultObjectStoreRegistry::new(),
138-
)));
222+
let reg = Arc::new(InstrumentedObjectStoreRegistry::new(
223+
Arc::new(DefaultObjectStoreRegistry::new()),
224+
InstrumentedObjectStoreMode::default(),
225+
));
139226
let store = object_store::memory::InMemory::new();
140227

141228
let url = "mem://test".parse().unwrap();

0 commit comments

Comments
 (0)