[SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source #30411
Conversation
Kubernetes integration test starting
cc @maropu @gengliangwang @cloud-fan @bart-samwel @zsxwing @dongjoon-hyun @HyukjinKwon I've cc-ed all reviewers who left at least one review comment in the original PR (#28841). Just FYI to @cchighman, as he's the author of the original PR.
The first commit squashes the commits in the original PR. The remaining commits are my own, addressing my own review comments. So if you're done with the original PR and it looked good to you, you probably only need to look at the remaining commits.
Test build #131284 has finished for PR 30411 at commit
Kubernetes integration test status success
Test build #131281 has finished for PR 30411 at commit
Test build #131285 has finished for PR 30411 at commit
Test build #131287 has finished for PR 30411 at commit
test("Option pathGlobFilter: filter files correctly") { |
Note to reviewers: two tests were moved from FileBasedDataSourceSuite, as this PR adds a dedicated suite for the filtering options.
I left minor comments and the other parts look fine. cc: @HyukjinKwon @dongjoon-hyun @viirya
// $example on:load_with_modified_time_filter$
val beforeFilterDF = spark.read.format("parquet")
  // Files modified before 07/01/2020 at 05:30 are allowed
  .option("modifiedBefore", "2020-07-01T05:30:00")
nit: two indents to follow the other examples.
// +-------------+
val afterFilterDF = spark.read.format("parquet")
  // Files modified after 06/01/2020 at 05:30 are allowed
  .option("modifiedAfter", "2020-06-01T05:30:00")
ditto
`modifiedBefore` and `modifiedAfter` are options that can be
applied together or separately in order to achieve greater
granularity over which files may load during a Spark batch query.
(Structured Streaming file source doesn't support these options.)
nit: (Structured Streaming file source doesn't support these options.)
-> Note that Structured Streaming file sources don't support these options.
?
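For reference, the two options can also be combined to bound the modification window on both sides. A minimal sketch in the style of the examples above (the load path is a placeholder):

// Files modified between 06/01/2020 and 07/01/2020 at 05:30 are allowed
val betweenFilterDF = spark.read.format("parquet")
  .option("modifiedAfter", "2020-06-01T05:30:00")
  .option("modifiedBefore", "2020-07-01T05:30:00")
  .load("examples/src/main/resources/dir1")  // placeholder path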
python/pyspark/sql/readwriter.py
properties = dict()
jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
jprop = JavaClass("java.util.Properties",
                  self._spark._sc._gateway._gateway_client)()
an unnecessary change?
python/pyspark/sql/readwriter.py
gateway = self._spark._sc._gateway
jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
jpredicates = utils.toJArray(
    gateway, gateway.jvm.java.lang.String, predicates)
ditto (I have the same comments on the changes below, too).
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

private val caseInsensitiveMap = CaseInsensitiveMap(parameters)
protected val pathFilters = PathFilterFactory.create(caseInsensitiveMap)
protected
-> private
?
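For context, a rough sketch of how a factory like this could assemble multiple filters. This is a hypothetical simplification: the names mirror the snippet above, but the bodies are illustrative rather than the PR's actual code, and it assumes the options map already has lower-cased keys:

import org.apache.hadoop.fs.FileStatus

// Hypothetical simplification: each filter decides per file whether it
// should survive the listing phase.
trait PathFilterStrategy {
  def accept(fileStatus: FileStatus): Boolean
}

class ModifiedAfterFilter(thresholdMs: Long) extends PathFilterStrategy {
  // Keep files whose modification time is after the threshold.
  def accept(fileStatus: FileStatus): Boolean =
    fileStatus.getModificationTime > thresholdMs
}

class ModifiedBeforeFilter(thresholdMs: Long) extends PathFilterStrategy {
  // Keep files whose modification time is before the threshold.
  def accept(fileStatus: FileStatus): Boolean =
    fileStatus.getModificationTime < thresholdMs
}

object PathFilterFactory {
  private def toMillis(ts: String): Long =
    java.sql.Timestamp.valueOf(ts.replace('T', ' ')).getTime

  // Builds one filter per recognized option; a file must pass all of them.
  def create(parameters: Map[String, String]): Seq[PathFilterStrategy] = {
    val after = parameters.get("modifiedafter").map(ts => new ModifiedAfterFilter(toMillis(ts)))
    val before = parameters.get("modifiedbefore").map(ts => new ModifiedBeforeFilter(toMillis(ts)))
    Seq(after, before).flatten
  }
}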
private def checkDisallowedOptions(options: Map[String, String]): Unit = {
  Seq(ModifiedBeforeFilter.PARAM_NAME, ModifiedAfterFilter.PARAM_NAME).foreach { param =>
    if (parameters.contains(param)) {
      throw new IllegalArgumentException(s"option '$param' is not allowed in file stream source")
nit: file stream source
-> file stream sources
?
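For illustration, this guard means a streaming read with either option should fail up front. A hypothetical usage sketch (the path and schema are placeholders, and the exact point at which the error surfaces may differ):

import org.apache.spark.sql.types.{StringType, StructType}

val schema = new StructType().add("value", StringType)
try {
  spark.readStream
    .format("csv")
    .schema(schema)                                  // streaming reads need an explicit schema
    .option("modifiedAfter", "2020-06-15T05:00:00")  // disallowed for streaming sources
    .load("/data/events")                            // placeholder path
} catch {
  case e: IllegalArgumentException =>
    println(e.getMessage)  // expected to name the disallowed option
}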
Kubernetes integration test starting
Kubernetes integration test status failure
Thanks for the review, @maropu! The original PR has been open for months, and I only refactored a bit and fixed the doc. I'll merge this early next week if there are no further comments.
Yea, it looks fine to me. Thanks for the take-over, @HeartSaVioR, and thanks a lot for the valuable contribution, @cchighman!
Test build #131392 has finished for PR 30411 at commit
The build failure is not related. I'll let it go and check again just before the merge.
retest this, please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #131506 has finished for PR 30411 at commit
OK. No further comments, and the tests passed. Merging to master.
Thanks all for reviewing, and thanks again @cchighman for providing great stuff!
I am supportive of this change FWIW. I think it's good to have this feature. |
Does it still need to be merged? |
No, this is merged and will be available in the next minor version, release 3.1.1.
What changes were proposed in this pull request?
Two new options, modifiedBefore and modifiedAfter, are provided, each expecting a value in 'YYYY-MM-DDTHH:mm:ss' format. PartitioningAwareFileIndex considers these options while listing files, just before applying other PathFilters such as pathGlobFilter.
In order to filter file results, a new PathFilter class was derived for this purpose. General housekeeping was performed around the classes extending PathFilter for neatness. It became apparent that support was needed for handling multiple potential path filters, so logic was introduced for this purpose and the associated tests were written.
Why are the changes needed?
When loading files from a data source, there can oftentimes be thousands of files within a given path. In many cases I've seen, we want to start loading from a folder path and ideally begin loading only the files whose modification dates are past a certain point. Out of thousands of potential files, only the ones with modification dates greater than the specified timestamp would be considered. This saves a ton of time automatically and removes significant complexity that would otherwise have to be managed in code.
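To make the saving concrete, a before-and-after sketch (the directory path is a placeholder):

// Without the option, Spark lists and considers every file under the path.
val allDF = spark.read.format("csv").load("/data/events")  // placeholder path

// With modifiedAfter, older files are pruned during listing,
// before any file contents are read.
val recentDF = spark.read.format("csv")
  .option("modifiedAfter", "2020-06-15T05:00:00")
  .load("/data/events")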
Does this PR introduce any user-facing change?
This PR introduces two options that can be used with batch-based Spark file data sources. A documentation update was made to reflect an example and the usage of the new data source options.
Example Usages
Load all CSV files modified after a date:
spark.read.format("csv").option("modifiedAfter", "2020-06-15T05:00:00").load()
Load all CSV files modified before a date:
spark.read.format("csv").option("modifiedBefore", "2020-06-15T05:00:00").load()
Load all CSV files modified between two dates:
spark.read.format("csv").option("modifiedAfter", "2019-01-15T05:00:00").option("modifiedBefore", "2020-06-15T05:00:00").load()
How was this patch tested?
A handful of unit tests were added to support the positive, negative, and edge case code paths.
It's also live in a handful of our Databricks dev environments. (quoted from @cchighman)