Skip to content

Commit 776a56c

Browse files
committed
address patrick's and ali's comments from the previous PR
1 parent 8859371 commit 776a56c

File tree

3 files changed

+65
-23
lines changed

3 files changed

+65
-23
lines changed

core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@ import org.apache.spark.util.Utils
3030

3131

3232
/**
33-
* Creates and maintains the logical mapping between logical blocks and tachyon fs
34-
* locations. By default, one block is mapped to one file with a name given by its BlockId.
35-
* However, it is also possible to have a block map to only a segment of a file, by calling
36-
* mapBlockToFileSegment().
33+
* Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By
34+
* default, one block is mapped to one file with a name given by its BlockId.
3735
*
3836
* @param rootDirs The directories to use for storing block files. Data will be hashed among these.
3937
*/
@@ -44,11 +42,12 @@ private[spark] class TachyonBlockManager(
4442
extends TachyonFilePathResolver with Logging {
4543

4644
val client = if (master != null && master != "") TachyonFS.get(master) else null
45+
4746
if (client == null) {
4847
logError("Failed to connect to the Tachyon as the master address is not configured")
49-
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
48+
System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE)
5049
}
51-
50+
5251
private val MAX_DIR_CREATION_ATTEMPTS = 10
5352
private val subDirsPerTachyonDir =
5453
shuffleManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt

core/src/main/scala/org/apache/spark/storage/TachyonStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private class TachyonStore(
122122
val segment = tachyonManager.getBlockLocation(blockId)
123123
val file = tachyonManager.getFile(blockId)
124124
val is = file.getInStream(ReadType.CACHE)
125-
var buffer : ByteBuffer = null
125+
var buffer: ByteBuffer = null
126126
if (is != null){
127127
val size = segment.length - segment.offset
128128
val bs = new Array[Byte](size.asInstanceOf[Int])

docs/scala-programming-guide.md

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -265,11 +265,24 @@ A complete list of actions is available in the [RDD API doc](api/core/index.html
265265

266266
## RDD Persistence
267267

268-
One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory across operations. When you persist an RDD, each node stores any slices of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for building iterative algorithms with Spark and for interactive use from the interpreter.
269-
270-
You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant -- if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
271-
272-
In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), or even replicate it across nodes. These levels are chosen by passing a [`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel) object to `persist()`. The `cache()` method is a shorthand for using the default storage level, which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of available storage levels is:
268+
One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory
269+
across operations. When you persist an RDD, each node stores any slices of it that it computes in
270+
memory and reuses them in other actions on that dataset (or datasets derived from it). This allows
271+
future actions to be much faster (often by more than 10x). Caching is a key tool for building
272+
iterative algorithms with Spark and for interactive use from the interpreter.
273+
274+
You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time
275+
it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant --
276+
if any partition of an RDD is lost, it will automatically be recomputed using the transformations
277+
that originally created it.
278+
279+
In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to
280+
persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space),
281+
or even replicate it across nodes. These levels are chosen by passing a
282+
[`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel)
283+
object to `persist()`. The `cache()` method is a shorthand for using the default storage level,
284+
which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of
285+
available storage levels is:
273286

274287
<table class="table">
275288
<tr><th style="width:23%">Storage Level</th><th>Meaning</th></tr>
@@ -292,8 +305,8 @@ In addition, each RDD can be stored using a different *storage level*, allowing
292305
</tr>
293306
<tr>
294307
<td> MEMORY_AND_DISK_SER </td>
295-
<td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them
296-
on the fly each time they're needed. </td>
308+
<td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of
309+
recomputing them on the fly each time they're needed. </td>
297310
</tr>
298311
<tr>
299312
<td> DISK_ONLY </td>
@@ -307,8 +320,9 @@ In addition, each RDD can be stored using a different *storage level*, allowing
307320

308321
### Which Storage Level to Choose?
309322

310-
Spark's storage levels are meant to provide different tradeoffs between memory usage and CPU efficiency.
311-
We recommend going through the following process to select one:
323+
Spark's storage levels are meant to provide different trade-offs between memory usage and CPU
324+
efficiency. It allows uses to choose memory, disk, or Tachyon for storing data. We recommend going
325+
through the following process to select one:
312326

313327
* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way. This is the most
314328
CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
@@ -322,17 +336,38 @@ We recommend going through the following process to select one:
322336
application). *All* the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones
323337
let you continue running tasks on the RDD without waiting to recompute a lost partition.
324338

325-
If you want to define your own storage level (say, with replication factor of 3 instead of 2), then use the function factor method `apply()` of the [`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object.
339+
If you want to define your own storage level (say, with replication factor of 3 instead of 2), then
340+
use the function factor method `apply()` of the
341+
[`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object.
342+
343+
Spark has a block manager inside the Executors that let you chose memory, disk, or Tachyon. The
344+
latter is for storing RDDs off-heap outside the Executor JVM on top of the memory management system
345+
[Tachyon](http://tachyon-project.org/). This mode has the following advantages:
346+
347+
* Executor crash won't lose the data cached.
348+
* Executors can have smaller memory footprint, allowing you to run more executors on the same
349+
machine as the bulk of the memory will be inside Tachyon.
350+
* There won't be GC overheads with data stored in Tachyon.
326351

327352
# Shared Variables
328353

329-
Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of *shared variables* for two common usage patterns: broadcast variables and accumulators.
354+
Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a
355+
remote cluster node, it works on separate copies of all the variables used in the function. These
356+
variables are copied to each machine, and no updates to the variables on the remote machine are
357+
propagated back to the driver program. Supporting general, read-write shared variables across tasks
358+
would be inefficient. However, Spark does provide two limited types of *shared variables* for two
359+
common usage patterns: broadcast variables and accumulators.
330360

331361
## Broadcast Variables
332362

333-
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
363+
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather
364+
than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a
365+
large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables
366+
using efficient broadcast algorithms to reduce communication cost.
334367

335-
Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` method. The interpreter session below shows this:
368+
Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The
369+
broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value`
370+
method. The interpreter session below shows this:
336371

337372
{% highlight scala %}
338373
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
@@ -342,13 +377,21 @@ scala> broadcastVar.value
342377
res0: Array[Int] = Array(1, 2, 3)
343378
{% endhighlight %}
344379

345-
After the broadcast variable is created, it should be used instead of the value `v` in any functions run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object `v` should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
380+
After the broadcast variable is created, it should be used instead of the value `v` in any functions
381+
run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object
382+
`v` should not be modified after it is broadcast in order to ensure that all nodes get the same
383+
value of the broadcast variable (e.g. if the variable is shipped to a new node later).
346384

347385
## Accumulators
348386

349-
Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can add support for new types.
387+
Accumulators are variables that are only "added" to through an associative operation and can
388+
therefore be efficiently supported in parallel. They can be used to implement counters (as in
389+
MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable
390+
collections, and programmers can add support for new types.
350391

351-
An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it using the `+=` operator. However, they cannot read its value. Only the driver program can read the accumulator's value, using its `value` method.
392+
An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
393+
running on the cluster can then add to it using the `+=` operator. However, they cannot read its
394+
value. Only the driver program can read the accumulator's value, using its `value` method.
352395

353396
The interpreter session below shows an accumulator being used to add up the elements of an array:
354397

0 commit comments

Comments
 (0)