
Conversation

@zsxwing (Member) commented May 27, 2014

`var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without proper protection.

There are two places that call `replicate(blockId, bytesAfterPut, level)`.

As they run on different `Executor`s, this is a race condition: even when `cachedPeers != null`, the memory pointed to by `cachedPeers` may not be correctly visible to the reading thread.

The race condition on `onReceiveCallback` is that it is set in `BlockManagerWorker` but read by a different thread in `ConnectionManager.handleMessageExecutor`.
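To make the `cachedPeers` part concrete, here is a minimal standalone sketch of the publication race and the `@volatile` fix proposed in this PR. It is not the Spark source; `BlockManagerId` is a stand-in case class, and the two threads play the roles of the two `replicate` call sites.

```scala
object CachedPeersRaceSketch {
  // Stand-in for org.apache.spark.storage.BlockManagerId, just for this sketch.
  final case class BlockManagerId(host: String, port: Int)

  // The field under discussion. Without @volatile, a reader may observe a non-null
  // reference before the Seq it points to is fully visible; @volatile establishes a
  // happens-before edge between the write and every subsequent read.
  @volatile private var cachedPeers: Seq[BlockManagerId] = null

  private def writer(): Unit = {
    // Build the peer list, then publish it with a single volatile write.
    cachedPeers = Seq(BlockManagerId("host-1", 7077), BlockManagerId("host-2", 7077))
  }

  private def reader(): Unit = {
    val peers = cachedPeers            // single volatile read
    if (peers != null) {
      peers.foreach(println)           // safe: contents are visible after the volatile read
    }
  }

  def main(args: Array[String]): Unit = {
    val t1 = new Thread(new Runnable { def run(): Unit = writer() })
    val t2 = new Thread(new Runnable { def run(): Unit = reader() })
    t1.start(); t2.start()
    t1.join(); t2.join()
  }
}
```

In the real code the two threads are the two `replicate` call sites, which run on `connectionManager.futureExecContext` and `connectionManager.handleMessageExecutor` respectively (see the merged commit message below).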

@AmplabJenkins

Can one of the admins verify this patch?

@rxin (Contributor) commented May 27, 2014

Jenkins, add to whitelist.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15218/

@aarondav (Contributor)

I agree that this is technically safer, though I believe in neither of these cases could a partially-constructed object actually be visible. onReceiveCallback appears to be fully immutable and cachedPeers is the result of a future, so it's probably not possible that it's partially constructed. Thus, I don't think anyone should be running into either of these race conditions in practice, but this way is still safer.

@aarondav (Contributor) commented on core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

@rxin Looking at this code, is this right? When we create this (5 lines down), we seem to give it a specific level.replication, so we'll only end up with a certain number of peers, which we will then cache. It seems to me that every block thereafter will be forcibly stored at the same replication level as the first.

This bug may well exist but go unnoticed, because it would only show up if you're using 2 separate replication levels above 1, which is probably uncommon. Am I missing something, though?
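To illustrate the concern, here is a self-contained sketch (hypothetical names, not the `BlockManager` source) of a lazily cached peer list that is sized by the first caller's replication level and then reused for every later request:

```scala
object PeerCacheSketch {
  type Peer = String
  private val allPeers: Seq[Peer] = Seq("exec-1", "exec-2", "exec-3", "exec-4")

  @volatile private var cachedPeers: Seq[Peer] = null

  // Hypothetical stand-in for the cached peer lookup inside replicate().
  private def peersFor(replication: Int): Seq[Peer] = {
    if (cachedPeers == null) {
      cachedPeers = allPeers.take(replication - 1)  // sized by the *first* caller only
    }
    cachedPeers
  }

  def main(args: Array[String]): Unit = {
    println(peersFor(replication = 2))  // List(exec-1): one extra copy, as requested
    println(peersFor(replication = 4))  // List(exec-1): still one peer instead of three
  }
}
```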

Contributor

Even though we can create explicit storage levels with different replication, the stock impl has only two: none and one. The Spark codebase relies on this implicitly.

We had sort of discussed it in the checkpoint-to-blocks JIRA (I forget the JIRA id and can't search right now).
On 27-May-2014 10:39 am, "Aaron Davidson" [email protected] wrote:

In core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

@@ -772,7 +772,7 @@ private[spark] class BlockManager(
   /**
    * Replicate block to another node.
    */
-  var cachedPeers: Seq[BlockManagerId] = null
+  @volatile var cachedPeers: Seq[BlockManagerId] = null


@aarondav (Contributor) commented on core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

Spark should not be making assumptions about the replication level like that. StorageLevel has a fully public apply() to construct new ones.
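For example (a sketch only; the four-argument `apply` overload is assumed to be `(useDisk, useMemory, deserialized, replication)` as in the Spark 1.x API, and a `local[2]` master is used just so the snippet runs standalone):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CustomReplicationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("custom-replication").setMaster("local[2]"))

    // The built-in constants only cover replication factors 1 and 2 (e.g. MEMORY_ONLY_2),
    // but the public apply() lets user code request any replication factor, so Spark
    // cannot safely assume only those two values ever occur.
    val threeCopies = StorageLevel(false, true, true, 3)  // useDisk, useMemory, deserialized, replication

    // On a real cluster this asks for three in-memory copies of each cached partition;
    // a single local block manager cannot actually replicate, but the request is legal.
    val rdd = sc.parallelize(1 to 100).persist(threeCopies)
    println(rdd.count())

    sc.stop()
  }
}
```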

Contributor

Which is private to Spark, IIRC? But agreed about the incorrect assumption.
Doing it cleanly is much more involved than just going to the master for the various replication levels and caching the peers per level... we would need a block placement SPI like what the HDFS NameNode exposes.

@zsxwing (Member, Author) commented May 27, 2014

> onReceiveCallback appears to be fully immutable

Actually, `onReceiveCallback` is not immutable. It is set to `BlockManagerWorker.onBlockMessageReceive`:

    blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)

`BlockManagerWorker` contains a reference to `BlockManager`, which is mutable and has not finished its initialization when BlockManagerWorker.scala#L34 is called. In the JVM, even `val` fields are not guaranteed to be safely published until the constructor exits.

So `onReceiveCallback` is not a pure function.

I also reported an issue about this kind of "this escape" here: https://issues.apache.org/jira/browse/SPARK-1934.
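A minimal standalone sketch of the "this escape" pattern described above (hypothetical `MessageSource`/`Worker` classes, not the real `ConnectionManager`/`BlockManagerWorker`): the constructor registers a callback that closes over `this` before construction finishes, so a caller can observe a partially initialized object.

```scala
// Hypothetical stand-ins for the registration pattern, not Spark's real classes.
trait MessageSource {
  def onReceiveMessage(callback: String => Unit): Unit
}

class Worker(source: MessageSource) {
  // `this` escapes here: onMessage closes over the Worker while it is still being
  // constructed, so it can be invoked before `label` below has been assigned.
  source.onReceiveMessage(onMessage)

  private val label: String = "worker-1"   // initialized only after the registration above

  private def onMessage(msg: String): Unit = {
    // Prints "null received ..." if invoked before the constructor finishes.
    println(s"$label received $msg")
  }
}

object ThisEscapeDemo extends App {
  // A source that (pathologically) invokes the callback as soon as it is registered;
  // that alone is enough to observe the partially constructed Worker deterministically.
  val eagerSource = new MessageSource {
    def onReceiveMessage(callback: String => Unit): Unit = callback("hello")
  }
  new Worker(eagerSource)   // prints "null received hello"
}
```

The usual remedy for this pattern is to finish construction first and perform the registration in a separate start()/initialize() step.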

@zsxwing (Member, Author) commented May 27, 2014

> cachedPeers is the result of a future, so it's probably not possible that it's partially constructed.

One thread runs the following (pseudocode):

    write *r = ...            // construct the object that r points to
    write cachedPeers = r     // publish the reference

And another thread runs the following:

    if (read cachedPeers != null) {
        assertTrue(read *cachedPeers == read *r)  // the JVM allows this assertion to fail if the
                                                  // write and the read have no happens-before relation
    }

I added `@volatile` to force a happens-before relation between the write to `cachedPeers` and the read of `cachedPeers`.

@aarondav (Contributor)

Good point: this could very well be a correctness violation, and it's much harder than I initially let on to verify that it isn't the case.

LGTM, merged into master and branch-1.0. Thanks!

@asfgit closed this in 549830b May 27, 2014
asfgit pushed a commit that referenced this pull request May 27, 2014
`var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without proper protection.

There are two places that call `replicate(blockId, bytesAfterPut, level)`:
* https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L644 runs in `connectionManager.futureExecContext`
* https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L752 `doPut` runs in `connectionManager.handleMessageExecutor`. `org.apache.spark.storage.BlockManagerWorker` calls `blockManager.putBytes` in `connectionManager.handleMessageExecutor`.

As they run on different `Executor`s, this is a race condition: even when `cachedPeers != null`, the memory pointed to by `cachedPeers` may not be correctly visible to the reading thread.

The race condition of `onReceiveCallback` is that it's set in `BlockManagerWorker` but read in a different thread in `ConnectionManager.handleMessageExecutor`.

Author: zsxwing <[email protected]>

Closes #887 from zsxwing/SPARK-1932 and squashes the following commits:

524f69c [zsxwing] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers

(cherry picked from commit 549830b)
Signed-off-by: Aaron Davidson <[email protected]>
@zsxwing deleted the SPARK-1932 branch June 19, 2014 14:33
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014