Open
Labels: enhancement (New feature or request)
Description
Is your feature request related to a problem or challenge?
It's currently not possible to aggregate (`GROUP BY`) on columns stored as `RunArray`s, i.e. run-end encoded (REE) arrays.
Example code grouping by a `RunArray`
```rust
use arrow::array::{Array, Int32Array, RunArray, StringViewArray};
use arrow::datatypes::{DataType, Field, Schema, Int32Type};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), datafusion::error::DataFusionError> {
    // Create a new DataFusion context
    let ctx = SessionContext::new();

    // First, let's create our data
    // We'll have temperature readings where multiple consecutive readings come from the same sensor

    // Temperature values (not run-length encoded)
    // This represents all temperature readings in sequence
    let temperatures = Int32Array::from(vec![
        22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28
    ]);

    // Create the string values for sensor IDs
    let sensor_id_values = StringViewArray::from(vec!["sensor_A", "sensor_B", "sensor_C", "sensor_D"]);

    // Create the run ends array (positions where each run ends)
    let sensor_id_run_ends = Int32Array::from(vec![4, 7, 12, 15]);

    // Create RunArray for sensor IDs with Int32Type as run end type
    let sensor_id_ree = RunArray::<Int32Type>::try_new(&sensor_id_run_ends, &sensor_id_values)
        .expect("Failed to create sensor ID RunArray");

    // Get the exact data type of the RunArray for the schema
    let sensor_id_type = sensor_id_ree.data_type().clone();

    // Create schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("sensor_id", sensor_id_type, false),
        Field::new("temperature", DataType::Int32, false),
    ]));

    // Create record batch
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(sensor_id_ree), Arc::new(temperatures)],
    )?;

    // Register as a table
    let provider = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("sensor_readings", Arc::new(provider))?;

    // Run aggregation query
    // Group by sensor ID and calculate statistics
    let sql = "
        SELECT
            sensor_id,
            AVG(temperature) AS avg_temp,
            MIN(temperature) AS min_temp,
            MAX(temperature) AS max_temp,
            COUNT(temperature) AS reading_count
        FROM sensor_readings
        GROUP BY sensor_id
        ORDER BY sensor_id
    ";
    let results = ctx.sql(sql).await?.collect().await?;
    for batch in results {
        println!("{:?}", batch);
    }

    Ok(())
}
```

Describe the solution you'd like
Several pieces need to be in place for this to work:
- Support for `RunArray`s in `arrow-select`'s `concat`. (arrow-select: Implement concat for `RunArray`s, arrow-rs#7487)
- Support for `RunArray`s in `arrow-row`. (arrow-row: Add support for REE, arrow-rs#7649)
- Support for `RunArray`s in `arrow-data`'s `build_extend_nulls` and `build_extend`. (arrow-data: Add REE support for `build_extend` and `build_extend_nulls`, arrow-rs#7671)
- Support for grouping by `RunArray`s in DataFusion (mainly in `datafusion/common/src/cast.rs` and `datafusion/physical-plan/src/aggregates/group_values/row.rs` to turn groups into `RunArray`s after aggregating, and `datafusion/common/src/hash_utils.rs` to implement the actual hashing)
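
To illustrate the hashing piece of the last item, here is a minimal sketch in plain `std` Rust, with no Arrow or DataFusion types involved. `RunEncoded` and `hash_rows` are hypothetical names invented for this illustration, and `DefaultHasher` stands in for whatever hasher DataFusion actually uses; this is not the code in `hash_utils.rs`. The idea it shows is that each run's value only needs to be hashed once, and the result is then repeated for every logical row the run covers.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a run-end encoded column (not arrow's `RunArray`).
/// `run_ends[i]` is the exclusive logical end of run `i`; it is assumed to be
/// strictly increasing and the same length as `values`.
struct RunEncoded<T> {
    run_ends: Vec<usize>,
    values: Vec<T>,
}

impl<T: Hash> RunEncoded<T> {
    /// Logical (expanded) number of rows.
    fn len(&self) -> usize {
        self.run_ends.last().copied().unwrap_or(0)
    }

    /// Produce one hash per logical row while hashing each run value only once.
    fn hash_rows(&self) -> Vec<u64> {
        let mut hashes = Vec::with_capacity(self.len());
        let mut start = 0;
        for (end, value) in self.run_ends.iter().zip(&self.values) {
            let mut hasher = DefaultHasher::new();
            value.hash(&mut hasher);
            let h = hasher.finish();
            // Repeat the hash for every row covered by this run.
            hashes.extend(std::iter::repeat(h).take(*end - start));
            start = *end;
        }
        hashes
    }
}

fn main() {
    // Same run layout as the sensor IDs in the example above.
    let sensor_ids = RunEncoded {
        run_ends: vec![4, 7, 12, 15],
        values: vec!["sensor_A", "sensor_B", "sensor_C", "sensor_D"],
    };
    let hashes = sensor_ids.hash_rows();
    assert_eq!(hashes.len(), sensor_ids.len());
    // Rows 0..4 belong to the same run, so they share a hash.
    assert!(hashes[..4].iter().all(|h| *h == hashes[0]));
    println!("computed {} row hashes from 4 value hashes", hashes.len());
}
```

In this sketch the output is still one hash per logical row, so the downstream grouping logic would not need to know the input was run-end encoded; only the hashing work shrinks from one hash per row to one per run.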
Describe alternatives you've considered
We're currently expanding REE arrays into plain arrays before pushing them through DataFusion query plans, but being able to process them zero-copy, without expansion, would be much better for performance.
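
As a rough illustration of why skipping the expansion helps, the sketch below aggregates the example data directly over the runs, using only `std` Rust. `Stats` and `aggregate_by_runs` are hypothetical names made up for this example, and the code is a simplified model rather than how DataFusion evaluates the query: it assumes the run ends are strictly increasing and cover the whole `values` slice.

```rust
use std::collections::BTreeMap;

/// Per-group running aggregates (hypothetical type for this sketch).
#[derive(Debug, Clone, PartialEq)]
struct Stats {
    count: usize,
    min: i32,
    max: i32,
    sum: i64,
}

impl Stats {
    fn avg(&self) -> f64 {
        self.sum as f64 / self.count as f64
    }
}

/// Group `values` by run-end encoded `keys` without materializing one key per row.
/// `run_ends[i]` is the exclusive logical end of the run holding `keys[i]`.
fn aggregate_by_runs<'a>(
    run_ends: &[usize],
    keys: &[&'a str],
    values: &[i32],
) -> BTreeMap<&'a str, Stats> {
    let mut groups: BTreeMap<&'a str, Stats> = BTreeMap::new();
    let mut start = 0;
    for (&end, &key) in run_ends.iter().zip(keys) {
        // One group lookup per run instead of one per row.
        let entry = groups.entry(key).or_insert(Stats {
            count: 0,
            min: i32::MAX,
            max: i32::MIN,
            sum: 0,
        });
        for &v in &values[start..end] {
            entry.count += 1;
            entry.min = entry.min.min(v);
            entry.max = entry.max.max(v);
            entry.sum += i64::from(v);
        }
        start = end;
    }
    groups
}

fn main() {
    let temperatures = [22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28];
    let run_ends = [4, 7, 12, 15];
    let sensor_ids = ["sensor_A", "sensor_B", "sensor_C", "sensor_D"];

    let groups = aggregate_by_runs(&run_ends, &sensor_ids, &temperatures);
    // BTreeMap iterates in key order, mirroring ORDER BY sensor_id.
    for (sensor, s) in &groups {
        println!(
            "{sensor}: avg={} min={} max={} count={}",
            s.avg(),
            s.min,
            s.max,
            s.count
        );
    }
}
```

For the example data this yields averages of 23.5, 21, 23 and 27 for `sensor_A` through `sensor_D`, which is what the SQL query above would be expected to return once grouping by `RunArray`s is supported.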
Additional context
I already have all the pieces implemented; I'm opening this issue for context and easier tracking.