Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arrow-avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ crc = { version = "3.0", optional = true }

[dev-dependencies]
rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] }
criterion = { version = "0.5", default-features = false }
criterion = { version = "0.6.0", default-features = false }
tempfile = "3.3"
arrow = { workspace = true }

Expand Down
12 changes: 6 additions & 6 deletions arrow-avro/benches/avro_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ fn bench_array_creation(c: &mut Criterion) {
)
.unwrap();

criterion::black_box(batch)
std::hint::black_box(batch)
})
});

Expand All @@ -187,7 +187,7 @@ fn bench_array_creation(c: &mut Criterion) {
)
.unwrap();

criterion::black_box(batch)
std::hint::black_box(batch)
})
});
}
Expand All @@ -214,7 +214,7 @@ fn bench_string_operations(c: &mut Criterion) {
for i in 0..rows {
sum_len += string_array.value(i).len();
}
criterion::black_box(sum_len)
std::hint::black_box(sum_len)
})
});

Expand All @@ -224,7 +224,7 @@ fn bench_string_operations(c: &mut Criterion) {
for i in 0..rows {
sum_len += string_view_array.value(i).len();
}
criterion::black_box(sum_len)
std::hint::black_box(sum_len)
})
});
}
Expand All @@ -246,15 +246,15 @@ fn bench_avro_reader(c: &mut Criterion) {
b.iter(|| {
let options = ReadOptions::default();
let batch = read_avro_test_file(file_path, &options).unwrap();
criterion::black_box(batch)
std::hint::black_box(batch)
})
});

group.bench_function(format!("string_view_{str_length}_chars"), |b| {
b.iter(|| {
let options = ReadOptions::default().with_utf8view(true);
let batch = read_avro_test_file(file_path, &options).unwrap();
criterion::black_box(batch)
std::hint::black_box(batch)
})
});
}
Expand Down
80 changes: 73 additions & 7 deletions arrow-avro/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
// under the License.

use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, TypeName};
use arrow_schema::DataType::{Decimal128, Decimal256};
use arrow_schema::{
ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, SchemaBuilder, SchemaRef, TimeUnit,
ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, SchemaBuilder, SchemaRef,
TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
};
use std::borrow::Cow;
use std::collections::HashMap;
Expand Down Expand Up @@ -192,6 +194,13 @@ pub enum Codec {
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
/// The i32 parameter indicates the fixed binary size
Fixed(i32),
/// Represents Avro decimal type, maps to Arrow's Decimal128 or Decimal256 data types
///
/// The fields are `(precision, scale, fixed_size)`.
/// - `precision` (`usize`): Total number of digits.
/// - `scale` (`Option<usize>`): Number of fractional digits.
/// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
Decimal(usize, Option<usize>, Option<usize>),
/// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
Uuid,
/// Represents Avro array type, maps to Arrow's List data type
Expand Down Expand Up @@ -227,6 +236,22 @@ impl Codec {
}
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
Self::Decimal(precision, scale, size) => {
let p = *precision as u8;
let s = scale.unwrap_or(0) as i8;
let too_large_for_128 = match *size {
Some(sz) => sz > 16,
None => {
(p as usize) > DECIMAL128_MAX_PRECISION as usize
|| (s as usize) > DECIMAL128_MAX_SCALE as usize
}
};
if too_large_for_128 {
Decimal256(p, s)
} else {
Decimal128(p, s)
}
}
Self::Uuid => DataType::FixedSizeBinary(16),
Self::List(f) => {
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
Expand Down Expand Up @@ -267,6 +292,32 @@ impl From<PrimitiveType> for Codec {
}
}

fn parse_decimal_attributes(
attributes: &Attributes,
fallback_size: Option<usize>,
precision_required: bool,
) -> Result<(usize, usize, Option<usize>), ArrowError> {
let precision = attributes
.additional
.get("precision")
.and_then(|v| v.as_u64())
.or(if precision_required { None } else { Some(10) })
.ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
as usize;
let scale = attributes
.additional
.get("scale")
.and_then(|v| v.as_u64())
.unwrap_or(0) as usize;
let size = attributes
.additional
.get("size")
.and_then(|v| v.as_u64())
.map(|s| s as usize)
.or(fallback_size);
Ok((precision, scale, size))
}

impl Codec {
/// Converts a string codec to use Utf8View if requested
///
Expand Down Expand Up @@ -412,7 +463,6 @@ fn make_data_type<'a>(
let size = f.size.try_into().map_err(|e| {
ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
})?;

let field = AvroDataType {
nullability: None,
metadata: f.attributes.field_metadata(),
Expand Down Expand Up @@ -443,11 +493,27 @@ fn make_data_type<'a>(

// https://avro.apache.org/docs/1.11.1/specification/#logical-types
match (t.attributes.logical_type, &mut field.codec) {
(Some("decimal"), c @ Codec::Fixed(_)) => {
return Err(ArrowError::NotYetImplemented(
"Decimals are not currently supported".to_string(),
))
}
(Some("decimal"), c) => match *c {
Codec::Fixed(sz_val) => {
let (prec, sc, size_opt) =
parse_decimal_attributes(&t.attributes, Some(sz_val as usize), true)?;
let final_sz = if let Some(sz_actual) = size_opt {
sz_actual
} else {
sz_val as usize
};
*c = Codec::Decimal(prec, Some(sc), Some(final_sz));
}
Codec::Binary => {
let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
*c = Codec::Decimal(prec, Some(sc), None);
}
_ => {
return Err(ArrowError::SchemaError(format!(
"Decimal logical type can only be backed by Fixed or Bytes, found {c:?}"
)))
}
},
(Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
(Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
(Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
Expand Down
Loading
Loading