diff --git a/R/pkg/R/mllib_tree.R b/R/pkg/R/mllib_tree.R
index 40a806c41bad0..82279be6fbe77 100644
--- a/R/pkg/R/mllib_tree.R
+++ b/R/pkg/R/mllib_tree.R
@@ -52,12 +52,14 @@ summary.treeEnsemble <- function(model) {
numFeatures <- callJMethod(jobj, "numFeatures")
features <- callJMethod(jobj, "features")
featureImportances <- callJMethod(callJMethod(jobj, "featureImportances"), "toString")
+ maxDepth <- callJMethod(jobj, "maxDepth")
numTrees <- callJMethod(jobj, "numTrees")
treeWeights <- callJMethod(jobj, "treeWeights")
list(formula = formula,
numFeatures = numFeatures,
features = features,
featureImportances = featureImportances,
+ maxDepth = maxDepth,
numTrees = numTrees,
treeWeights = treeWeights,
jobj = jobj)
@@ -70,6 +72,7 @@ print.summary.treeEnsemble <- function(x) {
cat("\nNumber of features: ", x$numFeatures)
cat("\nFeatures: ", unlist(x$features))
cat("\nFeature importances: ", x$featureImportances)
+ cat("\nMax Depth: ", x$maxDepth)
cat("\nNumber of trees: ", x$numTrees)
cat("\nTree weights: ", unlist(x$treeWeights))
@@ -197,8 +200,8 @@ setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"),
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#' The list of components includes \code{formula} (formula),
#' \code{numFeatures} (number of features), \code{features} (list of features),
-#' \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
-#' and \code{treeWeights} (tree weights).
+#' \code{featureImportances} (feature importances), \code{maxDepth} (max depth of trees),
+#' \code{numTrees} (number of trees), and \code{treeWeights} (tree weights).
#' @rdname spark.gbt
#' @aliases summary,GBTRegressionModel-method
#' @export
@@ -403,8 +406,8 @@ setMethod("spark.randomForest", signature(data = "SparkDataFrame", formula = "fo
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#' The list of components includes \code{formula} (formula),
#' \code{numFeatures} (number of features), \code{features} (list of features),
-#' \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
-#' and \code{treeWeights} (tree weights).
+#' \code{featureImportances} (feature importances), \code{maxDepth} (max depth of trees),
+#' \code{numTrees} (number of trees), and \code{treeWeights} (tree weights).
#' @rdname spark.randomForest
#' @aliases summary,RandomForestRegressionModel-method
#' @export
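For context (not part of the patch itself): the maxDepth surfaced through summary() is presumably the same maxDepth param carried by the fitted Spark ML model on the JVM side, which the R wrapper reads back over callJMethod. A spark-shell style Scala sketch of that value; the toy DataFrame and column names are purely illustrative:

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.GBTRegressor

// Tiny toy data with the conventional "label"/"features" columns.
val training = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(0.0, 1.0)),
  (1.0, Vectors.dense(1.0, 0.0)),
  (1.0, Vectors.dense(1.0, 1.0)),
  (0.0, Vectors.dense(0.0, 0.0))
)).toDF("label", "features")

val model = new GBTRegressor()
  .setMaxDepth(5)   // the value SparkR's summary() now reports as maxDepth
  .setMaxIter(20)
  .fit(training)

model.getMaxDepth          // 5
model.trees.length         // 20, reported as numTrees
model.featureImportances   // a Vector, stringified on the R side
```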
diff --git a/R/pkg/inst/tests/testthat/test_mllib_tree.R b/R/pkg/inst/tests/testthat/test_mllib_tree.R
index e6fda251ebea2..e0802a9b02d13 100644
--- a/R/pkg/inst/tests/testthat/test_mllib_tree.R
+++ b/R/pkg/inst/tests/testthat/test_mllib_tree.R
@@ -39,6 +39,7 @@ test_that("spark.gbt", {
tolerance = 1e-4)
stats <- summary(model)
expect_equal(stats$numTrees, 20)
+ expect_equal(stats$maxDepth, 5)
expect_equal(stats$formula, "Employed ~ .")
expect_equal(stats$numFeatures, 6)
expect_equal(length(stats$treeWeights), 20)
@@ -53,6 +54,7 @@ test_that("spark.gbt", {
expect_equal(stats$numFeatures, stats2$numFeatures)
expect_equal(stats$features, stats2$features)
expect_equal(stats$featureImportances, stats2$featureImportances)
+ expect_equal(stats$maxDepth, stats2$maxDepth)
expect_equal(stats$numTrees, stats2$numTrees)
expect_equal(stats$treeWeights, stats2$treeWeights)
@@ -66,6 +68,7 @@ test_that("spark.gbt", {
stats <- summary(model)
expect_equal(stats$numFeatures, 2)
expect_equal(stats$numTrees, 20)
+ expect_equal(stats$maxDepth, 5)
expect_error(capture.output(stats), NA)
expect_true(length(capture.output(stats)) > 6)
predictions <- collect(predict(model, data))$prediction
@@ -93,6 +96,7 @@ test_that("spark.gbt", {
expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction))
expect_equal(s$numFeatures, 5)
expect_equal(s$numTrees, 20)
+ expect_equal(s$maxDepth, 5)
# spark.gbt classification can work on libsvm data
data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"),
@@ -116,6 +120,7 @@ test_that("spark.randomForest", {
stats <- summary(model)
expect_equal(stats$numTrees, 1)
+ expect_equal(stats$maxDepth, 5)
expect_error(capture.output(stats), NA)
expect_true(length(capture.output(stats)) > 6)
@@ -129,6 +134,7 @@ test_that("spark.randomForest", {
tolerance = 1e-4)
stats <- summary(model)
expect_equal(stats$numTrees, 20)
+ expect_equal(stats$maxDepth, 5)
modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext = ".tmp")
write.ml(model, modelPath)
@@ -141,6 +147,7 @@ test_that("spark.randomForest", {
expect_equal(stats$features, stats2$features)
expect_equal(stats$featureImportances, stats2$featureImportances)
expect_equal(stats$numTrees, stats2$numTrees)
+ expect_equal(stats$maxDepth, stats2$maxDepth)
expect_equal(stats$treeWeights, stats2$treeWeights)
unlink(modelPath)
@@ -153,6 +160,7 @@ test_that("spark.randomForest", {
stats <- summary(model)
expect_equal(stats$numFeatures, 2)
expect_equal(stats$numTrees, 20)
+ expect_equal(stats$maxDepth, 5)
expect_error(capture.output(stats), NA)
expect_true(length(capture.output(stats)) > 6)
# Test string prediction values
@@ -187,6 +195,8 @@ test_that("spark.randomForest", {
stats <- summary(model)
expect_equal(stats$numFeatures, 2)
expect_equal(stats$numTrees, 20)
+ expect_equal(stats$maxDepth, 5)
+
# Test numeric prediction values
predictions <- collect(predict(model, data))$prediction
expect_equal(length(grep("1.0", predictions)), 50)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index e48817ebbafdd..00b9d1af373db 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -62,8 +62,8 @@ private[deploy] class Worker(
private val forwordMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
- // A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
- // methods.
+ // A separate thread to clean up the workDir and the directories of finished applications.
+ // Used to provide the implicit parameter of `Future` methods.
private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
@@ -578,10 +578,15 @@ private[deploy] class Worker(
if (shouldCleanup) {
finishedApps -= id
appDirectories.remove(id).foreach { dirList =>
- logInfo(s"Cleaning up local directories for application $id")
- dirList.foreach { dir =>
- Utils.deleteRecursively(new File(dir))
- }
+ concurrent.Future {
+ logInfo(s"Cleaning up local directories for application $id")
+ dirList.foreach { dir =>
+ Utils.deleteRecursively(new File(dir))
+ }
+ }(cleanupThreadExecutor).onFailure {
+ case e: Throwable =>
+ logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+ }(cleanupThreadExecutor)
}
shuffleService.applicationRemoved(id)
}
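The change above follows a useful general pattern: hand blocking filesystem deletes to a dedicated single-thread executor and log failures rather than letting them escape. A self-contained, REPL-style sketch of that pattern outside Spark's internals; the executor, directory, and helper names are illustrative only, and Spark itself uses ThreadUtils and Utils.deleteRecursively:

```scala
import java.io.File
import java.nio.file.Files
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// One dedicated thread for cleanup, so slow deletes never block the caller.
val cleanupExecutor =
  ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

def deleteRecursively(f: File): Unit = {
  if (f.isDirectory) f.listFiles().foreach(deleteRecursively)
  f.delete()
}

val appDir = Files.createTempDirectory("finished-app").toFile

Future {
  println(s"Cleaning up ${appDir.getAbsolutePath}")
  deleteRecursively(appDir)
}(cleanupExecutor).onComplete {
  case Success(_) => println("Cleanup finished")
  case Failure(e) => println(s"Cleanup of $appDir failed: ${e.getMessage}")
}(cleanupExecutor)
```

The sketch uses onComplete rather than onFailure; both work here, though onFailure was later deprecated in Scala 2.12.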
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 995ac77a4fb3b..798847237866b 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -539,7 +539,7 @@ spark = SparkSession. ...
# Read text from socket
socketDF = spark \
- .readStream() \
+ .readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
@@ -552,7 +552,7 @@ socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
- .readStream() \
+ .readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
@@ -971,7 +971,7 @@ Here is the compatibility matrix.
Update mode uses watermark to drop old aggregation state.
- Complete mode does drop not old aggregation state since by definition this mode
+ Complete mode does not drop old aggregation state since by definition this mode
preserves all data in the Result Table.
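To make the corrected sentence concrete, a spark-shell style Scala sketch (assuming the usual spark session is in scope; host, port, and window sizes are arbitrary) of a watermarked windowed count written in complete mode: the watermark is declared, but because complete output preserves the entire Result Table, no window's state is ever dropped.

```scala
import org.apache.spark.sql.functions.{col, window}

// Socket source with event-time timestamps attached to each line.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .option("includeTimestamp", true)
  .load()

// Sliding windowed counts with a 10-minute watermark.
val windowedCounts = lines
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"))
  .count()

// In complete mode every window ever seen is re-emitted on each trigger;
// the watermark only bounds state in modes that drop it (append/update).
val query = windowedCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
```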
@@ -1201,13 +1201,13 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
# Print new data to console
noAggDF \
- .writeStream() \
+ .writeStream \
.format("console") \
.start()
# Write new data to Parquet files
noAggDF \
- .writeStream() \
+ .writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
@@ -1218,14 +1218,14 @@ aggDF = df.groupBy("device").count()
# Print updated aggregations to console
aggDF \
- .writeStream() \
+ .writeStream \
.outputMode("complete") \
.format("console") \
.start()
# Have all the aggregates in an in memory table. The query name will be the table name
aggDF \
- .writeStream() \
+ .writeStream \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
@@ -1313,7 +1313,7 @@ query.lastProgress(); // the most recent progress update of this streaming qu