[SPARK-25159][SQL] json schema inference should only trigger one job #22152
Conversation
```scala
// the fold functions in the scheduler event loop thread.
val existingConf = SQLConf.get
var rootType: DataType = StructType(Nil)
val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
```
Need to do sc.clean(typeMerger) manually here?
This closure is defined by us and I don't think we leak outer reference here. If we do, it's a bug and we should fix it.
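For context on why no manual `sc.clean` call is needed here: `SparkContext#clean` strips accidental outer-object references from closures before serialization, and the concern only arises when a lambda references a field of an enclosing object. A minimal Spark-free sketch of the capture problem (the `SchemaInferrer` class and its members are hypothetical, for illustration only):

```scala
class SchemaInferrer(val options: Map[String, String]) extends Serializable {
  // Referencing the field directly makes the lambda capture `this`,
  // dragging the whole enclosing SchemaInferrer into the serialized closure.
  def leakyMerger: (Int, Int) => Int =
    (a, b) => a + b + options.size

  // Copying the field to a local val first keeps the closure self-contained,
  // so closures built this way need no manual clean() call.
  def cleanMerger: (Int, Int) => Int = {
    val optionCount = options.size
    (a, b) => a + b + optionCount
  }
}
```

The closures in the quoted diff follow the second pattern: they only capture local vals, not an outer object.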
Yeah, agreed.
Test build #94952 has finished for PR 22152 at commit
```scala
val schedulerEventLoopThread =
  SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
// will return `fallbackConf` which is unexpected. Here we requires the caller to get the
```
nit: we require
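For readers following along: the quoted diff guards against calling `SQLConf.get` from the scheduler event loop thread, where no session is attached and the call would silently fall back to `fallbackConf`. A rough, Spark-free sketch of the thread-local override idea behind `SQLConf.withExistingConf` (simplified types and names; the real implementation lives in Spark's `SQLConf`):

```scala
object ConfSketch {
  private val existing = new ThreadLocal[Map[String, String]]

  // Run `f` with `conf` visible to get() on this thread, restoring the
  // previous value afterwards even if `f` throws.
  def withExistingConf[T](conf: Map[String, String])(f: => T): T = {
    val old = existing.get()
    existing.set(conf)
    try f finally existing.set(old)
  }

  // Falls back to an empty conf when nothing was set on this thread.
  def get: Map[String, String] = Option(existing.get()).getOrElse(Map.empty)
}
```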
```scala
var rootType: DataType = StructType(Nil)
val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
val mergeResult = (index: Int, taskResult: DataType) => {
  rootType = SQLConf.withExistingConf(existingConf) {
```
just a question, wouldn't:

```scala
val partitionsResult = json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition)
partitionsResult.fold(typeMerger)
```

do the same without requiring these changes?
This can work, but then we have to keep a large result array, which can cause GC problems.
It would contain one result per partition; do you think that is enough to cause GC problems?
The schema can be very complex (e.g. a very wide and deep schema).
yes, makes sense, thanks.
The same question was on my mind. Thanks for the clarification.
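The memory tradeoff discussed in this thread can be sketched as follows (a sketch only, using the variable names from the quoted diff and assuming `sc` is the active `SparkContext`):

```scala
// Option A (collect-then-fold): the driver materializes one inferred
// DataType per partition before merging. With many partitions and a
// wide/deep schema, this array can pressure the driver heap.
val perPartition: Array[DataType] = sc.runJob(mergedTypesFromPartitions, foldPartition)
val rootA = perPartition.fold(StructType(Nil))(typeMerger)

// Option B (the PR's approach): a result handler merges each partition's
// type into the accumulator as it arrives, so at most one extra DataType
// is alive on the driver at a time.
var rootB: DataType = StructType(Nil)
sc.runJob(
  mergedTypesFromPartitions,
  foldPartition,
  mergedTypesFromPartitions.partitions.indices,
  (_: Int, taskResult: DataType) => rootB = typeMerger(rootB, taskResult))
```

Both options run a single Spark job; they differ only in how much per-partition state the driver retains while merging.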
```scala
// triggers one Spark job per RDD partition.
Seq(1 -> "a", 2 -> "b").toDF("i", "p")
// The data set has 2 partitions, so Spark will write at least 2 json files.
// Use a non-splittable compression (gzip), to make sure the json scan RDD has at lease 2
```
nit: at least
Test build #95022 has finished for PR 22152 at commit
retest this please.
Test build #95068 has finished for PR 22152 at commit
gatorsmile left a comment
All the tests pass. The last commit is a typo fix. It should be fine.
LGTM
Thanks! Merged to master.
Test build #95077 has finished for PR 22152 at commit
What changes were proposed in this pull request?

This fixes a perf regression caused by #21376.

We should not use `RDD#toLocalIterator`, which triggers one Spark job per RDD partition. This is very bad for RDDs with a lot of small partitions. To fix it, this PR introduces a way to access `SQLConf` in the scheduler event loop thread, so that we don't need to use `RDD#toLocalIterator` anymore in `JsonInferSchema`.

How was this patch tested?

A new test.
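A condensed before/after of the change, reconstructed from the diff snippets quoted in the review above (treat this as a sketch, not the exact merged code):

```scala
// Before (#21376): toLocalIterator schedules one Spark job per partition,
// which is slow for RDDs with many small partitions.
val rootTypeBefore = mergedTypesFromPartitions
  .toLocalIterator
  .fold(StructType(Nil))(typeMerger)

// After: a single runJob. Per-partition types are folded on the executors,
// and the driver-side result handler merges them under the SQLConf captured
// before the job was submitted (the event loop thread has no session conf).
val existingConf = SQLConf.get
var rootType: DataType = StructType(Nil)
json.sparkContext.runJob(
  mergedTypesFromPartitions,
  (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger),
  mergedTypesFromPartitions.partitions.indices,
  (_: Int, taskResult: DataType) => rootType = SQLConf.withExistingConf(existingConf) {
    typeMerger(rootType, taskResult)
  })
```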