diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index 05fd10112dfc..6866b4011f9e 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -1417,9 +1417,7 @@ mod tests {
     fn from_qualified_schema_into_arrow_schema() -> Result<()> {
         let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
         let arrow_schema = schema.as_arrow();
-        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
-        Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
-        assert_eq!(expected, arrow_schema.to_string());
+        insta::assert_snapshot!(arrow_schema, @r#"Field { name: "c0", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#);
         Ok(())
     }
 
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 6b4d2592f608..c28e56790e66 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2486,7 +2486,7 @@ mod tests {
         // the cast here is implicit so has CastOptions with safe=true
         let expected = r#"BinaryExpr { left: Column { name: "c7", index: 2 }, op: Lt, right: Literal { value: Int64(5), field: Field { name: "lit", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }"#;
-        assert!(format!("{exec_plan:?}").contains(expected));
+        assert_contains!(format!("{exec_plan:?}"), expected);
 
         Ok(())
     }
 
@@ -2510,9 +2510,121 @@ mod tests {
             &session_state,
         );
 
-        let expected = r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c3")], groups: [[false, false, false], [true, false, false], [false, true, false], [false, false, true], [true, true, false], [true, false, true], [false, true, true], [true, true, true]] })"#;
-
-        assert_eq!(format!("{cube:?}"), expected);
+        insta::assert_debug_snapshot!(cube, @r#"
+        Ok(
+            PhysicalGroupBy {
+                expr: [
+                    (
+                        Column {
+                            name: "c1",
+                            index: 0,
+                        },
+                        "c1",
+                    ),
+                    (
+                        Column {
+                            name: "c2",
+                            index: 1,
+                        },
+                        "c2",
+                    ),
+                    (
+                        Column {
+                            name: "c3",
+                            index: 2,
+                        },
+                        "c3",
+                    ),
+                ],
+                null_expr: [
+                    (
+                        Literal {
+                            value: Utf8(NULL),
+                            field: Field {
+                                name: "lit",
+                                data_type: Utf8,
+                                nullable: true,
+                                dict_id: 0,
+                                dict_is_ordered: false,
+                                metadata: {},
+                            },
+                        },
+                        "c1",
+                    ),
+                    (
+                        Literal {
+                            value: Int64(NULL),
+                            field: Field {
+                                name: "lit",
+                                data_type: Int64,
+                                nullable: true,
+                                dict_id: 0,
+                                dict_is_ordered: false,
+                                metadata: {},
+                            },
+                        },
+                        "c2",
+                    ),
+                    (
+                        Literal {
+                            value: Int64(NULL),
+                            field: Field {
+                                name: "lit",
+                                data_type: Int64,
+                                nullable: true,
+                                dict_id: 0,
+                                dict_is_ordered: false,
+                                metadata: {},
+                            },
+                        },
+                        "c3",
+                    ),
+                ],
+                groups: [
+                    [
+                        false,
+                        false,
+                        false,
+                    ],
+                    [
+                        true,
+                        false,
+                        false,
+                    ],
+                    [
+                        false,
+                        true,
+                        false,
+                    ],
+                    [
+                        false,
+                        false,
+                        true,
+                    ],
+                    [
+                        true,
+                        true,
+                        false,
+                    ],
+                    [
+                        true,
+                        false,
+                        true,
+                    ],
+                    [
+                        false,
+                        true,
+                        true,
+                    ],
+                    [
+                        true,
+                        true,
+                        true,
+                    ],
+                ],
+            },
+        )
+        "#);
 
         Ok(())
     }
 
@@ -2537,9 +2649,101 @@ mod tests {
             &session_state,
         );
 
-        let expected = r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c3")], groups: [[true, true, true], [false, true, true], [false, false, true], [false, false, false]] })"#;
-
-        assert_eq!(format!("{rollup:?}"), expected);
+        insta::assert_debug_snapshot!(rollup, @r#"
+        Ok(
+            PhysicalGroupBy {
+                expr: [
+                    (
+                        Column {
+                            name: "c1",
+                            index: 0,
+                        },
+                        "c1",
+                    ),
+                    (
+                        Column {
+                            name: "c2",
+                            index: 1,
+                        },
+                        "c2",
+                    ),
+                    (
+                        Column {
+                            name: "c3",
+                            index: 2,
+                        },
+                        "c3",
+                    ),
+                ],
+                null_expr: [
+                    (
+                        Literal {
+                            value: Utf8(NULL),
+                            field: Field {
+                                name: "lit",
+                                data_type: Utf8,
+                                nullable: true,
+                                dict_id: 0,
+                                dict_is_ordered: false,
+                                metadata: {},
+                            },
+                        },
+                        "c1",
+                    ),
+                    (
+                        Literal {
+                            value: Int64(NULL),
+                            field: Field {
+                                name: "lit",
+                                data_type: Int64,
+                                nullable: true,
+                                dict_id: 0,
+                                dict_is_ordered: false,
+                                metadata: {},
+                            },
+                        },
+                        "c2",
+                    ),
+                    (
+                        Literal {
+                            value: Int64(NULL),
+                            field: Field {
+                                name: "lit",
+                                data_type: Int64,
+                                nullable: true,
+                                dict_id: 0,
+                                dict_is_ordered: false,
+                                metadata: {},
+                            },
+                        },
+                        "c3",
+                    ),
+                ],
+                groups: [
+                    [
+                        true,
+                        true,
+                        true,
+                    ],
+                    [
+                        false,
+                        true,
+                        true,
+                    ],
+                    [
+                        false,
+                        false,
+                        true,
+                    ],
+                    [
+                        false,
+                        false,
+                        false,
+                    ],
+                ],
+            },
+        )
+        "#);
 
         Ok(())
     }
 
@@ -2677,35 +2881,13 @@ mod tests {
         let logical_plan = LogicalPlan::Extension(Extension {
             node: Arc::new(NoOpExtensionNode::default()),
         });
-        let plan = planner
+        let e = planner
             .create_physical_plan(&logical_plan, &session_state)
-            .await;
+            .await
+            .expect_err("planning error")
+            .strip_backtrace();
 
-        let expected_error: &str = "Error during planning: \
-        Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
-        LogicalPlan schema: \
-        DFSchema { inner: Schema { fields: \
-        [Field { name: \"a\", \
-        data_type: Int32, \
-        nullable: false, \
-        dict_id: 0, \
-        dict_is_ordered: false, metadata: {} }], \
-        metadata: {} }, field_qualifiers: [None], \
-        functional_dependencies: FunctionalDependencies { deps: [] } }, \
-        ExecutionPlan schema: Schema { fields: \
-        [Field { name: \"b\", \
-        data_type: Int32, \
-        nullable: false, \
-        dict_id: 0, \
-        dict_is_ordered: false, metadata: {} }], \
-        metadata: {} }";
-        match plan {
-            Ok(_) => panic!("Expected planning failure"),
-            Err(e) => assert!(
-                e.to_string().contains(expected_error),
-                "Error '{e}' did not contain expected error '{expected_error}'"
-            ),
-        }
+        insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }"#);
     }
 
     #[tokio::test]
@@ -2723,8 +2905,7 @@ mod tests {
         let expected = "expr: [ProjectionExpr { expr: BinaryExpr { left: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"a\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, op: Or, right: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"1\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }";
 
-        let actual = format!("{execution_plan:?}");
-        assert!(actual.contains(expected), "{}", actual);
+        assert_contains!(format!("{execution_plan:?}"), expected);
 
         Ok(())
     }
 
diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index 1978c189c4f8..98c3e3ccee8a 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -218,10 +218,12 @@ async fn test_parameter_invalid_types() -> Result<()> {
         .with_param_values(vec![ScalarValue::from(4_i32)])?
         .collect()
         .await;
-    assert_eq!(
-        results.unwrap_err().strip_backtrace(),
-        "type_coercion\ncaused by\nError during planning: Cannot infer common argument type for comparison operation List(Field { name: \"item\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) = Int32"
-    );
+    assert_snapshot!(results.unwrap_err().strip_backtrace(),
+    @r#"
+    type_coercion
+    caused by
+    Error during planning: Cannot infer common argument type for comparison operation List(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) = Int32
+    "#);
     Ok(())
 }
 
diff --git a/datafusion/expr-common/src/type_coercion/binary/tests/arithmetic.rs b/datafusion/expr-common/src/type_coercion/binary/tests/arithmetic.rs
index bfedcf071387..63945a4dabd0 100644
--- a/datafusion/expr-common/src/type_coercion/binary/tests/arithmetic.rs
+++ b/datafusion/expr-common/src/type_coercion/binary/tests/arithmetic.rs
@@ -37,8 +37,8 @@ fn test_date_timestamp_arithmetic_error() -> Result<()> {
         &DataType::Timestamp(TimeUnit::Millisecond, None),
     )
     .get_input_types()?;
-    assert_eq!(lhs.to_string(), "Timestamp(Millisecond, None)");
-    assert_eq!(rhs.to_string(), "Timestamp(Millisecond, None)");
+    assert_eq!(lhs, DataType::Timestamp(TimeUnit::Millisecond, None));
+    assert_eq!(rhs, DataType::Timestamp(TimeUnit::Millisecond, None));
 
     let err =
         BinaryTypeCoercer::new(&DataType::Date32, &Operator::Plus, &DataType::Date64)
diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs
index c1fd201e10fd..407e3e6a9d29 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -262,8 +262,8 @@ mod tests {
         },
         datatypes::*,
     };
-    use datafusion_common::assert_contains;
     use datafusion_physical_expr_common::physical_expr::fmt_sql;
+    use insta::assert_snapshot;
 
     // runs an end-to-end test of physical type cast
     // 1. construct a record batch with a column "a" of type A
 
@@ -438,11 +438,8 @@ mod tests {
         )?;
         let expression =
             cast_with_options(col("a", &schema)?, &schema, Decimal128(6, 2), None)?;
-        let e = expression.evaluate(&batch).unwrap_err(); // panics on OK
-        assert_contains!(
-            e.to_string(),
-            "Arrow error: Invalid argument error: 12345679 is too large to store in a Decimal128 of precision 6. Max is 999999"
-        );
+        let e = expression.evaluate(&batch).unwrap_err().strip_backtrace(); // panics on OK
+        assert_snapshot!(e, @"Arrow error: Invalid argument error: 12345679 is too large to store in a Decimal128 of precision 6. Max is 999999");
 
         let expression_safe = cast_with_options(
             col("a", &schema)?,
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index a0d173d3e846..fa91635d9bfd 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -476,13 +476,13 @@ pub fn in_list(
 #[cfg(test)]
 mod tests {
     use super::*;
-
     use crate::expressions;
     use crate::expressions::{col, lit, try_cast};
     use datafusion_common::plan_err;
     use datafusion_expr::type_coercion::binary::comparison_coercion;
     use datafusion_physical_expr_common::physical_expr::fmt_sql;
+    use insta::assert_snapshot;
     use itertools::Itertools as _;
 
     type InListCastResult = (Arc<dyn PhysicalExpr>, Vec<Arc<dyn PhysicalExpr>>);
 
@@ -1456,7 +1456,7 @@ mod tests {
     }
 
     #[test]
-    fn test_fmt_sql() -> Result<()> {
+    fn test_fmt_sql_1() -> Result<()> {
         let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
         let col_a = col("a", &schema)?;
 
@@ -1465,33 +1465,53 @@ mod tests {
         let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
         let sql_string = fmt_sql(expr.as_ref()).to_string();
         let display_string = expr.to_string();
-        assert_eq!(sql_string, "a IN (a, b)");
-        assert_eq!(display_string, "a@0 IN (SET) ([a, b])");
+        assert_snapshot!(sql_string, @"a IN (a, b)");
+        assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b])");
+        Ok(())
+    }
+
+    #[test]
+    fn test_fmt_sql_2() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
+        let col_a = col("a", &schema)?;
 
         // Test: a NOT IN ('a', 'b')
         let list = vec![lit("a"), lit("b")];
         let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
         let sql_string = fmt_sql(expr.as_ref()).to_string();
         let display_string = expr.to_string();
-        assert_eq!(sql_string, "a NOT IN (a, b)");
-        assert_eq!(display_string, "a@0 NOT IN (SET) ([a, b])");
+        assert_snapshot!(sql_string, @"a NOT IN (a, b)");
+        assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b])");
+        Ok(())
+    }
+
+    #[test]
+    fn test_fmt_sql_3() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
+        let col_a = col("a", &schema)?;
 
         // Test: a IN ('a', 'b', NULL)
         let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
         let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
         let sql_string = fmt_sql(expr.as_ref()).to_string();
         let display_string = expr.to_string();
-        assert_eq!(sql_string, "a IN (a, b, NULL)");
-        assert_eq!(display_string, "a@0 IN (SET) ([a, b, NULL])");
+        assert_snapshot!(sql_string, @"a IN (a, b, NULL)");
+        assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b, NULL])");
+        Ok(())
+    }
+
+    #[test]
+    fn test_fmt_sql_4() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
+        let col_a = col("a", &schema)?;
 
         // Test: a NOT IN ('a', 'b', NULL)
         let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
         let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
         let sql_string = fmt_sql(expr.as_ref()).to_string();
         let display_string = expr.to_string();
-        assert_eq!(sql_string, "a NOT IN (a, b, NULL)");
-        assert_eq!(display_string, "a@0 NOT IN (SET) ([a, b, NULL])");
-
+        assert_snapshot!(sql_string, @"a NOT IN (a, b, NULL)");
+        assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b, NULL])");
         Ok(())
     }
 }
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index cb8bd0a3cf19..891fd0ae4851 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -1249,7 +1249,7 @@ mod tests {
     use crate::windows::{
         create_udwf_window_expr, create_window_expr, BoundedWindowAggExec, InputOrderMode,
     };
-    use crate::{execute_stream, get_plan_string, ExecutionPlan};
+    use crate::{displayable, execute_stream, ExecutionPlan};
 
     use arrow::array::{
         builder::{Int64Builder, UInt64Builder},
@@ -1694,16 +1694,11 @@ mod tests {
 
         let batches = collect(physical_plan.execute(0, task_ctx)?).await?;
 
-        let expected = vec![
-            "BoundedWindowAggExec: wdw=[last: Field { name: \"last\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(-1): Field { name: \"nth_value(-1)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(-2): Field { name: \"nth_value(-2)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",
-            "  DataSourceExec: partitions=1, partition_sizes=[3]",
-        ];
         // Get string representation of the plan
-        let actual = get_plan_string(&physical_plan);
-        assert_eq!(
-            expected, actual,
-            "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
+        assert_snapshot!(displayable(physical_plan.as_ref()).indent(true), @r#"
+            BoundedWindowAggExec: wdw=[last: Field { name: "last", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(-1): Field { name: "nth_value(-1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(-2): Field { name: "nth_value(-2)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+              DataSourceExec: partitions=1, partition_sizes=[3]
+            "#);
 
         assert_snapshot!(batches_to_string(&batches), @r#"
     +---+------+---------------+---------------+
@@ -1816,18 +1811,12 @@ mod tests {
 
         let plan = projection_exec(window)?;
 
-        let expected_plan = vec![
-            "ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]@2 as col_2]",
-            "  BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]: Field { name: \"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: \\\"hash\\\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \\\"sn\\\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING], mode=[Linear]",
-            "    StreamingTableExec: partition_sizes=1, projection=[sn, hash], infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]",
-        ];
-
         // Get string representation of the plan
-        let actual = get_plan_string(&plan);
-        assert_eq!(
-            expected_plan, actual,
-            "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_plan:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
+        assert_snapshot!(displayable(plan.as_ref()).indent(true), @r#"
+            ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: "sn", index: 0 }]) PARTITION BY: [[Column { name: "hash", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: "sn", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]@2 as col_2]
+              BoundedWindowAggExec: wdw=[count([Column { name: "sn", index: 0 }]) PARTITION BY: [[Column { name: "hash", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: "sn", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]: Field { name: "count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING], mode=[Linear]
+                StreamingTableExec: partition_sizes=1, projection=[sn, hash], infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]
+            "#);
 
         let task_ctx = task_context();
         let batches = collect_with_timeout(plan, task_ctx, timeout_duration).await?;
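
Note on the pattern: every hunk above replaces a hand-maintained expected string (`assert_eq!` or `assert!(...contains(...))`) with an `insta` inline snapshot, so expected values no longer need manual escaping and can be re-recorded mechanically. Below is a minimal standalone sketch of that workflow, not part of the diff; it assumes `insta` is available as a dev-dependency, and the `Point` type is a made-up example rather than anything from this PR.

```rust
// Standalone sketch of the inline-snapshot workflow (assumes `insta` under
// [dev-dependencies]; `Point` is a hypothetical example type).
#[derive(Debug)]
struct Point {
    x: i32,
    y: i32,
}

#[test]
fn point_debug_snapshot() {
    let p = Point { x: 1, y: 2 };
    // The `@r#"..."#` literal is the expected value, stored inline in the
    // source file; insta dedents it before comparing. When the output
    // changes, `cargo insta test` records the new value and
    // `cargo insta accept` rewrites this literal in place.
    insta::assert_debug_snapshot!(p, @r#"
    Point {
        x: 1,
        y: 2,
    }
    "#);
}
```

For single-line values the terser `@"..."` form used in the `cast.rs` and `in_list.rs` hunks applies, and the choice between `insta::assert_snapshot!` (Display) and `insta::assert_debug_snapshot!` (Debug) mirrors the `format!("{}")` versus `format!("{:?}")` split in the assertions being replaced.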