From 3e0b209d2df81d8b6d27bb5b12ff2fc275f50742 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 03:05:40 +0200 Subject: [PATCH 01/27] implement globbing on ObjectStore --- data-access/Cargo.toml | 1 + data-access/src/object_store/local.rs | 27 +++++++++++++++ data-access/src/object_store/mod.rs | 48 +++++++++++++++++++++++++++ 3 files changed, 76 insertions(+) diff --git a/data-access/Cargo.toml b/data-access/Cargo.toml index aaa869f0ab86..951d74167d6a 100644 --- a/data-access/Cargo.toml +++ b/data-access/Cargo.toml @@ -36,6 +36,7 @@ path = "src/lib.rs" async-trait = "0.1.41" chrono = { version = "0.4", default-features = false } futures = "0.3" +glob = "0.3.0" parking_lot = "0.12" tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index f4872ae17420..b175310d188b 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -230,4 +230,31 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_globbing() -> Result<()> { + let tmp = tempdir()?; + let a1_path = tmp.path().join("a1.txt"); + let a2_path = tmp.path().join("a2.txt"); + let b1_path = tmp.path().join("b1.txt"); + File::create(&a1_path)?; + File::create(&a2_path)?; + File::create(&b1_path)?; + + let glob = format!("{}/a*.txt", tmp.path().to_str().unwrap()); + + let mut all_files = HashSet::new(); + let mut files = LocalFileSystem.glob_file(&glob).await?; + while let Some(file) = files.next().await { + let file = file?; + assert_eq!(file.size(), 0); + all_files.insert(file.path().to_owned()); + } + + assert_eq!(all_files.len(), 2); + assert!(all_files.contains(a1_path.to_str().unwrap())); + assert!(all_files.contains(a2_path.to_str().unwrap())); + + Ok(()) + } } diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 5d2f76e27931..0b8fb25aaf2b 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::{AsyncRead, Stream, StreamExt}; +use glob::Pattern; use crate::{FileMeta, ListEntry, Result, SizedFile}; @@ -80,6 +81,19 @@ pub trait ObjectStore: Sync + Send + Debug { prefix: &str, suffix: &str, ) -> Result { + /* could use this, but it's slower, so lets' stick with the original + if prefix.ends_with(suffix) { + self.list_file(prefix).await + } else { + if(prefix.ends_with("/")) { + let path = format!("{}*{}", prefix, suffix); + self.glob_file(&path).await + } else { + let path = format!("{}/**/ +*{}", prefix, suffix); + self.glob_file(&path).await + } + }*/ let file_stream = self.list_file(prefix).await?; let suffix = suffix.to_owned(); Ok(Box::pin(file_stream.filter(move |fr| { @@ -91,6 +105,40 @@ pub trait ObjectStore: Sync + Send + Debug { }))) } + /// Calls `list_file` with a glob_pattern + async fn glob_file(&self, path: &str) -> Result { + const GLOB_CHARS: [char; 7] = ['{', '}', '[', ']', '*', '?', '\\']; + + /// Determine whether the path contains a globbing character + fn is_glob_path(path: &str) -> bool { + path.chars().any(|c| GLOB_CHARS.contains(&c)) + } + + if !is_glob_path(path) { + let result = self.list_file(path).await?; + Ok(result) + } else { + // take path up to first occurence of a glob char + let path_to_first_glob_character: Vec<&str> = + path.splitn(2, |c| GLOB_CHARS.contains(&c)).collect(); + // find last occurrence of folder / + let path_parts: Vec<&str> = path_to_first_glob_character[0] + .rsplitn(2, |c| c == '/') + .collect(); + let start_path = path_parts[1]; + + let file_stream = self.list_file(start_path).await?; + let pattern = Pattern::new(path).unwrap(); + Ok(Box::pin(file_stream.filter(move |fr| { + let matches_pattern = match fr { + Ok(f) => pattern.matches(f.path()), + Err(_) => true, + }; + async move { matches_pattern } + }))) + } + } + /// Returns all the files in `prefix` if the `prefix` is already a leaf dir, /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided. async fn list_dir( From c9c9a1962b8031199ef190ca7747287c0a2286ad Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 03:10:26 +0200 Subject: [PATCH 02/27] remove unused code --- data-access/src/object_store/mod.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 0b8fb25aaf2b..422212baf09a 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -81,19 +81,6 @@ pub trait ObjectStore: Sync + Send + Debug { prefix: &str, suffix: &str, ) -> Result { - /* could use this, but it's slower, so lets' stick with the original - if prefix.ends_with(suffix) { - self.list_file(prefix).await - } else { - if(prefix.ends_with("/")) { - let path = format!("{}*{}", prefix, suffix); - self.glob_file(&path).await - } else { - let path = format!("{}/**/ -*{}", prefix, suffix); - self.glob_file(&path).await - } - }*/ let file_stream = self.list_file(prefix).await?; let suffix = suffix.to_owned(); Ok(Box::pin(file_stream.filter(move |fr| { From 7b39083b5856d8383032337075bf68a7a7f8d155 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 03:24:30 +0200 Subject: [PATCH 03/27] update list_file_with_suffix to use glob_file --- data-access/src/object_store/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 422212baf09a..478797c63d37 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -81,7 +81,7 @@ pub trait ObjectStore: Sync + Send + Debug { prefix: &str, suffix: &str, ) -> Result { - let file_stream = self.list_file(prefix).await?; + let file_stream = self.glob_file(prefix).await?; let suffix = suffix.to_owned(); Ok(Box::pin(file_stream.filter(move |fr| { let has_suffix = match fr { @@ -102,8 +102,7 @@ pub trait ObjectStore: Sync + Send + Debug { } if !is_glob_path(path) { - let result = self.list_file(path).await?; - Ok(result) + self.list_file(path).await } else { // take path up to first occurence of a glob char let path_to_first_glob_character: Vec<&str> = From 5750e4337b0ca2881bf8bcb93e6d63059c73ec64 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 11:43:25 +0200 Subject: [PATCH 04/27] reworked code such that glob_file matches list_file and glob_file_with_suffix list_file_with_suffix --- data-access/src/object_store/mod.rs | 70 +++++++++++++------ .../core/src/datasource/listing/helpers.rs | 6 +- .../core/src/datasource/listing/table.rs | 2 +- 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 478797c63d37..9c6150762841 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -82,31 +82,18 @@ pub trait ObjectStore: Sync + Send + Debug { suffix: &str, ) -> Result { let file_stream = self.glob_file(prefix).await?; - let suffix = suffix.to_owned(); - Ok(Box::pin(file_stream.filter(move |fr| { - let has_suffix = match fr { - Ok(f) => f.path().ends_with(&suffix), - Err(_) => true, - }; - async move { has_suffix } - }))) + filter_suffix(file_stream, suffix).await } - /// Calls `list_file` with a glob_pattern - async fn glob_file(&self, path: &str) -> Result { - const GLOB_CHARS: [char; 7] = ['{', '}', '[', ']', '*', '?', '\\']; - - /// Determine whether the path contains a globbing character - fn is_glob_path(path: &str) -> bool { - path.chars().any(|c| GLOB_CHARS.contains(&c)) - } - - if !is_glob_path(path) { - self.list_file(path).await + /// Returns all the files matching `glob_pattern` + async fn glob_file(&self, glob_pattern: &str) -> Result { + if !is_glob_path(glob_pattern) { + self.list_file(glob_pattern).await } else { // take path up to first occurence of a glob char - let path_to_first_glob_character: Vec<&str> = - path.splitn(2, |c| GLOB_CHARS.contains(&c)).collect(); + let path_to_first_glob_character: Vec<&str> = glob_pattern + .splitn(2, |c| GLOB_CHARS.contains(&c)) + .collect(); // find last occurrence of folder / let path_parts: Vec<&str> = path_to_first_glob_character[0] .rsplitn(2, |c| c == '/') @@ -114,7 +101,7 @@ pub trait ObjectStore: Sync + Send + Debug { let start_path = path_parts[1]; let file_stream = self.list_file(start_path).await?; - let pattern = Pattern::new(path).unwrap(); + let pattern = Pattern::new(glob_pattern).unwrap(); Ok(Box::pin(file_stream.filter(move |fr| { let matches_pattern = match fr { Ok(f) => pattern.matches(f.path()), @@ -125,6 +112,23 @@ pub trait ObjectStore: Sync + Send + Debug { } } + /// Calls `glob_file` with a suffix filter + async fn glob_file_with_suffix( + &self, + prefix: &str, + suffix: &str, + ) -> Result { + let files_to_consider = match is_glob_path(prefix) { + true => self.glob_file(prefix).await, + false => self.list_file(prefix).await, + }?; + + match suffix.is_empty() { + true => Ok(files_to_consider), + false => filter_suffix(files_to_consider, suffix).await, + } + } + /// Returns all the files in `prefix` if the `prefix` is already a leaf dir, /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided. async fn list_dir( @@ -136,3 +140,25 @@ pub trait ObjectStore: Sync + Send + Debug { /// Get object reader for one file fn file_reader(&self, file: SizedFile) -> Result>; } + +const GLOB_CHARS: [char; 7] = ['{', '}', '[', ']', '*', '?', '\\']; + +/// Determine whether the path contains a globbing character +fn is_glob_path(path: &str) -> bool { + path.chars().any(|c| GLOB_CHARS.contains(&c)) +} + +/// Filters the file_stream to only contain files that end with suffix +async fn filter_suffix( + file_stream: FileMetaStream, + suffix: &str, +) -> Result { + let suffix = suffix.to_owned(); + Ok(Box::pin(file_stream.filter(move |fr| { + let has_suffix = match fr { + Ok(f) => f.path().ends_with(&suffix), + Err(_) => true, + }; + async move { has_suffix } + }))) +} diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index d066d8d9dd2c..9518986a14da 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -171,7 +171,7 @@ pub async fn pruned_partition_list( if table_partition_cols.is_empty() { return Ok(Box::pin( store - .list_file_with_suffix(table_path, file_extension) + .glob_file_with_suffix(table_path, file_extension) .await? .map(|f| { Ok(PartitionedFile { @@ -196,7 +196,7 @@ pub async fn pruned_partition_list( let table_partition_cols_stream = table_partition_cols.to_vec(); Ok(Box::pin( store - .list_file_with_suffix(table_path, file_extension) + .glob_file_with_suffix(table_path, file_extension) .await? .filter_map(move |f| { let stream_path = stream_path.clone(); @@ -231,7 +231,7 @@ pub async fn pruned_partition_list( // parse the partition values and serde them as a RecordBatch to filter them // TODO avoid collecting but have a streaming memory table instead let batches: Vec = store - .list_file_with_suffix(table_path, file_extension) + .glob_file_with_suffix(table_path, file_extension) .await? // TODO we set an arbitrary high batch size here, it does not matter as we list // all the files anyway. This number will need to be adjusted according to the object diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 9e554c13d32e..6881f674b9e4 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -216,7 +216,7 @@ impl ListingOptions { path: &'a str, ) -> Result { let file_stream = object_store - .list_file_with_suffix(path, &self.file_extension) + .glob_file_with_suffix(path, &self.file_extension) .await? .map(move |file_meta| object_store.file_reader(file_meta?.sized_file)); let file_schema = self.format.infer_schema(Box::pin(file_stream)).await?; From 7930206f769fd157a135e931c08815b9151ca079 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 14:18:31 +0200 Subject: [PATCH 05/27] rework the way we figure out what the greatest common base path is --- data-access/src/object_store/mod.rs | 134 ++++++++++++++++++++++++---- 1 file changed, 118 insertions(+), 16 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 9c6150762841..41110d77a44a 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -21,6 +21,7 @@ pub mod local; use std::fmt::Debug; use std::io::Read; +use std::path::Path; use std::pin::Pin; use std::sync::Arc; @@ -81,8 +82,7 @@ pub trait ObjectStore: Sync + Send + Debug { prefix: &str, suffix: &str, ) -> Result { - let file_stream = self.glob_file(prefix).await?; - filter_suffix(file_stream, suffix).await + self.glob_file_with_suffix(prefix, suffix).await } /// Returns all the files matching `glob_pattern` @@ -90,16 +90,7 @@ pub trait ObjectStore: Sync + Send + Debug { if !is_glob_path(glob_pattern) { self.list_file(glob_pattern).await } else { - // take path up to first occurence of a glob char - let path_to_first_glob_character: Vec<&str> = glob_pattern - .splitn(2, |c| GLOB_CHARS.contains(&c)) - .collect(); - // find last occurrence of folder / - let path_parts: Vec<&str> = path_to_first_glob_character[0] - .rsplitn(2, |c| c == '/') - .collect(); - let start_path = path_parts[1]; - + let start_path = find_longest_base_path(glob_pattern); let file_stream = self.list_file(start_path).await?; let pattern = Pattern::new(glob_pattern).unwrap(); Ok(Box::pin(file_stream.filter(move |fr| { @@ -115,12 +106,12 @@ pub trait ObjectStore: Sync + Send + Debug { /// Calls `glob_file` with a suffix filter async fn glob_file_with_suffix( &self, - prefix: &str, + glob_pattern: &str, suffix: &str, ) -> Result { - let files_to_consider = match is_glob_path(prefix) { - true => self.glob_file(prefix).await, - false => self.list_file(prefix).await, + let files_to_consider = match is_glob_path(glob_pattern) { + true => self.glob_file(glob_pattern).await, + false => self.list_file(glob_pattern).await, }?; match suffix.is_empty() { @@ -162,3 +153,114 @@ async fn filter_suffix( async move { has_suffix } }))) } + +fn find_longest_base_path(glob_pattern: &str) -> &str { + // in case the glob_pattern is not actually a glob pattern, take the entire thing + if !is_glob_path(&glob_pattern) { + glob_pattern //.to_string() + } else { + // take path up to first occurence of a glob char + let path_to_first_glob_character = glob_pattern + .splitn(2, |c| GLOB_CHARS.contains(&c)) + .collect::>()[0]; // always find one, because otherwise is_glob_pattern would not be true + let path = Path::new(path_to_first_glob_character); + + let dir_path = if path.is_file() { + path.parent().unwrap_or(Path::new("/")) + } else { + path + }; + dir_path.to_str().unwrap() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_is_glob_path() -> Result<()> { + assert!(!is_glob_path("/")); + assert!(!is_glob_path("/test")); + assert!(!is_glob_path("/test/")); + assert!(is_glob_path("/test*")); + Ok(()) + } + + #[tokio::test] + async fn test_find_longest_base_path() -> Result<()> { + assert_eq!( + find_longest_base_path("/"), + "/", + "testing longest_path with {}", + "/" + ); + assert_eq!( + find_longest_base_path("/a.txt"), + "/a.txt", + "testing longest_path with {}", + "/a.txt" + ); + assert_eq!( + find_longest_base_path("/a"), + "/a", + "testing longest_path with {}", + "/a" + ); + assert_eq!( + find_longest_base_path("/a/"), + "/a/", + "testing longest_path with {}", + "/a/" + ); + assert_eq!( + find_longest_base_path("/a/b"), + "/a/b", + "testing longest_path with {}", + "/a/b" + ); + assert_eq!( + find_longest_base_path("/a/b/"), + "/a/b/", + "testing longest_path with {}", + "/a/b/" + ); + assert_eq!( + find_longest_base_path("/a/b.txt"), + "/a/b.txt", + "testing longest_path with {}", + "/a/bt.xt" + ); + assert_eq!( + find_longest_base_path("/a/b/c.txt"), + "/a/b/c.txt", + "testing longest_path with {}", + "/a/b/c.txt" + ); + assert_eq!( + find_longest_base_path("/*.txt"), + "/", + "testing longest_path with {}", + "/*.txt" + ); + assert_eq!( + find_longest_base_path("/a/*b.txt"), + "/a/", + "testing longest_path with {}", + "/a/*b.txt" + ); + assert_eq!( + find_longest_base_path("/a/*/b.txt"), + "/a/", + "testing longest_path with {}", + "/a/*/b.txt" + ); + assert_eq!( + find_longest_base_path("/a/b/[123]/file*.txt"), + "/a/b/", + "testing longest_path with {}", + "/a/b/[123]/file*.txt" + ); + Ok(()) + } +} From eb1e32e2456901410758e5f71fe29a307636bc69 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 14:37:22 +0200 Subject: [PATCH 06/27] refactor tests on longested_search_path_without_glob_pattern --- data-access/src/object_store/mod.rs | 97 +++++++---------------------- 1 file changed, 23 insertions(+), 74 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 41110d77a44a..cd0a71ad5dc1 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -90,7 +90,7 @@ pub trait ObjectStore: Sync + Send + Debug { if !is_glob_path(glob_pattern) { self.list_file(glob_pattern).await } else { - let start_path = find_longest_base_path(glob_pattern); + let start_path = find_longest_search_path_without_glob_pattern(glob_pattern); let file_stream = self.list_file(start_path).await?; let pattern = Pattern::new(glob_pattern).unwrap(); Ok(Box::pin(file_stream.filter(move |fr| { @@ -154,7 +154,7 @@ async fn filter_suffix( }))) } -fn find_longest_base_path(glob_pattern: &str) -> &str { +fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> &str { // in case the glob_pattern is not actually a glob pattern, take the entire thing if !is_glob_path(&glob_pattern) { glob_pattern //.to_string() @@ -187,80 +187,29 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_find_longest_base_path() -> Result<()> { - assert_eq!( - find_longest_base_path("/"), - "/", - "testing longest_path with {}", - "/" - ); - assert_eq!( - find_longest_base_path("/a.txt"), - "/a.txt", - "testing longest_path with {}", - "/a.txt" - ); - assert_eq!( - find_longest_base_path("/a"), - "/a", - "testing longest_path with {}", - "/a" - ); - assert_eq!( - find_longest_base_path("/a/"), - "/a/", - "testing longest_path with {}", - "/a/" - ); - assert_eq!( - find_longest_base_path("/a/b"), - "/a/b", - "testing longest_path with {}", - "/a/b" - ); - assert_eq!( - find_longest_base_path("/a/b/"), - "/a/b/", - "testing longest_path with {}", - "/a/b/" - ); + fn test_longest_base_path(input: &str, expected: &str) { assert_eq!( - find_longest_base_path("/a/b.txt"), - "/a/b.txt", - "testing longest_path with {}", - "/a/bt.xt" - ); - assert_eq!( - find_longest_base_path("/a/b/c.txt"), - "/a/b/c.txt", - "testing longest_path with {}", - "/a/b/c.txt" - ); - assert_eq!( - find_longest_base_path("/*.txt"), - "/", - "testing longest_path with {}", - "/*.txt" - ); - assert_eq!( - find_longest_base_path("/a/*b.txt"), - "/a/", - "testing longest_path with {}", - "/a/*b.txt" - ); - assert_eq!( - find_longest_base_path("/a/*/b.txt"), - "/a/", - "testing longest_path with {}", - "/a/*/b.txt" - ); - assert_eq!( - find_longest_base_path("/a/b/[123]/file*.txt"), - "/a/b/", - "testing longest_path with {}", - "/a/b/[123]/file*.txt" + find_longest_search_path_without_glob_pattern(input), + expected, + "testing find_longest_search_path_without_glob_pattern with {}", + input ); + } + + #[tokio::test] + async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> { + test_longest_base_path("/", "/"); + test_longest_base_path("/a.txt", "/a.txt"); + test_longest_base_path("/a", "/a"); + test_longest_base_path("/a/", "/a/"); + test_longest_base_path("/a/b", "/a/b"); + test_longest_base_path("/a/b/", "/a/b/"); + test_longest_base_path("/a/b.txt", "/a/b.txt"); + test_longest_base_path("/a/b/c.txt", "/a/b/c.txt"); + test_longest_base_path("/*.txt", "/"); + test_longest_base_path("/a/*b.txt", "/a/"); + test_longest_base_path("/a/*/b.txt", "/a/"); + test_longest_base_path("/a/b/[123]/file*.txt", "/a/b/"); Ok(()) } } From 79e45a90201d66dd79f0f2d9f6708b63ac5a99c4 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 14:38:29 +0200 Subject: [PATCH 07/27] added comment on / value --- data-access/src/object_store/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index cd0a71ad5dc1..94cc868f0cd1 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -21,6 +21,7 @@ pub mod local; use std::fmt::Debug; use std::io::Read; +use std::path; use std::path::Path; use std::pin::Pin; use std::sync::Arc; @@ -166,6 +167,7 @@ fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> &str { let path = Path::new(path_to_first_glob_character); let dir_path = if path.is_file() { + // &path::MAIN_SEPARATOR_STR is unstabled but / is the vale for unix and windows path.parent().unwrap_or(Path::new("/")) } else { path From bbc4bbf9b37f65abd274c1d660d64267f6c1a189 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 14:39:26 +0200 Subject: [PATCH 08/27] remove unused use stmt --- data-access/src/object_store/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 94cc868f0cd1..3db98e0a9025 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -21,7 +21,6 @@ pub mod local; use std::fmt::Debug; use std::io::Read; -use std::path; use std::path::Path; use std::pin::Pin; use std::sync::Arc; From 5244250f5a4814fa6b314e4b319b4f300deb4b28 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 16:00:18 +0200 Subject: [PATCH 09/27] rework implementation to find largest common path --- data-access/src/object_store/local.rs | 5 +-- data-access/src/object_store/mod.rs | 56 ++++++++++++++++++--------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index b175310d188b..3717578c4561 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -110,7 +110,7 @@ async fn list_all(prefix: String) -> Result { path, size: metadata.len(), }, - last_modified: metadata.modified().map(chrono::DateTime::from).ok(), + last_modified: None, /*metadata.modified().map(chrono::DateTime::from).ok()*/ } } @@ -185,7 +185,7 @@ pub fn local_unpartitioned_file(file: String) -> FileMeta { size: metadata.len(), path: file, }, - last_modified: metadata.modified().map(chrono::DateTime::from).ok(), + last_modified: None, //metadata.modified().map(chrono::DateTime::from).ok(), } } @@ -242,7 +242,6 @@ mod tests { File::create(&b1_path)?; let glob = format!("{}/a*.txt", tmp.path().to_str().unwrap()); - let mut all_files = HashSet::new(); let mut files = LocalFileSystem.glob_file(&glob).await?; while let Some(file) = files.next().await { diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 3db98e0a9025..24cff014a51c 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -21,7 +21,8 @@ pub mod local; use std::fmt::Debug; use std::io::Read; -use std::path::Path; +use std::path; +use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::Arc; @@ -91,7 +92,7 @@ pub trait ObjectStore: Sync + Send + Debug { self.list_file(glob_pattern).await } else { let start_path = find_longest_search_path_without_glob_pattern(glob_pattern); - let file_stream = self.list_file(start_path).await?; + let file_stream = self.list_file(&start_path).await?; let pattern = Pattern::new(glob_pattern).unwrap(); Ok(Box::pin(file_stream.filter(move |fr| { let matches_pattern = match fr { @@ -154,24 +155,42 @@ async fn filter_suffix( }))) } -fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> &str { +fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { // in case the glob_pattern is not actually a glob pattern, take the entire thing - if !is_glob_path(&glob_pattern) { - glob_pattern //.to_string() + if !is_glob_path(glob_pattern) { + glob_pattern.to_string() } else { - // take path up to first occurence of a glob char - let path_to_first_glob_character = glob_pattern - .splitn(2, |c| GLOB_CHARS.contains(&c)) - .collect::>()[0]; // always find one, because otherwise is_glob_pattern would not be true - let path = Path::new(path_to_first_glob_character); - - let dir_path = if path.is_file() { - // &path::MAIN_SEPARATOR_STR is unstabled but / is the vale for unix and windows - path.parent().unwrap_or(Path::new("/")) - } else { - path - }; - dir_path.to_str().unwrap() + // take all the components of the path (left-to-right) which do not contain a glob pattern + let components_in_glob_pattern = + Path::new(glob_pattern).components().collect::>(); + let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new(); + let mut encountered_glob = false; + for component_in_glob_pattern in components_in_glob_pattern { + let component_as_str = + component_in_glob_pattern.as_os_str().to_str().unwrap(); + if !encountered_glob { + let component_str_is_glob = is_glob_path(component_as_str); + encountered_glob = component_str_is_glob; + if !encountered_glob { + path_buf_for_longest_search_path_without_glob_pattern + .push(component_in_glob_pattern); + } + } + } + + let mut result = path_buf_for_longest_search_path_without_glob_pattern + .to_str() + .unwrap() + .to_string(); + // when we're not at the root, append a separator + if path_buf_for_longest_search_path_without_glob_pattern + .components() + .count() + > 1 + { + result.push(path::MAIN_SEPARATOR); + } + result } } @@ -211,6 +230,7 @@ mod tests { test_longest_base_path("/a/*b.txt", "/a/"); test_longest_base_path("/a/*/b.txt", "/a/"); test_longest_base_path("/a/b/[123]/file*.txt", "/a/b/"); + test_longest_base_path("/a/b*.txt", "/a/"); Ok(()) } } From 77f07f9b6267e0c42f27e1f1f341146b7f9fd285 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 16:32:55 +0200 Subject: [PATCH 10/27] revert accidental/temp changes --- data-access/src/object_store/local.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index 3717578c4561..118f20564d91 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -110,7 +110,7 @@ async fn list_all(prefix: String) -> Result { path, size: metadata.len(), }, - last_modified: None, /*metadata.modified().map(chrono::DateTime::from).ok()*/ + last_modified: metadata.modified().map(chrono::DateTime::from).ok(), } } @@ -185,7 +185,7 @@ pub fn local_unpartitioned_file(file: String) -> FileMeta { size: metadata.len(), path: file, }, - last_modified: None, //metadata.modified().map(chrono::DateTime::from).ok(), + last_modified: metadata.modified().map(chrono::DateTime::from).ok(), } } From 445c95b3383b99355c3e5383adc6c2985616c8ab Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Tue, 3 May 2022 18:02:44 +0200 Subject: [PATCH 11/27] added tests to verify globbing --- datafusion/core/src/execution/context.rs | 36 ++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 0f544ca89e0c..891011261715 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1600,6 +1600,7 @@ mod tests { use crate::logical_plan::{binary_expr, lit, Operator}; use crate::physical_plan::functions::make_scalar_function; use crate::test; + use crate::test_util::parquet_test_data; use crate::variable::VarType; use crate::{ assert_batches_eq, assert_batches_sorted_eq, @@ -2127,6 +2128,41 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn read_with_glob_path() -> Result<()> { + let ctx = SessionContext::new(); + + let df = ctx + .read_parquet( + format!("{}/alltypes_plain*.parquet", parquet_test_data()), + ParquetReadOptions::default(), + ) + .await?; + let results = df.collect().await?; + let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); + // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows + assert_eq!(total_rows, 10); + Ok(()) + } + + #[tokio::test] + async fn read_from_registered_table_with_glob_path() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.register_parquet( + "test", + &format!("{}/alltypes_plain*.parquet", parquet_test_data()), + ParquetReadOptions::default(), + ) + .await?; + let df = ctx.sql("SELECT * FROM test").await?; + let results = df.collect().await?; + let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); + // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows + assert_eq!(total_rows, 10); + Ok(()) + } struct MyPhysicalPlanner {} From 485f59d729c9ece1964d1bbdbfaf9b935290e7d6 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 15:38:14 +0200 Subject: [PATCH 12/27] find inspiration in glob crate to better deal with windows --- data-access/src/object_store/mod.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 24cff014a51c..e7303c849405 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -88,7 +88,7 @@ pub trait ObjectStore: Sync + Send + Debug { /// Returns all the files matching `glob_pattern` async fn glob_file(&self, glob_pattern: &str) -> Result { - if !is_glob_path(glob_pattern) { + if !contains_glob_start_char(glob_pattern) { self.list_file(glob_pattern).await } else { let start_path = find_longest_search_path_without_glob_pattern(glob_pattern); @@ -110,7 +110,7 @@ pub trait ObjectStore: Sync + Send + Debug { glob_pattern: &str, suffix: &str, ) -> Result { - let files_to_consider = match is_glob_path(glob_pattern) { + let files_to_consider = match contains_glob_start_char(glob_pattern) { true => self.glob_file(glob_pattern).await, false => self.list_file(glob_pattern).await, }?; @@ -133,11 +133,11 @@ pub trait ObjectStore: Sync + Send + Debug { fn file_reader(&self, file: SizedFile) -> Result>; } -const GLOB_CHARS: [char; 7] = ['{', '}', '[', ']', '*', '?', '\\']; +const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; /// Determine whether the path contains a globbing character -fn is_glob_path(path: &str) -> bool { - path.chars().any(|c| GLOB_CHARS.contains(&c)) +fn contains_glob_start_char(path: &str) -> bool { + path.chars().any(|c| GLOB_START_CHARS.contains(&c)) } /// Filters the file_stream to only contain files that end with suffix @@ -157,7 +157,7 @@ async fn filter_suffix( fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { // in case the glob_pattern is not actually a glob pattern, take the entire thing - if !is_glob_path(glob_pattern) { + if !contains_glob_start_char(glob_pattern) { glob_pattern.to_string() } else { // take all the components of the path (left-to-right) which do not contain a glob pattern @@ -169,7 +169,7 @@ fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { let component_as_str = component_in_glob_pattern.as_os_str().to_str().unwrap(); if !encountered_glob { - let component_str_is_glob = is_glob_path(component_as_str); + let component_str_is_glob = contains_glob_start_char(component_as_str); encountered_glob = component_str_is_glob; if !encountered_glob { path_buf_for_longest_search_path_without_glob_pattern @@ -200,10 +200,10 @@ mod tests { #[tokio::test] async fn test_is_glob_path() -> Result<()> { - assert!(!is_glob_path("/")); - assert!(!is_glob_path("/test")); - assert!(!is_glob_path("/test/")); - assert!(is_glob_path("/test*")); + assert!(!contains_glob_start_char("/")); + assert!(!contains_glob_start_char("/test")); + assert!(!contains_glob_start_char("/test/")); + assert!(contains_glob_start_char("/test*")); Ok(()) } From 8589ef69ef2f4e2b4b03f80430054316c8b010f5 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 16:34:02 +0200 Subject: [PATCH 13/27] when running on windows, the expected path is slightly different (\ instead of /). --- data-access/src/object_store/mod.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index e7303c849405..f3631bd9f06a 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -210,12 +210,20 @@ mod tests { fn test_longest_base_path(input: &str, expected: &str) { assert_eq!( find_longest_search_path_without_glob_pattern(input), - expected, + make_expected(input, expected), "testing find_longest_search_path_without_glob_pattern with {}", input ); } + fn make_expected(input: &str, expected: &str) -> String { + if contains_glob_start_char(input) { + expected.replace("/", &String::from(path::MAIN_SEPARATOR)) + } else { + expected.to_string() + } + } + #[tokio::test] async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> { test_longest_base_path("/", "/"); From 1cfe2d69fbaaf401019a91af0f7e6241b6756b68 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 19:44:35 +0200 Subject: [PATCH 14/27] fixed clippy issue --- data-access/src/object_store/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index f3631bd9f06a..6cf9fb874ab0 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -218,7 +218,7 @@ mod tests { fn make_expected(input: &str, expected: &str) -> String { if contains_glob_start_char(input) { - expected.replace("/", &String::from(path::MAIN_SEPARATOR)) + expected.replace('/', &String::from(path::MAIN_SEPARATOR)) } else { expected.to_string() } From c8cbc733974b202c3d2bf61041eeb4627bce0c94 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 19:51:41 +0200 Subject: [PATCH 15/27] added section on checks that are executed during a PR build --- CONTRIBUTING.md | 10 ++++++++++ dev/pr-checks.sh | 22 ++++++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100755 dev/pr-checks.sh diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4f0fe7163654..7ee6512dddd0 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -53,6 +53,16 @@ Testing setup: - `git submodule init` - `git submodule update` +Each PR also needs to pass the following checks: + +- `cargo fmt --all -- --check` +- `cargo clippy --all-targets --workspace -- -D warnings` +- `find . -mindepth 2 -name 'Cargo.toml' -exec cargo tomlfmt -p {} \; ; git diff --exit-code` + +or simply run: + +- `./dev/pr-checks.sh` + ## Test Organization DataFusion has several levels of tests in its [Test diff --git a/dev/pr-checks.sh b/dev/pr-checks.sh new file mode 100755 index 000000000000..d512b9b0115b --- /dev/null +++ b/dev/pr-checks.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +set -e +cargo fmt --all -- --check +#cargo clippy --all-targets --workspace -- -D warnings +find . -mindepth 2 -name 'Cargo.toml' -exec cargo tomlfmt -p {} \; ; git diff --exit-code \ No newline at end of file From b00c430b045b65616168b597fd8ce82bb166734e Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 20:05:24 +0200 Subject: [PATCH 16/27] updated section (and script) to make explicit this is about formatting --- CONTRIBUTING.md | 4 ++-- dev/{pr-checks.sh => format-code.sh} | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename dev/{pr-checks.sh => format-code.sh} (94%) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7ee6512dddd0..a5f41ca24cd4 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -53,7 +53,7 @@ Testing setup: - `git submodule init` - `git submodule update` -Each PR also needs to pass the following checks: +Formatting instructions: - `cargo fmt --all -- --check` - `cargo clippy --all-targets --workspace -- -D warnings` @@ -61,7 +61,7 @@ Each PR also needs to pass the following checks: or simply run: -- `./dev/pr-checks.sh` +- `./dev/format-code.sh` ## Test Organization diff --git a/dev/pr-checks.sh b/dev/format-code.sh similarity index 94% rename from dev/pr-checks.sh rename to dev/format-code.sh index d512b9b0115b..038e4bf3caa3 100755 --- a/dev/pr-checks.sh +++ b/dev/format-code.sh @@ -18,5 +18,5 @@ # under the License. set -e cargo fmt --all -- --check -#cargo clippy --all-targets --workspace -- -D warnings +cargo clippy --all-targets --workspace -- -D warnings find . -mindepth 2 -name 'Cargo.toml' -exec cargo tomlfmt -p {} \; ; git diff --exit-code \ No newline at end of file From b4edeb9bd5a6176dfcb8225d8d310e26e49908d0 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 20:14:34 +0200 Subject: [PATCH 17/27] replace with simple break --- data-access/src/object_store/mod.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 6cf9fb874ab0..e2e8db2e3378 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -164,24 +164,21 @@ fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { let components_in_glob_pattern = Path::new(glob_pattern).components().collect::>(); let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new(); - let mut encountered_glob = false; for component_in_glob_pattern in components_in_glob_pattern { let component_as_str = component_in_glob_pattern.as_os_str().to_str().unwrap(); - if !encountered_glob { - let component_str_is_glob = contains_glob_start_char(component_as_str); - encountered_glob = component_str_is_glob; - if !encountered_glob { - path_buf_for_longest_search_path_without_glob_pattern - .push(component_in_glob_pattern); - } + let component_str_is_glob = contains_glob_start_char(component_as_str); + if component_str_is_glob { + break; } + path_buf_for_longest_search_path_without_glob_pattern.push(component_in_glob_pattern); } let mut result = path_buf_for_longest_search_path_without_glob_pattern .to_str() .unwrap() .to_string(); + // when we're not at the root, append a separator if path_buf_for_longest_search_path_without_glob_pattern .components() From a49a63a22cb682b821b323886fafa8487736b80c Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 20:18:47 +0200 Subject: [PATCH 18/27] make filter_suffix not-async as it does not need to be async --- data-access/src/object_store/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index e2e8db2e3378..5e361b01a6d3 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -117,7 +117,7 @@ pub trait ObjectStore: Sync + Send + Debug { match suffix.is_empty() { true => Ok(files_to_consider), - false => filter_suffix(files_to_consider, suffix).await, + false => filter_suffix(files_to_consider, suffix), } } @@ -141,7 +141,7 @@ fn contains_glob_start_char(path: &str) -> bool { } /// Filters the file_stream to only contain files that end with suffix -async fn filter_suffix( +fn filter_suffix( file_stream: FileMetaStream, suffix: &str, ) -> Result { From e3ffac38258050baf5db20728ad70f651e41b5c4 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 20:21:50 +0200 Subject: [PATCH 19/27] no need to collect --- data-access/src/object_store/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 5e361b01a6d3..0a9b674fd270 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -162,7 +162,7 @@ fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { } else { // take all the components of the path (left-to-right) which do not contain a glob pattern let components_in_glob_pattern = - Path::new(glob_pattern).components().collect::>(); + Path::new(glob_pattern).components(); let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new(); for component_in_glob_pattern in components_in_glob_pattern { let component_as_str = From b0da3a7350109bad074b23ab8f2ffa6d38de657a Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 20:43:13 +0200 Subject: [PATCH 20/27] attempt to make tests more understandable --- data-access/src/object_store/mod.rs | 46 ++++++++++++++++------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 0a9b674fd270..5e1f638ddf5f 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -141,10 +141,7 @@ fn contains_glob_start_char(path: &str) -> bool { } /// Filters the file_stream to only contain files that end with suffix -fn filter_suffix( - file_stream: FileMetaStream, - suffix: &str, -) -> Result { +fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> Result { let suffix = suffix.to_owned(); Ok(Box::pin(file_stream.filter(move |fr| { let has_suffix = match fr { @@ -161,8 +158,7 @@ fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { glob_pattern.to_string() } else { // take all the components of the path (left-to-right) which do not contain a glob pattern - let components_in_glob_pattern = - Path::new(glob_pattern).components(); + let components_in_glob_pattern = Path::new(glob_pattern).components(); let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new(); for component_in_glob_pattern in components_in_glob_pattern { let component_as_str = @@ -171,7 +167,8 @@ fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { if component_str_is_glob { break; } - path_buf_for_longest_search_path_without_glob_pattern.push(component_in_glob_pattern); + path_buf_for_longest_search_path_without_glob_pattern + .push(component_in_glob_pattern); } let mut result = path_buf_for_longest_search_path_without_glob_pattern @@ -207,22 +204,15 @@ mod tests { fn test_longest_base_path(input: &str, expected: &str) { assert_eq!( find_longest_search_path_without_glob_pattern(input), - make_expected(input, expected), + expected, "testing find_longest_search_path_without_glob_pattern with {}", input ); } - fn make_expected(input: &str, expected: &str) -> String { - if contains_glob_start_char(input) { - expected.replace('/', &String::from(path::MAIN_SEPARATOR)) - } else { - expected.to_string() - } - } - #[tokio::test] async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> { + // no glob patterns, thus we get the full path (as-is) test_longest_base_path("/", "/"); test_longest_base_path("/a.txt", "/a.txt"); test_longest_base_path("/a", "/a"); @@ -231,11 +221,25 @@ mod tests { test_longest_base_path("/a/b/", "/a/b/"); test_longest_base_path("/a/b.txt", "/a/b.txt"); test_longest_base_path("/a/b/c.txt", "/a/b/c.txt"); - test_longest_base_path("/*.txt", "/"); - test_longest_base_path("/a/*b.txt", "/a/"); - test_longest_base_path("/a/*/b.txt", "/a/"); - test_longest_base_path("/a/b/[123]/file*.txt", "/a/b/"); - test_longest_base_path("/a/b*.txt", "/a/"); + // glob patterns, thus we build the longest path (os-specific) + use path::MAIN_SEPARATOR; + test_longest_base_path("/*.txt", &format!("{MAIN_SEPARATOR}")); + test_longest_base_path( + "/a/*b.txt", + &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), + ); + test_longest_base_path( + "/a/*/b.txt", + &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), + ); + test_longest_base_path( + "/a/b/[123]/file*.txt", + &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"), + ); + test_longest_base_path( + "/a/b*.txt", + &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), + ); Ok(()) } } From 1f8f5025abceb5b98aae3faaa8a94d70eb395168 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 20:43:59 +0200 Subject: [PATCH 21/27] actually format the code instead of only verifying --- dev/format-code.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/format-code.sh b/dev/format-code.sh index 038e4bf3caa3..3a09f6475b79 100755 --- a/dev/format-code.sh +++ b/dev/format-code.sh @@ -17,6 +17,6 @@ # specific language governing permissions and limitations # under the License. set -e -cargo fmt --all -- --check +cargo fmt --all cargo clippy --all-targets --workspace -- -D warnings find . -mindepth 2 -name 'Cargo.toml' -exec cargo tomlfmt -p {} \; ; git diff --exit-code \ No newline at end of file From 2a26116d069de0c5d694456b44db744aca6e5d54 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 20:52:01 +0200 Subject: [PATCH 22/27] added test with ** as glob pattern as well --- data-access/src/object_store/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 5e1f638ddf5f..243acccbd71d 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -240,6 +240,10 @@ mod tests { "/a/b*.txt", &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), ); + test_longest_base_path( + "/a/b/**/c*.txt", + &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"), + ); Ok(()) } } From 4035050cc3c6cbeafe59d1739f72b6aacc82dd2c Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 20:58:15 +0200 Subject: [PATCH 23/27] remove changes related to code formatting --- CONTRIBUTING.md | 10 ---------- dev/format-code.sh | 22 ---------------------- 2 files changed, 32 deletions(-) delete mode 100755 dev/format-code.sh diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a5f41ca24cd4..4f0fe7163654 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -53,16 +53,6 @@ Testing setup: - `git submodule init` - `git submodule update` -Formatting instructions: - -- `cargo fmt --all -- --check` -- `cargo clippy --all-targets --workspace -- -D warnings` -- `find . -mindepth 2 -name 'Cargo.toml' -exec cargo tomlfmt -p {} \; ; git diff --exit-code` - -or simply run: - -- `./dev/format-code.sh` - ## Test Organization DataFusion has several levels of tests in its [Test diff --git a/dev/format-code.sh b/dev/format-code.sh deleted file mode 100755 index 3a09f6475b79..000000000000 --- a/dev/format-code.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -set -e -cargo fmt --all -cargo clippy --all-targets --workspace -- -D warnings -find . -mindepth 2 -name 'Cargo.toml' -exec cargo tomlfmt -p {} \; ; git diff --exit-code \ No newline at end of file From ca1ef1ef331d981180264bb9c9512648b5d97932 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 21:38:35 +0200 Subject: [PATCH 24/27] remove unneeded empty line --- datafusion/core/src/execution/context.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 891011261715..3a6b3a798107 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -2125,7 +2125,6 @@ mod tests { "+----------+", ]; assert_batches_eq!(expected, &result); - Ok(()) } From 130af0f7c73f23d92738c2f53b9c2585517286cc Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 21:39:22 +0200 Subject: [PATCH 25/27] run cargo fmt --- datafusion/core/src/execution/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 3a6b3a798107..0b8189b56515 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -2127,7 +2127,7 @@ mod tests { assert_batches_eq!(expected, &result); Ok(()) } - + #[tokio::test] async fn read_with_glob_path() -> Result<()> { let ctx = SessionContext::new(); From 8cedd50abd9013170b3c5117703922246dbfb46d Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 21:46:10 +0200 Subject: [PATCH 26/27] Update data-access/src/object_store/mod.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- data-access/src/object_store/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 243acccbd71d..c7f5cd441f9d 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -163,8 +163,7 @@ fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { for component_in_glob_pattern in components_in_glob_pattern { let component_as_str = component_in_glob_pattern.as_os_str().to_str().unwrap(); - let component_str_is_glob = contains_glob_start_char(component_as_str); - if component_str_is_glob { + if contains_glob_start_char(component_as_str) { break; } path_buf_for_longest_search_path_without_glob_pattern From 223afed7a9441d9b73c702c56785a5d56acfc518 Mon Sep 17 00:00:00 2001 From: Tim Van Wassenhove Date: Wed, 4 May 2022 23:06:30 +0200 Subject: [PATCH 27/27] use try_filter as suggested in pr review --- data-access/src/object_store/mod.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index c7f5cd441f9d..39d1bf04d147 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -27,7 +27,8 @@ use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; -use futures::{AsyncRead, Stream, StreamExt}; +use futures::future::ready; +use futures::{AsyncRead, Stream, StreamExt, TryStreamExt}; use glob::Pattern; use crate::{FileMeta, ListEntry, Result, SizedFile}; @@ -143,13 +144,9 @@ fn contains_glob_start_char(path: &str) -> bool { /// Filters the file_stream to only contain files that end with suffix fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> Result { let suffix = suffix.to_owned(); - Ok(Box::pin(file_stream.filter(move |fr| { - let has_suffix = match fr { - Ok(f) => f.path().ends_with(&suffix), - Err(_) => true, - }; - async move { has_suffix } - }))) + Ok(Box::pin( + file_stream.try_filter(move |f| ready(f.path().ends_with(&suffix))), + )) } fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String {