Commit ceade58

Thread through max_predicate_cache_size, add test
1 parent 8f7571e commit ceade58

File tree

14 files changed, +250 -12 lines changed


datafusion/common/src/config.rs

Lines changed: 8 additions & 0 deletions
@@ -565,6 +565,14 @@ config_namespace! {
         /// (reading) Use any available bloom filters when reading parquet files
         pub bloom_filter_on_read: bool, default = true
 
+        /// (reading) The maximum predicate cache size, in bytes. When
+        /// `pushdown_filters` is enabled, sets the maximum memory used to cache
+        /// the results of predicate evaluation between filter evaluation and
+        /// output generation. Decreasing this value will reduce memory usage,
+        /// but may increase IO and CPU usage. None means use the default
+        /// parquet reader setting. 0 means no caching.
+        pub max_predicate_cache_size: Option<usize>, default = None
+
         // The following options affect writing to parquet files
         // and map to parquet::file::properties::WriterProperties

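This option sits in the `datafusion.execution.parquet` namespace alongside the other read options. A minimal sketch of setting it from Rust, mirroring the tests added later in this commit (the 64 KiB value is an arbitrary illustration, not a recommended setting):

    use datafusion::prelude::SessionContext;
    use datafusion_execution::config::SessionConfig;

    // The cache only matters when filter pushdown is enabled;
    // None keeps the parquet reader default and Some(0) disables caching.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    config
        .options_mut()
        .execution
        .parquet
        .max_predicate_cache_size = Some(64 * 1024); // arbitrary example value
    let ctx = SessionContext::new_with_config(config);
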
datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 8 additions & 5 deletions
@@ -233,6 +233,7 @@ impl ParquetOptions {
             binary_as_string: _, // not used for writer props
             coerce_int96: _, // not used for writer props
             skip_arrow_metadata: _,
+            max_predicate_cache_size: _,
         } = self;
 
         let mut builder = WriterProperties::builder()
@@ -425,6 +426,10 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
 #[cfg(feature = "parquet")]
 #[cfg(test)]
 mod tests {
+    use super::*;
+    use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
+    #[cfg(feature = "parquet_encryption")]
+    use crate::encryption::map_encryption_to_config_encryption;
     use parquet::{
         basic::Compression,
         file::properties::{
@@ -434,11 +439,6 @@ mod tests {
     };
     use std::collections::HashMap;
 
-    use super::*;
-    use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
-    #[cfg(feature = "parquet_encryption")]
-    use crate::encryption::map_encryption_to_config_encryption;
-
     const COL_NAME: &str = "configured";
 
     /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
@@ -500,6 +500,7 @@ mod tests {
             binary_as_string: defaults.binary_as_string,
             skip_arrow_metadata: defaults.skip_arrow_metadata,
             coerce_int96: None,
+            max_predicate_cache_size: defaults.max_predicate_cache_size,
         }
     }
 
@@ -606,6 +607,8 @@ mod tests {
             maximum_buffered_record_batches_per_stream: global_options_defaults
                 .maximum_buffered_record_batches_per_stream,
             bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
+            max_predicate_cache_size: global_options_defaults
+                .max_predicate_cache_size,
             schema_force_view_types: global_options_defaults.schema_force_view_types,
             binary_as_string: global_options_defaults.binary_as_string,
             skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 99 additions & 2 deletions
@@ -26,8 +26,6 @@
 //! select * from data limit 10;
 //! ```
 
-use std::path::Path;
-
 use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
 use datafusion::physical_plan::collect;
@@ -37,7 +35,10 @@ use datafusion::prelude::{
 };
 use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
 use datafusion_expr::utils::{conjunction, disjunction, split_conjunction};
+use std::path::Path;
 
+use datafusion_common::test_util::parquet_test_data;
+use datafusion_execution::config::SessionConfig;
 use itertools::Itertools;
 use parquet::file::properties::WriterProperties;
 use tempfile::TempDir;
@@ -601,3 +602,99 @@ fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
         }
     }
 }
+
+#[tokio::test]
+async fn predicate_cache_default() -> datafusion_common::Result<()> {
+    let ctx = SessionContext::new();
+    // The cache is on by default, but not used unless filter pushdown is enabled
+    PredicateCacheTest {
+        expected_inner_records: 0,
+        expected_records: 0,
+    }
+    .run(&ctx)
+    .await
+}
+
+#[tokio::test]
+async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
+    let mut config = SessionConfig::new();
+    config.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(config);
+    // The cache is on by default, and used when filter pushdown is enabled
+    PredicateCacheTest {
+        expected_inner_records: 8,
+        expected_records: 4,
+    }
+    .run(&ctx)
+    .await
+}
+
+#[tokio::test]
+async fn predicate_cache_pushdown_disable() -> datafusion_common::Result<()> {
+    // The cache can be disabled, even with filter pushdown, by setting its size
+    // to 0. In this case we expect the inner records to be reported, but no
+    // records to be read from the cache
+    let mut config = SessionConfig::new();
+    config.options_mut().execution.parquet.pushdown_filters = true;
+    config
+        .options_mut()
+        .execution
+        .parquet
+        .max_predicate_cache_size = Some(0);
+    let ctx = SessionContext::new_with_config(config);
+    PredicateCacheTest {
+        // The file has 8 rows, which need to be read twice: once to evaluate
+        // the filter and once to produce the final output
+        expected_inner_records: 16,
+        // Expect 0 records to be read from the cache, as it is disabled.
+        // However, the count is non-zero due to
+        // https://github.com/apache/arrow-rs/issues/8307
+        expected_records: 3,
+    }
+    .run(&ctx)
+    .await
+}
+
+/// Runs the query "SELECT * FROM alltypes_plain WHERE double_col != 0.0"
+/// with a given SessionContext and asserts that the predicate cache metrics
+/// are as expected
+#[derive(Debug)]
+struct PredicateCacheTest {
+    /// Expected records read from the underlying reader (to evaluate filters)
+    /// -- this is the total number of records in the file
+    expected_inner_records: usize,
+    /// Expected records to be read from the cache (after filtering)
+    expected_records: usize,
+}
+
+impl PredicateCacheTest {
+    async fn run(self, ctx: &SessionContext) -> datafusion_common::Result<()> {
+        let Self {
+            expected_inner_records,
+            expected_records,
+        } = self;
+        // Create a dataframe that scans the "alltypes_plain.parquet" file with
+        // a filter on `double_col != 0.0`
+        let path = parquet_test_data() + "/alltypes_plain.parquet";
+        let exec = ctx
+            .read_parquet(path, ParquetReadOptions::default())
+            .await?
+            .filter(col("double_col").not_eq(lit(0.0)))?
+            .create_physical_plan()
+            .await?;
+
+        // run the plan to completion
+        let _ = collect(exec.clone(), ctx.task_ctx()).await?;
+        let metrics =
+            TestParquetFile::parquet_metrics(&exec).expect("found parquet metrics");
+
+        // verify the predicate cache metrics
+        assert_eq!(
+            get_value(&metrics, "predicate_cache_inner_records"),
+            expected_inner_records
+        );
+        assert_eq!(
+            get_value(&metrics, "predicate_cache_records"),
+            expected_records
+        );
+        Ok(())
+    }
+}

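The assertions above rely on the pre-existing `get_value` helper in this test file, whose body is outside this diff. A plausible sketch of such a helper, assuming it sums a named metric across the plan's `MetricsSet` (`sum_by_name` and `as_usize` are existing DataFusion metrics APIs):

    use datafusion::physical_plan::metrics::MetricsSet;

    // Sketch of the helper referenced above (assumed behavior, not the
    // verbatim body): sum a metric by name, failing the test if missing.
    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
        match metrics.sum_by_name(metric_name) {
            Some(v) => v.as_usize(),
            None => panic!("metric '{metric_name}' not found"),
        }
    }
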
datafusion/datasource-parquet/src/metrics.rs

Lines changed: 17 additions & 0 deletions
@@ -72,6 +72,13 @@ pub struct ParquetFileMetrics {
     pub page_index_eval_time: Time,
     /// Total time spent reading and parsing metadata from the footer
     pub metadata_load_time: Time,
+    /// Predicate Cache: number of records read directly from the inner reader.
+    /// This is the number of rows decoded while evaluating predicates
+    pub predicate_cache_inner_records: Count,
+    /// Predicate Cache: number of records read from the cache. This is the
+    /// number of rows that were stored in the cache while evaluating predicates
+    /// and then reused for the output.
+    pub predicate_cache_records: Count,
 }
 
 impl ParquetFileMetrics {
@@ -140,6 +147,14 @@ impl ParquetFileMetrics {
         let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
             .counter("files_ranges_pruned_statistics", partition);
 
+        let predicate_cache_inner_records = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("predicate_cache_inner_records", partition);
+
+        let predicate_cache_records = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("predicate_cache_records", partition);
+
         Self {
             files_ranges_pruned_statistics,
             predicate_evaluation_errors,
@@ -157,6 +172,8 @@ impl ParquetFileMetrics {
             bloom_filter_eval_time,
             page_index_eval_time,
             metadata_load_time,
+            predicate_cache_inner_records,
+            predicate_cache_records,
         }
     }
 }

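`Count` here is DataFusion's cheaply cloneable handle over a shared atomic counter; that is what lets the opener (next file) move clones of these counters into a stream closure while this struct's handles still observe the updates. A minimal sketch of that behavior:

    use datafusion::physical_plan::metrics::Count;

    // Clones of a Count share the same underlying atomic value.
    let count = Count::new();
    let clone = count.clone();
    clone.add(5);
    assert_eq!(count.value(), 5);
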
datafusion/datasource-parquet/src/opener.rs

Lines changed: 52 additions & 4 deletions
@@ -51,6 +51,7 @@ use datafusion_execution::parquet_encryption::EncryptionFactory;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use log::debug;
+use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
@@ -105,6 +106,9 @@ pub(super) struct ParquetOpener {
     #[cfg(feature = "parquet_encryption")]
     pub encryption_factory:
         Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
+    /// Maximum size of the predicate cache, in bytes. If None, uses
+    /// the arrow-rs default.
+    pub max_predicate_cache_size: Option<usize>,
 }
 
 impl FileOpener for ParquetOpener {
@@ -152,6 +156,7 @@ impl FileOpener for ParquetOpener {
 
         let enable_page_index = self.enable_page_index;
         let encryption_context = self.get_encryption_context();
+        let max_predicate_cache_size = self.max_predicate_cache_size;
 
         Ok(Box::pin(async move {
             let file_decryption_properties = encryption_context
@@ -401,21 +406,42 @@ impl FileOpener for ParquetOpener {
                 builder = builder.with_limit(limit)
             }
 
+            if let Some(max_predicate_cache_size) = max_predicate_cache_size {
+                builder = builder.with_max_predicate_cache_size(max_predicate_cache_size);
+            }
+
+            // metrics from the arrow reader itself
+            let arrow_reader_metrics = ArrowReaderMetrics::enabled();
+
             let stream = builder
                 .with_projection(mask)
                 .with_batch_size(batch_size)
                 .with_row_groups(row_group_indexes)
+                .with_metrics(arrow_reader_metrics.clone())
                 .build()?;
 
-            let stream = stream
-                .map_err(DataFusionError::from)
-                .map(move |b| b.and_then(|b| schema_mapping.map_batch(b)));
+            let files_ranges_pruned_statistics =
+                file_metrics.files_ranges_pruned_statistics.clone();
+            let predicate_cache_inner_records =
+                file_metrics.predicate_cache_inner_records.clone();
+            let predicate_cache_records = file_metrics.predicate_cache_records.clone();
+
+            let stream = stream.map_err(DataFusionError::from).map(move |b| {
+                b.and_then(|b| {
+                    copy_arrow_reader_metrics(
+                        &arrow_reader_metrics,
+                        &predicate_cache_inner_records,
+                        &predicate_cache_records,
+                    );
+                    schema_mapping.map_batch(b)
+                })
+            });
 
             if let Some(file_pruner) = file_pruner {
                 Ok(EarlyStoppingStream::new(
                     stream,
                     file_pruner,
-                    file_metrics.files_ranges_pruned_statistics.clone(),
+                    files_ranges_pruned_statistics,
                 )
                 .boxed())
             } else {
@@ -425,6 +451,22 @@ impl FileOpener for ParquetOpener {
         }
     }
 
+/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
+/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
+fn copy_arrow_reader_metrics(
+    arrow_reader_metrics: &ArrowReaderMetrics,
+    predicate_cache_inner_records: &Count,
+    predicate_cache_records: &Count,
+) {
+    if let Some(v) = arrow_reader_metrics.records_read_from_inner() {
+        predicate_cache_inner_records.add(v);
+    }
+
+    if let Some(v) = arrow_reader_metrics.records_read_from_cache() {
+        predicate_cache_records.add(v);
+    }
+}
+
 /// Wraps an inner RecordBatchStream and a [`FilePruner`]
 ///
 /// This can terminate the scan early when a dynamic filter is updated after
@@ -823,6 +865,7 @@ mod test {
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                 #[cfg(feature = "parquet_encryption")]
                 encryption_factory: None,
+                max_predicate_cache_size: None,
             }
         };
 
@@ -911,6 +954,7 @@ mod test {
                 expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                 #[cfg(feature = "parquet_encryption")]
                 encryption_factory: None,
+                max_predicate_cache_size: None,
            }
        };
 
@@ -1015,6 +1059,7 @@ mod test {
                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                #[cfg(feature = "parquet_encryption")]
                encryption_factory: None,
+                max_predicate_cache_size: None,
            }
        };
        let make_meta = || FileMeta {
@@ -1129,6 +1174,7 @@ mod test {
                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                #[cfg(feature = "parquet_encryption")]
                encryption_factory: None,
+                max_predicate_cache_size: None,
            }
        };
 
@@ -1244,6 +1290,7 @@ mod test {
                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                #[cfg(feature = "parquet_encryption")]
                encryption_factory: None,
+                max_predicate_cache_size: None,
            }
        };
 
@@ -1426,6 +1473,7 @@ mod test {
            expr_adapter_factory: None,
            #[cfg(feature = "parquet_encryption")]
            encryption_factory: None,
+            max_predicate_cache_size: None,
        };
 
        let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);

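Stripped of the DataFusion plumbing, the arrow-rs interaction added above reduces to the sketch below. It relies only on the builder methods and metrics getters that appear in the diff; the async scaffolding and `tokio::fs::File` source are illustrative assumptions:

    use futures::TryStreamExt;
    use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    // Hypothetical standalone usage; `path` is assumed to name a parquet file.
    async fn scan(path: &str) -> Result<(), Box<dyn std::error::Error>> {
        let file = tokio::fs::File::open(path).await?;
        let metrics = ArrowReaderMetrics::enabled();
        let stream = ParquetRecordBatchStreamBuilder::new(file)
            .await?
            .with_max_predicate_cache_size(0) // 0 disables the cache
            .with_metrics(metrics.clone())
            .build()?;
        let _batches: Vec<_> = stream.try_collect().await?;
        // The getters return None unless metrics were enabled on the builder
        println!("inner: {:?}", metrics.records_read_from_inner());
        println!("cached: {:?}", metrics.records_read_from_cache());
        Ok(())
    }
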
datafusion/datasource-parquet/src/source.rs

Lines changed: 7 additions & 0 deletions
@@ -426,6 +426,12 @@ impl ParquetSource {
         self.table_parquet_options.global.bloom_filter_on_read
     }
 
+    /// Return the maximum predicate cache size, in bytes, used when
+    /// `pushdown_filters` is enabled
+    pub fn max_predicate_cache_size(&self) -> Option<usize> {
+        self.table_parquet_options.global.max_predicate_cache_size
+    }
+
     /// Applies schema adapter factory from the FileScanConfig if present.
     ///
     /// # Arguments
@@ -580,6 +586,7 @@ impl FileSource for ParquetSource {
             expr_adapter_factory,
             #[cfg(feature = "parquet_encryption")]
             encryption_factory: self.get_encryption_factory_with_config(),
+            max_predicate_cache_size: self.max_predicate_cache_size(),
         })
     }

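A short sketch of the new getter in use, assuming `ParquetSource::new(TableParquetOptions)` as the constructor (an assumption here, though consistent with how the source is built elsewhere in this crate):

    use datafusion_common::config::TableParquetOptions;
    use datafusion_datasource_parquet::ParquetSource;

    // Hypothetical round-trip: the getter mirrors the configured option.
    let mut options = TableParquetOptions::default();
    options.global.max_predicate_cache_size = Some(0);
    let source = ParquetSource::new(options);
    assert_eq!(source.max_predicate_cache_size(), Some(0));
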
datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 4 additions & 0 deletions
@@ -581,6 +581,10 @@ message ParquetOptions {
   oneof coerce_int96_opt {
     string coerce_int96 = 32;
   }
+
+  oneof max_predicate_cache_size_opt {
+    uint64 max_predicate_cache_size = 33;
+  }
 }
 
 enum JoinSide {

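Because proto3 cannot distinguish an unset scalar from zero, the single-field `oneof` is the usual idiom for carrying `Option<usize>` across the wire. A hedged sketch of the conversion this implies (hypothetical helper names; the real conversions are generated in datafusion-proto-common):

    // Hypothetical mapping between the proto oneof and the config option:
    // an absent oneof decodes to None, a present value to Some.
    fn max_predicate_cache_size_to_proto(v: Option<usize>) -> Option<u64> {
        v.map(|v| v as u64)
    }

    fn max_predicate_cache_size_from_proto(v: Option<u64>) -> Option<usize> {
        v.map(|v| v as usize)
    }
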