diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index 94445ee64ef4..4486b84a90ea 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -26,6 +26,8 @@ use std::{ time::Duration, }; +use arrow::array::{ArrayRef, RecordBatch, StringArray}; +use arrow::util::pretty::pretty_format_batches; use async_trait::async_trait; use chrono::Utc; use datafusion::{ @@ -260,7 +262,7 @@ impl ObjectStore for InstrumentedObjectStore { } /// Object store operation types tracked by [`InstrumentedObjectStore`] -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum Operation { _Copy, _Delete, @@ -270,6 +272,12 @@ pub enum Operation { _Put, } +impl fmt::Display for Operation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + /// Holds profiling details about individual requests made through an [`InstrumentedObjectStore`] #[derive(Debug)] pub struct RequestDetails { @@ -309,35 +317,172 @@ impl fmt::Display for RequestDetails { } } -/// Summary statistics for an [`InstrumentedObjectStore`]'s [`RequestDetails`] +/// Summary statistics for all requests recorded in an [`InstrumentedObjectStore`] #[derive(Default)] -pub struct RequestSummary { - count: usize, - duration_stats: Option>, - size_stats: Option>, +pub struct RequestSummaries { + summaries: Vec, } -impl RequestSummary { - /// Generates a set of [RequestSummaries](RequestSummary) from the input [`RequestDetails`] - /// grouped by the input's [`Operation`] - pub fn summarize_by_operation( - requests: &[RequestDetails], - ) -> HashMap { - let mut summaries: HashMap = HashMap::new(); +/// Display the summary as a table +impl fmt::Display for RequestSummaries { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Don't expect an error, but avoid panicking if it happens + match pretty_format_batches(&[self.to_batch()]) { + Err(e) => { + write!(f, "Error formatting summary: {e}") + } + Ok(displayable) => { + write!(f, "{displayable}") + } + } + } +} + +impl RequestSummaries { + /// Summarizes input [`RequestDetails`] + pub fn new(requests: &[RequestDetails]) -> Self { + let mut summaries: HashMap = HashMap::new(); for rd in requests { match summaries.get_mut(&rd.op) { Some(rs) => rs.push(rd), None => { - let mut rs = RequestSummary::default(); + let mut rs = RequestSummary::new(rd.op); rs.push(rd); summaries.insert(rd.op, rs); } } } - - summaries + // Convert to a Vec with consistent ordering + let mut summaries: Vec = summaries.into_values().collect(); + summaries.sort_by_key(|s| s.operation); + Self { summaries } + } + + /// Convert the summaries into a `RecordBatch` for display + /// + /// Results in a table like: + /// ```text + /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+ + /// | Operation | Metric | min | max | avg | sum | count | + /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+ + /// | Get | duration | 5.000000s | 5.000000s | 5.000000s | | 1 | + /// | Get | size | 100 B | 100 B | 100 B | 100 B | 1 | + /// +-----------+----------+-----------+-----------+-----------+-----------+-----------+ + /// ``` + pub fn to_batch(&self) -> RecordBatch { + let operations: StringArray = self + .iter() + .flat_map(|s| std::iter::repeat_n(Some(s.operation.to_string()), 2)) + .collect(); + let metrics: StringArray = self + .iter() + .flat_map(|_s| [Some("duration"), Some("size")]) + .collect(); + let mins: StringArray = self + .stats_iter() + .flat_map(|(duration_stats, size_stats)| { + let dur_min = + duration_stats.map(|d| format!("{:.6}s", d.min.as_secs_f32())); + let size_min = size_stats.map(|s| format!("{} B", s.min)); + [dur_min, size_min] + }) + .collect(); + let maxs: StringArray = self + .stats_iter() + .flat_map(|(duration_stats, size_stats)| { + let dur_max = + duration_stats.map(|d| format!("{:.6}s", d.max.as_secs_f32())); + let size_max = size_stats.map(|s| format!("{} B", s.max)); + [dur_max, size_max] + }) + .collect(); + let avgs: StringArray = self + .iter() + .flat_map(|s| { + let count = s.count as f32; + let duration_stats = s.duration_stats.as_ref(); + let size_stats = s.size_stats.as_ref(); + let dur_avg = duration_stats.map(|d| { + let avg = d.sum.as_secs_f32() / count; + format!("{:.6}s", avg) + }); + let size_avg = size_stats.map(|s| { + let avg = s.sum as f32 / count; + format!("{} B", avg) + }); + [dur_avg, size_avg] + }) + .collect(); + let sums: StringArray = self + .stats_iter() + .flat_map(|(duration_stats, size_stats)| { + // Omit a sum stat for duration in the initial + // implementation because it can be a bit misleading (at least + // at first glance). For example, particularly large queries the + // sum of the durations was often larger than the total time of + // the query itself, can be confusing without additional + // explanation (e.g. that the sum is of individual requests, + // which may be concurrent). + let dur_sum = + duration_stats.map(|d| format!("{:.6}s", d.sum.as_secs_f32())); + let size_sum = size_stats.map(|s| format!("{} B", s.sum)); + [dur_sum, size_sum] + }) + .collect(); + let counts: StringArray = self + .iter() + .flat_map(|s| { + let count = s.count.to_string(); + [Some(count.clone()), Some(count)] + }) + .collect(); + + RecordBatch::try_from_iter(vec![ + ("Operation", Arc::new(operations) as ArrayRef), + ("Metric", Arc::new(metrics) as ArrayRef), + ("min", Arc::new(mins) as ArrayRef), + ("max", Arc::new(maxs) as ArrayRef), + ("avg", Arc::new(avgs) as ArrayRef), + ("sum", Arc::new(sums) as ArrayRef), + ("count", Arc::new(counts) as ArrayRef), + ]) + .expect("Created the batch correctly") + } + + /// Return an iterator over the summaries + fn iter(&self) -> impl Iterator { + self.summaries.iter() + } + + /// Return an iterator over (duration_stats, size_stats) tuples + /// for each summary + fn stats_iter( + &self, + ) -> impl Iterator>, Option<&Stats>)> { + self.summaries + .iter() + .map(|s| (s.duration_stats.as_ref(), s.size_stats.as_ref())) } +} + +/// Summary statistics for a particular type of [`Operation`] (e.g. `GET` or `PUT`) +/// in an [`InstrumentedObjectStore`]'s [`RequestDetails`] +pub struct RequestSummary { + operation: Operation, + count: usize, + duration_stats: Option>, + size_stats: Option>, +} +impl RequestSummary { + fn new(operation: Operation) -> Self { + Self { + operation, + count: 0, + duration_stats: None, + size_stats: None, + } + } fn push(&mut self, request: &RequestDetails) { self.count += 1; if let Some(dur) = request.duration { @@ -349,29 +494,6 @@ impl RequestSummary { } } -impl fmt::Display for RequestSummary { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "count: {}", self.count)?; - - if let Some(dur_stats) = &self.duration_stats { - writeln!(f, "duration min: {:.6}s", dur_stats.min.as_secs_f32())?; - writeln!(f, "duration max: {:.6}s", dur_stats.max.as_secs_f32())?; - let avg = dur_stats.sum.as_secs_f32() / (self.count as f32); - writeln!(f, "duration avg: {:.6}s", avg)?; - } - - if let Some(size_stats) = &self.size_stats { - writeln!(f, "size min: {} B", size_stats.min)?; - writeln!(f, "size max: {} B", size_stats.max)?; - let avg = size_stats.sum / self.count; - writeln!(f, "size avg: {} B", avg)?; - writeln!(f, "size sum: {} B", size_stats.sum)?; - } - - Ok(()) - } -} - struct Stats> { min: T, max: T, @@ -478,6 +600,7 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { #[cfg(test)] mod tests { use super::*; + use insta::assert_snapshot; #[test] fn instrumented_mode() { @@ -640,8 +763,12 @@ mod tests { fn request_summary() { // Test empty request list let mut requests = Vec::new(); - let summaries = RequestSummary::summarize_by_operation(&requests); - assert!(summaries.is_empty()); + assert_snapshot!(RequestSummaries::new(&requests), @r" + +-----------+--------+-----+-----+-----+-----+-------+ + | Operation | Metric | min | max | avg | sum | count | + +-----------+--------+-----+-----+-----+-----+-------+ + +-----------+--------+-----+-----+-----+-----+-------+ + "); requests.push(RequestDetails { op: Operation::Get, @@ -653,26 +780,14 @@ mod tests { extra_display: None, }); - let summaries = RequestSummary::summarize_by_operation(&requests); - assert_eq!(summaries.len(), 1); - - let summary = summaries.get(&Operation::Get).unwrap(); - assert_eq!(summary.count, 1); - assert_eq!( - summary.duration_stats.as_ref().unwrap().min, - Duration::from_secs(5) - ); - assert_eq!( - summary.duration_stats.as_ref().unwrap().max, - Duration::from_secs(5) - ); - assert_eq!( - summary.duration_stats.as_ref().unwrap().sum, - Duration::from_secs(5) - ); - assert_eq!(summary.size_stats.as_ref().unwrap().min, 100); - assert_eq!(summary.size_stats.as_ref().unwrap().max, 100); - assert_eq!(summary.size_stats.as_ref().unwrap().sum, 100); + assert_snapshot!(RequestSummaries::new(&requests), @r" + +-----------+----------+-----------+-----------+-----------+-----------+-------+ + | Operation | Metric | min | max | avg | sum | count | + +-----------+----------+-----------+-----------+-----------+-----------+-------+ + | Get | duration | 5.000000s | 5.000000s | 5.000000s | 5.000000s | 1 | + | Get | size | 100 B | 100 B | 100 B | 100 B | 1 | + +-----------+----------+-----------+-----------+-----------+-----------+-------+ + "); // Add more Get requests to test aggregation requests.push(RequestDetails { @@ -693,27 +808,14 @@ mod tests { range: None, extra_display: None, }); - - let summaries = RequestSummary::summarize_by_operation(&requests); - assert_eq!(summaries.len(), 1); - - let summary = summaries.get(&Operation::Get).unwrap(); - assert_eq!(summary.count, 3); - assert_eq!( - summary.duration_stats.as_ref().unwrap().min, - Duration::from_secs(2) - ); - assert_eq!( - summary.duration_stats.as_ref().unwrap().max, - Duration::from_secs(8) - ); - assert_eq!( - summary.duration_stats.as_ref().unwrap().sum, - Duration::from_secs(15) - ); - assert_eq!(summary.size_stats.as_ref().unwrap().min, 50); - assert_eq!(summary.size_stats.as_ref().unwrap().max, 150); - assert_eq!(summary.size_stats.as_ref().unwrap().sum, 300); + assert_snapshot!(RequestSummaries::new(&requests), @r" + +-----------+----------+-----------+-----------+-----------+------------+-------+ + | Operation | Metric | min | max | avg | sum | count | + +-----------+----------+-----------+-----------+-----------+------------+-------+ + | Get | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3 | + | Get | size | 50 B | 150 B | 100 B | 300 B | 3 | + +-----------+----------+-----------+-----------+-----------+------------+-------+ + "); // Add Put requests to test grouping requests.push(RequestDetails { @@ -726,20 +828,20 @@ mod tests { extra_display: None, }); - let summaries = RequestSummary::summarize_by_operation(&requests); - assert_eq!(summaries.len(), 2); - - let get_summary = summaries.get(&Operation::Get).unwrap(); - assert_eq!(get_summary.count, 3); - - let put_summary = summaries.get(&Operation::_Put).unwrap(); - assert_eq!(put_summary.count, 1); - assert_eq!( - put_summary.duration_stats.as_ref().unwrap().min, - Duration::from_millis(200) - ); - assert_eq!(put_summary.size_stats.as_ref().unwrap().sum, 75); + assert_snapshot!(RequestSummaries::new(&requests), @r" + +-----------+----------+-----------+-----------+-----------+------------+-------+ + | Operation | Metric | min | max | avg | sum | count | + +-----------+----------+-----------+-----------+-----------+------------+-------+ + | Get | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3 | + | Get | size | 50 B | 150 B | 100 B | 300 B | 3 | + | _Put | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s | 1 | + | _Put | size | 75 B | 75 B | 75 B | 75 B | 1 | + +-----------+----------+-----------+-----------+-----------+------------+-------+ + "); + } + #[test] + fn request_summary_only_duration() { // Test request with only duration (no size) let only_duration = vec![RequestDetails { op: Operation::Get, @@ -750,12 +852,18 @@ mod tests { range: None, extra_display: None, }]; - let summaries = RequestSummary::summarize_by_operation(&only_duration); - let summary = summaries.get(&Operation::Get).unwrap(); - assert_eq!(summary.count, 1); - assert!(summary.duration_stats.is_some()); - assert!(summary.size_stats.is_none()); + assert_snapshot!(RequestSummaries::new(&only_duration), @r" + +-----------+----------+-----------+-----------+-----------+-----------+-------+ + | Operation | Metric | min | max | avg | sum | count | + +-----------+----------+-----------+-----------+-----------+-----------+-------+ + | Get | duration | 3.000000s | 3.000000s | 3.000000s | 3.000000s | 1 | + | Get | size | | | | | 1 | + +-----------+----------+-----------+-----------+-----------+-----------+-------+ + "); + } + #[test] + fn request_summary_only_size() { // Test request with only size (no duration) let only_size = vec![RequestDetails { op: Operation::Get, @@ -766,13 +874,18 @@ mod tests { range: None, extra_display: None, }]; - let summaries = RequestSummary::summarize_by_operation(&only_size); - let summary = summaries.get(&Operation::Get).unwrap(); - assert_eq!(summary.count, 1); - assert!(summary.duration_stats.is_none()); - assert!(summary.size_stats.is_some()); - assert_eq!(summary.size_stats.as_ref().unwrap().sum, 200); + assert_snapshot!(RequestSummaries::new(&only_size), @r" + +-----------+----------+-------+-------+-------+-------+-------+ + | Operation | Metric | min | max | avg | sum | count | + +-----------+----------+-------+-------+-------+-------+-------+ + | Get | duration | | | | | 1 | + | Get | size | 200 B | 200 B | 200 B | 200 B | 1 | + +-----------+----------+-------+-------+-------+-------+-------+ + "); + } + #[test] + fn request_summary_neither_duration_or_size() { // Test request with neither duration nor size let no_stats = vec![RequestDetails { op: Operation::Get, @@ -783,10 +896,13 @@ mod tests { range: None, extra_display: None, }]; - let summaries = RequestSummary::summarize_by_operation(&no_stats); - let summary = summaries.get(&Operation::Get).unwrap(); - assert_eq!(summary.count, 1); - assert!(summary.duration_stats.is_none()); - assert!(summary.size_stats.is_none()); + assert_snapshot!(RequestSummaries::new(&no_stats), @r" + +-----------+----------+-----+-----+-----+-----+-------+ + | Operation | Metric | min | max | avg | sum | count | + +-----------+----------+-----+-----+-----+-----+-------+ + | Get | duration | | | | | 1 | + | Get | size | | | | | 1 | + +-----------+----------+-----+-----+-----+-----+-------+ + "); } } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 01be736ca54d..5804617f39a7 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -22,7 +22,7 @@ use std::str::FromStr; use std::sync::Arc; use crate::object_storage::instrumented::{ - InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummary, + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummaries, }; use crate::print_format::PrintFormat; @@ -205,11 +205,8 @@ impl PrintOptions { } writeln!(writer, "Summaries:")?; - let summaries = RequestSummary::summarize_by_operation(&requests); - for (op, summary) in summaries { - writeln!(writer, "{op:?}")?; - writeln!(writer, "{summary}")?; - } + let summaries = RequestSummaries::new(&requests); + writeln!(writer, "{}", summaries)?; } } } diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 56620346ed0f..809e6fd32c4f 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -402,7 +402,6 @@ async fn test_object_store_profiling() { let container = setup_minio_container().await; let mut settings = make_settings(); - settings.set_snapshot_suffix("s3_url_fallback"); // as the object store profiling contains timestamps and durations, we must // filter them out to have stable snapshots @@ -416,14 +415,13 @@ async fn test_object_store_profiling() { " operation=$1 duration=[DURATION] size=$2 path=$3", ); - // We also need to filter out the durations reported in the summary output - // + // We also need to filter out the summary statistics (anything with an 's' at the end) // Example line(s) to filter: - // - // duration min: 0.000729s - // duration max: 0.000729s - // duration avg: 0.000729s - settings.add_filter(r"duration (min|max|avg): \d+\.\d{6}s", "[SUMMARY_DURATION]"); + // | Get | duration | 5.000000s | 5.000000s | 5.000000s | | 1 | + settings.add_filter( + r"\| (Get|Put|Delete|List|Head)( +)\| duration \| .*? \| .*? \| .*? \| .*? \| (.*?) \|", + "| $1$2 | duration | ...NORMALIZED...| $3 |", + ); let _bound = settings.bind_to_scope(); diff --git a/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap b/datafusion-cli/tests/snapshots/object_store_profiling.snap similarity index 69% rename from datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap rename to datafusion-cli/tests/snapshots/object_store_profiling.snap index 5c91800676a4..cff646f3b0e0 100644 --- a/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap +++ b/datafusion-cli/tests/snapshots/object_store_profiling.snap @@ -6,7 +6,7 @@ info: env: AWS_ACCESS_KEY_ID: TEST-DataFusionLogin AWS_ALLOW_HTTP: "true" - AWS_ENDPOINT: "http://localhost:55031" + AWS_ENDPOINT: "http://localhost:55057" AWS_SECRET_ACCESS_KEY: TEST-DataFusionPassword stdin: "\n CREATE EXTERNAL TABLE CARS\nSTORED AS CSV\nLOCATION 's3://data/cars.csv';\n\n-- Initial query should not show any profiling as the object store is not instrumented yet\nSELECT * from CARS LIMIT 1;\n\\object_store_profiling trace\n-- Query again to see the full profiling output\nSELECT * from CARS LIMIT 1;\n\\object_store_profiling summary\n-- Query again to see the summarized profiling output\nSELECT * from CARS LIMIT 1;\n\\object_store_profiling disabled\n-- Final query should not show any profiling as we disabled it again\nSELECT * from CARS LIMIT 1;\n" snapshot_kind: text @@ -40,16 +40,12 @@ Instrumented Object Store: instrument_mode: Trace, inner: AmazonS3(data) operation=Get duration=[DURATION] size=1006 path=cars.csv Summaries: -Get -count: 1 -[SUMMARY_DURATION] -[SUMMARY_DURATION] -[SUMMARY_DURATION] -size min: 1006 B -size max: 1006 B -size avg: 1006 B -size sum: 1006 B - ++-----------+----------+-----------+-----------+-----------+-----------+-------+ +| Operation | Metric | min | max | avg | sum | count | ++-----------+----------+-----------+-----------+-----------+-----------+-------+ +| Get | duration | ...NORMALIZED...| 1 | +| Get | size | 1006 B | 1006 B | 1006 B | 1006 B | 1 | ++-----------+----------+-----------+-----------+-----------+-----------+-------+ ObjectStore Profile mode set to Summary +-----+-------+---------------------+ | car | speed | time | @@ -62,16 +58,12 @@ ObjectStore Profile mode set to Summary Object Store Profiling Instrumented Object Store: instrument_mode: Summary, inner: AmazonS3(data) Summaries: -Get -count: 1 -[SUMMARY_DURATION] -[SUMMARY_DURATION] -[SUMMARY_DURATION] -size min: 1006 B -size max: 1006 B -size avg: 1006 B -size sum: 1006 B - ++-----------+----------+-----------+-----------+-----------+-----------+-------+ +| Operation | Metric | min | max | avg | sum | count | ++-----------+----------+-----------+-----------+-----------+-----------+-------+ +| Get | duration | ...NORMALIZED...| 1 | +| Get | size | 1006 B | 1006 B | 1006 B | 1006 B | 1 | ++-----------+----------+-----------+-----------+-----------+-----------+-------+ ObjectStore Profile mode set to Disabled +-----+-------+---------------------+ | car | speed | time | diff --git a/docs/source/user-guide/cli/usage.md b/docs/source/user-guide/cli/usage.md index 29ed6b8183c2..68804b2817e7 100644 --- a/docs/source/user-guide/cli/usage.md +++ b/docs/source/user-guide/cli/usage.md @@ -132,6 +132,35 @@ Available commands inside DataFusion CLI are: > \object_store_profiling [disabled|summary|trace] ``` +When enabled, prints detailed information about object store (I/O) operations +performed during query execution to STDOUT. + +```sql +> \object_store_profiling trace +ObjectStore Profile mode set to Trace +> select count(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet'; ++----------+ +| count(*) | ++----------+ +| 1000000 | ++----------+ +1 row(s) fetched. +Elapsed 0.552 seconds. + +Object Store Profiling +Instrumented Object Store: instrument_mode: Trace, inner: HttpStore +2025-10-17T18:08:48.457992+00:00 operation=Get duration=0.043592s size=8 range: bytes=174965036-174965043 path=hits_compatible/athena_partitioned/hits_1.parquet +2025-10-17T18:08:48.501878+00:00 operation=Get duration=0.031542s size=34322 range: bytes=174930714-174965035 path=hits_compatible/athena_partitioned/hits_1.parquet + +Summaries: ++-----------+----------+-----------+-----------+-----------+-----------+-------+ +| Operation | Metric | min | max | avg | sum | count | ++-----------+----------+-----------+-----------+-----------+-----------+-------+ +| Get | duration | 0.031542s | 0.043592s | 0.037567s | 0.075133s | 2 | +| Get | size | 8 B | 34322 B | 17165 B | 34330 B | 2 | ++-----------+----------+-----------+-----------+-----------+-----------+-------+ +``` + ## Supported SQL In addition to the normal [SQL supported in DataFusion], `datafusion-cli` also