Merged
36 commits
85a8718
[SPARK-11394][SQL] Throw IllegalArgumentException for unsupported typ…
maropu Dec 29, 2015
c069ffc
[SPARK-12526][SPARKR] ifelse`, `when`, `otherwise` unable to take Col…
saurfang Dec 29, 2015
8dc6549
[SPARK-12300] [SQL] [PYSPARK] fix schema inferance on local collections
holdenk Dec 30, 2015
cd86075
[SPARK-12399] Display correct error message when accessing REST API w…
carsonwang Dec 30, 2015
4e9dd16
[SPARK-12327][SPARKR] fix code for lintr warning for commented code
felixcheung Jan 3, 2016
f7a3223
[SPARK-12562][SQL] DataFrame.write.format(text) requires the column n…
xguo27 Jan 4, 2016
cd02038
[SPARK-12486] Worker should kill the executors more forcefully if pos…
nongli Jan 4, 2016
b5a1f56
[SPARK-12470] [SQL] Fix size reduction calculation
robbinspg Jan 4, 2016
7f37c1e
[SPARK-12579][SQL] Force user-specified JDBC driver to take precedence
JoshRosen Jan 4, 2016
1005ee3
[DOC] Adjust coverage for partitionBy()
tedyu Jan 4, 2016
8ac9198
[SPARK-12589][SQL] Fix UnsafeRowParquetRecordReader to properly set t…
nongli Jan 4, 2016
6f4a224
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jan 4, 2016
8950482
[SPARKR][DOC] minor doc update for version in migration guide
felixcheung Jan 5, 2016
d9e4438
[SPARK-12568][SQL] Add BINARY to Encoders
marmbrus Jan 5, 2016
5afa62b
[SPARK-12647][SQL] Fix o.a.s.sqlexecution.ExchangeCoordinatorSuite.de…
robbinspg Jan 5, 2016
f31d0fd
[SPARK-12617] [PYSPARK] Clean up the leak sockets of Py4J
zsxwing Jan 5, 2016
83fe5cf
[SPARK-12511] [PYSPARK] [STREAMING] Make sure PythonDStream.registerS…
zsxwing Jan 5, 2016
0afad66
[SPARK-12450][MLLIB] Un-persist broadcasted variables in KMeans
rnowling Jan 5, 2016
bf3dca2
[SPARK-12453][STREAMING] Remove explicit dependency on aws-java-sdk
BrianLondon Jan 5, 2016
c3135d0
[SPARK-12393][SPARKR] Add read.text and write.text for SparkR
yanboliang Jan 6, 2016
1756819
[SPARK-12006][ML][PYTHON] Fix GMM failure if initialModel is not None
zero323 Jan 6, 2016
d821fae
[SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming
zsxwing Jan 6, 2016
8f0ead3
[SPARK-12672][STREAMING][UI] Use the uiRoot function instead of defau…
SaintBacchus Jan 6, 2016
39b0a34
Revert "[SPARK-12672][STREAMING][UI] Use the uiRoot function instead …
zsxwing Jan 6, 2016
11b901b
[SPARK-12016] [MLLIB] [PYSPARK] Wrap Word2VecModel when loading it in…
viirya Dec 14, 2015
94af69c
[SPARK-12673][UI] Add missing uri prepending for job description
jerryshao Jan 7, 2016
d061b85
[SPARK-12678][CORE] MapPartitionsRDD clearDependencies
gpoulin Jan 7, 2016
34effc4
Revert "[SPARK-12006][ML][PYTHON] Fix GMM failure if initialModel is …
yhuai Jan 7, 2016
47a58c7
[DOC] fix 'spark.memory.offHeap.enabled' default value to false
zzcclp Jan 7, 2016
13895cb
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jan 7, 2016
69a885a
[SPARK-12006][ML][PYTHON] Fix GMM failure if initialModel is not None
zero323 Jan 7, 2016
017b73e
[SPARK-12662][SQL] Fix DataFrame.randomSplit to avoid creating overla…
sameeragarwal Jan 7, 2016
6ef8235
[SPARK-12598][CORE] bug in setMinPartitions
datafarmer Jan 7, 2016
a7c3636
[SPARK-12507][STREAMING][DOCUMENT] Expose closeFileAfterWrite and all…
zsxwing Jan 8, 2016
0d96c54
[SPARK-12591][STREAMING] Register OpenHashMapBasedStateMap for Kryo (…
zsxwing Jan 8, 2016
a77a7c5
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jan 8, 2016
2 changes: 1 addition & 1 deletion R/pkg/.lintr
@@ -1,2 +1,2 @@
linters: with_defaults(line_length_linter(100), camel_case_linter = NULL, open_curly_linter(allow_single_line = TRUE), closed_curly_linter(allow_single_line = TRUE), commented_code_linter = NULL)
linters: with_defaults(line_length_linter(100), camel_case_linter = NULL, open_curly_linter(allow_single_line = TRUE), closed_curly_linter(allow_single_line = TRUE))
exclusions: list("inst/profile/general.R" = 1, "inst/profile/shell.R")
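The .lintr change above pairs with the `# nolint start` / `# nolint end` markers added throughout R/pkg/R/RDD.R below: rather than disabling commented_code_linter for the whole package, commented-out example code is now suppressed locally. A minimal sketch of the pattern, with hypothetical surrounding code that is not part of this patch:

# nolint start
# Commented-out example code that lintr's commented_code_linter would
# normally flag; the markers suppress the warning for just this block.
# res <- collect(filterRDD(rdd, function(x) { x < 3 }))
# nolint end
countElems <- function(rdd) {  # hypothetical helper, for illustration only
  length(collect(rdd))
}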
4 changes: 3 additions & 1 deletion R/pkg/NAMESPACE
@@ -94,7 +94,8 @@ exportMethods("arrange",
"withColumnRenamed",
"write.df",
"write.json",
"write.parquet")
"write.parquet",
"write.text")

exportClasses("Column")

@@ -274,6 +275,7 @@ export("as.DataFrame",
"parquetFile",
"read.df",
"read.parquet",
"read.text",
"sql",
"table",
"tableNames",
28 changes: 28 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -661,6 +661,34 @@ setMethod("saveAsParquetFile",
write.parquet(x, path)
})

#' write.text
#'
#' Saves the content of the DataFrame in a text file at the specified path.
#' The DataFrame must have only one column of string type with the name "value".
#' Each row becomes a new line in the output file.
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
#' @rdname write.text
#' @name write.text
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.txt"
#' df <- read.text(sqlContext, path)
#' write.text(df, "/tmp/sparkr-tmp/")
#'}
setMethod("write.text",
signature(x = "DataFrame", path = "character"),
function(x, path) {
write <- callJMethod(x@sdf, "write")
invisible(callJMethod(write, "text", path))
})
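A short usage sketch for the new writer (not part of the patch; the paths are hypothetical, and sc/sqlContext are assumed to be initialized as in the roxygen example above):

df <- read.text(sqlContext, "path/to/input.txt")  # one string column named "value"
count(df)                                         # number of lines read
write.text(df, "path/to/output-dir/")             # one output line per row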

#' Distinct
#'
#' Return a new DataFrame containing the distinct rows in this DataFrame.
40 changes: 38 additions & 2 deletions R/pkg/R/RDD.R
@@ -180,7 +180,7 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
}
# Save the serialization flag after we create a RRDD
rdd@env$serializedMode <- serializedMode
rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD") # rddRef$asJavaRDD()
rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD")
rdd@env$jrdd_val
})

@@ -225,7 +225,7 @@ setMethod("cache",
#'
#' Persist this RDD with the specified storage level. For details of the
#' supported storage levels, refer to
#' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
#'\url{http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence}.
#'
#' @param x The RDD to persist
#' @param newLevel The new storage level to be assigned
@@ -382,11 +382,13 @@ setMethod("collectPartition",
#' \code{collectAsMap} returns a named list as a map that contains all of the elements
#' in a key-value pair RDD.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
#' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
#'}
# nolint end
#' @rdname collect-methods
#' @aliases collectAsMap,RDD-method
#' @noRd
@@ -442,11 +444,13 @@ setMethod("length",
#' @return list of (value, count) pairs, where count is number of each unique
#' value in rdd.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, c(1,2,3,2,1))
#' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
#'}
# nolint end
#' @rdname countByValue
#' @aliases countByValue,RDD-method
#' @noRd
@@ -597,11 +601,13 @@ setMethod("mapPartitionsWithIndex",
#' @param x The RDD to be filtered.
#' @param f A unary predicate function.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' unlist(collect(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
#'}
# nolint end
#' @rdname filterRDD
#' @aliases filterRDD,RDD,function-method
#' @noRd
@@ -756,11 +762,13 @@ setMethod("foreachPartition",
#' @param x The RDD to take elements from
#' @param num Number of elements to take
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' take(rdd, 2L) # list(1, 2)
#'}
# nolint end
#' @rdname take
#' @aliases take,RDD,numeric-method
#' @noRd
@@ -824,11 +832,13 @@ setMethod("first",
#' @param x The RDD to remove duplicates from.
#' @param numPartitions Number of partitions to create.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, c(1,2,2,3,3,3))
#' sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
#'}
# nolint end
#' @rdname distinct
#' @aliases distinct,RDD-method
#' @noRd
@@ -974,11 +984,13 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
#' @param x The RDD.
#' @param func The function to be applied.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3))
#' collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
#'}
# nolint end
#' @rdname keyBy
#' @aliases keyBy,RDD
#' @noRd
@@ -1113,11 +1125,13 @@ setMethod("saveAsTextFile",
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all elements are sorted.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(3, 2, 1))
#' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
#'}
# nolint end
#' @rdname sortBy
#' @aliases sortBy,RDD,RDD-method
#' @noRd
@@ -1188,11 +1202,13 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
#' @param num Number of elements to return.
#' @return The first N elements from the RDD in ascending order.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
#'}
# nolint end
#' @rdname takeOrdered
#' @aliases takeOrdered,RDD,RDD-method
#' @noRd
@@ -1209,11 +1225,13 @@ setMethod("takeOrdered",
#' @return The top N elements from the RDD.
#' @rdname top
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
#'}
# nolint end
#' @aliases top,RDD,RDD-method
#' @noRd
setMethod("top",
@@ -1261,6 +1279,7 @@ setMethod("fold",
#' @rdname aggregateRDD
#' @seealso reduce
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4))
@@ -1269,6 +1288,7 @@
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
#'}
# nolint end
#' @aliases aggregateRDD,RDD,RDD-method
#' @noRd
setMethod("aggregateRDD",
@@ -1367,12 +1387,14 @@ setMethod("setName",
#' @return An RDD with zipped items.
#' @seealso zipWithIndex
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
#' collect(zipWithUniqueId(rdd))
#' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
#'}
# nolint end
#' @rdname zipWithUniqueId
#' @aliases zipWithUniqueId,RDD
#' @noRd
@@ -1408,12 +1430,14 @@ setMethod("zipWithUniqueId",
#' @return An RDD with zipped items.
#' @seealso zipWithUniqueId
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
#' collect(zipWithIndex(rdd))
#' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
#'}
# nolint end
#' @rdname zipWithIndex
#' @aliases zipWithIndex,RDD
#' @noRd
@@ -1454,12 +1478,14 @@ setMethod("zipWithIndex",
#' @return An RDD created by coalescing all elements within
#' each partition into a list.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, as.list(1:4), 2L)
#' collect(glom(rdd))
#' # list(list(1, 2), list(3, 4))
#'}
# nolint end
#' @rdname glom
#' @aliases glom,RDD
#' @noRd
@@ -1519,13 +1545,15 @@ setMethod("unionRDD",
#' @param other Another RDD to be zipped.
#' @return An RDD zipped from the two RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, 0:4)
#' rdd2 <- parallelize(sc, 1000:1004)
#' collect(zipRDD(rdd1, rdd2))
#' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
#'}
# nolint end
#' @rdname zipRDD
#' @aliases zipRDD,RDD
#' @noRd
@@ -1557,12 +1585,14 @@ setMethod("zipRDD",
#' @param other An RDD.
#' @return A new RDD which is the Cartesian product of these two RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:2)
#' sortByKey(cartesian(rdd, rdd))
#' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
#'}
# nolint end
#' @rdname cartesian
#' @aliases cartesian,RDD,RDD-method
#' @noRd
@@ -1587,13 +1617,15 @@ setMethod("cartesian",
#' @param numPartitions Number of the partitions in the result RDD.
#' @return An RDD with the elements from this that are not in other.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
#' rdd2 <- parallelize(sc, list(2, 4))
#' collect(subtract(rdd1, rdd2))
#' # list(1, 1, 3)
#'}
# nolint end
#' @rdname subtract
#' @aliases subtract,RDD
#' @noRd
@@ -1619,13 +1651,15 @@ setMethod("subtract",
#' @param numPartitions The number of partitions in the result RDD.
#' @return An RDD which is the intersection of these two RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
#' # list(1, 2, 3)
#'}
# nolint end
#' @rdname intersection
#' @aliases intersection,RDD
#' @noRd
@@ -1653,6 +1687,7 @@ setMethod("intersection",
#' Assumes that all the RDDs have the *same number of partitions*, but
#' does *not* require them to have the same number of elements in each partition.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
@@ -1662,6 +1697,7 @@ setMethod("intersection",
#' func = function(x, y, z) { list(list(x, y, z))} ))
#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
#'}
# nolint end
#' @rdname zipRDD
#' @aliases zipPartitions,RDD
#' @noRd
26 changes: 26 additions & 0 deletions R/pkg/R/SQLContext.R
@@ -295,6 +295,32 @@ parquetFile <- function(sqlContext, ...) {
dataFrame(sdf)
}

#' Create a DataFrame from a text file.
#'
#' Loads a text file and returns a DataFrame with a single string column named "value".
#' Each line in the text file is a new row in the resulting DataFrame.
#'
#' @param sqlContext SQLContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @return DataFrame
#' @rdname read.text
#' @name read.text
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.txt"
#' df <- read.text(sqlContext, path)
#' }
read.text <- function(sqlContext, path) {
  # Allow the user to have a more flexible definition of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sqlContext, "read")
sdf <- callJMethod(read, "text", paths)
dataFrame(sdf)
}
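As the @param documentation above notes, path may also be a character vector naming several files. A sketch under that assumption (file names are hypothetical; assumes sqlContext <- sparkRSQL.init(sc)):

logs <- read.text(sqlContext, c("logs/app-1.txt", "logs/app-2.txt"))
printSchema(logs)    # value: string (nullable = true)
head(collect(logs))  # first lines as a local R data.frame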

#' SQL Query
#'
#' Executes a SQL query using Spark, returning the result as a DataFrame.
4 changes: 2 additions & 2 deletions R/pkg/R/column.R
@@ -215,7 +215,7 @@ setMethod("%in%",

#' otherwise
#'
#' If values in the specified column are null, returns the value.
#' Can be used in conjunction with `when` to specify a default value for expressions.
#'
#' @rdname otherwise
@@ -225,7 +225,7 @@ setMethod("%in%",
setMethod("otherwise",
signature(x = "Column", value = "ANY"),
function(x, value) {
value <- ifelse(class(value) == "Column", value@jc, value)
value <- if (class(value) == "Column") { value@jc } else { value }
jc <- callJMethod(x@jc, "otherwise", value)
column(jc)
})
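The ifelse-to-if/else change above is the SPARK-12526 fix: base R's vectorized ifelse() cannot return the Java object reference wrapped by a Column, so passing a Column as the value did not work before this change. With the fix, otherwise() accepts either a literal or another Column. A sketch with a hypothetical DataFrame df and numeric column temp:

# Fall back to the column itself where the condition does not match
adjusted <- otherwise(when(df$temp > 100, df$temp - 100), df$temp)
df2 <- withColumn(df, "adjustedTemp", adjusted)
head(select(df2, "temp", "adjustedTemp"))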