Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions common/datablocks/src/data_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,20 @@ impl DataBlock {

Ok(DataBlock::create(schema.clone(), columns))
}

/// Build one `TypeSerializer` per column of this block, in schema order.
///
/// Each serializer is derived from the corresponding field's data type and
/// bound to that column, so callers can render rows without re-resolving
/// types. Propagates the first error returned by `create_serializer`.
pub fn get_serializers(&self) -> Result<Vec<TypeSerializerImpl>> {
    (0..self.num_columns())
        .map(|index| {
            let data_type = self.schema().field(index).data_type();
            data_type.create_serializer(self.column(index))
        })
        .collect()
}
}

impl TryFrom<DataBlock> for Chunk<ArrayRef> {
Expand Down
4 changes: 4 additions & 0 deletions common/datavalues/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@ harness = false
[[bench]]
name = "data_type"
harness = false

[[bench]]
name = "output_format"
harness = false
119 changes: 119 additions & 0 deletions common/datavalues/benches/output_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_datavalues::ColumnRef;
use common_datavalues::DataType;
use common_datavalues::Series;
use common_datavalues::SeriesFrom;
use common_datavalues::TypeSerializer;
use common_io::prelude::FormatSettings;
use criterion::black_box;
use criterion::criterion_group;
use criterion::criterion_main;
use criterion::Criterion;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;

/// Serialize every row of `cols` field-by-field into a single byte buffer,
/// using each column's type serializer with default format settings.
pub fn write_csv(cols: &[ColumnRef]) -> Vec<u8> {
    let settings = &FormatSettings::default();
    let serializers: Vec<_> = cols
        .iter()
        .map(|col| col.data_type().create_serializer(col).unwrap())
        .collect();

    let row_count = cols[0].len();
    // Preallocate ~1 MB so the hot loop never reallocates mid-benchmark.
    let mut out = Vec::with_capacity(1000 * 1000);
    for row in 0..row_count {
        for serializer in &serializers {
            serializer.write_field(row, &mut out, settings);
        }
    }
    out
}

/// Benchmark CSV serialization of 4096 rows: one non-nullable i32 column
/// plus one non-nullable 100-character string column.
fn add_benchmark(c: &mut Criterion) {
    let size = 4096;
    let cols = vec![
        create_primitive_array(size, None, 0),
        create_string_array(size, None, 100),
    ];
    c.bench_function("not_nullable", |b| {
        b.iter(|| write_csv(black_box(&cols)));
    });
}

/// Build an i32 column of `size` rows from a fixed-seed RNG (deterministic
/// across benchmark runs).
///
/// With `null_density == None` the column is non-nullable; with `Some(p)`
/// each row is independently NULL with probability `p`. `_item_size` is
/// unused — kept for signature parity with `create_string_array`.
pub fn create_primitive_array(
    size: usize,
    null_density: Option<f32>,
    _item_size: usize,
) -> ColumnRef {
    let mut rng = StdRng::seed_from_u64(3);
    if let Some(density) = null_density {
        let values: Vec<Option<i32>> = (0..size)
            .map(|_| {
                if rng.gen::<f32>() < density {
                    None
                } else {
                    Some(rng.gen())
                }
            })
            .collect();
        Series::from_data(values)
    } else {
        let values: Vec<i32> = (0..size).map(|_| rng.gen()).collect();
        Series::from_data(values)
    }
}
use std::string::String;
/// Build a string column of `size` rows, each `item_size` alphanumeric
/// characters long, from a fixed-seed RNG (deterministic across runs).
///
/// With `null_density == None` the column is non-nullable; with `Some(p)`
/// each row is independently NULL with probability `p`.
pub fn create_string_array(size: usize, null_density: Option<f32>, item_size: usize) -> ColumnRef {
    let mut rng = StdRng::seed_from_u64(3);
    match null_density {
        None => {
            let vec: Vec<String> = (0..size)
                .map(|_| {
                    (&mut rng)
                        .sample_iter(&Alphanumeric)
                        .take(item_size)
                        .map(char::from)
                        .collect::<String>()
                })
                .collect();
            Series::from_data(vec)
        }
        Some(null_density) => {
            // Bug fix: the original iterated `0..item_size` (row count) and
            // took `size` bytes per string — `size` and `item_size` were
            // swapped relative to the non-nullable branch above.
            let vec: Vec<Option<Vec<u8>>> = (0..size)
                .map(|_| {
                    if rng.gen::<f32>() < null_density {
                        None
                    } else {
                        let value = (&mut rng)
                            .sample_iter(&Alphanumeric)
                            .take(item_size)
                            .collect::<Vec<u8>>();
                        Some(value)
                    }
                })
                .collect();
            Series::from_data(vec)
        }
    }
}

// Register the benchmark suite and generate the criterion `main` entry point.
criterion_group!(benches, add_benchmark);
criterion_main!(benches);
23 changes: 21 additions & 2 deletions common/datavalues/src/types/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use super::type_string::StringType;
use super::type_struct::StructType;
use super::type_timestamp::TimestampType;
use crate::prelude::*;
use crate::serializations::ConstSerializer;

pub const ARROW_EXTENSION_NAME: &str = "ARROW:extension:databend_name";
pub const ARROW_EXTENSION_META: &str = "ARROW:extension:databend_metadata";
Expand Down Expand Up @@ -73,7 +74,9 @@ pub enum DataTypeImpl {
}

#[enum_dispatch]
pub trait DataType: std::fmt::Debug + Sync + Send + DynClone {
pub trait DataType: std::fmt::Debug + Sync + Send + DynClone
where Self: Sized
{
fn data_type_id(&self) -> TypeID;

fn is_nullable(&self) -> bool {
Expand Down Expand Up @@ -125,7 +128,23 @@ pub trait DataType: std::fmt::Debug + Sync + Send + DynClone {

fn create_mutable(&self, capacity: usize) -> Box<dyn MutableColumn>;

fn create_serializer(&self) -> TypeSerializerImpl;
/// Create a serializer bound to `col`, whose lifetime ties the serializer
/// to the borrowed column data.
///
/// Const columns are unwrapped first: a serializer is built for the inner
/// column and wrapped in a `ConstSerializer` carrying the const column's
/// logical length (`col.len()`). Non-const columns are delegated directly
/// to `create_serializer_inner`.
fn create_serializer<'a>(&self, col: &'a ColumnRef) -> Result<TypeSerializerImpl<'a>> {
    if col.is_const() {
        // Downcast to ConstColumn so the single repeated value is serialized once.
        let col: &ConstColumn = Series::check_get(col)?;
        let inner = Box::new(self.create_serializer_inner(col.inner())?);
        Ok(ConstSerializer {
            inner,
            size: col.len(),
        }
        .into())
    } else {
        self.create_serializer_inner(col)
    }
}

fn create_serializer_inner<'a>(&self, _col: &'a ColumnRef) -> Result<TypeSerializerImpl<'a>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used for normal (non-const) columns.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`create_serializer_inner` is used for the implementation;
`create_serializer` serves as the interface.

unimplemented!()
}

fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl;
}
Expand Down
3 changes: 2 additions & 1 deletion common/datavalues/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ pub use date_converter::*;
pub use date_converter::*;
pub use deserializations::*;
pub use eq::*;
pub use serializations::*;
pub use serializations::TypeSerializer;
pub use serializations::TypeSerializerImpl;
pub use type_array::*;
pub use type_boolean::*;
pub use type_date::*;
Expand Down
113 changes: 59 additions & 54 deletions common/datavalues/src/types/serializations/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,86 +12,91 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Write;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::FormatSettings;
use opensrv_clickhouse::types::column::ArrayColumnData;
use serde_json::Value;

use crate::prelude::*;

#[derive(Debug, Clone)]
pub struct ArraySerializer {
pub inner: Box<TypeSerializerImpl>,
pub typ: DataTypeImpl,
// Row-addressable serializer for array columns: `offsets[i]..offsets[i + 1]`
// delimits row `i`'s elements inside the flattened inner column, and `inner`
// serializes those flattened values.
#[derive(Clone)]
pub struct ArraySerializer<'a> {
    // Offset boundaries borrowed from the source `ArrayColumn` (len = rows + 1).
    offsets: &'a [i64],
    // Serializer for the flattened inner values column.
    inner: Box<TypeSerializerImpl<'a>>,
}

impl TypeSerializer for ArraySerializer {
fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result<String> {
if let DataValue::Array(vals) = value {
let mut res = String::new();
res.push('[');
let mut first = true;
let quoted = self.typ.data_type_id().is_quoted();
for val in vals {
if !first {
res.push_str(", ");
}
first = false;
impl<'a> ArraySerializer<'a> {
    /// Build an `ArraySerializer` from an array column: downcasts `column`
    /// to `ArrayColumn`, borrows its offsets, and creates a serializer for
    /// the flattened values using `inner_type`.
    pub fn try_create(column: &'a ColumnRef, inner_type: &DataTypeImpl) -> Result<Self> {
        let column: &ArrayColumn = Series::check_get(column)?;
        let inner = Box::new(inner_type.create_serializer(column.values())?);
        Ok(Self {
            offsets: column.offsets(),
            inner,
        })
    }
}

let s = self.inner.serialize_value(val, format)?;
if quoted {
write!(res, "'{s}'").expect("write to string must succeed");
} else {
res.push_str(&s);
}
impl<'a> TypeSerializer<'a> for ArraySerializer<'a> {
fn write_field(&self, row_index: usize, buf: &mut Vec<u8>, format: &FormatSettings) {
let start = self.offsets[row_index] as usize;
let end = self.offsets[row_index + 1] as usize;
buf.push(b'[');
let inner = &self.inner;
for i in start..end {
if i != start {
buf.extend_from_slice(b", ");
}
res.push(']');
Ok(res)
} else {
Err(ErrorCode::BadBytes("Incorrect Array value"))
inner.write_field_quoted(i, buf, format, b'\'');
}
buf.push(b']');
}

fn serialize_column(&self, column: &ColumnRef, format: &FormatSettings) -> Result<Vec<String>> {
let column: &ArrayColumn = Series::check_get(column)?;
let mut result = Vec::with_capacity(column.len());
for i in 0..column.len() {
let val = column.get(i);
let s = self.serialize_value(&val, format)?;
result.push(s);
fn serialize_clickhouse_const(
&self,
format: &FormatSettings,
size: usize,
) -> Result<opensrv_clickhouse::types::column::ArcColumnData> {
let len = self.offsets.len() - 1;
let mut offsets = opensrv_clickhouse::types::column::List::with_capacity(size * len);
let total = self.offsets[len];
let mut base = 0;
for _ in 0..size {
for offset in self.offsets.iter().skip(1) {
offsets.push(((*offset) + base) as u64);
}
base += total;
}
Ok(result)
}

fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result<Vec<Value>> {
let column: &ArrayColumn = Series::check_get(column)?;
let mut result = Vec::with_capacity(column.len());
for i in 0..column.len() {
let val = column.get(i);
let s = serde_json::to_value(val)?;
result.push(s);
}
Ok(result)
let inner_data = self.inner.serialize_clickhouse_const(format, size)?;
Ok(Arc::new(ArrayColumnData::create(inner_data, offsets)))
}

fn serialize_clickhouse_format(
fn serialize_clickhouse_column(
&self,
column: &ColumnRef,
format: &FormatSettings,
) -> Result<opensrv_clickhouse::types::column::ArcColumnData> {
let column: &ArrayColumn = Series::check_get(column)?;
let mut offsets = opensrv_clickhouse::types::column::List::with_capacity(column.len());
for offset in column.offsets().iter().skip(1) {
let mut offsets =
opensrv_clickhouse::types::column::List::with_capacity(self.offsets.len() - 1);
for offset in self.offsets.iter().skip(1) {
offsets.push(*offset as u64);
}

let inner_data = self
.inner
.serialize_clickhouse_format(column.values(), format)?;
let inner_data = self.inner.serialize_clickhouse_column(format)?;
Ok(Arc::new(ArrayColumnData::create(inner_data, offsets)))
}

fn serialize_json(&self, format: &FormatSettings) -> Result<Vec<Value>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So row based api not works on serialize_json ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serializer can have APIs for both row-based and column-based output (ClickHouse, `json::Value`).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format choose to use which

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub fn block_to_json_value(
    block: &DataBlock,
    format: &FormatSettings,
) -> Result<Vec<Vec<JsonValue>>> {
    let cols = block_to_json_value_columns(block, format)?;
    Ok(transpose(cols))
}

Since we need to transpose, so serialize_json is row based ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess column-based may be faster?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's not zero alloc in the transpose function. So row based is faster in this case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's not zero alloc in the transpose function. So row based is faster in this case.

make sense

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I refactor this in a separate PR later? This PR is already large and requires changes to the newly added test cases, so I hope to merge it early.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also need add formats based on this

let size = self.offsets.len() - 1;
let mut result = Vec::with_capacity(size);
let inner = self.inner.serialize_json(format)?;
let mut iter = inner.into_iter();
for i in 0..size {
let len = (self.offsets[i + 1] - self.offsets[i]) as usize;
let chunk = iter.by_ref().take(len).collect();
result.push(Value::Array(chunk))
}
Ok(result)
}
}
Loading