Skip to content

Commit 8cebb9b

Browse files
committed
[SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry
### What changes were proposed in this pull request? This PR proposes to improve the concurrency performance of `FunctionRegistry`. ### Why are the changes needed? Currently, `SimpleFunctionRegistryBase` uses a `mutable.Map` to cache function infos. `SimpleFunctionRegistryBase` is guarded by `this` to ensure thread safety. Because all of the mutable state lives in `functionBuilders`, we can delegate thread safety to `ConcurrentHashMap`. `ConcurrentHashMap` offers higher concurrency and better responsiveness. After this change, `FunctionRegistry` has better performance than before. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? GA and the benchmark test below. ``` object FunctionRegistryBenchmark extends BenchmarkBase { override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runBenchmark("FunctionRegistry") { val iters = 1000000 val threadNum = 4 val functionRegistry = FunctionRegistry.builtin val names = FunctionRegistry.expressions.keys.toSeq val barrier = new CyclicBarrier(threadNum + 1) val threadPool = ThreadUtils.newDaemonFixedThreadPool(threadNum, "test-function-registry") val benchmark = new Benchmark("SimpleFunctionRegistry", iters, output = output) benchmark.addCase("only read") { _ => for (_ <- 1 to threadNum) { threadPool.execute(new Runnable { val random = new Random() override def run(): Unit = { barrier.await() for (_ <- 1 to iters) { val name = names(random.nextInt(names.size)) val fun = functionRegistry.lookupFunction(new FunctionIdentifier(name)) assert(fun.map(_.getName).get == name) functionRegistry.listFunction() } barrier.await() } }) } barrier.await() barrier.await() } benchmark.run() } } } ``` The benchmark output before this PR. 
``` Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6 Intel(R) Core(TM) i5-5350U CPU 1.80GHz SimpleFunctionRegistry: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ only read 54858 55043 261 0.0 54858.1 1.0X ``` The benchmark output after this PR. ``` Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6 Intel(R) Core(TM) i5-5350U CPU 1.80GHz SimpleFunctionRegistry: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ only read 20202 20264 88 0.0 20202.1 1.0X ``` ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes apache#44976 from beliefer/SPARK-46937. Authored-by: beliefer <[email protected]> Signed-off-by: beliefer <[email protected]>
1 parent abbe301 commit 8cebb9b

File tree

1 file changed

+26
-28
lines changed

1 file changed

+26
-28
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
package org.apache.spark.sql.catalyst.analysis
1919

2020
import java.util.Locale
21-
import javax.annotation.concurrent.GuardedBy
21+
import java.util.concurrent.ConcurrentHashMap
2222

23-
import scala.collection.mutable
23+
import scala.jdk.CollectionConverters._
2424
import scala.reflect.ClassTag
2525

2626
import org.apache.spark.SparkUnsupportedOperationException
@@ -195,9 +195,8 @@ object FunctionRegistryBase {
195195

196196
trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging {
197197

198-
// Registry storage: maps a function identifier to its (ExpressionInfo, FunctionBuilder) pair.
// In this diff the `@GuardedBy("this")` annotation and the `mutable.HashMap` are the removed
// lines; the `ConcurrentHashMap` instantiation is the added line.
@GuardedBy("this")
199198
protected val functionBuilders =
200-
new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]
199+
new ConcurrentHashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]
201200

202201
// Resolution of the function name is always case insensitive, but the database name
203202
// depends on the caller
@@ -220,45 +219,36 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging
220219
// Stores the (info, builder) pair under `name` in `functionBuilders` and logs a warning when
// the value returned by `put` differs from the newly stored pair.
// In this diff the `synchronized` wrapper and the `case Some(previousFunction)` pattern are the
// removed lines; the unsynchronized signature and `case previousFunction` are the added lines.
// NOTE(review): `java.util.concurrent.ConcurrentHashMap.put` returns null when the key had no
// previous mapping, and `null != newFunction` is true, so the added guard looks like it also
// logs the "replaced" warning on a first-time registration — confirm, and consider matching on
// `Option(functionBuilders.put(name, newFunction))` with `case Some(previousFunction) if ...`.
def internalRegisterFunction(
220219
name: FunctionIdentifier,
221220
info: ExpressionInfo,
223-
builder: FunctionBuilder): Unit = synchronized {
222+
builder: FunctionBuilder): Unit = {
224223
val newFunction = (info, builder)
225224
functionBuilders.put(name, newFunction) match {
226-
case Some(previousFunction) if previousFunction != newFunction =>
225+
case previousFunction if previousFunction != newFunction =>
227226
logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
228227
log"previously registered function.")
229228
case _ =>
230229
}
231230
}
232231

233232
// Finds the builder stored under the normalized `name` and applies it to `children`.
// Throws the error built by `QueryCompilationErrors.unresolvedRoutineError` (with search path
// "system.builtin") when no entry exists.
// In this diff the `synchronized { ... }` lookup is the removed version; the added version wraps
// the Java `get` result in `Option(...)` so that a null result becomes `None`.
override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): T = {
234-
val func = synchronized {
235-
functionBuilders.get(normalizeFuncName(name)).map(_._2).getOrElse {
236-
throw QueryCompilationErrors.unresolvedRoutineError(name, Seq("system.builtin"))
237-
}
233+
val func = Option(functionBuilders.get(normalizeFuncName(name))).map(_._2).getOrElse {
234+
throw QueryCompilationErrors.unresolvedRoutineError(name, Seq("system.builtin"))
238235
}
239236
func(children)
240237
}
241238

242-
// Returns the identifiers of all entries currently held in `functionBuilders`.
// In this diff the `synchronized` version (iterator mapped to a List) is removed; the added
// version converts the map's `keys()` to a Scala `Seq` via `asScala`.
override def listFunction(): Seq[FunctionIdentifier] = synchronized {
243-
functionBuilders.iterator.map(_._1).toList
244-
}
239+
override def listFunction(): Seq[FunctionIdentifier] =
240+
functionBuilders.keys().asScala.toSeq
245241

246-
// Returns the `ExpressionInfo` stored under the normalized `name`, or `None` when absent.
// In this diff the `synchronized` version is removed; the added version wraps the Java `get`
// result in `Option(...)` so that a null result becomes `None`.
override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = synchronized {
247-
functionBuilders.get(normalizeFuncName(name)).map(_._1)
248-
}
242+
override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] =
243+
Option(functionBuilders.get(normalizeFuncName(name))).map(_._1)
249244

250-
// Returns the `FunctionBuilder` stored under the normalized `name`, or `None` when absent.
// In this diff the multi-line `synchronized` version is removed; the added single-signature
// version wraps the Java `get` result in `Option(...)` so that a null result becomes `None`.
override def lookupFunctionBuilder(
251-
name: FunctionIdentifier): Option[FunctionBuilder] = synchronized {
252-
functionBuilders.get(normalizeFuncName(name)).map(_._2)
253-
}
245+
override def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder] =
246+
Option(functionBuilders.get(normalizeFuncName(name))).map(_._2)
254247

255-
// Removes the entry stored under the normalized `name`.
// Returns true when `remove` yielded a value, false otherwise.
// In this diff the `synchronized` version is removed; the added version wraps the Java `remove`
// result in `Option(...)` before calling `isDefined`.
override def dropFunction(name: FunctionIdentifier): Boolean = synchronized {
256-
functionBuilders.remove(normalizeFuncName(name)).isDefined
257-
}
248+
override def dropFunction(name: FunctionIdentifier): Boolean =
249+
Option(functionBuilders.remove(normalizeFuncName(name))).isDefined
258250

259-
// Removes every entry from `functionBuilders`.
// In this diff the `synchronized` block form is removed; the added form is a single expression
// that calls `functionBuilders.clear()` directly.
override def clear(): Unit = synchronized {
260-
functionBuilders.clear()
261-
}
251+
override def clear(): Unit = functionBuilders.clear()
262252
}
263253

264254
/**
@@ -308,7 +298,11 @@ class SimpleFunctionRegistry
308298

309299
override def clone(): SimpleFunctionRegistry = synchronized {
310300
val registry = new SimpleFunctionRegistry
311-
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
301+
val iterator = functionBuilders.entrySet().iterator()
302+
while (iterator.hasNext) {
303+
val entry = iterator.next()
304+
val name = entry.getKey
305+
val (info, builder) = entry.getValue
312306
registry.internalRegisterFunction(name, info, builder)
313307
}
314308
registry
@@ -1036,7 +1030,11 @@ class SimpleTableFunctionRegistry extends SimpleFunctionRegistryBase[LogicalPlan
10361030

10371031
override def clone(): SimpleTableFunctionRegistry = synchronized {
10381032
val registry = new SimpleTableFunctionRegistry
1039-
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
1033+
val iterator = functionBuilders.entrySet().iterator()
1034+
while (iterator.hasNext) {
1035+
val entry = iterator.next()
1036+
val name = entry.getKey
1037+
val (info, builder) = entry.getValue
10401038
registry.internalRegisterFunction(name, info, builder)
10411039
}
10421040
registry

0 commit comments

Comments
 (0)