11 changes: 7 additions & 4 deletions R/pkg/R/mllib_tree.R
@@ -52,12 +52,14 @@ summary.treeEnsemble <- function(model) {
numFeatures <- callJMethod(jobj, "numFeatures")
features <- callJMethod(jobj, "features")
featureImportances <- callJMethod(callJMethod(jobj, "featureImportances"), "toString")
maxDepth <- callJMethod(jobj, "maxDepth")
numTrees <- callJMethod(jobj, "numTrees")
treeWeights <- callJMethod(jobj, "treeWeights")
list(formula = formula,
numFeatures = numFeatures,
features = features,
featureImportances = featureImportances,
maxDepth = maxDepth,
numTrees = numTrees,
treeWeights = treeWeights,
jobj = jobj)
@@ -70,6 +72,7 @@ print.summary.treeEnsemble <- function(x) {
cat("\nNumber of features: ", x$numFeatures)
cat("\nFeatures: ", unlist(x$features))
cat("\nFeature importances: ", x$featureImportances)
cat("\nMax Depth: ", x$maxDepth)
cat("\nNumber of trees: ", x$numTrees)
cat("\nTree weights: ", unlist(x$treeWeights))

@@ -197,8 +200,8 @@ setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"),
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#' The list of components includes \code{formula} (formula),
#' \code{numFeatures} (number of features), \code{features} (list of features),
#' \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
#' and \code{treeWeights} (tree weights).
#' \code{featureImportances} (feature importances), \code{maxDepth} (max depth of trees),
#' \code{numTrees} (number of trees), and \code{treeWeights} (tree weights).
#' @rdname spark.gbt
#' @aliases summary,GBTRegressionModel-method
#' @export
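For reference, a minimal SparkR usage sketch of the summary list documented above, including the new maxDepth component. It is illustrative only: it assumes an active SparkR session, and the longley data and "Employed ~ ." formula are borrowed from the tests further down rather than being part of this change.

library(SparkR)
sparkR.session()

# Build a small SparkDataFrame and fit a GBT regression model
# (maxDepth = 5 is the SparkR default, stated explicitly here for clarity)
df <- suppressWarnings(createDataFrame(longley))
model <- spark.gbt(df, Employed ~ ., type = "regression", maxDepth = 5)

# summary() returns the list described above
stats <- summary(model)
stats$numTrees      # number of trees in the ensemble
stats$maxDepth      # maximum depth of the trees, 5 for this fit
stats$treeWeights   # one weight per tree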
@@ -403,8 +406,8 @@ setMethod("spark.randomForest", signature(data = "SparkDataFrame", formula = "fo
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#' The list of components includes \code{formula} (formula),
#' \code{numFeatures} (number of features), \code{features} (list of features),
#' \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
#' and \code{treeWeights} (tree weights).
#' \code{featureImportances} (feature importances), \code{maxDepth} (max depth of trees),
#' \code{numTrees} (number of trees), and \code{treeWeights} (tree weights).
#' @rdname spark.randomForest
#' @aliases summary,RandomForestRegressionModel-method
#' @export
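Similarly, a hedged sketch for spark.randomForest showing where maxDepth surfaces in both the summary list and the printed output; the session setup and data frame are assumed to be the same as in the previous sketch.

# Fit a random forest regression model; numTrees and maxDepth are fit parameters
model <- spark.randomForest(df, Employed ~ ., type = "regression",
                            numTrees = 20, maxDepth = 5)

stats <- summary(model)
stats$maxDepth   # 5, echoed back from the fitted model

# Printing the summary now also emits a "Max Depth" line, alongside the
# features, feature importances, number of trees and tree weights
print(stats)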
10 changes: 10 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_tree.R
@@ -39,6 +39,7 @@ test_that("spark.gbt", {
tolerance = 1e-4)
stats <- summary(model)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)
expect_equal(stats$formula, "Employed ~ .")
expect_equal(stats$numFeatures, 6)
expect_equal(length(stats$treeWeights), 20)
@@ -53,6 +54,7 @@ test_that("spark.gbt", {
expect_equal(stats$numFeatures, stats2$numFeatures)
expect_equal(stats$features, stats2$features)
expect_equal(stats$featureImportances, stats2$featureImportances)
expect_equal(stats$maxDepth, stats2$maxDepth)
expect_equal(stats$numTrees, stats2$numTrees)
expect_equal(stats$treeWeights, stats2$treeWeights)

@@ -66,6 +68,7 @@ test_that("spark.gbt", {
stats <- summary(model)
expect_equal(stats$numFeatures, 2)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)
expect_error(capture.output(stats), NA)
expect_true(length(capture.output(stats)) > 6)
predictions <- collect(predict(model, data))$prediction
@@ -93,6 +96,7 @@ test_that("spark.gbt", {
expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction))
expect_equal(s$numFeatures, 5)
expect_equal(s$numTrees, 20)
expect_equal(s$maxDepth, 5)

# spark.gbt classification can work on libsvm data
data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"),
@@ -116,6 +120,7 @@ test_that("spark.randomForest", {

stats <- summary(model)
expect_equal(stats$numTrees, 1)
expect_equal(stats$maxDepth, 5)
expect_error(capture.output(stats), NA)
expect_true(length(capture.output(stats)) > 6)

@@ -129,6 +134,7 @@ test_that("spark.randomForest", {
tolerance = 1e-4)
stats <- summary(model)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)

modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext = ".tmp")
write.ml(model, modelPath)
@@ -141,6 +147,7 @@ test_that("spark.randomForest", {
expect_equal(stats$features, stats2$features)
expect_equal(stats$featureImportances, stats2$featureImportances)
expect_equal(stats$numTrees, stats2$numTrees)
expect_equal(stats$maxDepth, stats2$maxDepth)
expect_equal(stats$treeWeights, stats2$treeWeights)

unlink(modelPath)
@@ -153,6 +160,7 @@ test_that("spark.randomForest", {
stats <- summary(model)
expect_equal(stats$numFeatures, 2)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)
expect_error(capture.output(stats), NA)
expect_true(length(capture.output(stats)) > 6)
# Test string prediction values
@@ -187,6 +195,8 @@ test_that("spark.randomForest", {
stats <- summary(model)
expect_equal(stats$numFeatures, 2)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)

# Test numeric prediction values
predictions <- collect(predict(model, data))$prediction
expect_equal(length(grep("1.0", predictions)), 50)
17 changes: 11 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -62,8 +62,8 @@ private[deploy] class Worker(
private val forwordMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")

// A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
// methods.
// A separated thread to clean up the workDir and the directories of finished applications.
// Used to provide the implicit parameter of `Future` methods.
private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))

@@ -578,10 +578,15 @@ private[deploy] class Worker(
if (shouldCleanup) {
finishedApps -= id
appDirectories.remove(id).foreach { dirList =>
logInfo(s"Cleaning up local directories for application $id")
dirList.foreach { dir =>
Utils.deleteRecursively(new File(dir))
}
concurrent.Future {
logInfo(s"Cleaning up local directories for application $id")
dirList.foreach { dir =>
Utils.deleteRecursively(new File(dir))
}
}(cleanupThreadExecutor).onFailure {
case e: Throwable =>
logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
}(cleanupThreadExecutor)
}
shuffleService.applicationRemoved(id)
}
18 changes: 9 additions & 9 deletions docs/structured-streaming-programming-guide.md
@@ -539,7 +539,7 @@ spark = SparkSession. ...

# Read text from socket
socketDF = spark \
.readStream() \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
@@ -552,7 +552,7 @@ socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
.readStream() \
.readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
@@ -971,7 +971,7 @@ Here is the compatibility matrix.
<br/><br/>
Update mode uses watermark to drop old aggregation state.
<br/><br/>
Complete mode does drop not old aggregation state since by definition this mode
Complete mode does not drop old aggregation state since by definition this mode
preserves all data in the Result Table.
</td>
</tr>
@@ -1201,13 +1201,13 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")

# Print new data to console
noAggDF \
.writeStream() \
.writeStream \
.format("console") \
.start()

# Write new data to Parquet files
noAggDF \
.writeStream() \
.writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
@@ -1218,14 +1218,14 @@ aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
.writeStream() \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()

# Have all the aggregates in an in memory table. The query name will be the table name
aggDF \
.writeStream() \
.writeStream \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
@@ -1313,7 +1313,7 @@ query.lastProgress(); // the most recent progress update of this streaming qu
<div data-lang="python" markdown="1">

{% highlight python %}
query = df.writeStream().format("console").start() # get the query object
query = df.writeStream.format("console").start() # get the query object

query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data

@@ -1658,7 +1658,7 @@ aggDF

{% highlight python %}
aggDF \
.writeStream() \
.writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \
mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
@@ -44,6 +44,7 @@ private[r] class GBTClassifierWrapper private (
lazy val featureImportances: Vector = gbtcModel.featureImportances
lazy val numTrees: Int = gbtcModel.getNumTrees
lazy val treeWeights: Array[Double] = gbtcModel.treeWeights
lazy val maxDepth: Int = gbtcModel.getMaxDepth

def summary: String = gbtcModel.toDebugString

mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
@@ -42,6 +42,7 @@ private[r] class GBTRegressorWrapper private (
lazy val featureImportances: Vector = gbtrModel.featureImportances
lazy val numTrees: Int = gbtrModel.getNumTrees
lazy val treeWeights: Array[Double] = gbtrModel.treeWeights
lazy val maxDepth: Int = gbtrModel.getMaxDepth

def summary: String = gbtrModel.toDebugString

mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
@@ -44,6 +44,7 @@ private[r] class RandomForestClassifierWrapper private (
lazy val featureImportances: Vector = rfcModel.featureImportances
lazy val numTrees: Int = rfcModel.getNumTrees
lazy val treeWeights: Array[Double] = rfcModel.treeWeights
lazy val maxDepth: Int = rfcModel.getMaxDepth

def summary: String = rfcModel.toDebugString

mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
@@ -42,6 +42,7 @@ private[r] class RandomForestRegressorWrapper private (
lazy val featureImportances: Vector = rfrModel.featureImportances
lazy val numTrees: Int = rfrModel.getNumTrees
lazy val treeWeights: Array[Double] = rfrModel.treeWeights
lazy val maxDepth: Int = rfrModel.getMaxDepth

def summary: String = rfrModel.toDebugString

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -97,7 +97,7 @@ class FileStreamSource(
}
seenFiles.purge()

logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

/**
* Returns the maximum offset that can be retrieved from the source.
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -230,7 +230,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* - It must pass the user-provided file filter.
* - It must be newer than the ignore threshold. It is assumed that files older than the ignore
* threshold have already been considered or are existing files before start
* (when newFileOnly = true).
* (when newFilesOnly = true).
* - It must not be present in the recently selected files that this class remembers.
* - It must not be newer than the time of the batch (i.e. `currentTime` for which this
* file is being tested. This can occur if the driver was recovered, and the missing batches