344 changes: 102 additions & 242 deletions Cargo.lock

Large diffs are not rendered by default.

37 changes: 33 additions & 4 deletions Cargo.toml
@@ -153,6 +153,7 @@ half = { version = "2.6.0", default-features = false }
hashbrown = { version = "0.14.5", features = ["raw"] }
hex = { version = "0.4.3" }
indexmap = "2.11.4"
insta = { version = "1.43.2", features = ["glob", "filters"] }
itertools = "0.14"
log = "^0.4"
object_store = { version = "0.12.4", default-features = false }
@@ -162,11 +163,10 @@ parquet = { version = "56.2.0", default-features = false, features = [
"async",
"object_store",
] }
pbjson = { version = "0.7.0" }
pbjson-types = "0.7"
pbjson = { version = "0.8.0" }
pbjson-types = "0.8"
# Should match arrow-flight's version of prost.
insta = { version = "1.43.2", features = ["glob", "filters"] }
prost = "0.13.1"
prost = "0.14.1"
rand = "0.9"
recursive = "0.1.1"
regex = "1.11"
@@ -258,3 +258,32 @@ incremental = false
inherits = "release"
debug = true
strip = false

[patch.crates-io]
# Local pin
# arrow = { path = "/Users/andrewlamb/Software/arrow-rs/arrow" }
# arrow-array = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-array" }
# arrow-buffer = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-buffer" }
# arrow-cast = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-cast" }
# arrow-data = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-data" }
# arrow-ipc = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-ipc" }
# arrow-schema = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-schema" }
# arrow-select = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-select" }
# arrow-string = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-string" }
# arrow-ord = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-ord" }
# arrow-flight = { path = "/Users/andrewlamb/Software/arrow-rs/arrow-flight" }
# parquet = { path = "/Users/andrewlamb/Software/arrow-rs/parquet" }

# latest arrow main: https://github.com/apache/arrow-rs
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-cast = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-data = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-ipc = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-select = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-string = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-ord = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "b51a0003ad13bc91245e350a9e8d9beaa3c8b35f" }
12 changes: 6 additions & 6 deletions datafusion-cli/src/main.rs
@@ -585,9 +585,9 @@ mod tests {
+-----------------------------------+-----------------+---------------------+------+------------------+
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
+-----------------------------------+-----------------+---------------------+------+------------------+
| alltypes_plain.parquet | 1851 | 10181 | 2 | page_index=false |
| alltypes_tiny_pages.parquet | 454233 | 881418 | 2 | page_index=true |
| lz4_raw_compressed_larger.parquet | 380836 | 2939 | 2 | page_index=false |
| alltypes_plain.parquet | 1851 | 7166 | 2 | page_index=false |
Contributor Author:

I am not sure why the heap size of the metadata is reported to be so much smaller. I don't expect our thrift decoding work to have reduced the in-memory size of the parquet metadata 🤔

@etseidl any ideas? I can perhaps go audit the heap_size implementations.

Contributor:

I did box some of the encryption structures... but maybe the HeapSize impl for Box is still wrong?

Contributor:

Oh, and FileDecryptor in ParquetMetaData was boxed but still not included in memory_size.
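As a follow-up illustration (not the parquet crate's actual code): the sketch below uses a hypothetical `HeapSize` trait and `Metadata` struct to show how a `Box` impl that forgets the pointee's inline size, or a `memory_size` that skips a boxed field such as the decryptor, makes the reported total drop even though the allocations are still there.

```rust
use std::mem::size_of;

/// Simplified stand-in for parquet's internal heap-size accounting.
trait HeapSize {
    /// Bytes owned on the heap, excluding the inline size of `self`.
    fn heap_size(&self) -> usize;
}

impl HeapSize for String {
    fn heap_size(&self) -> usize {
        self.capacity()
    }
}

impl<T: HeapSize> HeapSize for Box<T> {
    fn heap_size(&self) -> usize {
        // The pointee itself lives on the heap, so count its inline size too.
        // Returning only `(**self).heap_size()` would under-report by
        // `size_of::<T>()` for every boxed value.
        size_of::<T>() + (**self).heap_size()
    }
}

/// Hypothetical metadata struct with an optional boxed member
/// (think: a file decryptor).
struct Metadata {
    created_by: String,
    decryptor: Option<Box<String>>,
}

impl Metadata {
    fn memory_size(&self) -> usize {
        size_of::<Self>()
            + self.created_by.heap_size()
            // Dropping this line is the kind of bug discussed above: the boxed
            // field still allocates, but no longer shows up in the total.
            + self.decryptor.as_ref().map_or(0, |d| d.heap_size())
    }
}

fn main() {
    let md = Metadata {
        created_by: "datafusion".to_owned(),
        decryptor: Some(Box::new("key material".to_owned())),
    };
    println!("reported size: {} bytes", md.memory_size());
}
```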

| alltypes_tiny_pages.parquet | 454233 | 267261 | 2 | page_index=true |
| lz4_raw_compressed_larger.parquet | 380836 | 1014 | 2 | page_index=false |
+-----------------------------------+-----------------+---------------------+------+------------------+
");

@@ -616,9 +616,9 @@ mod tests {
+-----------------------------------+-----------------+---------------------+------+------------------+
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
+-----------------------------------+-----------------+---------------------+------+------------------+
| alltypes_plain.parquet | 1851 | 10181 | 5 | page_index=false |
| alltypes_tiny_pages.parquet | 454233 | 881418 | 2 | page_index=true |
| lz4_raw_compressed_larger.parquet | 380836 | 2939 | 3 | page_index=false |
| alltypes_plain.parquet | 1851 | 7166 | 5 | page_index=false |
| alltypes_tiny_pages.parquet | 454233 | 267261 | 2 | page_index=true |
| lz4_raw_compressed_larger.parquet | 380836 | 1014 | 3 | page_index=false |
+-----------------------------------+-----------------+---------------------+------+------------------+
");

2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -81,7 +81,7 @@ serde_json = { workspace = true }
tempfile = { workspace = true }
test-utils = { path = "../test-utils" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.13.1"
tonic = "0.14"
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3" }
url = { workspace = true }
5 changes: 4 additions & 1 deletion datafusion-examples/examples/flight/flight_client.rs
@@ -17,6 +17,7 @@

use std::collections::HashMap;
use std::sync::Arc;
use tonic::transport::Endpoint;

use datafusion::arrow::datatypes::Schema;

@@ -34,7 +35,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let testdata = datafusion::test_util::parquet_test_data();

// Create Flight client
let mut client = FlightServiceClient::connect("http://localhost:50051").await?;
let endpoint = Endpoint::new("http://localhost:50051")?;
Contributor Author:

This is due to the new version of tonic.
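A compact before/after sketch of the same change, to make the tonic upgrade concrete (the URL is the example's placeholder, and the commented-out line is the code removed above):

```rust
use arrow_flight::flight_service_client::FlightServiceClient;
use tonic::transport::{Channel, Endpoint};

async fn make_client() -> Result<FlightServiceClient<Channel>, Box<dyn std::error::Error>> {
    // tonic 0.13: the generated client could connect straight from a URL:
    //   let client = FlightServiceClient::connect("http://localhost:50051").await?;
    // tonic 0.14 (as in this PR): build the endpoint and channel explicitly,
    // then wrap the channel in the generated client.
    let endpoint = Endpoint::new("http://localhost:50051")?;
    let channel = endpoint.connect().await?;
    Ok(FlightServiceClient::new(channel))
}
```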

let channel = endpoint.connect().await?;
let mut client = FlightServiceClient::new(channel);

// Call get_schema to get the schema of a Parquet file
let request = tonic::Request::new(FlightDescriptor {
5 changes: 3 additions & 2 deletions datafusion-examples/examples/flight/flight_server.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
use arrow::ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator};
use std::sync::Arc;

use arrow_flight::{PollInfo, SchemaAsIpc};
@@ -106,6 +106,7 @@ impl FlightService for FlightServiceImpl {

// add an initial FlightData message that sends schema
let options = arrow::ipc::writer::IpcWriteOptions::default();
let mut compression_context = CompressionContext::default();
let schema_flight_data = SchemaAsIpc::new(&schema, &options);

let mut flights = vec![FlightData::from(schema_flight_data)];
@@ -115,7 +116,7 @@ impl FlightService for FlightServiceImpl {

for batch in &results {
let (flight_dictionaries, flight_batch) = encoder
.encoded_batch(batch, &mut tracker, &options)
.encode(batch, &mut tracker, &options, &mut compression_context)
.map_err(|e: ArrowError| Status::internal(e.to_string()))?;

flights.extend(flight_dictionaries.into_iter().map(Into::into));
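As an editorial aside, here is a minimal self-contained sketch of the renamed encoding call, assuming the signature used in this diff (`encoded_batch` becomes `encode` and threads a reusable `CompressionContext` through each call); the schema and batch are illustrative.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ipc::writer::{
    CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
};
use arrow::record_batch::RecordBatch;

fn encode_one_batch() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    let generator = IpcDataGenerator::default();
    let mut tracker = DictionaryTracker::new(false);
    let options = IpcWriteOptions::default();
    // New in this arrow-rs revision: compression state lives in an explicit,
    // reusable context that is passed to each encode call.
    let mut compression_context = CompressionContext::default();

    let (dictionaries, encoded_batch) =
        generator.encode(&batch, &mut tracker, &options, &mut compression_context)?;

    // Both parts can then be converted into FlightData messages,
    // as the flight_server example above does.
    let _ = (dictionaries, encoded_batch);
    Ok(())
}
```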
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -71,7 +71,7 @@ log = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.25", optional = true }
pyo3 = { version = "0.26", optional = true }
recursive = { workspace = true, optional = true }
sqlparser = { workspace = true, optional = true }
tokio = { workspace = true }
2 changes: 1 addition & 1 deletion datafusion/common/src/dfschema.rs
@@ -1417,7 +1417,7 @@ mod tests {
fn from_qualified_schema_into_arrow_schema() -> Result<()> {
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let arrow_schema = schema.as_arrow();
insta::assert_snapshot!(arrow_schema, @r#"Field { name: "c0", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#);
insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
Contributor Author:

Many, many diffs are due to the changes in the formatting of Fields and DataTypes (see below).

Ok(())
}

26 changes: 12 additions & 14 deletions datafusion/common/src/pyarrow.rs
@@ -22,7 +22,7 @@ use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use pyo3::exceptions::PyException;
use pyo3::prelude::PyErr;
use pyo3::types::{PyAnyMethods, PyList};
use pyo3::{Bound, FromPyObject, IntoPyObject, PyAny, PyObject, PyResult, Python};
use pyo3::{Bound, FromPyObject, IntoPyObject, PyAny, PyResult, Python};

use crate::{DataFusionError, ScalarValue};

@@ -52,11 +52,11 @@ impl FromPyArrow for ScalarValue {
}

impl ToPyArrow for ScalarValue {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let array = self.to_array()?;
// convert to pyarrow array using C data interface
let pyarray = array.to_data().to_pyarrow(py)?;
let pyscalar = pyarray.call_method1(py, "__getitem__", (0,))?;
let pyscalar = pyarray.call_method1("__getitem__", (0,))?;

Ok(pyscalar)
}
@@ -79,23 +79,22 @@ impl<'source> IntoPyObject<'source> for ScalarValue {
let array = self.to_array()?;
// convert to pyarrow array using C data interface
let pyarray = array.to_data().to_pyarrow(py)?;
let pyarray_bound = pyarray.bind(py);
pyarray_bound.call_method1("__getitem__", (0,))
pyarray.call_method1("__getitem__", (0,))
}
}

#[cfg(test)]
mod tests {
use pyo3::ffi::c_str;
use pyo3::prepare_freethreaded_python;
use pyo3::py_run;
use pyo3::types::PyDict;
use pyo3::Python;

use super::*;

fn init_python() {
prepare_freethreaded_python();
Python::with_gil(|py| {
Python::initialize();
Python::attach(|py| {
if py.run(c_str!("import pyarrow"), None, None).is_err() {
let locals = PyDict::new(py);
py.run(
@@ -135,12 +134,11 @@ mod tests {
ScalarValue::Date32(Some(1234)),
];

Python::with_gil(|py| {
Python::attach(|py| {
for scalar in example_scalars.iter() {
let result = ScalarValue::from_pyarrow_bound(
scalar.to_pyarrow(py).unwrap().bind(py),
)
.unwrap();
let result =
ScalarValue::from_pyarrow_bound(&scalar.to_pyarrow(py).unwrap())
.unwrap();
assert_eq!(scalar, &result);
}
});
@@ -150,7 +148,7 @@
fn test_py_scalar() -> PyResult<()> {
init_python();

Python::with_gil(|py| -> PyResult<()> {
Python::attach(|py| -> PyResult<()> {
let scalar_float = ScalarValue::Float64(Some(12.34));
let py_float = scalar_float
.into_pyobject(py)?
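To summarize the pyo3 0.26 migration pattern applied throughout this file, here is a small standalone sketch; the list-indexing helper is hypothetical, but the `Python::initialize` / `Python::attach` spellings and the `Bound`-returning calls follow the diff above.

```rust
use pyo3::prelude::*;
use pyo3::types::PyList;

/// Hypothetical helper mirroring the tests above: call a method on a bound
/// object and hand back another `Bound` value with the same lifetime.
fn first_element<'py>(obj: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyAny>> {
    // No separate `bind(py)` step is needed: `Bound` values already carry
    // the interpreter lifetime.
    obj.call_method1("__getitem__", (0,))
}

fn main() -> PyResult<()> {
    // pyo3 0.26: `Python::initialize()` replaces `prepare_freethreaded_python()`.
    Python::initialize();
    // pyo3 0.26: `Python::attach` replaces `Python::with_gil`.
    Python::attach(|py| {
        let list = PyList::new(py, [10i32, 20, 30])?;
        let first = first_element(list.as_any())?;
        assert_eq!(first.extract::<i32>()?, 10);
        Ok(())
    })
}
```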
5 changes: 3 additions & 2 deletions datafusion/core/benches/parquet_query_sql.rs
@@ -166,11 +166,12 @@ fn generate_file() -> NamedTempFile {
}

let metadata = writer.close().unwrap();
let file_metadata = metadata.file_metadata();
assert_eq!(
metadata.num_rows as usize,
file_metadata.num_rows() as usize,
WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
);
assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);
assert_eq!(metadata.row_groups().len(), EXPECTED_ROW_GROUPS);

println!(
"Generated parquet file in {} seconds",
83 changes: 44 additions & 39 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -154,7 +154,6 @@ mod tests {
use futures::stream::BoxStream;
use futures::StreamExt;
use insta::assert_snapshot;
use log::error;
use object_store::local::LocalFileSystem;
use object_store::ObjectMeta;
use object_store::{
@@ -163,9 +162,10 @@
};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::index::Index;
use parquet::format::FileMetaData;
use parquet::file::metadata::{
KeyValue, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex,
};
use parquet::file::page_index::column_index::ColumnIndexMetaData;
use tokio::fs::File;

enum ForceViews {
@@ -1144,18 +1144,14 @@

// 325 pages in int_col
assert_eq!(int_col_offset.len(), 325);
match int_col_index {
Index::INT32(index) => {
assert_eq!(index.indexes.len(), 325);
for min_max in index.clone().indexes {
assert!(min_max.min.is_some());
assert!(min_max.max.is_some());
assert!(min_max.null_count.is_some());
}
}
_ => {
error!("fail to read page index.")
}
let ColumnIndexMetaData::INT32(index) = int_col_index else {
panic!("fail to read page index.")
};
assert_eq!(index.min_values().len(), 325);
assert_eq!(index.max_values().len(), 325);
// all values are non null
for idx in 0..325 {
assert_eq!(index.null_count(idx), Some(0));
}
}

@@ -1556,7 +1552,7 @@
Ok(parquet_sink)
}

fn get_written(parquet_sink: Arc<ParquetSink>) -> Result<(Path, FileMetaData)> {
fn get_written(parquet_sink: Arc<ParquetSink>) -> Result<(Path, ParquetMetaData)> {
let mut written = parquet_sink.written();
let written = written.drain();
assert_eq!(
Expand All @@ -1566,28 +1562,33 @@ mod tests {
written.len()
);

let (path, file_metadata) = written.take(1).next().unwrap();
Ok((path, file_metadata))
let (path, parquet_meta_data) = written.take(1).next().unwrap();
Ok((path, parquet_meta_data))
}

fn assert_file_metadata(file_metadata: FileMetaData, expected_kv: &Vec<KeyValue>) {
let FileMetaData {
num_rows,
schema,
key_value_metadata,
..
} = file_metadata;
assert_eq!(num_rows, 2, "file metadata to have 2 rows");
fn assert_file_metadata(
parquet_meta_data: ParquetMetaData,
expected_kv: &Vec<KeyValue>,
) {
let file_metadata = parquet_meta_data.file_metadata();
let schema_descr = file_metadata.schema_descr();
assert_eq!(file_metadata.num_rows(), 2, "file metadata to have 2 rows");
assert!(
schema.iter().any(|col_schema| col_schema.name == "a"),
schema_descr
.columns()
.iter()
.any(|col_schema| col_schema.name() == "a"),
"output file metadata should contain col a"
);
assert!(
schema.iter().any(|col_schema| col_schema.name == "b"),
schema_descr
.columns()
.iter()
.any(|col_schema| col_schema.name() == "b"),
"output file metadata should contain col b"
);

let mut key_value_metadata = key_value_metadata.unwrap();
let mut key_value_metadata = file_metadata.key_value_metadata().unwrap().clone();
key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
assert_eq!(&key_value_metadata, expected_kv);
}
@@ -1644,13 +1645,11 @@

// check the file metadata includes partitions
let mut expected_partitions = std::collections::HashSet::from(["a=foo", "a=bar"]);
for (
path,
FileMetaData {
num_rows, schema, ..
},
) in written.take(2)
{
for (path, parquet_metadata) in written.take(2) {
let file_metadata = parquet_metadata.file_metadata();
let schema = file_metadata.schema_descr();
let num_rows = file_metadata.num_rows();

let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(path_parts.len(), 2, "should have path prefix");

Expand All @@ -1663,11 +1662,17 @@ mod tests {

assert_eq!(num_rows, 1, "file metadata to have 1 row");
assert!(
!schema.iter().any(|col_schema| col_schema.name == "a"),
!schema
.columns()
.iter()
.any(|col_schema| col_schema.name() == "a"),
"output file metadata will not contain partitioned col a"
);
assert!(
schema.iter().any(|col_schema| col_schema.name == "b"),
schema
.columns()
.iter()
.any(|col_schema| col_schema.name() == "b"),
"output file metadata should contain col b"
);
}