
Commit c78e31a

Merge remote-tracking branch 'apache/master' into SPARK-6542

2 parents: 85f3106 + 276ef1c

15 files changed: +331 additions, -89 deletions

core/src/main/scala/org/apache/spark/SparkStatusTracker.scala

Lines changed: 1 addition & 2 deletions
@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
     jobProgressListener.synchronized {
-      val jobData = jobProgressListener.jobIdToData.valuesIterator
-      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
+      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
     }
   }

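The rewrite turns getJobIdsForGroup into a direct lookup in the new jobGroupToJobIds index instead of a filter over every retained job's UI data. For context, a minimal usage sketch of the public API this lookup serves; the group name, app name, and local master below are illustrative only:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical usage sketch, not part of this commit.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("status-demo"))

    sc.setJobGroup("my-group", "jobs tagged for status lookup")  // tag subsequent jobs
    sc.parallelize(1 to 100).count()                              // runs one job inside the group

    // The lookup now reads the jobGroupToJobIds index directly instead of scanning all job data.
    val jobIds: Array[Int] = sc.statusTracker.getJobIdsForGroup("my-group")
    println(jobIds.mkString(", "))

    sc.stop()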

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 21 additions & 2 deletions
@@ -44,6 +44,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   // These type aliases are public because they're used in the types of public fields:

   type JobId = Int
+  type JobGroupId = String
   type StageId = Int
   type StageAttemptId = Int
   type PoolName = String
@@ -54,6 +55,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   val completedJobs = ListBuffer[JobUIData]()
   val failedJobs = ListBuffer[JobUIData]()
   val jobIdToData = new HashMap[JobId, JobUIData]
+  val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]

   // Stages:
   val pendingStages = new HashMap[StageId, StageInfo]
@@ -119,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     Map(
       "jobIdToData" -> jobIdToData.size,
       "stageIdToData" -> stageIdToData.size,
-      "stageIdToStageInfo" -> stageIdToInfo.size
+      "stageIdToStageInfo" -> stageIdToInfo.size,
+      "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
+      // Since jobGroupToJobIds is map of sets, check that we don't leak keys with empty values:
+      "jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
     )
   }
@@ -140,7 +145,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     if (jobs.size > retainedJobs) {
       val toRemove = math.max(retainedJobs / 10, 1)
       jobs.take(toRemove).foreach { job =>
-        jobIdToData.remove(job.jobId)
+        // Remove the job's UI data, if it exists
+        jobIdToData.remove(job.jobId).foreach { removedJob =>
+          // A null jobGroupId is used for jobs that are run without a job group
+          val jobGroupId = removedJob.jobGroup.orNull
+          // Remove the job group -> job mapping entry, if it exists
+          jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
+            jobsInGroup.remove(job.jobId)
+            // If this was the last job in this job group, remove the map entry for the job group
+            if (jobsInGroup.isEmpty) {
+              jobGroupToJobIds.remove(jobGroupId)
+            }
+          }
+        }
       }
       jobs.trimStart(toRemove)
     }
@@ -158,6 +175,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       stageIds = jobStart.stageIds,
       jobGroup = jobGroup,
       status = JobExecutionStatus.RUNNING)
+    // A null jobGroupId is used for jobs that are run without a job group
+    jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
     jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
     // Compute (a potential underestimate of) the number of tasks that will be run by this job.
     // This may be an underestimate because the job start event references all of the result
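Taken together, the listener maintains jobGroupToJobIds as an index from job group to the ids of retained jobs, and drops a group's key as soon as its last retained job is evicted, so the index stays bounded by spark.ui.retainedJobs. A standalone sketch of that map-of-sets bookkeeping pattern, using illustrative names that are not part of the listener's API:

    import scala.collection.mutable.{HashMap, HashSet}

    // Hypothetical helper showing the "map of sets, drop empty sets" pattern.
    class GroupIndex {
      private val groupToIds = new HashMap[String, HashSet[Int]]

      def add(group: String, id: Int): Unit =
        groupToIds.getOrElseUpdate(group, new HashSet[Int]) += id

      def remove(group: String, id: Int): Unit =
        groupToIds.get(group).foreach { ids =>
          ids -= id
          // Drop the key once its set is empty so evicted groups don't leak map entries.
          if (ids.isEmpty) groupToIds.remove(group)
        }

      def idsFor(group: String): Set[Int] =
        groupToIds.get(group).map(_.toSet).getOrElse(Set.empty)
    }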

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 29 additions & 2 deletions
@@ -17,6 +17,8 @@

 package org.apache.spark.ui.jobs

+import java.util.Properties
+
 import org.scalatest.FunSuite
 import org.scalatest.Matchers

@@ -44,11 +46,19 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     SparkListenerStageCompleted(stageInfo)
   }

-  private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+  private def createJobStartEvent(
+      jobId: Int,
+      stageIds: Seq[Int],
+      jobGroup: Option[String] = None): SparkListenerJobStart = {
     val stageInfos = stageIds.map { stageId =>
       new StageInfo(stageId, 0, stageId.toString, 0, null, "")
     }
-    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
+    val properties: Option[Properties] = jobGroup.map { groupId =>
+      val props = new Properties()
+      props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+      props
+    }
+    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, properties.orNull)
   }

   private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
@@ -110,6 +120,23 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     listener.stageIdToActiveJobIds.size should be (0)
   }

+  test("test clearing of jobGroupToJobIds") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedJobs", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    // Run 50 jobs, each with one stage
+    for (jobId <- 0 to 50) {
+      listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
+      listener.onStageSubmitted(createStageStartEvent(0))
+      listener.onStageCompleted(createStageEndEvent(0, failed = false))
+      listener.onJobEnd(createJobEndEvent(jobId, false))
+    }
+    assertActiveJobsStateIsEmpty(listener)
+    // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
+    listener.jobGroupToJobIds.size should be (5)
+  }
+
   test("test LRU eviction of jobs") {
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala

Lines changed: 2 additions & 1 deletion
@@ -58,7 +58,8 @@ class AttributeSet private (val baseSet: Set[AttributeEquals])

   /** Returns true if the members of this AttributeSet and other are the same. */
   override def equals(other: Any): Boolean = other match {
-    case otherSet: AttributeSet => baseSet.map(_.a).forall(otherSet.contains)
+    case otherSet: AttributeSet =>
+      otherSet.size == baseSet.size && baseSet.map(_.a).forall(otherSet.contains)
     case _ => false
   }

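Previously equals only checked that every member of this set was contained in the other set, so an AttributeSet compared equal to any superset of itself and equality was not symmetric; the added size comparison closes that hole (the new AttributeSetSuite below exercises exactly these cases). A minimal illustration of the pitfall, using plain Scala sets rather than the catalyst types:

    // Subset-only "equality" versus the corrected size-plus-containment check.
    def subsetOnlyEquals(a: Set[Int], b: Set[Int]): Boolean = a.forall(b.contains)
    def fixedEquals(a: Set[Int], b: Set[Int]): Boolean = a.size == b.size && a.forall(b.contains)

    val small = Set(1)
    val big = Set(1, 2)

    assert(subsetOnlyEquals(small, big))   // wrongly "equal": small is just a subset of big
    assert(!subsetOnlyEquals(big, small))  // and not even symmetric
    assert(!fixedEquals(small, big))       // size check restores correct, symmetric equality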

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 11 additions & 4 deletions
@@ -394,10 +394,17 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     val casts = from.fields.zip(to.fields).map {
       case (fromField, toField) => cast(fromField.dataType, toField.dataType)
     }
-    // TODO: This is very slow!
-    buildCast[Row](_, row => Row(row.toSeq.zip(casts).map {
-      case (v, cast) => if (v == null) null else cast(v)
-    }: _*))
+    // TODO: Could be faster?
+    val newRow = new GenericMutableRow(from.fields.size)
+    buildCast[Row](_, row => {
+      var i = 0
+      while (i < row.length) {
+        val v = row(i)
+        newRow.update(i, if (v == null) null else casts(i)(v))
+        i += 1
+      }
+      newRow.copy()
+    })
   }

   private[this] def cast(from: DataType, to: DataType): Any => Any = to match {
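The struct cast now fills one reusable GenericMutableRow in a while loop and hands out a copy, instead of allocating a Seq, a zipped collection, and a varargs Row for every input row; copy() still allocates per row, but the intermediate collections are gone. A small sketch of the same reuse-then-copy idea outside catalyst, with illustrative names:

    // Reuse one mutable buffer across calls; copy only when handing a result out.
    final class ReusedTransformer(transforms: Array[Int => Int]) {
      private val scratch = new Array[Int](transforms.length)

      def apply(input: Array[Int]): Array[Int] = {
        var i = 0
        while (i < input.length) {     // while loop avoids per-call iterator allocation
          scratch(i) = transforms(i)(input(i))
          i += 1
        }
        scratch.clone()                // copy so callers never see later mutations of the buffer
      }
    }

    val t = new ReusedTransformer(Array(_ + 1, _ * 2, _ - 3))
    assert(t(Array(1, 2, 3)).sameElements(Array(2, 4, 0)))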

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala (new file)

Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.types.IntegerType
+
+class AttributeSetSuite extends FunSuite {
+
+  val aUpper = AttributeReference("A", IntegerType)(exprId = ExprId(1))
+  val aLower = AttributeReference("a", IntegerType)(exprId = ExprId(1))
+  val fakeA = AttributeReference("a", IntegerType)(exprId = ExprId(3))
+  val aSet = AttributeSet(aLower :: Nil)
+
+  val bUpper = AttributeReference("B", IntegerType)(exprId = ExprId(2))
+  val bLower = AttributeReference("b", IntegerType)(exprId = ExprId(2))
+  val bSet = AttributeSet(bUpper :: Nil)
+
+  val aAndBSet = AttributeSet(aUpper :: bUpper :: Nil)
+
+  test("sanity check") {
+    assert(aUpper != aLower)
+    assert(bUpper != bLower)
+  }
+
+  test("checks by id not name") {
+    assert(aSet.contains(aUpper) === true)
+    assert(aSet.contains(aLower) === true)
+    assert(aSet.contains(fakeA) === false)
+
+    assert(aSet.contains(bUpper) === false)
+    assert(aSet.contains(bLower) === false)
+  }
+
+  test("++ preserves AttributeSet") {
+    assert((aSet ++ bSet).contains(aUpper) === true)
+    assert((aSet ++ bSet).contains(aLower) === true)
+  }
+
+  test("extracts all references references") {
+    val addSet = AttributeSet(Add(aUpper, Alias(bUpper, "test")()):: Nil)
+    assert(addSet.contains(aUpper))
+    assert(addSet.contains(aLower))
+    assert(addSet.contains(bUpper))
+    assert(addSet.contains(bLower))
+  }
+
+  test("dedups attributes") {
+    assert(AttributeSet(aUpper :: aLower :: Nil).size === 1)
+  }
+
+  test("subset") {
+    assert(aSet.subsetOf(aAndBSet) === true)
+    assert(aAndBSet.subsetOf(aSet) === false)
+  }
+
+  test("equality") {
+    assert(aSet != aAndBSet)
+    assert(aAndBSet != aSet)
+    assert(aSet != bSet)
+    assert(bSet != aSet)
+
+    assert(aSet == aSet)
+    assert(aSet == AttributeSet(aUpper :: Nil))
+  }
+}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 18 additions & 16 deletions
@@ -459,7 +459,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
         val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation, parquetRelation, attributedRewrites)
+        (relation -> relation.output, parquetRelation, attributedRewrites)

       // Write path
       case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -470,7 +470,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
        val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation, parquetRelation, attributedRewrites)
+        (relation -> relation.output, parquetRelation, attributedRewrites)

       // Read path
       case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -479,33 +479,35 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
         val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation, parquetRelation, attributedRewrites)
+        (relation -> relation.output, parquetRelation, attributedRewrites)
     }

+    // Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
+    // their output attributes as the key of the map. This is because MetastoreRelation.equals
+    // doesn't take output attributes into account, thus multiple MetastoreRelation instances
+    // pointing to the same table get collapsed into a single entry in the map. A proper fix for
+    // this should be overriding equals & hashCode in MetastoreRelation.
     val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
     val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))

     // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
     // attribute IDs referenced in other nodes.
     plan.transformUp {
-      case r: MetastoreRelation if relationMap.contains(r) => {
-        val parquetRelation = relationMap(r)
-        val withAlias =
-          r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
-            Subquery(r.tableName, parquetRelation))
+      case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
+        val parquetRelation = relationMap(r -> r.output)
+        val alias = r.alias.getOrElse(r.tableName)
+        Subquery(alias, parquetRelation)

-        withAlias
-      }
       case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r) => {
-        val parquetRelation = relationMap(r)
+          if relationMap.contains(r -> r.output) =>
+        val parquetRelation = relationMap(r -> r.output)
         InsertIntoTable(parquetRelation, partition, child, overwrite)
-      }
+
       case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r) => {
-        val parquetRelation = relationMap(r)
+          if relationMap.contains(r -> r.output) =>
+        val parquetRelation = relationMap(r -> r.output)
         InsertIntoTable(parquetRelation, partition, child, overwrite)
-      }
+
       case other => other.transformExpressions {
         case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
       }
