Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ use variable::{decode_binary_view, decode_string_view};

use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::variable::{decode_binary, decode_string};
use arrow_array::types::{Int16Type, Int32Type, Int64Type};

mod fixed;
mod list;
mod run;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I double checked and run is consistent with the naming of REEArray elsewhere in the crate 👍

mod variable;

/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
Expand Down Expand Up @@ -381,6 +383,8 @@ enum Codec {
Struct(RowConverter, OwnedRow),
/// A row converter for the child field
List(RowConverter),
/// A row converter for the values array of a run-end encoded array
RunEndEncoded(RowConverter),
}

impl Codec {
Expand All @@ -400,6 +404,17 @@ impl Codec {
};
Ok(Self::Dictionary(converter, owned))
}
DataType::RunEndEncoded(_, values) => {
// Similar to List implementation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can pull the transformation into a documented helper function (not needed, I just was confused for a bit until I read the comments in the List/LargeList implementation

let options = SortOptions {
descending: false,
nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
};

let field = SortField::new_with_options(values.data_type().clone(), options);
let converter = RowConverter::new(vec![field])?;
Ok(Self::RunEndEncoded(converter))
}
d if !d.is_nested() => Ok(Self::Stateless),
DataType::List(f) | DataType::LargeList(f) => {
// The encoded contents will be inverted if descending is set to true
Expand Down Expand Up @@ -460,6 +475,19 @@ impl Codec {
let rows = converter.convert_columns(&[values.clone()])?;
Ok(Encoder::List(rows))
}
Codec::RunEndEncoded(converter) => {
let values = match array.data_type() {
DataType::RunEndEncoded(r, _) => match r.data_type() {
DataType::Int16 => array.as_run::<Int16Type>().values(),
DataType::Int32 => array.as_run::<Int32Type>().values(),
DataType::Int64 => array.as_run::<Int64Type>().values(),
_ => unreachable!("Unsupported run end index type: {r:?}"),
},
_ => unreachable!(),
};
let rows = converter.convert_columns(&[values.clone()])?;
Ok(Encoder::RunEndEncoded(rows))
}
}
}

Expand All @@ -469,6 +497,7 @@ impl Codec {
Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
Codec::List(converter) => converter.size(),
Codec::RunEndEncoded(converter) => converter.size(),
}
}
}
Expand All @@ -487,6 +516,8 @@ enum Encoder<'a> {
Struct(Rows, Row<'a>),
/// The row encoding of the child array
List(Rows),
/// The row encoding of the values array
RunEndEncoded(Rows),
}

/// Configure the data type and sort order for a given column
Expand Down Expand Up @@ -545,6 +576,7 @@ impl RowConverter {
Self::supports_datatype(f.data_type())
}
DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
_ => false,
}
}
Expand Down Expand Up @@ -1331,6 +1363,27 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
}
_ => unreachable!(),
},
Encoder::RunEndEncoded(rows) => match array.data_type() {
DataType::RunEndEncoded(r, _) => match r.data_type() {
DataType::Int16 => run::compute_lengths(
tracker.materialized(),
rows,
array.as_run::<Int16Type>(),
),
DataType::Int32 => run::compute_lengths(
tracker.materialized(),
rows,
array.as_run::<Int32Type>(),
),
DataType::Int64 => run::compute_lengths(
tracker.materialized(),
rows,
array.as_run::<Int64Type>(),
),
_ => unreachable!("Unsupported run end index type: {r:?}"),
},
_ => unreachable!(),
},
}
}

Expand Down Expand Up @@ -1427,6 +1480,21 @@ fn encode_column(
}
_ => unreachable!(),
},
Encoder::RunEndEncoded(rows) => match column.data_type() {
DataType::RunEndEncoded(r, _) => match r.data_type() {
DataType::Int16 => {
run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
}
DataType::Int32 => {
run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
}
DataType::Int64 => {
run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
}
_ => unreachable!("Unsupported run end index type: {r:?}"),
},
_ => unreachable!(),
},
}
}

Expand Down Expand Up @@ -1512,6 +1580,30 @@ unsafe fn decode_column(
}
_ => unreachable!(),
},
Codec::RunEndEncoded(converter) => match &field.data_type {
DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
DataType::Int16 => Arc::new(run::decode::<Int16Type>(
converter,
rows,
field,
validate_utf8,
)?),
DataType::Int32 => Arc::new(run::decode::<Int32Type>(
converter,
rows,
field,
validate_utf8,
)?),
DataType::Int64 => Arc::new(run::decode::<Int64Type>(
converter,
rows,
field,
validate_utf8,
)?),
_ => unreachable!(),
},
_ => unreachable!(),
},
};
Ok(array)
}
Expand Down
Loading
Loading