SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers #887
Conversation
Can one of the admins verify this patch?

Jenkins, add to whitelist.

Merged build triggered.

Merged build started.

Merged build finished. All automated tests passed.

All automated tests passed.
I agree that this is technically safer, though I believe in neither of these cases could a partially-constructed object actually be visible.
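For illustration, here is a minimal sketch (not the Spark code itself; the class and field names are invented) of the kind of unsafe publication that `@volatile` guards against:

```scala
// Invented example: publishing a freshly constructed object through a plain var.
class Handler(tag: String) {
  var hits: Int = 0 // set during construction
}

object Publication {
  var plain: Handler = null               // no happens-before guarantee for readers
  @volatile var published: Handler = null // volatile write/read gives safe publication

  def install(): Unit = {
    plain = new Handler("a")     // in principle, another thread could observe
                                 // `plain != null` before its fields are visible
    published = new Handler("b") // the volatile write publishes a fully built object
  }
}
```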
@rxin Looking at this code, is this right? When we populate this (5 lines down), we pass a specific `level.replication`, so we only end up with that many peers, which we then cache. It seems to me that every block thereafter will be forcibly stored at the same replication level as the first.
This bug could plausibly exist unnoticed, because it would only show up when using two different replication levels above 1, which is probably uncommon. Am I missing something, though?
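A simplified sketch of the pattern being questioned (the names follow the PR diff, but the body is illustrative rather than the actual `BlockManager` code; `master` and `blockManagerId` stand in for the surrounding BlockManager members):

```scala
@volatile var cachedPeers: Seq[BlockManagerId] = null

private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
  if (cachedPeers == null) {
    // Peers are fetched once, sized by the *first* caller's replication level...
    cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
  }
  // ...and every later call reuses that list, even if its own level.replication
  // differs, so later blocks end up replicated like the first one.
  for (peer <- cachedPeers) {
    // push `data` for `blockId` to `peer`
  }
}
```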
Even though we can create explicit storage levels with arbitrary replication, the stock levels only come in two forms: no replication and one extra replica. The Spark codebase relies on this implicitly. We had sort of discussed it in the checkpoint-to-blocks JIRA (I forget the JIRA id and can't search right now).
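For context, the predefined constants only expose those two settings; anything else has to be constructed explicitly:

```scala
import org.apache.spark.storage.StorageLevel

// Stock levels only come in "no extra replica" and "one extra replica" forms.
val single   = StorageLevel.MEMORY_ONLY   // replication == 1
val mirrored = StorageLevel.MEMORY_ONLY_2 // replication == 2
assert(single.replication == 1 && mirrored.replication == 2)
```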
Spark should not be making assumptions about the replication level like that. StorageLevel has a fully public apply() to construct new ones.
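For example, something along these lines is possible; the exact overloads of `StorageLevel.apply` have shifted across Spark versions, so treat the parameter list below as an assumption:

```scala
import org.apache.spark.storage.StorageLevel

// Hedged sketch: arguments assumed to be (useDisk, useMemory, deserialized, replication);
// check the StorageLevel companion object in your Spark version.
val threeReplicas = StorageLevel(false, true, true, 3)
```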
Which is private to Spark, IIRC?
But agreed about the incorrect assumption.
Doing this cleanly is much more involved than just going to the master for the various replication levels and caching the peers per level... we would need a block placement SPI like the one the NameNode exposes.
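A hypothetical sketch of what such a placement SPI might look like (the trait and method names are invented for illustration, not an existing Spark or HDFS API):

```scala
import org.apache.spark.storage.{BlockId, BlockManagerId}

// Hypothetical SPI, invented for illustration only.
trait BlockPlacementPolicy {
  /** Pick `numPeers` replication targets for `blockId` out of the known peers. */
  def choosePeers(blockId: BlockId,
                  numPeers: Int,
                  allPeers: Seq[BlockManagerId]): Seq[BlockManagerId]
}

// Trivial default: take the first N peers, ignoring locality.
class FirstNPlacementPolicy extends BlockPlacementPolicy {
  override def choosePeers(blockId: BlockId,
                           numPeers: Int,
                           allPeers: Seq[BlockManagerId]): Seq[BlockManagerId] =
    allPeers.take(numPeers)
}
```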
Actually, I also reported an issue, https://issues.apache.org/jira/browse/SPARK-1934, about "this escape".

One thread runs the code that writes `cachedPeers`, and another thread runs the code that reads it. I added `volatile` to force a happens-before relation between the write and the read.
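A condensed sketch of the two paths being described (the thread assignments follow the commit message below; the method bodies are illustrative, not the real `BlockManager` code):

```scala
import org.apache.spark.storage.BlockManagerId

object CachedPeersSketch {
  // The fix: a volatile write in one thread happens-before a later read
  // in another thread that observes the non-null value.
  @volatile var cachedPeers: Seq[BlockManagerId] = null

  // Path A: doPut -> replicate, running on connectionManager.handleMessageExecutor.
  def writerPath(peersFromMaster: Seq[BlockManagerId]): Unit = {
    if (cachedPeers == null) {
      cachedPeers = peersFromMaster // volatile write publishes the Seq safely
    }
  }

  // Path B: replicate invoked from connectionManager.futureExecContext.
  def readerPath(): Seq[BlockManagerId] = {
    val peers = cachedPeers // volatile read; never observes a half-published Seq
    if (peers != null) peers else Seq.empty
  }
}
```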
Good point -- this could very well be a correctness violation, and it's much harder than I initially let on to verify that that isn't the case. LGTM, merged into master and branch-1.0. Thanks!
`var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without proper protection.

There are two places that call `replicate(blockId, bytesAfterPut, level)`:

* https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L644 runs in `connectionManager.futureExecContext`
* https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L752 `doPut` runs in `connectionManager.handleMessageExecutor`; `org.apache.spark.storage.BlockManagerWorker` calls `blockManager.putBytes` in `connectionManager.handleMessageExecutor`.

As they run in different executors, this is a race condition which may leave the memory pointed to by `cachedPeers` in an inconsistent state even when `cachedPeers != null`.

The race condition on `onReceiveCallback` is that it is set in `BlockManagerWorker` but read in a different thread in `ConnectionManager.handleMessageExecutor`.

Author: zsxwing <[email protected]>

Closes #887 from zsxwing/SPARK-1932 and squashes the following commits:

524f69c [zsxwing] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers

(cherry picked from commit 549830b)
Signed-off-by: Aaron Davidson <[email protected]>
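The `onReceiveCallback` half follows the same pattern; a simplified sketch (the real callback signature in `ConnectionManager` is more involved, so the types here are assumptions):

```scala
// Simplified sketch of ConnectionManager's callback registration; the real
// callback type differs, this only illustrates the cross-thread hand-off.
class ConnectionManagerSketch {
  @volatile private var onReceiveCallback: Array[Byte] => Unit = null

  // Registered by BlockManagerWorker during setup (one thread).
  def onReceiveMessage(callback: Array[Byte] => Unit): Unit = {
    onReceiveCallback = callback
  }

  // Invoked from handleMessageExecutor when a message arrives (another thread).
  def handleMessage(bytes: Array[Byte]): Unit = {
    val cb = onReceiveCallback // volatile read sees the registered callback, or null
    if (cb != null) cb(bytes)
  }
}
```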