
Commit ada586f

Merge branch 'ESPARK-112' into 'spark_2.1'
[ESPARK-112] Collect SQL statements and table dependencies.

The idea is to collect both via listener events:

1. Every SQL statement is collected right before it executes.
2. Table dependencies are resolved during the optimizer phase, packed into an event, and picked up by an external listener.

The listener lives at: https://git.elenet.me/dt-arch/spark-query-log

Usage:

```shell
spark.extraListeners=com.eleme.tools.sql.SparkDependencyCollect,com.eleme.tools.sql.SparkSQLQueryLog
spark.sql.querylog.url=
spark.sql.querylog.user=
spark.sql.querylog.password=
#spark.listener.printError=false
#spark.collect.sqlDependency
```

resolve apache#112

See merge request !94
2 parents 9669f12 + 47a0186 commit ada586f
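
The collector classes named in the config above live in the external spark-query-log repository and are not part of this diff. As a rough sketch of the consuming side, a listener along these lines could pick up the two events this commit adds to `SparkListener.scala` (the class name `QueryLogListener` and the `println` sinks are placeholders, not the real implementation):

```scala
import org.apache.spark.scheduler.{DependencyEvent, SparkListener, SparkListenerEvent, SQLEvent}

// Hypothetical consumer of the two events introduced below; the real
// collectors live in the spark-query-log repository linked above.
class QueryLogListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SQLEvent(sql) =>
      // A real implementation would ship this to the store behind
      // spark.sql.querylog.url / user / password.
      println(s"sql: $sql")
    case DependencyEvent(readTables, writeTables) =>
      println(s"reads=${readTables.mkString(",")} writes=${writeTables.mkString(",")}")
    case _ => // ignore everything else
  }
}
```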

File tree

4 files changed: 44 additions, 3 deletions


core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.util.Properties
 import javax.annotation.Nullable
 
+import scala.collection.mutable.HashSet
 import scala.collection.Map
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo
@@ -39,6 +40,17 @@ trait SparkListenerEvent {
   protected[spark] def logEvent: Boolean = true
 }
 
+@DeveloperApi
+case class SQLEvent(sql: String) extends SparkListenerEvent {
+  override protected[spark] def logEvent: Boolean = false
+}
+
+@DeveloperApi
+case class DependencyEvent(readTables: HashSet[String], writeTables: HashSet[String])
+  extends SparkListenerEvent {
+  override protected[spark] def logEvent: Boolean = false
+}
+
 @DeveloperApi
 case class TimeSeriesMetricEvent(executorId: String, name: String, stat: StatCounter)
   extends SparkListenerEvent
```
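
Both new events override `logEvent` to `false`, so the event-log writer skips them and they only reach listeners attached to the live bus. Besides `spark.extraListeners`, such a listener can be registered programmatically; a one-liner, assuming an active `SparkSession` named `spark` and the hypothetical `QueryLogListener` sketched above:

```scala
// Equivalent in effect to listing the class in spark.extraListeners.
spark.sparkContext.addSparkListener(new QueryLogListener)
```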

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 2 deletions
```diff
@@ -18,15 +18,14 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, Strategy}
+import org.apache.spark.sql.{execution, SaveMode, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizerRules.scala

Lines changed: 24 additions & 0 deletions
```diff
@@ -19,13 +19,16 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 
+import scala.collection.mutable
 import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 
+import org.apache.spark.scheduler.DependencyEvent
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSet, PredicateHelper, Rand}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -128,3 +131,24 @@ case class MergeSmallFiles(sparkSession: SparkSession) extends Rule[LogicalPlan]
     }
   }
 }
+
+case class DependencyCollect(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (sparkSession.sparkContext.conf.getBoolean("spark.collectDependencies", true)) {
+      val readTables = mutable.HashSet[String]()
+      val writeTables = mutable.HashSet[String]()
+      plan transformDown {
+        case i @ InsertIntoTable(table: MetastoreRelation, _, _, _, _) =>
+          writeTables += s"${table.databaseName}.${table.tableName}"
+          i
+        case p @ PhysicalOperation(_, _, table: MetastoreRelation) =>
+          readTables += s"${table.databaseName}.${table.tableName}"
+          p
+      }
+      if (readTables.size > 0 || writeTables.size > 0) {
+        sparkSession.sparkContext.listenerBus.post(DependencyEvent(readTables, writeTables))
+      }
+    }
+    plan
+  }
+}
```
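
This rule is wired into `HiveSessionState.batches` in the next file. For comparison, on a stock Spark 2.x build a rule like this could also be attached through the public experimental hook instead of patching the session state; a sketch, assuming a Hive-enabled session so `MetastoreRelation` nodes actually appear in plans:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.DependencyCollect

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
// Runs DependencyCollect in the user-provided optimizer batch.
spark.experimental.extraOptimizations ++= Seq(DependencyCollect(spark))
```

For a hypothetical statement such as `INSERT INTO dw.daily_agg SELECT ... FROM dw.orders JOIN dim.shops ...`, the rule would post a `DependencyEvent` with `readTables = {dw.orders, dim.shops}` and `writeTables = {dw.daily_agg}`.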

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala

Lines changed: 7 additions & 1 deletion
```diff
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.spark.scheduler.SQLEvent
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
@@ -54,6 +55,9 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
   }
 
   override protected[sql] def auth(command: String): Unit = {
+    if (sparkSession.sparkContext.conf.getBoolean("spark.hive.sql.collect", true)) {
+      sparkSession.sparkContext.listenerBus.post(SQLEvent(command))
+    }
     if (!sparkSession.sparkContext.conf.getBoolean("spark.hive.auth.enable", true)) {
       return
     }
@@ -81,7 +85,9 @@
     override def batches: Seq[Batch] = super.batches :+
       Batch("Determine stats of partitionedTable", Once,
        DeterminePartitionedTableStats(sparkSession)) :+
-      Batch("Merge small files when insert into hive tables", Once, MergeSmallFiles(sparkSession))
+      Batch("Merge small files when insert into hive tables", Once,
+        MergeSmallFiles(sparkSession)) :+
+      Batch("Collect read and write tables", Once, DependencyCollect(sparkSession))
   }
 
   /**
```
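
Both collectors default to on: `spark.hive.sql.collect` guards the `SQLEvent` posted from `auth()`, and `spark.collectDependencies` guards the `DependencyEvent` posted by the optimizer rule. Either can be switched off per application, for example:

```scala
import org.apache.spark.SparkConf

// Opt out of one or both collectors for a given job.
val conf = new SparkConf()
  .set("spark.hive.sql.collect", "false")    // no SQLEvent from auth()
  .set("spark.collectDependencies", "false") // no DependencyEvent from DependencyCollect
```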
