
Support Aggregating by RunArrays #16011

@brancz


Is your feature request related to a problem or challenge?

It's currently not possible to group by (aggregate on) a `RunArray` (run-end encoded, REE) column.
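For readers unfamiliar with the encoding: a `RunArray` stores a column as two child arrays, `run_ends` (the exclusive logical end of each run) and `values` (one value per run). A logical row *i* therefore belongs to the first run whose end is greater than *i*. The std-only Rust sketch below models that layout with a hypothetical `RunEncoded` struct (not the arrow-rs type) and resolves a row to its run with a binary search. The binary search works because run ends are strictly increasing.

```rust
/// Hypothetical stand-in for a run-end encoded (REE) column, NOT arrow-rs's
/// `RunArray`: `run_ends[i]` is the exclusive logical end of run `i`, and
/// `values[i]` is the value repeated across that run.
struct RunEncoded<T> {
    run_ends: Vec<i32>,
    values: Vec<T>,
}

impl<T> RunEncoded<T> {
    /// The logical length is the last run end (0 for an empty column).
    fn len(&self) -> usize {
        self.run_ends.last().map_or(0, |&end| end as usize)
    }

    /// Map a logical row index to the index of the run containing it.
    /// Run ends are strictly increasing, so a binary search finds the
    /// first run whose end is greater than `logical`.
    fn physical_index(&self, logical: usize) -> Option<usize> {
        if logical >= self.len() {
            return None;
        }
        Some(self.run_ends.partition_point(|&end| (end as usize) <= logical))
    }

    /// Value of logical row `logical`, read from its run.
    fn value(&self, logical: usize) -> Option<&T> {
        self.physical_index(logical).map(|run| &self.values[run])
    }
}

fn main() {
    // Same run ends and sensor IDs as the issue's example: 4 runs, 15 rows.
    let sensors = RunEncoded {
        run_ends: vec![4, 7, 12, 15],
        values: vec!["sensor_A", "sensor_B", "sensor_C", "sensor_D"],
    };
    for row in 0..sensors.len() {
        // unwrap is safe here because `row < sensors.len()`.
        println!(
            "row {row:>2} -> run {} -> {}",
            sensors.physical_index(row).unwrap(),
            sensors.value(row).unwrap()
        );
    }
}
```

Only four values are stored for fifteen logical rows. Grouping directly on such a column means working with the runs rather than with one decoded key per row.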

Example code grouping by a `RunArray`

```rust
use arrow::array::{Array, Int32Array, RunArray, StringViewArray};
use arrow::datatypes::{DataType, Field, Schema, Int32Type};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), datafusion::error::DataFusionError> {
    // Create a new DataFusion context
    let ctx = SessionContext::new();

    // First, let's create our data
    // We'll have temperature readings where multiple consecutive readings come from the same sensor

    // Temperature values (not run-length encoded)
    // This represents all temperature readings in sequence
    let temperatures = Int32Array::from(vec![
        22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28
    ]);

    // Create the string values for sensor IDs
    let sensor_id_values = StringViewArray::from(vec!["sensor_A", "sensor_B", "sensor_C", "sensor_D"]);

    // Create the run ends array (positions where each run ends)
    let sensor_id_run_ends = Int32Array::from(vec![4, 7, 12, 15]);

    // Create RunArray for sensor IDs with Int32Type as run end type
    let sensor_id_ree = RunArray::<Int32Type>::try_new(&sensor_id_run_ends, &sensor_id_values)
        .expect("Failed to create sensor ID RunArray");

    // Get the exact data type of the RunArray for the schema
    let sensor_id_type = sensor_id_ree.data_type().clone();

    // Create schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("sensor_id", sensor_id_type, false),
        Field::new("temperature", DataType::Int32, false),
    ]));

    // Create record batch
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(sensor_id_ree), Arc::new(temperatures)],
    )?;

    // Register as a table
    let provider = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("sensor_readings", Arc::new(provider))?;

    // Run aggregation query
    // Group by sensor ID and calculate statistics
    let sql = "
        SELECT
            sensor_id,
            AVG(temperature) AS avg_temp,
            MIN(temperature) AS min_temp,
            MAX(temperature) AS max_temp,
            COUNT(temperature) AS reading_count
        FROM sensor_readings
        GROUP BY sensor_id
        ORDER BY sensor_id
    ";

    let results = ctx.sql(sql).await?.collect().await?;
    for batch in results {
        println!("{:?}", batch);
    }

    Ok(())
}
```
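The query should return one row per sensor. As a hedged reference for checking DataFusion's output once REE grouping works, the std-only sketch below decodes the same run ends and sensor IDs into one key per row. It then aggregates the temperatures in a `BTreeMap`, which also gives the `ORDER BY sensor_id` ordering. `Stats`, `expand`, and `group_by` are hypothetical names for illustration. DataFusion's actual result types (for example the type returned by `AVG`) may differ from the ones used here.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-group accumulator for the AVG/MIN/MAX/COUNT columns.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Stats {
    sum: i64,
    min: i32,
    max: i32,
    count: i64,
}

impl Stats {
    fn new(v: i32) -> Self {
        Stats { sum: v as i64, min: v, max: v, count: 1 }
    }
    fn update(&mut self, v: i32) {
        self.sum += v as i64;
        self.min = self.min.min(v);
        self.max = self.max.max(v);
        self.count += 1;
    }
    fn avg(&self) -> f64 {
        self.sum as f64 / self.count as f64
    }
}

/// Decode run ends + values into one key per logical row.
fn expand<'a>(run_ends: &[i32], values: &[&'a str]) -> Vec<&'a str> {
    let mut out = Vec::new();
    let mut start = 0usize;
    for (&end, &v) in run_ends.iter().zip(values) {
        let end = end as usize;
        out.extend(std::iter::repeat(v).take(end - start));
        start = end;
    }
    out
}

/// GROUP BY key; a BTreeMap keeps the groups sorted by key.
fn group_by<'a>(keys: &[&'a str], temps: &[i32]) -> BTreeMap<&'a str, Stats> {
    let mut groups: BTreeMap<&'a str, Stats> = BTreeMap::new();
    for (&k, &t) in keys.iter().zip(temps) {
        groups.entry(k).and_modify(|s| s.update(t)).or_insert_with(|| Stats::new(t));
    }
    groups
}

fn main() {
    let run_ends = [4, 7, 12, 15];
    let sensor_ids = ["sensor_A", "sensor_B", "sensor_C", "sensor_D"];
    let temperatures = [22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28];

    let keys = expand(&run_ends, &sensor_ids);
    for (id, s) in group_by(&keys, &temperatures) {
        println!("{id}\tavg={:.1}\tmin={}\tmax={}\tcount={}", s.avg(), s.min, s.max, s.count);
    }
}
```

With this data, sensor_A, sensor_B, sensor_C and sensor_D average 23.5, 21, 23 and 27 over 4, 3, 5 and 3 readings, respectively.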

Describe the solution you'd like

Several pieces need to be implemented to make this work:

Describe alternatives you've considered

We currently expand (decode) REE arrays into plain arrays before pushing them through DataFusion query plans. Being able to process them directly, without that copy, would be much better for performance.
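To illustrate why skipping the expansion matters, the sketch below compares two strategies on a MIN/MAX/COUNT aggregation. `group_decoded` materializes one key per row, which is roughly what expanding REE arrays up front amounts to, and performs one hash-map lookup per row. `group_runs` instead looks up each run's key once and folds that run's whole slice of values into the group. This illustrates the idea under simplifying assumptions. It is not DataFusion's group-by implementation, and all names are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical MIN/MAX/COUNT accumulator for one group.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Acc {
    min: i32,
    max: i32,
    count: usize,
}

impl Acc {
    fn empty() -> Self {
        Acc { min: i32::MAX, max: i32::MIN, count: 0 }
    }
    /// Fold a contiguous slice of values into this group.
    fn merge_slice(&mut self, vals: &[i32]) {
        for &v in vals {
            self.min = self.min.min(v);
            self.max = self.max.max(v);
        }
        self.count += vals.len();
    }
}

/// Expand-first approach: decode the runs so every row carries its own key,
/// then do one hash-map lookup per row. Returns (groups, number of lookups).
fn group_decoded<'a>(run_ends: &[i32], keys: &[&'a str], vals: &[i32]) -> (HashMap<&'a str, Acc>, usize) {
    let mut decoded = Vec::with_capacity(vals.len());
    let mut start = 0;
    for (&end, &k) in run_ends.iter().zip(keys) {
        decoded.extend(std::iter::repeat(k).take(end as usize - start));
        start = end as usize;
    }
    let mut groups = HashMap::new();
    let mut lookups = 0;
    for (k, v) in decoded.into_iter().zip(vals) {
        lookups += 1;
        groups.entry(k).or_insert_with(Acc::empty).merge_slice(std::slice::from_ref(v));
    }
    (groups, lookups)
}

/// Run-aware approach: one hash-map lookup per run. The same key may appear in
/// several non-adjacent runs, so a hash map is still needed to merge them.
fn group_runs<'a>(run_ends: &[i32], keys: &[&'a str], vals: &[i32]) -> (HashMap<&'a str, Acc>, usize) {
    let mut groups = HashMap::new();
    let mut lookups = 0;
    let mut start = 0;
    for (&end, &k) in run_ends.iter().zip(keys) {
        let end = end as usize;
        lookups += 1;
        groups.entry(k).or_insert_with(Acc::empty).merge_slice(&vals[start..end]);
        start = end;
    }
    (groups, lookups)
}

fn main() {
    let run_ends = [4, 7, 12, 15];
    let sensor_ids = ["sensor_A", "sensor_B", "sensor_C", "sensor_D"];
    let temperatures = [22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28];

    let (decoded, row_lookups) = group_decoded(&run_ends, &sensor_ids, &temperatures);
    let (by_run, run_lookups) = group_runs(&run_ends, &sensor_ids, &temperatures);
    assert_eq!(decoded, by_run);
    println!("per-row lookups: {row_lookups}, per-run lookups: {run_lookups}");
}
```

On the example data, both strategies produce identical groups. The run-aware one performs 4 key lookups instead of 15 and never allocates the decoded key column. The gap grows with the average run length.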

Additional context

I've already implemented all the pieces, but I'm opening this issue for context and easier tracking.


Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
