Skip to content

Conversation

@rajeshbalamohan
Copy link

Size of broadcasted data in OrcRelation was significantly higher when running query with large number of partitions (e.g TPC-DS). And it has an impact on the job runtime. This would be more evident when there is large number of partitions/splits. Profiler snapshot is attached in SPARK-12948 (https://issues.apache.org/jira/secure/attachment/12783513/SPARK-12948_cpuProf.png).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is to let users share the broadcast of the conf across multiple hadoopRDD calls (e.g. when unioning many HadoopRDDs together)? If so, this issue has come up a number of times in the past and may be worth a holistic design review because I think there are some hacks in Spark SQL to address this problem there and it would be nice to have a unified solution for this.

@JoshRosen
Copy link
Contributor

Can you add more description to explain how this patch reduces the size of broadcasts? The change isn't obvious to me at first glance, so one or two sentences of description would help me and other reviewers who aren't as familiar with this corner of the code.

@rajeshbalamohan
Copy link
Author

Usecase: User tries to map the dataset which is partitioned (e.g TPC-DS dataset at 200 GB scale) & runs a query in spark-shell.

E.g
...
val o_store_sales = sqlContext.read.format("orc").load("/tmp/spark_tpcds_bin_partitioned_orc_200/store_sales")
o_store_sales.registerTempTable("o_store_sales")
..
sqlContext.sql("SELECT..").show();
...

When this is executed, OrcRelation creates Config objects for every partition (Ref: OrcRelation.execute()). In the case of TPC-DS, it generates 1826 partitions. This info is broadcasted in DAGScheduler#submitMissingTasks(). As a part of this, the configurations created for 1826 partitions are also streamed through (i.e embedded in HadoopMapParitionsWithSplitRDD -->f()--> wrappedConf). Each of these configuration takes around 251 KB per partition. Please refer to the profiler snapshot attached in the JIRA (mem_snap_shot). This causes quite a bit of delay in the overall job runtime.

Patch reuses the already broadcastedconf from SparkContext. fillObject() function is executed later for every partition, which internally sets up any additional config details. This drastically reduces the amount of payload that is broadcasted and helps in reducing the overall job runtime.

@rajeshbalamohan
Copy link
Author

@JoshRosen - Please let me know if my latest comment on the usecase addresses your question. Can you.

may be worth a holistic design review because I think there are some hacks in Spark SQL to address this problem there and it would be nice to have a unified solution for this

Can you plz provide more details/pointers on this?

@SparkQA
Copy link

SparkQA commented May 3, 2016

Test build #57670 has finished for PR 10861 at commit 4da7a22.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

Hi @rajeshbalamohan, I think this should be a mergeable state at least and the conflicts and style issues should be resolved. Would you be able to update this for now?

@gatorsmile
Copy link
Member

We are closing it due to inactivity. please do reopen if you want to push it forward. Thanks!

@asfgit asfgit closed this in b32bd00 Jun 27, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants