-
Notifications
You must be signed in to change notification settings - Fork 825
row base serializer #5791
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
row base serializer #5791
Changes from all commits
2eea3de
3404056
a93d413
632e973
818ddf3
8aef9e0
0b32973
cb9a793
8e551cf
dbf0478
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,3 +57,7 @@ harness = false | |
[[bench]] | ||
name = "data_type" | ||
harness = false | ||
|
||
[[bench]] | ||
name = "output_format" | ||
harness = false |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
// Copyright 2022 Datafuse Labs. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use common_datavalues::ColumnRef; | ||
use common_datavalues::DataType; | ||
use common_datavalues::Series; | ||
use common_datavalues::SeriesFrom; | ||
use common_datavalues::TypeSerializer; | ||
use common_io::prelude::FormatSettings; | ||
use criterion::black_box; | ||
use criterion::criterion_group; | ||
use criterion::criterion_main; | ||
use criterion::Criterion; | ||
use rand::distributions::Alphanumeric; | ||
use rand::rngs::StdRng; | ||
use rand::Rng; | ||
use rand::SeedableRng; | ||
|
||
pub fn write_csv(cols: &[ColumnRef]) -> Vec<u8> { | ||
let mut buf = Vec::with_capacity(1000 * 1000); | ||
let mut ss = vec![]; | ||
let rows = cols[0].len(); | ||
|
||
for c in cols { | ||
let t = c.data_type(); | ||
ss.push(t.create_serializer(c).unwrap()) | ||
} | ||
let f = &FormatSettings::default(); | ||
for row in 0..rows { | ||
for s in &ss { | ||
s.write_field(row, &mut buf, f); | ||
} | ||
} | ||
buf | ||
} | ||
|
||
fn add_benchmark(c: &mut Criterion) { | ||
let mut cols = vec![]; | ||
let size = 4096; | ||
cols.push(create_primitive_array(size, None, 0)); | ||
cols.push(create_string_array(size, None, 100)); | ||
c.bench_function("not_nullable", |b| { | ||
b.iter(|| write_csv(black_box(&cols))); | ||
}); | ||
} | ||
|
||
pub fn create_primitive_array( | ||
size: usize, | ||
null_density: Option<f32>, | ||
_item_size: usize, | ||
) -> ColumnRef { | ||
let mut rng = StdRng::seed_from_u64(3); | ||
match null_density { | ||
None => { | ||
let v = (0..size).map(|_| rng.gen()).collect::<Vec<i32>>(); | ||
Series::from_data(v) | ||
} | ||
Some(null_density) => { | ||
let v = (0..size) | ||
.map(|_| { | ||
if rng.gen::<f32>() < null_density { | ||
None | ||
} else { | ||
Some(rng.gen()) | ||
} | ||
}) | ||
.collect::<Vec<Option<i32>>>(); | ||
Series::from_data(v) | ||
} | ||
} | ||
} | ||
use std::string::String; | ||
pub fn create_string_array(size: usize, null_density: Option<f32>, item_size: usize) -> ColumnRef { | ||
let mut rng = StdRng::seed_from_u64(3); | ||
match null_density { | ||
None => { | ||
let vec: Vec<String> = (0..size) | ||
.map(|_| { | ||
(&mut rng) | ||
.sample_iter(&Alphanumeric) | ||
.take(item_size) | ||
.map(char::from) | ||
.collect::<String>() | ||
}) | ||
.collect(); | ||
Series::from_data(vec) | ||
} | ||
Some(null_density) => { | ||
let vec: Vec<_> = (0..item_size) | ||
.map(|_| { | ||
if rng.gen::<f32>() < null_density { | ||
None | ||
} else { | ||
let value = (&mut rng) | ||
.sample_iter(&Alphanumeric) | ||
.take(size) | ||
.collect::<Vec<u8>>(); | ||
Some(value) | ||
} | ||
}) | ||
.collect(); | ||
Series::from_data(vec) | ||
} | ||
} | ||
} | ||
|
||
criterion_group!(benches, add_benchmark); | ||
criterion_main!(benches); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,86 +12,91 @@ | |
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use std::fmt::Write; | ||
use std::sync::Arc; | ||
|
||
use common_exception::ErrorCode; | ||
use common_exception::Result; | ||
use common_io::prelude::FormatSettings; | ||
use opensrv_clickhouse::types::column::ArrayColumnData; | ||
use serde_json::Value; | ||
|
||
use crate::prelude::*; | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct ArraySerializer { | ||
pub inner: Box<TypeSerializerImpl>, | ||
pub typ: DataTypeImpl, | ||
#[derive(Clone)] | ||
pub struct ArraySerializer<'a> { | ||
offsets: &'a [i64], | ||
inner: Box<TypeSerializerImpl<'a>>, | ||
} | ||
|
||
impl TypeSerializer for ArraySerializer { | ||
fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result<String> { | ||
if let DataValue::Array(vals) = value { | ||
let mut res = String::new(); | ||
res.push('['); | ||
let mut first = true; | ||
let quoted = self.typ.data_type_id().is_quoted(); | ||
for val in vals { | ||
if !first { | ||
res.push_str(", "); | ||
} | ||
first = false; | ||
impl<'a> ArraySerializer<'a> { | ||
pub fn try_create(column: &'a ColumnRef, inner_type: &DataTypeImpl) -> Result<Self> { | ||
let column: &ArrayColumn = Series::check_get(column)?; | ||
let inner = Box::new(inner_type.create_serializer(column.values())?); | ||
Ok(Self { | ||
offsets: column.offsets(), | ||
inner, | ||
}) | ||
} | ||
} | ||
|
||
let s = self.inner.serialize_value(val, format)?; | ||
if quoted { | ||
write!(res, "'{s}'").expect("write to string must succeed"); | ||
} else { | ||
res.push_str(&s); | ||
} | ||
impl<'a> TypeSerializer<'a> for ArraySerializer<'a> { | ||
fn write_field(&self, row_index: usize, buf: &mut Vec<u8>, format: &FormatSettings) { | ||
let start = self.offsets[row_index] as usize; | ||
let end = self.offsets[row_index + 1] as usize; | ||
buf.push(b'['); | ||
let inner = &self.inner; | ||
for i in start..end { | ||
if i != start { | ||
buf.extend_from_slice(b", "); | ||
} | ||
res.push(']'); | ||
Ok(res) | ||
} else { | ||
Err(ErrorCode::BadBytes("Incorrect Array value")) | ||
inner.write_field_quoted(i, buf, format, b'\''); | ||
} | ||
buf.push(b']'); | ||
} | ||
|
||
fn serialize_column(&self, column: &ColumnRef, format: &FormatSettings) -> Result<Vec<String>> { | ||
let column: &ArrayColumn = Series::check_get(column)?; | ||
let mut result = Vec::with_capacity(column.len()); | ||
for i in 0..column.len() { | ||
let val = column.get(i); | ||
let s = self.serialize_value(&val, format)?; | ||
result.push(s); | ||
fn serialize_clickhouse_const( | ||
&self, | ||
format: &FormatSettings, | ||
size: usize, | ||
) -> Result<opensrv_clickhouse::types::column::ArcColumnData> { | ||
let len = self.offsets.len() - 1; | ||
let mut offsets = opensrv_clickhouse::types::column::List::with_capacity(size * len); | ||
let total = self.offsets[len]; | ||
let mut base = 0; | ||
for _ in 0..size { | ||
for offset in self.offsets.iter().skip(1) { | ||
offsets.push(((*offset) + base) as u64); | ||
} | ||
base += total; | ||
} | ||
Ok(result) | ||
} | ||
|
||
fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result<Vec<Value>> { | ||
let column: &ArrayColumn = Series::check_get(column)?; | ||
let mut result = Vec::with_capacity(column.len()); | ||
for i in 0..column.len() { | ||
let val = column.get(i); | ||
let s = serde_json::to_value(val)?; | ||
result.push(s); | ||
} | ||
Ok(result) | ||
let inner_data = self.inner.serialize_clickhouse_const(format, size)?; | ||
Ok(Arc::new(ArrayColumnData::create(inner_data, offsets))) | ||
} | ||
|
||
fn serialize_clickhouse_format( | ||
fn serialize_clickhouse_column( | ||
&self, | ||
column: &ColumnRef, | ||
format: &FormatSettings, | ||
) -> Result<opensrv_clickhouse::types::column::ArcColumnData> { | ||
let column: &ArrayColumn = Series::check_get(column)?; | ||
let mut offsets = opensrv_clickhouse::types::column::List::with_capacity(column.len()); | ||
for offset in column.offsets().iter().skip(1) { | ||
let mut offsets = | ||
opensrv_clickhouse::types::column::List::with_capacity(self.offsets.len() - 1); | ||
for offset in self.offsets.iter().skip(1) { | ||
offsets.push(*offset as u64); | ||
} | ||
|
||
let inner_data = self | ||
.inner | ||
.serialize_clickhouse_format(column.values(), format)?; | ||
let inner_data = self.inner.serialize_clickhouse_column(format)?; | ||
Ok(Arc::new(ArrayColumnData::create(inner_data, offsets))) | ||
} | ||
|
||
fn serialize_json(&self, format: &FormatSettings) -> Result<Vec<Value>> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So row based api not works on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. serializer can has API for both row based and col-based(clickhouse, json::Value) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. format choose to use which There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Since we need to transpose, so There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess column-based may be faster? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But it's not zero alloc in the transpose function. So row based is faster in this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Makes sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May I refactor it in another PR later? This PR is large and needs changes to the newly added test cases, so I hope it can be merged early. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We also need to add formats based on this. |
||
let size = self.offsets.len() - 1; | ||
let mut result = Vec::with_capacity(size); | ||
let inner = self.inner.serialize_json(format)?; | ||
let mut iter = inner.into_iter(); | ||
for i in 0..size { | ||
let len = (self.offsets[i + 1] - self.offsets[i]) as usize; | ||
let chunk = iter.by_ref().take(len).collect(); | ||
result.push(Value::Array(chunk)) | ||
} | ||
Ok(result) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is used for normal (non-const) columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create_serializer_inner is used for the implementation;
create_serializer serves as the interface.