From 384a13dc3046ab283c6c78a89a2018d50b87fcb5 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 11 Jun 2024 10:46:04 -0700 Subject: [PATCH 1/2] Revert "[SPARK-46937][SQL][FOLLOWUP] Properly check registered function replacement" This reverts commit 88b8dc29e100a51501701ffdffbcd0eff1f97c98. --- .../apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 588752f3fc176..a52feaa41acf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -222,7 +222,7 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging builder: FunctionBuilder): Unit = { val newFunction = (info, builder) functionBuilders.put(name, newFunction) match { - case previousFunction if previousFunction != null => + case previousFunction if previousFunction != newFunction => logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " + log"previously registered function.") case _ => From 913e4e50b6e906880d903a7fc87e928a2b97fc04 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 11 Jun 2024 10:46:24 -0700 Subject: [PATCH 2/2] Revert "[SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry" This reverts commit 8cebb9b56cc3716fb5afaafa317751924f0f8062. --- .../catalyst/analysis/FunctionRegistry.scala | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index a52feaa41acf9..3a418497fa537 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.catalyst.analysis import java.util.Locale -import java.util.concurrent.ConcurrentHashMap +import javax.annotation.concurrent.GuardedBy -import scala.jdk.CollectionConverters._ +import scala.collection.mutable import scala.reflect.ClassTag import org.apache.spark.SparkUnsupportedOperationException @@ -195,8 +195,9 @@ object FunctionRegistryBase { trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging { + @GuardedBy("this") protected val functionBuilders = - new ConcurrentHashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)] + new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)] // Resolution of the function name is always case insensitive, but the database name // depends on the caller @@ -219,10 +220,10 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging def internalRegisterFunction( name: FunctionIdentifier, info: ExpressionInfo, - builder: FunctionBuilder): Unit = { + builder: FunctionBuilder): Unit = synchronized { val newFunction = (info, builder) functionBuilders.put(name, newFunction) match { - case previousFunction if previousFunction != newFunction => + case Some(previousFunction) if previousFunction != newFunction => logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " + log"previously registered function.") case _ => @@ -230,25 +231,34 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging } override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): T = { - val func = Option(functionBuilders.get(normalizeFuncName(name))).map(_._2).getOrElse { - throw QueryCompilationErrors.unresolvedRoutineError(name, Seq("system.builtin")) + val func = synchronized { + functionBuilders.get(normalizeFuncName(name)).map(_._2).getOrElse { + throw QueryCompilationErrors.unresolvedRoutineError(name, Seq("system.builtin")) + } } func(children) } - override def listFunction(): Seq[FunctionIdentifier] = - functionBuilders.keys().asScala.toSeq + override def listFunction(): Seq[FunctionIdentifier] = synchronized { + functionBuilders.iterator.map(_._1).toList + } - override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = - Option(functionBuilders.get(normalizeFuncName(name))).map(_._1) + override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = synchronized { + functionBuilders.get(normalizeFuncName(name)).map(_._1) + } - override def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder] = - Option(functionBuilders.get(normalizeFuncName(name))).map(_._2) + override def lookupFunctionBuilder( + name: FunctionIdentifier): Option[FunctionBuilder] = synchronized { + functionBuilders.get(normalizeFuncName(name)).map(_._2) + } - override def dropFunction(name: FunctionIdentifier): Boolean = - Option(functionBuilders.remove(normalizeFuncName(name))).isDefined + override def dropFunction(name: FunctionIdentifier): Boolean = synchronized { + functionBuilders.remove(normalizeFuncName(name)).isDefined + } - override def clear(): Unit = functionBuilders.clear() + override def clear(): Unit = synchronized { + functionBuilders.clear() + } } /** @@ -298,11 +308,7 @@ class SimpleFunctionRegistry override def clone(): SimpleFunctionRegistry = synchronized { val registry = new SimpleFunctionRegistry - val iterator = functionBuilders.entrySet().iterator() - while (iterator.hasNext) { - val entry = iterator.next() - val name = entry.getKey - val (info, builder) = entry.getValue + functionBuilders.iterator.foreach { case (name, (info, builder)) => registry.internalRegisterFunction(name, info, builder) } registry @@ -1030,11 +1036,7 @@ class SimpleTableFunctionRegistry extends SimpleFunctionRegistryBase[LogicalPlan override def clone(): SimpleTableFunctionRegistry = synchronized { val registry = new SimpleTableFunctionRegistry - val iterator = functionBuilders.entrySet().iterator() - while (iterator.hasNext) { - val entry = iterator.next() - val name = entry.getKey - val (info, builder) = entry.getValue + functionBuilders.iterator.foreach { case (name, (info, builder)) => registry.internalRegisterFunction(name, info, builder) } registry