Skip to content

Commit 26c9c7a

Browse files
authored
Add benchmarks for arrow-avro writer (#8165)
# Which issue does this PR close?

- Part of #4886

# Rationale for this change

This PR introduces benchmark tests for the `AvroWriter` in the `arrow-avro` crate. Adding these benchmarks is essential for tracking the performance of the writer, identifying potential regressions, and guiding future optimizations.

# What changes are included in this PR?

A new benchmark file, `benches/avro_writer.rs`, is added to the project. This file contains a suite of benchmarks that measure the performance of writing `RecordBatch`es to the Avro format. The benchmarks cover a variety of Arrow data types:

- `Boolean`
- `Int32` and `Int64`
- `Float32` and `Float64`
- `Binary`
- `Timestamp` (Microsecond precision)
- A schema with a mix of the above types

These benchmarks are run with varying numbers of rows (4,096, 8,192, 100,000, and 1,000,000) to assess performance across different data scales.

# Are these changes tested?

Yes, this pull request consists entirely of new benchmark tests. Therefore, no separate tests are needed.

# Are there any user-facing changes?

NA
1 parent 549709f commit 26c9c7a

File tree

2 files changed

+328
-0
lines changed

2 files changed

+328
-0
lines changed

arrow-avro/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,8 @@ harness = false
8383

8484
[[bench]]
8585
name = "decoder"
86+
harness = false
87+
88+
[[bench]]
89+
name = "avro_writer"
8690
harness = false

arrow-avro/benches/avro_writer.rs

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for the `arrow-avro` **Writer** (Avro Object Container Files)
19+
//!
20+
21+
extern crate arrow_avro;
22+
extern crate criterion;
23+
extern crate once_cell;
24+
25+
use arrow_array::{
26+
types::{Int32Type, Int64Type, TimestampMicrosecondType},
27+
ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, PrimitiveArray, RecordBatch,
28+
};
29+
use arrow_avro::writer::AvroWriter;
30+
use arrow_schema::{DataType, Field, Schema, TimeUnit};
31+
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
32+
use once_cell::sync::Lazy;
33+
use rand::{
34+
distr::uniform::{SampleRange, SampleUniform},
35+
rngs::StdRng,
36+
Rng, SeedableRng,
37+
};
38+
use std::io::Cursor;
39+
use std::sync::Arc;
40+
use std::time::Duration;
41+
use tempfile::tempfile;
42+
43+
// Row counts to benchmark. Each lazily built data set in this file holds one
// batch per entry, in this order, and `bench_writer_scenario` pairs them by index.
const SIZES: [usize; 4] = [4_096, 8_192, 100_000, 1_000_000];
44+
// Base value XORed into every RNG seed produced by `rng_for`.
const BASE_SEED: u64 = 0x5EED_1234_ABCD_EF01;
45+
// Multiplier applied (wrapping) to the array tag before it is XORed into the seed.
const MIX_CONST_1: u64 = 0x9E37_79B1_85EB_CA87;
46+
// Multiplier applied (wrapping) to the row count before it is XORed into the seed.
const MIX_CONST_2: u64 = 0xC2B2_AE3D_27D4_EB4F;
47+
48+
#[inline]
49+
fn rng_for(tag: u64, n: usize) -> StdRng {
50+
let seed = BASE_SEED ^ tag.wrapping_mul(MIX_CONST_1) ^ (n as u64).wrapping_mul(MIX_CONST_2);
51+
StdRng::seed_from_u64(seed)
52+
}
53+
54+
#[inline]
55+
fn sample_in<T, Rg>(rng: &mut StdRng, range: Rg) -> T
56+
where
57+
T: SampleUniform,
58+
Rg: SampleRange<T>,
59+
{
60+
rng.random_range(range)
61+
}
62+
63+
#[inline]
64+
fn make_bool_array_with_tag(n: usize, tag: u64) -> BooleanArray {
65+
let mut rng = rng_for(tag, n);
66+
let values = (0..n).map(|_| rng.random_bool(0.5));
67+
BooleanArray::from_iter(values.map(Some))
68+
}
69+
70+
#[inline]
71+
fn make_i32_array_with_tag(n: usize, tag: u64) -> PrimitiveArray<Int32Type> {
72+
let mut rng = rng_for(tag, n);
73+
let values = (0..n).map(|_| rng.random::<i32>());
74+
PrimitiveArray::<Int32Type>::from_iter_values(values)
75+
}
76+
77+
#[inline]
78+
fn make_i64_array_with_tag(n: usize, tag: u64) -> PrimitiveArray<Int64Type> {
79+
let mut rng = rng_for(tag, n);
80+
let values = (0..n).map(|_| rng.random::<i64>());
81+
PrimitiveArray::<Int64Type>::from_iter_values(values)
82+
}
83+
84+
#[inline]
85+
fn make_f32_array_with_tag(n: usize, tag: u64) -> Float32Array {
86+
let mut rng = rng_for(tag, n);
87+
let values = (0..n).map(|_| rng.random::<f32>());
88+
Float32Array::from_iter_values(values)
89+
}
90+
91+
#[inline]
92+
fn make_f64_array_with_tag(n: usize, tag: u64) -> Float64Array {
93+
let mut rng = rng_for(tag, n);
94+
let values = (0..n).map(|_| rng.random::<f64>());
95+
Float64Array::from_iter_values(values)
96+
}
97+
98+
#[inline]
99+
fn make_binary_array_with_tag(n: usize, tag: u64) -> BinaryArray {
100+
let mut rng = rng_for(tag, n);
101+
let mut payloads: Vec<[u8; 16]> = vec![[0; 16]; n];
102+
for p in payloads.iter_mut() {
103+
rng.fill(&mut p[..]);
104+
}
105+
let views: Vec<&[u8]> = payloads.iter().map(|p| &p[..]).collect();
106+
BinaryArray::from_vec(views)
107+
}
108+
109+
#[inline]
110+
fn make_ts_micros_array_with_tag(n: usize, tag: u64) -> PrimitiveArray<TimestampMicrosecondType> {
111+
let mut rng = rng_for(tag, n);
112+
let base: i64 = 1_600_000_000_000_000;
113+
let year_us: i64 = 31_536_000_000_000;
114+
let values = (0..n).map(|_| base + sample_in::<i64, _>(&mut rng, 0..year_us));
115+
PrimitiveArray::<TimestampMicrosecondType>::from_iter_values(values)
116+
}
117+
118+
#[inline]
119+
fn make_bool_array(n: usize) -> BooleanArray {
120+
make_bool_array_with_tag(n, 0xB001)
121+
}
122+
#[inline]
123+
fn make_i32_array(n: usize) -> PrimitiveArray<Int32Type> {
124+
make_i32_array_with_tag(n, 0x1337_0032)
125+
}
126+
#[inline]
127+
fn make_i64_array(n: usize) -> PrimitiveArray<Int64Type> {
128+
make_i64_array_with_tag(n, 0x1337_0064)
129+
}
130+
#[inline]
131+
fn make_f32_array(n: usize) -> Float32Array {
132+
make_f32_array_with_tag(n, 0xF0_0032)
133+
}
134+
#[inline]
135+
fn make_f64_array(n: usize) -> Float64Array {
136+
make_f64_array_with_tag(n, 0xF0_0064)
137+
}
138+
#[inline]
139+
fn make_binary_array(n: usize) -> BinaryArray {
140+
make_binary_array_with_tag(n, 0xB1_0001)
141+
}
142+
#[inline]
143+
fn make_ts_micros_array(n: usize) -> PrimitiveArray<TimestampMicrosecondType> {
144+
make_ts_micros_array_with_tag(n, 0x7157_0001)
145+
}
146+
147+
#[inline]
148+
fn schema_single(name: &str, dt: DataType) -> Arc<Schema> {
149+
Arc::new(Schema::new(vec![Field::new(name, dt, false)]))
150+
}
151+
152+
#[inline]
153+
fn schema_mixed() -> Arc<Schema> {
154+
Arc::new(Schema::new(vec![
155+
Field::new("f1", DataType::Int32, false),
156+
Field::new("f2", DataType::Int64, false),
157+
Field::new("f3", DataType::Binary, false),
158+
Field::new("f4", DataType::Float64, false),
159+
]))
160+
}
161+
162+
static BOOLEAN_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
163+
let schema = schema_single("field1", DataType::Boolean);
164+
SIZES
165+
.iter()
166+
.map(|&n| {
167+
let col: ArrayRef = Arc::new(make_bool_array(n));
168+
RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
169+
})
170+
.collect()
171+
});
172+
173+
static INT32_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
174+
let schema = schema_single("field1", DataType::Int32);
175+
SIZES
176+
.iter()
177+
.map(|&n| {
178+
let col: ArrayRef = Arc::new(make_i32_array(n));
179+
RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
180+
})
181+
.collect()
182+
});
183+
184+
static INT64_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
185+
let schema = schema_single("field1", DataType::Int64);
186+
SIZES
187+
.iter()
188+
.map(|&n| {
189+
let col: ArrayRef = Arc::new(make_i64_array(n));
190+
RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
191+
})
192+
.collect()
193+
});
194+
195+
static FLOAT32_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
196+
let schema = schema_single("field1", DataType::Float32);
197+
SIZES
198+
.iter()
199+
.map(|&n| {
200+
let col: ArrayRef = Arc::new(make_f32_array(n));
201+
RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
202+
})
203+
.collect()
204+
});
205+
206+
static FLOAT64_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
207+
let schema = schema_single("field1", DataType::Float64);
208+
SIZES
209+
.iter()
210+
.map(|&n| {
211+
let col: ArrayRef = Arc::new(make_f64_array(n));
212+
RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
213+
})
214+
.collect()
215+
});
216+
217+
static BINARY_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
218+
let schema = schema_single("field1", DataType::Binary);
219+
SIZES
220+
.iter()
221+
.map(|&n| {
222+
let col: ArrayRef = Arc::new(make_binary_array(n));
223+
RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
224+
})
225+
.collect()
226+
});
227+
228+
static TIMESTAMP_US_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
229+
let schema = schema_single("field1", DataType::Timestamp(TimeUnit::Microsecond, None));
230+
SIZES
231+
.iter()
232+
.map(|&n| {
233+
let col: ArrayRef = Arc::new(make_ts_micros_array(n));
234+
RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
235+
})
236+
.collect()
237+
});
238+
239+
static MIXED_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
240+
let schema = schema_mixed();
241+
SIZES
242+
.iter()
243+
.map(|&n| {
244+
let f1: ArrayRef = Arc::new(make_i32_array_with_tag(n, 0xA1));
245+
let f2: ArrayRef = Arc::new(make_i64_array_with_tag(n, 0xA2));
246+
let f3: ArrayRef = Arc::new(make_binary_array_with_tag(n, 0xA3));
247+
let f4: ArrayRef = Arc::new(make_f64_array_with_tag(n, 0xA4));
248+
RecordBatch::try_new(schema.clone(), vec![f1, f2, f3, f4]).unwrap()
249+
})
250+
.collect()
251+
});
252+
253+
fn ocf_size_for_batch(batch: &RecordBatch) -> usize {
254+
let schema_owned: Schema = (*batch.schema()).clone();
255+
let cursor = Cursor::new(Vec::<u8>::with_capacity(1024));
256+
let mut writer = AvroWriter::new(cursor, schema_owned).expect("create writer");
257+
writer.write(batch).expect("write batch");
258+
writer.finish().expect("finish writer");
259+
let inner = writer.into_inner();
260+
inner.into_inner().len()
261+
}
262+
263+
fn bench_writer_scenario(c: &mut Criterion, name: &str, data_sets: &[RecordBatch]) {
264+
let mut group = c.benchmark_group(name);
265+
let schema_owned: Schema = (*data_sets[0].schema()).clone();
266+
for (idx, &rows) in SIZES.iter().enumerate() {
267+
let batch = &data_sets[idx];
268+
let bytes = ocf_size_for_batch(batch);
269+
group.throughput(Throughput::Bytes(bytes as u64));
270+
match rows {
271+
4_096 | 8_192 => {
272+
group
273+
.sample_size(40)
274+
.measurement_time(Duration::from_secs(10))
275+
.warm_up_time(Duration::from_secs(3));
276+
}
277+
100_000 => {
278+
group
279+
.sample_size(20)
280+
.measurement_time(Duration::from_secs(10))
281+
.warm_up_time(Duration::from_secs(3));
282+
}
283+
1_000_000 => {
284+
group
285+
.sample_size(10)
286+
.measurement_time(Duration::from_secs(10))
287+
.warm_up_time(Duration::from_secs(3));
288+
}
289+
_ => {}
290+
}
291+
group.bench_function(BenchmarkId::from_parameter(rows), |b| {
292+
b.iter_batched_ref(
293+
|| {
294+
let file = tempfile().expect("create temp file");
295+
AvroWriter::new(file, schema_owned.clone()).expect("create writer")
296+
},
297+
|writer| {
298+
writer.write(batch).unwrap();
299+
writer.finish().unwrap();
300+
},
301+
BatchSize::SmallInput,
302+
)
303+
});
304+
}
305+
group.finish();
306+
}
307+
308+
fn criterion_benches(c: &mut Criterion) {
309+
bench_writer_scenario(c, "write-Boolean", &BOOLEAN_DATA);
310+
bench_writer_scenario(c, "write-Int32", &INT32_DATA);
311+
bench_writer_scenario(c, "write-Int64", &INT64_DATA);
312+
bench_writer_scenario(c, "write-Float32", &FLOAT32_DATA);
313+
bench_writer_scenario(c, "write-Float64", &FLOAT64_DATA);
314+
bench_writer_scenario(c, "write-Binary(Bytes)", &BINARY_DATA);
315+
bench_writer_scenario(c, "write-TimestampMicros", &TIMESTAMP_US_DATA);
316+
bench_writer_scenario(c, "write-Mixed", &MIXED_DATA);
317+
}
318+
319+
// Declares the `avro_writer` benchmark group: its only target is
// `criterion_benches`, and its configuration is Criterion's default with
// command-line arguments applied via `configure_from_args`.
criterion_group! {
320+
name = avro_writer;
321+
config = Criterion::default().configure_from_args();
322+
targets = criterion_benches
323+
}
324+
// Entry point for the benchmark binary; the `[[bench]]` entry in Cargo.toml
// sets `harness = false` so this macro-provided `main` is used.
criterion_main!(avro_writer);

0 commit comments

Comments
 (0)