Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Conversation

@varunkatta
Copy link
Member

What changes were proposed in this pull request?

Refactored access to a HashMap through an explicit external lock with a implicit internal lock. Thread safe access to the map is maintained.

How was this patch tested?

Ran Unit-tests

@liyinan926
Copy link
Member

LGTM.

Copy link
Member

@kimoonkim kimoonkim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the cleanup.

@foxish
Copy link
Member

foxish commented Jul 26, 2017

Thanks! This is not a bug-fix correct? If so, please hold off on merge till we cut the 0.3.0 for the 2.2 branch (just to keep 0.3.0 in sync with the 2.1 branch).

Copy link

@mccheah mccheah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this leaves room for a synchronization bug, but feel free to mention if I'm mistaken here.

hostToLocalTaskCount
}
for (pod <- executorPodsWithIPs) {
for ((_, pod) <- executorPodsByIPs) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem we need to be careful of here is modifications in between each iteration of the for loop. Recall that concurrent hash map only protects concurrent access to one key at a time. But here we care about the whole state; that is, we want the entire map to be consistent throughout the iteration, but we lose that guarantee if we don't lock around the entire iteration cycle.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per Javadocs at https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentHashMap.html

The following in my understanding.
ConcurrentHashMap is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details. When you iterate, you are guaranteed that you get access to the all the entries in a thread-safe manner. Effectively, you are iterating on a snapshot of the Map's contents (and what you end up reading is dependent on concurrent modifications on the Map by other threads). Multiple threads can iterate on the HashMap in a thread-safe manner too as long as each thread has it own copy of the iterator.

The method here only seems to require, thread-safe access to the map, and it seems to me the change should be safe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the concern from @mccheah is valid if we care about the consistency of the map throughout the entire iterating. Iterating through a ConcurrentHashMap may or may not reflect changes made to the map after the iterator is created, although iteration is thread-safe and won't throw the ConcurrentModificationException. In this particular case, it means we may or may not lose changes made while the map is being iterated through. Also it is mentioned in the Javadoc that "iterators are designed to be used by only one thread at a time."

Copy link
Member Author

@varunkatta varunkatta Jul 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My change is based on the understanding that we don't need the whole map to be consistent during the iteration the Map. The previous lock there was for thread-safety not for exclusive lock down of the map for access.

"iterators are designed to be used by only one thread at a time."

==> means a single iterator cannot be shared across multiple threads safely. Multiple threads can safely iterate as long as each thread has its own copy of an iterator.

Copy link
Member Author

@varunkatta varunkatta Jul 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...
But here we care about the whole state; that is, we want the entire map to be consistent throughout the iteration, but we lose that guarantee if we don't lock around the entire iteration cycle.
....

@mccheah - I don't think this is necessary. Confirmed it with @kimoonkim, who is the original author for this method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Discovering executor pods is done only as a best effort. No need for consistency.

@varunkatta
Copy link
Member Author

This is not a bug fix. Addressed a TODO for a general code enhancement. This code is not directly related to any specific feature work.

Copy link

@mccheah mccheah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing to approved but would like @kimoonkim to sign off.

private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
// Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK
private val executorPodsByIPs = new mutable.HashMap[String, Pod]
private val executorPodsByIPs: concurrent.Map[String, Pod] = new
Copy link

@satybald satybald Aug 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you like to specify a type of the variable after it? Would it be cleaner without it? i.e.

val executorPodsByIPs = new ConcurrentHashMap[String, Pod]().asScala

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trait concurrent.Map[K,V] is the correct interface/trait for the variable type, but it is abstract and can't be instantiated directly, so in this case typing the variable makes sense. Using scala's concurrent.TrieMap is another option, although the choice is probably arbitrary.

val executorPodsByIPs: concurrent.Map[String, Pod] = concurrent.TrieMap.empty[String, Pod]

https://www.scala-lang.org/api/2.11.8/index.html#scala.collection.concurrent.TrieMap$

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erikerlandson thank you for an explanation. That makes sense for me, though when I typed the line in scala console(2.11.8), the compiler was able to inference concurrent.Map[K, V] class.

scala> val executorPodsByIPs = new ConcurrentHashMap[String, String]().asScala
executorPodsByIPs: scala.collection.concurrent.Map[String,String] = Map()

I looked into the docs, and found that .asScala method going to do a conversion from java.util.ConcurrentHashMap[K, V] to scala.concurrent.Map[K, V]

mapAsScalaConcurrentMapConverter

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked into databrick scala recomendations, they suggest to avoid concurrent.Map and use j.u.c.ConcurrentHashMap.

Prefer java.util.concurrent.ConcurrentHashMap over scala.collection.concurrent.Map
https://github.com/databricks/scala-style-guide#concurrency

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@satybald good point re: style recommendations and SI-7943

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@satybald Thanks for bringing this up. Please note that SI-7943 doesn't really affect the thread correctness of our code. It is specific to only the TrieMap implementation. We are already using the java.util.concurrent.ConcurrentHashMap underneath and using the scala.concurrent.Map trait on top for access.

We should not use the trait nevertheless per the community recommendation to avoid any future maintenance or merging of it upstream.

@ash211
Copy link

ash211 commented Aug 21, 2017

It seems this PR has stalled a bit since the last activity a few weeks ago. @varunkatta what's next here?

@varunkatta
Copy link
Member Author

varunkatta commented Aug 23, 2017

Just back to this PR...I thought this PR was merged long back ; apparently not. I will address the last few comments today.

@varunkatta
Copy link
Member Author

varunkatta commented Aug 24, 2017

@satybald I addressed your comments. Do you want to take a quick look at the diff?

@ash211 Next steps, if there are no more changes requested by reviewers is to merge this PR.

@ash211
Copy link

ash211 commented Sep 7, 2017

Going to bring in after #459

@ash211
Copy link

ash211 commented Sep 8, 2017

@varunkatta this is ready for rebase -- please fix merge conflicts

@ash211 ash211 merged commit e5838c1 into apache-spark-on-k8s:branch-2.2-kubernetes Sep 15, 2017
ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request Feb 26, 2019
…with a concurrent map. (apache-spark-on-k8s#392)

* Replaced explicit synchronized access to hashmap with a concurrent map

* Removed usages of scala.collection.concurrent.Map
ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request Feb 26, 2019
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019
…with a concurrent map. (apache-spark-on-k8s#392)

* Replaced explicit synchronized access to hashmap with a concurrent map

* Removed usages of scala.collection.concurrent.Map
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants