@attilapiros
Contributor

What changes were proposed in this pull request?

Before this PR, fetching a disk-persisted RDD block always went over the network to get the requested block content, even when the source executor and the fetching executor were running on the same host.

The idea of accessing another executor's local disk files by reading the disk directly comes from the external shuffle service, where the local dirs are stored for each executor (block manager).

To make this possible, the following changes were made:

  • The RegisterBlockManager message is extended with localDirs, which the block manager master stores for each block manager as a new property of BlockManagerInfo.
  • GetLocationsAndStatus is extended with the requester host.
  • BlockLocationsAndStatus (the reply to the GetLocationsAndStatus message) is extended with an option of local directories, which is filled with the local directories of a same-host executor (if there is one; otherwise None is used). This is where the block content can be read from.
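The three changes above can be pictured with a small, self-contained sketch. It is not Spark's code: the class `LocalDirsLookupSketch`, its nested `Registration` type, and the method names are hypothetical stand-ins, and executor and block IDs are plain strings. It only illustrates the idea that the master remembers each block manager's local directories at registration time and hands them back when the requester sits on the same host.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for the master-side bookkeeping; not Spark's classes.
final class LocalDirsLookupSketch {
    // What the master remembers per registered block manager.
    static final class Registration {
        final String host;
        final List<String> localDirs;

        Registration(String host, List<String> localDirs) {
            this.host = host;
            this.localDirs = localDirs;
        }
    }

    private final Map<String, Registration> registrations = new HashMap<>();
    private final Map<String, List<String>> blockLocations = new HashMap<>();

    // Mirrors the idea of the registration message carrying localDirs.
    void register(String executorId, String host, List<String> localDirs) {
        registrations.put(executorId, new Registration(host, localDirs));
    }

    void addBlock(String blockId, String executorId) {
        blockLocations.computeIfAbsent(blockId, k -> new ArrayList<>()).add(executorId);
    }

    // Mirrors the idea of the reply carrying an optional list of local directories:
    // present only when some holder of the block runs on the requester's host.
    Optional<List<String>> localDirsFor(String blockId, String requesterHost) {
        for (String executorId : blockLocations.getOrDefault(blockId, Collections.emptyList())) {
            Registration r = registrations.get(executorId);
            if (r != null && r.host.equals(requesterHost)) {
                return Optional.of(r.localDirs);
            }
        }
        return Optional.empty();
    }
}
```

In this sketch an empty `Optional` plays the role of None in the reply: the requester then has no same-host directories to read from and has to use the network.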

Shuffle blocks are out of scope for this PR; a separate PR will be opened for them (under another Jira issue).

How was this patch tested?

With a new unit test in BlockManagerSuite. See the test prefixed by "SPARK-27622: avoid the network when block requested from same host".

@SparkQA

SparkQA commented May 8, 2019

Test build #105248 has finished for PR 24554 at commit 90d7d75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class ExecutorDiskReader
  • case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String)
  • case class BlockLocationsAndStatus(

@attilapiros
Contributor Author

I can see one more possible improvement here: in BlockManagerMasterEndpoint#getLocationsAndStatus I can remove from the remote locations the block manager ID whose local directories are returned in BlockLocationsAndStatus. If the direct local disk access has failed for that block manager, there is no reason to try it via the network too. I will make this change in a following commit.
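A minimal sketch of the proposed filtering, under the assumption that locations can be represented as plain executor ID strings. The class and method names are hypothetical, and whether the merged code ended up doing this is not visible in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper; not part of Spark.
final class RemoteLocationFilterSketch {
    // Returns the locations still worth trying over the network, skipping the
    // executor whose local directories were already handed to the requester.
    static List<String> networkCandidates(List<String> locations,
                                          Optional<String> sameHostExecutor) {
        List<String> candidates = new ArrayList<>();
        for (String executorId : locations) {
            boolean coveredByDiskRead =
                sameHostExecutor.isPresent() && sameHostExecutor.get().equals(executorId);
            if (!coveredByDiskRead) {
                candidates.add(executorId);
            }
        }
        return candidates;
    }
}
```

The reasoning is the one given above: if reading that executor's file directly has failed, asking the same executor for the same file over the network is unlikely to succeed.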

@SparkQA

SparkQA commented May 9, 2019

Test build #105281 has finished for PR 24554 at commit 33817da.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

Jenkins retest this please

@SparkQA

SparkQA commented May 9, 2019

Test build #105284 has finished for PR 24554 at commit 33817da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

ping @vanzin @squito

Contributor

@squito squito left a comment

looks good in general.

Have you tried this on a cluster? In particular w/ encryption. The code looks OK but worth an extra check.

@squito
Contributor

squito commented May 10, 2019

actually ... now I'm wondering about the locking of blocks in BlockInfoManager. You aren't getting any read locks on the same-host blocks now. This means that the block might get unpersisted from underneath you, while you're in the middle of reading it.

That is true even when you read a remote block ... but that would look like the block went missing, and you'd try to get it from elsewhere, or compute from lineage. But now you'd probably have some weird failure in the middle of your task (I think).

@attilapiros
Contributor Author

attilapiros commented May 10, 2019

Thanks @squito! I will look into the problem regarding unpersist and file deletion. What I have found for Linux is already promising: https://linux.die.net/man/2/unlink :

If the name was the last link to a file but any processes still have the file open the file will remain in existence until the last file descriptor referring to it is closed.

Of course I will test this and come back to you with the result. Moreover, as far as I know, on Windows a file cannot be deleted while it is open. But I really have to check the details.
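The unlink behaviour quoted above can be checked with a small standalone program. This is a minimal sketch and not the test from this PR; the class and method names are hypothetical. It opens a temporary file, deletes it while the stream is still open, and then reads the content through the open stream. The return value of `delete()` is deliberately ignored, since platforms may differ in whether deleting an open file is reported as successful.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class; not part of Spark.
final class ReadAfterDeleteSketch {
    // Writes content to a temp file, deletes the file while it is open,
    // and returns whatever can still be read through the open stream.
    static String readAfterDelete(String content) {
        try {
            Path file = Files.createTempFile("block-sketch", ".bin");
            try {
                Files.write(file, content.getBytes(StandardCharsets.UTF_8));
                try (InputStream in = Files.newInputStream(file)) {
                    // Result ignored on purpose: only the read below matters here.
                    file.toFile().delete();
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    byte[] buffer = new byte[4096];
                    int n;
                    while ((n = in.read(buffer)) != -1) {
                        out.write(buffer, 0, n);
                    }
                    return new String(out.toByteArray(), StandardCharsets.UTF_8);
                }
            } finally {
                // Clean up in case the earlier delete did not take effect.
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On a POSIX system the read is expected to return the full content, because the file stays in existence until the last descriptor referring to it is closed.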

I have not tried this PR on a cluster but I will do that too.

@SparkQA

SparkQA commented May 10, 2019

Test build #105313 has finished for PR 24554 at commit 53949f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 10, 2019

Test build #105312 has finished for PR 24554 at commit 19361c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented May 10, 2019

oh that's a great point about an already open file, that is probably enough.

it would be worth also taking a look at what happens if the file gets deleted before it's opened. I think things will just work correctly -- it should be just like the equivalent race that happens when reading a remote block. But would appreciate you taking a closer look at that part.

@attilapiros
Contributor Author

In the previous commit "introduce getAndMapRemoteManagedBuf and open the block file early" the old getRemoteManagedBuffer is restructured a bit:

  • the network fetching is extracted
  • the transformation of ManagedBuffer to BlockResult and to ChunkedByteBuffer is passed down for testing the buffer reading from the local directory of the same host remote executor

Both of these usages (getRemoteValues and getRemoteBytes) involve opening the file, so this way we have a guarantee that the block content can be read. If the transformation fails because the file is deleted right before it would be opened, then the process falls back to fetching from the network (tested with new tests).
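The fallback described above can be sketched as follows. This is an illustration only, not the code of this PR: `LocalFirstFetchSketch`, `readLocal`, `fetch`, and `writeTempBlock` are hypothetical names, the network fetch is modelled as a plain `Supplier`, and the block is read fully into a byte array instead of going through a ManagedBuffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical sketch of "try the same-host file first, then the network".
final class LocalFirstFetchSketch {
    // Opens and reads the file right away; an empty result means the local
    // read failed (for example because the file was deleted just before).
    static Optional<byte[]> readLocal(Path blockFile) {
        try {
            return Optional.of(Files.readAllBytes(blockFile));
        } catch (IOException e) {
            return Optional.empty();
        }
    }

    // The network fetch is only invoked when the local read did not succeed.
    static byte[] fetch(Path blockFile, Supplier<byte[]> networkFetch) {
        return readLocal(blockFile).orElseGet(networkFetch);
    }

    // Demo helper: writes bytes to a fresh temp file standing in for a block file.
    static Path writeTempBlock(byte[] bytes) {
        try {
            Path file = Files.createTempFile("block-sketch", ".bin");
            Files.write(file, bytes);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Reading eagerly inside `readLocal` is the point of the design: a missing file is detected at this step, where a fallback is still possible, and not later in the middle of a task.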

The relevant tests:

[info] BlockManagerSuite:
[info] - SPARK-27622: avoid the network when block requested from same host, StorageLevel(disk, 1 replicas) (421 milliseconds)
[info] - SPARK-27622: avoid the network when block requested from same host, StorageLevel(disk, deserialized, 1 replicas) (85 milliseconds)
[info] - SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, 1 replicas), getRemoteValue() (83 milliseconds)
[info] - SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, 1 replicas), getRemoteBytes() (66 milliseconds)
[info] - SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, deserialized, 1 replicas), getRemoteValue() (81 milliseconds)
[info] - SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, deserialized, 1 replicas), getRemoteBytes() (58 milliseconds)

And with DEBUG mode the relevant output:

attilapiros@apiros-MBP ~/github/spark (SPARK-27622) $ grep -e "from the disk of a same host executor\|===" core/target/unit-tests.log
===== TEST OUTPUT FOR o.a.s.storage.BlockManagerSuite: 'SPARK-27622: avoid the network when block requested from same host, StorageLevel(disk, 1 replicas)' =====
19/05/27 18:36:40.434 pool-1-thread-1-ScalaTest-running-BlockManagerSuite DEBUG BlockManager: Read test_list from the disk of a same host executor is successful.
19/05/27 18:36:40.460 pool-1-thread-1-ScalaTest-running-BlockManagerSuite DEBUG BlockManager: Read test_list from the disk of a same host executor is successful.
===== FINISHED o.a.s.storage.BlockManagerSuite: 'SPARK-27622: avoid the network when block requested from same host, StorageLevel(disk, 1 replicas)' =====
===== TEST OUTPUT FOR o.a.s.storage.BlockManagerSuite: 'SPARK-27622: avoid the network when block requested from same host, StorageLevel(disk, deserialized, 1 replicas)' =====
19/05/27 18:36:40.597 pool-1-thread-1-ScalaTest-running-BlockManagerSuite DEBUG BlockManager: Read test_list from the disk of a same host executor is successful.
19/05/27 18:36:40.611 pool-1-thread-1-ScalaTest-running-BlockManagerSuite DEBUG BlockManager: Read test_list from the disk of a same host executor is successful.
===== FINISHED o.a.s.storage.BlockManagerSuite: 'SPARK-27622: avoid the network when block requested from same host, StorageLevel(disk, deserialized, 1 replicas)' =====
===== TEST OUTPUT FOR o.a.s.storage.BlockManagerSuite: 'SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, 1 replicas), getRemoteValue()' =====
19/05/27 18:36:40.705 pool-1-thread-1-ScalaTest-running-BlockManagerSuite DEBUG BlockManagerSuite$$anon$6: Read test_list from the disk of a same host executor is failed.
===== FINISHED o.a.s.storage.BlockManagerSuite: 'SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, 1 replicas), getRemoteValue()' =====
===== TEST OUTPUT FOR o.a.s.storage.BlockManagerSuite: 'SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, 1 replicas), getRemoteBytes()' =====
19/05/27 18:36:40.823 pool-1-thread-1-ScalaTest-running-BlockManagerSuite DEBUG BlockManagerSuite$$anon$6: Read test_list from the disk of a same host executor is failed.
===== FINISHED o.a.s.storage.BlockManagerSuite: 'SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, 1 replicas), getRemoteBytes()' =====
===== TEST OUTPUT FOR o.a.s.storage.BlockManagerSuite: 'SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, deserialized, 1 replicas), getRemoteValue()' =====
19/05/27 18:36:40.936 pool-1-thread-1-ScalaTest-running-BlockManagerSuite DEBUG BlockManagerSuite$$anon$6: Read test_list from the disk of a same host executor is failed.
===== FINISHED o.a.s.storage.BlockManagerSuite: 'SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, deserialized, 1 replicas), getRemoteValue()' =====
===== TEST OUTPUT FOR o.a.s.storage.BlockManagerSuite: 'SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, deserialized, 1 replicas), getRemoteBytes()' =====
19/05/27 18:36:41.039 pool-1-thread-1-ScalaTest-running-BlockManagerSuite DEBUG BlockManagerSuite$$anon$6: Read test_list from the disk of a same host executor is failed.
===== FINISHED o.a.s.storage.BlockManagerSuite: 'SPARK-27622: as file is removed fall back to network fetch, StorageLevel(disk, deserialized, 1 replicas), getRemoteBytes()' =====

@attilapiros
Contributor Author

attilapiros commented May 27, 2019

I have added a new test to UtilsSuite: "deleting an already opened file does not interrupt the reading process".
I have not found a better place for this test, and so far it has only been executed on macOS (I expect no problem on Linux, and as Jenkins runs on Linux that will be answered soon anyway). But this must be tested on Microsoft Windows too, which I plan to do in the next few days.

@SparkQA

SparkQA commented May 27, 2019

Test build #105841 has finished for PR 24554 at commit 087dba4.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 27, 2019

Test build #105845 has finished for PR 24554 at commit 8543b93.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

I have run the test on Windows 10, and the result values of delete() and exists() were not the same, but the content was read correctly. So I have adapted my unit test accordingly.

@SparkQA

SparkQA commented May 28, 2019

Test build #105867 has finished for PR 24554 at commit 847afc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 2, 2019

Test build #106065 has finished for PR 24554 at commit c048eca.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 2, 2019

Test build #106066 has finished for PR 24554 at commit 02c213e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Jun 2, 2019

test failures look real

@SparkQA

SparkQA commented Jun 2, 2019

Test build #106073 has finished for PR 24554 at commit 78e360c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

1 failures
org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a sbt.testing.SuiteSelector)

@attilapiros
Contributor Author

Jenkins retest this please

@SparkQA

SparkQA commented Jun 3, 2019

Test build #106102 has finished for PR 24554 at commit 78e360c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

Couple of minor things but want to read this again. Maybe Imran will get to review it before going on vacation...

@squito
Contributor

squito commented Jun 4, 2019

looks good to me other than the comments by marcelo.

(some of my prior comments on blockmanagersuite were prematurely resolved, but talked to attila and he will address them in next patch)

@SparkQA

SparkQA commented Jun 4, 2019

Test build #106165 has finished for PR 24554 at commit 49f9508.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

Just some minor things.

@SparkQA

SparkQA commented Jun 7, 2019

Test build #106280 has finished for PR 24554 at commit 261fc76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Jun 24, 2019

retest this please

@SparkQA

SparkQA commented Jun 25, 2019

Test build #106849 has finished for PR 24554 at commit 261fc76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Jun 25, 2019

Merging to master.

@vanzin vanzin closed this in b71c130 Jun 25, 2019
@mridulm
Contributor

mridulm commented Aug 7, 2020

Catching up on PRs ... this essentially means all executors on the same host effectively have the same preferred locality (modulo concurrent block removal). Did we update the preferred locality for the block with this in mind (here or in a follow-up PR)? Thx.
