Skip to content

Commit bcd4258

Browse files
Merge pull request apache#130 from lythesia/master
[SPARKR-162] add fullOuterJoin in RDD.R
2 parents 49f0404 + f88bc68 commit bcd4258

File tree

4 files changed

+152
-0
lines changed

4 files changed

+152
-0
lines changed

pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ exportMethods(
1919
"flatMapValues",
2020
"foreach",
2121
"foreachPartition",
22+
"fullOuterJoin",
2223
"groupByKey",
2324
"join",
2425
"keyBy",

pkg/R/RDD.R

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1783,6 +1783,86 @@ setMethod("rightOuterJoin",
17831783
joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin)
17841784
})
17851785

1786+
#' Full outer join two RDDs
1787+
#'
1788+
#' This function full-outer-joins two RDDs where every element is of the form
1789+
#' list(K, V).
1790+
#' The key types of the two RDDs should be the same.
1791+
#'
1792+
#' @param rdd1 An RDD to be joined. Should be an RDD where each element is
1793+
#' list(K, V).
1794+
#' @param rdd2 An RDD to be joined. Should be an RDD where each element is
1795+
#' list(K, V).
1796+
#' @param numPartitions Number of partitions to create.
1797+
#' @return For each element (k, v) in rdd1 and (k, w) in rdd2, the resulting RDD
1798+
#' will contain all pairs (k, (v, w)) for both (k, v) in rdd1 and and
1799+
#' (k, w) in rdd2, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
1800+
#' in rdd1/rdd2 have key k.
1801+
#' @rdname fullOuterJoin
1802+
#' @export
1803+
#' @examples
1804+
#'\dontrun{
1805+
#' sc <- sparkR.init()
1806+
#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
1807+
#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
1808+
#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
1809+
#' # list(1, list(3, 1)),
1810+
#' # list(3, list(3, NULL)),
1811+
#' # list(2, list(NULL, 4)))
1812+
#'}
1813+
setGeneric("fullOuterJoin", function(rdd1, rdd2, numPartitions) { standardGeneric("fullOuterJoin") })
1814+
1815+
#' @rdname fullOuterJoin
1816+
#' @aliases fullOuterJoin,RDD,RDD-method
1817+
setMethod("fullOuterJoin",
1818+
signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"),
1819+
function(rdd1, rdd2, numPartitions) {
1820+
rdd1Tagged <- lapply(rdd1, function(x) { list(x[[1]], list(1L, x[[2]])) })
1821+
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })
1822+
1823+
doJoin <- function(v) {
1824+
t1 <- vector("list", length(v))
1825+
t2 <- vector("list", length(v))
1826+
index1 <- 1
1827+
index2 <- 1
1828+
for (x in v) {
1829+
if (x[[1]] == 1L) {
1830+
t1[[index1]] <- x[[2]]
1831+
index1 <- index1 + 1
1832+
} else {
1833+
t2[[index2]] <- x[[2]]
1834+
index2 <- index2 + 1
1835+
}
1836+
}
1837+
len1 <- index1 - 1
1838+
len2 <- index2 - 1
1839+
1840+
if (len1 == 0) {
1841+
t1 <- list(NULL)
1842+
} else {
1843+
length(t1) <- len1
1844+
}
1845+
1846+
if (len2 == 0) {
1847+
t2 <- list(NULL)
1848+
} else {
1849+
length(t2) <- len2
1850+
}
1851+
1852+
result <- list()
1853+
length(result) <- length(t1) * length(t2)
1854+
index <- 1
1855+
for(i in t1) {
1856+
for(j in t2) {
1857+
result[[index]] <- list(i, j)
1858+
index <- index + 1
1859+
}
1860+
}
1861+
result
1862+
}
1863+
joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin)
1864+
})
1865+
17861866
#' For each key k in several RDDs, return a resulting RDD that
17871867
#' whose values are a list of values for the key in all RDDs.
17881868
#'

pkg/inst/tests/test_rdd.R

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,3 +347,29 @@ test_that("rightOuterJoin() on pairwise RDDs", {
347347
sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
348348
})
349349

350+
test_that("fullOuterJoin() on pairwise RDDs", {
351+
rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
352+
rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
353+
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
354+
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)), list(3, list(3, NULL)))
355+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
356+
357+
rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
358+
rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
359+
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
360+
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)), list("c", list(1, NULL)))
361+
expect_equal(sortKeyValueList(actual),
362+
sortKeyValueList(expected))
363+
364+
rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
365+
rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
366+
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
367+
expect_equal(sortKeyValueList(actual),
368+
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)), list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
369+
370+
rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
371+
rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
372+
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
373+
expect_equal(sortKeyValueList(actual),
374+
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
375+
})

pkg/man/fullOuterJoin.Rd

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
% Generated by roxygen2 (4.1.0): do not edit by hand
2+
% Please edit documentation in R/RDD.R
3+
\docType{methods}
4+
\name{fullOuterJoin}
5+
\alias{fullOuterJoin}
6+
\alias{fullOuterJoin,RDD,RDD,integer-method}
7+
\alias{fullOuterJoin,RDD,RDD-method}
8+
\title{Full outer join two RDDs}
9+
\usage{
10+
fullOuterJoin(rdd1, rdd2, numPartitions)
11+
12+
\S4method{fullOuterJoin}{RDD,RDD,integer}(rdd1, rdd2, numPartitions)
13+
}
14+
\arguments{
15+
\item{rdd1}{An RDD to be joined. Should be an RDD where each element is
16+
list(K, V).}
17+
18+
\item{rdd2}{An RDD to be joined. Should be an RDD where each element is
19+
list(K, V).}
20+
21+
\item{numPartitions}{Number of partitions to create.}
22+
}
23+
\value{
24+
For each element (k, v) in rdd1 and (k, w) in rdd2, the resulting RDD
25+
will contain all pairs (k, (v, w)) for both (k, v) in rdd1 and and
26+
(k, w) in rdd2, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
27+
in rdd1/rdd2 have key k.
28+
}
29+
\description{
30+
This function full-outer-joins two RDDs where every element is of the form
31+
list(K, V).
32+
The key types of the two RDDs should be the same.
33+
}
34+
\examples{
35+
\dontrun{
36+
sc <- sparkR.init()
37+
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
38+
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
39+
fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
40+
# list(1, list(3, 1)),
41+
# list(3, list(3, NULL)),
42+
# list(2, list(NULL, 4)))
43+
}
44+
}
45+

0 commit comments

Comments
 (0)