diff --git a/common/datablocks/src/data_block.rs b/common/datablocks/src/data_block.rs index de2e7d96585e0..28c6a11d10976 100644 --- a/common/datablocks/src/data_block.rs +++ b/common/datablocks/src/data_block.rs @@ -204,6 +204,20 @@ impl DataBlock { Ok(DataBlock::create(schema.clone(), columns)) } + + pub fn get_serializers(&self) -> Result> { + let columns_size = self.num_columns(); + + let mut serializers = vec![]; + for col_index in 0..columns_size { + let column = self.column(col_index); + let field = self.schema().field(col_index); + let data_type = field.data_type(); + let serializer = data_type.create_serializer(column)?; + serializers.push(serializer); + } + Ok(serializers) + } } impl TryFrom for Chunk { diff --git a/common/datavalues/Cargo.toml b/common/datavalues/Cargo.toml index fb40fe8fc9205..90270012e61e5 100644 --- a/common/datavalues/Cargo.toml +++ b/common/datavalues/Cargo.toml @@ -57,3 +57,7 @@ harness = false [[bench]] name = "data_type" harness = false + +[[bench]] +name = "output_format" +harness = false diff --git a/common/datavalues/benches/output_format.rs b/common/datavalues/benches/output_format.rs new file mode 100644 index 0000000000000..0774b2bb6a2a7 --- /dev/null +++ b/common/datavalues/benches/output_format.rs @@ -0,0 +1,119 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_datavalues::ColumnRef; +use common_datavalues::DataType; +use common_datavalues::Series; +use common_datavalues::SeriesFrom; +use common_datavalues::TypeSerializer; +use common_io::prelude::FormatSettings; +use criterion::black_box; +use criterion::criterion_group; +use criterion::criterion_main; +use criterion::Criterion; +use rand::distributions::Alphanumeric; +use rand::rngs::StdRng; +use rand::Rng; +use rand::SeedableRng; + +pub fn write_csv(cols: &[ColumnRef]) -> Vec { + let mut buf = Vec::with_capacity(1000 * 1000); + let mut ss = vec![]; + let rows = cols[0].len(); + + for c in cols { + let t = c.data_type(); + ss.push(t.create_serializer(c).unwrap()) + } + let f = &FormatSettings::default(); + for row in 0..rows { + for s in &ss { + s.write_field(row, &mut buf, f); + } + } + buf +} + +fn add_benchmark(c: &mut Criterion) { + let mut cols = vec![]; + let size = 4096; + cols.push(create_primitive_array(size, None, 0)); + cols.push(create_string_array(size, None, 100)); + c.bench_function("not_nullable", |b| { + b.iter(|| write_csv(black_box(&cols))); + }); +} + +pub fn create_primitive_array( + size: usize, + null_density: Option, + _item_size: usize, +) -> ColumnRef { + let mut rng = StdRng::seed_from_u64(3); + match null_density { + None => { + let v = (0..size).map(|_| rng.gen()).collect::>(); + Series::from_data(v) + } + Some(null_density) => { + let v = (0..size) + .map(|_| { + if rng.gen::() < null_density { + None + } else { + Some(rng.gen()) + } + }) + .collect::>>(); + Series::from_data(v) + } + } +} +use std::string::String; +pub fn create_string_array(size: usize, null_density: Option, item_size: usize) -> ColumnRef { + let mut rng = StdRng::seed_from_u64(3); + match null_density { + None => { + let vec: Vec = (0..size) + .map(|_| { + (&mut rng) + .sample_iter(&Alphanumeric) + .take(item_size) + .map(char::from) + .collect::() + }) + .collect(); + Series::from_data(vec) + } + Some(null_density) => { + let vec: Vec<_> = (0..item_size) + .map(|_| { + if rng.gen::() < null_density { + None + } else { + let value = (&mut rng) + .sample_iter(&Alphanumeric) + .take(size) + .collect::>(); + Some(value) + } + }) + .collect(); + Series::from_data(vec) + } + } +} + +criterion_group!(benches, add_benchmark); +criterion_main!(benches); diff --git a/common/datavalues/src/types/data_type.rs b/common/datavalues/src/types/data_type.rs index 987f47c0fd23d..98b2f1a2f51f4 100644 --- a/common/datavalues/src/types/data_type.rs +++ b/common/datavalues/src/types/data_type.rs @@ -40,6 +40,7 @@ use super::type_string::StringType; use super::type_struct::StructType; use super::type_timestamp::TimestampType; use crate::prelude::*; +use crate::serializations::ConstSerializer; pub const ARROW_EXTENSION_NAME: &str = "ARROW:extension:databend_name"; pub const ARROW_EXTENSION_META: &str = "ARROW:extension:databend_metadata"; @@ -73,7 +74,9 @@ pub enum DataTypeImpl { } #[enum_dispatch] -pub trait DataType: std::fmt::Debug + Sync + Send + DynClone { +pub trait DataType: std::fmt::Debug + Sync + Send + DynClone +where Self: Sized +{ fn data_type_id(&self) -> TypeID; fn is_nullable(&self) -> bool { @@ -125,7 +128,23 @@ pub trait DataType: std::fmt::Debug + Sync + Send + DynClone { fn create_mutable(&self, capacity: usize) -> Box; - fn create_serializer(&self) -> TypeSerializerImpl; + fn create_serializer<'a>(&self, col: &'a ColumnRef) -> Result> { + if col.is_const() { + let col: &ConstColumn = Series::check_get(col)?; + let inner = Box::new(self.create_serializer_inner(col.inner())?); + Ok(ConstSerializer { + inner, + size: col.len(), + } + .into()) + } else { + self.create_serializer_inner(col) + } + } + + fn create_serializer_inner<'a>(&self, _col: &'a ColumnRef) -> Result> { + unimplemented!() + } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl; } diff --git a/common/datavalues/src/types/mod.rs b/common/datavalues/src/types/mod.rs index d4b78e6f99e7e..f7601516efff1 100644 --- a/common/datavalues/src/types/mod.rs +++ b/common/datavalues/src/types/mod.rs @@ -44,7 +44,8 @@ pub use date_converter::*; pub use date_converter::*; pub use deserializations::*; pub use eq::*; -pub use serializations::*; +pub use serializations::TypeSerializer; +pub use serializations::TypeSerializerImpl; pub use type_array::*; pub use type_boolean::*; pub use type_date::*; diff --git a/common/datavalues/src/types/serializations/array.rs b/common/datavalues/src/types/serializations/array.rs index c3e14e2b15eb7..b2b491b162e71 100644 --- a/common/datavalues/src/types/serializations/array.rs +++ b/common/datavalues/src/types/serializations/array.rs @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Write; use std::sync::Arc; -use common_exception::ErrorCode; use common_exception::Result; use common_io::prelude::FormatSettings; use opensrv_clickhouse::types::column::ArrayColumnData; @@ -23,75 +21,82 @@ use serde_json::Value; use crate::prelude::*; -#[derive(Debug, Clone)] -pub struct ArraySerializer { - pub inner: Box, - pub typ: DataTypeImpl, +#[derive(Clone)] +pub struct ArraySerializer<'a> { + offsets: &'a [i64], + inner: Box>, } -impl TypeSerializer for ArraySerializer { - fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result { - if let DataValue::Array(vals) = value { - let mut res = String::new(); - res.push('['); - let mut first = true; - let quoted = self.typ.data_type_id().is_quoted(); - for val in vals { - if !first { - res.push_str(", "); - } - first = false; +impl<'a> ArraySerializer<'a> { + pub fn try_create(column: &'a ColumnRef, inner_type: &DataTypeImpl) -> Result { + let column: &ArrayColumn = Series::check_get(column)?; + let inner = Box::new(inner_type.create_serializer(column.values())?); + Ok(Self { + offsets: column.offsets(), + inner, + }) + } +} - let s = self.inner.serialize_value(val, format)?; - if quoted { - write!(res, "'{s}'").expect("write to string must succeed"); - } else { - res.push_str(&s); - } +impl<'a> TypeSerializer<'a> for ArraySerializer<'a> { + fn write_field(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { + let start = self.offsets[row_index] as usize; + let end = self.offsets[row_index + 1] as usize; + buf.push(b'['); + let inner = &self.inner; + for i in start..end { + if i != start { + buf.extend_from_slice(b", "); } - res.push(']'); - Ok(res) - } else { - Err(ErrorCode::BadBytes("Incorrect Array value")) + inner.write_field_quoted(i, buf, format, b'\''); } + buf.push(b']'); } - fn serialize_column(&self, column: &ColumnRef, format: &FormatSettings) -> Result> { - let column: &ArrayColumn = Series::check_get(column)?; - let mut result = Vec::with_capacity(column.len()); - for i in 0..column.len() { - let val = column.get(i); - let s = self.serialize_value(&val, format)?; - result.push(s); + fn serialize_clickhouse_const( + &self, + format: &FormatSettings, + size: usize, + ) -> Result { + let len = self.offsets.len() - 1; + let mut offsets = opensrv_clickhouse::types::column::List::with_capacity(size * len); + let total = self.offsets[len]; + let mut base = 0; + for _ in 0..size { + for offset in self.offsets.iter().skip(1) { + offsets.push(((*offset) + base) as u64); + } + base += total; } - Ok(result) - } - fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result> { - let column: &ArrayColumn = Series::check_get(column)?; - let mut result = Vec::with_capacity(column.len()); - for i in 0..column.len() { - let val = column.get(i); - let s = serde_json::to_value(val)?; - result.push(s); - } - Ok(result) + let inner_data = self.inner.serialize_clickhouse_const(format, size)?; + Ok(Arc::new(ArrayColumnData::create(inner_data, offsets))) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_column( &self, - column: &ColumnRef, format: &FormatSettings, ) -> Result { - let column: &ArrayColumn = Series::check_get(column)?; - let mut offsets = opensrv_clickhouse::types::column::List::with_capacity(column.len()); - for offset in column.offsets().iter().skip(1) { + let mut offsets = + opensrv_clickhouse::types::column::List::with_capacity(self.offsets.len() - 1); + for offset in self.offsets.iter().skip(1) { offsets.push(*offset as u64); } - let inner_data = self - .inner - .serialize_clickhouse_format(column.values(), format)?; + let inner_data = self.inner.serialize_clickhouse_column(format)?; Ok(Arc::new(ArrayColumnData::create(inner_data, offsets))) } + + fn serialize_json(&self, format: &FormatSettings) -> Result> { + let size = self.offsets.len() - 1; + let mut result = Vec::with_capacity(size); + let inner = self.inner.serialize_json(format)?; + let mut iter = inner.into_iter(); + for i in 0..size { + let len = (self.offsets[i + 1] - self.offsets[i]) as usize; + let chunk = iter.by_ref().take(len).collect(); + result.push(Value::Array(chunk)) + } + Ok(result) + } } diff --git a/common/datavalues/src/types/serializations/boolean.rs b/common/datavalues/src/types/serializations/boolean.rs index 1ca1a43052638..73ff756a1ec94 100644 --- a/common/datavalues/src/types/serializations/boolean.rs +++ b/common/datavalues/src/types/serializations/boolean.rs @@ -13,7 +13,6 @@ // limitations under the License. use common_arrow::arrow::bitmap::Bitmap; -use common_exception::ErrorCode; use common_exception::Result; use common_io::prelude::FormatSettings; use opensrv_clickhouse::types::column::ArcColumnWrapper; @@ -22,80 +21,78 @@ use serde_json::Value; use crate::prelude::*; -#[derive(Debug, Clone)] -pub struct BooleanSerializer {} - -const TRUE_STR: &str = "1"; -const FALSE_STR: &str = "0"; +#[derive(Clone)] +pub struct BooleanSerializer { + pub(crate) values: Bitmap, +} -impl TypeSerializer for BooleanSerializer { - fn serialize_value(&self, value: &DataValue, _format: &FormatSettings) -> Result { - if let DataValue::Boolean(x) = value { - if *x { - Ok(TRUE_STR.to_owned()) - } else { - Ok(FALSE_STR.to_owned()) - } - } else { - Err(ErrorCode::BadBytes("Incorrect boolean value")) - } +impl BooleanSerializer { + pub fn try_create(col: &ColumnRef) -> Result { + let col: &BooleanColumn = Series::check_get(col)?; + let values = col.values().clone(); + Ok(Self { values }) } +} - fn serialize_column( - &self, - column: &ColumnRef, - _format: &FormatSettings, - ) -> Result> { - let array: &BooleanColumn = Series::check_get(column)?; +impl<'a> TypeSerializer<'a> for BooleanSerializer { + fn need_quote(&self) -> bool { + false + } - let result: Vec = array - .iter() - .map(|v| { - if v { - TRUE_STR.to_owned() - } else { - FALSE_STR.to_owned() - } - }) - .collect(); - Ok(result) + fn write_field(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { + let v = if self.values.get_bit(row_index) { + &format.true_bytes + } else { + &format.false_bytes + }; + buf.extend_from_slice(v); } - fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result> { - let array: &BooleanColumn = Series::check_get(column)?; - let result: Vec = array + fn serialize_json(&self, _format: &FormatSettings) -> Result> { + let result: Vec = self + .values .iter() .map(|v| serde_json::to_value(v).unwrap()) .collect(); Ok(result) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_const( + &self, + _format: &FormatSettings, + size: usize, + ) -> Result { + let mut values: Vec = Vec::with_capacity(self.values.len() * size); + for _ in 0..size { + for v in self.values.iter() { + values.push(v as u8) + } + } + Ok(Vec::column_from::(values)) + } + + fn serialize_clickhouse_column( &self, - column: &ColumnRef, _format: &FormatSettings, ) -> Result { - let col: &BooleanColumn = Series::check_get(column)?; - let values: Vec = col.iter().map(|c| c as u8).collect(); + let values: Vec = self.values.iter().map(|c| c as u8).collect(); Ok(Vec::column_from::(values)) } fn serialize_json_object( &self, - column: &ColumnRef, _valids: Option<&Bitmap>, format: &FormatSettings, ) -> Result> { - self.serialize_json(column, format) + self.serialize_json(format) } fn serialize_json_object_suppress_error( &self, - column: &ColumnRef, _format: &FormatSettings, ) -> Result>> { - let column: &BooleanColumn = Series::check_get(column)?; - let result: Vec> = column + let result: Vec> = self + .values .iter() .map(|x| match serde_json::to_value(x) { Ok(v) => Some(v), diff --git a/common/datavalues/src/types/serializations/const_.rs b/common/datavalues/src/types/serializations/const_.rs new file mode 100644 index 0000000000000..52bb0e4b817aa --- /dev/null +++ b/common/datavalues/src/types/serializations/const_.rs @@ -0,0 +1,71 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_arrow::arrow::bitmap::Bitmap; +use common_exception::Result; +use common_io::prelude::FormatSettings; +use serde_json::Value; + +use crate::prelude::*; + +#[derive(Clone)] +pub struct ConstSerializer<'a> { + pub size: usize, + pub inner: Box>, +} + +impl<'a> ConstSerializer<'a> { + fn repeat(&self, one: Vec) -> Vec { + let mut res = Vec::with_capacity(self.size); + for _ in 0..self.size { + res.push(one[0].clone()) + } + res + } +} +impl<'a> TypeSerializer<'a> for ConstSerializer<'a> { + fn need_quote(&self) -> bool { + self.inner.need_quote() + } + + fn write_field(&self, _row_index: usize, buf: &mut Vec, format: &FormatSettings) { + self.inner.write_field(0, buf, format) + } + + fn serialize_json(&self, format: &FormatSettings) -> Result> { + Ok(self.repeat(self.inner.serialize_json(format)?)) + } + + fn serialize_clickhouse_column( + &self, + format: &FormatSettings, + ) -> Result { + self.inner.serialize_clickhouse_const(format, self.size) + } + + fn serialize_json_object( + &self, + valids: Option<&Bitmap>, + format: &FormatSettings, + ) -> Result> { + Ok(self.repeat(self.inner.serialize_json_object(valids, format)?)) + } + + fn serialize_json_object_suppress_error( + &self, + format: &FormatSettings, + ) -> Result>> { + Ok(self.repeat(self.inner.serialize_json_object_suppress_error(format)?)) + } +} diff --git a/common/datavalues/src/types/serializations/date.rs b/common/datavalues/src/types/serializations/date.rs index 74c58408b2171..f1e0de7cd341d 100644 --- a/common/datavalues/src/types/serializations/date.rs +++ b/common/datavalues/src/types/serializations/date.rs @@ -11,107 +11,96 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -use std::marker::PhantomData; + use std::ops::AddAssign; use chrono::Date; use chrono::Duration; use chrono::NaiveDate; use chrono_tz::Tz; -use common_exception::*; +use common_exception::Result; use common_io::prelude::FormatSettings; +use lexical_core::ToLexical; use num::cast::AsPrimitive; +use opensrv_clickhouse::types::column::ArcColumnData; use opensrv_clickhouse::types::column::ArcColumnWrapper; use opensrv_clickhouse::types::column::ColumnFrom; use serde_json::Value; -use crate::prelude::*; +use crate::serializations::TypeSerializer; +use crate::ColumnRef; +use crate::DateConverter; +use crate::PrimitiveColumn; +use crate::PrimitiveType; +use crate::Series; + +const DATE_FMT: &str = "%Y-%m-%d"; #[derive(Debug, Clone)] -pub struct DateSerializer> { - t: PhantomData, +pub struct DateSerializer<'a, T: PrimitiveType + AsPrimitive + ToLexical> { + pub(crate) values: &'a [T], } -impl> Default for DateSerializer { - fn default() -> Self { - Self { - t: Default::default(), - } - } +fn v_to_string(v: &i64) -> String { + let mut date = NaiveDate::from_ymd(1970, 1, 1); + let d = Duration::days(*v); + date.add_assign(d); + date.format(DATE_FMT).to_string() } -const DATE_FMT: &str = "%Y-%m-%d"; +impl<'a, T: PrimitiveType + AsPrimitive + ToLexical> DateSerializer<'a, T> { + pub fn try_create(col: &'a ColumnRef) -> Result { + let col: &PrimitiveColumn = Series::check_get(col)?; + Ok(Self { + values: col.values(), + }) + } +} -impl> TypeSerializer for DateSerializer { - fn serialize_value(&self, value: &DataValue, _format: &FormatSettings) -> Result { - let mut date = NaiveDate::from_ymd(1970, 1, 1); - let d = Duration::days(value.as_i64()?); - date.add_assign(d); - Ok(date.format(DATE_FMT).to_string()) +impl<'a, T: PrimitiveType + AsPrimitive + ToLexical> TypeSerializer<'a> + for DateSerializer<'a, T> +{ + fn need_quote(&self) -> bool { + true } - fn serialize_column( - &self, - column: &ColumnRef, - _format: &FormatSettings, - ) -> Result> { - let column: &PrimitiveColumn = Series::check_get(column)?; + fn write_field(&self, row_index: usize, buf: &mut Vec, _format: &FormatSettings) { + let s = v_to_string(&self.values[row_index].as_i64()); + buf.extend_from_slice(s.as_bytes()) + } - let result: Vec = column - .iter() - .map(|v| { - let mut date = NaiveDate::from_ymd(1970, 1, 1); - let d = Duration::days(v.to_i64().unwrap()); - date.add_assign(d); - date.format(DATE_FMT).to_string() + fn serialize_json(&self, _format: &FormatSettings) -> Result> { + let result: Vec = (0..self.values.len()) + .map(|row_index| { + let s = v_to_string(&self.values[row_index].as_i64()); + serde_json::to_value(s).unwrap() }) .collect(); Ok(result) } - fn serialize_column_quoted( + fn serialize_clickhouse_const( &self, - column: &ColumnRef, _format: &FormatSettings, - ) -> Result> { - let column: &PrimitiveColumn = Series::check_get(column)?; - - let result: Vec = column - .iter() - .map(|v| { - let mut date = NaiveDate::from_ymd(1970, 1, 1); - let d = Duration::days(v.to_i64().unwrap()); - date.add_assign(d); - format!("\"{}\"", date.format(DATE_FMT)) - }) - .collect(); - Ok(result) - } - - fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result> { - let array: &PrimitiveColumn = Series::check_get(column)?; - let result: Vec = array - .iter() - .map(|v| { - let mut date = NaiveDate::from_ymd(1970, 1, 1); - let d = Duration::days(v.to_i64().unwrap()); - date.add_assign(d); - let str = date.format(DATE_FMT).to_string(); - serde_json::to_value(str).unwrap() - }) - .collect(); - Ok(result) + size: usize, + ) -> Result { + let tz: Tz = "UTC".parse().unwrap(); + let dates: Vec> = self.values.iter().map(|v| v.to_date(&tz)).collect(); + let mut values: Vec> = Vec::with_capacity(self.values.len() * size); + for _ in 0..size { + for v in dates.iter() { + values.push(*v) + } + } + Ok(Vec::column_from::(values)) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_column( &self, - column: &ColumnRef, _format: &FormatSettings, ) -> Result { - let array: &PrimitiveColumn = Series::check_get(column)?; let tz: Tz = "UTC".parse().unwrap(); - - let values: Vec> = array.iter().map(|v| v.to_date(&tz)).collect(); + let values: Vec> = self.values.iter().map(|v| v.to_date(&tz)).collect(); Ok(Vec::column_from::(values)) } } diff --git a/common/datavalues/src/types/serializations/mod.rs b/common/datavalues/src/types/serializations/mod.rs index 918c2e0c3a09f..91bb58c5eaa32 100644 --- a/common/datavalues/src/types/serializations/mod.rs +++ b/common/datavalues/src/types/serializations/mod.rs @@ -12,17 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_arrow::arrow::bitmap::Bitmap; -use common_exception::ErrorCode; -use common_exception::Result; -use common_io::prelude::FormatSettings; -use enum_dispatch::enum_dispatch; -use opensrv_clickhouse::types::column::ArcColumnData; -use serde_json::Value; - -use crate::prelude::*; mod array; mod boolean; +mod const_; mod date; mod null; mod nullable; @@ -32,40 +24,72 @@ mod struct_; mod timestamp; mod variant; -pub use array::*; -pub use boolean::*; -pub use date::*; -pub use null::*; -pub use nullable::*; -pub use number::*; -pub use string::*; -pub use struct_::*; -pub use timestamp::*; -pub use variant::*; +pub use array::ArraySerializer; +pub use boolean::BooleanSerializer; +use common_arrow::arrow::bitmap::Bitmap; +use common_exception::ErrorCode; +use common_exception::Result; +use common_exception::ToErrorCode; +use common_io::prelude::FormatSettings; +pub use const_::ConstSerializer; +pub use date::DateSerializer; +use enum_dispatch::enum_dispatch; +pub use null::NullSerializer; +pub use nullable::NullableSerializer; +pub use number::NumberSerializer; +use opensrv_clickhouse::types::column::ArcColumnData; +use serde_json::Value; +pub use string::StringSerializer; +pub use struct_::StructSerializer; +pub use timestamp::TimestampSerializer; +pub use variant::VariantSerializer; #[enum_dispatch] -pub trait TypeSerializer: Send + Sync { - fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result; - fn serialize_json(&self, column: &ColumnRef, format: &FormatSettings) -> Result>; - fn serialize_column(&self, column: &ColumnRef, format: &FormatSettings) -> Result>; - - fn serialize_column_quoted( +pub trait TypeSerializer<'a>: Send + Sync { + fn need_quote(&self) -> bool { + false + } + fn write_field(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings); + fn write_field_quoted( &self, - column: &ColumnRef, + row_index: usize, + buf: &mut Vec, format: &FormatSettings, - ) -> Result> { - self.serialize_column(column, format) + quote: u8, + ) { + let need_quote = self.need_quote(); + if need_quote { + buf.push(quote); + } + self.write_field(row_index, buf, format); + if need_quote { + buf.push(quote); + } + } + + fn serialize_field(&self, row_index: usize, format: &FormatSettings) -> Result { + let mut buf = Vec::with_capacity(100); + self.write_field(row_index, &mut buf, format); + String::from_utf8(buf).map_err_to_code(ErrorCode::BadBytes, || "fail to serialize field") + } + + fn serialize_json(&self, _format: &FormatSettings) -> Result> { + unimplemented!() } - fn serialize_clickhouse_format( + fn serialize_clickhouse_const( &self, - column: &ColumnRef, _format: &FormatSettings, - ) -> Result; + _size: usize, + ) -> Result { + unimplemented!() + } + fn serialize_clickhouse_column(&self, _format: &FormatSettings) -> Result { + unimplemented!() + } fn serialize_json_object( &self, - _column: &ColumnRef, _valids: Option<&Bitmap>, _format: &FormatSettings, ) -> Result> { @@ -76,7 +100,6 @@ pub trait TypeSerializer: Send + Sync { fn serialize_json_object_suppress_error( &self, - _column: &ColumnRef, _format: &FormatSettings, ) -> Result>> { Err(ErrorCode::BadDataValueType( @@ -85,28 +108,30 @@ pub trait TypeSerializer: Send + Sync { } } -#[derive(Debug, Clone)] +#[derive(Clone)] #[enum_dispatch(TypeSerializer)] -pub enum TypeSerializerImpl { +pub enum TypeSerializerImpl<'a> { + Const(ConstSerializer<'a>), + Null(NullSerializer), - Nullable(NullableSerializer), + Nullable(NullableSerializer<'a>), Boolean(BooleanSerializer), - Int8(NumberSerializer), - Int16(NumberSerializer), - Int32(NumberSerializer), - Int64(NumberSerializer), - UInt8(NumberSerializer), - UInt16(NumberSerializer), - UInt32(NumberSerializer), - UInt64(NumberSerializer), - Float32(NumberSerializer), - Float64(NumberSerializer), + Int8(NumberSerializer<'a, i8>), + Int16(NumberSerializer<'a, i16>), + Int32(NumberSerializer<'a, i32>), + Int64(NumberSerializer<'a, i64>), + UInt8(NumberSerializer<'a, u8>), + UInt16(NumberSerializer<'a, u16>), + UInt32(NumberSerializer<'a, u32>), + UInt64(NumberSerializer<'a, u64>), + Float32(NumberSerializer<'a, f32>), + Float64(NumberSerializer<'a, f64>), - Date(DateSerializer), - Interval(DateSerializer), - Timestamp(TimestampSerializer), - String(StringSerializer), - Array(ArraySerializer), - Struct(StructSerializer), - Variant(VariantSerializer), + Date(DateSerializer<'a, i32>), + Interval(DateSerializer<'a, i64>), + Timestamp(TimestampSerializer<'a>), + String(StringSerializer<'a>), + Array(ArraySerializer<'a>), + Struct(StructSerializer<'a>), + Variant(VariantSerializer<'a>), } diff --git a/common/datavalues/src/types/serializations/null.rs b/common/datavalues/src/types/serializations/null.rs index e2097a1e92c1f..a31fce345800d 100644 --- a/common/datavalues/src/types/serializations/null.rs +++ b/common/datavalues/src/types/serializations/null.rs @@ -21,42 +21,53 @@ use opensrv_clickhouse::types::column::ColumnFrom; use opensrv_clickhouse::types::column::NullableColumnData; use serde_json::Value; -use crate::prelude::DataValue; -use crate::ColumnRef; -use crate::TypeSerializer; +use crate::serializations::TypeSerializer; #[derive(Clone, Debug, Default)] -pub struct NullSerializer {} +pub struct NullSerializer { + pub size: usize, +} const NULL_STR: &str = "NULL"; +const NULL_BYTES: &[u8] = b"NULL"; -impl TypeSerializer for NullSerializer { - fn serialize_value(&self, _value: &DataValue, _format: &FormatSettings) -> Result { - Ok(NULL_STR.to_owned()) +impl<'a> TypeSerializer<'a> for NullSerializer { + fn need_quote(&self) -> bool { + false } - fn serialize_column( - &self, - column: &ColumnRef, - _format: &FormatSettings, - ) -> Result> { - let result: Vec = vec![NULL_STR.to_owned(); column.len()]; - Ok(result) + fn write_field(&self, _row_index: usize, buf: &mut Vec, _format: &FormatSettings) { + buf.extend_from_slice(NULL_BYTES); + } + + fn serialize_field(&self, _row_index: usize, _format: &FormatSettings) -> Result { + Ok(NULL_STR.to_owned()) } - fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result> { + fn serialize_json(&self, _format: &FormatSettings) -> Result> { let null = Value::Null; - let result: Vec = vec![null; column.len()]; + let result: Vec = vec![null; self.size]; Ok(result) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_const( + &self, + _format: &FormatSettings, + size: usize, + ) -> Result { + let n = size * self.size; + let nulls = vec![1u8; n]; + let inner = Vec::column_from::(vec![1u8; n]); + let data = NullableColumnData { nulls, inner }; + Ok(Arc::new(data)) + } + + fn serialize_clickhouse_column( &self, - column: &ColumnRef, _format: &FormatSettings, ) -> Result { - let nulls = vec![1u8; column.len()]; - let inner = Vec::column_from::(vec![1u8; column.len()]); + let nulls = vec![1u8; self.size]; + let inner = Vec::column_from::(vec![1u8; self.size]); let data = NullableColumnData { nulls, inner }; Ok(Arc::new(data)) } diff --git a/common/datavalues/src/types/serializations/nullable.rs b/common/datavalues/src/types/serializations/nullable.rs index 6f95f04d43538..9a35a68bd7c2a 100644 --- a/common/datavalues/src/types/serializations/nullable.rs +++ b/common/datavalues/src/types/serializations/nullable.rs @@ -14,86 +14,67 @@ use std::sync::Arc; +use common_arrow::arrow::bitmap::Bitmap; use common_exception::Result; use common_io::prelude::FormatSettings; use opensrv_clickhouse::types::column::NullableColumnData; use serde_json::Value; -use crate::prelude::DataValue; -use crate::Column; -use crate::ColumnRef; -use crate::NullableColumn; -use crate::Series; -use crate::TypeSerializer; -use crate::TypeSerializerImpl; +use crate::serializations::TypeSerializer; +use crate::serializations::TypeSerializerImpl; -#[derive(Debug, Clone)] -pub struct NullableSerializer { - pub inner: Box, +const NULL_BYTES: &[u8] = b"NULL"; + +#[derive(Clone)] +pub struct NullableSerializer<'a> { + pub validity: &'a Bitmap, + pub inner: Box>, } -impl TypeSerializer for NullableSerializer { - fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result { - if value.is_null() { - Ok("NULL".to_owned()) +impl<'a> TypeSerializer<'a> for NullableSerializer<'a> { + fn need_quote(&self) -> bool { + self.inner.need_quote() + } + + fn write_field(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { + if !self.validity.get_bit(row_index) { + buf.extend_from_slice(NULL_BYTES); } else { - self.inner.serialize_value(value, format) + self.inner.write_field(row_index, buf, format) } } - fn serialize_column(&self, column: &ColumnRef, format: &FormatSettings) -> Result> { - let column: &NullableColumn = Series::check_get(column)?; - let rows = column.len(); - let mut res = self.inner.serialize_column(column.inner(), format)?; + fn serialize_json(&self, format: &FormatSettings) -> Result> { + let mut res = self.inner.serialize_json(format)?; + let validity = self.validity; - (0..rows).for_each(|row| { - if column.null_at(row) { - res[row] = "NULL".to_owned(); + (0..validity.len()).for_each(|row| { + if !validity.get_bit(row) { + res[row] = Value::Null; } }); Ok(res) } - fn serialize_column_quoted( + fn serialize_clickhouse_const( &self, - column: &ColumnRef, format: &FormatSettings, - ) -> Result> { - let column: &NullableColumn = Series::check_get(column)?; - let rows = column.len(); - let mut res = self.inner.serialize_column_quoted(column.inner(), format)?; - - (0..rows).for_each(|row| { - if column.null_at(row) { - res[row] = "NULL".to_owned(); - } - }); - Ok(res) - } - - fn serialize_json(&self, column: &ColumnRef, format: &FormatSettings) -> Result> { - let column: &NullableColumn = Series::check_get(column)?; - let rows = column.len(); - let mut res = self.inner.serialize_json(column.inner(), format)?; + size: usize, + ) -> Result { + let inner = self.inner.serialize_clickhouse_const(format, size)?; + let nulls: Vec<_> = self.validity.iter().map(|v| !v as u8).collect(); + let nulls = nulls.repeat(size); + let data = NullableColumnData { nulls, inner }; - (0..rows).for_each(|row| { - if column.null_at(row) { - res[row] = Value::Null; - } - }); - Ok(res) + Ok(Arc::new(data)) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_column( &self, - column: &ColumnRef, format: &FormatSettings, ) -> Result { - let column: &NullableColumn = Series::check_get(column)?; - let inner = self - .inner - .serialize_clickhouse_format(column.inner(), format)?; - let nulls = column.ensure_validity().iter().map(|v| !v as u8).collect(); + let inner = self.inner.serialize_clickhouse_column(format)?; + let nulls = self.validity.iter().map(|v| !v as u8).collect(); let data = NullableColumnData { nulls, inner }; Ok(Arc::new(data)) diff --git a/common/datavalues/src/types/serializations/number.rs b/common/datavalues/src/types/serializations/number.rs index 965a7d63ea957..97b1d19595374 100644 --- a/common/datavalues/src/types/serializations/number.rs +++ b/common/datavalues/src/types/serializations/number.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::marker::PhantomData; - use common_arrow::arrow::bitmap::Bitmap; use common_exception::Result; use common_io::prelude::FormatSettings; @@ -24,22 +22,27 @@ use opensrv_clickhouse::types::column::ColumnFrom; use opensrv_clickhouse::types::HasSqlType; use serde_json::Value; -use crate::prelude::*; +use crate::ColumnRef; +use crate::PrimitiveColumn; +use crate::PrimitiveType; +use crate::Series; +use crate::TypeSerializer; #[derive(Debug, Clone)] -pub struct NumberSerializer { - t: PhantomData, +pub struct NumberSerializer<'a, T: PrimitiveType> { + pub(crate) values: &'a [T], } -impl Default for NumberSerializer { - fn default() -> Self { - Self { - t: Default::default(), - } +impl<'a, T: PrimitiveType> NumberSerializer<'a, T> { + pub fn try_create(col: &'a ColumnRef) -> Result { + let col: &PrimitiveColumn = Series::check_get(col)?; + Ok(NumberSerializer { + values: col.values(), + }) } } -impl TypeSerializer for NumberSerializer +impl<'a, T> TypeSerializer<'a> for NumberSerializer<'a, T> where T: PrimitiveType + opensrv_clickhouse::types::StatBuffer + Marshal @@ -49,56 +52,59 @@ where T: PrimitiveType + std::convert::From + opensrv_clickhouse::io::Marshal + opensrv_clickhouse::io::Unmarshal + + lexical_core::ToLexical { - fn serialize_value(&self, value: &DataValue, _format: &FormatSettings) -> Result { - Ok(format!("{:?}", value)) + fn need_quote(&self) -> bool { + false } - - fn serialize_column( - &self, - column: &ColumnRef, - _format: &FormatSettings, - ) -> Result> { - let column: &PrimitiveColumn = Series::check_get(column)?; - let result: Vec = column.iter().map(|x| x.to_string()).collect(); - Ok(result) + fn write_field(&self, row_index: usize, buf: &mut Vec, _format: &FormatSettings) { + extend_lexical(self.values[row_index], buf); } - fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result> { - let column: &PrimitiveColumn = Series::check_get(column)?; - let result: Vec = column + fn serialize_json(&self, _format: &FormatSettings) -> Result> { + let result: Vec = self + .values .iter() .map(|x| serde_json::to_value(x).unwrap()) .collect(); Ok(result) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_const( &self, - column: &ColumnRef, _format: &FormatSettings, + size: usize, ) -> Result { - let col: &PrimitiveColumn = Series::check_get(column)?; - let values: Vec = col.iter().map(|c| c.to_owned()).collect(); + let mut values: Vec = Vec::with_capacity(self.values.len() * size); + for _ in 0..size { + for v in self.values.iter() { + values.push(*v) + } + } + Ok(Vec::column_from::(values)) + } + fn serialize_clickhouse_column( + &self, + _format: &FormatSettings, + ) -> Result { + let values: Vec = self.values.iter().map(|c| c.to_owned()).collect(); Ok(Vec::column_from::(values)) } fn serialize_json_object( &self, - column: &ColumnRef, _valids: Option<&Bitmap>, format: &FormatSettings, ) -> Result> { - self.serialize_json(column, format) + self.serialize_json(format) } fn serialize_json_object_suppress_error( &self, - column: &ColumnRef, _format: &FormatSettings, ) -> Result>> { - let column: &PrimitiveColumn = Series::check_get(column)?; - let result: Vec> = column + let result: Vec> = self + .values .iter() .map(|x| match serde_json::to_value(x) { Ok(v) => Some(v), @@ -108,3 +114,16 @@ where T: PrimitiveType Ok(result) } } + +// 30% faster lexical_core::write to tmp buf and extend_from_slice +#[inline] +pub fn extend_lexical(n: N, buf: &mut Vec) { + buf.reserve(N::FORMATTED_SIZE_DECIMAL); + let len0 = buf.len(); + unsafe { + let slice = + std::slice::from_raw_parts_mut(buf.as_mut_ptr().add(len0), buf.capacity() - len0); + let len = lexical_core::write(n, slice).len(); + buf.set_len(len0 + len); + } +} diff --git a/common/datavalues/src/types/serializations/string.rs b/common/datavalues/src/types/serializations/string.rs index 466f3c1fbc225..15811c200163d 100644 --- a/common/datavalues/src/types/serializations/string.rs +++ b/common/datavalues/src/types/serializations/string.rs @@ -22,70 +22,64 @@ use serde_json::Value; use crate::prelude::*; -#[derive(Debug, Clone)] -pub struct StringSerializer {} +#[derive(Clone)] +pub struct StringSerializer<'a> { + pub(crate) column: &'a StringColumn, +} -impl TypeSerializer for StringSerializer { - fn serialize_value(&self, value: &DataValue, _format: &FormatSettings) -> Result { - if let DataValue::String(x) = value { - Ok(String::from_utf8_lossy(x).to_string()) - } else { - Err(ErrorCode::BadBytes("Incorrect String value")) - } +impl<'a> StringSerializer<'a> { + pub fn try_create(col: &'a ColumnRef) -> Result { + let column: &StringColumn = Series::check_get(col)?; + Ok(Self { column }) } +} - fn serialize_column( - &self, - column: &ColumnRef, - _format: &FormatSettings, - ) -> Result> { - let column: &StringColumn = Series::check_get(column)?; - let result: Vec = column - .iter() - .map(|v| String::from_utf8_lossy(v).to_string()) - .collect(); - Ok(result) +impl<'a> TypeSerializer<'a> for StringSerializer<'a> { + fn need_quote(&self) -> bool { + true } - - fn serialize_column_quoted( - &self, - column: &ColumnRef, - _format: &FormatSettings, - ) -> Result> { - let column: &StringColumn = Series::check_get(column)?; - let result: Vec = column - .iter() - .map(|v| format!("{:?}", String::from_utf8_lossy(v))) - .collect(); - Ok(result) + fn write_field(&self, row_index: usize, buf: &mut Vec, _format: &FormatSettings) { + buf.extend_from_slice(unsafe { self.column.value_unchecked(row_index) }); } - fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result> { - let column: &StringColumn = Series::check_get(column)?; - let result: Vec = column + fn serialize_json(&self, _format: &FormatSettings) -> Result> { + let result: Vec = self + .column .iter() .map(|x| serde_json::to_value(String::from_utf8_lossy(x).to_string()).unwrap()) .collect(); Ok(result) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_const( + &self, + _format: &FormatSettings, + size: usize, + ) -> Result { + let strings: Vec<&[u8]> = self.column.iter().collect(); + let mut values: Vec<&[u8]> = Vec::with_capacity(self.column.len() * size); + for _ in 0..size { + for v in strings.iter() { + values.push(v) + } + } + Ok(Vec::column_from::(values)) + } + + fn serialize_clickhouse_column( &self, - column: &ColumnRef, _format: &FormatSettings, ) -> Result { - let column: &StringColumn = Series::check_get(column)?; - let values: Vec<&[u8]> = column.iter().collect(); + let values: Vec<&[u8]> = self.column.iter().collect(); Ok(Vec::column_from::(values)) } fn serialize_json_object( &self, - column: &ColumnRef, valids: Option<&Bitmap>, _format: &FormatSettings, ) -> Result> { - let column: &StringColumn = Series::check_get(column)?; + let column = self.column; let mut result: Vec = Vec::new(); for (i, v) in column.iter().enumerate() { if let Some(valids) = valids { @@ -117,11 +111,10 @@ impl TypeSerializer for StringSerializer { fn serialize_json_object_suppress_error( &self, - column: &ColumnRef, _format: &FormatSettings, ) -> Result>> { - let column: &StringColumn = Series::check_get(column)?; - let result: Vec> = column + let result: Vec> = self + .column .iter() .map(|v| match std::str::from_utf8(v) { Ok(v) => match serde_json::from_str::(v) { diff --git a/common/datavalues/src/types/serializations/struct_.rs b/common/datavalues/src/types/serializations/struct_.rs index 54d9b19624742..0d6a51117c28c 100644 --- a/common/datavalues/src/types/serializations/struct_.rs +++ b/common/datavalues/src/types/serializations/struct_.rs @@ -12,67 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Write; use std::sync::Arc; -use common_exception::ErrorCode; use common_exception::Result; use common_io::prelude::FormatSettings; -use itertools::izip; use opensrv_clickhouse::types::column::ArcColumnData; use opensrv_clickhouse::types::column::TupleColumnData; use serde_json::Value; use crate::prelude::*; -#[derive(Debug, Clone)] -pub struct StructSerializer { - pub names: Vec, - pub inners: Vec>, - pub types: Vec, +#[derive(Clone)] +pub struct StructSerializer<'a> { + #[allow(unused)] + pub(crate) names: Vec, + pub(crate) inners: Vec>, + pub(crate) column: &'a ColumnRef, } -impl TypeSerializer for StructSerializer { - fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result { - if let DataValue::Struct(vals) = value { - let mut res = String::new(); - res.push('('); - let mut first = true; +impl<'a> TypeSerializer<'a> for StructSerializer<'a> { + fn write_field(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { + buf.push(b'('); + let mut first = true; - for (val, inner, typ) in izip!(vals, &self.inners, &self.types) { - if !first { - res.push_str(", "); - } - first = false; - - let s = inner.serialize_value(val, format)?; - if typ.data_type_id().is_quoted() { - write!(res, "'{s}'").expect("write to string must succeed"); - } else { - res.push_str(&s); - } + for inner in &self.inners { + if !first { + buf.extend_from_slice(b", "); } - res.push(')'); - Ok(res) - } else { - Err(ErrorCode::BadBytes("Incorrect Struct value")) - } - } - - fn serialize_column(&self, column: &ColumnRef, format: &FormatSettings) -> Result> { - let column: &StructColumn = Series::check_get(column)?; - let mut result = Vec::with_capacity(column.len()); - for i in 0..column.len() { - let val = column.get(i); - let s = self.serialize_value(&val, format)?; - result.push(s); + first = false; + inner.write_field_quoted(row_index, buf, format, b'\''); } - Ok(result) + buf.push(b')'); } - fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result> { - let column: &StructColumn = Series::check_get(column)?; - + fn serialize_json(&self, _format: &FormatSettings) -> Result> { + let column = self.column; let mut result = Vec::with_capacity(column.len()); for i in 0..column.len() { let val = column.get(i); @@ -82,20 +56,27 @@ impl TypeSerializer for StructSerializer { Ok(result) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_const( &self, - column: &ColumnRef, format: &FormatSettings, + size: usize, ) -> Result { - let column: &StructColumn = Series::check_get(column)?; - let result = self - .inners - .iter() - .zip(column.values().iter()) - .map(|(inner, col)| inner.serialize_clickhouse_format(col, format)) - .collect::>>()?; + let mut inner = vec![]; + for s in &self.inners { + inner.push(s.serialize_clickhouse_const(format, size)?) + } + + let data = TupleColumnData { inner }; + Ok(Arc::new(data)) + } + + fn serialize_clickhouse_column(&self, format: &FormatSettings) -> Result { + let mut inner = vec![]; + for s in &self.inners { + inner.push(s.serialize_clickhouse_column(format)?) + } - let data = TupleColumnData { inner: result }; + let data = TupleColumnData { inner }; Ok(Arc::new(data)) } } diff --git a/common/datavalues/src/types/serializations/timestamp.rs b/common/datavalues/src/types/serializations/timestamp.rs index 04ddafcd3c535..abe29ea2a51e5 100644 --- a/common/datavalues/src/types/serializations/timestamp.rs +++ b/common/datavalues/src/types/serializations/timestamp.rs @@ -14,79 +14,85 @@ use chrono::DateTime; use chrono_tz::Tz; -use common_exception::*; +use common_exception::Result; use common_io::prelude::FormatSettings; use opensrv_clickhouse::types::column::ArcColumnWrapper; use opensrv_clickhouse::types::column::ColumnFrom; use serde_json::Value; -use crate::prelude::*; +use crate::ColumnRef; +use crate::DateConverter; +use crate::PrimitiveColumn; +use crate::Series; +use crate::TypeSerializer; -#[derive(Debug, Clone, Default)] -pub struct TimestampSerializer; +const TIME_FMT: &str = "%Y-%m-%d %H:%M:%S"; + +#[derive(Debug, Clone)] +pub struct TimestampSerializer<'a> { + pub(crate) values: &'a [i64], +} -impl TimestampSerializer { +impl<'a> TimestampSerializer<'a> { + pub fn try_create(col: &'a ColumnRef) -> Result { + let col: &PrimitiveColumn = Series::check_get(col)?; + Ok(Self { + values: col.values(), + }) + } pub fn to_timestamp(&self, value: &i64, tz: &Tz) -> DateTime { value.to_timestamp(tz) } } -const TIME_FMT: &str = "%Y-%m-%d %H:%M:%S"; +impl<'a> TypeSerializer<'a> for TimestampSerializer<'a> { + fn need_quote(&self) -> bool { + true + } -impl TypeSerializer for TimestampSerializer { - fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result { - let value = DFTryFrom::try_from(value.clone())?; - let dt = self.to_timestamp(&value, &format.timezone); - Ok(dt.format(TIME_FMT).to_string()) + fn write_field(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { + let dt = self.to_timestamp(&self.values[row_index], &format.timezone); + let s = dt.format(TIME_FMT).to_string(); + buf.extend_from_slice(s.as_bytes()) } - fn serialize_column(&self, column: &ColumnRef, format: &FormatSettings) -> Result> { - let column: &PrimitiveColumn = Series::check_get(column)?; - let result: Vec = column + fn serialize_json(&self, format: &FormatSettings) -> Result> { + let result: Vec = self + .values .iter() .map(|v| { let dt = self.to_timestamp(v, &format.timezone); - dt.format(TIME_FMT).to_string() + serde_json::to_value(dt.format(TIME_FMT).to_string()).unwrap() }) .collect(); Ok(result) } - fn serialize_column_quoted( + fn serialize_clickhouse_const( &self, - column: &ColumnRef, format: &FormatSettings, - ) -> Result> { - let column: &PrimitiveColumn = Series::check_get(column)?; - let result: Vec = column - .iter() - .map(|v| { - let dt = self.to_timestamp(v, &format.timezone); - format!("\"{}\"", dt.format(TIME_FMT)) - }) - .collect(); - Ok(result) - } - - fn serialize_json(&self, column: &ColumnRef, format: &FormatSettings) -> Result> { - let array: &PrimitiveColumn = Series::check_get(column)?; - let result: Vec = array + size: usize, + ) -> Result { + let times: Vec> = self + .values .iter() - .map(|v| { - let dt = self.to_timestamp(v, &format.timezone); - serde_json::to_value(dt.format(TIME_FMT).to_string()).unwrap() - }) + .map(|v| self.to_timestamp(v, &format.timezone)) .collect(); - Ok(result) + let mut values: Vec> = Vec::with_capacity(self.values.len() * size); + for _ in 0..size { + for v in times.iter() { + values.push(*v) + } + } + Ok(Vec::column_from::(values)) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_column( &self, - column: &ColumnRef, format: &FormatSettings, ) -> Result { - let array: &PrimitiveColumn = Series::check_get(column)?; - let values: Vec> = array + let values: Vec> = self + .values .iter() .map(|v| self.to_timestamp(v, &format.timezone)) .collect(); diff --git a/common/datavalues/src/types/serializations/variant.rs b/common/datavalues/src/types/serializations/variant.rs index 175b8fcb492b8..6d4fbd5982ea3 100644 --- a/common/datavalues/src/types/serializations/variant.rs +++ b/common/datavalues/src/types/serializations/variant.rs @@ -24,53 +24,62 @@ use serde_json::Value; use crate::prelude::*; #[derive(Debug, Clone)] -pub struct VariantSerializer {} +pub struct VariantSerializer<'a> { + values: &'a [VariantValue], +} -impl TypeSerializer for VariantSerializer { - fn serialize_value(&self, value: &DataValue, _format: &FormatSettings) -> Result { - if let DataValue::Variant(v) = value { - Ok(v.to_string()) - } else { - Err(ErrorCode::BadBytes("Incorrect Variant value")) - } +impl<'a> VariantSerializer<'a> { + pub fn try_create(col: &'a ColumnRef) -> Result { + let column: &VariantColumn = Series::check_get(col)?; + let values = column.values(); + Ok(Self { values }) } +} - fn serialize_column( - &self, - column: &ColumnRef, - _format: &FormatSettings, - ) -> Result> { - let column: &VariantColumn = Series::check_get(column)?; - let result: Vec = column.iter().map(|v| v.to_string()).collect(); - Ok(result) +impl<'a> TypeSerializer<'a> for VariantSerializer<'a> { + fn write_field(&self, row_index: usize, buf: &mut Vec, _format: &FormatSettings) { + buf.extend_from_slice(self.values[row_index].to_string().as_bytes()); + } + + fn serialize_field(&self, row_index: usize, _format: &FormatSettings) -> Result { + Ok(self.values[row_index].to_string()) } - fn serialize_json(&self, column: &ColumnRef, _format: &FormatSettings) -> Result> { - let column: &VariantColumn = Series::check_get(column)?; - let result: Vec = column.iter().map(|v| v.as_ref().to_owned()).collect(); + fn serialize_json(&self, _format: &FormatSettings) -> Result> { + let result: Vec = self.values.iter().map(|v| v.as_ref().to_owned()).collect(); Ok(result) } - fn serialize_clickhouse_format( + fn serialize_clickhouse_const( &self, - column: &ColumnRef, _format: &FormatSettings, + size: usize, ) -> Result { - let column: &VariantColumn = Series::check_get(column)?; - let values: Vec = column.iter().map(|v| v.to_string()).collect(); + let strings: Vec = self.values.iter().map(|v| v.to_string()).collect(); + let mut values: Vec = Vec::with_capacity(self.values.len() * size); + for _ in 0..size { + for v in strings.iter() { + values.push(v.clone()) + } + } + Ok(Vec::column_from::(values)) + } + fn serialize_clickhouse_column( + &self, + _format: &FormatSettings, + ) -> Result { + let values: Vec = self.values.iter().map(|v| v.to_string()).collect(); Ok(Vec::column_from::(values)) } fn serialize_json_object( &self, - column: &ColumnRef, valids: Option<&Bitmap>, _format: &FormatSettings, ) -> Result> { - let column: &VariantColumn = Series::check_get(column)?; let mut result: Vec = Vec::new(); - for (i, v) in column.iter().enumerate() { + for (i, v) in self.values.iter().enumerate() { if let Some(valids) = valids { if !valids.get_bit(i) { result.push(Value::Null); @@ -95,11 +104,10 @@ impl TypeSerializer for VariantSerializer { fn serialize_json_object_suppress_error( &self, - column: &ColumnRef, _format: &FormatSettings, ) -> Result>> { - let column: &VariantColumn = Series::check_get(column)?; - let result: Vec> = column + let result: Vec> = self + .values .iter() .map(|v| match v.as_ref() { Value::String(v) => match serde_json::from_str::(v.as_str()) { diff --git a/common/datavalues/src/types/type_array.rs b/common/datavalues/src/types/type_array.rs index a947994922d36..dd5fbeb6a84be 100644 --- a/common/datavalues/src/types/type_array.rs +++ b/common/datavalues/src/types/type_array.rs @@ -23,6 +23,8 @@ use super::data_type::DataType; use super::data_type::DataTypeImpl; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::ArraySerializer; +use crate::serializations::TypeSerializerImpl; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub struct ArrayType { @@ -111,12 +113,8 @@ impl DataType for ArrayType { ArrowType::LargeList(Box::new(field)) } - fn create_serializer(&self) -> TypeSerializerImpl { - ArraySerializer { - inner: Box::new(self.inner.create_serializer()), - typ: *self.inner.clone(), - } - .into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(ArraySerializer::try_create(col, &self.inner)?.into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_boolean.rs b/common/datavalues/src/types/type_boolean.rs index 96277e27a1544..b5865b2efebe1 100644 --- a/common/datavalues/src/types/type_boolean.rs +++ b/common/datavalues/src/types/type_boolean.rs @@ -20,6 +20,8 @@ use common_exception::Result; use super::data_type::DataType; use super::type_id::TypeID; pub use crate::prelude::*; +use crate::serializations::BooleanSerializer; +use crate::serializations::TypeSerializerImpl; #[derive(Default, Clone, serde::Deserialize, serde::Serialize)] pub struct BooleanType {} @@ -71,8 +73,8 @@ impl DataType for BooleanType { ArrowType::Boolean } - fn create_serializer(&self) -> TypeSerializerImpl { - BooleanSerializer {}.into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(BooleanSerializer::try_create(col)?.into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_date.rs b/common/datavalues/src/types/type_date.rs index 8ad11bdb32800..187d5774e5396 100644 --- a/common/datavalues/src/types/type_date.rs +++ b/common/datavalues/src/types/type_date.rs @@ -22,6 +22,8 @@ use common_exception::Result; use super::data_type::DataType; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::DateSerializer; +use crate::serializations::TypeSerializerImpl; /// date ranges from 1000-01-01 to 9999-12-31 /// date_max and date_min means days offset from 1970-01-01 @@ -92,8 +94,8 @@ impl DataType for DateType { Some(mp) } - fn create_serializer(&self) -> TypeSerializerImpl { - DateSerializer::::default().into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(DateSerializer::<'a, i32>::try_create(col)?.into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_interval.rs b/common/datavalues/src/types/type_interval.rs index 942d334ab8a9c..bd7c790f50720 100644 --- a/common/datavalues/src/types/type_interval.rs +++ b/common/datavalues/src/types/type_interval.rs @@ -22,6 +22,8 @@ use common_exception::Result; use super::data_type::DataType; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::DateSerializer; +use crate::serializations::TypeSerializerImpl; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub struct IntervalType { @@ -129,8 +131,8 @@ impl DataType for IntervalType { Some(mp) } - fn create_serializer(&self) -> TypeSerializerImpl { - DateSerializer::::default().into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(DateSerializer::<'a, i64>::try_create(col)?.into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_null.rs b/common/datavalues/src/types/type_null.rs index 6b6cc97ea33f3..2834179aebe87 100644 --- a/common/datavalues/src/types/type_null.rs +++ b/common/datavalues/src/types/type_null.rs @@ -15,9 +15,12 @@ use std::sync::Arc; use common_arrow::arrow::datatypes::DataType as ArrowType; +use common_exception::Result; use super::data_type::DataType; use crate::prelude::*; +use crate::serializations::NullSerializer; +use crate::serializations::TypeSerializerImpl; #[derive(Default, Clone, serde::Deserialize, serde::Serialize)] pub struct NullType {} @@ -64,8 +67,8 @@ impl DataType for NullType { ArrowType::Null } - fn create_serializer(&self) -> TypeSerializerImpl { - NullSerializer::default().into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(NullSerializer { size: col.len() }.into()) } fn create_deserializer(&self, _capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_nullable.rs b/common/datavalues/src/types/type_nullable.rs index c47661d826ac0..7e85418dfd762 100644 --- a/common/datavalues/src/types/type_nullable.rs +++ b/common/datavalues/src/types/type_nullable.rs @@ -18,11 +18,14 @@ use std::sync::Arc; use common_arrow::arrow::bitmap::MutableBitmap; use common_arrow::arrow::datatypes::DataType as ArrowType; use common_exception::ErrorCode; +use common_exception::Result; use super::data_type::DataType; use super::data_type::DataTypeImpl; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::NullableSerializer; +use crate::serializations::TypeSerializerImpl; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub struct NullableType { @@ -80,11 +83,13 @@ impl DataType for NullableType { self.inner.custom_arrow_meta() } - fn create_serializer(&self) -> TypeSerializerImpl { - NullableSerializer { - inner: Box::new(self.inner.create_serializer()), + fn create_serializer_inner<'a>(&self, column: &'a ColumnRef) -> Result> { + let column: &NullableColumn = Series::check_get(column)?; + Ok(NullableSerializer { + validity: column.ensure_validity(), + inner: Box::new(self.inner.create_serializer(column.inner())?), } - .into() + .into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_primitive.rs b/common/datavalues/src/types/type_primitive.rs index 3708d63b3b71b..b8ef17c3acd41 100644 --- a/common/datavalues/src/types/type_primitive.rs +++ b/common/datavalues/src/types/type_primitive.rs @@ -21,6 +21,8 @@ use common_exception::Result; use super::data_type::DataType; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::NumberSerializer; +use crate::serializations::TypeSerializerImpl; #[derive(Default, Clone, Copy, serde::Deserialize, serde::Serialize)] @@ -108,8 +110,11 @@ macro_rules! impl_numeric { ArrowType::$tname } - fn create_serializer(&self) -> TypeSerializerImpl { - NumberSerializer::<$ty>::default().into() + fn create_serializer_inner<'a>( + &self, + col: &'a ColumnRef, + ) -> Result> { + Ok(NumberSerializer::<'a, $ty>::try_create(col)?.into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_string.rs b/common/datavalues/src/types/type_string.rs index c3703a7853789..63573d4990ceb 100644 --- a/common/datavalues/src/types/type_string.rs +++ b/common/datavalues/src/types/type_string.rs @@ -15,10 +15,13 @@ use std::sync::Arc; use common_arrow::arrow::datatypes::DataType as ArrowType; +use common_exception::Result; use super::data_type::DataType; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::StringSerializer; +use crate::serializations::TypeSerializerImpl; #[derive(Default, Clone, serde::Deserialize, serde::Serialize)] pub struct StringType {} @@ -71,10 +74,9 @@ impl DataType for StringType { ArrowType::LargeBinary } - fn create_serializer(&self) -> TypeSerializerImpl { - StringSerializer {}.into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(StringSerializer::try_create(col)?.into()) } - fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { StringDeserializer::with_capacity(capacity).into() } diff --git a/common/datavalues/src/types/type_struct.rs b/common/datavalues/src/types/type_struct.rs index ff8012f9d4bd5..7c33600acccf1 100644 --- a/common/datavalues/src/types/type_struct.rs +++ b/common/datavalues/src/types/type_struct.rs @@ -22,6 +22,8 @@ use super::data_type::DataType; use super::data_type::DataTypeImpl; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::StructSerializer; +use crate::serializations::TypeSerializerImpl; #[derive(Default, Clone, serde::Deserialize, serde::Serialize)] pub struct StructType { @@ -96,18 +98,19 @@ impl DataType for StructType { ArrowType::Struct(fields) } - fn create_serializer(&self) -> TypeSerializerImpl { - let inners = self - .types - .iter() - .map(|v| Box::new(v.create_serializer())) - .collect(); - StructSerializer { + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + let column: &StructColumn = Series::check_get(col)?; + let cols = column.values(); + let mut inners = vec![]; + for (t, c) in self.types.iter().zip(cols) { + inners.push(t.create_serializer(c)?) + } + Ok(StructSerializer { names: self.names.clone(), inners, - types: self.types.clone(), + column: col, } - .into() + .into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_timestamp.rs b/common/datavalues/src/types/type_timestamp.rs index 9065430e438f3..361b43b014202 100644 --- a/common/datavalues/src/types/type_timestamp.rs +++ b/common/datavalues/src/types/type_timestamp.rs @@ -27,6 +27,8 @@ use super::data_type::ARROW_EXTENSION_META; use super::data_type::ARROW_EXTENSION_NAME; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::TimestampSerializer; +use crate::serializations::TypeSerializerImpl; /// timestamp ranges from 1000-01-01 00:00:00.000000 to 9999-12-31 23:59:59.999999 /// timestamp_max and timestamp_min means days offset from 1970-01-01 00:00:00.000000 @@ -142,10 +144,9 @@ impl DataType for TimestampType { Some(mp) } - fn create_serializer(&self) -> TypeSerializerImpl { - TimestampSerializer::default().into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(TimestampSerializer::<'a>::try_create(col)?.into()) } - fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { TimestampDeserializer { builder: MutablePrimitiveColumn::::with_capacity(capacity), diff --git a/common/datavalues/src/types/type_variant.rs b/common/datavalues/src/types/type_variant.rs index a20d6030bdaab..3161740826e39 100644 --- a/common/datavalues/src/types/type_variant.rs +++ b/common/datavalues/src/types/type_variant.rs @@ -22,6 +22,8 @@ use super::data_type::DataType; use super::data_type::ARROW_EXTENSION_NAME; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::TypeSerializerImpl; +use crate::serializations::VariantSerializer; #[derive(Default, Clone, serde::Deserialize, serde::Serialize)] pub struct VariantType {} @@ -79,8 +81,8 @@ impl DataType for VariantType { Some(mp) } - fn create_serializer(&self) -> TypeSerializerImpl { - VariantSerializer {}.into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(VariantSerializer::try_create(col)?.into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_variant_array.rs b/common/datavalues/src/types/type_variant_array.rs index 57bee97d50a20..004e76fefa20d 100644 --- a/common/datavalues/src/types/type_variant_array.rs +++ b/common/datavalues/src/types/type_variant_array.rs @@ -22,6 +22,8 @@ use super::data_type::DataType; use super::data_type::ARROW_EXTENSION_NAME; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::TypeSerializerImpl; +use crate::serializations::VariantSerializer; #[derive(Default, Clone, serde::Deserialize, serde::Serialize)] pub struct VariantArrayType {} @@ -79,8 +81,8 @@ impl DataType for VariantArrayType { Some(mp) } - fn create_serializer(&self) -> TypeSerializerImpl { - VariantSerializer {}.into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(VariantSerializer::try_create(col)?.into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/src/types/type_variant_object.rs b/common/datavalues/src/types/type_variant_object.rs index 1974685270805..09d0c94886615 100644 --- a/common/datavalues/src/types/type_variant_object.rs +++ b/common/datavalues/src/types/type_variant_object.rs @@ -22,6 +22,8 @@ use super::data_type::DataType; use super::data_type::ARROW_EXTENSION_NAME; use super::type_id::TypeID; use crate::prelude::*; +use crate::serializations::TypeSerializerImpl; +use crate::serializations::VariantSerializer; #[derive(Default, Clone, serde::Deserialize, serde::Serialize)] pub struct VariantObjectType {} @@ -88,8 +90,8 @@ impl DataType for VariantObjectType { Some(mp) } - fn create_serializer(&self) -> TypeSerializerImpl { - VariantSerializer {}.into() + fn create_serializer_inner<'a>(&self, col: &'a ColumnRef) -> Result> { + Ok(VariantSerializer::try_create(col)?.into()) } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { diff --git a/common/datavalues/tests/it/types/serializations.rs b/common/datavalues/tests/it/types/serializations.rs index 0610bfe9b643e..ade8cf9f18db2 100644 --- a/common/datavalues/tests/it/types/serializations.rs +++ b/common/datavalues/tests/it/types/serializations.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use common_datavalues::prelude::*; +use common_datavalues::serializations::NullSerializer; use common_exception::Result; use common_io::prelude::FormatSettings; use pretty_assertions::assert_eq; @@ -25,7 +26,6 @@ fn test_serializers() -> Result<()> { struct Test { name: &'static str, data_type: DataTypeImpl, - value: DataValue, column: ColumnRef, val_str: &'static str, col_str: Vec, @@ -35,7 +35,6 @@ fn test_serializers() -> Result<()> { Test { name: "boolean", data_type: BooleanType::new_impl(), - value: DataValue::Boolean(true), column: Series::from_data(vec![true, false, true]), val_str: "1", col_str: vec!["1".to_owned(), "0".to_owned(), "1".to_owned()], @@ -43,7 +42,6 @@ fn test_serializers() -> Result<()> { Test { name: "int8", data_type: Int8Type::new_impl(), - value: DataValue::Int64(1), column: Series::from_data(vec![1i8, 2i8, 1]), val_str: "1", col_str: vec!["1".to_owned(), "2".to_owned(), "1".to_owned()], @@ -51,7 +49,6 @@ fn test_serializers() -> Result<()> { Test { name: "datetime32", data_type: TimestampType::new_impl(0), - value: DataValue::UInt64(1630320462000000), column: Series::from_data(vec![1630320462000000i64, 1637117572000000i64, 1000000]), val_str: "2021-08-30 10:47:42", col_str: vec![ @@ -63,7 +60,6 @@ fn test_serializers() -> Result<()> { Test { name: "date32", data_type: DateType::new_impl(), - value: DataValue::Int64(18869), column: Series::from_data(vec![18869i32, 18948i32, 1]), val_str: "2021-08-30", col_str: vec![ @@ -75,7 +71,6 @@ fn test_serializers() -> Result<()> { Test { name: "string", data_type: StringType::new_impl(), - value: DataValue::String("hello".as_bytes().to_vec()), column: Series::from_data(vec!["hello", "world", "NULL"]), val_str: "hello", col_str: vec!["hello".to_owned(), "world".to_owned(), "NULL".to_owned()], @@ -83,16 +78,12 @@ fn test_serializers() -> Result<()> { Test { name: "array", data_type: DataTypeImpl::Array(ArrayType::create(StringType::new_impl())), - value: DataValue::Array(vec![ - DataValue::String("data".as_bytes().to_vec()), - DataValue::String("bend".as_bytes().to_vec()), - ]), column: Arc::new(ArrayColumn::from_data( DataTypeImpl::Array(ArrayType::create(StringType::new_impl())), vec![0, 1, 3, 6].into(), Series::from_data(vec!["test", "data", "bend", "hello", "world", "NULL"]), )), - val_str: "['data', 'bend']", + val_str: "['test']", col_str: vec![ "['test']".to_owned(), "['data', 'bend']".to_owned(), @@ -105,7 +96,6 @@ fn test_serializers() -> Result<()> { vec!["date".to_owned(), "integer".to_owned()], vec![DateType::new_impl(), Int8Type::new_impl()], )), - value: DataValue::Struct(vec![DataValue::Int64(18869), DataValue::Int64(1)]), column: Arc::new(StructColumn::from_data( vec![ Series::from_data(vec![18869i32, 18948i32, 1]), @@ -126,7 +116,6 @@ fn test_serializers() -> Result<()> { Test { name: "variant", data_type: VariantType::new_impl(), - value: DataValue::Variant(VariantValue::from(json!(true))), column: Arc::new(VariantColumn::new_from_vec(vec![ VariantValue::from(json!(null)), VariantValue::from(json!(true)), @@ -134,7 +123,7 @@ fn test_serializers() -> Result<()> { VariantValue::from(json!(123)), VariantValue::from(json!(12.34)), ])), - val_str: "true", + val_str: "null", col_str: vec![ "null".to_owned(), "true".to_owned(), @@ -147,11 +136,14 @@ fn test_serializers() -> Result<()> { let format = FormatSettings::default(); for test in tests { - let serializer = test.data_type.create_serializer(); - let val_res = serializer.serialize_value(&test.value, &format)?; + let serializer = test.data_type.create_serializer(&test.column)?; + let val_res = serializer.serialize_field(0, &format)?; assert_eq!(&val_res, test.val_str, "case: {:#?}", test.name); - let col_res = serializer.serialize_column(&test.column, &format)?; + let mut col_res = vec![]; + for i in 0..test.column.len() { + col_res.push(serializer.serialize_field(i, &format)?); + } assert_eq!(col_res, test.col_str, "case: {:#?}", test.name); } @@ -170,14 +162,17 @@ fn test_serializers() -> Result<()> { DateType::new_impl(), ], ); - let serializer = data_type.create_serializer(); - let value = DataValue::Struct(vec![ - DataValue::Float64(1.2), - DataValue::String("hello".as_bytes().to_vec()), - DataValue::Boolean(true), - DataValue::UInt64(18869), - ]); - let result = serializer.serialize_value(&value, &format)?; + let column: ColumnRef = Arc::new(StructColumn::from_data( + vec![ + Series::from_data(vec![1.2f64]), + Series::from_data(vec!["hello"]), + Series::from_data(vec![true]), + Series::from_data(vec![18869i32]), + ], + DataTypeImpl::Struct(data_type), + )); + let serializer = column.data_type().create_serializer(&column)?; + let result = serializer.serialize_field(0, &format)?; let expect = "(1.2, 'hello', 1, '2021-08-30')"; assert_eq!(&result, expect); } @@ -196,8 +191,8 @@ fn test_convert_arrow() { #[test] fn test_enum_dispatch() -> Result<()> { - let c = StringSerializer {}; + let c = NullSerializer { size: 0 }; let d: TypeSerializerImpl = c.into(); - let _: StringSerializer = d.try_into()?; + let _: NullSerializer = d.try_into()?; Ok(()) } diff --git a/common/functions/src/scalars/expressions/cast_with_type.rs b/common/functions/src/scalars/expressions/cast_with_type.rs index dded41ded4c5a..74eaf762ef869 100644 --- a/common/functions/src/scalars/expressions/cast_with_type.rs +++ b/common/functions/src/scalars/expressions/cast_with_type.rs @@ -234,9 +234,9 @@ pub fn cast_to_variant( } let mut builder = ColumnBuilder::::with_capacity(size); if from_type.data_type_id().is_numeric() || from_type.data_type_id() == TypeID::Boolean { - let serializer = from_type.create_serializer(); + let serializer = from_type.create_serializer(&column)?; let format = FormatSettings::default(); - match serializer.serialize_json_object(&column, None, &format) { + match serializer.serialize_json_object(None, &format) { Ok(values) => { for v in values { builder.append(&VariantValue::from(v)); diff --git a/common/functions/src/scalars/semi_structureds/json_extract_path_text.rs b/common/functions/src/scalars/semi_structureds/json_extract_path_text.rs index 85c353edeca7a..72bb7572ad7e1 100644 --- a/common/functions/src/scalars/semi_structureds/json_extract_path_text.rs +++ b/common/functions/src/scalars/semi_structureds/json_extract_path_text.rs @@ -74,10 +74,10 @@ impl Function for JsonExtractPathTextFunction { let path_keys = parse_path_keys(columns[1].column())?; let data_type = columns[0].field().data_type(); - let serializer = data_type.create_serializer(); + let serializer = data_type.create_serializer(columns[0].column())?; // TODO(veeupup): check if we can use default format_settings let format = FormatSettings::default(); - let values = serializer.serialize_json_object(columns[0].column(), None, &format)?; + let values = serializer.serialize_json_object(None, &format)?; let mut builder = NullableColumnBuilder::::with_capacity(input_rows); if columns[0].column().is_const() { diff --git a/common/functions/src/scalars/semi_structureds/parse_json.rs b/common/functions/src/scalars/semi_structureds/parse_json.rs index b53edb3a97ad1..f37692125a0c1 100644 --- a/common/functions/src/scalars/semi_structureds/parse_json.rs +++ b/common/functions/src/scalars/semi_structureds/parse_json.rs @@ -102,8 +102,8 @@ impl Function for ParseJsonFunctionImpl { for v in values { match v { @@ -134,8 +134,8 @@ impl Function for ParseJsonFunctionImpl { for (i, v) in values.iter().enumerate() { if let Some(valids) = valids { @@ -164,10 +164,10 @@ impl Function for ParseJsonFunctionImpl { for v in values { builder.append(&VariantValue::from(v)); diff --git a/common/io/src/format_settings.rs b/common/io/src/format_settings.rs index 12569661afd4b..519af7416408f 100644 --- a/common/io/src/format_settings.rs +++ b/common/io/src/format_settings.rs @@ -28,6 +28,8 @@ pub struct FormatSettings { pub skip_header: bool, pub compression: Compression, pub timezone: Tz, + pub true_bytes: Vec, + pub false_bytes: Vec, } impl Default for FormatSettings { @@ -39,6 +41,8 @@ impl Default for FormatSettings { skip_header: false, compression: Compression::None, timezone: "UTC".parse::().unwrap(), + true_bytes: vec![b'1'], + false_bytes: vec![b'0'], } } } diff --git a/query/src/formats/output_format.rs b/query/src/formats/output_format.rs index 724ecbe763ecb..eae195583ac92 100644 --- a/query/src/formats/output_format.rs +++ b/query/src/formats/output_format.rs @@ -25,13 +25,14 @@ use super::output_format_parquet::ParquetOutputFormat; use super::output_format_values::ValuesOutputFormat; use crate::formats::output_format_csv::CSVOutputFormat; use crate::formats::output_format_csv::TSVOutputFormat; - pub trait OutputFormat: Send { fn serialize_block( &mut self, - data_block: &DataBlock, - format_setting: &FormatSettings, - ) -> Result>; + _data_block: &DataBlock, + _format_setting: &FormatSettings, + ) -> Result> { + unimplemented!() + } fn finalize(&mut self) -> Result>; } diff --git a/query/src/formats/output_format_csv.rs b/query/src/formats/output_format_csv.rs index 9f06e06ca4dd2..869ec59ca3caf 100644 --- a/query/src/formats/output_format_csv.rs +++ b/query/src/formats/output_format_csv.rs @@ -14,9 +14,7 @@ use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; -use common_datavalues::DataType; use common_datavalues::TypeSerializer; -use common_exception::ErrorCode; use common_exception::Result; use common_io::prelude::FormatSettings; @@ -40,31 +38,9 @@ impl TCSVOutputFormat { impl OutputFormat for TCSVOutputFormat { fn serialize_block(&mut self, block: &DataBlock, format: &FormatSettings) -> Result> { let rows_size = block.column(0).len(); - let columns_size = block.num_columns(); let mut buf = Vec::with_capacity(block.memory_size()); - let mut col_table = Vec::new(); - for col_index in 0..columns_size { - let column = block.column(col_index); - let column = column.convert_full_column(); - let field = block.schema().field(col_index); - let data_type = field.data_type(); - let serializer = data_type.create_serializer(); - - let res = if TSV { - serializer.serialize_column(&column, format) - } else { - serializer.serialize_column_quoted(&column, format) - }; - let res = res.map_err(|e| { - ErrorCode::UnexpectedError(format!( - "fail to serialize field {}, error = {}", - field.name(), - e - )) - })?; - col_table.push(res) - } + let serializers = block.get_serializers()?; let fd = if TSV { FIELD_DELIMITER @@ -79,17 +55,20 @@ impl OutputFormat for TCSVOutputFormat { }; for row_index in 0..rows_size { - for (i, col) in col_table.iter().enumerate() { - if i != 0 { + for (col_index, serializer) in serializers.iter().enumerate() { + if col_index != 0 { buf.push(fd); } - buf.extend_from_slice(col[row_index].as_bytes()); + if TSV { + serializer.write_field(row_index, &mut buf, format); + } else { + serializer.write_field_quoted(row_index, &mut buf, format, b'\"') + }; } - buf.push(rd); + buf.push(rd) } Ok(buf) } - fn finalize(&mut self) -> Result> { Ok(vec![]) } diff --git a/query/src/formats/output_format_ndjson.rs b/query/src/formats/output_format_ndjson.rs index 1ddbe9a6ead1e..b3a39d170997a 100644 --- a/query/src/formats/output_format_ndjson.rs +++ b/query/src/formats/output_format_ndjson.rs @@ -14,60 +14,48 @@ use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; -use common_datavalues::DataType; use common_datavalues::TypeSerializer; -use common_datavalues::TypeSerializerImpl; use common_exception::Result; use common_io::prelude::FormatSettings; use crate::formats::output_format::OutputFormat; #[derive(Default)] -pub struct NDJsonOutputFormat { - serializers: Vec, -} +pub struct NDJsonOutputFormat {} impl NDJsonOutputFormat { - pub fn create(schema: DataSchemaRef) -> Self { - let serializers: Vec<_> = schema - .fields() - .iter() - .map(|field| field.data_type().create_serializer()) - .collect(); - - Self { serializers } + pub fn create(_schema: DataSchemaRef) -> Self { + Self {} } } impl OutputFormat for NDJsonOutputFormat { fn serialize_block(&mut self, block: &DataBlock, format: &FormatSettings) -> Result> { let rows_size = block.column(0).len(); - let columns_size = block.num_columns(); - - assert_eq!(self.serializers.len(), columns_size); let mut buf = Vec::with_capacity(block.memory_size()); - let mut col_table = Vec::new(); - for col_index in 0..columns_size { - let column = block.column(col_index).convert_full_column(); - let res = self.serializers[col_index].serialize_column_quoted(&column, format)?; - col_table.push(res) - } + let serializers = block.get_serializers()?; + let field_names: Vec<_> = block + .schema() + .fields() + .iter() + .map(|f| f.name().as_bytes()) + .collect(); for row_index in 0..rows_size { - for (i, (col, field)) in col_table.iter().zip(block.schema().fields()).enumerate() { - if i != 0 { + for (col_index, serializer) in serializers.iter().enumerate() { + if col_index != 0 { buf.push(b','); } else { buf.push(b'{') } buf.push(b'"'); - buf.extend_from_slice(field.name().as_bytes()); + buf.extend_from_slice(field_names[col_index]); buf.push(b'"'); buf.push(b':'); - buf.extend_from_slice(col[row_index].as_bytes()); + serializer.write_field_quoted(row_index, &mut buf, format, b'\"'); } buf.extend_from_slice("}\n".as_bytes()); } diff --git a/query/src/formats/output_format_values.rs b/query/src/formats/output_format_values.rs index 478107c0a7d18..a1ccdbec27294 100644 --- a/query/src/formats/output_format_values.rs +++ b/query/src/formats/output_format_values.rs @@ -14,56 +14,38 @@ use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; -use common_datavalues::DataType; use common_datavalues::TypeSerializer; -use common_datavalues::TypeSerializerImpl; use common_exception::Result; use common_io::prelude::FormatSettings; use crate::formats::output_format::OutputFormat; #[derive(Default)] -pub struct ValuesOutputFormat { - serializers: Vec, -} +pub struct ValuesOutputFormat {} impl ValuesOutputFormat { - pub fn create(schema: DataSchemaRef) -> Self { - let serializers: Vec<_> = schema - .fields() - .iter() - .map(|field| field.data_type().create_serializer()) - .collect(); - - Self { serializers } + pub fn create(_schema: DataSchemaRef) -> Self { + Self {} } } impl OutputFormat for ValuesOutputFormat { fn serialize_block(&mut self, block: &DataBlock, format: &FormatSettings) -> Result> { let rows_size = block.column(0).len(); - let columns_size = block.num_columns(); - - assert_eq!(self.serializers.len(), columns_size); let mut buf = Vec::with_capacity(block.memory_size()); - let mut col_table = Vec::new(); - for col_index in 0..columns_size { - let column = block.column(col_index).convert_full_column(); - let res = self.serializers[col_index].serialize_column_quoted(&column, format)?; - col_table.push(res) - } + let serializers = block.get_serializers()?; for row_index in 0..rows_size { if row_index != 0 { buf.push(b','); } buf.push(b'('); - for (i, col) in col_table.iter().enumerate() { + for (i, serializer) in serializers.iter().enumerate() { if i != 0 { buf.push(b','); } - buf.extend_from_slice(col[row_index].as_bytes()); + serializer.write_field(row_index, &mut buf, format); } buf.push(b')'); } diff --git a/query/src/servers/clickhouse/writers/query_writer.rs b/query/src/servers/clickhouse/writers/query_writer.rs index acb00eb3574c1..dc8ccb87ed71c 100644 --- a/query/src/servers/clickhouse/writers/query_writer.rs +++ b/query/src/servers/clickhouse/writers/query_writer.rs @@ -149,12 +149,13 @@ pub fn to_clickhouse_block(block: DataBlock, format: &FormatSettings) -> Result< for column_index in 0..block.num_columns() { let column = block.column(column_index); + let column = &column.convert_full_column(); let field = block.schema().field(column_index); let name = field.name(); - let serializer = field.data_type().create_serializer(); + let serializer = field.data_type().create_serializer(column)?; result.append_column(column::new_column( name, - serializer.serialize_clickhouse_format(&column.convert_full_column(), format)?, + serializer.serialize_clickhouse_column(format)?, )); } Ok(result) diff --git a/query/src/servers/http/v1/http_query_handlers.rs b/query/src/servers/http/v1/http_query_handlers.rs index 3679de00f39eb..b0470887f94f3 100644 --- a/query/src/servers/http/v1/http_query_handlers.rs +++ b/query/src/servers/http/v1/http_query_handlers.rs @@ -226,7 +226,11 @@ pub(crate) async fn query_handler( let query = http_query_manager.try_create_query(ctx, req).await; // TODO(veeupup): get global query_ctx's format_settings, because we cann't set session settings now - let format = FormatSettings::default(); + let format = FormatSettings { + false_bytes: vec![b'f', b'a', b'l', b's', b'e'], + true_bytes: vec![b't', b'r', b'u', b'e'], + ..Default::default() + }; match query { Ok(query) => { let resp = query diff --git a/query/src/servers/http/v1/json_block.rs b/query/src/servers/http/v1/json_block.rs index 60c0da8b06182..9227c78028c07 100644 --- a/query/src/servers/http/v1/json_block.rs +++ b/query/src/servers/http/v1/json_block.rs @@ -46,8 +46,8 @@ pub fn block_to_json_value_columns( let column = column.convert_full_column(); let field = block.schema().field(col_index); let data_type = field.data_type(); - let serializer = data_type.create_serializer(); - col_table.push(serializer.serialize_json(&column, format).map_err(|e| { + let serializer = data_type.create_serializer(&column)?; + col_table.push(serializer.serialize_json(format).map_err(|e| { ErrorCode::UnexpectedError(format!( "fail to serialize filed {}, error = {}", field.name(), @@ -61,11 +61,44 @@ pub fn block_to_json_value_columns( pub fn block_to_json_value( block: &DataBlock, format: &FormatSettings, + string_fields: bool, +) -> Result>> { + if string_fields { + block_to_json_value_string_fields(block, format) + } else { + block_to_json_value_ast(block, format) + } +} + +fn block_to_json_value_ast( + block: &DataBlock, + format: &FormatSettings, ) -> Result>> { let cols = block_to_json_value_columns(block, format)?; Ok(transpose(cols)) } +fn block_to_json_value_string_fields( + block: &DataBlock, + format: &FormatSettings, +) -> Result>> { + if block.is_empty() { + return Ok(vec![]); + } + let rows_size = block.column(0).len(); + let mut res = Vec::new(); + let serializers = block.get_serializers()?; + for row_index in 0..rows_size { + let mut row: Vec = Vec::with_capacity(block.num_columns()); + for serializer in serializers.iter() { + let s = serializer.serialize_field(row_index, format)?; + row.push(serde_json::to_value(s)?) + } + res.push(row) + } + Ok(res) +} + impl JsonBlock { pub fn empty() -> Self { Self { @@ -74,9 +107,9 @@ impl JsonBlock { } } - pub fn new(block: &DataBlock, format: &FormatSettings) -> Result { + pub fn new(block: &DataBlock, format: &FormatSettings, string_fields: bool) -> Result { Ok(JsonBlock { - data: block_to_json_value(block, format)?, + data: block_to_json_value(block, format, string_fields)?, schema: block.schema().clone(), }) } diff --git a/query/src/servers/http/v1/query/http_query.rs b/query/src/servers/http/v1/query/http_query.rs index 221f4d1f0a527..dc6df1704b7a8 100644 --- a/query/src/servers/http/v1/query/http_query.rs +++ b/query/src/servers/http/v1/query/http_query.rs @@ -47,6 +47,8 @@ pub struct HttpQueryRequest { pub sql: String, #[serde(default)] pub pagination: PaginationConf, + #[serde(default)] + pub string_fields: bool, } const DEFAULT_MAX_ROWS_IN_BUFFER: usize = 5 * 1000 * 1000; @@ -209,6 +211,7 @@ impl HttpQuery { let data = Arc::new(TokioMutex::new(PageManager::new( request.pagination.max_rows_per_page, block_buffer, + request.string_fields, ))); let query = HttpQuery { id, diff --git a/query/src/servers/http/v1/query/page_manager.rs b/query/src/servers/http/v1/query/page_manager.rs index 73aec62e2e399..ec70156fcd3c7 100644 --- a/query/src/servers/http/v1/query/page_manager.rs +++ b/query/src/servers/http/v1/query/page_manager.rs @@ -55,10 +55,15 @@ pub struct PageManager { last_page: Option, page_buffer: VecDeque>>, block_buffer: Arc, + string_fields: bool, } impl PageManager { - pub fn new(max_rows_per_page: usize, block_buffer: Arc) -> PageManager { + pub fn new( + max_rows_per_page: usize, + block_buffer: Arc, + string_fields: bool, + ) -> PageManager { PageManager { total_rows: 0, last_page: None, @@ -69,6 +74,7 @@ impl PageManager { schema: Arc::new(DataSchema::empty()), block_buffer, max_rows_per_page, + string_fields, } } @@ -132,7 +138,9 @@ impl PageManager { if self.schema.fields().is_empty() { self.schema = block.schema().clone(); } - let mut iter = block_to_json_value(&block, format)?.into_iter().peekable(); + let mut iter = block_to_json_value(&block, format, self.string_fields)? + .into_iter() + .peekable(); if res.is_empty() { let mut chunk = iter.by_ref().take(self.max_rows_per_page).collect(); res.append(&mut chunk); diff --git a/query/src/servers/mysql/writers/query_result_writer.rs b/query/src/servers/mysql/writers/query_result_writer.rs index ae1e8c6628077..b73cf0eafd0f3 100644 --- a/query/src/servers/mysql/writers/query_result_writer.rs +++ b/query/src/servers/mysql/writers/query_result_writer.rs @@ -118,13 +118,13 @@ impl<'a, W: std::io::Write> DFQueryResultWriter<'a, W> { match convert_schema(block.schema()) { Err(error) => Self::err(&error, dataset_writer), Ok(columns) => { - let columns_size = block.num_columns(); let mut row_writer = dataset_writer.start(&columns)?; for block in &blocks { + let serializers = block.get_serializers()?; let rows_size = block.column(0).len(); for row_index in 0..rows_size { - for col_index in 0..columns_size { + for (col_index, serializer) in serializers.iter().enumerate() { let val = block.column(col_index).get_checked(row_index)?; if val.is_null() { row_writer.write_col(None::)?; @@ -155,36 +155,25 @@ impl<'a, W: std::io::Write> DFQueryResultWriter<'a, W> { (TypeID::String, DataValue::String(v)) => { row_writer.write_col(v)? } - (TypeID::Array, DataValue::Array(_)) => { - let serializer = data_type.create_serializer(); - row_writer - .write_col(serializer.serialize_value(&val, format)?)? - } - (TypeID::Struct, DataValue::Struct(_)) => { - let serializer = data_type.create_serializer(); - row_writer - .write_col(serializer.serialize_value(&val, format)?)? - } - (TypeID::Variant, DataValue::Variant(_)) => { - let serializer = data_type.create_serializer(); - row_writer - .write_col(serializer.serialize_value(&val, format)?)? - } - (TypeID::VariantArray, DataValue::Variant(_)) => { - let serializer = data_type.create_serializer(); - row_writer - .write_col(serializer.serialize_value(&val, format)?)? - } - (TypeID::VariantObject, DataValue::Variant(_)) => { - let serializer = data_type.create_serializer(); - row_writer - .write_col(serializer.serialize_value(&val, format)?)? - } + (TypeID::Array, DataValue::Array(_)) => row_writer + .write_col(serializer.serialize_field(row_index, format)?)?, + (TypeID::Struct, DataValue::Struct(_)) => row_writer + .write_col(serializer.serialize_field(row_index, format)?)?, + (TypeID::Variant, DataValue::Variant(_)) => row_writer + .write_col(serializer.serialize_field(row_index, format)?)?, + (TypeID::VariantArray, DataValue::Variant(_)) => row_writer + .write_col(serializer.serialize_field(row_index, format)?)?, + (TypeID::VariantObject, DataValue::Variant(_)) => row_writer + .write_col(serializer.serialize_field(row_index, format)?)?, (_, DataValue::Int64(v)) => row_writer.write_col(v)?, (_, DataValue::UInt64(v)) => row_writer.write_col(v)?, - (_, DataValue::Float64(v)) => row_writer.write_col(v)?, + (_, DataValue::Float64(_)) => row_writer + // mysql writer use a text protocol, + // it use format!() to serialize number, + // the result will be different with our serializer for floats + .write_col(serializer.serialize_field(row_index, format)?)?, (_, v) => { return Err(ErrorCode::BadDataValueType(format!( "Unsupported column type:{:?}, expected type in schema: {:?}", diff --git a/query/src/sessions/query_ctx_shared.rs b/query/src/sessions/query_ctx_shared.rs index e39ca1d937bc9..7376a29d67fbc 100644 --- a/query/src/sessions/query_ctx_shared.rs +++ b/query/src/sessions/query_ctx_shared.rs @@ -35,6 +35,7 @@ use crate::catalogs::CatalogManager; use crate::clusters::Cluster; use crate::servers::http::v1::HttpQueryHandle; use crate::sessions::Session; +use crate::sessions::SessionType; use crate::sessions::Settings; use crate::sql::SQLCommon; use crate::storages::Table; @@ -279,6 +280,10 @@ impl QueryContextShared { pub fn get_format_settings(&self) -> Result { let settings = self.get_settings(); let mut format = FormatSettings::default(); + if let SessionType::HTTPQuery = self.session.get_type() { + format.false_bytes = vec![b'f', b'a', b'l', b's', b'e']; + format.true_bytes = vec![b't', b'r', b'u', b'e']; + } { format.record_delimiter = settings.get_record_delimiter()?; format.field_delimiter = settings.get_field_delimiter()?; diff --git a/query/tests/it/formats/output_format_tcsv.rs b/query/tests/it/formats/output_format_tcsv.rs index 6cdc7c160c1cd..8ebd41afb8829 100644 --- a/query/tests/it/formats/output_format_tcsv.rs +++ b/query/tests/it/formats/output_format_tcsv.rs @@ -68,11 +68,11 @@ fn test_data_block(is_nullable: bool) -> Result<()> { let mut formater = fmt.create_format(schema.clone()); let buffer = formater.serialize_block(&block, &format_setting)?; - let json_block = String::from_utf8(buffer)?; + let csv_block = String::from_utf8(buffer)?; let expect = "1\ta\t1\t1.1\t1970-01-02\n\ 2\tb\t1\t2.2\t1970-01-03\n\ 3\tc\t0\t3.3\t1970-01-04\n"; - assert_eq!(&json_block, expect); + assert_eq!(&csv_block, expect); } { diff --git a/query/tests/it/servers/http/json_block.rs b/query/tests/it/servers/http/json_block.rs index e36d4a48f573d..705a2cfd89fb5 100644 --- a/query/tests/it/servers/http/json_block.rs +++ b/query/tests/it/servers/http/json_block.rs @@ -69,7 +69,7 @@ fn test_data_block(is_nullable: bool) -> Result<()> { block }; let format = FormatSettings::default(); - let json_block = JsonBlock::new(&block, &format)?; + let json_block = JsonBlock::new(&block, &format, false)?; let expect = vec![ vec![ val(1_i32), @@ -112,7 +112,7 @@ fn test_data_block_not_nullable() -> Result<()> { fn test_empty_block() -> Result<()> { let block = DataBlock::empty(); let format = FormatSettings::default(); - let json_block = JsonBlock::new(&block, &format)?; + let json_block = JsonBlock::new(&block, &format, false)?; assert!(json_block.is_empty()); Ok(()) } diff --git a/tests/logictest/http_connector.py b/tests/logictest/http_connector.py index 18e49e99d688b..7dcb35a5557d2 100644 --- a/tests/logictest/http_connector.py +++ b/tests/logictest/http_connector.py @@ -132,7 +132,7 @@ def parseSQL(sql): return sql # do nothing log.debug("http sql: " + parseSQL(statement)) - query_sql = {'sql': parseSQL(statement)} + query_sql = {'sql': parseSQL(statement), "string_fields": True} if session is not None: query_sql['session'] = session log.debug("http headers {}".format(self.make_headers())) diff --git a/tests/logictest/http_runner.py b/tests/logictest/http_runner.py index 614c320818c3d..4ef9dd0410aa2 100644 --- a/tests/logictest/http_runner.py +++ b/tests/logictest/http_runner.py @@ -35,7 +35,7 @@ def execute_error(self, statement): def execute_query(self, statement): results = self.get_connection().fetch_all(statement.text) - query_type = statement.s_type.query_type + # query_type = statement.s_type.query_type vals = [] for (ri, row) in enumerate(results): for (i, v) in enumerate(row): @@ -43,34 +43,35 @@ def execute_query(self, statement): vals.append("NULL") continue - if query_type[i] == 'I': - if not isinstance(v, int): - log.error( - "Expected int, got type {} in query {} row {} col {} value {}" - .format(type(v), statement.text, ri, i, v)) - elif query_type[i] == 'F' or query_type[i] == 'R': - if not isinstance(v, float): - log.error( - "Expected float, got type {} in query {} row {} col {} value {}" - .format(type(v), statement.text, ri, i, v)) - elif query_type[i] == 'T': - # include data, timestamp, dict, list ... - if not (isinstance(v, str) or isinstance(v, dict) or - isinstance(v, list)): - log.error( - "Expected string, got type {} in query {} row {} col {} value {}" - .format(type(v), statement.text, ri, i, v)) - elif query_type[i] == 'B': - if not isinstance(v, bool): - log.error( - "Expected bool, got type {} in query {} row {} col {} value {}" - .format(type(v), statement.text, ri, i, v)) - else: - log.error( - "Unknown type {} in query {} row {} col {} value {}". - format(query_type[i], statement.text, ri, i, v)) - if isinstance(v, bool): - v = str(v).lower( - ) # bool to string in python will be True/False + # todo(youngsofun) : check the schema instead + # if query_type[i] == 'I': + # if not isinstance(v, int): + # log.error( + # "Expected int, got type {} in query {} row {} col {} value {}" + # .format(type(v), statement.text, ri, i, v)) + # elif query_type[i] == 'F' or query_type[i] == 'R': + # if not isinstance(v, float): + # log.error( + # "Expected float, got type {} in query {} row {} col {} value {}" + # .format(type(v), statement.text, ri, i, v)) + # elif query_type[i] == 'T': + # # include data, timestamp, dict, list ... + # if not (isinstance(v, str) or isinstance(v, dict) or + # isinstance(v, list)): + # log.error( + # "Expected string, got type {} in query {} row {} col {} value {}" + # .format(type(v), statement.text, ri, i, v)) + # elif query_type[i] == 'B': + # if not isinstance(v, bool): + # log.error( + # "Expected bool, got type {} in query {} row {} col {} value {}" + # .format(type(v), statement.text, ri, i, v)) + # else: + # log.error( + # "Unknown type {} in query {} row {} col {} value {}". + # format(query_type[i], statement.text, ri, i, v)) + # if isinstance(v, bool): + # v = str(v).lower( + # ) # bool to string in python will be True/False vals.append(str(v)) return vals diff --git a/tests/logictest/suites/gen/02_function/02_0000_function_aggreate_on_empty b/tests/logictest/suites/gen/02_function/02_0000_function_aggreate_on_empty index 2f090977a28e5..2490c7d31a752 100644 --- a/tests/logictest/suites/gen/02_function/02_0000_function_aggreate_on_empty +++ b/tests/logictest/suites/gen/02_function/02_0000_function_aggreate_on_empty @@ -165,7 +165,8 @@ select avg(number) from numbers_mt (10) where 1 = 2; nan ---- http -NULL +NaN + statement query F label(mysql,http) select avg(1) from numbers_mt (10) where 1=2; @@ -174,7 +175,7 @@ select avg(1) from numbers_mt (10) where 1=2; nan ---- http -NULL +NaN statement ok select avg(number) from numbers_mt (10) where 1 = 2 group by number % 2; diff --git a/tests/logictest/suites/gen/02_function/02_0002_function_cast b/tests/logictest/suites/gen/02_function/02_0002_function_cast index c035703d55658..c76f155b5c53b 100644 --- a/tests/logictest/suites/gen/02_function/02_0002_function_cast +++ b/tests/logictest/suites/gen/02_function/02_0002_function_cast @@ -464,7 +464,7 @@ statement query F SELECT parse_json(12.34)::float32; ---- -12.34000015258789 +12.34 statement query F SELECT parse_json(1234.5678)::float64; @@ -476,7 +476,7 @@ statement query F SELECT parse_json('"12.34"')::float32; ---- -12.34000015258789 +12.34 statement query F SELECT parse_json('"1234.5678"')::float64; @@ -529,27 +529,21 @@ SELECT parse_json(1)::array; ---- [1] -statement query T label(mysql,http) +statement query T SELECT parse_json('"ab"')::array; ----- mysql +---- ["ab"] ----- http -['ab'] - statement error 1010 SELECT parse_json('null')::array; -statement query T label(mysql,http) +statement query T SELECT parse_json('{"a":1,"b":2}')::object; ----- mysql +---- {"a":1,"b":2} ----- http -{'a': 1, 'b': 2} - statement error 1010 SELECT parse_json('"abc"')::object; @@ -558,4 +552,3 @@ SELECT parse_json('[1,2,3]')::object; statement error 1010 SELECT parse_json('null')::object; - diff --git a/tests/logictest/suites/gen/02_function/02_0014_function_maths b/tests/logictest/suites/gen/02_function/02_0014_function_maths index 7fef7c69b38d7..7ff1de287b8fb 100644 --- a/tests/logictest/suites/gen/02_function/02_0014_function_maths +++ b/tests/logictest/suites/gen/02_function/02_0014_function_maths @@ -110,15 +110,12 @@ SELECT ln(NULL); ---- NULL -statement query F label(mysql,http) +statement query F SELECT ln(1, 2); ----- mysql +---- inf ----- http -NULL - statement query T SELECT log10(NULL); @@ -223,15 +220,12 @@ SELECT tan(pi()/4); ---- 0.9999999999999999 -statement query F label(mysql,http) +statement query F SELECT cot(0); ----- mysql +---- inf ----- http -NULL - statement query F SELECT cot(pi()/4); @@ -251,7 +245,7 @@ SELECT asin(1.1); nan ---- http -NULL +NaN statement query F SELECT acos(1); @@ -266,7 +260,7 @@ SELECT acos(1.0001); nan ---- http -NULL +NaN statement query F SELECT atan(1); @@ -355,7 +349,7 @@ SELECT sqrt(-4); nan ---- http -NULL +NaN statement error 1007 SELECT sqrt('a'); diff --git a/tests/logictest/suites/gen/02_function/02_0048_function_semi_structureds_parse_json b/tests/logictest/suites/gen/02_function/02_0048_function_semi_structureds_parse_json index be59777338317..1361c2fa00986 100644 --- a/tests/logictest/suites/gen/02_function/02_0048_function_semi_structureds_parse_json +++ b/tests/logictest/suites/gen/02_function/02_0048_function_semi_structureds_parse_json @@ -28,15 +28,12 @@ select parse_json(12.34); ---- 12.34 -statement query T label(mysql,http) +statement query T select parse_json('null'); ----- mysql +---- null ----- http -NULL - statement query T select parse_json('true'); @@ -61,33 +58,24 @@ select parse_json('1.912e2'); ---- 191.2 -statement query T label(mysql,http) +statement query T label select parse_json('"Om ara pa ca na dhih" '); ----- mysql +---- "Om ara pa ca na dhih" ----- http -Om ara pa ca na dhih - -statement query T label(mysql,http) +statement query T select parse_json('[-1, 12, 289, 2188, false]'); ----- mysql +---- [-1,12,289,2188,false] ----- http -[-1, 12, 289, 2188, False] - -statement query T label(mysql,http) +statement query T select parse_json('{ "x" : "abc", "y" : false, "z": 10} '); ----- mysql +---- {"x":"abc","y":false,"z":10} ----- http -{'x': 'abc', 'y': False, 'z': 10} - statement error 1010 select parse_json('[1,'); @@ -149,16 +137,13 @@ select try_parse_json(12.34); ---- 12.34 -statement query T label(mysql,http) +statement query T select try_parse_json('null'); ----- mysql +---- null ----- http -NULL - -statement query T +statement query T select try_parse_json('true'); ---- @@ -182,34 +167,25 @@ select try_parse_json('1.912e2'); ---- 191.2 -statement query T label(mysql,http) +statement query T select try_parse_json('"Om ara pa ca na dhih" '); ----- mysql +---- "Om ara pa ca na dhih" ----- http -Om ara pa ca na dhih - -statement query T label(mysql,http) +statement query T select try_parse_json('[-1, 12, 289, 2188, false]'); ----- mysql +---- [-1,12,289,2188,false] ----- http -[-1, 12, 289, 2188, False] - -statement query T label(mysql,http) +statement query T select try_parse_json('{ "x" : "abc", "y" : false, "z": 10} '); ----- mysql +---- {"x":"abc","y":false,"z":10} ----- http -{'x': 'abc', 'y': False, 'z': 10} - -statement query T +statement query T select try_parse_json('[1,'); ---- diff --git a/tests/logictest/suites/gen/02_function/02_0051_function_semi_structureds_get b/tests/logictest/suites/gen/02_function/02_0051_function_semi_structureds_get index 91c59983e3811..175e0f0fdf37d 100644 --- a/tests/logictest/suites/gen/02_function/02_0051_function_semi_structureds_get +++ b/tests/logictest/suites/gen/02_function/02_0051_function_semi_structureds_get @@ -70,25 +70,19 @@ select get_path(parse_json('{"customer":{"id":1, "name":"databend", "extras":["e ---- 1 -statement query T label(mysql,http) +statement query T select get_path(parse_json('{"customer":{"id":1, "name":"databend", "extras":["ext", "test"]}}'), 'customer.name'); ----- mysql +---- "databend" ----- http -databend - -statement query T label(mysql,http) +statement query T select get_path(parse_json('{"customer":{"id":1, "name":"databend", "extras":["ext", "test"]}}'), 'customer["extras"][0]'); ----- mysql +---- "ext" ----- http -ext - -statement query T +statement query T select get_path(parse_json('{"customer":{"id":1, "name":"databend", "extras":["ext", "test"]}}'), 'customer["extras"][2]'); ---- diff --git a/tests/logictest/suites/gen/02_function/02_0056_function_semi_structureds_as b/tests/logictest/suites/gen/02_function/02_0056_function_semi_structureds_as index b2d4ca3a6df66..e3ec94c46d073 100644 --- a/tests/logictest/suites/gen/02_function/02_0056_function_semi_structureds_as +++ b/tests/logictest/suites/gen/02_function/02_0056_function_semi_structureds_as @@ -82,48 +82,36 @@ select '==as_array=='; ---- ==as_array== -statement query T label(mysql,http) +statement query T select as_array(parse_json('[1,2,3]')); ----- mysql +---- [1,2,3] ----- http -[1, 2, 3] - -statement query T label(mysql,http) +statement query T select as_array(parse_json('["a","b","c"]')); ----- mysql +---- ["a","b","c"] ----- http -['a', 'b', 'c'] - -statement query T +statement query T select '==as_object=='; ---- ==as_object== -statement query T label(mysql,http) +statement query T select as_object(parse_json('{"a":"b"}')); ----- mysql +---- {"a":"b"} ----- http -{'a': 'b'} - -statement query T label(mysql,http) +statement query T select as_object(parse_json('{"k":123}')); ----- mysql +---- {"k":123} ----- http -{'k': 123} - statement ok DROP DATABASE IF EXISTS db1; diff --git a/tests/logictest/suites/gen/20+_others/20_0001_planner_v2 b/tests/logictest/suites/gen/20+_others/20_0001_planner_v2 index 0be1d6b7aeea7..d4bfaf0eb3fbb 100644 --- a/tests/logictest/suites/gen/20+_others/20_0001_planner_v2 +++ b/tests/logictest/suites/gen/20+_others/20_0001_planner_v2 @@ -1019,33 +1019,26 @@ select '===Array Literal==='; ---- ===Array Literal=== -statement query I label(mysql,http) +statement query I select [1, 2, 3]; ----- mysql +---- [1, 2, 3] ----- http -{'Array': [{'UInt64': 1}, {'UInt64': 2}, {'UInt64': 3}]} -statement query T label(mysql,http) +statement query T select []; ----- mysql +---- [] ----- http -{'Array': []} -statement query T label(mysql,http) +statement query T select [[1, 2, 3],[1, 2, 3]]; ----- mysql +---- [[1, 2, 3], [1, 2, 3]] ----- http -{'Array': [{'Array': [{'UInt64': 1}, {'UInt64': 2}, {'UInt64': 3}]}, {'Array': [{'UInt64': 1}, {'UInt64': 2}, {'UInt64': 3}]}]} - statement query T select '====Correlated Subquery===='; diff --git a/tests/logictest/suites/select_0 b/tests/logictest/suites/select_0 index 6eea401a97298..85e41a9e10bbd 100644 --- a/tests/logictest/suites/select_0 +++ b/tests/logictest/suites/select_0 @@ -17,10 +17,10 @@ statement query FFTB label(http,mysql) select a,b,c,d from t4; ---- http - 1.0 10.223999977111816 xxdsfs false + 1.0 10.224 xxdsfs false ---- mysql - 1.0 10.223999977111816 xxdsfs 0 + 1.0 10.224 xxdsfs 0 statment ok drop table t4; diff --git a/tests/suites/0_stateless/02_function/02_0000_function_aggregate_mix.result b/tests/suites/0_stateless/02_function/02_0000_function_aggregate_mix.result index 35afe2e96c80f..1af9f993ca725 100644 --- a/tests/suites/0_stateless/02_function/02_0000_function_aggregate_mix.result +++ b/tests/suites/0_stateless/02_function/02_0000_function_aggregate_mix.result @@ -19,7 +19,7 @@ 1 1 2.5 -2 +2.0 ==Variant== {"k":"v"} "abcd" 1 2 diff --git a/tests/suites/0_stateless/02_function/02_0000_function_arithmetic.result b/tests/suites/0_stateless/02_function/02_0000_function_arithmetic.result index 97e0abe3abcdb..8671767738974 100644 --- a/tests/suites/0_stateless/02_function/02_0000_function_arithmetic.result +++ b/tests/suites/0_stateless/02_function/02_0000_function_arithmetic.result @@ -1,6 +1,6 @@ 2 0 1 0.5 -1 1 -4 +4.0 20 10 6 @@ -12,4 +12,4 @@ 2 BIGINT BIGINT INT FLOAT TINYINT UNSIGNED NULL NULL NULL NULL -2 3 +2.0 3.0 diff --git a/tests/suites/0_stateless/02_function/02_0002_function_cast.result b/tests/suites/0_stateless/02_function/02_0002_function_cast.result index cf7675266965a..afcf225a9816a 100644 --- a/tests/suites/0_stateless/02_function/02_0002_function_cast.result +++ b/tests/suites/0_stateless/02_function/02_0002_function_cast.result @@ -9,7 +9,7 @@ INT UNSIGNED FLOAT DOUBLE BIGINT UNSIGNED -2 +2.0 3 1 1 @@ -57,9 +57,9 @@ BIGINT UNSIGNED 2147483647 -9223372036854775808 9223372036854775807 -12.34000015258789 +12.34 1234.5678 -12.34000015258789 +12.34 1234.5678 2022-01-01 2022-01-01 01:01:01 diff --git a/tests/suites/0_stateless/02_function/02_0002_function_try_cast.result b/tests/suites/0_stateless/02_function/02_0002_function_try_cast.result index 33d902c2a3419..0042808bdf883 100644 --- a/tests/suites/0_stateless/02_function/02_0002_function_try_cast.result +++ b/tests/suites/0_stateless/02_function/02_0002_function_try_cast.result @@ -1,4 +1,4 @@ -2 +2.0 3 NULL NULL diff --git a/tests/suites/0_stateless/02_function/02_0013_function_running_difference.result b/tests/suites/0_stateless/02_function/02_0013_function_running_difference.result index 5909fdb1523fa..2ec553894391b 100644 --- a/tests/suites/0_stateless/02_function/02_0013_function_running_difference.result +++ b/tests/suites/0_stateless/02_function/02_0013_function_running_difference.result @@ -1,4 +1,4 @@ -0 0 0 0 0 -2 2 2 2 0 -2 2 2 2 0 -5 5 5 5 0 +0 0 0 0.0 0 +2 2 2 2.0 0 +2 2 2 2.0 0 +5 5 5 5.0 0 diff --git a/tests/suites/0_stateless/02_function/02_0014_function_maths.result b/tests/suites/0_stateless/02_function/02_0014_function_maths.result index 6963b336c84e4..2196c982d33da 100644 --- a/tests/suites/0_stateless/02_function/02_0014_function_maths.result +++ b/tests/suites/0_stateless/02_function/02_0014_function_maths.result @@ -14,13 +14,13 @@ NULL NULL NULL NULL -2 +2.0 NULL inf NULL -2 -1 -10 +2.0 +1.0 +10.0 NULL NULL ===mod=== @@ -31,15 +31,15 @@ NULL NULL 7.38905609893065 ===trigonometric=== -0 -1 -0 +0.0 +1.0 +0.0 0.9999999999999999 inf 1.0000000000000002 0.2013579207903308 NaN -0 +0.0 NaN 0.7853981633974483 -0.7853981633974483 @@ -54,12 +54,12 @@ NULL NULL NULL ===sqrt=== -2 -0 +2.0 +0.0 NaN ===pow=== -4 -4 +4.0 +4.0 0.25 NULL NULL diff --git a/tests/suites/0_stateless/02_function/02_0056_function_semi_structureds_as.result b/tests/suites/0_stateless/02_function/02_0056_function_semi_structureds_as.result index b9597ed71380a..37de60ec25a03 100644 --- a/tests/suites/0_stateless/02_function/02_0056_function_semi_structureds_as.result +++ b/tests/suites/0_stateless/02_function/02_0056_function_semi_structureds_as.result @@ -18,7 +18,7 @@ xyz {"k":123} ==as from table== 1 true 1 NULL NULL NULL NULL NULL -2 123 NULL 123 123 NULL NULL NULL +2 123 NULL 123 123.0 NULL NULL NULL 3 45.67 NULL NULL 45.67 NULL NULL NULL 4 "abc" NULL NULL NULL abc NULL NULL 5 [1,2,3] NULL NULL NULL NULL [1,2,3] NULL diff --git a/tests/suites/0_stateless/03_dml/03_0003_select_group_by.result b/tests/suites/0_stateless/03_dml/03_0003_select_group_by.result index d7ffb8639a1e8..68adbc6c14704 100644 --- a/tests/suites/0_stateless/03_dml/03_0003_select_group_by.result +++ b/tests/suites/0_stateless/03_dml/03_0003_select_group_by.result @@ -1,4 +1,4 @@ -5001 10001 +5001.0 10001 0 0 0 1 1 0 diff --git a/tests/suites/0_stateless/03_dml/03_0013_select_udf.result b/tests/suites/0_stateless/03_dml/03_0013_select_udf.result index ddff744daf472..bb9283748f2fa 100644 --- a/tests/suites/0_stateless/03_dml/03_0013_select_udf.result +++ b/tests/suites/0_stateless/03_dml/03_0013_select_udf.result @@ -1,3 +1,3 @@ 0 1 -6 +6.0 diff --git a/tests/suites/0_stateless/03_dml/03_0023_insert_into_array.result b/tests/suites/0_stateless/03_dml/03_0023_insert_into_array.result index f15cff441c42b..d9997f06ac1a1 100644 --- a/tests/suites/0_stateless/03_dml/03_0023_insert_into_array.result +++ b/tests/suites/0_stateless/03_dml/03_0023_insert_into_array.result @@ -39,10 +39,10 @@ 1 2 -9223372036854775808 9223372036854775807 ==Array(Float32)== -1 [1.100000023841858, 1.2000000476837158, 1.2999999523162842] -2 [-1.100000023841858, -1.2000000476837158, -1.2999999523162842] -1.100000023841858 1.2000000476837158 --1.100000023841858 -1.2000000476837158 +1 [1.1, 1.2, 1.3] +2 [-1.1, -1.2, -1.3] +1.1 1.2 +-1.1 -1.2 ==Array(Float64)== 1 [1.1, 1.2, 1.3] 2 [-1.1, -1.2, -1.3] diff --git a/tests/suites/0_stateless/03_dml/03_0024_select_window_function.result b/tests/suites/0_stateless/03_dml/03_0024_select_window_function.result index 42730501b4789..ecdd54dad5a38 100644 --- a/tests/suites/0_stateless/03_dml/03_0024_select_window_function.result +++ b/tests/suites/0_stateless/03_dml/03_0024_select_window_function.result @@ -287,18 +287,18 @@ USA 2001 4575 USA 2000 4575 USA 2001 4575 ================sep================ -China 2001 310 155 -China 2001 310 155 -Finland 2001 110 55 -Finland 2000 110 55 -Finland 2000 1500 1500 -India 2000 150 75 -India 2000 150 75 -India 2000 1200 1200 +China 2001 310 155.0 +China 2001 310 155.0 +Finland 2001 110 55.0 +Finland 2000 110 55.0 +Finland 2000 1500 1500.0 +India 2000 150 75.0 +India 2000 150 75.0 +India 2000 1200 1200.0 USA 2001 375 93.75 USA 2000 375 93.75 USA 2001 375 93.75 USA 2001 375 93.75 -USA 2001 4200 1400 -USA 2000 4200 1400 -USA 2001 4200 1400 +USA 2001 4200 1400.0 +USA 2000 4200 1400.0 +USA 2001 4200 1400.0 diff --git a/tests/suites/0_stateless/05_ddl/05_0021_ddl_alter_view.result b/tests/suites/0_stateless/05_ddl/05_0021_ddl_alter_view.result index a02fb6ac2b3ed..96152028630f3 100644 --- a/tests/suites/0_stateless/05_ddl/05_0021_ddl_alter_view.result +++ b/tests/suites/0_stateless/05_ddl/05_0021_ddl_alter_view.result @@ -1,6 +1,6 @@ 0 499.5 -1 499 -2 500 +1 499.0 +2 500.0 0 1 2 diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result index 4baa1c1d4b16f..bee31716b5948 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result @@ -105,18 +105,18 @@ NULL 2 3 0 1 ====INNER_JOIN==== -1 1 -2 2 -3 3 +1 1.0 +2 2.0 +3 3.0 1 1 2 2 2 1 3 2 2 1 3 2 -1 1 -2 2 -3 3 +1.0 1 +2.0 2 +3.0 3 1 1 2 2 1 2 @@ -224,15 +224,15 @@ new_planner 18 19 ============================== -5 +5.0 5.5 -6 +6.0 6.5 -7 +7.0 7.5 -8 +8.0 8.5 -9 +9.0 9.5 === Test offset === 5 @@ -345,6 +345,6 @@ BuildHashTable × 1 processor 3 4 7 8 ====UDF==== 5 -6 +6.0 0 1 diff --git a/tests/suites/1_stateful/00_copy/00_0000_copy_from_s3_location.result b/tests/suites/1_stateful/00_copy/00_0000_copy_from_s3_location.result index 266f37798a933..fc81bdf440b07 100644 --- a/tests/suites/1_stateful/00_copy/00_0000_copy_from_s3_location.result +++ b/tests/suites/1_stateful/00_copy/00_0000_copy_from_s3_location.result @@ -1,11 +1,11 @@ Test copy from file -199 2020 769 +199 2020.0 769 Test copy from gzip file -199 2020 769 +199 2020.0 769 Test copy from zstd file -199 2020 769 +199 2020.0 769 Test copy from bzip2 file -199 2020 769 -398 2020 1538 -398 2020 1538 -398 2020 1538 +199 2020.0 769 +398 2020.0 1538 +398 2020.0 1538 +398 2020.0 1538 diff --git a/tests/suites/1_stateful/00_copy/00_0000_copy_from_stage.result b/tests/suites/1_stateful/00_copy/00_0000_copy_from_stage.result index 40012ab4ab340..cf669803ebf79 100755 --- a/tests/suites/1_stateful/00_copy/00_0000_copy_from_stage.result +++ b/tests/suites/1_stateful/00_copy/00_0000_copy_from_stage.result @@ -3,18 +3,18 @@ ontime_200.csv.bz2 ontime_200.csv.gz ontime_200.csv.zst ontime_200.parquet -199 2020 769 -199 2020 769 -199 2020 769 -199 2020 769 -597 2020 2307 +199 2020.0 769 +199 2020.0 769 +199 2020.0 769 +199 2020.0 769 +597 2020.0 2307 ontime_200.parquet ontime_200_v1.parquet -398 2020 1538 -199 2020 769 -199 2020 769 -199 2020 769 -597 2020 2307 +398 2020.0 1538 +199 2020.0 769 +199 2020.0 769 +199 2020.0 769 +597 2020.0 2307 ontime_200.csv ontime_200.csv ontime_200.csv diff --git a/tests/suites/1_stateful/01_load/01_0000_streaming_load.result b/tests/suites/1_stateful/01_load/01_0000_streaming_load.result index e6a5c9ec64875..66c521198a54f 100644 --- a/tests/suites/1_stateful/01_load/01_0000_streaming_load.result +++ b/tests/suites/1_stateful/01_load/01_0000_streaming_load.result @@ -1,6 +1,6 @@ -199 2020 769 -199 2020 769 -199 2020 769 -199 2020 769 -199 2020 769 -199 2020 769 +199 2020.0 769 +199 2020.0 769 +199 2020.0 769 +199 2020.0 769 +199 2020.0 769 +199 2020.0 769