diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index a6b818c10960..39aca4cd1320 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -27,11 +27,13 @@ use datafusion::execution::context::SessionConfig; use datafusion::execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, }; +use datafusion::execution::object_store::DefaultObjectStoreRegistry; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::logical_expr::ExplainFormat; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc}; +use datafusion_cli::object_storage::instrumented::InstrumentedObjectStoreRegistry; use datafusion_cli::{ exec, pool_type::PoolType, @@ -206,6 +208,11 @@ async fn main_inner() -> Result<()> { rt_builder = rt_builder.with_disk_manager_builder(builder); } + let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new( + DefaultObjectStoreRegistry::new(), + ))); + rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone()); + let runtime_env = rt_builder.build_arc()?; // enable dynamic file query diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 533ac3ba03d3..e6e6be42c7ad 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +pub mod instrumented; + use async_trait::async_trait; use aws_config::BehaviorVersion; use aws_credential_types::provider::{ diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs new file mode 100644 index 000000000000..c4bd44011dc7 --- /dev/null +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion::execution::object_store::ObjectStoreRegistry; +use object_store::ObjectStore; +use url::Url; + +/// Provides access to wrapped [`ObjectStore`] instances that record requests for reporting +#[derive(Debug)] +pub struct InstrumentedObjectStoreRegistry { + inner: Arc, +} + +impl InstrumentedObjectStoreRegistry { + /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided + /// [`ObjectStoreRegistry`] + pub fn new(registry: Arc) -> Self { + Self { inner: registry } + } +} + +impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { + fn register_store( + &self, + url: &Url, + store: Arc, + ) -> Option> { + self.inner.register_store(url, store) + } + + fn get_store(&self, url: &Url) -> datafusion::common::Result> { + self.inner.get_store(url) + } +} + +#[cfg(test)] +mod tests { + use datafusion::execution::object_store::DefaultObjectStoreRegistry; + + use super::*; + + #[test] + fn instrumented_registry() { + let reg = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new( + DefaultObjectStoreRegistry::new(), + ))); + let store = object_store::memory::InMemory::new(); + + let url = "mem://test".parse().unwrap(); + let registered = reg.register_store(&url, Arc::new(store)); + assert!(registered.is_none()); + + let fetched = reg.get_store(&url); + assert!(fetched.is_ok()) + } +}