- 
                Notifications
    You must be signed in to change notification settings 
- Fork 117
Dynamic allocation #272
Dynamic allocation #272
Conversation
4f6f75a    to
    988db3b      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A corner case comes into my mind: if a shuffle pod for a node died and is being restarted, and at this moment a new executor on that node registers with the driver, it would crash the driver.
Can we improve this, e.g. let the executor die when the shuffle pod is not ready, instead of throwing SparkException to abort the driver?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch! I think you're right. Done. Passing back an empty string which should make the executor crash.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwing the exception here should be fine, right? This is being processed on a separate thread in the RPC environment. Thus the exception here should only propagate to the executor that asked for this configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could be wrong, but IIUC SparkException is thrown whenever unrecoverable errors happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case it's less important what the type of exception is. What's important is where the exception is thrown from and where it propagates to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is a daemonset watchable? If so can we watch on it directly instead of using labels?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is, but we don't want to make an assumption that it's a daemonset in use. In the current way, it remains - any pod that is co-located on that node with the same labels.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we throw an exception here is dynamic allocation is enabled but shuffle.labels is empty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the name NodeCacheManager is not that intuitive. Maybe sth. like ShufflePodsCatalog? I don't know..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to ShufflePodCache
fc70821    to
    bccf43b      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed the top level PR, will follow up on changes in the subsequent individual commits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could probably just use < for clarity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try to structure the logic so that we don't use return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here - avoid return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this specifically need to be a separate class? The code could just be inlined in the scheduler backend class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the separate class for the separation it offers for this allocator mechanism. Do you strongly prefer not having it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ought to be in a separate file then, I think. But if this also requires a backwards reference to the scheduler backend (e.g. to access fields) then this should just be inlined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It requires access to totalRegisteredExecutors which is a protected field in CoarseGrainedSchedulerBackend and a couple of other accounting fields from the KubernetesClusterSchedulerBackend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, did you mean that it shouldn't be a separate class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we prefer foreach over for in general.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's avoid adding tests to V1 and solely focus on V2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val not var.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to confirm that this test is creating multiple executors and is writing files to the shuffle service. I'm not sure if we can do this in an automated way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can achieve that by setting --conf spark.dynamicAllocation.minExecutors=2 and wait for these two executors to be ready via k8s api (or spark rest api?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The executors can spin up but not write any shuffle data to disk. We should check that shuffle data is being written to the disks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose this test so that it would have shuffle data being written to disk. I've manually verified that it does write to disk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just make the interval an Optional configuration and use interval.foreach, as opposed to checking if an integer is greater than zero. We should validate then that any given value is positive and throw an exception if it isn't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use .get instead of .getOption here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to get Option[String] because if the shuffle directory is left empty, we use the default from Utils.getConfiguredLocalDirs(conf). I'm not sure how we can get this behavior using get.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using .get here without using .key on the configuration key should give back an Option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah.. good point. Done
3ac475f    to
    5264fad      
    Compare
  
    | @mccheah Addressed all comments. PTAL | 
5264fad    to
    4400a8c      
    Compare
  
    | Has the unit testing changed? I'm seeing failures in files I did not touch at all. | 
| rerun unit tests please | 
| @foxish I've been working on an SBT-based unit test build in jenkins and it looks like it was racing with the current maven-based unit tests. I've disabled the new test build and expect just the old one to be running now. Sorry about that! | 
| @ash211, we can fix the new one. The errors appeared to be: Sending a PR to fix these. | 
| Ah! Okay, SG. Thanks! | 
2f05ac0    to
    a861849      
    Compare
  
    a861849    to
    c87008d      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to add unit-level tests around these? It would be great if we can start hardening the features we are implementing here. Unit testing these kinds of things can be difficult; we would probably have to refactor much of the scheduler backend and the shuffle pod cache to make us able to verify the things that are important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add what the value the user specified was.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put allocatorRunnable, 0, TimeUnit.SECONDS all on this line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think since now podAllocationInterval is always going to be provided (we always set it to Some(...) or throw an exception) then this thread will always be running. Is this the intended behavior? If so, no need to use foreach and options here.
c87008d    to
    2b5bba0      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be cleaner I think to use if...else if... else here:
if (...) {
  logDebug("Maximum allowed executor limit...")
} else if (...) {
  logDebug("Waiting for pending...")
} else {
  // Actual logic
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks! Trying to add a couple of unit tests to ShufflePodCache now and a mechanism that might help us add tests easily in the future.
2b5bba0    to
    26805ed      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's probably no need to make this an option - just assign podAllocationInterval directly, and then check the variable directly + throw the SparkException immediately afterwards.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be KeyValueUtils? This doesn't seem related to the command line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I envisioned it as a place for utility functions related to commandline options which we could have more of, in future. The parsing of key-values is necessitated by commandline strings being supplied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it confusing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to mainly be used to parse out labels and annotations from SparkConf values - the command line doesn't seem to be related to that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I was assuming the primary way of supplying those args was via the cmdline. Okay, how about ConfigurationUtils? KeyValueUtils.parseKeyValuePairs() just seems a bit redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConfigurationUtils is fine.
26805ed    to
    b377fa6      
    Compare
  
    | Unit tests seem more complex than expected, because watchers and such. https://mvnrepository.com/artifact/io.fabric8/kubernetes-server-mock provided an easy beginnning but I think I'll take it separately instead of blocking experiments using dynamic allocation. | 
| We can probably test the watches separately and just ensure that if the watch receives an event then the scheduler responds accordingly. | 
| The mock server can be taught to expect the watch calls and respond appropriately. I used a similar thing in the unit tests here. | 
| Created #275, will follow up there | 
b377fa6    to
    6ec3d59      
    Compare
  
    | Updated docs, any other comments? | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a few minor style things but they can be addressed either here or at some other point. Someone else can make a final pass before merging, but if there are no objections before the end of the day then feel free to proceed with the merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use require here and in other similar places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move up to previous line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These don't have to be vals and it's preferred that they aren't since these variables are now accessible from outside the scope of this class.
6ec3d59    to
    4dd4715      
    Compare
  
    | Addressed comments. Will merge after tests pass. | 
| I think this dynamic allocation PR first, then afterwards the init containers one. That way the executor recovery PR can start making progress given that it's also blocked on this PR merging | 
| Okay, SG. Merging this now, as tests passed. | 
* dynamic allocation: shuffle service docker, yaml and test fixture * dynamic allocation: changes to spark-core * dynamic allocation: tests * dynamic allocation: docs * dynamic allocation: kubernetes allocator and executor accounting * dynamic allocation: shuffle service, node caching
…ogging Force commons-logging version to avoid conflicts
* dynamic allocation: shuffle service docker, yaml and test fixture * dynamic allocation: changes to spark-core * dynamic allocation: tests * dynamic allocation: docs * dynamic allocation: kubernetes allocator and executor accounting * dynamic allocation: shuffle service, node caching
Dynamic allocation updated
Please see commits individually during review for clarity
cc @mccheah @ash211 @varunkatta @apache-spark-on-k8s/contributors