Skip to content

Commit d4a1ed3

Browse files
committed
Pulled from main Spark respository and changed join and test functions on join
1 parent 060b972 commit d4a1ed3

File tree

2 files changed

+258
-71
lines changed

2 files changed

+258
-71
lines changed

R/pkg/R/DataFrame.R

Lines changed: 121 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ setMethod("names<-",
271271
signature(x = "DataFrame"),
272272
function(x, value) {
273273
if (!is.null(value)) {
274-
sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value)))
274+
sdf <- callJMethod(x@sdf, "toDF", as.list(value))
275275
dataFrame(sdf)
276276
}
277277
})
@@ -652,18 +652,49 @@ setMethod("dim",
652652
setMethod("collect",
653653
signature(x = "DataFrame"),
654654
function(x, stringsAsFactors = FALSE) {
655-
# listCols is a list of raw vectors, one per column
656-
listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
657-
cols <- lapply(listCols, function(col) {
658-
objRaw <- rawConnection(col)
659-
numRows <- readInt(objRaw)
660-
col <- readCol(objRaw, numRows)
661-
close(objRaw)
662-
col
663-
})
664-
names(cols) <- columns(x)
665-
do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors))
666-
})
655+
names <- columns(x)
656+
ncol <- length(names)
657+
if (ncol <= 0) {
658+
# empty data.frame with 0 columns and 0 rows
659+
data.frame()
660+
} else {
661+
# listCols is a list of columns
662+
listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
663+
stopifnot(length(listCols) == ncol)
664+
665+
# An empty data.frame with 0 columns and number of rows as collected
666+
nrow <- length(listCols[[1]])
667+
if (nrow <= 0) {
668+
df <- data.frame()
669+
} else {
670+
df <- data.frame(row.names = 1 : nrow)
671+
}
672+
673+
# Append columns one by one
674+
for (colIndex in 1 : ncol) {
675+
# Note: appending a column of list type into a data.frame so that
676+
# data of complex type can be held. But getting a cell from a column
677+
# of list type returns a list instead of a vector. So for columns of
678+
# non-complex type, append them as vector.
679+
col <- listCols[[colIndex]]
680+
if (length(col) <= 0) {
681+
df[[names[colIndex]]] <- col
682+
} else {
683+
# TODO: more robust check on column of primitive types
684+
vec <- do.call(c, col)
685+
if (class(vec) != "list") {
686+
df[[names[colIndex]]] <- vec
687+
} else {
688+
# For columns of complex type, be careful to access them.
689+
# Get a column of complex type returns a list.
690+
# Get a cell from a column of complex type returns a list instead of a vector.
691+
df[[names[colIndex]]] <- col
692+
}
693+
}
694+
}
695+
df
696+
}
697+
})
667698

668699
#' Limit
669700
#'
@@ -812,10 +843,10 @@ setMethod("groupBy",
812843
function(x, ...) {
813844
cols <- list(...)
814845
if (length(cols) >= 1 && class(cols[[1]]) == "character") {
815-
sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], listToSeq(cols[-1]))
846+
sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], cols[-1])
816847
} else {
817848
jcol <- lapply(cols, function(c) { c@jc })
818-
sgd <- callJMethod(x@sdf, "groupBy", listToSeq(jcol))
849+
sgd <- callJMethod(x@sdf, "groupBy", jcol)
819850
}
820851
groupedData(sgd)
821852
})
@@ -1044,12 +1075,20 @@ setMethod("subset", signature(x = "DataFrame"),
10441075
#' select(df, c("col1", "col2"))
10451076
#' select(df, list(df$name, df$age + 1))
10461077
#' # Similar to R data frames columns can also be selected using `$`
1047-
#' df$age
1078+
#' df[,df$age]
10481079
#' }
10491080
setMethod("select", signature(x = "DataFrame", col = "character"),
10501081
function(x, col, ...) {
1051-
sdf <- callJMethod(x@sdf, "select", col, toSeq(...))
1052-
dataFrame(sdf)
1082+
if (length(col) > 1) {
1083+
if (length(list(...)) > 0) {
1084+
stop("To select multiple columns, use a character vector or list for col")
1085+
}
1086+
1087+
select(x, as.list(col))
1088+
} else {
1089+
sdf <- callJMethod(x@sdf, "select", col, list(...))
1090+
dataFrame(sdf)
1091+
}
10531092
})
10541093

10551094
#' @rdname select
@@ -1059,7 +1098,7 @@ setMethod("select", signature(x = "DataFrame", col = "Column"),
10591098
jcols <- lapply(list(col, ...), function(c) {
10601099
c@jc
10611100
})
1062-
sdf <- callJMethod(x@sdf, "select", listToSeq(jcols))
1101+
sdf <- callJMethod(x@sdf, "select", jcols)
10631102
dataFrame(sdf)
10641103
})
10651104

@@ -1075,7 +1114,7 @@ setMethod("select",
10751114
col(c)@jc
10761115
}
10771116
})
1078-
sdf <- callJMethod(x@sdf, "select", listToSeq(cols))
1117+
sdf <- callJMethod(x@sdf, "select", cols)
10791118
dataFrame(sdf)
10801119
})
10811120

@@ -1102,7 +1141,7 @@ setMethod("selectExpr",
11021141
signature(x = "DataFrame", expr = "character"),
11031142
function(x, expr, ...) {
11041143
exprList <- list(expr, ...)
1105-
sdf <- callJMethod(x@sdf, "selectExpr", listToSeq(exprList))
1144+
sdf <- callJMethod(x@sdf, "selectExpr", exprList)
11061145
dataFrame(sdf)
11071146
})
11081147

@@ -1259,8 +1298,10 @@ setClassUnion("characterOrColumn", c("character", "Column"))
12591298
#' Sort a DataFrame by the specified column(s).
12601299
#'
12611300
#' @param x A DataFrame to be sorted.
1262-
#' @param col Either a Column object or character vector indicating the field to sort on
1301+
#' @param col A character or Column object vector indicating the fields to sort on
12631302
#' @param ... Additional sorting fields
1303+
#' @param decreasing A logical argument indicating sorting order for columns when
1304+
#' a character vector is specified for col
12641305
#' @return A DataFrame where all elements are sorted.
12651306
#' @rdname arrange
12661307
#' @name arrange
@@ -1273,23 +1314,52 @@ setClassUnion("characterOrColumn", c("character", "Column"))
12731314
#' path <- "path/to/file.json"
12741315
#' df <- jsonFile(sqlContext, path)
12751316
#' arrange(df, df$col1)
1276-
#' arrange(df, "col1")
12771317
#' arrange(df, asc(df$col1), desc(abs(df$col2)))
1318+
#' arrange(df, "col1", decreasing = TRUE)
1319+
#' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE))
12781320
#' }
12791321
setMethod("arrange",
1280-
signature(x = "DataFrame", col = "characterOrColumn"),
1322+
signature(x = "DataFrame", col = "Column"),
12811323
function(x, col, ...) {
1282-
if (class(col) == "character") {
1283-
sdf <- callJMethod(x@sdf, "sort", col, toSeq(...))
1284-
} else if (class(col) == "Column") {
12851324
jcols <- lapply(list(col, ...), function(c) {
12861325
c@jc
12871326
})
1288-
sdf <- callJMethod(x@sdf, "sort", listToSeq(jcols))
1289-
}
1327+
1328+
sdf <- callJMethod(x@sdf, "sort", jcols)
12901329
dataFrame(sdf)
12911330
})
12921331

1332+
#' @rdname arrange
1333+
#' @export
1334+
setMethod("arrange",
1335+
signature(x = "DataFrame", col = "character"),
1336+
function(x, col, ..., decreasing = FALSE) {
1337+
1338+
# all sorting columns
1339+
by <- list(col, ...)
1340+
1341+
if (length(decreasing) == 1) {
1342+
# in case only 1 boolean argument - decreasing value is specified,
1343+
# it will be used for all columns
1344+
decreasing <- rep(decreasing, length(by))
1345+
} else if (length(decreasing) != length(by)) {
1346+
stop("Arguments 'col' and 'decreasing' must have the same length")
1347+
}
1348+
1349+
# builds a list of columns of type Column
1350+
# example: [[1]] Column Species ASC
1351+
# [[2]] Column Petal_Length DESC
1352+
jcols <- lapply(seq_len(length(decreasing)), function(i){
1353+
if (decreasing[[i]]) {
1354+
desc(getColumn(x, by[[i]]))
1355+
} else {
1356+
asc(getColumn(x, by[[i]]))
1357+
}
1358+
})
1359+
1360+
do.call("arrange", c(x, jcols))
1361+
})
1362+
12931363
#' @rdname arrange
12941364
#' @name orderby
12951365
setMethod("orderBy",
@@ -1633,7 +1703,7 @@ setMethod("describe",
16331703
signature(x = "DataFrame", col = "character"),
16341704
function(x, col, ...) {
16351705
colList <- list(col, ...)
1636-
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
1706+
sdf <- callJMethod(x@sdf, "describe", colList)
16371707
dataFrame(sdf)
16381708
})
16391709

@@ -1643,7 +1713,7 @@ setMethod("describe",
16431713
signature(x = "DataFrame"),
16441714
function(x) {
16451715
colList <- as.list(c(columns(x)))
1646-
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
1716+
sdf <- callJMethod(x@sdf, "describe", colList)
16471717
dataFrame(sdf)
16481718
})
16491719

@@ -1700,7 +1770,7 @@ setMethod("dropna",
17001770

17011771
naFunctions <- callJMethod(x@sdf, "na")
17021772
sdf <- callJMethod(naFunctions, "drop",
1703-
as.integer(minNonNulls), listToSeq(as.list(cols)))
1773+
as.integer(minNonNulls), as.list(cols))
17041774
dataFrame(sdf)
17051775
})
17061776

@@ -1784,36 +1854,30 @@ setMethod("fillna",
17841854
sdf <- if (length(cols) == 0) {
17851855
callJMethod(naFunctions, "fill", value)
17861856
} else {
1787-
callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols)))
1857+
callJMethod(naFunctions, "fill", value, as.list(cols))
17881858
}
17891859
dataFrame(sdf)
17901860
})
17911861

1792-
#' crosstab
1793-
#'
1794-
#' Computes a pair-wise frequency table of the given columns. Also known as a contingency
1795-
#' table. The number of distinct values for each column should be less than 1e4. At most 1e6
1796-
#' non-zero pair frequencies will be returned.
1862+
#' This function downloads the contents of a DataFrame into an R's data.frame.
1863+
#' Since data.frames are held in memory, ensure that you have enough memory
1864+
#' in your system to accommodate the contents.
17971865
#'
1798-
#' @param col1 name of the first column. Distinct items will make the first item of each row.
1799-
#' @param col2 name of the second column. Distinct items will make the column names of the output.
1800-
#' @return a local R data.frame representing the contingency table. The first column of each row
1801-
#' will be the distinct values of `col1` and the column names will be the distinct values
1802-
#' of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
1803-
#' occurrences will have zero as their counts.
1866+
#' @title Download data from a DataFrame into a data.frame
1867+
#' @param x a DataFrame
1868+
#' @return a data.frame
1869+
#' @rdname as.data.frame
1870+
#' @examples \dontrun{
18041871
#'
1805-
#' @rdname statfunctions
1806-
#' @name crosstab
1807-
#' @export
1808-
#' @examples
1809-
#' \dontrun{
1810-
#' df <- jsonFile(sqlCtx, "/path/to/file.json")
1811-
#' ct = crosstab(df, "title", "gender")
1872+
#' irisDF <- createDataFrame(sqlContext, iris)
1873+
#' df <- as.data.frame(irisDF[irisDF$Species == "setosa", ])
18121874
#' }
1813-
setMethod("crosstab",
1814-
signature(x = "DataFrame", col1 = "character", col2 = "character"),
1815-
function(x, col1, col2) {
1816-
statFunctions <- callJMethod(x@sdf, "stat")
1817-
sct <- callJMethod(statFunctions, "crosstab", col1, col2)
1818-
collect(dataFrame(sct))
1875+
setMethod("as.data.frame",
1876+
signature(x = "DataFrame"),
1877+
function(x, ...) {
1878+
# Check if additional parameters have been passed
1879+
if (length(list(...)) > 0) {
1880+
stop(paste("Unused argument(s): ", paste(list(...), collapse=", ")))
1881+
}
1882+
collect(x)
18191883
})

0 commit comments

Comments
 (0)