1,035 changes: 679 additions & 356 deletions Cargo.lock

Large diffs are not rendered by default.

44 changes: 22 additions & 22 deletions server/Cargo.toml
@@ -10,18 +10,18 @@ build = "build.rs"
[dependencies]
### apache arrow/datafusion dependencies
# arrow = "51.0.0"
arrow-schema = { version = "52.1.0", features = ["serde"] }
arrow-array = { version = "52.1.0" }
arrow-json = "52.1.0"
arrow-ipc = { version = "52.1.0", features = ["zstd"] }
arrow-select = "52.1.0"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
object_store = { version = "0.10.2", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
parquet = "52.1.0"
arrow-flight = { version = "52.1.0", features = [ "tls" ] }
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.11.0"
tower-http = { version = "0.4.4", features = ["cors"] }
arrow-schema = { version = "53.0.0", features = ["serde"] }
arrow-array = { version = "53.0.0" }
arrow-json = "53.0.0"
arrow-ipc = { version = "53.0.0", features = ["zstd"] }
arrow-select = "53.0.0"
datafusion = "42.0.0"
object_store = { version = "0.11.0", features = ["cloud", "aws"] }
parquet = "53.0.0"
arrow-flight = { version = "53.0.0", features = [ "tls" ] }
tonic = {version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.12.3"
tower-http = { version = "0.6.1", features = ["cors"] }

### actix dependencies
actix-web-httpauth = "0.8"
@@ -53,8 +53,8 @@ clap = { version = "4.1", default-features = false, features = [
"error-context",
] }
clokwerk = "0.4"
crossterm = "0.27.0"
derive_more = "0.99"
crossterm = "0.28.1"
derive_more = "0.99.18"
env_logger = "0.11.3"
fs_extra = "1.3"
futures = "0.3"
@@ -68,7 +68,7 @@ log = "0.4"
num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8"
rand = "0.8.5"
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
@@ -81,8 +81,8 @@ semver = "1.0"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
static-files = "0.2"
sysinfo = "0.30.11"
thiserror = "1"
sysinfo = "0.31.4"
thiserror = "1.0.64"
thread-priority = "1.0.0"
tokio = { version = "1.28", default-features = false, features = [
"sync",
@@ -97,13 +97,13 @@ xz2 = { version = "*", features = ["static"] }
nom = "7.1.3"
humantime = "2.1.0"
human-size = "0.4"
openid = { version = "0.14.0", default-features = false, features = ["rustls"] }
openid = { version = "0.15.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"
prost = "0.12.3"
prost = "0.13.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"

@@ -113,13 +113,13 @@ sha1_smol = { version = "1.0", features = ["std"] }
static-files = "0.2"
ureq = "2.6"
vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] }
zip = { version = "1.1.1", default-features = false, features = ["deflate"] }
zip = { version = "2.2.0", default-features = false, features = ["deflate"] }
url = "2.4.0"
prost-build = "0.12.3"
prost-build = "0.13.3"

[dev-dependencies]
maplit = "1.0"
rstest = "0.19.0"
rstest = "0.23.0"

[package.metadata.parseable_ui]
assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.6/build.zip"
50 changes: 33 additions & 17 deletions server/src/catalog/column.rs
@@ -136,44 +136,60 @@ pub struct Column {
impl TryFrom<&Statistics> for TypedStatistics {
type Error = parquet::errors::ParquetError;
fn try_from(value: &Statistics) -> Result<Self, Self::Error> {
if !value.has_min_max_set() {
if value.min_bytes_opt().is_none() || value.max_bytes_opt().is_none() {
return Err(parquet::errors::ParquetError::General(
"min max is not set".to_string(),
));
}

let res = match value {
Statistics::Boolean(stats) => TypedStatistics::Bool(BoolType {
min: *stats.min(),
max: *stats.max(),
min: *stats.min_opt().expect("Boolean stats min not set"),
max: *stats.max_opt().expect("Boolean stats max not set"),
}),
Statistics::Int32(stats) => TypedStatistics::Int(Int64Type {
min: *stats.min() as i64,
max: *stats.max() as i64,
min: *stats.min_opt().expect("Int32 stats min not set") as i64,
max: *stats.max_opt().expect("Int32 stats max not set") as i64,
}),
Statistics::Int64(stats) => TypedStatistics::Int(Int64Type {
min: *stats.min(),
max: *stats.max(),
min: *stats.min_opt().expect("Int64 stats min not set"),
max: *stats.max_opt().expect("Int64 stats max not set"),
}),
Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
min: stats.min().to_i64(),
max: stats.max().to_i64(),
min: stats.min_opt().expect("Int96 stats min not set").to_i64(),
max: stats.max_opt().expect("Int96 stats max not set").to_i64(),
}),
Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
min: *stats.min() as f64,
max: *stats.max() as f64,
min: *stats.min_opt().expect("Float32 stats min not set") as f64,
max: *stats.max_opt().expect("Float32 stats max not set") as f64,
}),
Statistics::Double(stats) => TypedStatistics::Float(Float64Type {
min: *stats.min(),
max: *stats.max(),
min: *stats.min_opt().expect("Float64 stats min not set"),
max: *stats.max_opt().expect("Float64 stats max not set"),
}),
Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type {
min: stats.min().as_utf8()?.to_owned(),
max: stats.max().as_utf8()?.to_owned(),
min: stats
.min_opt()
.expect("Utf8 stats min not set")
.as_utf8()?
.to_owned(),
max: stats
.max_opt()
.expect("Utf8 stats max not set")
.as_utf8()?
.to_owned(),
}),
Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type {
min: stats.min().as_utf8()?.to_owned(),
max: stats.max().as_utf8()?.to_owned(),
min: stats
.min_opt()
.expect("Utf8 stats min not set")
.as_utf8()?
.to_owned(),
max: stats
.max_opt()
.expect("Utf8 stats max not set")
.as_utf8()?
.to_owned(),
}),
};

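In parquet 53 the `Statistics` accessors are `Option`-based: `has_min_max_set()`, `min()`, and `max()` give way to `min_bytes_opt()`, `min_opt()`, and `max_opt()`, which is what the hunk above migrates to. A minimal sketch of the same pattern with the `expect` calls replaced by early returns (the helper name is illustrative, not part of this PR):

```rust
use parquet::file::statistics::Statistics;

/// Return (min, max) for an integer column chunk, or None when the
/// writer recorded no statistics for it.
fn int64_min_max(stats: &Statistics) -> Option<(i64, i64)> {
    match stats {
        // `min_opt`/`max_opt` return Option<&T> instead of assuming the
        // values are present the way the old `min`/`max` did.
        Statistics::Int64(s) => Some((*s.min_opt()?, *s.max_opt()?)),
        Statistics::Int32(s) => Some((*s.min_opt()? as i64, *s.max_opt()? as i64)),
        _ => None,
    }
}
```
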
5 changes: 4 additions & 1 deletion server/src/cli.rs
@@ -421,12 +421,15 @@ impl Cli {
.help("Set a fixed memory limit for query"),
)
.arg(
// RowGroupSize controls the number of rows present in one row group
// More rows = better compression but HIGHER Memory consumption during read/write
// 1048576 is the default value for DataFusion
Arg::new(Self::ROW_GROUP_SIZE)
.long(Self::ROW_GROUP_SIZE)
.env("P_PARQUET_ROW_GROUP_SIZE")
.value_name("NUMBER")
.required(false)
.default_value("16384")
.default_value("1048576")
.value_parser(value_parser!(usize))
.help("Number of rows in a row group"),
).arg(
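The new comments capture the trade-off behind the default change: each row group is buffered before it is flushed, so the 1,048,576-row default (DataFusion's own) compresses better and leaves fewer row groups to prune, at the cost of more memory during read/write. A hedged sketch of how such a value is typically applied through the `parquet` crate's writer properties; the wiring is illustrative, not lifted from this PR:

```rust
use parquet::file::properties::WriterProperties;

/// Build writer properties from the value parsed off
/// --row-group-size / P_PARQUET_ROW_GROUP_SIZE.
fn writer_props(row_group_size: usize) -> WriterProperties {
    WriterProperties::builder()
        // Cap the number of rows buffered into one row group; larger groups
        // compress better but hold more rows in memory while writing.
        .set_max_row_group_size(row_group_size)
        .build()
}
```
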
31 changes: 28 additions & 3 deletions server/src/query.rs
@@ -26,9 +26,9 @@ use datafusion::arrow::record_batch::RecordBatch;

use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
use datafusion::prelude::*;
use itertools::Itertools;
@@ -81,12 +81,37 @@ impl Query {
let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());

let config = SessionConfig::default()
let mut config = SessionConfig::default()
.with_parquet_pruning(true)
.with_prefer_existing_sort(true)
.with_round_robin_repartition(true);

let state = SessionState::new_with_config_rt(config, runtime);
// For more details refer https://datafusion.apache.org/user-guide/configs.html

// Reduce the number of rows read (if possible)
config.options_mut().execution.parquet.enable_page_index = true;

// Pushdown filters allows DF to push the filters as far down in the plan as possible
// and thus, reducing the number of rows decoded
config.options_mut().execution.parquet.pushdown_filters = true;

// Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation
config.options_mut().execution.parquet.reorder_filters = true;

// Enable StringViewArray
// https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/
config
.options_mut()
.execution
.parquet
.schema_force_view_types = true;

let state = SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.with_runtime_env(runtime)
.build();

let schema_provider = Arc::new(GlobalSchemaProvider {
storage: storage.get_object_store(),
});
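DataFusion 42 removes `SessionState::new_with_config_rt`; the `SessionStateBuilder` shown above is the replacement. A minimal sketch of the same construction in isolation, wrapped in a `SessionContext` to run a query (the runtime and the query string are placeholders):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::{runtime_env::RuntimeEnv, SessionStateBuilder};
use datafusion::prelude::{SessionConfig, SessionContext};

async fn run_example() -> Result<()> {
    let mut config = SessionConfig::default().with_parquet_pruning(true);
    // Same knobs as the diff above: use the page index and push filters down.
    config.options_mut().execution.parquet.enable_page_index = true;
    config.options_mut().execution.parquet.pushdown_filters = true;

    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_config(config)
        .with_runtime_env(Arc::new(RuntimeEnv::default()))
        .build();

    // The built state slots straight into a SessionContext.
    let ctx = SessionContext::new_with_state(state);
    ctx.sql("SELECT 1").await?.show().await?;
    Ok(())
}
```
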
4 changes: 2 additions & 2 deletions server/src/query/listing_table_builder.rs
@@ -25,7 +25,7 @@ use datafusion::{
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::DataFusionError,
logical_expr::{col, Expr},
logical_expr::{col, SortExpr},
};
use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
use itertools::Itertools;
@@ -188,7 +188,7 @@ impl ListingTableBuilder {
if self.listing.is_empty() {
return Ok(None);
}
let file_sort_order: Vec<Vec<Expr>>;
let file_sort_order: Vec<Vec<SortExpr>>;
let file_format = ParquetFormat::default().with_enable_pruning(true);
if let Some(time_partition) = time_partition {
file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
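The type change here comes from DataFusion 42 splitting sort expressions out of `Expr`: `col(..).sort(..)` now returns a `SortExpr`, and `with_file_sort_order` expects `Vec<Vec<SortExpr>>`. A small sketch of the new types, with a hypothetical sort column name:

```rust
use std::sync::Arc;

use datafusion::datasource::{
    file_format::parquet::ParquetFormat,
    listing::ListingOptions,
};
use datafusion::logical_expr::{col, SortExpr};

fn parquet_listing_options(sort_column: &str) -> ListingOptions {
    // `sort(asc, nulls_first)` yields a SortExpr rather than an Expr in DF 42.
    let file_sort_order: Vec<Vec<SortExpr>> =
        vec![vec![col(sort_column).sort(true, false)]];

    ListingOptions::new(Arc::new(ParquetFormat::default().with_enable_pruning(true)))
        .with_file_sort_order(file_sort_order)
}
```
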
30 changes: 14 additions & 16 deletions server/src/query/stream_schema_provider.rs
@@ -27,10 +27,11 @@ use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef, SortOptions};
use bytes::Bytes;
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
use datafusion::catalog::Session;
use datafusion::common::stats::Precision;
use datafusion::logical_expr::utils::conjunction;
use datafusion::{
catalog::schema::SchemaProvider,
catalog::SchemaProvider,
common::{
tree_node::{TreeNode, TreeNodeRecursion},
ToDFSchema,
@@ -122,7 +123,7 @@ async fn create_parquet_physical_plan(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
state: &SessionState,
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let filters = if let Some(expr) = conjunction(filters.to_vec()) {
@@ -149,7 +150,7 @@ async fn collect_from_snapshot(
// create the execution plan
let plan = file_format
.create_physical_plan(
state,
state.as_any().downcast_ref::<SessionState>().unwrap(), // Remove this when ParquetFormat catches up
FileScanConfig {
object_store_url,
file_schema: schema.clone(),
@@ -216,8 +217,8 @@ async fn collect_from_snapshot(
fn partitioned_files(
manifest_files: Vec<catalog::manifest::File>,
table_schema: &Schema,
target_partition: usize,
) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
let target_partition = num_cpus::get();
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
let mut column_statistics = HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
let mut count = 0;
@@ -288,7 +289,7 @@ impl TableProvider for StandardTableProvider {

async fn scan(
&self,
state: &SessionState,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
@@ -435,7 +436,7 @@
);
}

let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1);
let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema);
let remote_exec = create_parquet_physical_plan(
ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
partitioned_files,
@@ -496,7 +497,7 @@ async fn get_cache_exectuion_plan(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
state: &SessionState,
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let (cached, remainder) = cache_manager
@@ -519,7 +520,7 @@
})
.collect();

let (partitioned_files, statistics) = partitioned_files(cached, &schema, 1);
let (partitioned_files, statistics) = partitioned_files(cached, &schema);
let plan = create_parquet_physical_plan(
ObjectStoreUrl::parse("file:///").unwrap(),
partitioned_files,
Expand All @@ -545,7 +546,7 @@ async fn get_hottier_exectuion_plan(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
state: &SessionState,
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let (hot_tier_files, remainder) = hot_tier_manager
Expand All @@ -570,7 +571,7 @@ async fn get_hottier_exectuion_plan(
})
.collect();

let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema, 1);
let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema);
let plan = create_parquet_physical_plan(
ObjectStoreUrl::parse("file:///").unwrap(),
partitioned_files,
Expand All @@ -594,7 +595,7 @@ async fn legacy_listing_table(
object_store: Arc<dyn ObjectStore>,
time_filters: &[PartialTimeFilter],
schema: Arc<Schema>,
state: &SessionState,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
Expand Down Expand Up @@ -868,10 +869,7 @@ fn extract_timestamp_bound(
binexpr: BinaryExpr,
time_partition: Option<String>,
) -> Option<(Operator, NaiveDateTime)> {
Some((
binexpr.op.clone(),
extract_from_lit(binexpr, time_partition)?,
))
Some((binexpr.op, extract_from_lit(binexpr, time_partition)?))
}

async fn collect_manifest_files(
@@ -942,7 +940,7 @@ trait ManifestExt: ManifestFile {
return None;
};
/* `BinaryExp` doesn't implement `Copy` */
Some((expr.op.clone(), value))
Some((expr.op, value))
}

let Some(col) = self.find_matching_column(partial_filter) else {
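The recurring change in this file is `TableProvider::scan` (and the helpers it calls) taking the object-safe `&dyn Session` instead of a concrete `&SessionState`; where the concrete type is still required, as in the `create_physical_plan` call above, the code downcasts. A small sketch of that downcast, returning an error instead of the `unwrap` used in the diff:

```rust
use datafusion::catalog::Session;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;

/// Recover the concrete SessionState from the object-safe trait, for APIs
/// (e.g. ParquetFormat::create_physical_plan) that have not caught up yet.
fn as_session_state(state: &dyn Session) -> Result<&SessionState, DataFusionError> {
    state
        .as_any()
        .downcast_ref::<SessionState>()
        .ok_or_else(|| DataFusionError::Internal("expected a SessionState".into()))
}
```
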
4 changes: 2 additions & 2 deletions server/src/utils/arrow/flight.rs
@@ -39,8 +39,8 @@ use futures::{stream, TryStreamExt};
use tonic::{Request, Response, Status};

use arrow_flight::FlightClient;
use http::Uri;
use tonic::transport::Channel;
// use http::Uri;
use tonic::transport::{Channel, Uri};

pub type DoGetStream = stream::BoxStream<'static, Result<FlightData, Status>>;

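With tonic 0.12, `Uri` comes from tonic's own `transport` re-export rather than a direct `http` import, which keeps the `http` crate version aligned with what tonic expects. A minimal, hedged sketch of dialing a Flight endpoint over that `Uri` (the address is a placeholder):

```rust
use arrow_flight::FlightClient;
use tonic::transport::{Channel, Uri};

async fn flight_client(endpoint: &str) -> Result<FlightClient, Box<dyn std::error::Error>> {
    // e.g. "http://localhost:8002"
    let uri: Uri = endpoint.parse()?;
    let channel = Channel::builder(uri).connect().await?;
    Ok(FlightClient::new(channel))
}
```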