Skip to content

Commit 92f191a

Browse files
committed
[KYUUBI #4617] [AUTHZ] Collect results for filtered show objects ahead to prevent holding unserializable spark plan
### _Why are the changes needed?_ To fix #4617. - The reason for issue #4617 is that delegated SparkPlan is not serilizable when execution - Collect results for filtered show objects ahead in FilterDataSourceV2Strategy to prevent holding the delegated plan ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4634 from bowenliang123/4617-filter. Closes #4617 fe00ef5 [liangbowen] rename results to result 65ce03a [liangbowen] fix 4617 Authored-by: liangbowen <[email protected]> Signed-off-by: liangbowen <[email protected]>
1 parent 4c2f1e6 commit 92f191a

File tree

3 files changed

+32
-13
lines changed

3 files changed

+32
-13
lines changed

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@ import org.apache.kyuubi.plugin.spark.authz.util.ObjectFilterPlaceHolder
2525
class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
2626
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
2727
case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" =>
28-
spark.sessionState.planner.plan(child).map(FilteredShowNamespaceExec).toSeq
28+
spark.sessionState.planner.plan(child)
29+
.map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
2930

3031
case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowTables" =>
31-
spark.sessionState.planner.plan(child).map(FilteredShowTablesExec).toSeq
32+
spark.sessionState.planner.plan(child)
33+
.map(FilteredShowTablesExec(_, spark.sparkContext)).toSeq
34+
3235
case _ => Nil
3336
}
3437
}

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kyuubi.plugin.spark.authz.ranger
1818

1919
import org.apache.hadoop.security.UserGroupInformation
20+
import org.apache.spark.SparkContext
2021
import org.apache.spark.rdd.RDD
2122
import org.apache.spark.sql.catalyst.InternalRow
2223
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -26,24 +27,29 @@ import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
2627
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils
2728

2829
trait FilteredShowObjectsExec extends LeafExecNode {
29-
def delegated: SparkPlan
30+
def result: Array[InternalRow]
3031

31-
final override def output: Seq[Attribute] = delegated.output
32-
33-
final private lazy val result = {
34-
delegated.executeCollect().filter(isAllowed(_, AuthZUtils.getAuthzUgi(sparkContext)))
35-
}
32+
override def output: Seq[Attribute]
3633

3734
final override def doExecute(): RDD[InternalRow] = {
3835
sparkContext.parallelize(result, 1)
3936
}
37+
}
4038

41-
protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean
39+
trait FilteredShowObjectsCheck {
40+
def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean
4241
}
4342

44-
case class FilteredShowNamespaceExec(delegated: SparkPlan) extends FilteredShowObjectsExec {
43+
case class FilteredShowNamespaceExec(result: Array[InternalRow], output: Seq[Attribute])
44+
extends FilteredShowObjectsExec {}
45+
object FilteredShowNamespaceExec extends FilteredShowObjectsCheck {
46+
def apply(delegated: SparkPlan, sc: SparkContext): FilteredShowNamespaceExec = {
47+
val result = delegated.executeCollect()
48+
.filter(isAllowed(_, AuthZUtils.getAuthzUgi(sc)))
49+
new FilteredShowNamespaceExec(result, delegated.output)
50+
}
4551

46-
override protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
52+
override def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
4753
val database = r.getString(0)
4854
val resource = AccessResource(ObjectType.DATABASE, database, null, null)
4955
val request = AccessRequest(resource, ugi, OperationType.SHOWDATABASES, AccessType.USE)
@@ -52,8 +58,16 @@ case class FilteredShowNamespaceExec(delegated: SparkPlan) extends FilteredShowO
5258
}
5359
}
5460

55-
case class FilteredShowTablesExec(delegated: SparkPlan) extends FilteredShowObjectsExec {
56-
override protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
61+
case class FilteredShowTablesExec(result: Array[InternalRow], output: Seq[Attribute])
62+
extends FilteredShowObjectsExec {}
63+
object FilteredShowTablesExec extends FilteredShowObjectsCheck {
64+
def apply(delegated: SparkPlan, sc: SparkContext): FilteredShowNamespaceExec = {
65+
val result = delegated.executeCollect()
66+
.filter(isAllowed(_, AuthZUtils.getAuthzUgi(sc)))
67+
new FilteredShowNamespaceExec(result, delegated.output)
68+
}
69+
70+
override def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
5771
val database = r.getString(0)
5872
val table = r.getString(1)
5973
val isTemp = r.getBoolean(2)

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
233233
doAs("admin", assert(sql(s"show tables from $db").collect().length === 2))
234234
doAs("bob", assert(sql(s"show tables from $db").collect().length === 0))
235235
doAs("i_am_invisible", assert(sql(s"show tables from $db").collect().length === 0))
236+
doAs("i_am_invisible", assert(sql(s"show tables from $db").limit(1).isEmpty))
236237
}
237238
}
238239

@@ -247,6 +248,7 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
247248

248249
doAs("bob", assert(sql(s"SHOW DATABASES").collect().length == 1))
249250
doAs("bob", assert(sql(s"SHOW DATABASES").collectAsList().get(0).getString(0) == "default"))
251+
doAs("i_am_invisible", assert(sql(s"SHOW DATABASES").limit(1).isEmpty))
250252
}
251253
}
252254

0 commit comments

Comments
 (0)