Conversation

@cloud-fan
Contributor

What changes were proposed in this pull request?

This fixes a perf regression caused by #21376.

We should not use RDD#toLocalIterator, which triggers one Spark job per RDD partition. This is very bad for RDDs with a lot of small partitions.

To fix it, this PR introduces a way to access SQLConf in the scheduler event loop thread, so that we don't need to use RDD#toLocalIterator anymore in JsonInferSchema.
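The pattern the PR switches to can be sketched in plain Scala. This is a minimal sketch, not Spark's actual code: `DataType`, `typeMerger`, and `inferRootType` below are simplified stand-ins (a schema modeled as a set of field names), and the real version submits a single `sc.runJob(rdd, foldPartition, partitionIds, mergeResult)` instead of the local loop.

```scala
// Plain-Scala sketch of the merge pattern JsonInferSchema moves to in this
// PR: fold the inferred types within each partition (on the executors),
// then merge each task result into one accumulator on the driver as it
// arrives. One job covers all partitions, unlike RDD#toLocalIterator,
// which launches one job per partition.
object MergePatternSketch {
  type DataType = Set[String]          // stand-in: a schema as a set of field names
  val emptyType: DataType = Set.empty
  val typeMerger: (DataType, DataType) => DataType = _ union _

  // Runs once per partition on an executor (the PR's `foldPartition`).
  val foldPartition: Iterator[DataType] => DataType =
    iter => iter.fold(emptyType)(typeMerger)

  // Simulates the driver side: merge each task result as it completes
  // (the PR's `mergeResult` handler), keeping only one accumulated schema.
  def inferRootType(partitions: Seq[Iterator[DataType]]): DataType = {
    var rootType: DataType = emptyType
    val mergeResult = (index: Int, taskResult: DataType) => {
      rootType = typeMerger(rootType, taskResult)
    }
    // With a real SparkContext this loop would be a single sc.runJob call.
    partitions.zipWithIndex.foreach { case (p, i) => mergeResult(i, foldPartition(p)) }
    rootType
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Iterator(Set("a"), Set("b")),    // types inferred from partition 0's records
      Iterator(Set("b"), Set("c"))     // types inferred from partition 1's records
    )
    println(inferRootType(partitions).toSeq.sorted.mkString(","))  // prints a,b,c
  }
}
```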

How was this patch tested?

a new test

@cloud-fan
Contributor Author

// the fold functions in the scheduler event loop thread.
val existingConf = SQLConf.get
var rootType: DataType = StructType(Nil)
val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
Member

Need to do sc.clean(typeMerger) manually here?

Contributor Author

This closure is defined by us and I don't think we leak an outer reference here. If we do, it's a bug and we should fix it.
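For illustration, the "outer reference" concern behind `sc.clean` can be shown in plain Scala. This is a hypothetical example, not Spark's code: `Outer`, `leaking`, `clean`, and `ClosureCaptureSketch` are made-up names. A closure that reads a field of its enclosing instance captures the whole instance, so if that instance isn't serializable the closure can't be shipped to executors.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical illustration of leaking an outer reference (not Spark code).
class Outer {                                   // deliberately NOT Serializable
  val bonus = 10
  // Reads `this.bonus`, so the lambda captures `this` -> serialization fails.
  def leaking: Int => Int = x => x + bonus
  // Copies the field into a local first -> the lambda captures only an Int.
  def clean: Int => Int = { val b = bonus; x => x + b }
}

object ClosureCaptureSketch {
  // Returns true if `f` survives Java serialization, as a shipped task must.
  def serializable(f: AnyRef): Boolean =
    try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(f); true }
    catch { case _: NotSerializableException => false }
}
```

Here `new Outer().leaking` fails to serialize (the captured `Outer` is not `Serializable`) while `new Outer().clean` succeeds; stripping such accidental captures is what `SparkContext#clean` is for.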

Member

Yeah, agreed.

@SparkQA

SparkQA commented Aug 20, 2018

Test build #94952 has finished for PR 22152 at commit cf13d71.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val schedulerEventLoopThread =
SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
// will return `fallbackConf` which is unexpected. Here we requires the caller to get the
Contributor

nit: we require

var rootType: DataType = StructType(Nil)
val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
val mergeResult = (index: Int, taskResult: DataType) => {
rootType = SQLConf.withExistingConf(existingConf) {
Contributor

just a question, wouldn't:

val partitionsResult = json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition)
partitionsResult.fold(typeMerger)

do the same without requiring these changes?

Contributor Author

@cloud-fan cloud-fan Aug 21, 2018

This can work, but the problem is that we have to keep a large result array, which can cause GC problems.
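The two driver-side options being weighed can be sketched in plain Scala. This is a simplified stand-in (hypothetical names `DriverMergeSketch`, `collectThenFold`, `mergeAsCompleted`; `DataType` here models a possibly very wide/deep inferred schema), not the PR's actual code:

```scala
// Sketch of the two driver-side merge strategies discussed here.
object DriverMergeSketch {
  type DataType = Vector[String]       // stand-in for a possibly huge schema
  val typeMerger: (DataType, DataType) => DataType = (a, b) => (a ++ b).distinct

  // Suggested alternative: collect one result per partition, then fold.
  // All numPartitions schemas are live at once before the fold runs, which
  // is the GC concern for RDDs with many partitions and complex schemas.
  def collectThenFold(taskResults: Seq[DataType]): DataType = {
    val all: Array[DataType] = taskResults.toArray  // one slot per partition
    all.fold(Vector.empty)(typeMerger)
  }

  // The PR's approach: fold each task result into the accumulator as it
  // arrives (runJob's result handler), so at most one extra schema is live
  // on the driver at any time.
  def mergeAsCompleted(taskResults: Iterator[DataType]): DataType = {
    var root: DataType = Vector.empty
    taskResults.foreach(r => root = typeMerger(root, r))
    root
  }
}
```

Both produce the same merged schema; they differ only in how many per-partition results the driver holds simultaneously.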

Contributor

It would contain one result per partition; do you think this is enough to cause GC problems?

Contributor Author

The schema can be very complex (e.g. a very wide and deep schema).

Contributor

yes, makes sense, thanks.

Member

The same question was in my mind. Thanks for the clarification.

// triggers one Spark job per RDD partition.
Seq(1 -> "a", 2 -> "b").toDF("i", "p")
// The data set has 2 partitions, so Spark will write at least 2 json files.
// Use a non-splittable compression (gzip), to make sure the json scan RDD has at lease 2
Contributor

nit: at least

@SparkQA

SparkQA commented Aug 21, 2018

Test build #95022 has finished for PR 22152 at commit 95ec4d7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Aug 22, 2018

retest this please.

@SparkQA

SparkQA commented Aug 22, 2018

Test build #95068 has finished for PR 22152 at commit 95ec4d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@gatorsmile gatorsmile left a comment

All the tests pass. The last commit is a typo fix. It should be fine.

LGTM

@gatorsmile
Member

Thanks! Merged to master.

@asfgit asfgit closed this in 4a9c9d8 Aug 22, 2018
@SparkQA

SparkQA commented Aug 22, 2018

Test build #95077 has finished for PR 22152 at commit 23dfcda.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
