16 changes: 16 additions & 0 deletions R/pkg/R/utils.R
@@ -907,3 +907,19 @@ basenameSansExtFromUrl <- function(url) {
isAtomicLengthOne <- function(x) {
is.atomic(x) && length(x) == 1
}

is_cran <- function() {
!identical(Sys.getenv("NOT_CRAN"), "true")
}

is_windows <- function() {
.Platform$OS.type == "windows"
}

hadoop_home_set <- function() {
!identical(Sys.getenv("HADOOP_HOME"), "")
}

not_cran_or_windows_with_hadoop <- function() {
!is_cran() && (!is_windows() || hadoop_home_set())
}
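
The new helpers above let tests skip environment-dependent work on CRAN and on Windows machines where HADOOP_HOME is not set. A minimal usage sketch follows (illustrative only, not part of this diff: the test name, the fitted `model` object, and the use of `summary()$coefficients` are assumptions, and `write.ml`/`read.ml` require a running SparkR session):

test_that("model save/load is guarded", {
  if (not_cran_or_windows_with_hadoop()) {
    # Assumes `model` is an MLlib wrapper fitted earlier in the test.
    modelPath <- tempfile(pattern = "spark-example", fileext = ".tmp")
    write.ml(model, modelPath)
    model2 <- read.ml(modelPath)
    expect_equal(summary(model)$coefficients, summary(model2)$coefficients)
    unlink(modelPath)
  }
})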
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_classification.R
@@ -50,6 +50,7 @@ test_that("spark.svmLinear", {
expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected)

# Test model save and load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -59,6 +60,7 @@
coefs2 <- summary(model2)$coefficients
expect_equal(coefs, coefs2)
unlink(modelPath)
}

# Test prediction with numeric label
label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
@@ -128,6 +130,7 @@ test_that("spark.logit", {
expect_true(all(abs(setosaCoefs - setosaCoefs) < 0.1))

# Test model save and load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -137,6 +140,7 @@
coefs2 <- summary(model2)$coefficients
expect_equal(coefs, coefs2)
unlink(modelPath)
}

# R code to reproduce the result.
# nolint start
@@ -243,6 +247,7 @@ test_that("spark.mlp", {
expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))

# Test model save/load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -256,6 +261,7 @@
expect_equal(length(summary2$weights), 64)

unlink(modelPath)
}

# Test default parameter
model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
@@ -354,6 +360,7 @@ test_that("spark.naiveBayes", {
"Yes", "Yes", "No", "No"))

# Test model save/load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp")
write.ml(m, modelPath)
expect_error(write.ml(m, modelPath))
@@ -364,6 +371,7 @@
expect_equal(s$tables, s2$tables)

unlink(modelPath)
}

# Test e1071::naiveBayes
if (requireNamespace("e1071", quietly = TRUE)) {
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_clustering.R
@@ -53,6 +53,7 @@ test_that("spark.bisectingKmeans", {
c(0, 1, 2, 3))

# Test model save/load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -65,6 +66,7 @@
expect_true(summary2$is.loaded)

unlink(modelPath)
}
})

test_that("spark.gaussianMixture", {
@@ -125,6 +127,7 @@ test_that("spark.gaussianMixture", {
expect_equal(p$prediction, c(0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1))

# Test model save/load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -137,6 +140,7 @@
expect_equal(unlist(stats$loglik), unlist(stats2$loglik))

unlink(modelPath)
}
})

test_that("spark.kmeans", {
@@ -171,6 +175,7 @@ test_that("spark.kmeans", {
expect_true(class(summary.model$coefficients[1, ]) == "numeric")

# Test model save/load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -183,6 +188,7 @@
expect_true(summary2$is.loaded)

unlink(modelPath)
}

# Test Kmeans on dataset that is sensitive to seed value
col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
@@ -236,6 +242,7 @@ test_that("spark.lda with libsvm", {
expect_true(logPrior <= 0 & !is.na(logPrior))

# Test model save/load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -252,6 +259,7 @@
expect_equal(logPrior, stats2$logPrior)

unlink(modelPath)
}
})

test_that("spark.lda with text input", {
2 changes: 2 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_fpm.R
@@ -62,6 +62,7 @@ test_that("spark.fpGrowth", {

expect_equivalent(expected_predictions, collect(predict(model, new_data)))

if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-fpm", fileext = ".tmp")
write.ml(model, modelPath, overwrite = TRUE)
loaded_model <- read.ml(modelPath)
@@ -71,6 +72,7 @@
collect(spark.freqItemsets(loaded_model)))

unlink(modelPath)
}

model_without_numpartitions <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8)
expect_equal(
2 changes: 2 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_recommendation.R
@@ -37,6 +37,7 @@ test_that("spark.als", {
tolerance = 1e-4)

# Test model save/load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-als", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -60,6 +61,7 @@
expect_equal(itemFactors$features[orderItem], itemFactors2$features[orderItem2])

unlink(modelPath)
}
})

sparkR.session.stop()
4 changes: 4 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_regression.R
@@ -401,6 +401,7 @@ test_that("spark.isoreg", {
expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0))

# Test model save/load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-isoreg", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -409,6 +410,7 @@
expect_equal(result, summary(model2))

unlink(modelPath)
}
})

test_that("spark.survreg", {
@@ -450,6 +452,7 @@ test_that("spark.survreg", {
2.390146, 2.891269, 2.891269), tolerance = 1e-4)

# Test model save/load
if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-survreg", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -461,6 +464,7 @@
expect_equal(rownames(stats$coefficients), rownames(stats2$coefficients))

unlink(modelPath)
}

# Test survival::survreg
if (requireNamespace("survival", quietly = TRUE)) {
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_tree.R
@@ -44,6 +44,7 @@ test_that("spark.gbt", {
expect_equal(stats$numFeatures, 6)
expect_equal(length(stats$treeWeights), 20)

if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-gbtRegression", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -59,6 +60,7 @@
expect_equal(stats$treeWeights, stats2$treeWeights)

unlink(modelPath)
}

# classification
# label must be binary - GBTClassifier currently only supports binary classification.
@@ -76,6 +78,7 @@
expect_equal(length(grep("setosa", predictions)), 50)
expect_equal(length(grep("versicolor", predictions)), 50)

if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-gbtClassification", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -87,6 +90,7 @@
expect_equal(stats$numClasses, stats2$numClasses)

unlink(modelPath)
}

iris2$NumericSpecies <- ifelse(iris2$Species == "setosa", 0, 1)
df <- suppressWarnings(createDataFrame(iris2))
@@ -136,6 +140,7 @@ test_that("spark.randomForest", {
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)

if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -151,6 +156,7 @@
expect_equal(stats$treeWeights, stats2$treeWeights)

unlink(modelPath)
}

# classification
data <- suppressWarnings(createDataFrame(iris))
@@ -168,6 +174,7 @@
expect_equal(length(grep("setosa", predictions)), 50)
expect_equal(length(grep("versicolor", predictions)), 50)

if (not_cran_or_windows_with_hadoop()) {
modelPath <- tempfile(pattern = "spark-randomForestClassification", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
@@ -179,6 +186,7 @@
expect_equal(stats$numClasses, stats2$numClasses)

unlink(modelPath)
}

# Test numeric response variable
labelToIndex <- function(species) {
18 changes: 17 additions & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -61,7 +61,11 @@ unsetHiveContext <- function() {
# Tests for SparkSQL functions in SparkR

filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
sparkSession <- sparkR.session(master = sparkRTestMaster)
sparkSession <- if (not_cran_or_windows_with_hadoop()) {
sparkR.session(master = sparkRTestMaster)
} else {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
}
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

mockLines <- c("{\"name\":\"Michael\"}",
@@ -326,6 +330,7 @@ test_that("createDataFrame uses files for large objects", {
})

test_that("read/write csv as DataFrame", {
if (not_cran_or_windows_with_hadoop()) {
csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
mockLinesCsv <- c("year,make,model,comment,blank",
"\"2012\",\"Tesla\",\"S\",\"No comment\",",
@@ -371,6 +376,7 @@ test_that("read/write csv as DataFrame", {

unlink(csvPath)
unlink(csvPath2)
}
})

test_that("Support other types for options", {
@@ -601,6 +607,7 @@ test_that("Collect DataFrame with complex types", {
})

test_that("read/write json files", {
if (not_cran_or_windows_with_hadoop()) {
# Test read.df
df <- read.df(jsonPath, "json")
expect_is(df, "SparkDataFrame")
@@ -643,6 +650,7 @@ test_that("read/write json files", {

unlink(jsonPath2)
unlink(jsonPath3)
}
})

test_that("read/write json files - compression option", {
@@ -736,6 +744,7 @@ test_that("test cache, uncache and clearCache", {
})

test_that("insertInto() on a registered table", {
if (not_cran_or_windows_with_hadoop()) {
df <- read.df(jsonPath, "json")
write.df(df, parquetPath, "parquet", "overwrite")
dfParquet <- read.df(parquetPath, "parquet")
@@ -763,6 +772,7 @@ test_that("insertInto() on a registered table", {

unlink(jsonPath2)
unlink(parquetPath2)
}
})

test_that("tableToDF() returns a new DataFrame", {
@@ -954,6 +964,7 @@ test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame",
})

test_that("setCheckpointDir(), checkpoint() on a DataFrame", {
if (not_cran_or_windows_with_hadoop()) {
checkpointDir <- file.path(tempdir(), "cproot")
expect_true(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)

@@ -962,6 +973,7 @@
df <- checkpoint(df)
expect_is(df, "SparkDataFrame")
expect_false(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)
}
})

test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
@@ -1329,6 +1341,7 @@ test_that("column calculation", {
})

test_that("test HiveContext", {
if (not_cran_or_windows_with_hadoop()) {
setHiveContext(sc)

schema <- structType(structField("name", "string"), structField("age", "integer"),
@@ -1368,6 +1381,7 @@ test_that("test HiveContext", {
unlink(parquetDataPath)

unsetHiveContext()
}
})

test_that("column operators", {
@@ -2415,6 +2429,7 @@ test_that("read/write ORC files - compression option", {
})

test_that("read/write Parquet files", {
if (not_cran_or_windows_with_hadoop()) {
df <- read.df(jsonPath, "json")
# Test write.df and read.df
write.df(df, parquetPath, "parquet", mode = "overwrite")
@@ -2443,6 +2458,7 @@ test_that("read/write Parquet files", {
unlink(parquetPath2)
unlink(parquetPath3)
unlink(parquetPath4)
}
})

test_that("read/write Parquet files - compression option/mode", {