@@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
   implicit val futureExecContext = ExecutionContext.fromExecutor(
     Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
 
-  private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
+  @volatile
+  private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
 
   private val authEnabled = securityManager.isAuthenticationEnabled()
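The @volatile here is presumably about visibility: the callback is set by whichever thread registers the message handler and read by the threads that process incoming messages, so without it a reader could keep seeing a stale null. A minimal sketch of that pattern, with illustrative names rather than the actual ConnectionManager API:

// Sketch only: one thread registers the callback, other threads invoke it.
object CallbackHolderSketch {
  @volatile private var onReceive: String => Unit = null  // @volatile makes the write visible to all threads

  def register(cb: String => Unit): Unit = {
    onReceive = cb               // e.g. called once from the thread that wires up the handler
  }

  def handleMessage(msg: String): Unit = {
    val cb = onReceive           // read the field once into a local to avoid a racy re-read
    if (cb != null) cb(msg)      // without @volatile this thread might still observe null
  }
}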
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:

@@ -772,7 +772,7 @@ private[spark] class BlockManager(
   /**
    * Replicate block to another node.
    */
-  var cachedPeers: Seq[BlockManagerId] = null
+  @volatile var cachedPeers: Seq[BlockManagerId] = null
Contributor (Aaron Davidson)

@rxin Looking at this code, is this right? When we create this (5 lines down), we seem to give it a specific level.replication, so we'll only end up with a certain number of peers, which we will then cache. It seems to me that every block thereafter will be forcibly stored at the same replication level as the first.

It's possible this bug is real, because it would only show up if you're using two separate replication levels above 1, which is probably uncommon. Am I missing something, though?

Contributor

Even though we can create explicit storage levels with different replication, the stock impl has only two: none and one extra replica. The Spark codebase relies on this implicitly.

We had sort of discussed it in the checkpoint-to-blocks JIRA (I forget the JIRA id and can't search right now).
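For reference, the built-in constants reflect this: each stock level only comes in an unreplicated form and a "_2" form with a single extra replica.

import org.apache.spark.storage.StorageLevel
// Built-in levels only distinguish "no extra replica" from "one extra replica":
assert(StorageLevel.MEMORY_ONLY.replication == 1)
assert(StorageLevel.MEMORY_ONLY_2.replication == 2)
assert(StorageLevel.MEMORY_AND_DISK.replication == 1)
assert(StorageLevel.MEMORY_AND_DISK_2.replication == 2)
assert(StorageLevel.DISK_ONLY_2.replication == 2)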

Contributor (Aaron Davidson)

Spark should not be making assumptions about the replication level like that. StorageLevel has a fully public apply() to construct new ones.
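For example, the same public apply() the diff uses below for tLevel can be called with any replication factor (the value 3 here is only an illustration):

import org.apache.spark.storage.StorageLevel
// Public factory: StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication),
// the same apply() used below to build tLevel with replication = 1.
val threeCopies = StorageLevel(true, true, false, true, 3)
// e.g. rdd.persist(threeCopies) would keep one local copy and replicate each block to two peers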

Contributor

Which is private to Spark, IIRC? But agreed about the incorrect assumption.
Doing it cleanly is much more involved than just going to the master for various replication levels and caching that per level... we would need a block placement SPI like what the NN (HDFS NameNode) exposes.

   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
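To make the thread's concern concrete, here is a small standalone sketch. It assumes, as the comments above describe, that the peer list is fetched from the master once, sized by the first block's level.replication, and then reused for every later block; the names are illustrative, not the actual BlockManager internals. The second class sketches the "cache per replication level" variant mentioned above, which the thread notes is still weaker than a real block placement SPI.

import scala.collection.concurrent.TrieMap

// Sketch of the behavior the thread is worried about -- not the actual BlockManager code.
// The first caller's replication level decides how many peers get cached; a later caller
// asking for higher replication still gets the same (possibly too short) list.
class PeerCacheSketch(fetchPeers: Int => Seq[String]) {
  @volatile private var cachedPeers: Seq[String] = null

  def peersFor(replication: Int): Seq[String] = {
    if (cachedPeers == null) {
      cachedPeers = fetchPeers(replication - 1)  // first caller's level "wins"
    }
    cachedPeers
  }
}

// The "cache per level" alternative mentioned above keys the cache by replication instead.
class PerLevelPeerCacheSketch(fetchPeers: Int => Seq[String]) {
  private val peersByLevel = TrieMap.empty[Int, Seq[String]]

  def peersFor(replication: Int): Seq[String] =
    peersByLevel.getOrElseUpdate(replication - 1, fetchPeers(replication - 1))
}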