Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 62 additions & 104 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,19 @@ ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
apache-avro = { version = "0.17", default-features = false }
arrow = { version = "55.2.0", features = [
arrow = { version = "56.0.0", features = [
"prettyprint",
"chrono-tz",
] }
arrow-buffer = { version = "55.2.0", default-features = false }
arrow-flight = { version = "55.2.0", features = [
arrow-buffer = { version = "56.0.0", default-features = false }
arrow-flight = { version = "56.0.0", features = [
"flight-sql-experimental",
] }
arrow-ipc = { version = "55.2.0", default-features = false, features = [
arrow-ipc = { version = "56.0.0", default-features = false, features = [
"lz4",
] }
arrow-ord = { version = "55.2.0", default-features = false }
arrow-schema = { version = "55.2.0", default-features = false }
arrow-ord = { version = "56.0.0", default-features = false }
arrow-schema = { version = "56.0.0", default-features = false }
async-trait = "0.1.88"
bigdecimal = "0.4.8"
bytes = "1.10"
Expand Down Expand Up @@ -155,7 +155,7 @@ itertools = "0.14"
log = "^0.4"
object_store = { version = "0.12.3", default-features = false }
parking_lot = "0.12"
parquet = { version = "55.2.0", default-features = false, features = [
parquet = { version = "56.0.0", default-features = false, features = [
"arrow",
"async",
"object_store",
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ serde_json = { workspace = true }
tempfile = { workspace = true }
test-utils = { path = "../test-utils" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.12.1"
tonic = "0.13.1"
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3" }
url = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ log = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.24.2", optional = true }
pyo3 = { version = "0.25", optional = true }
recursive = { workspace = true, optional = true }
sqlparser = { workspace = true }
tokio = { workspace = true }
Expand Down
18 changes: 2 additions & 16 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,13 +607,6 @@ config_namespace! {
/// default parquet writer setting
pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

/// (writing) Sets max statistics size for any column. If NULL, uses
/// default parquet writer setting
/// max_statistics_size is deprecated, currently it is not being used
// TODO: remove once deprecated
#[deprecated(since = "45.0.0", note = "Setting does not do anything")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these were removed from the underlying parquet library as well

pub max_statistics_size: Option<usize>, default = Some(4096)

/// (writing) Target maximum number of rows in each row group (defaults to 1M
/// rows). Writing larger row groups requires more memory to write, but
/// can get better compression and be faster to read.
Expand All @@ -625,9 +618,9 @@ config_namespace! {
/// (writing) Sets column index truncate length
pub column_index_truncate_length: Option<usize>, default = Some(64)

/// (writing) Sets statictics truncate length. If NULL, uses
/// (writing) Sets statistics truncate length. If NULL, uses
/// default parquet writer setting
pub statistics_truncate_length: Option<usize>, default = None
pub statistics_truncate_length: Option<usize>, default = Some(64)

/// (writing) Sets best effort maximum number of rows in data page
pub data_page_row_count_limit: usize, default = 20_000
Expand Down Expand Up @@ -2064,13 +2057,6 @@ config_namespace_with_hashmap! {
/// Sets bloom filter number of distinct values. If NULL, uses
/// default parquet options
pub bloom_filter_ndv: Option<u64>, default = None

/// Sets max statistics size for the column path. If NULL, uses
/// default parquet options
/// max_statistics_size is deprecated, currently it is not being used
// TODO: remove once deprecated
#[deprecated(since = "45.0.0", note = "Setting does not do anything")]
pub max_statistics_size: Option<usize>, default = None
}
}

Expand Down
25 changes: 1 addition & 24 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use parquet::{
metadata::KeyValue,
properties::{
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
DEFAULT_STATISTICS_ENABLED,
},
},
schema::types::ColumnPath,
Expand Down Expand Up @@ -161,16 +161,6 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
builder =
builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
}

// max_statistics_size is deprecated, currently it is not being used
// TODO: remove once deprecated
#[allow(deprecated)]
if let Some(max_statistics_size) = options.max_statistics_size {
builder = {
#[allow(deprecated)]
builder.set_column_max_statistics_size(path, max_statistics_size)
}
}
}

Ok(builder)
Expand Down Expand Up @@ -219,7 +209,6 @@ impl ParquetOptions {
dictionary_enabled,
dictionary_page_size_limit,
statistics_enabled,
max_statistics_size,
max_row_group_size,
created_by,
column_index_truncate_length,
Expand Down Expand Up @@ -266,13 +255,6 @@ impl ParquetOptions {
.set_data_page_row_count_limit(*data_page_row_count_limit)
.set_bloom_filter_enabled(*bloom_filter_on_write);

builder = {
#[allow(deprecated)]
builder.set_max_statistics_size(
max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
)
};

if let Some(bloom_filter_fpp) = bloom_filter_fpp {
builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
};
Expand Down Expand Up @@ -465,12 +447,10 @@ mod tests {
fn column_options_with_non_defaults(
src_col_defaults: &ParquetOptions,
) -> ParquetColumnOptions {
#[allow(deprecated)] // max_statistics_size
ParquetColumnOptions {
compression: Some("zstd(22)".into()),
dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
statistics_enabled: Some("none".into()),
max_statistics_size: Some(72),
encoding: Some("RLE".into()),
bloom_filter_enabled: Some(true),
bloom_filter_fpp: Some(0.72),
Expand All @@ -495,7 +475,6 @@ mod tests {
dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
dictionary_page_size_limit: 42,
statistics_enabled: Some("chunk".into()),
max_statistics_size: Some(42),
max_row_group_size: 42,
created_by: "wordy".into(),
column_index_truncate_length: Some(42),
Expand Down Expand Up @@ -554,7 +533,6 @@ mod tests {
),
bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
max_statistics_size: Some(props.max_statistics_size(&col)),
}
}

Expand Down Expand Up @@ -611,7 +589,6 @@ mod tests {
compression: default_col_props.compression,
dictionary_enabled: default_col_props.dictionary_enabled,
statistics_enabled: default_col_props.statistics_enabled,
max_statistics_size: default_col_props.max_statistics_size,
bloom_filter_on_write: default_col_props
.bloom_filter_enabled
.unwrap_or_default(),
Expand Down
19 changes: 14 additions & 5 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,10 @@ fn dict_from_values<K: ArrowDictionaryKeyType>(
.map(|index| {
if values_array.is_valid(index) {
let native_index = K::Native::from_usize(index).ok_or_else(|| {
DataFusionError::Internal(format!(
"Can not create index of type {} from value {}",
K::DATA_TYPE,
index
))
_internal_datafusion_err!(
"Can not create index of type {} from value {index}",
K::DATA_TYPE
)
})?;
Ok(Some(native_index))
} else {
Expand Down Expand Up @@ -2192,6 +2191,16 @@ impl ScalarValue {
}

let array: ArrayRef = match &data_type {
DataType::Decimal32(_precision, _scale) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decimal32 and Decimal64 types were added to arrow

return _not_impl_err!(
"Decimal32 not supported in ScalarValue::iter_to_array"
);
}
DataType::Decimal64(_precision, _scale) => {
return _not_impl_err!(
"Decimal64 not supported in ScalarValue::iter_to_array"
);
}
DataType::Decimal128(precision, scale) => {
let decimal_array =
ScalarValue::iter_to_decimal_array(scalars, *precision, *scale)?;
Expand Down
5 changes: 4 additions & 1 deletion datafusion/common/src/types/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,10 @@ impl From<DataType> for NativeType {
DataType::Union(union_fields, _) => {
Union(LogicalUnionFields::from(&union_fields))
}
DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => Decimal(p, s),
DataType::Decimal32(p, s)
| DataType::Decimal64(p, s)
| DataType::Decimal128(p, s)
| DataType::Decimal256(p, s) => Decimal(p, s),
DataType::Map(field, _) => Map(Arc::new(field.as_ref().into())),
DataType::Dictionary(_, data_type) => data_type.as_ref().clone().into(),
DataType::RunEndEncoded(_, field) => field.data_type().clone().into(),
Expand Down
11 changes: 3 additions & 8 deletions datafusion/core/tests/fuzz_cases/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,9 @@ async fn write_parquet_file(
row_groups: Vec<Vec<String>>,
) -> Bytes {
let mut buf = BytesMut::new().writer();
let mut props = WriterProperties::builder();
if let Some(truncation_length) = truncation_length {
props = {
#[allow(deprecated)]
props.set_max_statistics_size(truncation_length)
}
}
props = props.set_statistics_enabled(EnabledStatistics::Chunk); // row group level
let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Chunk) // row group level
.set_statistics_truncate_length(truncation_length);
let props = props.build();
{
let mut writer =
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ struct ContextWithParquet {

/// The output of running one of the test cases
struct TestOutput {
/// The input string
/// The input query SQL
sql: String,
/// Execution metrics for the Parquet Scan
parquet_metrics: MetricsSet,
/// number of rows in results
/// number of actual rows in results
result_rows: usize,
/// the contents of the input, as a string
pretty_input: String,
Expand Down
14 changes: 8 additions & 6 deletions datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct RowGroupPruningTest {
expected_files_pruned_by_statistics: Option<usize>,
expected_row_group_matched_by_bloom_filter: Option<usize>,
expected_row_group_pruned_by_bloom_filter: Option<usize>,
expected_results: usize,
expected_rows: usize,
}
impl RowGroupPruningTest {
// Start building the test configuration
Expand All @@ -48,7 +48,7 @@ impl RowGroupPruningTest {
expected_files_pruned_by_statistics: None,
expected_row_group_matched_by_bloom_filter: None,
expected_row_group_pruned_by_bloom_filter: None,
expected_results: 0,
expected_rows: 0,
}
}

Expand Down Expand Up @@ -99,9 +99,9 @@ impl RowGroupPruningTest {
self
}

// Set the expected rows for the test
/// Set the number of expected rows from the output of this test
fn with_expected_rows(mut self, rows: usize) -> Self {
self.expected_results = rows;
self.expected_rows = rows;
self
}

Expand Down Expand Up @@ -145,8 +145,10 @@ impl RowGroupPruningTest {
);
assert_eq!(
output.result_rows,
self.expected_results,
"mismatched expected rows: {}",
self.expected_rows,
"Expected {} rows, got {}: {}",
self.expected_rows,
output.result_rows,
output.description(),
);
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/datasource-avro/src/avro_to_arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ fn default_field_name(dt: &DataType) -> &str {
| DataType::LargeListView(_) => {
unimplemented!("View support not implemented")
}
DataType::Decimal32(_, _) => "decimal",
DataType::Decimal64(_, _) => "decimal",
DataType::Decimal128(_, _) => "decimal",
DataType::Decimal256(_, _) => "decimal",
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,8 @@ pub fn can_hash(data_type: &DataType) -> bool {
DataType::Float16 => true,
DataType::Float32 => true,
DataType::Float64 => true,
DataType::Decimal32(_, _) => true,
DataType::Decimal64(_, _) => true,
DataType::Decimal128(_, _) => true,
DataType::Decimal256(_, _) => true,
DataType::Timestamp(_, _) => true,
Expand Down
40 changes: 32 additions & 8 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,19 @@ enum IntervalUnit{
MonthDayNano = 2;
}

message Decimal{
message Decimal32Type {
reserved 1, 2;
uint32 precision = 3;
int32 scale = 4;
}

message Decimal64Type {
reserved 1, 2;
uint32 precision = 3;
int32 scale = 4;
}

message Decimal128Type {
reserved 1, 2;
uint32 precision = 3;
int32 scale = 4;
Expand Down Expand Up @@ -286,6 +298,8 @@ message ScalarValue{
ScalarNestedValue struct_value = 32;
ScalarNestedValue map_value = 41;

Decimal32 decimal32_value = 43;
Decimal64 decimal64_value = 44;
Decimal128 decimal128_value = 20;
Decimal256 decimal256_value = 39;

Expand All @@ -310,6 +324,18 @@ message ScalarValue{
}
}

message Decimal32{
bytes value = 1;
int64 p = 2;
int64 s = 3;
}

message Decimal64{
bytes value = 1;
int64 p = 2;
int64 s = 3;
}

message Decimal128{
bytes value = 1;
int64 p = 2;
Expand Down Expand Up @@ -352,7 +378,9 @@ message ArrowType{
TimeUnit TIME32 = 21 ;
TimeUnit TIME64 = 22 ;
IntervalUnit INTERVAL = 23 ;
Decimal DECIMAL = 24 ;
Decimal32Type DECIMAL32 = 40;
Decimal64Type DECIMAL64 = 41;
Decimal128Type DECIMAL128 = 24;
Decimal256Type DECIMAL256 = 36;
List LIST = 25;
List LARGE_LIST = 26;
Expand Down Expand Up @@ -480,9 +508,7 @@ message ParquetColumnOptions {
uint64 bloom_filter_ndv = 7;
}

oneof max_statistics_size_opt {
uint32 max_statistics_size = 8;
}
reserved 8; // used to be uint32 max_statistics_size = 8;
}

message ParquetOptions {
Expand Down Expand Up @@ -522,9 +548,7 @@ message ParquetOptions {
string statistics_enabled = 13;
}

oneof max_statistics_size_opt {
uint64 max_statistics_size = 14;
}
reserved 14; // used to be uint64 max_statistics_size = 14;

oneof column_index_truncate_length_opt {
uint64 column_index_truncate_length = 17;
Expand Down
Loading