[WIP] Upgrade to arrow/parquet 57.0.0 #17888
base: main
Changes from all commits
28452cf
50e8f25
369ec4f
bf48fde
ec8efea
7c58fa3
@@ -17,6 +17,7 @@

 use std::collections::HashMap;
 use std::sync::Arc;
+use tonic::transport::Endpoint;

 use datafusion::arrow::datatypes::Schema;

@@ -34,7 +35,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let testdata = datafusion::test_util::parquet_test_data();

     // Create Flight client
-    let mut client = FlightServiceClient::connect("http://localhost:50051").await?;
+    let endpoint = Endpoint::new("http://localhost:50051")?;
+    let channel = endpoint.connect().await?;
+    let mut client = FlightServiceClient::new(channel);

     // Call get_schema to get the schema of a Parquet file
     let request = tonic::Request::new(FlightDescriptor {

Review comment (on the `Endpoint::new` line): This is due to the new version of tonic.

@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

-use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
+use arrow::ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator};
 use std::sync::Arc;

 use arrow_flight::{PollInfo, SchemaAsIpc};
@@ -106,6 +106,7 @@ impl FlightService for FlightServiceImpl {

         // add an initial FlightData message that sends schema
         let options = arrow::ipc::writer::IpcWriteOptions::default();
+        let mut compression_context = CompressionContext::default();
         let schema_flight_data = SchemaAsIpc::new(&schema, &options);

         let mut flights = vec![FlightData::from(schema_flight_data)];
@@ -115,7 +116,7 @@ impl FlightService for FlightServiceImpl {

         for batch in &results {
             let (flight_dictionaries, flight_batch) = encoder
-                .encoded_batch(batch, &mut tracker, &options)
+                .encode(batch, &mut tracker, &options, &mut compression_context)
                 .map_err(|e: ArrowError| Status::internal(e.to_string()))?;

             flights.extend(flight_dictionaries.into_iter().map(Into::into));

@@ -1417,7 +1417,7 @@ mod tests {
     fn from_qualified_schema_into_arrow_schema() -> Result<()> {
         let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
         let arrow_schema = schema.as_arrow();
-        insta::assert_snapshot!(arrow_schema, @r#"Field { name: "c0", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#);
+        insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
         Ok(())
     }

Review comment (on the updated snapshot): Many of the diffs are due to changes in the formatting of Fields and DataTypes (see below).

Review comment: I am not sure why the heap size of the metadata is reported to be so much smaller. I don't expect our thrift decoding work to have reduced the in-memory size of the parquet metadata 🤔
@etseidl any ideas? I can perhaps go audit the `heap_size` implementations.

Review comment: I did box some of the encryption structures... but maybe the `HeapSize` impl for `Box` is still wrong?

Review comment: Oh, and `FileDecryptor` in `ParquetMetaData` was boxed but still not included in `memory_size`.
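
The last two comments point at two separate ways a heap-size estimate can come out too small: a `Box` impl that forgets the boxed value itself, and a size method that skips a boxed field entirely. The following is a minimal std-only sketch of both, not the parquet crate's code. The `HeapSize` trait here, the `Metadata` and `Decryptor` types, and the method `memory_size_missing_decryptor` are all hypothetical stand-ins made up for illustration.

```rust
use std::mem::size_of;

/// Hypothetical trait: bytes owned on the heap, excluding `size_of::<Self>()`.
trait HeapSize {
    fn heap_size(&self) -> usize;
}

impl HeapSize for u8 {
    fn heap_size(&self) -> usize {
        0
    }
}

impl HeapSize for u64 {
    fn heap_size(&self) -> usize {
        0
    }
}

impl<T: HeapSize> HeapSize for Vec<T> {
    fn heap_size(&self) -> usize {
        // The backing buffer, plus anything the elements own themselves.
        self.capacity() * size_of::<T>()
            + self.iter().map(|item| item.heap_size()).sum::<usize>()
    }
}

impl<T: HeapSize> HeapSize for Box<T> {
    fn heap_size(&self) -> usize {
        // The boxed value lives on the heap, so count `size_of::<T>()`
        // in addition to whatever the value owns.
        size_of::<T>() + (**self).heap_size()
    }
}

impl<T: HeapSize> HeapSize for Option<T> {
    fn heap_size(&self) -> usize {
        self.as_ref().map_or(0, |value| value.heap_size())
    }
}

/// Hypothetical stand-in for a decryptor held by the metadata.
struct Decryptor {
    key: Vec<u8>,
}

impl HeapSize for Decryptor {
    fn heap_size(&self) -> usize {
        self.key.heap_size()
    }
}

/// Hypothetical stand-in for file metadata with an optional boxed field.
struct Metadata {
    row_counts: Vec<u64>,
    decryptor: Option<Box<Decryptor>>,
}

impl Metadata {
    /// Counts every field, including the boxed decryptor.
    fn memory_size(&self) -> usize {
        size_of::<Self>() + self.row_counts.heap_size() + self.decryptor.heap_size()
    }

    /// Deliberately incomplete: skips the boxed field, so it under-reports.
    fn memory_size_missing_decryptor(&self) -> usize {
        size_of::<Self>() + self.row_counts.heap_size()
    }
}

fn main() {
    let meta = Metadata {
        row_counts: vec![10, 20, 30],
        decryptor: Some(Box::new(Decryptor { key: vec![0u8; 16] })),
    };
    println!("all fields counted: {} bytes", meta.memory_size());
    println!("decryptor skipped:  {} bytes", meta.memory_size_missing_decryptor());
}
```

In this sketch the difference between the two methods is exactly the boxed `Decryptor` plus its key buffer. Boxing a field shrinks `size_of` for the parent struct, so any accounting that skips the box's target will report a smaller total than before.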