diff --git a/Cargo.lock b/Cargo.lock
index 4e1d29a62342..39f6396f9392 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2172,6 +2172,7 @@ dependencies = [
"arrow-flight",
"arrow-schema",
"async-trait",
+ "base64 0.22.1",
"bytes",
"dashmap",
"datafusion",
@@ -2184,6 +2185,7 @@ dependencies = [
"nix",
"object_store",
"prost",
+ "rand 0.9.2",
"serde_json",
"tempfile",
"test-utils",
@@ -2209,6 +2211,7 @@ dependencies = [
"log",
"object_store",
"parking_lot",
+ "parquet",
"rand 0.9.2",
"tempfile",
"url",
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index c46c24d0c935..409fc12bcbc5 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -65,6 +65,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
# note only use main datafusion crate for examples
+base64 = "0.22.1"
datafusion = { workspace = true, default-features = true }
datafusion-ffi = { workspace = true }
datafusion-proto = { workspace = true }
@@ -74,6 +75,7 @@ log = { workspace = true }
mimalloc = { version = "0.1", default-features = false }
object_store = { workspace = true, features = ["aws", "http"] }
prost = { workspace = true }
+rand = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
test-utils = { path = "../test-utils" }
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 02f83b9bd0d9..75a53bc568eb 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -68,6 +68,7 @@ cargo run --example dataframe
- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
- [`parquet_embedded_index.rs`](examples/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries
- [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
+- [`parquet_encrypted_with_kms.rs`](examples/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory
- [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs
new file mode 100644
index 000000000000..d30608ce7a1c
--- /dev/null
+++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use base64::Engine;
+use datafusion::common::extensions_options;
+use datafusion::config::{EncryptionFactoryOptions, TableParquetOptions};
+use datafusion::dataframe::DataFrameWriteOptions;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::error::Result;
+use datafusion::execution::parquet_encryption::EncryptionFactory;
+use datafusion::parquet::encryption::decrypt::KeyRetriever;
+use datafusion::parquet::encryption::{
+ decrypt::FileDecryptionProperties, encrypt::FileEncryptionProperties,
+};
+use datafusion::prelude::SessionContext;
+use futures::StreamExt;
+use object_store::path::Path;
+use rand::rand_core::{OsRng, TryRngCore};
+use std::collections::HashSet;
+use std::sync::Arc;
+use tempfile::TempDir;
+
+const ENCRYPTION_FACTORY_ID: &str = "example.mock_kms_encryption";
+
+/// This example demonstrates reading and writing Parquet files that
+/// are encrypted using Parquet Modular Encryption.
+///
+/// Compared to the `parquet_encrypted` example, where AES keys
+/// are specified directly, this example implements an `EncryptionFactory` that
+/// generates encryption keys dynamically per file.
+/// Encryption key metadata is stored inline in the Parquet files and is used to determine
+/// the decryption keys when reading the files.
+///
+/// In this example, encryption keys are simply stored base64-encoded in the Parquet metadata,
+/// which is not a secure way to store them.
+/// For production use, it is recommended to use a key-management service (KMS) to encrypt
+/// data encryption keys.
+#[tokio::main]
+async fn main() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ // Register an `EncryptionFactory` implementation to be used for Parquet encryption
+ // in the runtime environment.
+ // `EncryptionFactory` instances are registered with a name that identifies them so
+ // they can later be referenced in configuration options. Multiple factories can be
+ // registered to handle different ways of encrypting Parquet.
+ let encryption_factory = TestEncryptionFactory::default();
+ ctx.runtime_env().register_parquet_encryption_factory(
+ ENCRYPTION_FACTORY_ID,
+ Arc::new(encryption_factory),
+ );
+
+ // Register some simple test data
+ let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
+ let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100]));
+ let c: ArrayRef = Arc::new(Int32Array::from(vec![2, 20, 20, 200]));
+ let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)])?;
+ ctx.register_batch("test_data", batch)?;
+
+ {
+ // Write and read encrypted Parquet with the programmatic API
+ let tmpdir = TempDir::new()?;
+ let table_path = format!("{}/", tmpdir.path().to_str().unwrap());
+ write_encrypted(&ctx, &table_path).await?;
+ read_encrypted(&ctx, &table_path).await?;
+ }
+
+ {
+ // Write and read encrypted Parquet with the SQL API
+ let tmpdir = TempDir::new()?;
+ let table_path = format!("{}/", tmpdir.path().to_str().unwrap());
+ write_encrypted_with_sql(&ctx, &table_path).await?;
+ read_encrypted_with_sql(&ctx, &table_path).await?;
+ }
+
+ Ok(())
+}
+
+/// Write an encrypted Parquet file
+async fn write_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> {
+ let df = ctx.table("test_data").await?;
+
+ let mut parquet_options = TableParquetOptions::new();
+ // We specify that we want to use Parquet encryption by setting the identifier of the
+ // encryption factory to use and providing the factory-specific configuration.
+ // Our encryption factory only requires specifying the columns to encrypt.
+ let encryption_config = EncryptionConfig {
+ encrypted_columns: "b,c".to_owned(),
+ };
+ parquet_options
+ .crypto
+ .configure_factory(ENCRYPTION_FACTORY_ID, &encryption_config);
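+ // This is equivalent to setting the string options 'format.crypto.factory_id'
+ // and 'format.crypto.factory_options.encrypted_columns', the same keys used in
+ // the SQL examples below.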
+
+ df.write_parquet(
+ table_path,
+ DataFrameWriteOptions::new(),
+ Some(parquet_options),
+ )
+ .await?;
+
+ println!("Encrypted Parquet written to {table_path}");
+ Ok(())
+}
+
+/// Read from an encrypted Parquet file
+async fn read_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> {
+ let mut parquet_options = TableParquetOptions::new();
+ // Specify the encryption factory to use for decrypting Parquet.
+ // In this example, we don't require any additional configuration options when reading
+ // as we only need the key metadata from the Parquet files to determine the decryption keys.
+ parquet_options
+ .crypto
+ .configure_factory(ENCRYPTION_FACTORY_ID, &EncryptionConfig::default());
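+ // Only the factory id needs to match the one used at write time; the key
+ // metadata embedded in each file identifies the decryption keys.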
+
+ let file_format = ParquetFormat::default().with_options(parquet_options);
+ let listing_options = ListingOptions::new(Arc::new(file_format));
+
+ ctx.register_listing_table(
+ "encrypted_parquet_table",
+ &table_path,
+ listing_options.clone(),
+ None,
+ None,
+ )
+ .await?;
+
+ let mut batch_stream = ctx
+ .table("encrypted_parquet_table")
+ .await?
+ .execute_stream()
+ .await?;
+ println!("Reading encrypted Parquet as a RecordBatch stream");
+ while let Some(batch) = batch_stream.next().await {
+ let batch = batch?;
+ println!("Read batch with {} rows", batch.num_rows());
+ }
+
+ println!("Finished reading");
+ Ok(())
+}
+
+/// Write an encrypted Parquet file using only SQL syntax with string configuration
+async fn write_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> {
+ let query = format!(
+ "COPY test_data \
+ TO '{table_path}' \
+ STORED AS parquet \
+ OPTIONS (\
+ 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}', \
+ 'format.crypto.factory_options.encrypted_columns' 'b,c' \
+ )"
+ );
+ let _ = ctx.sql(&query).await?.collect().await?;
+
+ println!("Encrypted Parquet written to {table_path}");
+ Ok(())
+}
+
+/// Read from an encrypted Parquet file using only the SQL API and string-based configuration
+async fn read_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> {
+ let ddl = format!(
+ "CREATE EXTERNAL TABLE encrypted_parquet_table_2 \
+ STORED AS PARQUET LOCATION '{table_path}' OPTIONS (\
+ 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}' \
+ )"
+ );
+ ctx.sql(&ddl).await?;
+ let df = ctx.sql("SELECT * FROM encrypted_parquet_table_2").await?;
+ let mut batch_stream = df.execute_stream().await?;
+
+ println!("Reading encrypted Parquet as a RecordBatch stream");
+ while let Some(batch) = batch_stream.next().await {
+ let batch = batch?;
+ println!("Read batch with {} rows", batch.num_rows());
+ }
+ println!("Finished reading");
+ Ok(())
+}
+
+// Options used to configure our example encryption factory
+extensions_options! {
+ struct EncryptionConfig {
+ /// Comma-separated list of columns to encrypt
+ pub encrypted_columns: String, default = "".to_owned()
+ }
+}
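+
+// Note: `extensions_options!` generates an `ExtensionOptions` implementation for
+// `EncryptionConfig`, which is what allows `configure_factory` to copy its fields
+// into the string-keyed `factory_options` map and `to_extension_options` to read
+// them back.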
+
+/// Mock implementation of an `EncryptionFactory` that stores encryption keys
+/// base64 encoded in the Parquet encryption metadata.
+/// For production use, integrating with a key-management service to encrypt
+/// data encryption keys is recommended.
+#[derive(Default, Debug)]
+struct TestEncryptionFactory {}
+
+/// `EncryptionFactory` is a DataFusion trait for types that generate
+/// file encryption and decryption properties.
+impl EncryptionFactory for TestEncryptionFactory {
+ /// Generate file encryption properties to use when writing a Parquet file.
+ /// The `schema` is provided so that it may be used to dynamically configure
+ /// per-column encryption keys.
+ /// The file path is also available. We don't use the path in this example,
+ /// but other implementations may want to use this to compute an
+ /// AAD prefix for the file, or to allow use of external key material
+ /// (where key metadata is stored in a JSON file alongside Parquet files).
+ fn get_file_encryption_properties(
+ &self,
+ options: &EncryptionFactoryOptions,
+ schema: &SchemaRef,
+ _file_path: &Path,
+ ) -> Result<Option<FileEncryptionProperties>> {
+ let config: EncryptionConfig = options.to_extension_options()?;
+
+ // Generate a random encryption key for this file.
+ let mut key = vec![0u8; 16];
+ OsRng.try_fill_bytes(&mut key).unwrap();
+
+ // Generate the key metadata that allows retrieving the key when reading the file.
+ let key_metadata = wrap_key(&key);
+
+ let mut builder = FileEncryptionProperties::builder(key.to_vec())
+ .with_footer_key_metadata(key_metadata.clone());
+
+ let encrypted_columns: HashSet<&str> =
+ config.encrypted_columns.split(",").collect();
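+ // Note that splitting the default empty string yields a single empty entry,
+ // which simply matches no column name below.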
+ if !encrypted_columns.is_empty() {
+ // Set up per-column encryption.
+ for field in schema.fields().iter() {
+ if encrypted_columns.contains(field.name().as_str()) {
+ // Here we re-use the same key for all encrypted columns,
+ // but new keys could also be generated per column.
+ builder = builder.with_column_key_and_metadata(
+ field.name().as_str(),
+ key.clone(),
+ key_metadata.clone(),
+ );
+ }
+ }
+ }
+
+ let encryption_properties = builder.build()?;
+
+ Ok(Some(encryption_properties))
+ }
+
+ /// Generate file decryption properties to use when reading a Parquet file.
+ /// Rather than provide the AES keys directly for decryption, we set a `KeyRetriever`
+ /// that can determine the keys using the encryption metadata.
+ fn get_file_decryption_properties(
+ &self,
+ _options: &EncryptionFactoryOptions,
+ _file_path: &Path,
+ ) -> Result<Option<FileDecryptionProperties>> {
+ let decryption_properties =
+ FileDecryptionProperties::with_key_retriever(Arc::new(TestKeyRetriever {}))
+ .build()?;
+ Ok(Some(decryption_properties))
+ }
+}
+
+/// Mock implementation of encrypting a key that simply base64 encodes the key.
+/// Note that this is not a secure way to store encryption keys,
+/// and for production use keys should be encrypted with a KMS.
+fn wrap_key(key: &[u8]) -> Vec<u8> {
+ base64::prelude::BASE64_STANDARD
+ .encode(key)
+ .as_bytes()
+ .to_vec()
+}
+
+struct TestKeyRetriever {}
+
+impl KeyRetriever for TestKeyRetriever {
+ /// Get a data encryption key using the metadata stored in the Parquet file.
+ fn retrieve_key(
+ &self,
+ key_metadata: &[u8],
+ ) -> datafusion::parquet::errors::Result<Vec<u8>> {
+ let key_metadata = std::str::from_utf8(key_metadata)?;
+ let key = base64::prelude::BASE64_STANDARD
+ .decode(key_metadata)
+ .unwrap();
+ Ok(key)
+ }
+}
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index be2a734d37fc..939d13d9690e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -194,6 +194,7 @@ macro_rules! config_namespace {
}
}
}
+
config_namespace! {
/// Options related to catalog and directory scanning
///
@@ -676,6 +677,31 @@ config_namespace! {
/// Optional file encryption properties
pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None
+
+ /// Identifier for the encryption factory used to create file encryption and decryption properties.
+ /// Encryption factories can be registered in the runtime environment with
+ /// `RuntimeEnv::register_parquet_encryption_factory`.
+ pub factory_id: Option<String>, default = None
+
+ /// Any encryption factory specific options
+ pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
+ }
+}
+
+impl ParquetEncryptionOptions {
+ /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
+ pub fn configure_factory(
+ &mut self,
+ factory_id: &str,
+ config: &impl ExtensionOptions,
+ ) {
+ self.factory_id = Some(factory_id.to_owned());
+ self.factory_options.options.clear();
+ for entry in config.entries() {
+ if let Some(value) = entry.value {
+ self.factory_options.options.insert(entry.key, value);
+ }
+ }
}
}
@@ -2382,6 +2408,40 @@ impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties {
}
}
+/// Holds implementation-specific options for an encryption factory
+#[derive(Clone, Debug, Default, PartialEq)]
+pub struct EncryptionFactoryOptions {
+ pub options: HashMap<String, String>,
+}
+
+impl ConfigField for EncryptionFactoryOptions {
+ fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
+ for (option_key, option_value) in &self.options {
+ v.some(
+ &format!("{key}.{option_key}"),
+ option_value,
+ "Encryption factory specific option",
+ );
+ }
+ }
+
+ fn set(&mut self, key: &str, value: &str) -> Result<()> {
+ self.options.insert(key.to_owned(), value.to_owned());
+ Ok(())
+ }
+}
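+
+// When an option is set via a string key such as 'format.crypto.factory_options.key1',
+// the config namespace traversal strips the prefix, so `set` above receives the
+// bare option name (e.g. "key1") as its key.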
+
+impl EncryptionFactoryOptions {
+ /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
+ pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
+ let mut options = T::default();
+ for (key, value) in &self.options {
+ options.set(key, value)?;
+ }
+ Ok(options)
+ }
+}
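+
+// For example, an encryption factory can recover its typed configuration with
+// `let config: EncryptionConfig = options.to_extension_options()?;`, as done in
+// the `parquet_encrypted_with_kms` example.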
+
config_namespace! {
/// Options controlling CSV format
pub struct CsvOptions {
@@ -2821,6 +2881,36 @@ mod tests {
);
}
+ #[cfg(feature = "parquet_encryption")]
+ #[test]
+ fn parquet_encryption_factory_config() {
+ let mut parquet_options = crate::config::TableParquetOptions::default();
+
+ assert_eq!(parquet_options.crypto.factory_id, None);
+ assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);
+
+ let mut input_config = TestExtensionConfig::default();
+ input_config
+ .properties
+ .insert("key1".to_string(), "value 1".to_string());
+ input_config
+ .properties
+ .insert("key2".to_string(), "value 2".to_string());
+
+ parquet_options
+ .crypto
+ .configure_factory("example_factory", &input_config);
+
+ assert_eq!(
+ parquet_options.crypto.factory_id,
+ Some("example_factory".to_string())
+ );
+ let factory_options = &parquet_options.crypto.factory_options.options;
+ assert_eq!(factory_options.len(), 2);
+ assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
+ assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
+ }
+
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_entry() {
diff --git a/datafusion/common/src/encryption.rs b/datafusion/common/src/encryption.rs
index 5d50d4a9efd3..5dd603a08112 100644
--- a/datafusion/common/src/encryption.rs
+++ b/datafusion/common/src/encryption.rs
@@ -28,24 +28,7 @@ pub struct FileDecryptionProperties;
#[cfg(not(feature = "parquet_encryption"))]
pub struct FileEncryptionProperties;
-#[cfg(feature = "parquet")]
-use crate::config::ParquetEncryptionOptions;
pub use crate::config::{ConfigFileDecryptionProperties, ConfigFileEncryptionProperties};
-#[cfg(feature = "parquet")]
-use parquet::file::properties::WriterPropertiesBuilder;
-
-#[cfg(feature = "parquet")]
-pub fn add_crypto_to_writer_properties(
- #[allow(unused)] crypto: &ParquetEncryptionOptions,
- #[allow(unused_mut)] mut builder: WriterPropertiesBuilder,
-) -> WriterPropertiesBuilder {
- #[cfg(feature = "parquet_encryption")]
- if let Some(file_encryption_properties) = &crypto.file_encryption {
- builder = builder
- .with_file_encryption_properties(file_encryption_properties.clone().into());
- }
- builder
-}
#[cfg(feature = "parquet_encryption")]
pub fn map_encryption_to_config_encryption(
@@ -63,14 +46,14 @@ pub fn map_encryption_to_config_encryption(
#[cfg(feature = "parquet_encryption")]
pub fn map_config_decryption_to_decryption(
- decryption: Option<&ConfigFileDecryptionProperties>,
-) -> Option<FileDecryptionProperties> {
- decryption.map(|fd| fd.clone().into())
+ decryption: &ConfigFileDecryptionProperties,
+) -> FileDecryptionProperties {
+ decryption.clone().into()
}
#[cfg(not(feature = "parquet_encryption"))]
pub fn map_config_decryption_to_decryption(
- _decryption: Option<&ConfigFileDecryptionProperties>,
-) -> Option<FileDecryptionProperties> {
- None
+ _decryption: &ConfigFileDecryptionProperties,
+) -> FileDecryptionProperties {
+ FileDecryptionProperties {}
}
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index 91683ccb1b37..185826aef47d 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -27,7 +27,6 @@ use crate::{
use arrow::datatypes::Schema;
// TODO: handle once deprecated
-use crate::encryption::add_crypto_to_writer_properties;
#[allow(deprecated)]
use parquet::{
arrow::ARROW_SCHEMA_META_KEY,
@@ -90,19 +89,19 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
/// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
///
/// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
+ /// Note that any encryption options are ignored, as building the `FileEncryptionProperties`
+ /// might require other inputs besides the [`TableParquetOptions`].
fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
// Table options include kv_metadata and col-specific options
let TableParquetOptions {
global,
column_specific_options,
key_value_metadata,
- crypto,
+ crypto: _,
} = table_parquet_options;
let mut builder = global.into_writer_properties_builder()?;
- builder = add_crypto_to_writer_properties(crypto, builder);
-
// check that the arrow schema is present in the kv_metadata, if configured to do so
if !global.skip_arrow_metadata
&& !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
@@ -617,6 +616,8 @@ mod tests {
crypto: ParquetEncryptionOptions {
file_encryption: fep,
file_decryption: None,
+ factory_id: None,
+ factory_options: Default::default(),
},
}
}
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 3cc177feac0f..b4b88ba5aa79 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -567,6 +567,10 @@ async fn setup_context(
disk_manager: Arc::new(disk_manager),
cache_manager: runtime.cache_manager.clone(),
object_store_registry: runtime.object_store_registry.clone(),
+ #[cfg(feature = "parquet_encryption")]
+ parquet_encryption_factory_registry: runtime
+ .parquet_encryption_factory_registry
+ .clone(),
});
let config = SessionConfig::new()
diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs
index 8e90b9aaa955..a71a4f1ea24f 100644
--- a/datafusion/core/tests/parquet/encryption.rs
+++ b/datafusion/core/tests/parquet/encryption.rs
@@ -15,27 +15,29 @@
// specific language governing permissions and limitations
// under the License.
-//! non trivial integration testing for parquet predicate pushdown
-//!
-//! Testing hints: If you run this test with --nocapture it will tell you where
-//! the generated parquet file went. You can then test it and try out various queries
-//! datafusion-cli like:
-//!
-//! ```sql
-//! create external table data stored as parquet location 'data.parquet';
-//! select * from data limit 10;
-//! ```
+//! Tests for reading and writing Parquet files that use Parquet modular encryption
+use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, SchemaRef};
+use datafusion::dataframe::DataFrameWriteOptions;
+use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
-use std::fs::File;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
-
+use datafusion_common::config::{EncryptionFactoryOptions, TableParquetOptions};
+use datafusion_common::{assert_batches_sorted_eq, DataFusionError};
+use datafusion_datasource_parquet::ParquetFormat;
+use datafusion_execution::parquet_encryption::EncryptionFactory;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::ArrowWriter;
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;
+use parquet::file::column_crypto_metadata::ColumnCryptoMetaData;
use parquet::file::properties::WriterProperties;
+use std::collections::HashMap;
+use std::fs::File;
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::sync::{Arc, Mutex};
use tempfile::TempDir;
async fn read_parquet_test_data<'a, T: Into<String>>(
@@ -74,7 +76,6 @@ pub fn write_batches(
Ok(num_rows)
}
-#[cfg(feature = "parquet_encryption")]
#[tokio::test]
async fn round_trip_encryption() {
let ctx: SessionContext = SessionContext::new();
@@ -128,3 +129,226 @@ async fn round_trip_encryption() {
assert_eq!(num_rows_written, num_rows_read);
}
+
+#[tokio::test]
+async fn round_trip_parquet_with_encryption_factory() {
+ let ctx = SessionContext::new();
+ let encryption_factory = Arc::new(MockEncryptionFactory::default());
+ ctx.runtime_env().register_parquet_encryption_factory(
+ "test_encryption_factory",
+ Arc::clone(&encryption_factory) as Arc<dyn EncryptionFactory>,
+ );
+
+ let tmpdir = TempDir::new().unwrap();
+
+ // Register some simple test data
+ let strings: ArrayRef =
+ Arc::new(StringArray::from(vec!["a", "b", "c", "a", "b", "c"]));
+ let x1: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 11, 100, 101, 111]));
+ let x2: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
+ let batch =
+ RecordBatch::try_from_iter(vec![("string", strings), ("x1", x1), ("x2", x2)])
+ .unwrap();
+ let test_data_schema = batch.schema();
+ ctx.register_batch("test_data", batch).unwrap();
+ let df = ctx.table("test_data").await.unwrap();
+
+ // Write encrypted Parquet, partitioned by string column into separate files
+ let mut parquet_options = TableParquetOptions::new();
+ parquet_options.crypto.factory_id = Some("test_encryption_factory".to_string());
+ parquet_options
+ .crypto
+ .factory_options
+ .options
+ .insert("test_key".to_string(), "test value".to_string());
+
+ let df_write_options =
+ DataFrameWriteOptions::default().with_partition_by(vec!["string".to_string()]);
+ df.write_parquet(
+ tmpdir.path().to_str().unwrap(),
+ df_write_options,
+ Some(parquet_options.clone()),
+ )
+ .await
+ .unwrap();
+
+ // Crypto factory should have generated one key per partition file
+ assert_eq!(encryption_factory.encryption_keys.lock().unwrap().len(), 3);
+
+ verify_table_encrypted(tmpdir.path(), &encryption_factory).unwrap();
+
+ // Registering table without decryption properties should fail
+ let table_path = format!("file://{}/", tmpdir.path().to_str().unwrap());
+ let without_decryption_register = ctx
+ .register_listing_table(
+ "parquet_missing_decryption",
+ &table_path,
+ ListingOptions::new(Arc::new(ParquetFormat::default())),
+ None,
+ None,
+ )
+ .await;
+ assert!(matches!(
+ without_decryption_register.unwrap_err(),
+ DataFusionError::ParquetError(_)
+ ));
+
+ // Registering table succeeds if schema is provided
+ ctx.register_listing_table(
+ "parquet_missing_decryption",
+ &table_path,
+ ListingOptions::new(Arc::new(ParquetFormat::default())),
+ Some(test_data_schema),
+ None,
+ )
+ .await
+ .unwrap();
+
+ // But trying to read from the table should fail
+ let without_decryption_read = ctx
+ .table("parquet_missing_decryption")
+ .await
+ .unwrap()
+ .collect()
+ .await;
+ assert!(matches!(
+ without_decryption_read.unwrap_err(),
+ DataFusionError::ParquetError(_)
+ ));
+
+ // Register table with encryption factory specified
+ let listing_options = ListingOptions::new(Arc::new(
+ ParquetFormat::default().with_options(parquet_options),
+ ))
+ .with_table_partition_cols(vec![("string".to_string(), DataType::Utf8)]);
+ ctx.register_listing_table(
+ "parquet_with_decryption",
+ &table_path,
+ listing_options,
+ None,
+ None,
+ )
+ .await
+ .unwrap();
+
+ // Can read correct data when encryption factory has been specified
+ let table = ctx
+ .table("parquet_with_decryption")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let expected = [
+ "+-----+----+--------+",
+ "| x1 | x2 | string |",
+ "+-----+----+--------+",
+ "| 1 | 1 | a |",
+ "| 100 | 4 | a |",
+ "| 10 | 2 | b |",
+ "| 101 | 5 | b |",
+ "| 11 | 3 | c |",
+ "| 111 | 6 | c |",
+ "+-----+----+--------+",
+ ];
+ assert_batches_sorted_eq!(expected, &table);
+}
+
+fn verify_table_encrypted(
+ table_path: &Path,
+ encryption_factory: &Arc<MockEncryptionFactory>,
+) -> datafusion_common::Result<()> {
+ let mut directories = vec![table_path.to_path_buf()];
+ let mut files_visited = 0;
+ while let Some(directory) = directories.pop() {
+ for entry in std::fs::read_dir(&directory)? {
+ let path = entry?.path();
+ if path.is_dir() {
+ directories.push(path);
+ } else {
+ verify_file_encrypted(&path, encryption_factory)?;
+ files_visited += 1;
+ }
+ }
+ }
+ assert!(files_visited > 0);
+ Ok(())
+}
+
+fn verify_file_encrypted(
+ file_path: &Path,
+ encryption_factory: &Arc<MockEncryptionFactory>,
+) -> datafusion_common::Result<()> {
+ let mut options = EncryptionFactoryOptions::default();
+ options
+ .options
+ .insert("test_key".to_string(), "test value".to_string());
+ let object_path = object_store::path::Path::from(file_path.to_str().unwrap());
+ let decryption_properties = encryption_factory
+ .get_file_decryption_properties(&options, &object_path)?
+ .unwrap();
+
+ let reader_options =
+ ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+ let file = File::open(file_path)?;
+ let reader_metadata = ArrowReaderMetadata::load(&file, reader_options)?;
+ let metadata = reader_metadata.metadata();
+ assert!(metadata.num_row_groups() > 0);
+ for row_group in metadata.row_groups() {
+ assert!(row_group.num_columns() > 0);
+ for col in row_group.columns() {
+ assert!(matches!(
+ col.crypto_metadata(),
+ Some(ColumnCryptoMetaData::EncryptionWithFooterKey)
+ ));
+ }
+ }
+ Ok(())
+}
+
+/// Encryption factory implementation for use in tests,
+/// which generates encryption keys in a sequence
+#[derive(Debug, Default)]
+struct MockEncryptionFactory {
+ pub encryption_keys: Mutex<HashMap<object_store::path::Path, Vec<u8>>>,
+ pub counter: AtomicU8,
+}
+
+impl EncryptionFactory for MockEncryptionFactory {
+ fn get_file_encryption_properties(
+ &self,
+ config: &EncryptionFactoryOptions,
+ _schema: &SchemaRef,
+ file_path: &object_store::path::Path,
+ ) -> datafusion_common::Result<Option<FileEncryptionProperties>> {
+ assert_eq!(
+ config.options.get("test_key"),
+ Some(&"test value".to_string())
+ );
+ let file_idx = self.counter.fetch_add(1, Ordering::Relaxed);
+ let key = vec![file_idx, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
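+ // Each file gets a distinct 16-byte key derived from the counter, so the
+ // decryption side must look keys up by file path.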
+ let mut keys = self.encryption_keys.lock().unwrap();
+ keys.insert(file_path.clone(), key.clone());
+ let encryption_properties = FileEncryptionProperties::builder(key).build()?;
+ Ok(Some(encryption_properties))
+ }
+
+ fn get_file_decryption_properties(
+ &self,
+ config: &EncryptionFactoryOptions,
+ file_path: &object_store::path::Path,
+ ) -> datafusion_common::Result<Option<FileDecryptionProperties>> {
+ assert_eq!(
+ config.options.get("test_key"),
+ Some(&"test value".to_string())
+ );
+ let keys = self.encryption_keys.lock().unwrap();
+ let key = keys.get(file_path).ok_or_else(|| {
+ DataFusionError::Execution(format!("No key for file {file_path:?}"))
+ })?;
+ let decryption_properties =
+ FileDecryptionProperties::builder(key.clone()).build()?;
+ Ok(Some(decryption_properties))
+ }
+}
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 82444e8b6ed5..c44d14abd381 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -43,6 +43,7 @@ use std::sync::Arc;
use tempfile::NamedTempFile;
mod custom_reader;
+#[cfg(feature = "parquet_encryption")]
mod encryption;
mod external_access_plan;
mod file_statistics;
diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml
index 8a75a445c8ff..6bccd76b60fc 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -71,5 +71,6 @@ path = "src/mod.rs"
parquet_encryption = [
"parquet/encryption",
"datafusion-common/parquet_encryption",
+ "datafusion-execution/parquet_encryption",
"dep:hex",
]
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 8276a3a8ff3c..e7c449468543 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -39,9 +39,9 @@ use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use arrow::compute::sum;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
-use datafusion_common::encryption::{
- map_config_decryption_to_decryption, FileDecryptionProperties,
-};
+#[cfg(feature = "parquet_encryption")]
+use datafusion_common::encryption::map_config_decryption_to_decryption;
+use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
@@ -68,6 +68,7 @@ use crate::source::{parse_coerce_int96_string, ParquetSource};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
+use datafusion_execution::runtime_env::RuntimeEnv;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use log::debug;
@@ -305,24 +306,60 @@ fn clear_metadata(
}
async fn fetch_schema_with_location(
+ state: &dyn Session,
store: &dyn ObjectStore,
+ options: &TableParquetOptions,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
- file_decryption_properties: Option<&FileDecryptionProperties>,
coerce_int96: Option<TimeUnit>,
) -> Result<(Path, Schema)> {
+ let file_decryption_properties =
+ get_file_decryption_properties(state, options, &file.location)?;
let loc_path = file.location.clone();
let schema = fetch_schema(
store,
file,
metadata_size_hint,
- file_decryption_properties,
+ file_decryption_properties.as_ref(),
coerce_int96,
)
.await?;
Ok((loc_path, schema))
}
+#[cfg(feature = "parquet_encryption")]
+fn get_file_decryption_properties(
+ state: &dyn Session,
+ options: &TableParquetOptions,
+ file_path: &Path,
+) -> Result<Option<FileDecryptionProperties>> {
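+ // Decryption properties configured directly take precedence over an
+ // encryption factory.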
+ let file_decryption_properties: Option<FileDecryptionProperties> =
+ match &options.crypto.file_decryption {
+ Some(cfd) => Some(map_config_decryption_to_decryption(cfd)),
+ None => match &options.crypto.factory_id {
+ Some(factory_id) => {
+ let factory =
+ state.runtime_env().parquet_encryption_factory(factory_id)?;
+ factory.get_file_decryption_properties(
+ &options.crypto.factory_options,
+ file_path,
+ )?
+ }
+ None => None,
+ },
+ };
+ Ok(file_decryption_properties)
+}
+
+#[cfg(not(feature = "parquet_encryption"))]
+fn get_file_decryption_properties(
+ _state: &dyn Session,
+ _options: &TableParquetOptions,
+ _file_path: &Path,
+) -> Result<Option<FileDecryptionProperties>> {
+ Ok(None)
+}
+
#[async_trait]
impl FileFormat for ParquetFormat {
fn as_any(&self) -> &dyn Any {
@@ -358,18 +395,15 @@ impl FileFormat for ParquetFormat {
Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
None => None,
};
- let file_decryption_properties: Option<FileDecryptionProperties> =
- map_config_decryption_to_decryption(
- self.options.crypto.file_decryption.as_ref(),
- );
let mut schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| {
fetch_schema_with_location(
+ state,
store.as_ref(),
+ &self.options,
object,
self.metadata_size_hint(),
- file_decryption_properties.as_ref(),
coerce_int96,
)
})
@@ -414,15 +448,13 @@ impl FileFormat for ParquetFormat {
async fn infer_stats(
&self,
- _state: &dyn Session,
+ state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result {
- let file_decryption_properties: Option<FileDecryptionProperties> =
- map_config_decryption_to_decryption(
- self.options.crypto.file_decryption.as_ref(),
- );
+ let file_decryption_properties =
+ get_file_decryption_properties(state, &self.options, &object.location)?;
let stats = fetch_statistics(
store.as_ref(),
table_schema,
@@ -459,6 +491,9 @@ impl FileFormat for ParquetFormat {
if let Some(metadata_size_hint) = metadata_size_hint {
source = source.with_metadata_size_hint(metadata_size_hint)
}
+
+ source = self.set_source_encryption_factory(source, state)?;
+
// Apply schema adapter factory before building the new config
let file_source = source.apply_schema_adapter(&conf)?;
@@ -489,6 +524,41 @@ impl FileFormat for ParquetFormat {
}
}
+#[cfg(feature = "parquet_encryption")]
+impl ParquetFormat {
+ fn set_source_encryption_factory(
+ &self,
+ source: ParquetSource,
+ state: &dyn Session,
+ ) -> Result {
+ if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
+ Ok(source.with_encryption_factory(
+ state
+ .runtime_env()
+ .parquet_encryption_factory(encryption_factory_id)?,
+ ))
+ } else {
+ Ok(source)
+ }
+ }
+}
+
+#[cfg(not(feature = "parquet_encryption"))]
+impl ParquetFormat {
+ fn set_source_encryption_factory(
+ &self,
+ source: ParquetSource,
+ _state: &dyn Session,
+ ) -> Result {
+ if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
+ Err(DataFusionError::Configuration(
+ format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled")))
+ } else {
+ Ok(source)
+ }
+ }
+}
+
/// Apply necessary schema type coercions to make file schema match table schema.
///
/// This function performs two main types of transformations in a single pass:
@@ -1243,7 +1313,11 @@ impl ParquetSink {
/// Create writer properties based upon configuration settings,
/// including partitioning and the inclusion of arrow schema metadata.
- fn create_writer_props(&self) -> Result<WriterProperties> {
+ fn create_writer_props(
+ &self,
+ runtime: &Arc<RuntimeEnv>,
+ path: &Path,
+ ) -> Result<WriterProperties> {
let schema = if self.parquet_options.global.allow_single_file_parallelism {
// If parallelizing writes, we may be also be doing hive style partitioning
// into multiple files which impacts the schema per file.
@@ -1260,7 +1334,15 @@ impl ParquetSink {
parquet_opts.arrow_schema(schema);
}
- Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build())
+ let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
+ builder = set_writer_encryption_properties(
+ builder,
+ runtime,
+ &parquet_opts,
+ schema,
+ path,
+ )?;
+ Ok(builder.build())
}
/// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore
@@ -1299,6 +1381,48 @@ impl ParquetSink {
}
}
+#[cfg(feature = "parquet_encryption")]
+fn set_writer_encryption_properties(
+ builder: WriterPropertiesBuilder,
+ runtime: &Arc<RuntimeEnv>,
+ parquet_opts: &TableParquetOptions,
+ schema: &Arc<Schema>,
+ path: &Path,
+) -> Result<WriterPropertiesBuilder> {
+ if let Some(file_encryption_properties) = &parquet_opts.crypto.file_encryption {
+ // Encryption properties have been specified directly
+ return Ok(builder
+ .with_file_encryption_properties(file_encryption_properties.clone().into()));
+ } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() {
+ // Encryption properties will be generated by an encryption factory
+ let encryption_factory =
+ runtime.parquet_encryption_factory(encryption_factory_id)?;
+ let file_encryption_properties = encryption_factory
+ .get_file_encryption_properties(
+ &parquet_opts.crypto.factory_options,
+ schema,
+ path,
+ )?;
+ if let Some(file_encryption_properties) = file_encryption_properties {
+ return Ok(
+ builder.with_file_encryption_properties(file_encryption_properties)
+ );
+ }
+ }
+ Ok(builder)
+}
+
+#[cfg(not(feature = "parquet_encryption"))]
+fn set_writer_encryption_properties(
+ builder: WriterPropertiesBuilder,
+ _runtime: &Arc<RuntimeEnv>,
+ _parquet_opts: &TableParquetOptions,
+ _schema: &Arc<Schema>,
+ _path: &Path,
+) -> Result<WriterPropertiesBuilder> {
+ Ok(builder)
+}
+
#[async_trait]
impl FileSink for ParquetSink {
fn config(&self) -> &FileSinkConfig {
@@ -1316,7 +1440,9 @@ impl FileSink for ParquetSink {
let mut allow_single_file_parallelism =
parquet_opts.global.allow_single_file_parallelism;
- if parquet_opts.crypto.file_encryption.is_some() {
+ if parquet_opts.crypto.file_encryption.is_some()
+ || parquet_opts.crypto.factory_id.is_some()
+ {
// For now, arrow-rs does not support parallel writes with encryption
// See https://github.com/apache/arrow-rs/issues/7359
allow_single_file_parallelism = false;
@@ -1326,7 +1452,7 @@ impl FileSink for ParquetSink {
std::result::Result<(Path, FileMetaData), DataFusionError>,
> = JoinSet::new();
- let parquet_props = self.create_writer_props()?;
+ let runtime = context.runtime_env();
let parallel_options = ParallelParquetWriterOptions {
max_parallel_row_groups: parquet_opts
.global
@@ -1337,6 +1463,7 @@ impl FileSink for ParquetSink {
};
while let Some((path, mut rx)) = file_stream_rx.recv().await {
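+ // Writer properties are now created per file, since an encryption factory
+ // may generate different encryption keys for each file path.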
+ let parquet_props = self.create_writer_props(&runtime, &path)?;
if !allow_single_file_parallelism {
let mut writer = self
.create_async_arrow_writer(
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 7c208d1426ac..62dc0fccc21a 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -43,6 +43,10 @@ use datafusion_physical_expr_common::physical_expr::{
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};
+#[cfg(feature = "parquet_encryption")]
+use datafusion_common::config::EncryptionFactoryOptions;
+#[cfg(feature = "parquet_encryption")]
+use datafusion_execution::parquet_encryption::EncryptionFactory;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
@@ -96,13 +100,18 @@ pub(super) struct ParquetOpener {
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
/// Rewrite expressions in the context of the file schema
pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
+ /// Optional factory to create file decryption properties dynamically
+ #[cfg(feature = "parquet_encryption")]
+ pub encryption_factory:
+ Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
}
impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let extensions = file_meta.extensions.clone();
- let file_name = file_meta.location().to_string();
+ let file_location = file_meta.location();
+ let file_name = file_location.to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
@@ -141,7 +150,8 @@ impl FileOpener for ParquetOpener {
let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);
let mut enable_page_index = self.enable_page_index;
- let file_decryption_properties = self.file_decryption_properties.clone();
+ let file_decryption_properties =
+ self.get_file_decryption_properties(file_location)?;
// For now, page index does not work with encrypted files. See:
// https://github.com/apache/arrow-rs/issues/7629
@@ -411,6 +421,36 @@ impl FileOpener for ParquetOpener {
}
}
+#[cfg(feature = "parquet_encryption")]
+impl ParquetOpener {
+ fn get_file_decryption_properties(
+ &self,
+ file_location: &object_store::path::Path,
+ ) -> Result<Option<Arc<FileDecryptionProperties>>> {
+ match &self.file_decryption_properties {
+ Some(file_decryption_properties) => {
+ Ok(Some(Arc::clone(file_decryption_properties)))
+ }
+ None => match &self.encryption_factory {
+ Some((encryption_factory, encryption_config)) => Ok(encryption_factory
+ .get_file_decryption_properties(encryption_config, file_location)?
+ .map(Arc::new)),
+ None => Ok(None),
+ },
+ }
+ }
+}
+
+#[cfg(not(feature = "parquet_encryption"))]
+impl ParquetOpener {
+ fn get_file_decryption_properties(
+ &self,
+ _file_location: &object_store::path::Path,
+ ) -> Result<Option<Arc<FileDecryptionProperties>>> {
+ Ok(self.file_decryption_properties.clone())
+ }
+}
+
/// Return the initial [`ParquetAccessPlan`]
///
/// If the user has supplied one as an extension, use that
@@ -672,6 +712,8 @@ mod test {
coerce_int96: None,
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
}
};
@@ -758,6 +800,8 @@ mod test {
coerce_int96: None,
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
}
};
@@ -860,6 +904,8 @@ mod test {
coerce_int96: None,
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
}
};
let make_meta = || FileMeta {
@@ -972,6 +1018,8 @@ mod test {
coerce_int96: None,
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
}
};
@@ -1085,6 +1133,8 @@ mod test {
coerce_int96: None,
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
}
};
@@ -1265,6 +1315,8 @@ mod test {
coerce_int96: None,
file_decryption_properties: None,
expr_adapter_factory: None,
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: None,
};
let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 430cb5ce54af..366d42700fcf 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -27,6 +27,8 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use datafusion_common::config::ConfigOptions;
+#[cfg(feature = "parquet_encryption")]
+use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::schema_adapter::{
@@ -51,6 +53,8 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
use datafusion_common::encryption::map_config_decryption_to_decryption;
+#[cfg(feature = "parquet_encryption")]
+use datafusion_execution::parquet_encryption::EncryptionFactory;
use itertools::Itertools;
use object_store::ObjectStore;
@@ -282,6 +286,8 @@ pub struct ParquetSource {
/// Optional hint for the size of the parquet metadata
pub(crate) metadata_size_hint: Option,
pub(crate) projected_statistics: Option,
+ #[cfg(feature = "parquet_encryption")]
+ pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
}
impl ParquetSource {
@@ -320,6 +326,16 @@ impl ParquetSource {
conf
}
+ /// Set the encryption factory used to generate file decryption properties
+ #[cfg(feature = "parquet_encryption")]
+ pub fn with_encryption_factory(
+ mut self,
+ encryption_factory: Arc<dyn EncryptionFactory>,
+ ) -> Self {
+ self.encryption_factory = Some(encryption_factory);
+ self
+ }
+
/// Options passed to the parquet reader for this scan
pub fn table_parquet_options(&self) -> &TableParquetOptions {
&self.table_parquet_options
@@ -431,6 +447,19 @@ impl ParquetSource {
Ok(file_source)
}
}
+
+ #[cfg(feature = "parquet_encryption")]
+ fn get_encryption_factory_with_config(
+ &self,
+ ) -> Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)> {
+ match &self.encryption_factory {
+ None => None,
+ Some(factory) => Some((
+ Arc::clone(factory),
+ self.table_parquet_options.crypto.factory_options.clone(),
+ )),
+ }
+ }
}
/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit
@@ -512,10 +541,13 @@ impl FileSource for ParquetSource {
Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
});
- let file_decryption_properties = map_config_decryption_to_decryption(
- self.table_parquet_options().crypto.file_decryption.as_ref(),
- )
- .map(Arc::new);
+ let file_decryption_properties = self
+ .table_parquet_options()
+ .crypto
+ .file_decryption
+ .as_ref()
+ .map(map_config_decryption_to_decryption)
+ .map(Arc::new);
let coerce_int96 = self
.table_parquet_options
@@ -546,6 +578,8 @@ impl FileSource for ParquetSource {
coerce_int96,
file_decryption_properties,
expr_adapter_factory,
+ #[cfg(feature = "parquet_encryption")]
+ encryption_factory: self.get_encryption_factory_with_config(),
})
}
diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index 5988d3a33660..f6d02615e39a 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -37,6 +37,11 @@ workspace = true
[lib]
name = "datafusion_execution"
+[features]
+parquet_encryption = [
+ "parquet/encryption",
+]
+
[dependencies]
arrow = { workspace = true }
dashmap = { workspace = true }
@@ -46,6 +51,7 @@ futures = { workspace = true }
log = { workspace = true }
object_store = { workspace = true, features = ["fs"] }
parking_lot = { workspace = true }
+parquet = { workspace = true, optional = true }
rand = { workspace = true }
tempfile = { workspace = true }
url = { workspace = true }
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs
index 6a0a4b6322ee..e971e838a6e5 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/lib.rs
@@ -31,6 +31,8 @@ pub mod config;
pub mod disk_manager;
pub mod memory_pool;
pub mod object_store;
+#[cfg(feature = "parquet_encryption")]
+pub mod parquet_encryption;
pub mod runtime_env;
mod stream;
mod task;
diff --git a/datafusion/execution/src/parquet_encryption.rs b/datafusion/execution/src/parquet_encryption.rs
new file mode 100644
index 000000000000..13a18390d02a
--- /dev/null
+++ b/datafusion/execution/src/parquet_encryption.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::SchemaRef;
+use dashmap::DashMap;
+use datafusion_common::config::EncryptionFactoryOptions;
+use datafusion_common::error::Result;
+use datafusion_common::DataFusionError;
+use object_store::path::Path;
+use parquet::encryption::decrypt::FileDecryptionProperties;
+use parquet::encryption::encrypt::FileEncryptionProperties;
+use std::sync::Arc;
+
+/// Trait for types that generate file encryption and decryption properties to
+/// write and read encrypted Parquet files.
+/// This allows flexibility in how encryption keys are managed, for example by
+/// integrating with a user's key-management service (KMS).
+/// For example usage, see the [`parquet_encrypted_with_kms` example].
+///
+/// [`parquet_encrypted_with_kms` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_encrypted_with_kms.rs
+pub trait EncryptionFactory: Send + Sync + std::fmt::Debug + 'static {
+ /// Generate file encryption properties to use when writing a Parquet file.
+ fn get_file_encryption_properties(
+ &self,
+ config: &EncryptionFactoryOptions,
+ schema: &SchemaRef,
+ file_path: &Path,
+ ) -> Result<Option<FileEncryptionProperties>>;
+
+ /// Generate file decryption properties to use when reading a Parquet file.
+ fn get_file_decryption_properties(
+ &self,
+ config: &EncryptionFactoryOptions,
+ file_path: &Path,
+ ) -> Result<Option<FileDecryptionProperties>>;
+}
+
+/// Stores [`EncryptionFactory`] implementations that can be retrieved by a unique string identifier
+#[derive(Clone, Debug, Default)]
+pub struct EncryptionFactoryRegistry {
+ factories: DashMap<String, Arc<dyn EncryptionFactory>>,
+}
+
+impl EncryptionFactoryRegistry {
+ /// Register an [`EncryptionFactory`] with an associated identifier that can later
+ /// be used to configure encryption when reading or writing Parquet.
+ /// If an encryption factory with the same identifier was already registered, it is replaced and returned.
+ pub fn register_factory(
+ &self,
+ id: &str,
+ factory: Arc,
+ ) -> Option<Arc<dyn EncryptionFactory>> {
+ self.factories.insert(id.to_owned(), factory)
+ }
+
+ /// Retrieve an [`EncryptionFactory`] by its identifier
+ pub fn get_factory(&self, id: &str) -> Result<Arc<dyn EncryptionFactory>> {
+ self.factories
+ .get(id)
+ .map(|f| Arc::clone(f.value()))
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "No Parquet encryption factory found for id '{id}'"
+ ))
+ })
+ }
+}
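+
+// Typical usage (sketch): register a factory once on the `RuntimeEnv`, then
+// reference its id via the crypto `factory_id` option when reading or writing
+// Parquet, as shown in the `parquet_encrypted_with_kms` example.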
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index fc26f997a2e0..3d0e2cfaaf7c 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -29,6 +29,8 @@ use crate::{
};
use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
+#[cfg(feature = "parquet_encryption")]
+use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry};
use datafusion_common::{config::ConfigEntry, Result};
use object_store::ObjectStore;
use std::path::PathBuf;
@@ -78,6 +80,9 @@ pub struct RuntimeEnv {
pub cache_manager: Arc,
/// Object Store Registry
pub object_store_registry: Arc,
+ /// Parquet encryption factory registry
+ #[cfg(feature = "parquet_encryption")]
+ pub parquet_encryption_factory_registry: Arc,
}
impl Debug for RuntimeEnv {
@@ -154,6 +159,28 @@ impl RuntimeEnv {
pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
self.object_store_registry.get_store(url.as_ref())
}
+
+ /// Register an [`EncryptionFactory`] with an associated identifier that can later
+ /// be used to configure encryption when reading or writing Parquet.
+ /// If an encryption factory with the same identifier was already registered, it is replaced and returned.
+ #[cfg(feature = "parquet_encryption")]
+ pub fn register_parquet_encryption_factory(
+ &self,
+ id: &str,
+ encryption_factory: Arc,
+ ) -> Option<Arc<dyn EncryptionFactory>> {
+ self.parquet_encryption_factory_registry
+ .register_factory(id, encryption_factory)
+ }
+
+ /// Retrieve an [`EncryptionFactory`] by its identifier
+ #[cfg(feature = "parquet_encryption")]
+ pub fn parquet_encryption_factory(
+ &self,
+ id: &str,
+ ) -> Result<Arc<dyn EncryptionFactory>> {
+ self.parquet_encryption_factory_registry.get_factory(id)
+ }
}
impl Default for RuntimeEnv {
@@ -185,6 +212,9 @@ pub struct RuntimeEnvBuilder {
pub cache_manager: CacheManagerConfig,
/// ObjectStoreRegistry to get object store based on url
pub object_store_registry: Arc,
+ /// Parquet encryption factory registry
+ #[cfg(feature = "parquet_encryption")]
+ pub parquet_encryption_factory_registry: Arc,
}
impl Default for RuntimeEnvBuilder {
@@ -202,6 +232,8 @@ impl RuntimeEnvBuilder {
memory_pool: Default::default(),
cache_manager: Default::default(),
object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
+ #[cfg(feature = "parquet_encryption")]
+ parquet_encryption_factory_registry: Default::default(),
}
}
@@ -282,6 +314,8 @@ impl RuntimeEnvBuilder {
memory_pool,
cache_manager,
object_store_registry,
+ #[cfg(feature = "parquet_encryption")]
+ parquet_encryption_factory_registry,
} = self;
let memory_pool =
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
@@ -296,6 +330,8 @@ impl RuntimeEnvBuilder {
},
cache_manager: CacheManager::try_new(&cache_manager)?,
object_store_registry,
+ #[cfg(feature = "parquet_encryption")]
+ parquet_encryption_factory_registry,
})
}
@@ -326,6 +362,10 @@ impl RuntimeEnvBuilder {
memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
cache_manager: cache_config,
object_store_registry: Arc::clone(&runtime_env.object_store_registry),
+ #[cfg(feature = "parquet_encryption")]
+ parquet_encryption_factory_registry: Arc::clone(
+ &runtime_env.parquet_encryption_factory_registry,
+ ),
}
}