Skip to content

Commit e814b14

Browse files
committed
Make parquet an optional Cargo feature by gating the parquet-specific code with
`#[cfg(feature = "parquet")]` attributes, without significant code changes.
1 parent 3d1b23a commit e814b14

File tree

19 files changed

+323
-143
lines changed

19 files changed

+323
-143
lines changed

datafusion-cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async-trait = "0.1.41"
3434
aws-config = "0.55"
3535
aws-credential-types = "0.55"
3636
clap = { version = "3", features = ["derive", "cargo"] }
37-
datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"] }
37+
datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
3838
dirs = "4.0.0"
3939
env_logger = "0.9"
4040
mimalloc = { version = "0.1", default-features = false }

datafusion/common/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ path = "src/lib.rs"
3535
[features]
3636
avro = ["apache-avro"]
3737
backtrace = []
38-
default = ["parquet"]
39-
pyarrow = ["pyo3", "arrow/pyarrow"]
38+
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]
4039

4140
[dependencies]
4241
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }

datafusion/core/Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@ avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
3939
backtrace = ["datafusion-common/backtrace"]
4040
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"]
4141
crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"]
42-
default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"]
42+
default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"]
4343
encoding_expressions = ["datafusion-physical-expr/encoding_expressions"]
4444
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
4545
force_hash_collisions = []
46-
pyarrow = ["datafusion-common/pyarrow"]
46+
parquet = ["datafusion-common/parquet", "dep:parquet"]
47+
pyarrow = ["datafusion-common/pyarrow", "parquet"]
4748
regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"]
4849
simd = ["arrow/simd"]
4950
unicode_expressions = ["datafusion-physical-expr/unicode_expressions", "datafusion-optimizer/unicode_expressions", "datafusion-sql/unicode_expressions"]
@@ -60,7 +61,7 @@ bytes = "1.4"
6061
bzip2 = { version = "0.4.3", optional = true }
6162
chrono = { workspace = true }
6263
dashmap = "5.4.0"
63-
datafusion-common = { path = "../common", version = "32.0.0", features = ["parquet", "object_store"] }
64+
datafusion-common = { path = "../common", version = "32.0.0", features = ["object_store"], default-features = false }
6465
datafusion-execution = { path = "../execution", version = "32.0.0" }
6566
datafusion-expr = { path = "../expr", version = "32.0.0" }
6667
datafusion-optimizer = { path = "../optimizer", version = "32.0.0", default-features = false }
@@ -79,7 +80,7 @@ num-traits = { version = "0.2", optional = true }
7980
num_cpus = "1.13.0"
8081
object_store = "0.7.0"
8182
parking_lot = "0.12"
82-
parquet = { workspace = true }
83+
parquet = { workspace = true, optional = true }
8384
percent-encoding = "2.2.0"
8485
pin-project-lite = "^0.2.7"
8586
rand = "0.8"
@@ -92,7 +93,6 @@ uuid = { version = "1.0", features = ["v4"] }
9293
xz2 = { version = "0.1", optional = true }
9394
zstd = { version = "0.12", optional = true, default-features = false }
9495

95-
9696
[dev-dependencies]
9797
async-trait = "0.1.53"
9898
bigdecimal = "0.4.1"

datafusion/core/src/dataframe.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use arrow::datatypes::{DataType, Field};
2727
use async_trait::async_trait;
2828
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
2929
use datafusion_common::file_options::json_writer::JsonWriterOptions;
30+
#[cfg(feature = "parquet")]
3031
use datafusion_common::file_options::parquet_writer::{
3132
default_builder, ParquetWriterOptions,
3233
};
@@ -35,6 +36,7 @@ use datafusion_common::{
3536
DataFusionError, FileType, FileTypeWriterOptions, SchemaError, UnnestOptions,
3637
};
3738
use datafusion_expr::dml::CopyOptions;
39+
#[cfg(feature = "parquet")]
3840
use parquet::file::properties::WriterProperties;
3941

4042
use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -1054,6 +1056,7 @@ impl DataFrame {
10541056
}
10551057

10561058
/// Write a `DataFrame` to a Parquet file.
1059+
#[cfg(feature = "parquet")]
10571060
pub async fn write_parquet(
10581061
self,
10591062
path: &str,
@@ -1320,7 +1323,9 @@ mod tests {
13201323
};
13211324
use datafusion_physical_expr::expressions::Column;
13221325
use object_store::local::LocalFileSystem;
1326+
#[cfg(feature = "parquet")]
13231327
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
1328+
#[cfg(feature = "parquet")]
13241329
use parquet::file::reader::FileReader;
13251330
use tempfile::TempDir;
13261331
use url::Url;
@@ -2368,6 +2373,7 @@ mod tests {
23682373
Ok(())
23692374
}
23702375

2376+
#[cfg(feature = "parquet")]
23712377
#[tokio::test]
23722378
async fn write_parquet_with_compression() -> Result<()> {
23732379
let test_df = test_table().await?;

datafusion/core/src/datasource/file_format/file_compression_type.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,14 @@ impl FileTypeExt for FileType {
237237

238238
match self {
239239
FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
240-
FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
240+
FileType::AVRO | FileType::ARROW => match c.variant {
241+
UNCOMPRESSED => Ok(ext),
242+
_ => Err(DataFusionError::Internal(
243+
"FileCompressionType can be specified for CSV/JSON FileType.".into(),
244+
)),
245+
},
246+
#[cfg(feature = "parquet")]
247+
FileType::PARQUET => match c.variant {
241248
UNCOMPRESSED => Ok(ext),
242249
_ => Err(DataFusionError::Internal(
243250
"FileCompressionType can be specified for CSV/JSON FileType.".into(),
@@ -276,10 +283,13 @@ mod tests {
276283
);
277284
}
278285

286+
let mut ty_ext_tuple = vec![];
287+
ty_ext_tuple.push((FileType::AVRO, ".avro"));
288+
#[cfg(feature = "parquet")]
289+
ty_ext_tuple.push((FileType::PARQUET, ".parquet"));
290+
279291
// Cannot specify compression for these file types
280-
for (file_type, extension) in
281-
[(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")]
282-
{
292+
for (file_type, extension) in ty_ext_tuple {
283293
assert_eq!(
284294
file_type
285295
.get_ext_with_compression(FileCompressionType::UNCOMPRESSED)

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub mod csv;
2727
pub mod file_compression_type;
2828
pub mod json;
2929
pub mod options;
30+
#[cfg(feature = "parquet")]
3031
pub mod parquet;
3132
pub mod write;
3233

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ use datafusion_common::{plan_err, DataFusionError};
2525

2626
use crate::datasource::file_format::arrow::ArrowFormat;
2727
use crate::datasource::file_format::file_compression_type::FileCompressionType;
28+
#[cfg(feature = "parquet")]
29+
use crate::datasource::file_format::parquet::ParquetFormat;
2830
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
2931
use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
3032
use crate::datasource::{
31-
file_format::{
32-
avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
33-
},
33+
file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
3434
listing::ListingOptions,
3535
};
3636
use crate::error::Result;
@@ -542,6 +542,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
542542
}
543543
}
544544

545+
#[cfg(feature = "parquet")]
545546
#[async_trait]
546547
impl ReadOptions<'_> for ParquetReadOptions<'_> {
547548
fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions {

datafusion/core/src/datasource/listing/table.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ use super::PartitionedFile;
2626
use crate::datasource::file_format::file_compression_type::{
2727
FileCompressionType, FileTypeExt,
2828
};
29+
#[cfg(feature = "parquet")]
30+
use crate::datasource::file_format::parquet::ParquetFormat;
2931
use crate::datasource::physical_plan::{
3032
is_plan_streaming, FileScanConfig, FileSinkConfig,
3133
};
3234
use crate::datasource::{
3335
file_format::{
3436
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
35-
parquet::ParquetFormat, FileFormat,
37+
FileFormat,
3638
},
3739
get_statistics_with_limit,
3840
listing::ListingTableUrl,
@@ -147,6 +149,7 @@ impl ListingTableConfig {
147149
FileType::JSON => Arc::new(
148150
JsonFormat::default().with_file_compression_type(file_compression_type),
149151
),
152+
#[cfg(feature = "parquet")]
150153
FileType::PARQUET => Arc::new(ParquetFormat::default()),
151154
};
152155

@@ -1004,15 +1007,15 @@ mod tests {
10041007
use std::fs::File;
10051008

10061009
use super::*;
1010+
#[cfg(feature = "parquet")]
1011+
use crate::datasource::file_format::parquet::ParquetFormat;
10071012
use crate::datasource::{provider_as_source, MemTable};
10081013
use crate::execution::options::ArrowReadOptions;
10091014
use crate::physical_plan::collect;
10101015
use crate::prelude::*;
10111016
use crate::{
10121017
assert_batches_eq,
1013-
datasource::file_format::{
1014-
avro::AvroFormat, file_compression_type::FileTypeExt, parquet::ParquetFormat,
1015-
},
1018+
datasource::file_format::{avro::AvroFormat, file_compression_type::FileTypeExt},
10161019
execution::options::ReadOptions,
10171020
logical_expr::{col, lit},
10181021
test::{columns, object_store::register_test_store},
@@ -1075,6 +1078,7 @@ mod tests {
10751078
Ok(())
10761079
}
10771080

1081+
#[cfg(feature = "parquet")]
10781082
#[tokio::test]
10791083
async fn load_table_stats_by_default() -> Result<()> {
10801084
let testdata = crate::test_util::parquet_test_data();
@@ -1098,6 +1102,7 @@ mod tests {
10981102
Ok(())
10991103
}
11001104

1105+
#[cfg(feature = "parquet")]
11011106
#[tokio::test]
11021107
async fn load_table_stats_when_no_stats() -> Result<()> {
11031108
let testdata = crate::test_util::parquet_test_data();
@@ -1122,6 +1127,7 @@ mod tests {
11221127
Ok(())
11231128
}
11241129

1130+
#[cfg(feature = "parquet")]
11251131
#[tokio::test]
11261132
async fn test_try_create_output_ordering() {
11271133
let testdata = crate::test_util::parquet_test_data();

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::datasource::file_format::avro::AvroFormat;
3232
use crate::datasource::file_format::csv::CsvFormat;
3333
use crate::datasource::file_format::file_compression_type::FileCompressionType;
3434
use crate::datasource::file_format::json::JsonFormat;
35+
#[cfg(feature = "parquet")]
3536
use crate::datasource::file_format::parquet::ParquetFormat;
3637
use crate::datasource::file_format::FileFormat;
3738
use crate::datasource::listing::{
@@ -81,6 +82,7 @@ impl TableProviderFactory for ListingTableFactory {
8182
.with_delimiter(cmd.delimiter as u8)
8283
.with_file_compression_type(file_compression_type),
8384
),
85+
#[cfg(feature = "parquet")]
8486
FileType::PARQUET => Arc::new(ParquetFormat::default()),
8587
FileType::AVRO => Arc::new(AvroFormat),
8688
FileType::JSON => Arc::new(
@@ -159,6 +161,7 @@ impl TableProviderFactory for ListingTableFactory {
159161
Some(mode) => ListingTableInsertMode::from_str(mode.as_str()),
160162
None => match file_type {
161163
FileType::CSV => Ok(ListingTableInsertMode::AppendToFile),
164+
#[cfg(feature = "parquet")]
162165
FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles),
163166
FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles),
164167
FileType::JSON => Ok(ListingTableInsertMode::AppendToFile),
@@ -199,6 +202,7 @@ impl TableProviderFactory for ListingTableFactory {
199202
json_writer_options.compression = cmd.file_compression_type;
200203
FileTypeWriterOptions::JSON(json_writer_options)
201204
}
205+
#[cfg(feature = "parquet")]
202206
FileType::PARQUET => file_type_writer_options,
203207
FileType::ARROW => file_type_writer_options,
204208
FileType::AVRO => file_type_writer_options,

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,5 @@ pub use self::provider::TableProvider;
4343
pub use self::view::ViewTable;
4444
pub use crate::logical_expr::TableType;
4545
pub use statistics::get_statistics_with_limit;
46+
#[cfg(feature = "parquet")]
4647
pub(crate) use statistics::{create_max_min_accs, get_col_stats};

0 commit comments

Comments
 (0)