
Commit 47e1d96

asl3 authored and gengliangwang committed
[SPARK-48623][CORE] Structured logging migrations [Part 3]
### What changes were proposed in this pull request?
This PR makes additional Scala logging migrations to comply with the scala style changes in apache#46947.

### Why are the changes needed?
This makes development and PR review of the structured logging migration easier.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Tested by ensuring dev/scalastyle checks pass.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47275 from asl3/formatstructuredlogmigrations.

Lead-authored-by: Amanda Liu <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
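The pattern applied throughout the diff below replaces plain string interpolation (s"..." or .format) with the log"..." interpolator from org.apache.spark.internal.Logging, which attaches a LogKey to each interpolated value via MDC so the value is carried as a named field rather than only as rendered text. A minimal before/after sketch, not part of this patch (ExampleTimer is a made-up class; LogKeys.TOTAL_TIME is an existing key reused purely for illustration):

    import org.apache.spark.internal.{Logging, LogKeys, MDC}

    class ExampleTimer extends Logging {
      def report(totalMs: Long): Unit = {
        // Before: an s-interpolated message; the value is flattened into plain text.
        // logInfo(s"Times: total = $totalMs ms")

        // After: the same rendered text, but totalMs is also recorded under
        // LogKeys.TOTAL_TIME as a structured field.
        logInfo(log"Times: total = ${MDC(LogKeys.TOTAL_TIME, totalMs)} ms")
      }
    }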
1 parent 9b5c00b commit 47e1d96

17 files changed: +81 -58 lines changed

common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Lines changed: 7 additions & 0 deletions
@@ -101,6 +101,7 @@ private[spark] object LogKeys {
   case object BLOCK_TYPE extends LogKey
   case object BOOT extends LogKey
   case object BOOTSTRAP_TIME extends LogKey
+  case object BOOT_TIME extends LogKey
   case object BROADCAST extends LogKey
   case object BROADCAST_ID extends LogKey
   case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
@@ -110,6 +111,7 @@ private[spark] object LogKeys {
   case object BYTE_SIZE extends LogKey
   case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
   case object CACHE_AUTO_REMOVED_SIZE extends LogKey
+  case object CACHE_SIZE extends LogKey
   case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
   case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
   case object CALL_SITE_LONG_FORM extends LogKey
@@ -282,6 +284,7 @@ private[spark] object LogKeys {
   case object FINAL_CONTEXT extends LogKey
   case object FINAL_OUTPUT_PATH extends LogKey
   case object FINAL_PATH extends LogKey
+  case object FINISH_TIME extends LogKey
   case object FINISH_TRIGGER_DURATION extends LogKey
   case object FREE_MEMORY_SIZE extends LogKey
   case object FROM_OFFSET extends LogKey
@@ -320,10 +323,12 @@ private[spark] object LogKeys {
   case object INITIAL_CAPACITY extends LogKey
   case object INITIAL_HEARTBEAT_INTERVAL extends LogKey
   case object INIT_MODE extends LogKey
+  case object INIT_TIME extends LogKey
   case object INPUT extends LogKey
   case object INPUT_SPLIT extends LogKey
   case object INTEGRAL extends LogKey
   case object INTERVAL extends LogKey
+  case object INVALID_PARAMS extends LogKey
   case object ISOLATION_LEVEL extends LogKey
   case object ISSUE_DATE extends LogKey
   case object IS_NETWORK_REQUEST_DONE extends LogKey
@@ -369,6 +374,7 @@ private[spark] object LogKeys {
   case object LOG_LEVEL extends LogKey
   case object LOG_OFFSET extends LogKey
   case object LOG_TYPE extends LogKey
+  case object LOSSES extends LogKey
   case object LOWER_BOUND extends LogKey
   case object MALFORMATTED_STRING extends LogKey
   case object MAP_ID extends LogKey
@@ -566,6 +572,7 @@ private[spark] object LogKeys {
   case object OS_NAME extends LogKey
   case object OS_VERSION extends LogKey
   case object OUTPUT extends LogKey
+  case object OUTPUT_BUFFER extends LogKey
   case object OVERHEAD_MEMORY_SIZE extends LogKey
   case object PAGE_SIZE extends LogKey
   case object PARENT_STAGES extends LogKey
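Each of the seven keys added above (BOOT_TIME, CACHE_SIZE, FINISH_TIME, INIT_TIME, INVALID_PARAMS, LOSSES, OUTPUT_BUFFER) is a one-line case object inserted at its alphabetical position in the shared LogKeys registry, and call sites reference it qualified through LogKeys. A schematic sketch with a made-up key name:

    // In LogKey.scala, inside private[spark] object LogKeys (entries are kept alphabetically sorted):
    case object MY_DURATION extends LogKey  // hypothetical key, not added by this commit

    // At a call site that mixes in org.apache.spark.internal.Logging (elapsedMs assumed in scope):
    // logInfo(log"step took ${MDC(LogKeys.MY_DURATION, elapsedMs)} ms")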

common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala

Lines changed: 3 additions & 2 deletions
@@ -650,8 +650,9 @@ private[spark] object MavenUtils extends Logging {
     val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq
     if (invalidParams.nonEmpty) {
       logWarning(
-        s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " +
-          s"in Ivy URI query `$uriQuery`.")
+        log"Invalid parameters `${MDC(LogKeys.INVALID_PARAMS,
+            invalidParams.sorted.mkString(","))}` " +
+          log"found in Ivy URI query `${MDC(LogKeys.URI, uriQuery)}`.")
     }
 
     (transitive, exclusionList, repos)

connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala

Lines changed: 14 additions & 11 deletions
@@ -33,8 +33,7 @@ import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.spark.{SparkEnv, SparkException, SparkSQLException}
 import org.apache.spark.api.python.PythonFunction.PythonAccumulator
 import org.apache.spark.connect.proto
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys._
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -63,9 +62,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = {
     if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) {
       logWarning(
-        s"Session plan cache is disabled due to non-positive cache size." +
-          s" Current value of '${Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key}' is" +
-          s" ${SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE)}.")
+        log"Session plan cache is disabled due to non-positive cache size." +
+          log" Current value of " +
+          log"'${MDC(LogKeys.CONFIG, Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC(
+            LogKeys.CACHE_SIZE,
+            SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}")
       None
     } else {
       Some(
@@ -248,15 +249,17 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def updateAccessTime(): Unit = {
     lastAccessTimeMs = System.currentTimeMillis()
     logInfo(
-      log"Session ${MDC(SESSION_KEY, key)} accessed, " +
-        log"time ${MDC(LAST_ACCESS_TIME, lastAccessTimeMs)} ms.")
+      log"Session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
+        log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)} accessed," +
+        log"time ${MDC(LogKeys.LAST_ACCESS_TIME, lastAccessTimeMs)} ms.")
   }
 
   private[connect] def setCustomInactiveTimeoutMs(newInactiveTimeoutMs: Option[Long]): Unit = {
     customInactiveTimeoutMs = newInactiveTimeoutMs
     logInfo(
-      log"Session ${MDC(SESSION_KEY, key)} " +
-        log"inactive timeout set to ${MDC(TIMEOUT, customInactiveTimeoutMs)} ms.")
+      log"Session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
+        log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)} inactive timeout set to " +
+        log"${MDC(LogKeys.TIMEOUT, customInactiveTimeoutMs)} ms")
   }
 
   /**
@@ -282,8 +285,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
       throw new IllegalStateException(s"Session $key is already closed.")
     }
     logInfo(
-      log"Closing session with userId: ${MDC(USER_ID, userId)} and " +
-        log"sessionId: ${MDC(SESSION_ID, sessionId)}")
+      log"Closing session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
+        log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}")
     closedTimeMs = Some(System.currentTimeMillis())
 
     if (Utils.isTesting && eventManager.status == SessionStatus.Pending) {
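Beyond the message rewrites, this file drops the wildcard import of LogKeys._ in favour of importing the LogKeys object itself, so every key is referenced qualified (LogKeys.SESSION_ID rather than SESSION_ID). A schematic contrast, not taken from the patch:

    // Old style: wildcard import, keys referenced bare.
    // import org.apache.spark.internal.LogKeys._
    // logInfo(log"Closing session ${MDC(SESSION_ID, sessionId)}")

    // New style in this file: import the object and qualify each key at the call site.
    // import org.apache.spark.internal.{Logging, LogKeys, MDC}
    // logInfo(log"Closing session ${MDC(LogKeys.SESSION_ID, sessionId)}")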

connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala

Lines changed: 5 additions & 5 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.connect.proto.StreamingQueryEventType
 import org.apache.spark.connect.proto.StreamingQueryListenerEvent
 import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.streaming.StreamingQueryListener
 import org.apache.spark.util.ArrayImplicits._
 
@@ -131,10 +131,10 @@ private[sql] class SparkConnectListenerBusListener(
           .build())
     } catch {
       case NonFatal(e) =>
-        logError(
-          s"[SessionId: ${sessionHolder.sessionId}][UserId: ${sessionHolder.userId}] " +
-            s"Removing SparkConnectListenerBusListener and terminating the long-running thread " +
-            s"because of exception: $e")
+        logError(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" +
+          log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " +
+          log"Removing SparkConnectListenerBusListener and terminating the long-running thread " +
+          log"because of exception: ${MDC(LogKeys.EXCEPTION, e)}")
         // This likely means that the client is not responsive even with retry, we should
         // remove this listener and cleanup resources.
         serverSideListenerHolder.cleanUp()

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 7 additions & 6 deletions
@@ -122,7 +122,7 @@ private[spark] class SecurityManager(
    */
   def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
     viewAcls = adminAcls ++ defaultUsers ++ allowedUsers
-    logInfo("Changing view acls to: " + viewAcls.mkString(","))
+    logInfo(log"Changing view acls to: ${MDC(LogKeys.VIEW_ACLS, viewAcls.mkString(","))}")
   }
 
   def setViewAcls(defaultUser: String, allowedUsers: Seq[String]): Unit = {
@@ -135,7 +135,7 @@ private[spark] class SecurityManager(
    */
   def setViewAclsGroups(allowedUserGroups: Seq[String]): Unit = {
     viewAclsGroups = adminAclsGroups ++ allowedUserGroups
-    logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
+    logInfo(log"Changing view acls groups to: ${MDC(LogKeys.VIEW_ACLS, viewAcls.mkString(","))}")
   }
 
   /**
@@ -163,7 +163,7 @@ private[spark] class SecurityManager(
    */
   def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
     modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers
-    logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
+    logInfo(log"Changing modify acls to: ${MDC(LogKeys.MODIFY_ACLS, modifyAcls.mkString(","))}")
   }
 
   /**
@@ -172,7 +172,8 @@ private[spark] class SecurityManager(
    */
   def setModifyAclsGroups(allowedUserGroups: Seq[String]): Unit = {
     modifyAclsGroups = adminAclsGroups ++ allowedUserGroups
-    logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
+    logInfo(log"Changing modify acls groups to: ${MDC(LogKeys.MODIFY_ACLS,
+      modifyAcls.mkString(","))}")
   }
 
   /**
@@ -200,7 +201,7 @@ private[spark] class SecurityManager(
    */
   def setAdminAcls(adminUsers: Seq[String]): Unit = {
     adminAcls = adminUsers.toSet
-    logInfo("Changing admin acls to: " + adminAcls.mkString(","))
+    logInfo(log"Changing admin acls to: ${MDC(LogKeys.ADMIN_ACLS, adminAcls.mkString(","))}")
   }
 
   /**
@@ -209,7 +210,7 @@ private[spark] class SecurityManager(
    */
   def setAdminAclsGroups(adminUserGroups: Seq[String]): Unit = {
     adminAclsGroups = adminUserGroups.toSet
-    logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
+    logInfo(log"Changing admin acls groups to: ${MDC(LogKeys.ADMIN_ACLS, adminAcls.mkString(","))}")
   }
 
   def setAcls(aclSetting: Boolean): Unit = {
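As the setModifyAclsGroups change above shows, when a migrated call would run past the line-length limit of the scala style rules this PR follows, the MDC(...) call itself can be wrapped onto the next line, since the ${...} interpolation may span lines. A schematic example with made-up names:

    // exampleGroups: Seq[String], assumed in scope; LogKeys.MODIFY_ACLS reused only for illustration.
    logInfo(log"Changing example acls groups to: ${MDC(LogKeys.MODIFY_ACLS,
      exampleGroups.mkString(","))}")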

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 15 additions & 9 deletions
@@ -31,7 +31,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.api.python.PythonFunction.PythonAccumulator
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.TASK_NAME
 import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python}
 import org.apache.spark.internal.config.Python._
@@ -131,19 +131,23 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val daemonModule =
     conf.get(PYTHON_DAEMON_MODULE).map { value =>
       logInfo(
-        s"Python daemon module in PySpark is set to [$value] in '${PYTHON_DAEMON_MODULE.key}', " +
-          "using this to start the daemon up. Note that this configuration only has an effect when " +
-          s"'${PYTHON_USE_DAEMON.key}' is enabled and the platform is not Windows.")
+        log"Python daemon module in PySpark is set to " +
+          log"[${MDC(LogKeys.VALUE, value)}] in '${MDC(LogKeys.CONFIG,
+            PYTHON_DAEMON_MODULE.key)}', using this to start the daemon up. Note that this " +
+          log"configuration only has an effect when '${MDC(LogKeys.CONFIG2,
+            PYTHON_USE_DAEMON.key)}' is enabled and the platform is not Windows.")
       value
     }.getOrElse("pyspark.daemon")
 
   // This configuration indicates the module to run each Python worker.
   private val workerModule =
     conf.get(PYTHON_WORKER_MODULE).map { value =>
       logInfo(
-        s"Python worker module in PySpark is set to [$value] in '${PYTHON_WORKER_MODULE.key}', " +
-          "using this to start the worker up. Note that this configuration only has an effect when " +
-          s"'${PYTHON_USE_DAEMON.key}' is disabled or the platform is Windows.")
+        log"Python worker module in PySpark is set to ${MDC(LogKeys.VALUE, value)} " +
+          log"in ${MDC(LogKeys.CONFIG, PYTHON_WORKER_MODULE.key)}, " +
+          log"using this to start the worker up. Note that this configuration only has " +
+          log"an effect when ${MDC(LogKeys.CONFIG2, PYTHON_USE_DAEMON.key)} " +
+          log"is disabled or the platform is Windows.")
       value
     }.getOrElse("pyspark.worker")
 
@@ -509,8 +513,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         val init = initTime - bootTime
         val finish = finishTime - initTime
         val total = finishTime - startTime
-        logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
-          init, finish))
+        logInfo(log"Times: total = ${MDC(LogKeys.TOTAL_TIME, total)}, " +
+          log"boot = ${MDC(LogKeys.BOOT_TIME, boot)}, " +
+          log"init = ${MDC(LogKeys.INIT_TIME, init)}, " +
+          log"finish = ${MDC(LogKeys.FINISH_TIME, finish)}")
         val memoryBytesSpilled = stream.readLong()
         val diskBytesSpilled = stream.readLong()
         context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 5 deletions
@@ -36,7 +36,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
-import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, RDD_CACHE_VISIBILITY_TRACKING_ENABLED}
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
@@ -998,11 +998,13 @@ private[spark] class DAGScheduler(
     ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
     waiter.completionFuture.value.get match {
       case scala.util.Success(_) =>
-        logInfo("Job %d finished: %s, took %f s".format
-          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
+        logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} finished: " +
+          log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
+          log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms")
       case scala.util.Failure(exception) =>
-        logInfo("Job %d failed: %s, took %f s".format
-          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
+        logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} failed: " +
+          log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
+          log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms")
         // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
         val callerStackTrace = Thread.currentThread().getStackTrace.tail
         exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)

core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala

Lines changed: 1 addition & 1 deletion
@@ -110,7 +110,7 @@ private[spark] object ShutdownHookManager extends Logging {
       }
     }
     if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
+      logInfo(log"path = ${MDC(LogKeys.FILE_NAME, file)}, already present as root for deletion.")
     }
     retval
   }

core/src/main/scala/org/apache/spark/util/collection/Spillable.scala

Lines changed: 5 additions & 4 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.util.collection
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
 
@@ -143,8 +143,9 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
    */
   @inline private def logSpillage(size: Long): Unit = {
     val threadId = Thread.currentThread().getId
-    logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
-      .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
-        _spillCount, if (_spillCount > 1) "s" else ""))
+    logInfo(log"Thread ${MDC(LogKeys.THREAD_ID, threadId)} " +
+      log"spilling in-memory map of ${MDC(LogKeys.BYTE_SIZE,
+        org.apache.spark.util.Utils.bytesToString(size))} to disk " +
+      log"(${MDC(LogKeys.SPILL_TIMES, _spillCount)} times so far)")
   }
 }

mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala

Lines changed: 2 additions & 2 deletions
@@ -21,7 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaSparkContext._
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{CLUSTER_CENTROIDS, CLUSTER_LABEL, CLUSTER_WEIGHT, LARGEST_CLUSTER_INDEX, SMALLEST_CLUSTER_INDEX}
 import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.rdd.RDD
@@ -222,7 +222,7 @@ class StreamingKMeans @Since("1.2.0") (
       throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit)
     }
     this.decayFactor = math.exp(math.log(0.5) / halfLife)
-    logInfo("Setting decay factor to: %g ".format (this.decayFactor))
+    logInfo(log"Setting decay factor to: ${MDC(LogKeys.VALUE, this.decayFactor)}")
     this.timeUnit = timeUnit
     this
   }
