Skip to content

Commit aeae9ff

Browse files
shardulm94LuciferYang
authored andcommitted
[SPARK-52737][CORE] Pushdown predicate and number of apps to FsHistoryProvider when listing applications
### What changes were proposed in this pull request? SPARK-38896 modified how applications are listed from the KVStore to close the KVStore iterator eagerly [Link](https://github.com/apache/spark/pull/36237/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R328). This meant that `FsHistoryProvider.getListing` now eagerly goes through every application in the KVStore before returning an iterator to the caller. In a couple of contexts where `FsHistoryProvider.getListing` is used, this is very detrimental. e.g. [here](https://github.com/apache/spark/blame/589e93a02725939c266f9ee97f96fdc6d3db33cd/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala#L112), due to `.exists()` we would previously only need to go through a handful of applications before the condition is satisfied. This causes significant perf regression for the SHS homepage in our environment which contains ~10000 Spark apps in a single history server. To fix the issue, while preserving the original intent of closing the iterator early, this PR proposes pushing down filter predicates and number of applications required to FsHistoryProvider. ### Why are the changes needed? To fix a perf regression in SHS due to SPARK-38896 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests for `HistoryPage` and `ApplicationListResource` Tested performance on local SHS with a large number of apps (~75k) consistent with production. Before: ``` smahadiklocalhost [ ~ ]$ curl http://localhost:18080/api/v1/applications | jq 'length' 75061 smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080; done Total time: 3.607995s Total time: 3.564875s Total time: 3.095895s Total time: 3.153576s Total time: 3.157186s Total time: 3.251107s Total time: 3.681727s Total time: 4.622074s Total time: 6.866931s Total time: 3.523224s smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080/api/v1/applications?limit=10; done Total time: 3.340698s Total time: 3.206455s Total time: 3.140326s Total time: 4.704944s Total time: 3.982831s Total time: 7.375094s Total time: 3.328329s Total time: 3.264700s Total time: 3.283851s Total time: 3.456416s ``` After: ``` smahadiklocalhost [ ~ ]$ curl http://localhost:18080/api/v1/applications | jq 'length' % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 36.7M 0 36.7M 0 0 7662k 0 --:--:-- 0:00:04 --:--:-- 7663k 75077 smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080; done Total time: 0.224714s Total time: 0.012205s Total time: 0.014709s Total time: 0.008092s Total time: 0.007284s Total time: 0.006350s Total time: 0.005414s Total time: 0.006391s Total time: 0.005668s Total time: 0.004738s smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080/api/v1/applications?limit=10; done Total time: 1.439507s Total time: 0.015126s Total time: 0.009085s Total time: 0.007620s Total time: 0.007692s Total time: 0.007420s Total time: 0.007152s Total time: 0.010515s Total time: 0.011493s Total time: 0.007564s ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #51428 from shardulm94/smahadik/shs-slow. Authored-by: Shardul Mahadik <[email protected]> Signed-off-by: yangjie01 <[email protected]>
1 parent 25892a7 commit aeae9ff

File tree

8 files changed

+48
-3
lines changed

8 files changed

+48
-3
lines changed

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,15 @@ private[history] abstract class ApplicationHistoryProvider {
9999
*/
100100
def getListing(): Iterator[ApplicationInfo]
101101

102+
/**
103+
* Returns a list of applications available for the history server to show.
104+
*
105+
* @param max The maximum number of applications to return
106+
* @param predicate A function that filters the applications to be returned
107+
* @return An iterator of matching applications up to the specified maximum
108+
*/
109+
def getListing(max: Int)(predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo]
110+
102111
/**
103112
* Returns the Spark UI for a specific application.
104113
*

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
307307
.index("endTime").reverse())(_.toApplicationInfo()).iterator
308308
}
309309

310+
override def getListing(max: Int)(
311+
predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
312+
// Return the filtered listing in end time descending order.
313+
KVUtils.mapToSeqWithFilter(
314+
listing.view(classOf[ApplicationInfoWrapper]).index("endTime").reverse(),
315+
max)(_.toApplicationInfo())(predicate).iterator
316+
}
317+
310318
override def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
311319
try {
312320
Some(load(appId).toApplicationInfo())

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
109109
}
110110

111111
def shouldDisplayApplications(requestedIncomplete: Boolean): Boolean = {
112-
parent.getApplicationList().exists(isApplicationCompleted(_) != requestedIncomplete)
112+
parent.getApplicationInfoList(1)(isApplicationCompleted(_) != requestedIncomplete).nonEmpty
113113
}
114114

115115
private def makePageLink(request: HttpServletRequest, showIncomplete: Boolean): String = {

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,11 @@ class HistoryServer(
224224
getApplicationList()
225225
}
226226

227+
override def getApplicationInfoList(max: Int)(
228+
filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
229+
provider.getListing(max)(filter)
230+
}
231+
227232
def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
228233
provider.getApplicationInfo(appId)
229234
}

core/src/main/scala/org/apache/spark/status/KVUtils.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,20 @@ private[spark] object KVUtils extends Logging {
213213
}
214214
}
215215

216+
/**
217+
* Maps all values of KVStoreView to new values using a transformation function
218+
* and filtered by a filter function.
219+
*/
220+
def mapToSeqWithFilter[T, B](
221+
view: KVStoreView[T],
222+
max: Int)
223+
(mapFunc: T => B)
224+
(filterFunc: B => Boolean): Seq[B] = {
225+
Utils.tryWithResource(view.closeableIterator()) { iter =>
226+
iter.asScala.map(mapFunc).filter(filterFunc).take(max).toList
227+
}
228+
}
229+
216230
def size[T](view: KVStoreView[T]): Int = {
217231
Utils.tryWithResource(view.closeableIterator()) { iter =>
218232
iter.asScala.size

core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ private[spark] trait UIRoot {
8282
def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T
8383

8484
def getApplicationInfoList: Iterator[ApplicationInfo]
85+
86+
def getApplicationInfoList(max: Int)(
87+
filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo]
88+
8589
def getApplicationInfo(appId: String): Option[ApplicationInfo]
8690

8791
/**

core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ private[v1] class ApplicationListResource extends ApiRequestContext {
3838
val includeCompleted = status.isEmpty || status.contains(ApplicationStatus.COMPLETED)
3939
val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING)
4040

41-
uiRoot.getApplicationInfoList.filter { app =>
41+
uiRoot.getApplicationInfoList(numApps) { app =>
4242
val anyRunning = app.attempts.isEmpty || !app.attempts.head.completed
4343
// if any attempt is still running, we consider the app to also still be running;
4444
// keep the app if *any* attempts fall in the right time window
4545
((!anyRunning && includeCompleted) || (anyRunning && includeRunning)) &&
4646
app.attempts.exists { attempt =>
4747
isAttemptInRange(attempt, minDate, maxDate, minEndDate, maxEndDate, anyRunning)
4848
}
49-
}.take(numApps)
49+
}
5050
}
5151

5252
private def isAttemptInRange(

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ private[spark] class SparkUI private (
201201
))
202202
}
203203

204+
override def getApplicationInfoList(max: Int)(
205+
filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
206+
getApplicationInfoList.filter(filter).take(max)
207+
}
208+
204209
def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
205210
getApplicationInfoList.find(_.id == appId)
206211
}

0 commit comments

Comments
 (0)