Closed
Labels: bug
Description
Describe the bug
In a rare case, a plan of the form `TableScan -> Aggregate -> TopK` fails to execute and raises the following error:
Arrow error: Invalid argument error: column types must match schema types, expected Timestamp(Millisecond, Some("UTC")) but found Timestamp(Millisecond, None) at column index 0
This issue occurs if and only if all of the following conditions are met at the same time:
- The query groups by a timestamp column that has a non-empty timezone.
- The aggregate is computed over a column other than the timestamp column (the aggregated column must not itself be a timestamp column).
- The result is sorted by the aggregate's output column.
- The sort is in descending order. (Ascending order does not trigger the bug.)
- A non-zero `LIMIT` is applied.
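To make the "all conditions at once" requirement concrete, the sketch below encodes the five reported conditions as a single conjunction. `QueryShape` and `matches_reported_conditions` are hypothetical names used only for illustration; they are not DataFusion APIs, and the predicate restates the conditions observed in this report rather than the root cause.

```rust
/// Hypothetical description of a query's shape (not a DataFusion type).
#[derive(Debug, Clone)]
struct QueryShape {
    /// Timezone of the timestamp column used in GROUP BY (None = no timezone).
    group_by_ts_timezone: Option<String>,
    /// Whether the aggregated input column is itself a timestamp.
    agg_input_is_timestamp: bool,
    /// Whether ORDER BY uses the aggregate's output column.
    sort_by_aggregate: bool,
    /// Whether the sort is descending.
    descending: bool,
    /// The LIMIT applied on top of the sort, if any.
    limit: Option<usize>,
}

impl QueryShape {
    /// True only when every condition listed in the report holds.
    fn matches_reported_conditions(&self) -> bool {
        let tz_non_empty = self
            .group_by_ts_timezone
            .as_deref()
            .is_some_and(|tz| !tz.is_empty());
        tz_non_empty
            && !self.agg_input_is_timestamp
            && self.sort_by_aggregate
            && self.descending
            && matches!(self.limit, Some(n) if n > 0)
    }
}

fn main() {
    // Shape of case 2 in the unit test: descending sort with LIMIT 1.
    let descending = QueryShape {
        group_by_ts_timezone: Some("UTC".to_string()),
        agg_input_is_timestamp: false,
        sort_by_aggregate: true,
        descending: true,
        limit: Some(1),
    };
    println!("{}", descending.matches_reported_conditions()); // prints true

    // Case 1 flips only the sort direction.
    let ascending = QueryShape { descending: false, ..descending.clone() };
    println!("{}", ascending.matches_reported_conditions()); // prints false
}
```

The two cases in the unit test under "To Reproduce" differ only in the `descending` field, which is why one passes and the other fails.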
To Reproduce
I have written a unit test that reproduces the bug. The test consists of two cases:
- Construct a plan `TableScan -> Aggregate -> TopK` with `asc = true`. Executing this plan succeeds.
- Construct the same plan with `asc = false`, i.e. sorting in descending order. Executing this plan raises the error above.
```rust
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch, TimestampMillisecondArray};
    use arrow_cast::pretty::pretty_format_batches;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use datafusion::{
        catalog::MemTable, datasource::provider_as_source, functions_aggregate::min_max::max,
        physical_plan::collect, prelude::SessionContext,
    };
    use datafusion_common::DataFusionError;
    use datafusion_expr::{LogicalPlanBuilder, SortExpr, col};

    /// Builds and executes `TableScan -> Aggregate -> TopK` over a table whose
    /// `ts` column is a UTC timestamp; `asc` selects the sort direction.
    async fn test_aggregate_then_topk(asc: bool) -> datafusion_common::Result<()> {
        // Schema: a timezone-aware timestamp column and an Int32 value column.
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new("value", DataType::Int32, false),
        ]));
        let columns = vec![
            Arc::new(
                TimestampMillisecondArray::from(vec![1000, 2000, 3000])
                    .with_timezone("UTC".to_string()),
            ) as _,
            Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
        ];
        let batch = RecordBatch::try_new(schema.clone(), columns).map_err(DataFusionError::from)?;
        let mem_table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;

        // GROUP BY ts, MAX(value) AS max_value, ORDER BY max_value, LIMIT 1.
        let plan = LogicalPlanBuilder::scan("t", provider_as_source(Arc::new(mem_table)), None)?
            .aggregate(vec![col("ts")], vec![max(col("value")).alias("max_value")])?
            .sort_with_limit(vec![SortExpr::new(col("max_value"), asc, true)], Some(1))?
            .build()?;
        println!("{}", plan.display_indent());

        // Plan and execute with a default session.
        let session_state = SessionContext::new().state();
        let exec_plan = session_state.create_physical_plan(&plan).await?;
        let batches = collect(exec_plan, session_state.task_ctx()).await?;
        println!("{}", pretty_format_batches(&batches).unwrap());
        Ok(())
    }

    #[tokio::test]
    async fn test() {
        // Case 1: TableScan -> Aggregate -> TopK (asc = true) executes successfully.
        let result = test_aggregate_then_topk(true).await;
        assert!(result.is_ok());
        // Case 2: TableScan -> Aggregate -> TopK (asc = false) fails with the
        // "column types must match schema types" error (this assertion documents the bug).
        let result = test_aggregate_then_topk(false).await;
        assert!(result.is_err());
    }
}
```
Expected behavior
No response
Additional context
This bug is confirmed to exist both in the latest codebase (rev: 350c61b) and in v48.0.0.