[Bug] Aggregate + TopK fails when asc = false #16837

@niebayes

Describe the bug

In rare cases, a plan of the form TableScan -> Aggregate -> TopK fails to execute and raises the following error:

Arrow error: Invalid argument error: column types must match schema types, expected Timestamp(Millisecond, Some("UTC")) but found Timestamp(Millisecond, None) at column index 0
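To illustrate what the message is complaining about: in Arrow, the timezone is part of the timestamp data type, so Timestamp(Millisecond, Some("UTC")) and Timestamp(Millisecond, None) are different types. The sketch below is a simplified, standard-library-only model of such a schema check. `ModelType` and `check_columns` are hypothetical names introduced for illustration and are not part of arrow-rs or DataFusion; the sketch does not show where the timezone is actually lost inside the plan.

```rust
/// Simplified stand-in for an Arrow data type (illustrative only, not the
/// real `arrow_schema::DataType`). The timezone is part of the type.
#[derive(Debug, Clone, PartialEq)]
enum ModelType {
    TimestampMillis(Option<String>),
    Int32,
}

/// Hypothetical model of a schema check: each column's type must be exactly
/// equal to the type declared at the same position in the schema.
fn check_columns(schema: &[ModelType], columns: &[ModelType]) -> Result<(), String> {
    if schema.len() != columns.len() {
        return Err(format!(
            "expected {} columns but found {}",
            schema.len(),
            columns.len()
        ));
    }
    for (index, (expected, found)) in schema.iter().zip(columns).enumerate() {
        // A timestamp without a timezone is not equal to one with "UTC".
        if expected != found {
            return Err(format!(
                "column types must match schema types, expected {:?} but found {:?} at column index {}",
                expected, found, index
            ));
        }
    }
    Ok(())
}
```

Under this model, checking a column of type `ModelType::TimestampMillis(None)` against a schema that declares `ModelType::TimestampMillis(Some("UTC".to_string()))` returns an error at column index 0, which mirrors the shape of the failure reported above.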

This issue occurs if and only if all of the following conditions are met simultaneously:

  1. Grouping by a timestamp column, where the timestamp column has a non-empty timezone.
  2. Aggregating over a column other than the timestamp column (the aggregation column cannot be a timestamp column).
  3. Sorting by this aggregation column.
  4. The sorting is in descending order. (Ascending order does not trigger this bug.)
  5. A non-zero Limit is applied.
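The five conditions above can be restated as a single predicate. This is only a sketch of the conditions as reported, not code from DataFusion; `QueryShape` and `matches_reported_conditions` are hypothetical names, and the predicate says nothing about the underlying cause.

```rust
/// Hypothetical description of a query shape; the field names are
/// illustrative and do not correspond to any DataFusion type.
#[derive(Debug, Clone, Copy)]
struct QueryShape {
    /// Condition 1: GROUP BY a timestamp column that carries a timezone.
    group_by_timestamp_with_tz: bool,
    /// Condition 2: the aggregated column is not a timestamp column.
    aggregate_on_non_timestamp: bool,
    /// Condition 3: the sort key is the aggregation result.
    sort_by_aggregate: bool,
    /// Condition 4: the sort is descending (asc = false).
    descending: bool,
    /// Condition 5: a limit is present and non-zero.
    limit: Option<usize>,
}

/// Returns true only when all five reported conditions hold at once.
fn matches_reported_conditions(q: &QueryShape) -> bool {
    q.group_by_timestamp_with_tz
        && q.aggregate_on_non_timestamp
        && q.sort_by_aggregate
        && q.descending
        && matches!(q.limit, Some(n) if n > 0)
}
```

For example, a shape with every flag set to true and `limit: Some(1)` matches, while the same shape with `descending: false` does not.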

To Reproduce

I have written a unit test to reproduce the bug. The test consists of two cases:

  1. Construct a plan TableScan -> Aggregate -> TopK with asc = true. Executing this plan succeeds.
  2. Construct the same plan but with asc = false, i.e. sorted in descending order. Executing this plan raises an error.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch, TimestampMillisecondArray};
    use arrow_cast::pretty::pretty_format_batches;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use datafusion::{
        catalog::MemTable, datasource::provider_as_source, functions_aggregate::min_max::max,
        physical_plan::collect, prelude::SessionContext,
    };
    use datafusion_common::DataFusionError;
    use datafusion_expr::{LogicalPlanBuilder, SortExpr, col};

    async fn test_aggregate_then_topk(asc: bool) -> datafusion_common::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new("value", DataType::Int32, false),
        ]));
        let columns = vec![
            Arc::new(
                TimestampMillisecondArray::from(vec![1000, 2000, 3000])
                    .with_timezone("UTC".to_string()),
            ) as _,
            Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
        ];
        let batch = RecordBatch::try_new(schema.clone(), columns).map_err(DataFusionError::from)?;
        let mem_table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;

        let plan = LogicalPlanBuilder::scan("t", provider_as_source(Arc::new(mem_table)), None)?
            .aggregate(vec![col("ts")], vec![max(col("value")).alias("max_value")])?
            .sort_with_limit(vec![SortExpr::new(col("max_value"), asc, true)], Some(1))?
            .build()?;
        println!("{}", plan.display_indent());

        let session_state = SessionContext::new().state();
        let exec_plan = session_state.create_physical_plan(&plan).await?;
        let batches = collect(exec_plan, session_state.task_ctx()).await?;
        println!("{}", pretty_format_batches(&batches).unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test() {
        // Case 1: TableScan -> Aggregate -> Topk (asc = true)
        let result = test_aggregate_then_topk(true).await;
        assert!(result.is_ok());

        // Case 2: TableScan -> Aggregate -> Topk (asc = false)
        let result = test_aggregate_then_topk(false).await;
        assert!(result.is_err());
    }
}

Expected behavior

No response

Additional context

This bug is confirmed to exist both in the latest codebase (rev: 350c61b) and in v48.0.0.

Labels

bug (Something isn't working)