213 commits
81fff20
Start of work on adventures
holdenk May 12, 2017
e470bac
Mini progresss
holdenk Jun 2, 2017
a00c707
Go down the path of handling as lost but urgh lets just blacklist ins…
holdenk Jun 2, 2017
74ade44
Plumb through executor loss to the scheduables
holdenk Jun 9, 2017
a880177
AppClient suite works! yay
holdenk Jun 21, 2017
b970403
Decomissioning now works in the coarse grained scheduler, yay....
holdenk Jun 21, 2017
ded6bbc
Remove sketchy println debugging
holdenk Jun 22, 2017
16c855a
Add a worker decommissioning suite
holdenk Jul 6, 2017
c79a06d
Merge in latest master
holdenk Jul 6, 2017
e3798d0
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Aug 16, 2017
4f70706
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Aug 17, 2017
07c3e3e
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Aug 22, 2017
c2a0ad8
Add decommissioning script for whatever process is running locally on…
holdenk Aug 22, 2017
672c3b6
Leave polling mechanism up to the cloud vendors
holdenk Aug 22, 2017
9cfdb7f
Remove legacy comment and remove some unecessary blank lines
holdenk Aug 22, 2017
65a29c1
Remove manually debugging printlns (oops)
holdenk Aug 22, 2017
9f08b7e
Merge in master
holdenk Aug 13, 2018
258a116
Update and add blocking for K8s
holdenk Aug 27, 2018
c40fac5
Add workerDecomissioning to K8s conf
holdenk Aug 27, 2018
0ba0ca5
Merge in master
holdenk Sep 8, 2018
5877c16
Tidy up small things.
holdenk Sep 8, 2018
fdb3598
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Sep 19, 2018
4e6572f
Fix missing endLifecycle
holdenk Sep 19, 2018
42a29ab
Add a WIP Decom suite work
holdenk Sep 21, 2018
745f206
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Oct 12, 2018
c7eaaf6
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Oct 12, 2018
05941da
Attempt at making decomissioning integration test for Spark on K8s co…
holdenk Oct 12, 2018
cb61f45
Add initial decomissioning_water helper script
holdenk Oct 12, 2018
963a289
We don't use the JavaConverters in this test suite.
holdenk Oct 12, 2018
1cc1436
1.to(10) is scala code, use range since we're in Python.
holdenk Oct 12, 2018
9036b44
Fix style issue with blank line at end of file.
holdenk Oct 12, 2018
d58f2a6
Remove unneeded appArgs
holdenk Oct 12, 2018
5ae1bd7
Add missing sys import
holdenk Oct 16, 2018
8f6cff2
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Oct 25, 2018
be38dab
Add back appArgs since despite Ilan's comments during the stream they…
holdenk Oct 26, 2018
bbeceb9
Extend DecommissionSuite so the tests are triggered.
holdenk Oct 31, 2018
e5fb644
Check all containers
holdenk Oct 31, 2018
164fa2a
Wait for the pod to become ready for before we kill it.
holdenk Oct 31, 2018
151d5d8
Merge in master
holdenk Oct 31, 2018
ca448d1
import the test tag idk why it won't run
holdenk Oct 31, 2018
8d504b2
Remove import
holdenk Nov 8, 2018
c2b3e6e
Merge in master
holdenk Dec 6, 2018
7b0023a
Maybe we don't need to explicitly set the docker image since the Pyth…
holdenk Dec 6, 2018
cca9948
We don't use () anymore on the properties in kubeconf
holdenk Dec 6, 2018
1bbb69b
Remove unused imports
holdenk Dec 6, 2018
00ae5e9
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Dec 12, 2018
af048f5
Maybe the class loading issue is from two traits with the same test n…
holdenk Dec 12, 2018
43abf98
Configure the image for Python.
holdenk Dec 12, 2018
abb0609
Mispelled decommissioning in the python test file.
holdenk Dec 12, 2018
9d4fc23
Change ref
holdenk Dec 12, 2018
7f3fd5f
The spark context on the session object is stored as _sc
holdenk Dec 25, 2018
8306827
Speed up running the kubernetes integration tests locally by allowing…
holdenk Dec 12, 2018
154c8b9
Log exec decom for test
holdenk Dec 25, 2018
3020ef8
Fix log msg check
holdenk Dec 25, 2018
27b4edd
30 seconds why not
holdenk Dec 25, 2018
00310f9
Some temporary printlns for debugging in Jenkins
holdenk Dec 25, 2018
c3c0e3a
Just run the decom suite.
holdenk Dec 28, 2018
0bf027a
Try and debug the tests some more.
holdenk Dec 28, 2018
be18e52
Re-enable basic test suite.
holdenk Dec 28, 2018
2914581
More debugging
holdenk Dec 28, 2018
953094a
Hey did we not run the Python tests?
holdenk Dec 29, 2018
5d173bd
Python tests aren't registering the executors, lets avoid that noise …
holdenk Dec 29, 2018
705fd58
Fix using SparkPI for decom test.
holdenk Dec 29, 2018
ca60dbf
Enable all the tests...
holdenk Dec 29, 2018
044f8c5
more ...
holdenk Dec 29, 2018
c09867b
[SPARK-26193][SQL][FOLLOW UP] Read metrics rename and display text ch…
xuanyuanking Dec 12, 2018
afe463c
[SPARK-19827][R][FOLLOWUP] spark.ml R API for PIC
srowen Dec 12, 2018
c76d70a
[SPARK-24102][ML][MLLIB] ML Evaluators should use weight column - add…
imatiach-msft Dec 12, 2018
d639627
[SPARK-25877][K8S] Move all feature logic to feature classes.
Dec 12, 2018
3ec9e3b
[SPARK-25277][YARN] YARN applicationMaster metrics should not registe…
LucaCanali Dec 13, 2018
d9bccb5
[SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delega…
gaborgsomogyi Dec 13, 2018
b650414
[SPARK-26297][SQL] improve the doc of Distribution/Partitioning
cloud-fan Dec 13, 2018
73a373b
[SPARK-26348][SQL][TEST] make sure expression is resolved during test
cloud-fan Dec 13, 2018
4a5acc7
[SPARK-26355][PYSPARK] Add a workaround for PyArrow 0.11.
ueshin Dec 13, 2018
239b8ec
[MINOR][R] Fix indents of sparkR welcome message to be consistent wit…
AzureQ Dec 13, 2018
bda9e84
[MINOR][DOC] Fix comments of ConvertToLocalRelation rule
seancxmao Dec 13, 2018
d2a58a1
[MINOR][DOC] update the condition description of BypassMergeSortShuffle
lcqzte10192193 Dec 13, 2018
6a1cdf4
[SPARK-26340][CORE] Ensure cores per executor is greater than cpu per…
Dec 13, 2018
9b127e1
[SPARK-26313][SQL] move `newScanBuilder` from Table to read related m…
cloud-fan Dec 13, 2018
14b4978
[SPARK-26098][WEBUI] Show associated SQL query in Job page
gengliangwang Dec 13, 2018
dde56e4
[SPARK-23886][SS] Update query status for ContinuousExecution
gaborgsomogyi Dec 14, 2018
3fac7d4
[SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf*
icexelloss Dec 14, 2018
611a13b
[SPARK-26360] remove redundant validateQuery call
JasonWayne Dec 14, 2018
41b0107
[SPARK-26337][SQL][TEST] Add benchmark for LongToUnsafeRowMap
viirya Dec 14, 2018
2ba82a7
[SPARK-26368][SQL] Make it clear that getOrInferFileFormatSchema does…
rxin Dec 14, 2018
e3b1790
[SPARK-26370][SQL] Fix resolution of higher-order function for the sa…
ueshin Dec 14, 2018
736df41
[MINOR][SQL] Some errors in the notes.
Dec 14, 2018
ff2a8b8
[SPARK-26265][CORE][FOLLOWUP] Put freePage into a finally block
viirya Dec 15, 2018
aa58472
[SPARK-26362][CORE] Remove 'spark.driver.allowMultipleContexts' to di…
HyukjinKwon Dec 15, 2018
6fce1af
[SPARK-26315][PYSPARK] auto cast threshold from Integer to Float in a…
Dec 15, 2018
ecd5aa1
[SPARK-26243][SQL] Use java.time API for parsing timestamps and dates…
MaxGekk Dec 16, 2018
ffbc6b1
[SPARK-26078][SQL] Dedup self-join attributes on IN subqueries
mgaido91 Dec 16, 2018
4c2af74
[SPARK-26372][SQL] Don't reuse value from previous row when parsing b…
bersprockets Dec 16, 2018
f2a56a6
[SPARK-26248][SQL] Infer date type from CSV
MaxGekk Dec 17, 2018
51a1cbb
[MINOR][DOCS] Fix the "not found: value Row" error on the "programmat…
kjmrknsn Dec 17, 2018
c26df2b
Revert "[SPARK-26248][SQL] Infer date type from CSV"
HyukjinKwon Dec 17, 2018
90c9bd5
[SPARK-26352][SQL] join reorder should not change the order of output…
rednaxelafx Dec 17, 2018
33de7df
[SPARK-26327][SQL][FOLLOW-UP] Refactor the code and restore the metri…
gatorsmile Dec 17, 2018
a1c97b5
[SPARK-20636] Add the rule TransposeWindow to the optimization batch
gatorsmile Dec 17, 2018
0ed3f6a
[SPARK-26243][SQL][FOLLOWUP] fix code style issues in TimestampFormat…
cloud-fan Dec 17, 2018
62a8466
[SPARK-20351][ML] Add trait hasTrainingSummary to replace the duplica…
YY-OnCall Dec 17, 2018
97a1d0d
[SPARK-26255][YARN] Apply user provided UI filters to SQL tab in yar…
chakravarthiT Dec 17, 2018
53e05ac
[SPARK-26371][SS] Increase kafka ConfigUpdater test coverage.
gaborgsomogyi Dec 17, 2018
9bbc1f9
[SPARK-24933][SS] Report numOutputRows in SinkProgress
vackosar Dec 17, 2018
a97ca7a
[SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mi…
suxingfate Dec 17, 2018
3ee251f
[SPARK-24561][SQL][PYTHON] User-defined window aggregation functions …
icexelloss Dec 18, 2018
435392e
[SPARK-26246][SQL] Inferring TimestampType from JSON
MaxGekk Dec 18, 2018
d33bf4b
[SPARK-26081][SQL][FOLLOW-UP] Use foreach instead of misuse of map (f…
HyukjinKwon Dec 18, 2018
77d78b8
[SPARK-24680][DEPLOY] Support spark.executorEnv.JAVA_HOME in Standalo…
stanzhai Dec 18, 2018
4af7980
[SPARK-26384][SQL] Propagate SQL configs for CSV schema inferring
MaxGekk Dec 18, 2018
0702b70
[SPARK-26382][CORE] prefix comparator should handle -0.0
cloud-fan Dec 18, 2018
c446c9e
[SPARK-26394][CORE] Fix annotation error for Utils.timeStringAsMs
Dec 18, 2018
4578d12
[SPARK-25815][K8S] Support kerberos in client mode, keytab-based toke…
Dec 18, 2018
93089b5
[SPARK-26366][SQL] ReplaceExceptWithFilter should consider NULL as False
mgaido91 Dec 19, 2018
c3a7a52
[SPARK-26390][SQL] ColumnPruning rule should only do column pruning
cloud-fan Dec 19, 2018
2867beb
[SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed config sets: WHOLE…
maropu Dec 20, 2018
4b72301
[SPARK-25271][SQL] Hive ctas commands should use data source if it is…
viirya Dec 20, 2018
2aa1022
[SPARK-26318][SQL] Deprecate Row.merge
KyleLi1985 Dec 20, 2018
029933c
[SPARK-26308][SQL] Avoid cast of decimals for ScalaUDF
mgaido91 Dec 20, 2018
a647251
[SPARK-24687][CORE] Avoid job hanging when generate task binary cause…
caneGuy Dec 20, 2018
b011de3
[SPARK-26324][DOCS] Add Spark docs for Running in Mesos with SSL
jomach Dec 20, 2018
f4e4c1a
[SPARK-26409][SQL][TESTS] SQLConf should be serializable in test sess…
gengliangwang Dec 20, 2018
ec539d1
[SPARK-26392][YARN] Cancel pending allocate requests by taking locali…
Ngone51 Dec 20, 2018
cc07eae
[SPARK-25970][ML] Add Instrumentation to PrefixSpan
zhengruifeng Dec 20, 2018
07d111d
[MINOR][SQL] Locality does not need to be implemented
10110346 Dec 21, 2018
75ff5d9
[SPARK-26422][R] Support to disable Hive support in SparkR even for H…
HyukjinKwon Dec 21, 2018
65de564
[SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
zsxwing Dec 21, 2018
def767a
[SPARK-26269][YARN] Yarnallocator should have same blacklist behaviou…
Ngone51 Dec 21, 2018
38930f0
[SPARK-25642][YARN] Adding two new metrics to record the number of re…
Dec 21, 2018
ac33584
[SPARK-26216][SQL][FOLLOWUP] use abstract class instead of trait for …
cloud-fan Dec 22, 2018
ffd2ef6
[SPARK-26427][BUILD] Upgrade Apache ORC to 1.5.4
dongjoon-hyun Dec 22, 2018
2b50381
[SPARK-26428][SS][TEST] Minimize deprecated `ProcessingTime` usage
dongjoon-hyun Dec 22, 2018
3892c5d
[SPARK-26430][BUILD][TEST-MAVEN] Upgrade Surefire plugin to 3.0.0-M2
dongjoon-hyun Dec 22, 2018
c19093a
[SPARK-26285][CORE] accumulator metrics sources for LongAccumulator a…
Dec 22, 2018
ce19610
[SPARK-25245][DOCS][SS] Explain regarding limiting modification on "s…
HeartSaVioR Dec 22, 2018
c03498e
[SPARK-26402][SQL] Accessing nested fields with different cases in ca…
dbtsai Dec 22, 2018
4536d53
[SPARK-26178][SPARK-26243][SQL][FOLLOWUP] Replacing SimpleDateFormat …
MaxGekk Dec 24, 2018
e73d73e
[SPARK-14023][CORE][SQL] Don't reference 'field' in StructField error…
srowen Dec 24, 2018
35c680e
[SPARK-26426][SQL] fix ExpresionInfo assert error in windows operatio…
Dec 25, 2018
210550c
[SPARK-26424][SQL] Use java.time API in date/timestamp expressions
MaxGekk Dec 27, 2018
46913ce
[SPARK-26435][SQL] Support creating partitioned table using Hive CTAS…
viirya Dec 27, 2018
a72b963
[SPARK-26191][SQL] Control truncation of Spark plans via maxFields pa…
MaxGekk Dec 27, 2018
b3032c9
[SPARK-25892][SQL] Change AttributeReference.withMetadata's return ty…
kevinyu98 Dec 27, 2018
e6d6eaf
[SPARK-26451][SQL] Change lead/lag argument name from count to offset
deepyaman Dec 27, 2018
11f1c8d
[SPARK-26446][CORE] Add cachedExecutorIdleTimeout docs at ExecutorAll…
Dec 28, 2018
fcbec31
[SPARK-26444][WEBUI] Stage color doesn't change with it's status
seancxmao Dec 28, 2018
1bb70d9
[SPARK-26424][SQL][FOLLOWUP] Fix DateFormatClass/UnixTime codegen
dongjoon-hyun Dec 28, 2018
bb07fe9
Maybe we have a race condition with the watcher? idk why we aren't ge…
holdenk Dec 29, 2018
ea77b23
wtf is going on with this eventually block
holdenk Dec 29, 2018
c120713
Rewrite the eventually's to use should be which I had accidently remo…
holdenk Jan 3, 2019
fecd0cf
Change how we handle decom tests to actually decom workers and check …
holdenk Jan 3, 2019
b668feb
Revert "Fix using SparkPI for decom test."
holdenk Jan 3, 2019
2905c8a
Revert "Python tests aren't registering the executors, lets avoid tha…
holdenk Jan 3, 2019
b77eb9e
Match the wait logic (TODO refactor)
holdenk Jan 3, 2019
9f069a4
Debug pods not becoming ready.
holdenk Jan 4, 2019
789bbdd
special format string.
holdenk Jan 6, 2019
348e2f8
Merge in master
holdenk Jan 6, 2019
bf834e2
If all the pods are done we don't need the pods to be ready.
holdenk Jan 7, 2019
1ed5c3d
Sleep 100 before waiting.
holdenk Jan 7, 2019
7acc255
Give the pod 5 minutes to become ready.
holdenk Jan 7, 2019
b2c58b6
I think we might have had a race condition where the top thread delet…
holdenk Jan 7, 2019
92f4289
Use POD_RUNNING_TIMEOUT
holdenk Jan 8, 2019
4774b79
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Jan 10, 2019
739cea0
Merge in master
holdenk Feb 11, 2019
0b33e1a
Fix appclient suite
holdenk Feb 11, 2019
2dc806e
Try and debug the waiting on killing the exec pod logic.
holdenk Feb 14, 2019
1963bf3
Maybe we were not actually hitting the k8s end point which is why the…
holdenk Feb 14, 2019
5771b05
re-order tests to fail faster.
holdenk Feb 14, 2019
a39cd85
Refactor the pod ready status check to be shared in the two places we…
holdenk Feb 14, 2019
3a16ee8
More debugging and use shouldBe rather than a direct assert.
holdenk Feb 14, 2019
8044441
Name isn't a label, just get the pod by name directly.
holdenk Feb 15, 2019
25dc907
Get namespace as well since we are not finding the pod whuich is odd.
holdenk Feb 15, 2019
46b5725
Fix namespace for pod exec check
holdenk Feb 15, 2019
4154eef
Fix pod check
holdenk Feb 16, 2019
8a2f5a7
For now skip scala style println checks in KubernetesSuite while we'r…
holdenk Mar 7, 2019
2b45f9a
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Mar 7, 2019
7d4f264
Fix long line comment in KubernetesSuite
holdenk Mar 7, 2019
367a666
Print out when we are running decom suite.
holdenk Mar 7, 2019
e54dbaa
Print out the namespace as well.
holdenk Mar 7, 2019
06bbbed
Debugging is easier when I see what failed
holdenk Mar 7, 2019
e8081cb
Without specifiying a spark release tarball the setup env script will…
holdenk Mar 7, 2019
908b204
I don't know why the ready check isn't doing what I expected, lets br…
holdenk Mar 7, 2019
158838c
Fix filter
holdenk Mar 7, 2019
a6ad1ff
Remove unrelated subquery suite change
holdenk Mar 7, 2019
d85c229
Revert "Speed up running the kubernetes integration tests locally by …
holdenk Mar 7, 2019
e770092
Take out set -e because I _think_ in integration env this fails with …
holdenk Mar 7, 2019
0cad83a
Temporary commit to support running tests locally, should be part of …
holdenk Mar 7, 2019
ad3474d
Tests are running locally, pod is created and deleted but we don't ge…
holdenk Mar 7, 2019
23decb9
Move the config variable for decom into worker, add a bit more loggin…
holdenk Mar 7, 2019
78d02d3
Fix instances of decomi , register SIGPWR in CoarseGrainedExecutorBac…
holdenk Mar 8, 2019
fa6db32
Fixed compile errors from last
holdenk Mar 8, 2019
6d31986
s/SIGPWR/PWR/ in the Scala code.
holdenk Mar 8, 2019
ba755de
Add lifecycle change
holdenk Mar 8, 2019
777da86
Print out when we're getting ready to stop Spark and increase sleep
holdenk Mar 8, 2019
a61186b
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Mar 13, 2019
209bf18
Try and debug whats going on with our container lifecycle. Currently …
holdenk Mar 13, 2019
0faf47c
Try and check that lifecycle is being added with more logging and a l…
holdenk Mar 14, 2019
40b75f5
Print out our scala version and set it for downstream.
holdenk Mar 14, 2019
0ee3ae0
Disable R build because it's not working and also set SPARK_SCALA_VER…
holdenk Mar 14, 2019
cd08175
Add scala version as a param but I don't think we need this
holdenk Mar 14, 2019
387035b
Revert "Add scala version as a param but I don't think we need this"
holdenk Mar 14, 2019
f11d9f5
Lifecycle now running ok take out the bad lifecycle stage
holdenk Mar 14, 2019
727f76e
decom script should be executable
holdenk Mar 14, 2019
b37892d
Decom script is in the pod yaml, not seeing SIGTERM anymore in the lo…
holdenk Mar 14, 2019
c1917b5
Try and log our exit process
holdenk Mar 14, 2019
fb559fe
Fix printing the worker pid
holdenk Mar 14, 2019
0982c11
Wait that awk statement wasn't doing anything for me
holdenk Mar 14, 2019
6c41552
Attempt to merge in master
holdenk May 7, 2019
09a01cf
Fix minor style issues after merge
holdenk May 7, 2019
9a5000d
Add license header to decom script
holdenk May 7, 2019
e271a1d
waitpid is the syscall wait is the shell command
holdenk May 7, 2019
7400792
Start cleaning up the decom script, todo fix the PID extraction
holdenk May 7, 2019
55fa260
Print out the termination log at the end as well
holdenk May 7, 2019
@@ -58,6 +58,11 @@ private[deploy] object DeployMessages {
assert (port > 0)
}

case class WorkerDecommission(
id: String,
worker: RpcEndpointRef)
extends DeployMessage

case class ExecutorStateChanged(
appId: String,
execId: Int,
@@ -131,6 +136,8 @@ private[deploy] object DeployMessages {

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

case object DecommissionSelf // Mark self for decommissioning.

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
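A minimal sketch of how the two new messages are intended to flow, based on the Worker and Master changes later in this diff (masterRef and workerEndpointRef are stand-ins for the real RpcEndpointRef fields, not part of the patch):

// Worker -> Master: announce that this worker is decommissioning
// (sent from Worker.decommissionSelf, shown below).
masterRef.send(WorkerDecommission(workerId, self))

// Worker side: DecommissionSelf marks the worker itself for decommissioning; the Worker
// handles it in receive (see Worker.scala below), so any holder of the worker's endpoint
// ref can trigger the same path as the PWR signal handler.
workerEndpointRef.send(DecommissionSelf)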
@@ -19,9 +19,13 @@ package org.apache.spark.deploy

private[deploy] object ExecutorState extends Enumeration {

val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value

type ExecutorState = Value

def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
// DECOMMISSIONED isn't listed as a finished state: the executor still exists and we don't want
// to remove it from the worker, but we do want to avoid scheduling new tasks on it.
private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED)

def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state)
}
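A small check of the new state's semantics (illustrative only, since ExecutorState is private[deploy]): DECOMMISSIONED deliberately stays out of the finished states, so the executor is kept around while new work is steered away from it.

// From code inside org.apache.spark.deploy:
assert(!ExecutorState.isFinished(ExecutorState.DECOMMISSIONED)) // executor is kept, not cleaned up
assert(ExecutorState.isFinished(ExecutorState.LOST))            // existing finished states unchanged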
@@ -39,6 +39,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils}
* Takes a master URL, an app description, and a listener for cluster events, and calls
* back the listener when various events occur.
*
*
* @param masterUrls Each url should look like spark://host:port.
*/
private[spark] class StandaloneAppClient(
@@ -180,6 +181,8 @@ private[spark] class StandaloneAppClient(
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
} else if (state == ExecutorState.DECOMMISSIONED) {
listener.executorDecommissioned(fullId, message.getOrElse(""))
}

case WorkerRemoved(id, host, message) =>
@@ -39,5 +39,7 @@ private[spark] trait StandaloneAppClientListener {
def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit

def executorDecommissioned(fullId: String, message: String): Unit

def workerRemoved(workerId: String, host: String, message: String): Unit
}
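Every StandaloneAppClientListener implementation now has to supply the new callback. A hedged sketch of a minimal implementation (the real one belongs to the standalone scheduler backend, which is not part of this excerpt):

override def executorDecommissioned(fullId: String, message: String): Unit = {
  logInfo(s"Executor $fullId decommissioned: $message")
  // Expected to hand off to the scheduler so no new tasks land on this executor,
  // e.g. via the TaskScheduler.executorDecommission hook added later in this diff.
}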
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -243,6 +243,15 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)

case WorkerDecommission(id, workerRef) =>
logInfo("Recording worker %s decommissioning".format(id))
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else {
// If a worker that isn't registered attempts to decommission, ignore it.
idToWorker.get(id).foreach(decommissionWorker)
}

case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
@@ -311,7 +320,9 @@ private[deploy] class Master(
// Only retry certain number of times so we don't go into an infinite loop.
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
// We also don't count failures from decommissioned workers since they are "expected."
if (!normalExit
&& oldState != ExecutorState.DECOMMISSIONED
&& appInfo.incrementRetryCount() >= maxExecutorRetries
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
@@ -790,6 +801,26 @@ private[deploy] class Master(
true
}

private def decommissionWorker(worker: WorkerInfo) {
if (worker.state != WorkerState.DECOMMISSIONED) {
logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))
worker.setState(WorkerState.DECOMMISSIONED)
for (exec <- worker.executors.values) {
logInfo("Telling app of decommission executors")
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.DECOMMISSIONED,
Some("worker decommissioned"), None, workerLost = false))
exec.state = ExecutorState.DECOMMISSIONED
exec.application.removeExecutor(exec)
}
// On recovery do not add a decommissioned worker
persistenceEngine.removeWorker(worker)
} else {
logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned".
format(worker.id, worker.host, worker.port))
}
}

private def removeWorker(worker: WorkerInfo, msg: String) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
worker.setState(WorkerState.DEAD)
30 changes: 29 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.rpc._
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
import org.apache.spark.util.{SignalUtils, SparkUncaughtExceptionHandler, ThreadUtils, Utils}

private[deploy] class Worker(
override val rpcEnv: RpcEnv,
@@ -63,6 +63,14 @@ private[deploy] class Worker(
Utils.checkHost(host)
assert (port > 0)

// If worker decommissioning is enabled, register a handler on PWR to trigger decommissioning.
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
logInfo("Registering PWR handler.")
SignalUtils.register("PWR")(decommissionSelf)
} else {
logInfo("Worker decommissioning not enabled, skipping PWR")
}

// A scheduled executor used to send messages at the specified time.
private val forwardMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
@@ -124,6 +132,7 @@ private[deploy] class Worker(
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
private var decommissioned = false
private val workerId = generateWorkerId()
private val sparkHome =
if (sys.props.contains(IS_TESTING.key)) {
@@ -491,6 +500,8 @@ private[deploy] class Worker(
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else if (decommissioned) {
logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@@ -610,6 +621,9 @@ private[deploy] class Worker(
case ApplicationFinished(id) =>
finishedApps += id
maybeCleanupApplication(id)

case DecommissionSelf =>
decommissionSelf()
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -707,6 +721,20 @@ private[deploy] class Worker(
}
}

private[deploy] def decommissionSelf(): Boolean = {
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
logDebug("Decommissioning self")
decommissioned = true
// TODO: Send decommission notification to executors & shuffle service.
// Also send message to master program.
sendToMaster(WorkerDecommission(workerId, self))
} else {
logWarning("Asked to decommission self, but decommissioning not enabled")
}
// Return true since this can be called as a signal handler
true
}

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
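Putting the Worker changes together, a hedged sketch of how decommissioning would be switched on and triggered (the Linux-only PWR signal and the pid lookup are assumptions of this sketch, not part of the diff):

import org.apache.spark.SparkConf

// Enable the feature with the new flag (added in internal/config/Worker.scala below).
val conf = new SparkConf().set("spark.worker.decommission.enabled", "true")

// With the flag on, the Worker registers SignalUtils.register("PWR")(decommissionSelf),
// so on Linux `kill -PWR <worker JVM pid>` starts decommissioning. The same path runs when
// the worker receives DecommissionSelf: the worker sets its decommissioned flag, refuses new
// LaunchExecutor requests, and sends WorkerDecommission(workerId, self) to the master.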
@@ -31,11 +31,12 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.{SignalUtils, ThreadUtils, Utils}

private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -49,13 +50,17 @@ private[spark] class CoarseGrainedExecutorBackend(

private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile private var decommissioned = false
@volatile var driver: Option[RpcEndpointRef] = None

// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
// to be changed so that we don't share the serializer instance across threads
private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

override def onStart() {
logInfo("Registering PWR handler.")
SignalUtils.register("PWR")(decommissionSelf)

logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
@@ -99,6 +104,8 @@ private[spark] class CoarseGrainedExecutorBackend(
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else if (decommissioned) {
logWarning("Asked to launch a task while decommissioned. Not launching.")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
@@ -177,6 +184,26 @@ private[spark] class CoarseGrainedExecutorBackend(

System.exit(code)
}

private def decommissionSelf(): Boolean = {
logError("Decommissioning self")
try {
decommissioned = true
// Tell the driver we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.send(DecommissionExecutor(executorId))
}
if (executor != null) {
executor.decommission()
}
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
false
}
}
}

private[spark] object CoarseGrainedExecutorBackend extends Logging {
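The backend above reports DecommissionExecutor(executorId) to the driver; the driver-side handler is outside this excerpt, so the routing below is only an assumption sketched for context:

// Assumed driver-side wiring in the coarse-grained scheduler backend (not shown in this diff):
case DecommissionExecutor(executorId) =>
  logInfo(s"Decommissioning executor $executorId")
  scheduler.executorDecommission(executorId) // the TaskScheduler hook added later in this diff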
23 changes: 20 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -205,14 +205,31 @@ private[spark] class Executor(
*/
private var heartbeatFailures = 0

/**
* Flag to prevent launching new tasks while decommissioned. There could be a race condition
* accessing this, but decommissioning is only intended to help, not to be a hard stop.
*/
private var decommissioned = false

heartbeater.start()

private[executor] def numRunningTasks: Int = runningTasks.size()

/**
* Mark an executor for decommissioning and avoid launching new tasks.
*/
private[spark] def decommission(): Unit = {
decommissioned = true
}

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
if (!decommissioned) {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
} else {
log.info(s"Not launching task, executor is in decommissioned state")
}
}

def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
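The launchTask gate above is best-effort (see the race note on the decommissioned flag); a tiny sketch of the resulting behaviour, where backend and taskDescription are stand-ins for a real ExecutorBackend and TaskDescription:

executor.decommission()                        // flips the flag; already-running tasks keep going
executor.launchTask(backend, taskDescription)  // now a no-op that logs "Not launching task, ..."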
@@ -60,4 +60,9 @@ private[spark] object Worker {
ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
.intConf
.createWithDefault(100)

private[spark] val WORKER_DECOMMISSION_ENABLED =
ConfigBuilder("spark.worker.decommission.enabled")
.booleanConf
.createWithDefault(false)
}
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -339,6 +339,7 @@ abstract class RDD[T: ClassTag](
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
// Block hit.
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
@@ -352,6 +353,7 @@
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
// Need to compute the block.
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
@@ -58,3 +58,11 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
private[spark]
case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
extends ExecutorLossReason(_message)

/**
* A loss reason that means the worker is marked for decommissioning.
*
* This is used by the task scheduler to remove state associated with the executor, but
* not to fail any tasks that were running on the executor until the executor is "fully" lost.
*/
private [spark] object WorkerDecommission extends ExecutorLossReason("Worker Decommission.")
Review comment (Member): Is this used? I only saw the use of WorkerDecommission which is DeployMessage.
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -88,6 +88,10 @@ private[spark] class Pool(
schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
}

override def executorDecommission(executorId: String): Unit = {
schedulableQueue.asScala.foreach(_.executorDecommission(executorId))
}

override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
var shouldRevive = false
for (schedulable <- schedulableQueue.asScala) {
@@ -43,6 +43,7 @@ private[spark] trait Schedulable {
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
def executorDecommission(executorId: String): Unit
def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
@@ -96,6 +96,11 @@ private[spark] trait TaskScheduler {
*/
def applicationId(): String = appId

/**
* Process a decommissioning executor.
*/
def executorDecommission(executorId: String): Unit

/**
* Process a lost executor
*/
@@ -711,6 +711,11 @@ private[spark] class TaskSchedulerImpl(
}
}

override def executorDecommission(executorId: String): Unit = {
rootPool.executorDecommission(executorId)
backend.reviveOffers()
}

override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None

@@ -1105,6 +1105,12 @@ private[spark] class TaskSetManager(
levels.toArray
}

def executorDecommission(execId: String) {
recomputeLocality()
// Future consideration: if an executor is decommissioned it may make sense to add its current
// tasks to the speculative execution queue.
}

def recomputeLocality() {
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
myLocalityLevels = computeValidLocalityLevels()
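Taken together, the scheduler-side pieces form one chain. A hedged sketch of how a decommission notification is expected to propagate once the driver receives it (the driver entry point is an assumption, as noted above; taskScheduler stands in for the live TaskSchedulerImpl):

taskScheduler.executorDecommission("exec-1")
// -> rootPool.executorDecommission("exec-1") fans out to every TaskSetManager, each of which
//    calls recomputeLocality(); TaskSchedulerImpl then asks the backend to revive offers.
//    Running tasks are left alone; the executor itself (flagged earlier) refuses any new task.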
@@ -90,6 +90,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
extends CoarseGrainedClusterMessage

case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage

case class RemoveWorker(workerId: String, host: String, message: String)
extends CoarseGrainedClusterMessage
