@@ -730,6 +730,7 @@ setMethod("take",
             index <- -1
             jrdd <- getJRDD(x)
             numPartitions <- numPartitions(x)
+            serializedModeRDD <- getSerializedMode(x)

             # TODO(shivaram): Collect more than one partition based on size
             # estimates similar to the scala version of `take`.
@@ -748,13 +749,14 @@ setMethod("take",
               elems <- convertJListToRList(partition,
                                            flatten = TRUE,
                                            logicalUpperBound = size,
-                                           serializedMode = getSerializedMode(x))
-              # TODO: Check if this append is O(n^2)?
+                                           serializedMode = serializedModeRDD)
+
               resList <- append(resList, elems)
             }
             resList
           })

+
 #' First
 #'
 #' Return the first element of an RDD
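
For context: the change above hoists `getSerializedMode(x)` out of the partition-collect loop so the mode is resolved once rather than per iteration; `take`'s observable behavior is unchanged. A minimal usage sketch (assuming a local SparkR session, as in the roxygen examples elsewhere in this file):

# take() still returns the first `num` elements as an R list.
sc <- sparkR.init()
rdd <- parallelize(sc, 1:100, 3L)
take(rdd, 5)  # list(1, 2, 3, 4, 5)
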
@@ -1092,21 +1094,42 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
     if (num < length(part)) {
       # R limitation: order works only on primitive types!
       ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
-      list(part[ord[1:num]])
+      part[ord[1:num]]
     } else {
-      list(part)
+      part
     }
   }

-  reduceFunc <- function(elems, part) {
-    newElems <- append(elems, part)
-    # R limitation: order works only on primitive types!
-    ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
-    newElems[ord[1:num]]
-  }
-
   newRdd <- mapPartitions(x, partitionFunc)
-  reduce(newRdd, reduceFunc)
+
+  resList <- list()
+  index <- -1
+  jrdd <- getJRDD(newRdd)
+  numPartitions <- numPartitions(newRdd)
+  serializedModeRDD <- getSerializedMode(newRdd)
+
+  while (TRUE) {
+    index <- index + 1
+
+    if (index >= numPartitions) {
+      ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
+      resList <- resList[ord[1:num]]
+      break
+    }
+
+    # a JList of byte arrays
+    partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
+    partition <- partitionArr[[1]]
+
+    # elems is capped to have at most `num` elements
+    elems <- convertJListToRList(partition,
+                                 flatten = TRUE,
+                                 logicalUpperBound = num,
+                                 serializedMode = serializedModeRDD)
+
+    resList <- append(resList, elems)
+  }
+  resList
 }

 #' Returns the first N elements from an RDD in ascending order.
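
The rewrite replaces the old `reduce`-based merge with the same partition-by-partition collect loop used by `take`: each partition is pre-trimmed to at most `num` elements by `partitionFunc`, partitions are pulled to the driver one at a time, and a single final sort trims the accumulated list. A brief usage sketch, assuming `takeOrdered` and `top` are the thin wrappers over `takeOrderedElem` that the roxygen below documents:

sc <- sparkR.init()
rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7), 2L)
takeOrdered(rdd, 3)  # list(1, 2, 3)   -- ascending = TRUE path
top(rdd, 3)          # list(10, 9, 7)  -- ascending = FALSE path
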
@@ -1465,67 +1488,105 @@ setMethod("zipRDD",
               stop("Can only zip RDDs which have the same number of partitions.")
             }

-            if (getSerializedMode(x) != getSerializedMode(other) ||
-                getSerializedMode(x) == "byte") {
-              # Append the number of elements in each partition to that partition so that we can later
-              # check if corresponding partitions of both RDDs have the same number of elements.
-              #
-              # Note that this appending also serves the purpose of reserialization, because even if
-              # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
-              # as a single byte array. For example, partitions of an RDD generated from partitionBy()
-              # may be encoded as multiple byte arrays.
-              appendLength <- function(part) {
-                part[[length(part) + 1]] <- length(part) + 1
-                part
-              }
-              x <- lapplyPartition(x, appendLength)
-              other <- lapplyPartition(other, appendLength)
-            }
+            rdds <- appendPartitionLengths(x, other)
+            jrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]]))
+            # The jrdd's elements are of scala Tuple2 type. The serialized
+            # flag here is used for the elements inside the tuples.
+            rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

-            zippedJRDD <- callJMethod(getJRDD(x), "zip", getJRDD(other))
-            # The zippedRDD's elements are of scala Tuple2 type. The serialized
-            # flag Here is used for the elements inside the tuples.
-            serializerMode <- getSerializedMode(x)
-            zippedRDD <- RDD(zippedJRDD, serializerMode)
+            mergePartitions(rdd, TRUE)
+          })
+
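
`appendPartitionLengths` and `mergePartitions` are shared helpers introduced by this change but defined outside this hunk. Reconstructed from the inline code they replace, a hypothetical sketch of the first helper might look like:

# Hypothetical reconstruction -- the real helper lives elsewhere in this
# commit and may differ in detail.
appendPartitionLengths <- function(x, other) {
  if (getSerializedMode(x) != getSerializedMode(other) ||
      getSerializedMode(x) == "byte") {
    # Append each partition's length so that, after the JVM-side zip or
    # cartesian, corresponding partitions can be length-checked and split
    # back apart; appending also forces reserialization of each partition
    # into a single byte array.
    appendLength <- function(part) {
      part[[length(part) + 1]] <- length(part) + 1
      part
    }
    x <- lapplyPartition(x, appendLength)
    other <- lapplyPartition(other, appendLength)
  }
  list(x, other)
}

`mergePartitions(rdd, zip)` presumably absorbs the old `partitionFunc` removed below: it strips the appended lengths, splits each merged partition back into keys and values, and, when `zip` is TRUE, enforces that corresponding partitions contributed the same number of elements.
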
+#' Cartesian product of this RDD and another one.
+#'
+#' Return the Cartesian product of this RDD and another one,
+#' that is, the RDD of all pairs of elements (a, b) where a
+#' is in this and b is in other.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @return A new RDD which is the Cartesian product of these two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:2)
+#' sortByKey(cartesian(rdd, rdd))
+#' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
+#'}
+#' @rdname cartesian
+#' @aliases cartesian,RDD,RDD-method
+setMethod("cartesian",
+          signature(x = "RDD", other = "RDD"),
+          function(x, other) {
+            rdds <- appendPartitionLengths(x, other)
+            jrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian", getJRDD(rdds[[2]]))
+            # The jrdd's elements are of scala Tuple2 type. The serialized
+            # flag here is used for the elements inside the tuples.
+            rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

-            partitionFunc <- function(split, part) {
-              len <- length(part)
-              if (len > 0) {
-                if (serializerMode == "byte") {
-                  lengthOfValues <- part[[len]]
-                  lengthOfKeys <- part[[len - lengthOfValues]]
-                  stopifnot(len == lengthOfKeys + lengthOfValues)
-
-                  # check if corresponding partitions of both RDDs have the same number of elements.
-                  if (lengthOfKeys != lengthOfValues) {
-                    stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
-                  }
-
-                  if (lengthOfKeys > 1) {
-                    keys <- part[1 : (lengthOfKeys - 1)]
-                    values <- part[(lengthOfKeys + 1) : (len - 1)]
-                  } else {
-                    keys <- list()
-                    values <- list()
-                  }
-                } else {
-                  # Keys, values must have same length here, because this has
-                  # been validated inside the JavaRDD.zip() function.
-                  keys <- part[c(TRUE, FALSE)]
-                  values <- part[c(FALSE, TRUE)]
-                }
-                mapply(
-                  function(k, v) {
-                    list(k, v)
-                  },
-                  keys,
-                  values,
-                  SIMPLIFY = FALSE,
-                  USE.NAMES = FALSE)
-              } else {
-                part
-              }
+            mergePartitions(rdd, FALSE)
+          })
+
+#' Subtract an RDD with another RDD.
+#'
+#' Return an RDD with the elements from this that are not in other.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @param numPartitions Number of the partitions in the result RDD.
+#' @return An RDD with the elements from this that are not in other.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
+#' rdd2 <- parallelize(sc, list(2, 4))
+#' collect(subtract(rdd1, rdd2))
+#' # list(1, 1, 3)
+#'}
+#' @rdname subtract
+#' @aliases subtract,RDD
+setMethod("subtract",
+          signature(x = "RDD", other = "RDD"),
+          function(x, other, numPartitions = SparkR::numPartitions(x)) {
+            mapFunction <- function(e) { list(e, NA) }
+            rdd1 <- map(x, mapFunction)
+            rdd2 <- map(other, mapFunction)
+            keys(subtractByKey(rdd1, rdd2, numPartitions))
+          })
+
+#' Intersection of this RDD and another one.
+#'
+#' Return the intersection of this RDD and another one.
+#' The output will not contain any duplicate elements,
+#' even if the input RDDs did. Performs a hash partition
+#' across the cluster.
+#' Note that this method performs a shuffle internally.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @param numPartitions The number of partitions in the result RDD.
+#' @return An RDD which is the intersection of these two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
+#' # list(1, 2, 3)
+#'}
+#' @rdname intersection
+#' @aliases intersection,RDD
+setMethod("intersection",
+          signature(x = "RDD", other = "RDD"),
+          function(x, other, numPartitions = SparkR::numPartitions(x)) {
+            rdd1 <- map(x, function(v) { list(v, NA) })
+            rdd2 <- map(other, function(v) { list(v, NA) })
+
+            filterFunction <- function(elem) {
+              iters <- elem[[2]]
+              all(as.vector(
+                lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
             }
-
-            PipelinedRDD(zippedRDD, partitionFunc)
+
+            keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
           })
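
To see why `filterFunction` selects the intersection: `cogroup` produces elements of the form list(key, list(iter1, iter2)), where each iterator holds the NA placeholders contributed by the corresponding RDD, so a key belongs to the intersection exactly when both iterators are non-empty. A driver-side sketch of the predicate with illustrative values:

# Plain R, no Spark: the shapes mimic cogroup output.
filterFunction <- function(elem) {
  iters <- elem[[2]]
  all(as.vector(
    lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
}
filterFunction(list(3, list(list(NA), list(NA))))  # TRUE: key 3 in both RDDs
filterFunction(list(5, list(list(NA), list())))    # FALSE: key 5 only in rdd1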