@@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
6666 .Object
6767})
6868
69+ setMethod ("show ", "RDD",
70+ function (.Object ) {
71+ cat(paste(callJMethod(.Object @ jrdd , " toString" ), " \n " , sep = " " ))
72+ })
73+
6974setMethod ("initialize ", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
7075 .Object @ env <- new.env()
7176 .Object @ env $ isCached <- FALSE
@@ -1590,3 +1595,49 @@ setMethod("intersection",
15901595
15911596 keys(filterRDD(cogroup(rdd1 , rdd2 , numPartitions = numPartitions ), filterFunction ))
15921597 })
1598+
1599+ # ' Zips an RDD's partitions with one (or more) RDD(s).
1600+ # ' Same as zipPartitions in Spark.
1601+ # '
1602+ # ' @param ... RDDs to be zipped.
1603+ # ' @param func A function to transform zipped partitions.
1604+ # ' @return A new RDD by applying a function to the zipped partitions.
1605+ # ' Assumes that all the RDDs have the *same number of partitions*, but
1606+ # ' does *not* require them to have the same number of elements in each partition.
1607+ # ' @examples
1608+ # '\dontrun{
1609+ # ' sc <- sparkR.init()
1610+ # ' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
1611+ # ' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
1612+ # ' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
1613+ # ' collect(zipPartitions(rdd1, rdd2, rdd3,
1614+ # ' func = function(x, y, z) { list(list(x, y, z))} ))
1615+ # ' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
1616+ # '}
1617+ # ' @rdname zipRDD
1618+ # ' @aliases zipPartitions,RDD
1619+ setMethod ("zipPartitions ",
1620+ " RDD" ,
1621+ function (... , func ) {
1622+ rrdds <- list (... )
1623+ if (length(rrdds ) == 1 ) {
1624+ return (rrdds [[1 ]])
1625+ }
1626+ nPart <- sapply(rrdds , numPartitions )
1627+ if (length(unique(nPart )) != 1 ) {
1628+ stop(" Can only zipPartitions RDDs which have the same number of partitions." )
1629+ }
1630+
1631+ rrdds <- lapply(rrdds , function (rdd ) {
1632+ mapPartitionsWithIndex(rdd , function (partIndex , part ) {
1633+ print(length(part ))
1634+ list (list (partIndex , part ))
1635+ })
1636+ })
1637+ union.rdd <- Reduce(unionRDD , rrdds )
1638+ zipped.rdd <- values(groupByKey(union.rdd , numPartitions = nPart [1 ]))
1639+ res <- mapPartitions(zipped.rdd , function (plist ) {
1640+ do.call(func , plist [[1 ]])
1641+ })
1642+ res
1643+ })
0 commit comments