diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ecb25483ce07..9e15a2c7670a 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -203,6 +203,8 @@ jobs: run: cargo check --profile ci --no-default-features -p datafusion --features=string_expressions - name: Check datafusion (unicode_expressions) run: cargo check --profile ci --no-default-features -p datafusion --features=unicode_expressions + - name: Check parquet encryption (parquet_encryption) + run: cargo check --profile ci --no-default-features -p datafusion --features=parquet_encryption # Check datafusion-functions crate features # diff --git a/Cargo.lock b/Cargo.lock index b61a08a470dd..8ce3515710d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1873,6 +1873,7 @@ dependencies = [ "flate2", "futures", "glob", + "hex", "insta", "itertools 0.14.0", "log", @@ -2162,6 +2163,7 @@ dependencies = [ "datafusion-pruning", "datafusion-session", "futures", + "hex", "itertools 0.14.0", "log", "object_store", diff --git a/Cargo.toml b/Cargo.toml index db704dcdaaab..434e608b49dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -150,6 +150,7 @@ env_logger = "0.11" futures = "0.3" half = { version = "2.6.0", default-features = false } hashbrown = { version = "0.14.5", features = ["raw"] } +hex = { version = "0.4.3" } indexmap = "2.10.0" itertools = "0.14" log = "^0.4" diff --git a/README.md b/README.md index c142d8f366b2..fb7f838a572b 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,7 @@ Default features: - `datetime_expressions`: date and time functions such as `to_timestamp` - `encoding_expressions`: `encode` and `decode` functions - `parquet`: support for reading the [Apache Parquet] format +- `parquet_encryption`: support for using [Parquet Modular Encryption] - `regex_expressions`: regular expression functions, such as `regexp_match` - `unicode_expressions`: Include unicode aware functions such as `character_length` - `unparser`: enables support to reverse LogicalPlans back 
into SQL @@ -134,6 +135,7 @@ Optional features: [apache avro]: https://avro.apache.org/ [apache parquet]: https://parquet.apache.org/ +[parquet modular encryption]: https://parquet.apache.org/docs/file-format/data-pages/encryption/ ## DataFusion API Evolution and Deprecation Guidelines diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index b356f249b79b..83e539e31d72 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -40,9 +40,15 @@ name = "datafusion_common" [features] avro = ["apache-avro"] backtrace = [] +parquet_encryption = [ + "parquet", + "parquet/encryption", + "dep:hex", +] pyarrow = ["pyo3", "arrow/pyarrow", "parquet"] force_hash_collisions = [] recursive_protection = ["dep:recursive"] +parquet = ["dep:parquet"] [dependencies] ahash = { workspace = true } @@ -58,7 +64,7 @@ base64 = "0.22.1" chrono = { workspace = true } half = { workspace = true } hashbrown = { workspace = true } -hex = "0.4.3" +hex = { workspace = true, optional = true } indexmap = { workspace = true } libc = "0.2.174" log = { workspace = true } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 6618c6aeec28..31159d4a8588 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -19,6 +19,8 @@ use arrow_ipc::CompressionType; +#[cfg(feature = "parquet_encryption")] +use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; @@ -29,12 +31,8 @@ use std::error::Error; use std::fmt::{self, Display}; use std::str::FromStr; -#[cfg(feature = "parquet")] +#[cfg(feature = "parquet_encryption")] use hex; -#[cfg(feature = "parquet")] -use parquet::encryption::decrypt::FileDecryptionProperties; -#[cfg(feature = "parquet")] -use parquet::encryption::encrypt::FileEncryptionProperties; /// A macro that wraps a configuration struct and automatically derives 
/// [`Default`] and [`ConfigField`] for it, allowing it to be used @@ -2148,7 +2146,7 @@ impl ConfigField for ConfigFileEncryptionProperties { } } -#[cfg(feature = "parquet")] +#[cfg(feature = "parquet_encryption")] impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties { fn from(val: ConfigFileEncryptionProperties) -> Self { let mut fep = FileEncryptionProperties::builder( @@ -2194,7 +2192,7 @@ impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties { } } -#[cfg(feature = "parquet")] +#[cfg(feature = "parquet_encryption")] impl From<&FileEncryptionProperties> for ConfigFileEncryptionProperties { fn from(f: &FileEncryptionProperties) -> Self { let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys(); @@ -2308,7 +2306,7 @@ impl ConfigField for ConfigFileDecryptionProperties { } } -#[cfg(feature = "parquet")] +#[cfg(feature = "parquet_encryption")] impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties { fn from(val: ConfigFileDecryptionProperties) -> Self { let mut column_names: Vec<&str> = Vec::new(); @@ -2342,7 +2340,7 @@ impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties { } } -#[cfg(feature = "parquet")] +#[cfg(feature = "parquet_encryption")] impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties { fn from(f: &FileDecryptionProperties) -> Self { let (column_names_vec, column_keys_vec) = f.column_keys(); @@ -2688,7 +2686,7 @@ mod tests { ); } - #[cfg(feature = "parquet")] + #[cfg(feature = "parquet_encryption")] #[test] fn parquet_table_encryption() { use crate::config::{ diff --git a/datafusion/common/src/encryption.rs b/datafusion/common/src/encryption.rs new file mode 100644 index 000000000000..5d50d4a9efd3 --- /dev/null +++ b/datafusion/common/src/encryption.rs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Support optional features for encryption in Parquet files. +//! This module provides types and functions related to encryption in Parquet files. + +#[cfg(feature = "parquet_encryption")] +pub use parquet::encryption::decrypt::FileDecryptionProperties; +#[cfg(feature = "parquet_encryption")] +pub use parquet::encryption::encrypt::FileEncryptionProperties; + +#[cfg(not(feature = "parquet_encryption"))] +pub struct FileDecryptionProperties; +#[cfg(not(feature = "parquet_encryption"))] +pub struct FileEncryptionProperties; + +#[cfg(feature = "parquet")] +use crate::config::ParquetEncryptionOptions; +pub use crate::config::{ConfigFileDecryptionProperties, ConfigFileEncryptionProperties}; +#[cfg(feature = "parquet")] +use parquet::file::properties::WriterPropertiesBuilder; + +#[cfg(feature = "parquet")] +pub fn add_crypto_to_writer_properties( + #[allow(unused)] crypto: &ParquetEncryptionOptions, + #[allow(unused_mut)] mut builder: WriterPropertiesBuilder, +) -> WriterPropertiesBuilder { + #[cfg(feature = "parquet_encryption")] + if let Some(file_encryption_properties) = &crypto.file_encryption { + builder = builder + .with_file_encryption_properties(file_encryption_properties.clone().into()); + } + builder +} + +#[cfg(feature = "parquet_encryption")] +pub fn map_encryption_to_config_encryption( + encryption: Option<&FileEncryptionProperties>, +) -> Option<ConfigFileEncryptionProperties> { + encryption.map(|fe|
fe.into()) +} + +#[cfg(not(feature = "parquet_encryption"))] +pub fn map_encryption_to_config_encryption( + _encryption: Option<&FileEncryptionProperties>, +) -> Option { + None +} + +#[cfg(feature = "parquet_encryption")] +pub fn map_config_decryption_to_decryption( + decryption: Option<&ConfigFileDecryptionProperties>, +) -> Option { + decryption.map(|fd| fd.clone().into()) +} + +#[cfg(not(feature = "parquet_encryption"))] +pub fn map_config_decryption_to_decryption( + _decryption: Option<&ConfigFileDecryptionProperties>, +) -> Option { + None +} diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 60f0f4abb0c0..cde0ea129979 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -27,6 +27,7 @@ use crate::{ use arrow::datatypes::Schema; // TODO: handle once deprecated +use crate::encryption::add_crypto_to_writer_properties; #[allow(deprecated)] use parquet::{ arrow::ARROW_SCHEMA_META_KEY, @@ -100,11 +101,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { let mut builder = global.into_writer_properties_builder()?; - if let Some(file_encryption_properties) = &crypto.file_encryption { - builder = builder.with_file_encryption_properties( - file_encryption_properties.clone().into(), - ); - } + builder = add_crypto_to_writer_properties(crypto, builder); // check that the arrow schema is present in the kv_metadata, if configured to do so if !global.skip_arrow_metadata @@ -456,12 +453,10 @@ mod tests { }; use std::collections::HashMap; - use crate::config::{ - ConfigFileEncryptionProperties, ParquetColumnOptions, ParquetEncryptionOptions, - ParquetOptions, - }; - use super::*; + use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; + #[cfg(feature = "parquet_encryption")] + use crate::encryption::map_encryption_to_config_encryption; const COL_NAME: &str = "configured"; @@ -590,8 
+585,10 @@ mod tests { HashMap::from([(COL_NAME.into(), configured_col_props)]) }; - let fep: Option<ConfigFileEncryptionProperties> = - props.file_encryption_properties().map(|fe| fe.into()); + #[cfg(feature = "parquet_encryption")] + let fep = map_encryption_to_config_encryption(props.file_encryption_properties()); + #[cfg(not(feature = "parquet_encryption"))] + let fep = None; #[allow(deprecated)] // max_statistics_size TableParquetOptions { diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 73e7be439ca9..3a558fa86789 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -41,6 +41,7 @@ pub mod config; pub mod cse; pub mod diagnostic; pub mod display; +pub mod encryption; pub mod error; pub mod file_options; pub mod format; diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index ec7ee07d7f8e..c4455e271c84 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -61,6 +61,7 @@ default = [ "unicode_expressions", "compression", "parquet", + "parquet_encryption", "recursive_protection", ] encoding_expressions = ["datafusion-functions/encoding_expressions"] @@ -68,6 +69,13 @@ encoding_expressions = ["datafusion-functions/encoding_expressions"] force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] math_expressions = ["datafusion-functions/math_expressions"] parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet"] +parquet_encryption = [ + "dep:parquet", + "parquet/encryption", + "datafusion-common/parquet_encryption", + "datafusion-datasource-parquet/parquet_encryption", + "dep:hex", +] pyarrow = ["datafusion-common/pyarrow", "parquet"] regex_expressions = [ "datafusion-functions/regex_expressions", @@ -127,6 +135,7 @@ datafusion-session = { workspace = true } datafusion-sql = { workspace = true } flate2 = { version = "1.1.2", optional = true } futures = { workspace = true } +hex = { workspace = true, optional = true
} itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index a2bec74ee140..83bb60184fb9 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -247,6 +247,7 @@ mod tests { Ok(()) } + #[cfg(feature = "parquet_encryption")] #[tokio::test] async fn roundtrip_parquet_with_encryption() -> Result<()> { use parquet::encryption::decrypt::FileDecryptionProperties; diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 8719a16f4919..68f83e7f1f11 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -38,6 +38,7 @@ use crate::test_util::{aggr_test_schema, arrow_test_data}; use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; +#[cfg(feature = "compression")] use datafusion_common::DataFusionError; use datafusion_datasource::source::DataSourceExec; diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs index 203c985428bc..8e90b9aaa955 100644 --- a/datafusion/core/tests/parquet/encryption.rs +++ b/datafusion/core/tests/parquet/encryption.rs @@ -74,6 +74,7 @@ pub fn write_batches( Ok(num_rows) } +#[cfg(feature = "parquet_encryption")] #[tokio::test] async fn round_trip_encryption() { let ctx: SessionContext = SessionContext::new(); diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index 08d258852a20..8a75a445c8ff 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -48,6 +48,7 @@ datafusion-physical-plan = { workspace = true } datafusion-pruning = { workspace = true } datafusion-session = { workspace = true } futures = { workspace = true } +hex = { workspace = true, optional = true } 
itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } @@ -65,3 +66,10 @@ workspace = true [lib] name = "datafusion_datasource_parquet" path = "src/mod.rs" + +[features] +parquet_encryption = [ + "parquet/encryption", + "datafusion-common/parquet_encryption", + "dep:hex", +] diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 82cf06b5387e..43b0886193e7 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -39,6 +39,9 @@ use datafusion_datasource::write::demux::DemuxedStreamReceiver; use arrow::compute::sum; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; +use datafusion_common::encryption::{ + map_config_decryption_to_decryption, FileDecryptionProperties, +}; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ @@ -78,7 +81,7 @@ use parquet::arrow::arrow_writer::{ use parquet::arrow::async_reader::MetadataFetch; use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter}; use parquet::basic::Type; -use parquet::encryption::decrypt::FileDecryptionProperties; + use parquet::errors::ParquetError; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; @@ -354,15 +357,11 @@ impl FileFormat for ParquetFormat { Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?), None => None, }; - let config_file_decryption_properties = &self.options.crypto.file_decryption; let file_decryption_properties: Option<FileDecryptionProperties> = - match config_file_decryption_properties { - Some(cfd) => { - let fd: FileDecryptionProperties = cfd.clone().into(); - Some(fd) - } - None => None, - }; + map_config_decryption_to_decryption( + 
self.options.crypto.file_decryption.as_ref(), ); + let mut schemas: Vec<_> = futures::stream::iter(objects) .map(|object| { fetch_schema_with_location( @@ -419,15 +418,10 @@ table_schema: SchemaRef, object: &ObjectMeta, ) -> Result<Statistics> { - let config_file_decryption_properties = &self.options.crypto.file_decryption; let file_decryption_properties: Option<FileDecryptionProperties> = - match config_file_decryption_properties { - Some(cfd) => { - let fd: FileDecryptionProperties = cfd.clone().into(); - Some(fd) - } - None => None, - }; + map_config_decryption_to_decryption( + self.options.crypto.file_decryption.as_ref(), + ); let stats = fetch_statistics( store.as_ref(), table_schema, @@ -963,14 +957,17 @@ pub async fn fetch_parquet_metadata( store: &dyn ObjectStore, meta: &ObjectMeta, size_hint: Option<usize>, - decryption_properties: Option<&FileDecryptionProperties>, + #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>, ) -> Result<ParquetMetaData> { let file_size = meta.size; let fetch = ObjectStoreFetch::new(store, meta); - ParquetMetaDataReader::new() - .with_prefetch_hint(size_hint) - .with_decryption_properties(decryption_properties) + let reader = ParquetMetaDataReader::new().with_prefetch_hint(size_hint); + + #[cfg(feature = "parquet_encryption")] + let reader = reader.with_decryption_properties(decryption_properties); + + reader .load_and_finish(fetch, file_size) .await .map_err(DataFusionError::from) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index b39ec3929f97..daed52e05950 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -31,6 +31,8 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit}; use arrow::error::ArrowError; +use datafusion_common::encryption::FileDecryptionProperties; + use datafusion_common::{exec_err, DataFusionError, Result}; use
datafusion_datasource::PartitionedFile; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; @@ -46,7 +48,6 @@ use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; -use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::file::metadata::ParquetMetaDataReader; /// Implements [`FileOpener`] for a parquet file @@ -182,6 +183,7 @@ impl FileOpener for ParquetOpener { // pruning predicates. Thus default to not requesting if from the // underlying reader. let mut options = ArrowReaderOptions::new().with_page_index(false); + #[cfg(feature = "parquet_encryption")] if let Some(fd_val) = file_decryption_properties { options = options.with_file_decryption_properties((*fd_val).clone()); } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 8ca36e7cd321..5aea70986c63 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -48,8 +48,10 @@ use datafusion_physical_plan::metrics::Count; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; +use datafusion_common::encryption::map_config_decryption_to_decryption; use itertools::Itertools; use object_store::ObjectStore; + /// Execution plan for reading one or more Parquet files. /// /// ```text @@ -474,12 +476,10 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); - let file_decryption_properties = self - .table_parquet_options() - .crypto - .file_decryption - .as_ref() - .map(|props| Arc::new(props.clone().into())); + let file_decryption_properties = map_config_decryption_to_decryption( + self.table_parquet_options().crypto.file_decryption.as_ref(), + ) + .map(Arc::new); let coerce_int96 = self .table_parquet_options