diff --git a/Cargo.lock b/Cargo.lock
index 71b30af69280..753bb53f2c35 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -246,9 +246,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
 
 [[package]]
 name = "arrow"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b5ec52ba94edeed950e4a41f75d35376df196e8cb04437f7280a5aa49f20f796"
+checksum = "3095aaf545942ff5abd46654534f15b03a90fba78299d661e045e5d587222f0d"
 dependencies = [
  "arrow-arith",
  "arrow-array",
@@ -265,14 +265,14 @@ dependencies = [
  "arrow-string",
  "half",
  "pyo3",
- "rand 0.8.5",
+ "rand 0.9.0",
 ]
 
 [[package]]
 name = "arrow-arith"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8fc766fdacaf804cb10c7c70580254fcdb5d55cdfda2bc57b02baf5223a3af9e"
+checksum = "00752064ff47cee746e816ddb8450520c3a52cbad1e256f6fa861a35f86c45e7"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -284,9 +284,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-array"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a12fcdb3f1d03f69d3ec26ac67645a8fe3f878d77b5ebb0b15d64a116c212985"
+checksum = "cebfe926794fbc1f49ddd0cdaf898956ca9f6e79541efce62dabccfd81380472"
 dependencies = [
  "ahash 0.8.11",
  "arrow-buffer",
@@ -301,9 +301,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-buffer"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "263f4801ff1839ef53ebd06f99a56cecd1dbaf314ec893d93168e2e860e0291c"
+checksum = "0303c7ec4cf1a2c60310fc4d6bbc3350cd051a17bf9e9c0a8e47b4db79277824"
 dependencies = [
  "bytes",
  "half",
@@ -312,9 +312,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-cast"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ede6175fbc039dfc946a61c1b6d42fd682fcecf5ab5d148fbe7667705798cac9"
+checksum = "335f769c5a218ea823d3760a743feba1ef7857cba114c01399a891c2fff34285"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -333,9 +333,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-csv"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1644877d8bc9a0ef022d9153dc29375c2bda244c39aec05a91d0e87ccf77995f"
+checksum = "510db7dfbb4d5761826516cc611d97b3a68835d0ece95b034a052601109c0b1b"
 dependencies = [
  "arrow-array",
  "arrow-cast",
@@ -349,9 +349,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-data"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61cfdd7d99b4ff618f167e548b2411e5dd2c98c0ddebedd7df433d34c20a4429"
+checksum = "e8affacf3351a24039ea24adab06f316ded523b6f8c3dbe28fbac5f18743451b"
 dependencies = [
  "arrow-buffer",
  "arrow-schema",
@@ -361,9 +361,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-flight"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a194f47959a4e111463cb6d02c8576fe084b3d7a3c092314baf3b9629b62595b"
+checksum = "e2e0fad280f41a918d53ba48288a246ff04202d463b3b380fbc0edecdcb52cfd"
 dependencies = [
  "arrow-arith",
  "arrow-array",
@@ -388,9 +388,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ipc"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62ff528658b521e33905334723b795ee56b393dbe9cf76c8b1f64b648c65a60c"
+checksum = "69880a9e6934d9cba2b8630dd08a3463a91db8693b16b499d54026b6137af284"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -402,9 +402,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-json"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ee5b4ca98a7fb2efb9ab3309a5d1c88b5116997ff93f3147efdc1062a6158e9"
+checksum = "d8dafd17a05449e31e0114d740530e0ada7379d7cb9c338fd65b09a8130960b0"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -424,9 +424,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ord"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f0a3334a743bd2a1479dbc635540617a3923b4b2f6870f37357339e6b5363c21"
+checksum = "895644523af4e17502d42c3cb6b27cb820f0cb77954c22d75c23a85247c849e1"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -437,9 +437,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-row"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8d1d7a7291d2c5107e92140f75257a99343956871f3d3ab33a7b41532f79cb68"
+checksum = "9be8a2a4e5e7d9c822b2b8095ecd77010576d824f654d347817640acfc97d229"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -450,9 +450,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-schema"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39cfaf5e440be44db5413b75b72c2a87c1f8f0627117d110264048f2969b99e9"
+checksum = "7450c76ab7c5a6805be3440dc2e2096010da58f7cab301fdc996a4ee3ee74e49"
 dependencies = [
  "bitflags 2.8.0",
  "serde",
@@ -460,9 +460,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-select"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "69efcd706420e52cd44f5c4358d279801993846d1c2a8e52111853d61d55a619"
+checksum = "aa5f5a93c75f46ef48e4001535e7b6c922eeb0aa20b73cf58d09e13d057490d8"
 dependencies = [
  "ahash 0.8.11",
  "arrow-array",
@@ -474,9 +474,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-string"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a21546b337ab304a32cfc0770f671db7411787586b45b78b4593ae78e64e2b03"
+checksum = "6e7005d858d84b56428ba2a98a107fe88c0132c61793cf6b8232a1f9bfc0452b"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -2925,11 +2925,11 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
 
 [[package]]
 name = "flatbuffers"
-version = "24.12.23"
+version = "25.2.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4f1baf0dbf96932ec9a3038d57900329c015b0bfb7b63d904f3bc27e2b02a096"
+checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1"
 dependencies = [
- "bitflags 1.3.2",
+ "bitflags 2.8.0",
  "rustc_version",
 ]
 
@@ -2940,6 +2940,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc"
 dependencies = [
  "crc32fast",
+ "libz-rs-sys",
  "miniz_oxide",
 ]
 
@@ -3961,6 +3962,15 @@ dependencies = [
  "escape8259",
 ]
 
+[[package]]
+name = "libz-rs-sys"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "902bc563b5d65ad9bba616b490842ef0651066a1a1dc3ce1087113ffcb873c8d"
+dependencies = [
+ "zlib-rs",
+]
+
 [[package]]
 name = "linked-hash-map"
 version = "0.5.6"
@@ -4007,7 +4017,7 @@ version = "0.11.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
 dependencies = [
- "twox-hash",
+ "twox-hash 1.6.3",
 ]
 
@@ -4263,18 +4273,21 @@ dependencies = [
 
 [[package]]
 name = "object_store"
-version = "0.11.2"
+version = "0.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf"
+checksum = "e9ce831b09395f933addbc56d894d889e4b226eba304d4e7adbab591e26daf1e"
 dependencies = [
  "async-trait",
  "base64 0.22.1",
  "bytes",
  "chrono",
+ "form_urlencoded",
  "futures",
+ "http 1.2.0",
+ "http-body-util",
  "humantime",
  "hyper",
- "itertools 0.13.0",
+ "itertools 0.14.0",
  "md-5",
  "parking_lot",
  "percent-encoding",
@@ -4285,7 +4298,8 @@ dependencies = [
  "rustls-pemfile",
  "serde",
  "serde_json",
- "snafu",
+ "serde_urlencoded",
+ "thiserror 2.0.12",
  "tokio",
  "tracing",
  "url",
@@ -4368,9 +4382,9 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "54.3.1"
+version = "55.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bfb15796ac6f56b429fd99e33ba133783ad75b27c36b4b5ce06f1f82cc97754e"
+checksum = "cd31a8290ac5b19f09ad77ee7a1e6a541f1be7674ad410547d5f1eef6eef4a9c"
 dependencies = [
  "ahash 0.8.11",
  "arrow-array",
@@ -4398,7 +4412,7 @@ dependencies = [
  "snap",
  "thrift",
  "tokio",
- "twox-hash",
+ "twox-hash 2.1.0",
  "zstd",
 ]
 
@@ -4771,7 +4785,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
 dependencies = [
  "heck 0.5.0",
- "itertools 0.14.0",
+ "itertools 0.13.0",
  "log",
  "multimap",
  "once_cell",
@@ -4791,7 +4805,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
 dependencies = [
  "anyhow",
- "itertools 0.14.0",
+ "itertools 0.13.0",
  "proc-macro2",
  "quote",
  "syn 2.0.100",
@@ -4846,9 +4860,9 @@ dependencies = [
 
 [[package]]
 name = "pyo3"
-version = "0.23.5"
+version = "0.24.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7778bffd85cf38175ac1f545509665d0b9b92a198ca7941f131f85f7a4f9a872"
+checksum = "17da310086b068fbdcefbba30aeb3721d5bb9af8db4987d6735b2183ca567229"
 dependencies = [
  "cfg-if",
  "indoc",
@@ -4864,9 +4878,9 @@ dependencies = [
 
 [[package]]
 name = "pyo3-build-config"
-version = "0.23.5"
+version = "0.24.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "94f6cbe86ef3bf18998d9df6e0f3fc1050a8c5efa409bf712e661a4366e010fb"
+checksum = "e27165889bd793000a098bb966adc4300c312497ea25cf7a690a9f0ac5aa5fc1"
 dependencies = [
  "once_cell",
  "target-lexicon",
@@ -4874,9 +4888,9 @@ dependencies = [
 
 [[package]]
 name = "pyo3-ffi"
-version = "0.23.5"
+version = "0.24.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9f1b4c431c0bb1c8fb0a338709859eed0d030ff6daa34368d3b152a63dfdd8d"
+checksum = "05280526e1dbf6b420062f3ef228b78c0c54ba94e157f5cb724a609d0f2faabc"
 dependencies = [
  "libc",
  "pyo3-build-config",
@@ -4884,9 +4898,9 @@ dependencies = [
 
 [[package]]
 name = "pyo3-macros"
-version = "0.23.5"
+version = "0.24.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fbc2201328f63c4710f68abdf653c89d8dbc2858b88c5d88b0ff38a75288a9da"
+checksum = "5c3ce5686aa4d3f63359a5100c62a127c9f15e8398e5fdeb5deef1fed5cd5f44"
 dependencies = [
  "proc-macro2",
  "pyo3-macros-backend",
@@ -4896,9 +4910,9 @@ dependencies = [
 
 [[package]]
 name = "pyo3-macros-backend"
-version = "0.23.5"
+version = "0.24.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fca6726ad0f3da9c9de093d6f116a93c1a38e417ed73bf138472cf4064f72028"
+checksum = "f4cf6faa0cbfb0ed08e89beb8103ae9724eb4750e3a78084ba4017cbe94f3855"
 dependencies = [
  "heck 0.5.0",
  "proc-macro2",
@@ -5796,27 +5810,6 @@ version = "1.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd"
 
-[[package]]
-name = "snafu"
-version = "0.8.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019"
-dependencies = [
- "snafu-derive",
-]
-
-[[package]]
-name = "snafu-derive"
-version = "0.8.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917"
-dependencies = [
- "heck 0.5.0",
- "proc-macro2",
- "quote",
- "syn 2.0.100",
-]
-
 [[package]]
 name = "snap"
 version = "1.1.1"
@@ -6134,9 +6127,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
 
 [[package]]
 name = "target-lexicon"
-version = "0.12.16"
+version = "0.13.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1"
+checksum = "e502f78cdbb8ba4718f566c418c52bc729126ffd16baee5baa718cf25dd5a69a"
 
 [[package]]
 name = "tempfile"
@@ -6353,9 +6346,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.44.2"
+version = "1.44.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48"
+checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a"
 dependencies = [
  "backtrace",
  "bytes",
@@ -6637,6 +6630,12 @@ dependencies = [
  "static_assertions",
 ]
 
+[[package]]
+name = "twox-hash"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7b17f197b3050ba473acf9181f7b1d3b66d1cf7356c6cc57886662276e65908"
+
 [[package]]
 name = "typed-arena"
 version = "2.0.2"
@@ -7501,6 +7500,12 @@ dependencies = [
  "syn 2.0.100",
 ]
 
+[[package]]
+name = "zlib-rs"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b20717f0917c908dc63de2e44e97f1e6b126ca58d0e391cee86d504eb8fbd05"
+
 [[package]]
 name = "zstd"
 version = "0.13.3"
diff --git a/Cargo.toml b/Cargo.toml
index 920629a23d1c..de53b7df50d9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -87,19 +87,19 @@ ahash = { version = "0.8", default-features = false, features = [
     "runtime-rng",
 ] }
 apache-avro = { version = "0.17", default-features = false }
-arrow = { version = "54.3.1", features = [
+arrow = { version = "55.0.0", features = [
     "prettyprint",
     "chrono-tz",
 ] }
-arrow-buffer = { version = "54.3.0", default-features = false }
-arrow-flight = { version = "54.3.1", features = [
+arrow-buffer = { version = "55.0.0", default-features = false }
+arrow-flight = { version = "55.0.0", features = [
     "flight-sql-experimental",
 ] }
-arrow-ipc = { version = "54.3.0", default-features = false, features = [
+arrow-ipc = { version = "55.0.0", default-features = false, features = [
     "lz4",
 ] }
-arrow-ord = { version = "54.3.0", default-features = false }
-arrow-schema = { version = "54.3.0", default-features = false }
+arrow-ord = { version = "55.0.0", default-features = false }
+arrow-schema = { version = "55.0.0", default-features = false }
 async-trait = "0.1.88"
 bigdecimal = "0.4.8"
 bytes = "1.10"
@@ -147,9 +147,9 @@ hashbrown = { version = "0.14.5", features = ["raw"] }
 indexmap = "2.8.0"
 itertools = "0.14"
 log = "^0.4"
-object_store = { version = "0.11.0", default-features = false }
+object_store = { version = "0.12.0", default-features = false }
 parking_lot = "0.12"
-parquet = { version = "54.3.1", default-features = false, features = [
+parquet = { version = "55.0.0", default-features = false, features = [
     "arrow",
     "async",
     "object_store",
diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
index b8c303e22161..03ef3d66f9d7 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -571,7 +571,9 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
             .to_string();
 
         let object_store = Arc::clone(&self.object_store);
-        let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta);
+        let mut inner =
+            ParquetObjectReader::new(object_store, file_meta.object_meta.location)
+                .with_file_size(file_meta.object_meta.size);
 
         if let Some(hint) = metadata_size_hint {
             inner = inner.with_footer_size_hint(hint)
@@ -599,7 +601,7 @@ struct ParquetReaderWithCache {
 impl AsyncFileReader for ParquetReaderWithCache {
     fn get_bytes(
         &mut self,
-        range: Range<usize>,
+        range: Range<u64>,
     ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Bytes>> {
         println!("get_bytes: {} Reading range {:?}", self.filename, range);
         self.inner.get_bytes(range)
@@ -607,7 +609,7 @@ impl AsyncFileReader for ParquetReaderWithCache {
 
     fn get_byte_ranges(
         &mut self,
-        ranges: Vec<Range<usize>>,
+        ranges: Vec<Range<u64>>,
     ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Vec<Bytes>>> {
         println!(
             "get_byte_ranges: {} Reading ranges {:?}",
@@ -618,6 +620,7 @@ impl AsyncFileReader for ParquetReaderWithCache {
 
     fn get_metadata(
         &mut self,
+        _options: Option<&ArrowReaderOptions>,
     ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
         println!("get_metadata: {} returning cached metadata", self.filename);
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 39b47a96bccf..74e99163955e 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -63,7 +63,7 @@ log = { workspace = true }
 object_store = { workspace = true, optional = true }
 parquet = { workspace = true, optional = true, default-features = true }
 paste = "1.0.15"
-pyo3 = { version = "0.23.5", optional = true }
+pyo3 = { version = "0.24.0", optional = true }
 recursive = { workspace = true, optional = true }
 sqlparser = { workspace = true }
 tokio = { workspace = true }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 6c7c9463cf3b..7fc27453d1ad 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -144,6 +144,7 @@ impl FileFormat for ArrowFormat {
         for object in objects {
             let r = store.as_ref().get(&object.location).await?;
             let schema = match r.payload {
+                #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(mut file, _) => {
                     let reader = FileReader::try_new(&mut file, None)?;
                     reader.schema()
@@ -442,7 +443,7 @@ mod tests {
         let object_meta = ObjectMeta {
             location,
             last_modified: DateTime::default(),
-            size: usize::MAX,
+            size: u64::MAX,
             e_tag: None,
             version: None,
         };
@@ -485,7 +486,7 @@ mod tests {
         let object_meta = ObjectMeta {
             location,
             last_modified: DateTime::default(),
-            size: usize::MAX,
+            size: u64::MAX,
             e_tag: None,
             version: None,
         };
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 309458975ab6..9fa4c00e6af2 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -72,7 +72,7 @@ mod tests {
     #[derive(Debug)]
     struct VariableStream {
         bytes_to_repeat: Bytes,
-        max_iterations: usize,
+        max_iterations: u64,
         iterations_detected: Arc<Mutex<usize>>,
     }
 
@@ -103,14 +103,15 @@ mod tests {
 
         async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
             let bytes = self.bytes_to_repeat.clone();
-            let range = 0..bytes.len() * self.max_iterations;
+            let len = bytes.len() as u64;
+            let range = 0..len * self.max_iterations;
             let arc = self.iterations_detected.clone();
             let stream = futures::stream::repeat_with(move || {
                 let arc_inner = arc.clone();
                 *arc_inner.lock().unwrap() += 1;
                 Ok(bytes.clone())
             })
-            .take(self.max_iterations)
+            .take(self.max_iterations as usize)
             .boxed();
 
             Ok(GetResult {
@@ -138,7 +139,7 @@ mod tests {
         async fn get_ranges(
             &self,
             _location: &Path,
-            _ranges: &[Range<usize>],
+            _ranges: &[Range<u64>],
         ) -> object_store::Result<Vec<Bytes>> {
             unimplemented!()
         }
@@ -154,7 +155,7 @@ mod tests {
         fn list(
             &self,
             _prefix: Option<&Path>,
-        ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
             unimplemented!()
         }
 
@@ -179,7 +180,7 @@ mod tests {
     }
 
     impl VariableStream {
-        pub fn new(bytes_to_repeat: Bytes, max_iterations: usize) -> Self {
+        pub fn new(bytes_to_repeat: Bytes, max_iterations: u64) -> Self {
             Self {
                 bytes_to_repeat,
                 max_iterations,
@@ -371,7 +372,7 @@ mod tests {
         let object_meta = ObjectMeta {
             location: Path::parse("/")?,
             last_modified: DateTime::default(),
-            size: usize::MAX,
+            size: u64::MAX,
             e_tag: None,
             version: None,
         };
@@ -429,7 +430,7 @@ mod tests {
         let object_meta = ObjectMeta {
             location: Path::parse("/")?,
             last_modified: DateTime::default(),
-            size: usize::MAX,
+            size: u64::MAX,
             e_tag: None,
             version: None,
         };
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index e921f0158e54..ad8c0bdb5680 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -127,7 +127,7 @@ mod tests {
             .write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
             .await
             .expect_err("should fail because input file does not match inferred schema");
-        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value d for column 0 at line 4");
+        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
         Ok(())
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 67a7ba8dc776..76009ccd80b0 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -331,7 +331,7 @@ mod tests {
         fn list(
             &self,
             _prefix: Option<&Path>,
-        ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
             Box::pin(futures::stream::once(async {
                 Err(object_store::Error::NotImplemented)
             }))
@@ -408,7 +408,7 @@ mod tests {
         )));
 
         // Use the file size as the hint so we can get the full metadata from the first fetch
-        let size_hint = meta[0].size;
+        let size_hint = meta[0].size as usize;
 
         fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
             .await
@@ -443,7 +443,7 @@ mod tests {
         )));
 
         // Use the a size hint larger than the file size to make sure we don't panic
-        let size_hint = meta[0].size + 100;
+        let size_hint = (meta[0].size + 100) as usize;
 
         fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
             .await
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index a15b2b6ffe13..25a89644cd2a 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -106,7 +106,7 @@ mod tests {
         let meta = ObjectMeta {
             location,
             last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-            size: metadata.len() as usize,
+            size: metadata.len(),
             e_tag: None,
             version: None,
         };
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 5dcf4df73f57..f0a1f94d87e1 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -273,6 +273,7 @@ impl FileOpener for ArrowOpener {
                 None => {
                     let r = object_store.get(file_meta.location()).await?;
                     match r.payload {
+                        #[cfg(not(target_arch = "wasm32"))]
                         GetResultPayload::File(file, _) => {
                             let arrow_reader = arrow::ipc::reader::FileReader::try_new(
                                 file, projection,
@@ -305,7 +306,7 @@ impl FileOpener for ArrowOpener {
                     )?;
                     // read footer according to footer_len
                     let get_option = GetOptions {
-                        range: Some(GetRange::Suffix(10 + footer_len)),
+                        range: Some(GetRange::Suffix(10 + (footer_len as u64))),
                         ..Default::default()
                     };
                     let get_result = object_store
@@ -332,9 +333,9 @@ impl FileOpener for ArrowOpener {
                         .iter()
                         .flatten()
                         .map(|block| {
-                            let block_len = block.bodyLength() as usize
-                                + block.metaDataLength() as usize;
-                            let block_offset = block.offset() as usize;
+                            let block_len =
+                                block.bodyLength() as u64 + block.metaDataLength() as u64;
+                            let block_offset = block.offset() as u64;
                             block_offset..block_offset + block_len
                         })
                         .collect_vec();
@@ -354,9 +355,9 @@ impl FileOpener for ArrowOpener {
                         .iter()
                         .flatten()
                         .filter(|block| {
-                            let block_offset = block.offset() as usize;
-                            block_offset >= range.start as usize
-                                && block_offset < range.end as usize
+                            let block_offset = block.offset() as u64;
+                            block_offset >= range.start as u64
+                                && block_offset < range.end as u64
                         })
                         .copied()
                         .collect_vec();
@@ -364,9 +365,9 @@ impl FileOpener for ArrowOpener {
                     let recordbatch_ranges = recordbatches
                         .iter()
                         .map(|block| {
-                            let block_len = block.bodyLength() as usize
-                                + block.metaDataLength() as usize;
-                            let block_offset = block.offset() as usize;
+                            let block_len =
+                                block.bodyLength() as u64 + block.metaDataLength() as u64;
+                            let block_offset = block.offset() as u64;
                             block_offset..block_offset + block_len
                         })
                         .collect_vec();
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 5914924797dc..3ef403013452 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -658,7 +658,7 @@ mod tests {
         )
         .await
         .expect_err("should fail because input file does not match inferred schema");
-        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value d for column 0 at line 4");
+        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
         Ok(())
     }
 
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 910c4316d973..736248fbd95d 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -495,7 +495,7 @@ mod tests {
             .write_json(out_dir_url, DataFrameWriteOptions::new(), None)
             .await
             .expect_err("should fail because input file does not match inferred schema");
-        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value d for column 0 at line 4");
+        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
         Ok(())
     }
 
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 5c06c3902c1c..5986460cb539 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1786,13 +1786,13 @@ mod tests {
         path: &str,
         store: Arc<dyn ObjectStore>,
         batch: RecordBatch,
-    ) -> usize {
+    ) -> u64 {
         let mut writer =
             ArrowWriter::try_new(BytesMut::new().writer(), batch.schema(), None).unwrap();
         writer.write(&batch).unwrap();
         writer.flush().unwrap();
         let bytes = writer.into_inner().unwrap().into_inner().freeze();
-        let total_size = bytes.len();
+        let total_size = bytes.len() as u64;
         let path = Path::from(path);
         let payload = object_store::PutPayload::from_bytes(bytes);
         store
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index e1328770cabd..8b19658bb147 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -66,7 +66,7 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
     ObjectMeta {
         location,
         last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-        size: metadata.len() as usize,
+        size: metadata.len(),
         e_tag: None,
         version: None,
     }
@@ -166,7 +166,7 @@ impl ObjectStore for BlockingObjectStore {
     fn list(
         &self,
         prefix: Option<&Path>,
-    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
         self.inner.list(prefix)
     }
 
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 084554eecbdb..f5753af64d93 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -102,7 +102,7 @@ impl TestParquetFile {
 
         println!("Generated test dataset with {num_rows} rows");
 
-        let size = std::fs::metadata(&path)?.len() as usize;
+        let size = std::fs::metadata(&path)?.len();
 
         let mut canonical_path = path.canonicalize()?;
 
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index ce5c0d720174..761a78a29fd3 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -44,6 +44,7 @@ use insta::assert_snapshot;
 use object_store::memory::InMemory;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::ArrowWriter;
 use parquet::errors::ParquetError;
@@ -186,7 +187,7 @@ async fn store_parquet_in_memory(
             location: Path::parse(format!("file-{offset}.parquet"))
                 .expect("creating path"),
             last_modified: chrono::DateTime::from(SystemTime::now()),
-            size: buf.len(),
+            size: buf.len() as u64,
             e_tag: None,
             version: None,
         };
@@ -218,9 +219,10 @@ struct ParquetFileReader {
 impl AsyncFileReader for ParquetFileReader {
     fn get_bytes(
         &mut self,
-        range: Range<usize>,
+        range: Range<u64>,
     ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.metrics.bytes_scanned.add(range.end - range.start);
+        let bytes_scanned = range.end - range.start;
+        self.metrics.bytes_scanned.add(bytes_scanned as usize);
 
         self.store
             .get_range(&self.meta.location, range)
@@ -232,6 +234,7 @@ impl AsyncFileReader for ParquetFileReader {
 
     fn get_metadata(
         &mut self,
+        _options: Option<&ArrowReaderOptions>,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
             let metadata = fetch_parquet_metadata(
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 7006bf083eee..f693485cbe01 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -52,7 +52,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
     let meta = ObjectMeta {
         location,
         last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-        size: metadata.len() as usize,
+        size: metadata.len(),
         e_tag: None,
         version: None,
     };
diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs
index bf8466d849f2..fa6c7432413f 100644
--- a/datafusion/core/tests/sql/path_partition.rs
+++ b/datafusion/core/tests/sql/path_partition.rs
@@ -712,7 +712,7 @@ impl ObjectStore for MirroringObjectStore {
         let meta = ObjectMeta {
             location: location.clone(),
             last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-            size: metadata.len() as usize,
+            size: metadata.len(),
             e_tag: None,
             version: None,
         };
@@ -728,14 +728,15 @@
     async fn get_range(
         &self,
         location: &Path,
-        range: Range<usize>,
+        range: Range<u64>,
     ) -> object_store::Result<Bytes> {
         self.files.iter().find(|x| *x == location).unwrap();
         let path = std::path::PathBuf::from(&self.mirrored_file);
         let mut file = File::open(path).unwrap();
-        file.seek(SeekFrom::Start(range.start as u64)).unwrap();
+        file.seek(SeekFrom::Start(range.start)).unwrap();
 
         let to_read = range.end - range.start;
+        let to_read: usize = to_read.try_into().unwrap();
         let mut data = Vec::with_capacity(to_read);
         let read = file.take(to_read as u64).read_to_end(&mut data).unwrap();
         assert_eq!(read, to_read);
@@ -750,9 +751,10 @@
     fn list(
         &self,
         prefix: Option<&Path>,
-    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
         let prefix = prefix.cloned().unwrap_or_default();
-        Box::pin(stream::iter(self.files.iter().filter_map(
+        let size = self.file_size;
+        Box::pin(stream::iter(self.files.clone().into_iter().filter_map(
             move |location| {
                 // Don't return for exact prefix match
                 let filter = location
@@ -762,9 +764,9 @@ impl ObjectStore for MirroringObjectStore {
 
                 filter.then(|| {
                     Ok(ObjectMeta {
-                        location: location.clone(),
+                        location,
                         last_modified: Utc.timestamp_nanos(0),
-                        size: self.file_size as usize,
+                        size,
                         e_tag: None,
                         version: None,
                     })
@@ -802,7 +804,7 @@ impl ObjectStore for MirroringObjectStore {
         let object = ObjectMeta {
             location: k.clone(),
             last_modified: Utc.timestamp_nanos(0),
-            size: self.file_size as usize,
+            size: self.file_size,
             e_tag: None,
             version: None,
         };
diff --git a/datafusion/core/tests/tracing/traceable_object_store.rs b/datafusion/core/tests/tracing/traceable_object_store.rs
index e979200c8d9b..dfcafc3a63da 100644
--- a/datafusion/core/tests/tracing/traceable_object_store.rs
+++ b/datafusion/core/tests/tracing/traceable_object_store.rs
@@ -96,7 +96,7 @@ impl ObjectStore for TraceableObjectStore {
     fn list(
         &self,
         prefix: Option<&Path>,
-    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
         futures::executor::block_on(assert_traceability());
         self.inner.list(prefix)
     }
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index 6db4d1870320..f5d45cd3fc88 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -704,6 +704,7 @@ impl FileOpener for CsvOpener {
             let result = store.get_opts(file_meta.location(), options).await?;
 
             match result.payload {
+                #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(mut file, _) => {
                     let is_whole_file_scanned = file_meta.range.is_none();
                     let decoder = if is_whole_file_scanned {
diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs
index a6c52312e412..8d0515804fc7 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -209,6 +209,7 @@ impl FileFormat for JsonFormat {
             let r = store.as_ref().get(&object.location).await?;
 
             let schema = match r.payload {
+                #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
                     let mut reader = BufReader::new(decoder);
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index f1adccf9ded7..ee96d050966d 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -355,6 +355,7 @@ impl FileOpener for JsonOpener {
             let result = store.get_opts(file_meta.location(), options).await?;
 
             match result.payload {
+                #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(mut file, _) => {
                     let bytes = match file_meta.range {
                         None => file_compression_type.convert_read(file)?,
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 1d9a67fd2eb6..7617d4d70cee 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -735,10 +735,7 @@ impl<'a> ObjectStoreFetch<'a> {
 }
 
 impl MetadataFetch for ObjectStoreFetch<'_> {
-    fn fetch(
-        &mut self,
-        range: Range<usize>,
-    ) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
+    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
         async {
             self.store
                 .get_range(&self.meta.location, range)
diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index 5924a5b5038f..27ec843c1991 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -18,19 +18,19 @@
 //! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for
 //! low level control of parquet file readers
 
+use crate::ParquetFileMetrics;
 use bytes::Bytes;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::future::BoxFuture;
 use object_store::ObjectStore;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use parquet::file::metadata::ParquetMetaData;
 use std::fmt::Debug;
 use std::ops::Range;
 use std::sync::Arc;
 
-use crate::ParquetFileMetrics;
-
 /// Interface for reading parquet files.
 ///
 /// The combined implementations of [`ParquetFileReaderFactory`] and
@@ -96,28 +96,30 @@ pub(crate) struct ParquetFileReader {
 impl AsyncFileReader for ParquetFileReader {
     fn get_bytes(
         &mut self,
-        range: Range<usize>,
+        range: Range<u64>,
     ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.file_metrics.bytes_scanned.add(range.end - range.start);
+        let bytes_scanned = range.end - range.start;
+        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
         self.inner.get_bytes(range)
     }
 
     fn get_byte_ranges(
         &mut self,
-        ranges: Vec<Range<usize>>,
+        ranges: Vec<Range<u64>>,
     ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
     where
         Self: Send,
     {
-        let total = ranges.iter().map(|r| r.end - r.start).sum();
-        self.file_metrics.bytes_scanned.add(total);
+        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total as usize);
         self.inner.get_byte_ranges(ranges)
     }
 
-    fn get_metadata(
-        &mut self,
-    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        self.inner.get_metadata()
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        self.inner.get_metadata(options)
     }
 }
 
@@ -135,7 +137,8 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
             metrics,
         );
         let store = Arc::clone(&self.store);
-        let mut inner = ParquetObjectReader::new(store, file_meta.object_meta);
+        let mut inner = ParquetObjectReader::new(store, file_meta.object_meta.location)
+            .with_file_size(file_meta.object_meta.size);
 
         if let Some(hint) = metadata_size_hint {
             inner = inner.with_footer_size_hint(hint)
diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs
index 9d5f9fa16b6e..13418cdeee22 100644
--- a/datafusion/datasource-parquet/src/row_group_filter.rs
+++ b/datafusion/datasource-parquet/src/row_group_filter.rs
@@ -1513,7 +1513,7 @@ mod tests {
         let object_meta = ObjectMeta {
             location: object_store::path::Path::parse(file_name).expect("creating path"),
             last_modified: chrono::DateTime::from(std::time::SystemTime::now()),
-            size: data.len(),
+            size: data.len() as u64,
             e_tag: None,
             version: None,
         };
@@ -1526,8 +1526,11 @@ mod tests {
         let metrics = ExecutionPlanMetricsSet::new();
         let file_metrics =
             ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
+        let inner = ParquetObjectReader::new(Arc::new(in_memory), object_meta.location)
+            .with_file_size(object_meta.size);
+
         let reader = ParquetFileReader {
-            inner: ParquetObjectReader::new(Arc::new(in_memory), object_meta),
+            inner,
             file_metrics: file_metrics.clone(),
         };
         let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index a1f966c22f35..15c86427ed00 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -224,10 +224,11 @@ impl FileGroupPartitioner {
             return None;
         }
 
-        let target_partition_size = (total_size as usize).div_ceil(target_partitions);
+        let target_partition_size =
+            (total_size as u64).div_ceil(target_partitions as u64);
 
         let current_partition_index: usize = 0;
-        let current_partition_size: usize = 0;
+        let current_partition_size: u64 = 0;
 
         // Partition byte range evenly for all `PartitionedFile`s
         let repartitioned_files = flattened_files
@@ -497,15 +498,15 @@ struct ToRepartition {
     /// the index from which the original file will be taken
     source_index: usize,
     /// the size of the original file
-    file_size: usize,
+    file_size: u64,
     /// indexes of which group(s) will this be distributed to (including `source_index`)
     new_groups: Vec<usize>,
 }
 
 impl ToRepartition {
-    // how big will each file range be when this file is read in its new groups?
-    fn range_size(&self) -> usize {
-        self.file_size / self.new_groups.len()
+    /// How big will each file range be when this file is read in its new groups?
+    fn range_size(&self) -> u64 {
+        self.file_size / (self.new_groups.len() as u64)
     }
 }
 
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index b93e917c94a5..3e44851d145b 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -52,7 +52,7 @@ pub use self::url::ListingTableUrl;
 use crate::file_groups::FileGroup;
 use chrono::TimeZone;
 use datafusion_common::stats::Precision;
-use datafusion_common::{ColumnStatistics, Result};
+use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
 use datafusion_common::{ScalarValue, Statistics};
 use file_meta::FileMeta;
 use futures::{Stream, StreamExt};
@@ -125,7 +125,7 @@ impl PartitionedFile {
             object_meta: ObjectMeta {
                 location: Path::from(path.into()),
                 last_modified: chrono::Utc.timestamp_nanos(0),
-                size: size as usize,
+                size,
                 e_tag: None,
                 version: None,
             },
@@ -143,7 +143,7 @@ impl PartitionedFile {
             object_meta: ObjectMeta {
                 location: Path::from(path),
                 last_modified: chrono::Utc.timestamp_nanos(0),
-                size: size as usize,
+                size,
                 e_tag: None,
                 version: None,
             },
@@ -226,7 +226,7 @@ impl From<ObjectMeta> for PartitionedFile {
 /// Indicates that the range calculation determined no further action is
 /// necessary, possibly because the calculated range is empty or invalid.
 pub enum RangeCalculation {
-    Range(Option<Range<usize>>),
+    Range(Option<Range<u64>>),
     TerminateEarly,
 }
 
@@ -252,7 +252,12 @@ pub async fn calculate_range(
     match file_meta.range {
         None => Ok(RangeCalculation::Range(None)),
         Some(FileRange { start, end }) => {
-            let (start, end) = (start as usize, end as usize);
+            let start: u64 = start.try_into().map_err(|_| {
+                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
+            })?;
+            let end: u64 = end.try_into().map_err(|_| {
+                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
+            })?;
 
             let start_delta = if start != 0 {
                 find_first_newline(store, location, start - 1, file_size, newline).await?
@@ -291,10 +296,10 @@ pub async fn calculate_range(
 async fn find_first_newline(
     object_store: &Arc<dyn ObjectStore>,
     location: &Path,
-    start: usize,
-    end: usize,
+    start: u64,
+    end: u64,
     newline: u8,
-) -> Result<usize> {
+) -> Result<u64> {
     let options = GetOptions {
         range: Some(GetRange::Bounded(start..end)),
         ..Default::default()
     };
@@ -307,10 +312,11 @@ async fn find_first_newline(
     while let Some(chunk) = result_stream.next().await.transpose()? {
         if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
+            let position = position as u64;
             return Ok(index + position);
         }
 
-        index += chunk.len();
+        index += chunk.len() as u64;
     }
 
     Ok(index)
diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index 8f642f3384d2..20e507e98b68 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -44,7 +44,7 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-expr = { workspace = true }
 futures = { workspace = true }
 log = { workspace = true }
-object_store = { workspace = true }
+object_store = { workspace = true, features = ["fs"] }
 parking_lot = { workspace = true }
 rand = { workspace = true }
 tempfile = { workspace = true }
diff --git a/datafusion/functions-aggregate/benches/array_agg.rs b/datafusion/functions-aggregate/benches/array_agg.rs
index fb605e87ed0c..e22be611d8d7 100644
--- a/datafusion/functions-aggregate/benches/array_agg.rs
+++ b/datafusion/functions-aggregate/benches/array_agg.rs
@@ -19,17 +19,23 @@ use std::sync::Arc;
 
 use arrow::array::{
     Array, ArrayRef, ArrowPrimitiveType, AsArray, ListArray, NullBufferBuilder,
+    PrimitiveArray,
 };
 use arrow::datatypes::{Field, Int64Type};
-use arrow::util::bench_util::create_primitive_array;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use datafusion_expr::Accumulator;
 use datafusion_functions_aggregate::array_agg::ArrayAggAccumulator;
 
 use arrow::buffer::OffsetBuffer;
-use arrow::util::test_util::seedable_rng;
 use rand::distributions::{Distribution, Standard};
+use rand::prelude::StdRng;
 use rand::Rng;
+use rand::SeedableRng;
+
+/// Returns fixed seedable RNG
+pub fn seedable_rng() -> StdRng {
+    StdRng::seed_from_u64(42)
+}
 
 fn merge_batch_bench(c: &mut Criterion, name: &str, values: ArrayRef) {
     let list_item_data_type = values.as_list::<i32>().values().data_type().clone();
@@ -46,6 +52,24 @@ fn merge_batch_bench(c: &mut Criterion, name: &str, values: ArrayRef) {
     });
 }
 
+pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
+where
+    T: ArrowPrimitiveType,
+    Standard: Distribution<T::Native>,
+{
+    let mut rng = seedable_rng();
+
+    (0..size)
+        .map(|_| {
+            if rng.gen::<f32>() < null_density {
+                None
+            } else {
+                Some(rng.gen())
+            }
+        })
+        .collect()
+}
+
 /// Create List array with the given item data type, null density, null locations and zero length lists density
 /// Creates an random (but fixed-seeded) array of a given size and null density
 pub fn create_list_array(
diff --git a/datafusion/functions/benches/chr.rs b/datafusion/functions/benches/chr.rs
index 4750fb466653..8575809c21c8 100644
--- a/datafusion/functions/benches/chr.rs
+++ b/datafusion/functions/benches/chr.rs
@@ -17,15 +17,21 @@
 
 extern crate criterion;
 
-use arrow::{array::PrimitiveArray, datatypes::Int64Type, util::test_util::seedable_rng};
+use arrow::{array::PrimitiveArray, datatypes::Int64Type};
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
 use datafusion_functions::string::chr;
-use rand::Rng;
+use rand::{Rng, SeedableRng};
 
 use arrow::datatypes::DataType;
+use rand::rngs::StdRng;
 use std::sync::Arc;
 
+/// Returns fixed seedable RNG
+pub fn seedable_rng() -> StdRng {
+    StdRng::seed_from_u64(42)
+}
+
 fn criterion_benchmark(c: &mut Criterion) {
     let cot_fn = chr();
     let size = 1024;
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index cb4017afaeac..a886fc242545 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -555,7 +555,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
             object_meta: ObjectMeta {
                 location: Path::from(val.path.as_str()),
                 last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
-                size: val.size as usize,
+                size: val.size,
                 e_tag: None,
                 version: None,
             },
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index af7800d6febe..d1b1f51ae107 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -445,7 +445,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
         })? as u64;
         Ok(protobuf::PartitionedFile {
             path: pf.object_meta.location.as_ref().to_owned(),
-            size: pf.object_meta.size as u64,
+            size: pf.object_meta.size,
             last_modified_ns,
             partition_values: pf
                 .partition_values
diff --git a/datafusion/sqllogictest/test_files/expr/date_part.slt b/datafusion/sqllogictest/test_files/expr/date_part.slt
index dec796aa59cb..39c42cbe1e97 100644
--- a/datafusion/sqllogictest/test_files/expr/date_part.slt
+++ b/datafusion/sqllogictest/test_files/expr/date_part.slt
@@ -884,7 +884,7 @@ SELECT extract(day from arrow_cast('14400 minutes', 'Interval(DayTime)'))
 query I
 SELECT extract(minute from arrow_cast('14400 minutes', 'Interval(DayTime)'))
 ----
-14400
+0
 
 query I
 SELECT extract(second from arrow_cast('5.1 seconds', 'Interval(DayTime)'))
@@ -894,7 +894,7 @@ SELECT extract(second from arrow_cast('5.1 seconds', 'Interval(DayTime)'))
 query I
 SELECT extract(second from arrow_cast('14400 minutes', 'Interval(DayTime)'))
 ----
-864000
+0
 
 query I
 SELECT extract(second from arrow_cast('2 months', 'Interval(MonthDayNano)'))
@@ -954,7 +954,7 @@ from t order by id;
 ----
 0 0 5
-1 0 15
+1 0 3
 2 0 0
 3 2 0
 4 0 8
diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs
index 9ba0e0c964e9..cb725a7277fd 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -61,7 +61,7 @@ pub fn to_substrait_rel(
                 substrait_files.push(FileOrFiles {
                     partition_index: partition_index.try_into().unwrap(),
                     start: 0,
-                    length: file.object_meta.size as u64,
+                    length: file.object_meta.size,
                     path_type: Some(PathType::UriPath(
                         file.object_meta.location.as_ref().to_string(),
                     )),