Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion arrow-avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ zstd = { version = "0.13", default-features = false, optional = true }
crc = { version = "3.0", optional = true }

[dev-dependencies]
rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] }
rand = { version = "0.9.1", default-features = false, features = ["std", "std_rng", "thread_rng"] }
criterion = { version = "0.6.0", default-features = false }
tempfile = "3.3"
arrow = { workspace = true }
futures = "0.3.31"
bytes = "1.10.1"
async-stream = "0.3.6"

[[bench]]
name = "avro_reader"
Expand Down
15 changes: 6 additions & 9 deletions arrow-avro/benches/avro_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! This benchmark suite compares the performance characteristics of StringArray vs
//! StringViewArray across three key dimensions:
//! 1. Array creation performance
//! 2. String value access operations
//! 2. String value access operations
//! 3. Avro file reading with each array type

use std::fs::File;
Expand All @@ -31,7 +31,6 @@ use std::time::Duration;
use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::{ArrayRef, Int32Array, StringArray, StringViewArray};
use arrow_avro::ReadOptions;
use arrow_schema::ArrowError;
use criterion::*;
use tempfile::NamedTempFile;
Expand Down Expand Up @@ -79,7 +78,7 @@ fn create_avro_test_file(row_count: usize, str_length: usize) -> Result<NamedTem

fn read_avro_test_file(
file_path: &std::path::Path,
options: &ReadOptions,
use_utf8view: bool,
) -> Result<RecordBatch, ArrowError> {
let file = File::open(file_path)?;
let mut reader = BufReader::new(file);
Expand Down Expand Up @@ -110,7 +109,7 @@ fn read_avro_test_file(
ints.push(i32::from_le_bytes(int_bytes));
}

let string_array: ArrayRef = if options.use_utf8view() {
let string_array: ArrayRef = if use_utf8view {
Arc::new(StringViewArray::from_iter(
strings.iter().map(|s| Some(s.as_str())),
))
Expand All @@ -123,7 +122,7 @@ fn read_avro_test_file(
let int_array: ArrayRef = Arc::new(Int32Array::from(ints));

let schema = Arc::new(Schema::new(vec![
if options.use_utf8view() {
if use_utf8view {
Field::new("string_field", DataType::Utf8View, false)
} else {
Field::new("string_field", DataType::Utf8, false)
Expand Down Expand Up @@ -244,16 +243,14 @@ fn bench_avro_reader(c: &mut Criterion) {

group.bench_function(format!("string_array_{str_length}_chars"), |b| {
b.iter(|| {
let options = ReadOptions::default();
let batch = read_avro_test_file(file_path, &options).unwrap();
let batch = read_avro_test_file(file_path, false).unwrap();
std::hint::black_box(batch)
})
});

group.bench_function(format!("string_view_{str_length}_chars"), |b| {
b.iter(|| {
let options = ReadOptions::default().with_utf8view(true);
let batch = read_avro_test_file(file_path, &options).unwrap();
let batch = read_avro_test_file(file_path, true).unwrap();
std::hint::black_box(batch)
})
});
Expand Down
65 changes: 26 additions & 39 deletions arrow-avro/examples/read_with_utf8view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@

use std::env;
use std::fs::File;
use std::io::{BufReader, Seek, SeekFrom};
use std::sync::Arc;
use std::io::BufReader;
use std::time::Instant;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, StringViewArray};
use arrow_avro::reader::ReadOptions;
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_array::{RecordBatch, StringArray, StringViewArray};
use arrow_avro::reader::ReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect();
Expand All @@ -41,20 +39,26 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
};

let file = File::open(file_path)?;
let mut reader = BufReader::new(file);
let file_for_view = file.try_clone()?;

let start = Instant::now();
let batch = read_avro_with_options(&mut reader, &ReadOptions::default())?;
let reader = BufReader::new(file);
let avro_reader = ReaderBuilder::new().build(reader)?;
let schema = avro_reader.schema();
let batches: Vec<RecordBatch> = avro_reader.collect::<Result<_, _>>()?;
let regular_duration = start.elapsed();

reader.seek(SeekFrom::Start(0))?;

let start = Instant::now();
let options = ReadOptions::default().with_utf8view(true);
let batch_view = read_avro_with_options(&mut reader, &options)?;
let reader_view = BufReader::new(file_for_view);
let avro_reader_view = ReaderBuilder::new()
.with_utf8_view(true)
.build(reader_view)?;
let batches_view: Vec<RecordBatch> = avro_reader_view.collect::<Result<_, _>>()?;
let view_duration = start.elapsed();

println!("Read {} rows from {}", batch.num_rows(), file_path);
let num_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();

println!("Read {num_rows} rows from {file_path}");
println!("Reading with StringArray: {regular_duration:?}");
println!("Reading with StringViewArray: {view_duration:?}");

Expand All @@ -70,7 +74,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
);
}

for (i, field) in batch.schema().fields().iter().enumerate() {
if batches.is_empty() {
println!("No data read from file.");
return Ok(());
}

// Inspect the first batch from each run to show the array types
let batch = &batches[0];
let batch_view = &batches_view[0];

for (i, field) in schema.fields().iter().enumerate() {
let col = batch.column(i);
let col_view = batch_view.column(i);

Expand All @@ -93,29 +106,3 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

fn read_avro_with_options(
reader: &mut BufReader<File>,
options: &ReadOptions,
) -> Result<RecordBatch, ArrowError> {
reader.get_mut().seek(SeekFrom::Start(0))?;

let mock_schema = Schema::new(vec![
Field::new("string_field", DataType::Utf8, false),
Field::new("int_field", DataType::Int32, false),
]);

let string_data = vec!["avro1", "avro2", "avro3", "avro4", "avro5"];
let int_data = vec![1, 2, 3, 4, 5];

let string_array: ArrayRef = if options.use_utf8view() {
Arc::new(StringViewArray::from(string_data))
} else {
Arc::new(StringArray::from(string_data))
};

let int_array: ArrayRef = Arc::new(Int32Array::from(int_data));

RecordBatch::try_new(Arc::new(mock_schema), vec![string_array, int_array])
.map_err(|e| ArrowError::ComputeError(format!("Failed to create record batch: {e}")))
}
5 changes: 2 additions & 3 deletions arrow-avro/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, TypeName};
use arrow_schema::DataType::{Decimal128, Decimal256};
use arrow_schema::{
ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, SchemaBuilder, SchemaRef,
TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
Expand Down Expand Up @@ -251,9 +250,9 @@ impl Codec {
}
};
if too_large_for_128 {
Decimal256(p, s)
DataType::Decimal256(p, s)
} else {
Decimal128(p, s)
DataType::Decimal128(p, s)
}
}
Self::Uuid => DataType::FixedSizeBinary(16),
Expand Down
2 changes: 0 additions & 2 deletions arrow-avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ pub mod compression;
/// Avro data types and Arrow data types.
pub mod codec;

pub use reader::ReadOptions;

/// Extension trait for AvroField to add Utf8View support
///
/// This trait adds methods for working with Utf8View support to the AvroField struct.
Expand Down
Loading
Loading