diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 28d690d353b5..2f31a290e398 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -45,7 +45,7 @@ arrow-data = { workspace = true, optional = true } arrow-schema = { workspace = true, optional = true } arrow-select = { workspace = true, optional = true } arrow-ipc = { workspace = true, optional = true } -object_store = { version = "0.11.0", default-features = false, optional = true } +object_store = { version = "0.12.0", default-features = false, optional = true } bytes = { version = "1.1", default-features = false, features = ["std"] } thrift = { version = "0.17", default-features = false } @@ -85,7 +85,7 @@ serde_json = { version = "1.0", features = ["std"], default-features = false } arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] } tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] } rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] } -object_store = { version = "0.11.0", default-features = false, features = ["azure"] } +object_store = { version = "0.12.0", default-features = false, features = ["azure", "fs"] } sysinfo = { version = "0.34.0", default-features = false, features = ["system"] } [package.metadata.docs.rs] diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index dff7d9362aec..d5595a83be6e 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -46,7 +46,7 @@ use tokio::runtime::Handle; /// println!("Found Blob with {}B at {}", meta.size, meta.location); /// /// // Show Parquet metadata -/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size); +/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size.try_into().unwrap()); /// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); /// print_parquet_metadata(&mut stdout(), builder.metadata()); /// # } @@ -163,7 +163,7 @@ impl ParquetObjectReader { impl MetadataSuffixFetch for &mut ParquetObjectReader { fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { let options = GetOptions { - range: Some(GetRange::Suffix(suffix)), + range: Some(GetRange::Suffix(suffix as u64)), ..Default::default() }; self.spawn(|store, path| { @@ -178,6 +178,7 @@ impl MetadataSuffixFetch for &mut ParquetObjectReader { impl AsyncFileReader for ParquetObjectReader { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + let range = range.start as u64..range.end as u64; self.spawn(|store, path| store.get_range(path, range)) } @@ -185,6 +186,10 @@ impl AsyncFileReader for ParquetObjectReader { where Self: Send, { + let ranges = ranges + .into_iter() + .map(|range| range.start as u64..range.end as u64) + .collect::>(); self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed()) } @@ -254,8 +259,8 @@ mod tests { #[tokio::test] async fn test_simple() { let (meta, store) = get_meta_store().await; - let object_reader = - ParquetObjectReader::new(store, meta.location).with_file_size(meta.size); + let object_reader = ParquetObjectReader::new(store, meta.location) + .with_file_size(meta.size.try_into().unwrap()); let builder = ParquetRecordBatchStreamBuilder::new(object_reader) .await @@ -285,8 +290,8 @@ mod tests { let (mut meta, store) = get_meta_store().await; meta.location = Path::from("I don't exist.parquet"); - let object_reader = - ParquetObjectReader::new(store, meta.location).with_file_size(meta.size); + let object_reader = ParquetObjectReader::new(store, meta.location) + .with_file_size(meta.size.try_into().unwrap()); // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug match ParquetRecordBatchStreamBuilder::new(object_reader).await { Ok(_) => panic!("expected failure"), @@ -320,7 +325,7 @@ mod tests { let initial_actions = num_actions.load(Ordering::Relaxed); let reader = ParquetObjectReader::new(store, meta.location) - .with_file_size(meta.size) + .with_file_size(meta.size.try_into().unwrap()) .with_runtime(rt.handle().clone()); let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); @@ -348,7 +353,7 @@ mod tests { let (meta, store) = get_meta_store().await; let reader = ParquetObjectReader::new(store, meta.location) - .with_file_size(meta.size) + .with_file_size(meta.size.try_into().unwrap()) .with_runtime(rt.handle().clone()); let current_id = std::thread::current().id(); @@ -373,7 +378,7 @@ mod tests { let (meta, store) = get_meta_store().await; let mut reader = ParquetObjectReader::new(store, meta.location) - .with_file_size(meta.size) + .with_file_size(meta.size.try_into().unwrap()) .with_runtime(rt.handle().clone()); rt.shutdown_background(); diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 11448207c6fc..9deadece9544 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -310,7 +310,8 @@ async fn test_read_encrypted_file_from_object_store() { .unwrap(); let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); - let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size); + let mut reader = ParquetObjectReader::new(store, meta.location) + .with_file_size(meta.size.try_into().unwrap()); let metadata = reader.get_metadata(Some(&options)).await.unwrap(); let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await