diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index fc7d1a2617cf..8ab37c5a0040 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -31,6 +31,7 @@ use datafusion::common::instant::Instant; use datafusion::error::{DataFusionError, Result}; use std::fs::File; use std::io::BufReader; +use std::io::Write; use std::str::FromStr; use std::sync::Arc; @@ -62,9 +63,24 @@ impl Command { Self::Help => { let now = Instant::now(); let command_batch = all_commands_info(); - let schema = command_batch.schema(); let num_rows = command_batch.num_rows(); - print_options.print_batches(schema, &[command_batch], now, num_rows) + let schema = command_batch.schema(); + let stdout = std::io::stdout(); + let writer = &mut stdout.lock(); + // Help is using the default format Automatic + print_options.format.print_no_table_batches( + writer, + schema, + &[command_batch], + true, + )?; + let formatted_exec_details = + print_options.get_execution_details_formatted(num_rows, now); + + if !print_options.quiet { + writeln!(writer, "{formatted_exec_details}")?; + } + Ok(()) } Self::ListTables => { exec_and_print(ctx, print_options, "SHOW TABLES".into()).await diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 0f4d70c1cca9..0cc1ee0b08b8 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -26,12 +26,6 @@ use crate::{ object_storage::get_object_store, print_options::{MaxRows, PrintOptions}, }; -use futures::StreamExt; -use std::collections::HashMap; -use std::fs::File; -use std::io::prelude::*; -use std::io::BufReader; - use datafusion::common::instant::Instant; use datafusion::common::{plan_datafusion_err, plan_err}; use datafusion::config::ConfigFileType; @@ -41,13 +35,15 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan}; use datafusion::physical_plan::execution_plan::EmissionType; use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties}; use datafusion::sql::parser::{DFParser, Statement}; -use datafusion::sql::sqlparser::dialect::dialect_from_str; - -use datafusion::execution::memory_pool::MemoryConsumer; -use datafusion::physical_plan::spill::get_record_batch_memory_size; use datafusion::sql::sqlparser; +use datafusion::sql::sqlparser::dialect::dialect_from_str; use rustyline::error::ReadlineError; use rustyline::Editor; +use std::collections::HashMap; +use std::fs::File; +use std::io::prelude::*; +use std::io::BufReader; +use std::sync::Arc; use tokio::signal; /// run and execute SQL statements and commands, against a context with the given print options @@ -229,18 +225,18 @@ pub(super) async fn exec_and_print( for statement in statements { let adjusted = AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement); - let plan = create_plan(ctx, statement).await?; let adjusted = adjusted.with_plan(&plan); let df = ctx.execute_logical_plan(plan).await?; let physical_plan = df.create_physical_plan().await?; - // Track memory usage for the query result if it's bounded - let mut reservation = - MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool()); + let is_unbounded = physical_plan.boundedness().is_unbounded(); + let stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?; + let schema = physical_plan.schema(); - if physical_plan.boundedness().is_unbounded() { + // Both bounded and unbounded streams are streaming prints + if is_unbounded { if physical_plan.pipeline_behavior() == EmissionType::Final { return plan_err!( "The given query can generate a valid result only once \ @@ -248,38 +244,15 @@ pub(super) async fn exec_and_print( ); } // As the input stream comes, we can generate results. - // However, memory safety is not guaranteed. - let stream = execute_stream(physical_plan, task_ctx.clone())?; - print_options.print_stream(stream, now).await?; + print_options.print_stream(schema, stream, now).await?; } else { - // Bounded stream; collected results size is limited by the maxrows option - let schema = physical_plan.schema(); - let mut stream = execute_stream(physical_plan, task_ctx.clone())?; - let mut results = vec![]; - let mut row_count = 0_usize; - let max_rows = match print_options.maxrows { - MaxRows::Unlimited => usize::MAX, - MaxRows::Limited(n) => n, - }; - while let Some(batch) = stream.next().await { - let batch = batch?; - let curr_num_rows = batch.num_rows(); - // Stop collecting results if the number of rows exceeds the limit - // results batch should include the last batch that exceeds the limit - if row_count < max_rows + curr_num_rows { - // Try to grow the reservation to accommodate the batch in memory - reservation.try_grow(get_record_batch_memory_size(&batch))?; - results.push(batch); - } - row_count += curr_num_rows; - } + // We need to finalize and return the inner `PrintOptions` for unbounded streams adjusted .into_inner() - .print_batches(schema, &results, now, row_count)?; - reservation.free(); + .print_stream(schema, stream, now) + .await?; } } - Ok(()) } diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 1fc949593512..9e7c70c3dadf 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -19,14 +19,15 @@ use std::str::FromStr; -use crate::print_options::MaxRows; - use arrow::csv::writer::WriterBuilder; use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; use arrow::json::{ArrayWriter, LineDelimitedWriter}; use arrow::record_batch::RecordBatch; +use arrow::util::display::ArrayFormatter; use arrow::util::pretty::pretty_format_batches_with_options; use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS; +use datafusion::common::plan_err; use datafusion::error::Result; /// Allow records to be printed in different formats @@ -89,78 +90,13 @@ fn print_batches_with_sep( Ok(()) } -fn keep_only_maxrows(s: &str, maxrows: usize) -> String { - let lines: Vec = s.lines().map(String::from).collect(); - - assert!(lines.len() >= maxrows + 4); // 4 lines for top and bottom border - - let last_line = &lines[lines.len() - 1]; // bottom border line - - let spaces = last_line.len().saturating_sub(4); - let dotted_line = format!("| .{:( - writer: &mut W, - batches: &[RecordBatch], - maxrows: MaxRows, -) -> Result<()> { - match maxrows { - MaxRows::Limited(maxrows) => { - // Filter batches to meet the maxrows condition - let mut filtered_batches = Vec::new(); - let mut row_count: usize = 0; - let mut over_limit = false; - for batch in batches { - if row_count + batch.num_rows() > maxrows { - // If adding this batch exceeds maxrows, slice the batch - let limit = maxrows - row_count; - let sliced_batch = batch.slice(0, limit); - filtered_batches.push(sliced_batch); - over_limit = true; - break; - } else { - filtered_batches.push(batch.clone()); - row_count += batch.num_rows(); - } - } - - let formatted = pretty_format_batches_with_options( - &filtered_batches, - &DEFAULT_CLI_FORMAT_OPTIONS, - )?; - if over_limit { - let mut formatted_str = format!("{}", formatted); - formatted_str = keep_only_maxrows(&formatted_str, maxrows); - writeln!(writer, "{}", formatted_str)?; - } else { - writeln!(writer, "{}", formatted)?; - } - } - MaxRows::Unlimited => { - let formatted = - pretty_format_batches_with_options(batches, &DEFAULT_CLI_FORMAT_OPTIONS)?; - writeln!(writer, "{}", formatted)?; - } - } - - Ok(()) -} - impl PrintFormat { /// Print the batches to a writer using the specified format - pub fn print_batches( + pub fn print_no_table_batches( &self, writer: &mut W, schema: SchemaRef, batches: &[RecordBatch], - maxrows: MaxRows, with_header: bool, ) -> Result<()> { // filter out any empty batches @@ -179,10 +115,7 @@ impl PrintFormat { } Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header), Self::Table => { - if maxrows == MaxRows::Limited(0) { - return Ok(()); - } - format_batches_with_maxrows(writer, &batches, maxrows) + plan_err!("print_no_table_batches does not support Table format") } Self::Json => batches_to_json!(ArrayWriter, writer, &batches), Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches), @@ -190,7 +123,7 @@ impl PrintFormat { } /// Print when the result batches contain no rows - fn print_empty( + pub fn print_empty( &self, writer: &mut W, schema: SchemaRef, @@ -209,6 +142,214 @@ impl PrintFormat { } Ok(()) } + + #[allow(clippy::too_many_arguments)] + /// Processes a batch of records and writes them to the provided writer in a table format. + /// + /// This function handles the formatting and output of a `RecordBatch` by either storing it + /// for preview purposes or printing it directly, depending on whether column widths have + /// been precomputed. It ensures that the table header is printed once and manages the + /// accumulation of preview batches until a specified limit is reached. + /// + /// # Arguments + /// + /// * `batch` - The `RecordBatch` to process. + /// * `schema` - The schema reference associated with the batch. + /// * `preview_batches` - A mutable vector to store batches for preview purposes. + /// * `preview_row_count` - A mutable reference to the count of rows in the preview batches. + /// * `preview_limit` - The maximum number of rows to accumulate before printing. + /// * `precomputed_widths` - An optional mutable reference to precomputed column widths. + /// * `header_printed` - A mutable reference indicating whether the table header has been printed. + /// * `writer` - The output writer where the formatted table will be written. + /// + /// # Returns + /// + /// * `Result<()>` - Returns `Ok(())` if processing is successful; otherwise, returns an error. + /// + /// # Behavior + /// + /// - If `precomputed_widths` is `None`, the function accumulates batches in `preview_batches` + /// until `preview_row_count` reaches `preview_limit`. Once the limit is reached, it computes + /// the column widths, prints the table header, and then prints all accumulated batches. + /// - If `precomputed_widths` is `Some`, it means the column widths have already been computed. + /// In this case, the function directly prints the current batch using the precomputed widths. + /// - The table header is printed only once, controlled by the `header_printed` flag. + /// + /// # Errors + /// + /// This function will return an error if computing column widths or writing to the writer fails. + pub fn process_table_batch( + &self, + batch: &RecordBatch, + schema: SchemaRef, + preview_batches: &mut Vec, + preview_row_count: &mut usize, + preview_limit: usize, + precomputed_widths: &mut Option>, + header_printed: &mut bool, + writer: &mut W, + ) -> Result<()> { + if precomputed_widths.is_none() { + preview_batches.push(batch.clone()); + *preview_row_count += batch.num_rows(); + if *preview_row_count >= preview_limit { + let widths = + Self::compute_column_widths(self, preview_batches, schema.clone())?; + *precomputed_widths = Some(widths.clone()); + Self::print_header(self, &schema, &widths, writer)?; + *header_printed = true; + for preview_batch in preview_batches.drain(..) { + Self::print_batch_with_widths(self, &preview_batch, &widths, writer)?; + } + } + } else { + let widths = precomputed_widths.as_ref().unwrap(); + if !*header_printed { + Self::print_header(self, &schema, widths, writer)?; + *header_printed = true; + } + Self::print_batch_with_widths(self, batch, widths, writer)?; + } + Ok(()) + } + + pub fn compute_column_widths( + &self, + batches: &Vec, + schema: SchemaRef, + ) -> Result> { + let mut widths: Vec = + schema.fields().iter().map(|f| f.name().len()).collect(); + for batch in batches { + let formatters = batch + .columns() + .iter() + .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) + .collect::, ArrowError>>()?; + for row in 0..batch.num_rows() { + for (i, formatter) in formatters.iter().enumerate() { + let cell = formatter.value(row).to_string(); + let max_line_width = + cell.lines().map(|line| line.len()).max().unwrap_or(0); + widths[i] = widths[i].max(max_line_width); + } + } + } + Ok(widths) + } + + pub fn print_header( + &self, + schema: &SchemaRef, + widths: &[usize], + writer: &mut W, + ) -> Result<()> { + let header: Vec = schema + .fields() + .iter() + .enumerate() + .map(|(i, field)| Self::pad_cell(field.name(), widths[i])) + .collect(); + + if header.is_empty() { + return Ok(()); + } + + Self::print_border(widths, writer)?; + writeln!(writer, "| {} |", header.join(" | "))?; + + Self::print_border(widths, writer)?; + Ok(()) + } + + pub fn print_batch_with_widths( + &self, + batch: &RecordBatch, + widths: &[usize], + writer: &mut W, + ) -> Result<(), ArrowError> { + let formatters = batch + .columns() + .iter() + .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) + .collect::, ArrowError>>()?; + + for row in 0..batch.num_rows() { + let cell_lines: Vec> = formatters + .iter() + .map(|formatter| { + let s = formatter.value(row).to_string(); + if s.is_empty() { + vec!["".to_string()] + } else { + s.lines().map(|line| line.to_string()).collect() + } + }) + .collect(); + + let max_lines = cell_lines + .iter() + .map(|lines| lines.len()) + .max() + .unwrap_or(1); + + for line_idx in 0..max_lines { + let mut line_cells: Vec = Vec::new(); + for (i, lines) in cell_lines.iter().enumerate() { + let cell_line = if line_idx < lines.len() { + lines[line_idx].clone() + } else { + "".to_string() + }; + line_cells.push(format!("{:( + &self, + widths: &[usize], + writer: &mut W, + ) -> Result<()> { + if widths.is_empty() { + return Ok(()); + } + let cells: Vec = widths + .iter() + .map(|&w| format!(" {: ( + &self, + widths: &[usize], + writer: &mut W, + ) -> Result<()> { + if widths.is_empty() { + return Ok(()); + } + let cells: Vec = widths.iter().map(|&w| "-".repeat(w + 2)).collect(); + writeln!(writer, "+{}+", cells.join("+"))?; + Ok(()) + } + + fn print_border(widths: &[usize], writer: &mut W) -> Result<()> { + if widths.is_empty() { + return Ok(()); + } + let cells: Vec = widths.iter().map(|&w| "-".repeat(w + 2)).collect(); + writeln!(writer, "+{}+", cells.join("+"))?; + Ok(()) + } + + fn pad_cell(cell: &str, width: usize) -> String { + format!("{: = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batch_with_same_widths() { + let batch = three_column_batch(); + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format + .print_batch_with_widths(&batch, &widths, &mut writer) + .unwrap(); + let expected = &["| 1 | 4 | 7 |", "| 2 | 5 | 8 |", "| 3 | 6 | 9 |"]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + /// add the test case for print_batch with multi_lines + #[test] + fn test_print_batch_with_multi_lines() { + let batch = two_column_batch_multi_lines(); + let widths = vec![13, 53]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format + .print_batch_with_widths(&batch, &widths, &mut writer) + .unwrap(); + let expected = &[ + "| logical_plan | Filter: foo.x = Int32(4) |", + "| | TableScan: foo projection=[x, y] |", + "| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |", + "| | FilterExec: x@0 = 4 |", + "| | DataSourceExec: partitions=1, partition_sizes=[1] |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batch_with_different_widths() { + let batch = three_column_batch_with_widths(); + let widths = vec![7, 5, 6]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format + .print_batch_with_widths(&batch, &widths, &mut writer) + .unwrap(); + let expected = &[ + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_dotted_line() { + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format.print_dotted_line(&widths, &mut writer).unwrap(); + let expected = &["| . | . | . |"]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_bottom_border() { + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let format = PrintFormat::Table; + format.print_bottom_border(&widths, &mut writer).unwrap(); + let expected = &["+---+---+---+"]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + // test print_batch with different batch widths + // and preview count is less than the first batch + #[test] + fn test_print_batches_with_preview_count_less_than_first_batch() { + let batch = three_column_batch_with_widths(); + let schema = three_column_schema(); + let format = PrintFormat::Table; + let preview_limit = 2; + let mut preview_batches = Vec::new(); + let mut preview_row_count = 0; + let mut precomputed_widths = None; + let mut header_printed = false; + let mut writer = Vec::new(); + + format + .process_table_batch( + &batch, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + let expected = &[ + "+---------+-------+--------+", + "| a | b | c |", + "+---------+-------+--------+", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batches_with_preview_and_later_batches() { + let batch1 = three_column_batch(); + let batch2 = three_column_batch_with_widths(); + let schema = three_column_schema(); + let format = PrintFormat::Table; + // preview limit is less than the first batch + // so the second batch if it's width is greater than the first batch, it will be unformatted + let preview_limit = 2; + let mut preview_batches = Vec::new(); + let mut preview_row_count = 0; + let mut precomputed_widths = None; + let mut header_printed = false; + let mut writer = Vec::new(); + + format + .process_table_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_table_batch( + &batch2, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_table_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + let expected = &[ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batches_with_preview_cover_later_batches() { + let batch1 = three_column_batch(); + let batch2 = three_column_batch_with_widths(); + let schema = three_column_schema(); + let format = PrintFormat::Table; + // preview limit is greater than the first batch + let preview_limit = 4; + let mut preview_batches = Vec::new(); + let mut preview_row_count = 0; + let mut precomputed_widths = None; + let mut header_printed = false; + let mut writer = Vec::new(); + + format + .process_table_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_table_batch( + &batch2, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + format + .process_table_batch( + &batch1, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + &mut writer, + ) + .unwrap(); + + let expected = &[ + "+---------+-------+--------+", + "| a | b | c |", + "+---------+-------+--------+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + /// Note: this is the older version of the test, which now is only limited to the print_no_table_batches method + /// so we rename it to PrintNoTableBatchesTest #[derive(Debug)] - struct PrintBatchesTest { + struct PrintNoTableBatchesTest { format: PrintFormat, schema: SchemaRef, batches: Vec, - maxrows: MaxRows, with_header: WithHeader, expected: Vec<&'static str>, } @@ -558,13 +889,12 @@ mod tests { Ignored, } - impl PrintBatchesTest { + impl PrintNoTableBatchesTest { fn new() -> Self { Self { format: PrintFormat::Table, schema: Arc::new(Schema::empty()), batches: vec![], - maxrows: MaxRows::Unlimited, with_header: WithHeader::Ignored, expected: vec![], } @@ -588,12 +918,6 @@ mod tests { self } - /// set maxrows - fn with_maxrows(mut self, maxrows: MaxRows) -> Self { - self.maxrows = maxrows; - self - } - /// set with_header fn with_header(mut self, with_header: WithHeader) -> Self { self.with_header = with_header; @@ -638,11 +962,10 @@ mod tests { fn output_with_header(&self, with_header: bool) -> String { let mut buffer: Vec = vec![]; self.format - .print_batches( + .print_no_table_batches( &mut buffer, self.schema.clone(), &self.batches, - self.maxrows, with_header, ) .unwrap(); @@ -659,6 +982,14 @@ mod tests { ])) } + /// Return a schema with two StringArray columns + fn two_column_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("plan_type", DataType::Utf8, false), + Field::new("plan", DataType::Utf8, false), + ])) + } + /// Return a batch with three columns and three rows fn three_column_batch() -> RecordBatch { RecordBatch::try_new( @@ -672,6 +1003,34 @@ mod tests { .unwrap() } + /// Return a batch with three columns and three rows, but with different widths + fn three_column_batch_with_widths() -> RecordBatch { + RecordBatch::try_new( + three_column_schema(), + vec![ + Arc::new(Int32Array::from(vec![1, 2222222, 3])), + Arc::new(Int32Array::from(vec![42222, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 922222])), + ], + ) + .unwrap() + } + + /// Return a batch with two columns, but with multi-line values for each column + fn two_column_batch_multi_lines() -> RecordBatch { + RecordBatch::try_new( + two_column_schema(), + vec![ + Arc::new(StringArray::from(vec!["logical_plan", "physical_plan"])), + Arc::new(StringArray::from(vec![ + "Filter: foo.x = Int32(4)\n TableScan: foo projection=[x, y]", + "CoalesceBatchesExec: target_batch_size=8192\n FilterExec: x@0 = 4\n DataSourceExec: partitions=1, partition_sizes=[1]\n", + ])), + ], + ) + .unwrap() + } + /// Return a schema with one column fn one_column_schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])) diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 9557e783e8a7..b12d1ddbe384 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -16,7 +16,6 @@ // under the License. use std::fmt::{Display, Formatter}; -use std::io::Write; use std::pin::Pin; use std::str::FromStr; @@ -25,10 +24,10 @@ use crate::print_format::PrintFormat; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion::common::instant::Instant; -use datafusion::common::DataFusionError; use datafusion::error::Result; use datafusion::physical_plan::RecordBatchStream; +use datafusion::execution::SendableRecordBatchStream; use futures::StreamExt; #[derive(Debug, Clone, PartialEq, Copy)] @@ -74,51 +73,161 @@ pub struct PrintOptions { pub color: bool, } -// Returns the query execution details formatted -fn get_execution_details_formatted( - row_count: usize, - maxrows: MaxRows, - query_start_time: Instant, -) -> String { - let nrows_shown_msg = match maxrows { - MaxRows::Limited(nrows) if nrows < row_count => { - format!("(First {nrows} displayed. Use --maxrows to adjust)") - } - _ => String::new(), - }; - - format!( - "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n", - row_count, - nrows_shown_msg, - query_start_time.elapsed().as_secs_f64() - ) -} - impl PrintOptions { - /// Print the batches to stdout using the specified format - pub fn print_batches( + /// Prints the stream to stdout using the table format. + /// + /// This function processes a stream of record batches, formats them into a table, + /// and writes the output to the provided writer. It handles scenarios where the + /// number of rows exceeds the specified `max_rows` and ensures that the table + /// header and borders are printed appropriately. + /// + /// # Arguments + /// + /// * `schema` - The schema reference of the record batches. + /// * `stream` - The mutable stream of record batches to be printed. + /// * `max_rows` - The maximum number of rows to print. + /// * `writer` - The mutable writer to which the formatted table will be written. + /// * `now` - The instant representing the start time of the operation. + /// + /// # Returns + /// + /// A `Result` indicating the success or failure of the operation. + pub async fn print_streaming_table_batch( &self, schema: SchemaRef, - batches: &[RecordBatch], - query_start_time: Instant, - row_count: usize, + stream: &mut SendableRecordBatchStream, + max_rows: usize, + writer: &mut W, + now: Instant, ) -> Result<()> { - let stdout = std::io::stdout(); - let mut writer = stdout.lock(); + let preview_limit: usize = 1000; + let mut preview_batches: Vec = vec![]; + let mut preview_row_count = 0_usize; + let mut total_count = 0_usize; + let mut precomputed_widths: Option> = None; + let mut header_printed = false; + let mut max_rows_reached = false; - self.format - .print_batches(&mut writer, schema, batches, self.maxrows, true)?; + // Iterate over each batch in the stream + while let Some(batch) = stream.next().await { + let batch = batch?; + let batch_rows = batch.num_rows(); - let formatted_exec_details = get_execution_details_formatted( - row_count, - if self.format == PrintFormat::Table { - self.maxrows - } else { - MaxRows::Unlimited - }, - query_start_time, - ); + // Check if the maximum number of rows to print has been reached + if !max_rows_reached && total_count < max_rows { + if total_count + batch_rows > max_rows { + // If adding this batch exceeds max_rows, slice the batch to fit + let needed = max_rows - total_count; + let batch_to_print = batch.slice(0, needed); + self.format.process_table_batch( + &batch_to_print, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + writer, + )?; + if precomputed_widths.is_none() { + // Compute column widths if not already done + let widths = self + .format + .compute_column_widths(&preview_batches, schema.clone())?; + precomputed_widths = Some(widths.clone()); + if !header_printed { + // Print the table header + self.format.print_header(&schema, &widths, writer)?; + } + // Print each preview batch with the computed widths + for preview_batch in preview_batches.drain(..) { + self.format.print_batch_with_widths( + &preview_batch, + &widths, + writer, + )?; + } + } + if let Some(ref widths) = precomputed_widths { + // Print dotted lines and bottom border to indicate truncation + for _ in 0..3 { + self.format.print_dotted_line(widths, writer)?; + } + self.format.print_bottom_border(widths, writer)?; + } + max_rows_reached = true; + } else { + // Process the entire batch if it doesn't exceed max_rows + self.format.process_table_batch( + &batch, + schema.clone(), + &mut preview_batches, + &mut preview_row_count, + preview_limit, + &mut precomputed_widths, + &mut header_printed, + writer, + )?; + } + } + + total_count += batch_rows; + } + + // Handle the case where the maximum number of rows was not reached + if !max_rows_reached { + if precomputed_widths.is_none() { + // Compute column widths if not already done + if preview_batches.is_empty() { + // If no rows are present, set column widths based on header names + let widths = schema + .fields() + .iter() + .map(|f| f.name().len()) + .collect::>(); + precomputed_widths = Some(widths); + } else { + let widths = self + .format + .compute_column_widths(&preview_batches, schema.clone())?; + precomputed_widths = Some(widths); + if !header_printed { + // Print the table header + self.format.print_header( + &schema, + precomputed_widths.as_ref().unwrap(), + writer, + )?; + header_printed = true; + } + // Print each preview batch with the computed widths + // Here we make sure the preview_batches are printed even if the total count is less than max_rows + for preview_batch in preview_batches.drain(..) { + self.format.print_batch_with_widths( + &preview_batch, + precomputed_widths.as_ref().unwrap(), + writer, + )?; + } + } + if !header_printed { + // Print the table header if not already printed + self.format.print_header( + &schema, + precomputed_widths.as_ref().unwrap(), + writer, + )?; + } + } + if let Some(ref widths) = precomputed_widths { + // Print the bottom border of the table + self.format.print_bottom_border(widths, writer)?; + } + } + + // Get and print the formatted execution details + let formatted_exec_details = + self.get_execution_details_formatted(total_count, now); if !self.quiet { writeln!(writer, "{formatted_exec_details}")?; @@ -127,42 +236,50 @@ impl PrintOptions { Ok(()) } - /// Print the stream to stdout using the specified format - pub async fn print_stream( + /// Print the stream to stdout using the format which is not table format + pub async fn print_no_table_streaming_batch( &self, - mut stream: Pin>, - query_start_time: Instant, + stream: &mut SendableRecordBatchStream, + writer: &mut W, + now: Instant, ) -> Result<()> { - if self.format == PrintFormat::Table { - return Err(DataFusionError::External( - "PrintFormat::Table is not implemented".to_string().into(), - )); + let max_count = match self.maxrows { + MaxRows::Unlimited => usize::MAX, + MaxRows::Limited(n) => n, }; - let stdout = std::io::stdout(); - let mut writer = stdout.lock(); - let mut row_count = 0_usize; let mut with_header = true; + let mut max_rows_reached = false; while let Some(maybe_batch) = stream.next().await { let batch = maybe_batch?; - row_count += batch.num_rows(); - self.format.print_batches( - &mut writer, - batch.schema(), - &[batch], - MaxRows::Unlimited, - with_header, - )?; + let curr_batch_rows = batch.num_rows(); + if !max_rows_reached && row_count < max_count { + if row_count + curr_batch_rows > max_count { + let needed = max_count - row_count; + let batch_to_print = batch.slice(0, needed); + self.format.print_no_table_batches( + writer, + batch.schema(), + &[batch_to_print], + with_header, + )?; + max_rows_reached = true; + } else { + self.format.print_no_table_batches( + writer, + batch.schema(), + &[batch], + with_header, + )?; + } + } + row_count += curr_batch_rows; with_header = false; } - let formatted_exec_details = get_execution_details_formatted( - row_count, - MaxRows::Unlimited, - query_start_time, - ); + let formatted_exec_details = self.get_execution_details_formatted(row_count, now); if !self.quiet { writeln!(writer, "{formatted_exec_details}")?; @@ -170,4 +287,69 @@ impl PrintOptions { Ok(()) } + + /// Print the stream to stdout using the specified format + /// There are two modes of operation: + /// 1. If the format is table, the stream is processed in batches and previewed to determine the column widths + /// before printing the full result set. And after we have the column widths, we print batch by batch with the correct widths. + /// + /// 2. If the format is not table, the stream is processed batch by batch and printed immediately. + /// + /// The query_start_time is used to calculate the elapsed time for the query. + /// The schema is used to print the header. + pub async fn print_stream( + &self, + schema: SchemaRef, + mut stream: Pin>, + query_start_time: Instant, + ) -> Result<()> { + let max_count = match self.maxrows { + MaxRows::Unlimited => usize::MAX, + MaxRows::Limited(n) => n, + }; + + let stdout = std::io::stdout(); + let mut writer = stdout.lock(); + + if self.format == PrintFormat::Table { + self.print_streaming_table_batch( + schema, + &mut stream, + max_count, + &mut writer, + query_start_time, + ) + .await?; + } else { + self.print_no_table_streaming_batch( + &mut stream, + &mut writer, + query_start_time, + ) + .await?; + } + + Ok(()) + } + + // Returns the query execution details formatted + pub fn get_execution_details_formatted( + &self, + row_count: usize, + query_start_time: Instant, + ) -> String { + let nrows_shown_msg = match self.maxrows { + MaxRows::Limited(nrows) if nrows < row_count => { + format!("(First {nrows} displayed. Use --maxrows to adjust)") + } + _ => String::new(), + }; + + format!( + "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n", + row_count, + nrows_shown_msg, + query_start_time.elapsed().as_secs_f64() + ) + } } diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index fa170ae19259..28c2e31bd54b 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -51,6 +51,163 @@ fn init() { ["--command", "show datafusion.execution.batch_size", "--format", "json", "-q", "-b", "1"], "[{\"name\":\"datafusion.execution.batch_size\",\"value\":\"1\"}]\n" )] + +/// Add case fixed issue: https://github.com/apache/datafusion/issues/14920 +#[case::exec_from_commands( + [ + "--command", "SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC;", + "--format", "table", + "-q" + ], + "+----+\n\ + | v1 |\n\ + +----+\n\ + | 5 |\n\ + | 4 |\n\ + | 3 |\n\ + | 2 |\n\ + | 1 |\n\ + +----+\n" +)] + +/// Add case for unlimited the number of rows to be printed for table format +#[case::exec_from_commands( + [ + "--command", "SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC;", + "--format", "table", + "--maxrows", "inf", + "-q" + ], + "+----+\n\ + | v1 |\n\ + +----+\n\ + | 5 |\n\ + | 4 |\n\ + | 3 |\n\ + | 2 |\n\ + | 1 |\n\ + +----+\n" +)] + +/// Add case for limiting the number of rows to be printed for table format +#[case::exec_from_commands( + [ + "--command", "SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC;", + "--format", "table", + "--maxrows", "3", + "-q" + ], + "+----+\n\ + | v1 |\n\ + +----+\n\ + | 5 |\n\ + | 4 |\n\ + | 3 |\n\ + | . |\n\ + | . |\n\ + | . |\n\ + +----+\n" +)] + +/// Add case for limiting the number to 0 of rows to be printed for table format +#[case::exec_from_commands( + [ + "--command", "SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC;", + "--format", "table", + "--maxrows", "0", + "-q" + ], + "+----+\n\ + | v1 |\n\ + +----+\n\ + +----+\n" +)] + +/// Add case for limiting the number of rows to be printed for csv format +#[case::exec_from_commands( + [ + "--command", "SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC;", + "--format", "csv", + "--maxrows", "3", + "-q" + ], + "v1\n5\n4\n3\n" +)] + +/// Add case for limiting the number of rows to be printed for json format +#[case::exec_from_commands( + [ + "--command", "SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC;", + "--format", "json", + "--maxrows", "3", + "-q" + ], + "[{\"v1\":5},{\"v1\":4},{\"v1\":3}]\n" +)] + +/// Add case for explain table format printing +#[case::exec_explain_simple( + ["--command", "explain select 1;", "--format", "table", "-q"], + "+---------------+--------------------------------------+\n\ + | plan_type | plan |\n\ + +---------------+--------------------------------------+\n\ + | logical_plan | Projection: Int64(1) |\n\ + | | EmptyRelation |\n\ + | physical_plan | ProjectionExec: expr=[1 as Int64(1)] |\n\ + | | PlaceholderRowExec |\n\ + +---------------+--------------------------------------+\n" +)] + +/// Add case for printing empty result set for table format +#[case::exec_select_empty( + [ + "--command", + "select * from (values (1)) as t(col) where false;", + "--format", + "table", + "-q" + ], + "+-----+\n\ + | col |\n\ + +-----+\n\ + +-----+\n" +)] + +/// Add case for printing empty result set for json format +#[case::exec_select_empty_json( + [ + "--command", + "select * from (values (1)) as t(col) where false;", + "--format", + "json", + "-q" + ], + "" +)] + +/// Add case for printing empty result set for csv format +#[case::exec_select_empty_csv( + [ + "--command", + "select * from (values (1)) as t(col) where false;", + "--format", + "csv", + "-q" + ], + "" +)] + +/// Add case for create table should return empty result set for table format +#[case::exec_create_table_empty( + [ + "--command", + "create table t1 (c1 int);", + "--format", + "table", + "-q" + ], + "" +)] #[test] fn cli_quick_test<'a>( #[case] args: impl IntoIterator,