Skip to content

Commit 04729d3

Browse files
authored
Merge pull request #5791 from youngsofun/output
row base serializer
2 parents 3ce9ad0 + dbf0478 commit 04729d3

File tree

74 files changed

+1131
-927
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

74 files changed

+1131
-927
lines changed

common/datablocks/src/data_block.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,20 @@ impl DataBlock {
204204

205205
Ok(DataBlock::create(schema.clone(), columns))
206206
}
207+
208+
/// Builds one serializer per column of this block.
///
/// Each column is paired with the data type of the schema field at the same
/// index, and that data type creates the serializer for the column.
///
/// # Errors
///
/// Returns the first error produced by `create_serializer`.
pub fn get_serializers(&self) -> Result<Vec<TypeSerializerImpl>> {
    (0..self.num_columns())
        .map(|idx| {
            let column = self.column(idx);
            let data_type = self.schema().field(idx).data_type();
            data_type.create_serializer(column)
        })
        .collect()
}
207221
}
208222

209223
impl TryFrom<DataBlock> for Chunk<ArrayRef> {

common/datavalues/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,7 @@ harness = false
5757
[[bench]]
5858
name = "data_type"
5959
harness = false
60+
61+
[[bench]]
62+
name = "output_format"
63+
harness = false
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_datavalues::ColumnRef;
16+
use common_datavalues::DataType;
17+
use common_datavalues::Series;
18+
use common_datavalues::SeriesFrom;
19+
use common_datavalues::TypeSerializer;
20+
use common_io::prelude::FormatSettings;
21+
use criterion::black_box;
22+
use criterion::criterion_group;
23+
use criterion::criterion_main;
24+
use criterion::Criterion;
25+
use rand::distributions::Alphanumeric;
26+
use rand::rngs::StdRng;
27+
use rand::Rng;
28+
use rand::SeedableRng;
29+
30+
/// Writes every row of `cols` into a byte buffer using the row-based serializers.
///
/// One serializer is created per column; then, row by row, each serializer
/// appends its field for that row to the buffer. This function itself adds no
/// separator between fields or rows.
///
/// The row count is taken from the first column. An empty `cols` slice yields
/// an empty buffer instead of panicking on `cols[0]`.
///
/// # Panics
///
/// Panics if a serializer cannot be created for one of the columns.
pub fn write_csv(cols: &[ColumnRef]) -> Vec<u8> {
    // No columns means no rows to write.
    let rows = cols.first().map_or(0, |c| c.len());
    let mut buf = Vec::with_capacity(1000 * 1000);

    let serializers: Vec<_> = cols
        .iter()
        .map(|c| c.data_type().create_serializer(c).unwrap())
        .collect();

    let settings = FormatSettings::default();
    for row in 0..rows {
        for serializer in &serializers {
            serializer.write_field(row, &mut buf, &settings);
        }
    }
    buf
}
47+
48+
fn add_benchmark(c: &mut Criterion) {
49+
let mut cols = vec![];
50+
let size = 4096;
51+
cols.push(create_primitive_array(size, None, 0));
52+
cols.push(create_string_array(size, None, 100));
53+
c.bench_function("not_nullable", |b| {
54+
b.iter(|| write_csv(black_box(&cols)));
55+
});
56+
}
57+
58+
pub fn create_primitive_array(
59+
size: usize,
60+
null_density: Option<f32>,
61+
_item_size: usize,
62+
) -> ColumnRef {
63+
let mut rng = StdRng::seed_from_u64(3);
64+
match null_density {
65+
None => {
66+
let v = (0..size).map(|_| rng.gen()).collect::<Vec<i32>>();
67+
Series::from_data(v)
68+
}
69+
Some(null_density) => {
70+
let v = (0..size)
71+
.map(|_| {
72+
if rng.gen::<f32>() < null_density {
73+
None
74+
} else {
75+
Some(rng.gen())
76+
}
77+
})
78+
.collect::<Vec<Option<i32>>>();
79+
Series::from_data(v)
80+
}
81+
}
82+
}
83+
use std::string::String;
84+
pub fn create_string_array(size: usize, null_density: Option<f32>, item_size: usize) -> ColumnRef {
85+
let mut rng = StdRng::seed_from_u64(3);
86+
match null_density {
87+
None => {
88+
let vec: Vec<String> = (0..size)
89+
.map(|_| {
90+
(&mut rng)
91+
.sample_iter(&Alphanumeric)
92+
.take(item_size)
93+
.map(char::from)
94+
.collect::<String>()
95+
})
96+
.collect();
97+
Series::from_data(vec)
98+
}
99+
Some(null_density) => {
100+
let vec: Vec<_> = (0..item_size)
101+
.map(|_| {
102+
if rng.gen::<f32>() < null_density {
103+
None
104+
} else {
105+
let value = (&mut rng)
106+
.sample_iter(&Alphanumeric)
107+
.take(size)
108+
.collect::<Vec<u8>>();
109+
Some(value)
110+
}
111+
})
112+
.collect();
113+
Series::from_data(vec)
114+
}
115+
}
116+
}
117+
118+
criterion_group!(benches, add_benchmark);
119+
criterion_main!(benches);

common/datavalues/src/types/data_type.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use super::type_string::StringType;
4040
use super::type_struct::StructType;
4141
use super::type_timestamp::TimestampType;
4242
use crate::prelude::*;
43+
use crate::serializations::ConstSerializer;
4344

4445
pub const ARROW_EXTENSION_NAME: &str = "ARROW:extension:databend_name";
4546
pub const ARROW_EXTENSION_META: &str = "ARROW:extension:databend_metadata";
@@ -73,7 +74,9 @@ pub enum DataTypeImpl {
7374
}
7475

7576
#[enum_dispatch]
76-
pub trait DataType: std::fmt::Debug + Sync + Send + DynClone {
77+
pub trait DataType: std::fmt::Debug + Sync + Send + DynClone
78+
where Self: Sized
79+
{
7780
fn data_type_id(&self) -> TypeID;
7881

7982
fn is_nullable(&self) -> bool {
@@ -125,7 +128,23 @@ pub trait DataType: std::fmt::Debug + Sync + Send + DynClone {
125128

126129
fn create_mutable(&self, capacity: usize) -> Box<dyn MutableColumn>;
127130

128-
fn create_serializer(&self) -> TypeSerializerImpl;
131+
fn create_serializer<'a>(&self, col: &'a ColumnRef) -> Result<TypeSerializerImpl<'a>> {
132+
if col.is_const() {
133+
let col: &ConstColumn = Series::check_get(col)?;
134+
let inner = Box::new(self.create_serializer_inner(col.inner())?);
135+
Ok(ConstSerializer {
136+
inner,
137+
size: col.len(),
138+
}
139+
.into())
140+
} else {
141+
self.create_serializer_inner(col)
142+
}
143+
}
144+
145+
/// Type-specific serializer construction used by `create_serializer`.
///
/// `create_serializer` calls this with the column itself, or with the inner
/// column of a constant column. This default body is `unimplemented!()`, so it
/// panics when invoked on a type that does not provide its own version.
/// NOTE(review): presumably every concrete data type overrides this — confirm.
fn create_serializer_inner<'a>(&self, _col: &'a ColumnRef) -> Result<TypeSerializerImpl<'a>> {
146+
unimplemented!()
147+
}
129148

130149
fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl;
131150
}

common/datavalues/src/types/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ pub use date_converter::*;
4444
pub use date_converter::*;
4545
pub use deserializations::*;
4646
pub use eq::*;
47-
pub use serializations::*;
47+
pub use serializations::TypeSerializer;
48+
pub use serializations::TypeSerializerImpl;
4849
pub use type_array::*;
4950
pub use type_boolean::*;
5051
pub use type_date::*;

common/datavalues/src/types/serializations/array.rs

Lines changed: 59 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -12,86 +12,91 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::fmt::Write;
1615
use std::sync::Arc;
1716

18-
use common_exception::ErrorCode;
1917
use common_exception::Result;
2018
use common_io::prelude::FormatSettings;
2119
use opensrv_clickhouse::types::column::ArrayColumnData;
2220
use serde_json::Value;
2321

2422
use crate::prelude::*;
2523

26-
#[derive(Debug, Clone)]
27-
pub struct ArraySerializer {
28-
pub inner: Box<TypeSerializerImpl>,
29-
pub typ: DataTypeImpl,
24+
#[derive(Clone)]
25+
pub struct ArraySerializer<'a> {
26+
offsets: &'a [i64],
27+
inner: Box<TypeSerializerImpl<'a>>,
3028
}
3129

32-
impl TypeSerializer for ArraySerializer {
33-
fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result<String> {
34-
if let DataValue::Array(vals) = value {
35-
let mut res = String::new();
36-
res.push('[');
37-
let mut first = true;
38-
let quoted = self.typ.data_type_id().is_quoted();
39-
for val in vals {
40-
if !first {
41-
res.push_str(", ");
42-
}
43-
first = false;
30+
impl<'a> ArraySerializer<'a> {
31+
pub fn try_create(column: &'a ColumnRef, inner_type: &DataTypeImpl) -> Result<Self> {
32+
let column: &ArrayColumn = Series::check_get(column)?;
33+
let inner = Box::new(inner_type.create_serializer(column.values())?);
34+
Ok(Self {
35+
offsets: column.offsets(),
36+
inner,
37+
})
38+
}
39+
}
4440

45-
let s = self.inner.serialize_value(val, format)?;
46-
if quoted {
47-
write!(res, "'{s}'").expect("write to string must succeed");
48-
} else {
49-
res.push_str(&s);
50-
}
41+
impl<'a> TypeSerializer<'a> for ArraySerializer<'a> {
42+
fn write_field(&self, row_index: usize, buf: &mut Vec<u8>, format: &FormatSettings) {
43+
let start = self.offsets[row_index] as usize;
44+
let end = self.offsets[row_index + 1] as usize;
45+
buf.push(b'[');
46+
let inner = &self.inner;
47+
for i in start..end {
48+
if i != start {
49+
buf.extend_from_slice(b", ");
5150
}
52-
res.push(']');
53-
Ok(res)
54-
} else {
55-
Err(ErrorCode::BadBytes("Incorrect Array value"))
51+
inner.write_field_quoted(i, buf, format, b'\'');
5652
}
53+
buf.push(b']');
5754
}
5855

59-
fn serialize_column(&self, column: &ColumnRef, format: &FormatSettings) -> Result<Vec<String>> {
60-
let column: &ArrayColumn = Series::check_get(column)?;
61-
let mut result = Vec::with_capacity(column.len());
62-
for i in 0..column.len() {
63-
let val = column.get(i);
64-
let s = self.serialize_value(&val, format)?;
65-
result.push(s);
56+
fn serialize_clickhouse_const(
57+
&self,
58+
format: &FormatSettings,
59+
size: usize,
60+
) -> Result<opensrv_clickhouse::types::column::ArcColumnData> {
61+
let len = self.offsets.len() - 1;
62+
let mut offsets = opensrv_clickhouse::types::column::List::with_capacity(size * len);
63+
let total = self.offsets[len];
64+
let mut base = 0;
65+
for _ in 0..size {
66+
for offset in self.offsets.iter().skip(1) {
67+
offsets.push(((*offset) + base) as u64);
68+
}
69+
base += total;
6670
}
67-
Ok(result)
68-
}
6971

70-
fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result<Vec<Value>> {
71-
let column: &ArrayColumn = Series::check_get(column)?;
72-
let mut result = Vec::with_capacity(column.len());
73-
for i in 0..column.len() {
74-
let val = column.get(i);
75-
let s = serde_json::to_value(val)?;
76-
result.push(s);
77-
}
78-
Ok(result)
72+
let inner_data = self.inner.serialize_clickhouse_const(format, size)?;
73+
Ok(Arc::new(ArrayColumnData::create(inner_data, offsets)))
7974
}
8075

81-
fn serialize_clickhouse_format(
76+
fn serialize_clickhouse_column(
8277
&self,
83-
column: &ColumnRef,
8478
format: &FormatSettings,
8579
) -> Result<opensrv_clickhouse::types::column::ArcColumnData> {
86-
let column: &ArrayColumn = Series::check_get(column)?;
87-
let mut offsets = opensrv_clickhouse::types::column::List::with_capacity(column.len());
88-
for offset in column.offsets().iter().skip(1) {
80+
let mut offsets =
81+
opensrv_clickhouse::types::column::List::with_capacity(self.offsets.len() - 1);
82+
for offset in self.offsets.iter().skip(1) {
8983
offsets.push(*offset as u64);
9084
}
9185

92-
let inner_data = self
93-
.inner
94-
.serialize_clickhouse_format(column.values(), format)?;
86+
let inner_data = self.inner.serialize_clickhouse_column(format)?;
9587
Ok(Arc::new(ArrayColumnData::create(inner_data, offsets)))
9688
}
89+
90+
fn serialize_json(&self, format: &FormatSettings) -> Result<Vec<Value>> {
91+
let size = self.offsets.len() - 1;
92+
let mut result = Vec::with_capacity(size);
93+
let inner = self.inner.serialize_json(format)?;
94+
let mut iter = inner.into_iter();
95+
for i in 0..size {
96+
let len = (self.offsets[i + 1] - self.offsets[i]) as usize;
97+
let chunk = iter.by_ref().take(len).collect();
98+
result.push(Value::Array(chunk))
99+
}
100+
Ok(result)
101+
}
97102
}

0 commit comments

Comments
 (0)