Skip to content

Commit 4dca9b1

Browse files
committed
Merge pull request apache#106 from hlin09/hlin09
Add function countByValue() and countByKey().
2 parents e6fb999 + 1220d92 commit 4dca9b1

File tree

6 files changed

+185
-5
lines changed

6 files changed

+185
-5
lines changed

pkg/NAMESPACE

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ exportMethods(
99
"collectPartition",
1010
"combineByKey",
1111
"count",
12+
"countByKey",
13+
"countByValue",
1214
"distinct",
1315
"Filter",
1416
"filter",
@@ -27,6 +29,7 @@ exportMethods(
2729
"mapValues",
2830
"maximum",
2931
"minimum",
32+
"numPartitions",
3033
"partitionBy",
3134
"reduce",
3235
"reduceByKey",

pkg/R/RDD.R

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,30 @@ setMethod("checkpoint",
234234
rdd
235235
})
236236

237+
#' Gets the number of partitions of an RDD
238+
#'
239+
#' @param rdd An RDD.
240+
#' @return the number of partitions of rdd as an integer.
241+
#' @rdname numPartitions
242+
#' @export
243+
#' @examples
244+
#'\dontrun{
245+
#' sc <- sparkR.init()
246+
#' rdd <- parallelize(sc, 1:10, 2L)
247+
#' numPartitions(rdd) # 2L
248+
#'}
249+
setGeneric("numPartitions", function(rdd) { standardGeneric("numPartitions") })
250+
251+
#' @rdname numPartitions
252+
#' @aliases numPartitions,RDD-method
253+
setMethod("numPartitions",
254+
signature(rdd = "RDD"),
255+
function(rdd) {
256+
jrdd <- getJRDD(rdd)
257+
partitions <- .jcall(jrdd, "Ljava/util/List;", "splits")
258+
.jcall(partitions, "I", "size")
259+
})
260+
237261
#' Collect elements of an RDD
238262
#'
239263
#' @description
@@ -359,6 +383,58 @@ setMethod("length",
359383
count(x)
360384
})
361385

386+
#' Return the count of each unique value in this RDD as a list of
387+
#' (value, count) pairs.
388+
#'
389+
#' Same as countByValue in Spark.
390+
#'
391+
#' @param rdd The RDD to count
392+
#' @return list of (value, count) pairs, where count is number of each unique
393+
#' value in rdd.
394+
#' @rdname countByValue
395+
#' @export
396+
#' @examples
397+
#'\dontrun{
398+
#' sc <- sparkR.init()
399+
#' rdd <- parallelize(sc, c(1,2,3,2,1))
400+
#' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
401+
#'}
402+
setGeneric("countByValue", function(rdd) { standardGeneric("countByValue") })
403+
404+
#' @rdname countByValue
405+
#' @aliases countByValue,RDD-method
406+
setMethod("countByValue",
407+
signature(rdd = "RDD"),
408+
function(rdd) {
409+
ones <- lapply(rdd, function(item) { list(item, 1L) })
410+
collect(reduceByKey(ones, `+`, numPartitions(rdd)))
411+
})
412+
413+
#' Count the number of elements for each key, and return the result to the
414+
#' master as lists of (key, count) pairs.
415+
#'
416+
#' Same as countByKey in Spark.
417+
#'
418+
#' @param rdd The RDD to count keys.
419+
#' @return list of (key, count) pairs, where count is number of each key in rdd.
420+
#' @rdname countByKey
421+
#' @export
422+
#' @examples
423+
#'\dontrun{
424+
#' sc <- sparkR.init()
425+
#' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
426+
#' countByKey(rdd) # ("a", 2L), ("b", 1L)
427+
#'}
428+
setGeneric("countByKey", function(rdd) { standardGeneric("countByKey") })
429+
430+
#' @rdname countByKey
431+
#' @aliases countByKey,RDD-method
432+
setMethod("countByKey",
433+
signature(rdd = "RDD"),
434+
function(rdd) {
435+
keys <- lapply(rdd, function(item) { item[[1]] })
436+
countByValue(keys)
437+
})
362438

363439
#' Apply a function to all elements
364440
#'
@@ -659,8 +735,8 @@ setMethod("take",
659735
resList <- list()
660736
index <- -1
661737
jrdd <- getJRDD(rdd)
662-
partitions <- .jcall(jrdd, "Ljava/util/List;", "splits")
663-
numPartitions <- .jcall(partitions, "I", "size")
738+
numPartitions <- numPartitions(rdd)
739+
664740
# TODO(shivaram): Collect more than one partition based on size
665741
# estimates similar to the scala version of `take`.
666742
while (TRUE) {
@@ -707,9 +783,7 @@ setMethod("distinct",
707783
signature(rdd = "RDD", numPartitions = "missingOrInteger"),
708784
function(rdd, numPartitions) {
709785
if (missing(numPartitions)) {
710-
jrdd <- getJRDD(rdd)
711-
partitions <- .jcall(jrdd, "Ljava/util/List;", "splits")
712-
numPartitions <- .jcall(partitions, "I", "size")
786+
numPartitions <- SparkR::numPartitions(rdd)
713787
}
714788
identical.mapped <- lapply(rdd, function(x) { list(x, NULL) })
715789
reduced <- reduceByKey(identical.mapped,

pkg/inst/tests/test_rdd.R

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,27 @@ rdd <- parallelize(sc, nums, 2L)
1010
intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
1111
intRdd <- parallelize(sc, intPairs, 2L)
1212

13+
test_that("get number of partitions in RDD", {
14+
expect_equal(numPartitions(rdd), 2)
15+
expect_equal(numPartitions(intRdd), 2)
16+
})
17+
1318
test_that("count and length on RDD", {
1419
expect_equal(count(rdd), 10)
1520
expect_equal(length(rdd), 10)
1621
})
1722

23+
test_that("count by values and keys", {
24+
mods <- lapply(rdd, function(x) { x %% 3 })
25+
actual <- countByValue(mods)
26+
expected <- list(list(0, 3L), list(1, 4L), list(2, 3L))
27+
expect_equal(actual, expected)
28+
29+
actual <- countByKey(intRdd)
30+
expected <- list(list(2L, 2L), list(1L, 2L))
31+
expect_equal(actual, expected)
32+
})
33+
1834
test_that("lapply on RDD", {
1935
multiples <- lapply(rdd, function(x) { 2 * x })
2036
actual <- collect(multiples)

pkg/man/countByKey.Rd

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{countByKey}
4+
\alias{countByKey}
5+
\alias{countByKey,RDD-method}
6+
\title{Count the number of elements for each key, and return the result to the
7+
master as lists of (key, count) pairs.}
8+
\usage{
9+
countByKey(rdd)
10+
11+
\S4method{countByKey}{RDD}(rdd)
12+
}
13+
\arguments{
14+
\item{rdd}{The RDD to count keys.}
15+
}
16+
\value{
17+
list of (key, count) pairs, where count is number of each key in rdd.
18+
}
19+
\description{
20+
Same as countByKey in Spark.
21+
}
22+
\examples{
23+
\dontrun{
24+
sc <- sparkR.init()
25+
rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
26+
countByKey(rdd) # ("a", 2L), ("b", 1L)
27+
}
28+
}
29+

pkg/man/countByValue.Rd

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{countByValue}
4+
\alias{countByValue}
5+
\alias{countByValue,RDD-method}
6+
\title{Return the count of each unique value in this RDD as a list of
7+
(value, count) pairs.}
8+
\usage{
9+
countByValue(rdd)
10+
11+
\S4method{countByValue}{RDD}(rdd)
12+
}
13+
\arguments{
14+
\item{rdd}{The RDD to count}
15+
}
16+
\value{
17+
list of (value, count) pairs, where count is number of each unique
18+
value in rdd.
19+
}
20+
\description{
21+
Same as countByValue in Spark.
22+
}
23+
\examples{
24+
\dontrun{
25+
sc <- sparkR.init()
26+
rdd <- parallelize(sc, c(1,2,3,2,1))
27+
countByValue(rdd) # (1,2L), (2,2L), (3,1L)
28+
}
29+
}
30+

pkg/man/numPartitions.Rd

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{numPartitions}
4+
\alias{numPartitions}
5+
\alias{numPartitions,RDD-method}
6+
\title{Gets the number of partitions of an RDD}
7+
\usage{
8+
numPartitions(rdd)
9+
10+
\S4method{numPartitions}{RDD}(rdd)
11+
}
12+
\arguments{
13+
\item{rdd}{An RDD.}
14+
}
15+
\value{
16+
the number of partitions of rdd as an integer.
17+
}
18+
\description{
19+
Gets the number of partitions of an RDD
20+
}
21+
\examples{
22+
\dontrun{
23+
sc <- sparkR.init()
24+
rdd <- parallelize(sc, 1:10, 2L)
25+
numPartitions(rdd) # 2L
26+
}
27+
}
28+

0 commit comments

Comments
 (0)