
Conversation

HeartSaVioR
Contributor

@HeartSaVioR commented Nov 18, 2020

What changes were proposed in this pull request?

Two new options, modifiedBefore and modifiedAfter, are provided, each expecting a value in 'YYYY-MM-DDTHH:mm:ss' format. PartitioningAwareFileIndex considers these options while listing files, just before applying PathFilters such as pathGlobFilter. A new PathFilter class was derived to perform this filtering, and some general housekeeping was done around the classes extending PathFilter. Since multiple path filters may now need to apply at once, logic was introduced to handle that case, and the associated tests were written.
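
To make the mechanism concrete, here is a minimal, hedged Scala sketch of the idea behind a modification-time filter. The class and helper names are illustrative only; the actual filters added in this PR are wired through Spark's path-filter handling and interpret the timestamp in the Spark session time zone rather than the JVM default used here.

```scala
import java.sql.Timestamp

import org.apache.hadoop.fs.FileStatus

// Illustrative stand-in for a "modified after" filter: keep only files whose
// modification time is strictly later than the given threshold (epoch millis).
class ModifiedAfterSketch(thresholdMs: Long) {
  def accept(status: FileStatus): Boolean =
    status.getModificationTime > thresholdMs
}

object ModifiedAfterSketch {
  // Parse the option value ('YYYY-MM-DDTHH:mm:ss'); this sketch assumes the JVM
  // default time zone, which the real implementation does not.
  def apply(optionValue: String): ModifiedAfterSketch =
    new ModifiedAfterSketch(Timestamp.valueOf(optionValue.replace("T", " ")).getTime)
}
```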

Why are the changes needed?

When loading files from a data source, a single path can easily contain thousands of files. In many cases we want to load from a folder and only pick up files whose modification dates fall after a certain point, so that out of thousands of candidate files, only those with modification times later than the specified timestamp are considered. This saves a great deal of time and removes the complexity of managing that filtering in application code.

Does this PR introduce any user-facing change?

This PR introduces two options that can be used with batch-based Spark file data sources. The documentation was updated with an example and usage of the new data source options.

Example Usages
Load all CSV files modified after date:
spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()

Load all CSV files modified before date:
spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()

Load all CSV files modified between two dates:
spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00").option("modifiedBefore","2020-06-15T05:00:00").load()

How was this patch tested?

A handful of unit tests were added to support the positive, negative, and edge case code paths.

It's also live in a handful of our Databricks dev environments. (quoted from @cchighman)

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35885/

@HeartSaVioR
Contributor Author

cc. @maropu @gengliangwang @cloud-fan @bart-samwel @zsxwing @dongjoon-hyun @HyukjinKwon

I've cc-ed all reviewers who left at least one review comment in the original PR (#28841).

Just FYI to @cchighman, as he's the author of the original PR.

@HeartSaVioR
Contributor Author

The first commit squashes the commits in the original PR. The remaining commits are my own, addressing my own review comments. So if you have already gone through the original PR and it looked good to you, you probably only need to look at the remaining commits.

@SparkQA

SparkQA commented Nov 18, 2020

Test build #131284 has finished for PR 30411 at commit 088b630.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35885/

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35886/

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35890/

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35886/

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35890/

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35891/

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35891/

@SparkQA

SparkQA commented Nov 18, 2020

Test build #131281 has finished for PR 30411 at commit 80170b4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 18, 2020

Test build #131285 has finished for PR 30411 at commit 71456f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 18, 2020

Test build #131287 has finished for PR 30411 at commit d216fcb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

test("Option pathGlobFilter: filter files correctly") {
Contributor Author

Note to reviewers: two tests were moved from FileBasedDataSourceSuite as this PR adds a dedicated suite for the filtering options.

Member

@maropu left a comment


I left minor comments and the other parts look fine. cc: @HyukjinKwon @dongjoon-hyun @viirya

// $example on:load_with_modified_time_filter$
val beforeFilterDF = spark.read.format("parquet")
// Files modified before 07/01/2020 at 05:30 are allowed
.option("modifiedBefore", "2020-07-01T05:30:00")
Member

nit: two indents to follow the other examples.

// +-------------+
val afterFilterDF = spark.read.format("parquet")
// Files modified after 06/01/2020 at 05:30 are allowed
.option("modifiedAfter", "2020-06-01T05:30:00")
Member

ditto

`modifiedBefore` and `modifiedAfter` are options that can be
applied together or separately in order to achieve greater
granularity over which files may load during a Spark batch query.
(Structured Streaming file source doesn't support these options.)
Member

nit: (Structured Streaming file source doesn't support these options.) -> Note that Structured Streaming file sources don't support these options.?

properties = dict()
jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
jprop = JavaClass("java.util.Properties",
self._spark._sc._gateway._gateway_client)()
Member

an unnecessary change?

gateway = self._spark._sc._gateway
jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
jpredicates = utils.toJArray(
gateway, gateway.jvm.java.lang.String, predicates)
Member

ditto (I have the same comments on the changes below, too).

protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

private val caseInsensitiveMap = CaseInsensitiveMap(parameters)
protected val pathFilters = PathFilterFactory.create(caseInsensitiveMap)
Member

protected -> private?

private def checkDisallowedOptions(options: Map[String, String]): Unit = {
Seq(ModifiedBeforeFilter.PARAM_NAME, ModifiedAfterFilter.PARAM_NAME).foreach { param =>
if (parameters.contains(param)) {
throw new IllegalArgumentException(s"option '$param' is not allowed in file stream source")
Member

nit: file stream source -> file stream sources?
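
For context on the check above, a hedged sketch of what a caller would hit when passing one of these options to the streaming file source; the path and schema are made up, and the exact error message may differ:

```scala
// Expected to throw IllegalArgumentException, since the streaming file source
// rejects modifiedBefore/modifiedAfter per the validation shown above.
val stream = spark.readStream
  .format("csv")
  .schema("value STRING")
  .option("modifiedAfter", "2020-06-15T05:00:00")
  .load("/tmp/landing/events")
```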

@SparkQA

SparkQA commented Nov 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35996/

@SparkQA

SparkQA commented Nov 20, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35996/

@HeartSaVioR
Contributor Author

Thanks for the review, @maropu !

The original PR has been open for months, and I only refactored it a bit and fixed the doc. I'll merge this early next week if there are no further comments.

@maropu
Member

maropu commented Nov 20, 2020

The original PR has been open for months, and I only refactored it a bit and fixed the doc. I'll merge this early next week if there are no further comments.

Yea, it looks fine to me. Thanks for the take-over, @HeartSaVioR, and thanks a lot for the valuable contribution, @cchighman!

@SparkQA

SparkQA commented Nov 20, 2020

Test build #131392 has finished for PR 30411 at commit ce00b6d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

The build failure is unrelated. I'll let it go and check again just before the merge.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Nov 22, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36110/

@SparkQA

SparkQA commented Nov 22, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36110/

@SparkQA

SparkQA commented Nov 22, 2020

Test build #131506 has finished for PR 30411 at commit ce00b6d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

OK. No further comments, and tests passed. Merging to master.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Nov 22, 2020

Thanks all for reviewing, and thanks again @cchighman for providing great stuff!

@HeartSaVioR deleted the SPARK-31962 branch November 22, 2020 23:33
@HyukjinKwon
Member

I am supportive of this change FWIW. I think it's good to have this feature.

@amadav

amadav commented Jan 26, 2021

Does it still need to be merged?

@HeartSaVioR
Contributor Author

No, this is merged and will be available in the next minor release, 3.1.1.

