Skip to content

Commit e17a516

Browse files
committed
Adds Instrumented Object Store to CLI
- Adds the `InstrumentedObjectStore` to datafusion-cli to support building instrumented output for queries executed via the CLI
- Integrates the new `InstrumentedObjectStore` into the `InstrumentedObjectStoreRegistry` so that every `ObjectStore` instance registered gets instrumentation
1 parent f8ff82a commit e17a516

File tree

1 file changed

+80
-4
lines changed

1 file changed

+80
-4
lines changed

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,88 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::sync::Arc;
18+
use std::{fmt, sync::Arc};
1919

20+
use async_trait::async_trait;
2021
use datafusion::execution::object_store::ObjectStoreRegistry;
21-
use object_store::ObjectStore;
22+
use futures::stream::BoxStream;
23+
use object_store::{
24+
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
25+
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
26+
};
2227
use url::Url;
2328

24-
/// Provides access to wrapped [`ObjectStore`] instances that record requests for reporting
29+
/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
30+
/// inner [`ObjectStore`]
31+
#[derive(Debug)]
32+
struct InstrumentedObjectStore {
33+
inner: Arc<dyn ObjectStore>,
34+
}
35+
36+
impl InstrumentedObjectStore {
37+
/// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`]
38+
fn new(object_store: Arc<dyn ObjectStore>) -> Self {
39+
Self {
40+
inner: object_store,
41+
}
42+
}
43+
}
44+
45+
impl fmt::Display for InstrumentedObjectStore {
46+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47+
write!(f, "Instrumented Object Store: {}", self.inner)
48+
}
49+
}
50+
51+
#[async_trait]
52+
impl ObjectStore for InstrumentedObjectStore {
53+
async fn put_opts(
54+
&self,
55+
location: &Path,
56+
payload: PutPayload,
57+
opts: PutOptions,
58+
) -> Result<PutResult> {
59+
self.inner.put_opts(location, payload, opts).await
60+
}
61+
62+
async fn put_multipart_opts(
63+
&self,
64+
location: &Path,
65+
opts: PutMultipartOptions,
66+
) -> Result<Box<dyn MultipartUpload>> {
67+
self.inner.put_multipart_opts(location, opts).await
68+
}
69+
70+
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
71+
self.inner.get_opts(location, options).await
72+
}
73+
74+
async fn delete(&self, location: &Path) -> Result<()> {
75+
self.inner.delete(location).await
76+
}
77+
78+
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
79+
self.inner.list(prefix)
80+
}
81+
82+
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
83+
self.inner.list_with_delimiter(prefix).await
84+
}
85+
86+
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
87+
self.inner.copy(from, to).await
88+
}
89+
90+
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
91+
self.inner.copy_if_not_exists(from, to).await
92+
}
93+
94+
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
95+
self.inner.head(location).await
96+
}
97+
}
98+
99+
/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
25100
#[derive(Debug)]
26101
pub struct InstrumentedObjectStoreRegistry {
27102
inner: Arc<dyn ObjectStoreRegistry>,
@@ -41,7 +116,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
41116
url: &Url,
42117
store: Arc<dyn ObjectStore>,
43118
) -> Option<Arc<dyn ObjectStore>> {
44-
self.inner.register_store(url, store)
119+
let instrumented = Arc::new(InstrumentedObjectStore::new(store));
120+
self.inner.register_store(url, instrumented)
45121
}
46122

47123
fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {

0 commit comments

Comments
 (0)