@@ -25,80 +25,80 @@ getMinPartitions <- function(sc, minPartitions) {
   as.integer(minPartitions)
 }
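The helper above is used by the readers below; a quick sketch of its two call modes, assuming a live SparkContext named sc (hypothetical here):

getMinPartitions(sc, 8)     # explicit value, returned as 8L
getMinPartitions(sc, NULL)  # NULL falls back to a small default derived from the cluster's parallelism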
 
-#' Create an RDD from a text file.
-#'
-#' This function reads a text file from HDFS, a local file system (available on all
-#' nodes), or any Hadoop-supported file system URI, and creates an
-#' RDD of strings from it.
-#'
-#' @param sc SparkContext to use
-#' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
-#' value is chosen based on available parallelism.
-#' @return RDD where each item is of type \code{character}
-#' @export
-#' @examples
-#' \dontrun{
-#' sc <- sparkR.init()
-#' lines <- textFile(sc, "myfile.txt")
-#' }
+# Create an RDD from a text file.
+#
+# This function reads a text file from HDFS, a local file system (available on all
+# nodes), or any Hadoop-supported file system URI, and creates an
+# RDD of strings from it.
+#
+# @param sc SparkContext to use
+# @param path Path of file to read. A vector of multiple paths is allowed.
+# @param minPartitions Minimum number of partitions to be created. If NULL, the default
+# value is chosen based on available parallelism.
+# @return RDD where each item is of type \code{character}
+# @export
+# @examples
+# \dontrun{
+# sc <- sparkR.init()
+# lines <- textFile(sc, "myfile.txt")
+# }
 textFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definition of the text file path
   path <- suppressWarnings(normalizePath(path))
-  #' Convert a string vector of paths to a string containing comma separated paths
+  # Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")
 
   jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
   # jrdd is of type JavaRDD[String]
   RDD(jrdd, "string")
 }
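For reference, a minimal sketch of reading several files into one string RDD; the HDFS paths below are hypothetical and not part of this change:

sc <- sparkR.init()
# A character vector of paths is collapsed into one comma separated string internally
paths <- c("hdfs:///data/part-00000.txt", "hdfs:///data/part-00001.txt")
lines <- textFile(sc, paths, minPartitions = 4L)
count(lines)  # total number of lines across both files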
 
-#' Load an RDD saved as a SequenceFile containing serialized objects.
-#'
-#' The file to be loaded should be one that was previously generated by calling
-#' saveAsObjectFile() of the RDD class.
-#'
-#' @param sc SparkContext to use
-#' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
-#' value is chosen based on available parallelism.
-#' @return RDD containing serialized R objects.
-#' @seealso saveAsObjectFile
-#' @export
-#' @examples
-#' \dontrun{
-#' sc <- sparkR.init()
-#' rdd <- objectFile(sc, "myfile")
-#' }
+# Load an RDD saved as a SequenceFile containing serialized objects.
+#
+# The file to be loaded should be one that was previously generated by calling
+# saveAsObjectFile() of the RDD class.
+#
+# @param sc SparkContext to use
+# @param path Path of file to read. A vector of multiple paths is allowed.
+# @param minPartitions Minimum number of partitions to be created. If NULL, the default
+# value is chosen based on available parallelism.
+# @return RDD containing serialized R objects.
+# @seealso saveAsObjectFile
+# @export
+# @examples
+# \dontrun{
+# sc <- sparkR.init()
+# rdd <- objectFile(sc, "myfile")
+# }
 objectFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definition of the text file path
   path <- suppressWarnings(normalizePath(path))
-  #' Convert a string vector of paths to a string containing comma separated paths
+  # Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")
 
   jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
   # Assume the RDD contains serialized R objects.
   RDD(jrdd, "byte")
 }
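A minimal round-trip sketch pairing saveAsObjectFile() with objectFile(); the output directory is hypothetical:

sc <- sparkR.init()
rdd <- parallelize(sc, list(1:3, letters[1:3]), 2L)
saveAsObjectFile(rdd, "/tmp/objfile")
restored <- objectFile(sc, "/tmp/objfile")
collect(restored)  # the same serialized R objects, deserialized back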
 
-#' Create an RDD from a homogeneous list or vector.
-#'
-#' This function creates an RDD from a local homogeneous list in R. The elements
-#' in the list are split into \code{numSlices} slices and distributed to nodes
-#' in the cluster.
-#'
-#' @param sc SparkContext to use
-#' @param coll collection to parallelize
-#' @param numSlices number of partitions to create in the RDD
-#' @return an RDD created from this collection
-#' @export
-#' @examples
-#' \dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10, 2)
-#' # The RDD should contain 10 elements
-#' length(rdd)
-#' }
+# Create an RDD from a homogeneous list or vector.
+#
+# This function creates an RDD from a local homogeneous list in R. The elements
+# in the list are split into \code{numSlices} slices and distributed to nodes
+# in the cluster.
+#
+# @param sc SparkContext to use
+# @param coll collection to parallelize
+# @param numSlices number of partitions to create in the RDD
+# @return an RDD created from this collection
+# @export
+# @examples
+# \dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10, 2)
+# # The RDD should contain 10 elements
+# length(rdd)
+# }
 parallelize <- function(sc, coll, numSlices = 1) {
   # TODO: bound/safeguard numSlices
   # TODO: unit tests for if the split works for all primitives
@@ -133,33 +133,33 @@ parallelize <- function(sc, coll, numSlices = 1) {
   RDD(jrdd, "byte")
 }
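A short usage sketch for parallelize() with an explicit slice count; the values are illustrative:

sc <- sparkR.init()
rdd <- parallelize(sc, 1:100, numSlices = 4)
doubled <- lapply(rdd, function(x) x * 2)
head(collect(doubled), 3)  # list(2, 4, 6)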
 
-#' Include this specified package on all workers
-#'
-#' This function can be used to include a package on all workers before the
-#' user's code is executed. This is useful in scenarios where other R package
-#' functions are used in a function passed to functions like \code{lapply}.
-#' NOTE: The package is assumed to be installed on every node in the Spark
-#' cluster.
-#'
-#' @param sc SparkContext to use
-#' @param pkg Package name
-#'
-#' @export
-#' @examples
-#' \dontrun{
-#' library(Matrix)
-#'
-#' sc <- sparkR.init()
-#' # Include the matrix library we will be using
-#' includePackage(sc, Matrix)
-#'
-#' generateSparse <- function(x) {
-#'   sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
-#' }
-#'
-#' rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
-#' collect(rdd)
-#' }
+# Include this specified package on all workers
+#
+# This function can be used to include a package on all workers before the
+# user's code is executed. This is useful in scenarios where other R package
+# functions are used in a function passed to functions like \code{lapply}.
+# NOTE: The package is assumed to be installed on every node in the Spark
+# cluster.
+#
+# @param sc SparkContext to use
+# @param pkg Package name
+#
+# @export
+# @examples
+# \dontrun{
+# library(Matrix)
+#
+# sc <- sparkR.init()
+# # Include the matrix library we will be using
+# includePackage(sc, Matrix)
+#
+# generateSparse <- function(x) {
+#   sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
+# }
+#
+# rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
+# collect(rdd)
+# }
 includePackage <- function(sc, pkg) {
   pkg <- as.character(substitute(pkg))
   if (exists(".packages", .sparkREnv)) {
@@ -171,30 +171,30 @@ includePackage <- function(sc, pkg) {
   .sparkREnv$.packages <- packages
 }
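As a complement to the Matrix example in the comments above, a hedged sketch using a different package that is assumed to be installed on every worker; the package choice and helper name are hypothetical:

sc <- sparkR.init()
includePackage(sc, jsonlite)  # assumption: jsonlite is installed cluster-wide
parsePartition <- function(part) {
  lapply(part, function(s) jsonlite::fromJSON(s))
}
rdd <- lapplyPartition(parallelize(sc, c('{"a":1}', '{"a":2}'), 2L), parsePartition)
collect(rdd)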
 
-#' @title Broadcast a variable to all workers
-#'
-#' @description
-#' Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
-#' object for reading it in distributed functions.
-#'
-#' @param sc Spark Context to use
-#' @param object Object to be broadcast
-#' @export
-#' @examples
-#' \dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:2, 2L)
-#'
-#' # Large Matrix object that we want to broadcast
-#' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
-#' randomMatBr <- broadcast(sc, randomMat)
-#'
-#' # Use the broadcast variable inside the function
-#' useBroadcast <- function(x) {
-#'   sum(value(randomMatBr) * x)
-#' }
-#' sumRDD <- lapply(rdd, useBroadcast)
-#' }
+# @title Broadcast a variable to all workers
+#
+# @description
+# Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
+# object for reading it in distributed functions.
+#
+# @param sc Spark Context to use
+# @param object Object to be broadcast
+# @export
+# @examples
+# \dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:2, 2L)
+#
+# # Large Matrix object that we want to broadcast
+# randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
+# randomMatBr <- broadcast(sc, randomMat)
+#
+# # Use the broadcast variable inside the function
+# useBroadcast <- function(x) {
+#   sum(value(randomMatBr) * x)
+# }
+# sumRDD <- lapply(rdd, useBroadcast)
+# }
 broadcast <- function(sc, object) {
   objName <- as.character(substitute(object))
   serializedObj <- serialize(object, connection = NULL)
@@ -205,21 +205,21 @@ broadcast <- function(sc, object) {
   Broadcast(id, object, jBroadcast, objName)
 }
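A compact sketch of the broadcast/value() pattern with a small lookup table; the data is illustrative:

sc <- sparkR.init()
lookup <- c(a = 1, b = 2, c = 3)
lookupBr <- broadcast(sc, lookup)
rdd <- parallelize(sc, c("a", "b", "c"), 2L)
mapped <- lapply(rdd, function(k) value(lookupBr)[[k]])
collect(mapped)  # list(1, 2, 3)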
 
-#' @title Set the checkpoint directory
-#'
-#' Set the directory under which RDDs are going to be checkpointed. The
-#' directory must be a HDFS path if running on a cluster.
-#'
-#' @param sc Spark Context to use
-#' @param dirName Directory path
-#' @export
-#' @examples
-#' \dontrun{
-#' sc <- sparkR.init()
-#' setCheckpointDir(sc, "~/checkpoint")
-#' rdd <- parallelize(sc, 1:2, 2L)
-#' checkpoint(rdd)
-#' }
+# @title Set the checkpoint directory
+#
+# Set the directory under which RDDs are going to be checkpointed. The
+# directory must be a HDFS path if running on a cluster.
+#
+# @param sc Spark Context to use
+# @param dirName Directory path
+# @export
+# @examples
+# \dontrun{
+# sc <- sparkR.init()
+# setCheckpointDir(sc, "~/checkpoint")
+# rdd <- parallelize(sc, 1:2, 2L)
+# checkpoint(rdd)
+# }
 setCheckpointDir <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
 }
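A brief checkpointing sketch for local testing; on a real cluster the directory should be an HDFS path, and the path below is hypothetical:

sc <- sparkR.init()
setCheckpointDir(sc, "/tmp/sparkr-checkpoints")
rdd <- parallelize(sc, 1:1000, 4L)
checkpoint(rdd)
count(rdd)  # forces a job so the checkpoint is actually written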