Merged
2 changes: 1 addition & 1 deletion pkg/DESCRIPTION
@@ -17,11 +17,11 @@ License: Apache License (== 2.0)
Collate:
'generics.R'
'jobj.R'
'SQLTypes.R'
'RDD.R'
'pairRDD.R'
'column.R'
'group.R'
'schema.R'
'DataFrame.R'
'SQLContext.R'
'broadcast.R'
11 changes: 9 additions & 2 deletions pkg/NAMESPACE
@@ -181,5 +181,12 @@ export("cacheTable",

export("sparkRSQL.init",
"sparkRHive.init")
export("print.structType",
"print.structField")

export("structField",
"structField.jobj",
Contributor

Just curious: does the structField constructor fail if we don't export structField.jobj?

Contributor Author

It does in some cases. For example, when you call the fields() closure of a pre-existing structType object, you are explicitly creating a structField from a jobj, and dispatch cannot find the correct method:

> testSchema$fields()
Error in UseMethod("structField", x) : 
 no applicable method for 'structField' applied to an object of class "jobj" 
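
For anyone skimming the thread, a minimal, self-contained sketch of the S3 dispatch involved; structField_demo and the hand-built "jobj" tag below are hypothetical stand-ins, not SparkR code:

structField_demo <- function(x, ...) UseMethod("structField_demo", x)
structField_demo.character <- function(x, ...) paste("field named", x)
j <- structure(list(id = 1L), class = "jobj")      # mimic a backend jobj
tryCatch(structField_demo(j), error = conditionMessage)
# "no applicable method for 'structField_demo' applied to an object of class \"jobj\""
structField_demo.jobj <- function(x, ...) paste("field from jobj", x$id)
structField_demo(j)                                # dispatch now succeeds

An unexported structField.jobj is invisible to dispatch in much the same way, which is the failure the export above works around.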

"structField.character",
"print.structField",
"structType",
"structType.jobj",
"structType.structField",
"print.structType")
44 changes: 9 additions & 35 deletions pkg/R/SQLContext.R
@@ -37,9 +37,9 @@ infer_type <- function(x) {
# StructType
types <- lapply(x, infer_type)
fields <- lapply(1:length(x), function(i) {
list(name = names[[i]], type = types[[i]], nullable = TRUE)
structField(names[[i]], types[[i]], TRUE)
})
list(type = "struct", fields = fields)
do.call(structType, fields)
}
} else if (length(x) > 1) {
list(type = "array", elementType = type, containsNull = TRUE)
@@ -48,30 +48,6 @@ infer_type <- function(x) {
}
}

#' dump the schema into JSON string
tojson <- function(x) {
if (is.list(x)) {
names <- names(x)
if (!is.null(names)) {
items <- lapply(names, function(n) {
safe_n <- gsub('"', '\\"', n)
paste(tojson(safe_n), ':', tojson(x[[n]]), sep = '')
})
d <- paste(items, collapse = ', ')
paste('{', d, '}', sep = '')
} else {
l <- paste(lapply(x, tojson), collapse = ', ')
paste('[', l, ']', sep = '')
}
} else if (is.character(x)) {
paste('"', x, '"', sep = '')
} else if (is.logical(x)) {
if (x) "true" else "false"
} else {
stop(paste("unexpected type:", class(x)))
}
}

#' Create a DataFrame from an RDD
#'
#' Converts an RDD to a DataFrame by inferring the types.
@@ -117,7 +93,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
stop(paste("unexpected type:", class(data)))
}

if (is.null(schema) || is.null(names(schema))) {
if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
row <- first(rdd)
names <- if (is.null(schema)) {
names(row)
@@ -126,7 +102,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
}
if (is.null(names)) {
names <- lapply(1:length(row), function(x) {
paste("_", as.character(x), sep = "")
paste("_", as.character(x), sep = "")
})
}

@@ -142,20 +118,18 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {

types <- lapply(row, infer_type)
fields <- lapply(1:length(row), function(i) {
list(name = names[[i]], type = types[[i]], nullable = TRUE)
structField(names[[i]], types[[i]], TRUE)
})
schema <- list(type = "struct", fields = fields)
schema <- do.call(structType, fields)
}

stopifnot(class(schema) == "list")
stopifnot(schema$type == "struct")
stopifnot(class(schema$fields) == "list")
schemaString <- tojson(schema)
stopifnot(class(schema) == "structType")
# schemaString <- tojson(schema)

jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
srdd <- callJMethod(jrdd, "rdd")
sdf <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "createDF",
srdd, schemaString, sqlCtx)
srdd, schema$jobj, sqlCtx)
dataFrame(sdf)
}
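
As a side note for reviewers, a hedged sketch of the pattern now shared by infer_type and createDataFrame (it assumes the structType/structField constructors added in pkg/R/schema.R below and has not been run against a live backend):

fields <- list(structField("a", "integer"), structField("b", "string"))
schema <- do.call(structType, fields)   # same as structType(fields[[1]], fields[[2]]),
                                        # dispatching to structType.structField
# createDataFrame() accepts this object directly and hands the underlying Java
# reference (schema$jobj) to SQLUtils.createDF instead of a JSON schema string.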

47 changes: 0 additions & 47 deletions pkg/R/SQLTypes.R

This file was deleted.

145 changes: 145 additions & 0 deletions pkg/R/schema.R
@@ -0,0 +1,145 @@
# A set of S3 classes and methods that support the SparkSQL `StructType` and `StructField`
# datatypes. These are used to create and interact with DataFrame schemas.

#' structType
Contributor

A one-line comment at the top with a description of what is in this file would be good.

Contributor Author

Good call. Fixing that now.

#'
#' Create a structType object that contains the metadata for a DataFrame. Intended for
#' use with createDataFrame and toDF.
#'
#' @param x a structField object (created with the structField() function)
#' @param ... additional structField objects
#' @return a structType object
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
#' schema <- structType(structField("a", "integer"), structField("b", "string"))
#' df <- createDataFrame(sqlCtx, rdd, schema)
#' }
structType <- function(x, ...) {
UseMethod("structType", x)
}

structType.jobj <- function(x) {
obj <- structure(list(), class = "structType")
obj$jobj <- x
obj$fields <- function() { lapply(callJMethod(obj$jobj, "fields"), structField) }
obj
}

structType.structField <- function(x, ...) {
fields <- list(x, ...)
if (!all(sapply(fields, inherits, "structField"))) {
stop("All arguments must be structField objects.")
}
sfObjList <- lapply(fields, function(field) {
field$jobj
})
stObj <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils",
"createStructType",
listToSeq(sfObjList))
structType(stObj)
}

#' Print a Spark StructType.
#'
#' This function prints the contents of a StructType returned from the
#' SparkR JVM backend.
#'
#' @param x A StructType object
#' @param ... further arguments passed to or from other methods
print.structType <- function(x, ...) {
cat("StructType\n",
sapply(x$fields(), function(field) { paste("|-", "name = \"", field$name(),
"\", type = \"", field$dataType.toString(),
"\", nullable = ", field$nullable(), "\n",
sep = "") })
, sep = "")
}

#' structField
#'
#' Create a structField object that contains the metadata for a single field in a schema.
#'
#' @param x The name of the field
#' @param type The data type of the field
#' @param nullable A logical vector indicating whether or not the field is nullable
#' @return a structField object
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
#' field1 <- structField("a", "integer", TRUE)
#' field2 <- structField("b", "string", TRUE)
#' schema <- structType(field1, field2)
#' df <- createDataFrame(sqlCtx, rdd, schema)
#' }

structField <- function(x, ...) {
UseMethod("structField", x)
}

structField.jobj <- function(x) {
obj <- structure(list(), class = "structField")
obj$jobj <- x
obj$name <- function() { callJMethod(x, "name") }
obj$dataType <- function() { callJMethod(x, "dataType") }
obj$dataType.toString <- function() { callJMethod(obj$dataType(), "toString") }
obj$dataType.simpleString <- function() { callJMethod(obj$dataType(), "simpleString") }
obj$nullable <- function() { callJMethod(x, "nullable") }
obj
}

structField.character <- function(x, type, nullable = TRUE) {
if (class(x) != "character") {
stop("Field name must be a string.")
}
if (class(type) != "character") {
stop("Field type must be a string.")
}
if (class(nullable) != "logical") {
stop("nullable must be either TRUE or FALSE")
}
options <- c("byte",
"integer",
"double",
"numeric",
"character",
"string",
"binary",
"raw",
"logical",
"boolean",
"timestamp",
"date")
dataType <- if (type %in% options) {
type
} else {
stop(paste("Unsupported type for Dataframe:", type))
}
sfObj <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils",
"createStructField",
x,
dataType,
nullable)
structField(sfObj)
}

#' Print a Spark StructField.
#'
#' This function prints the contents of a StructField returned from the
#' SparkR JVM backend.
#'
#' @param x A StructField object
#' @param ... further arguments passed to or from other methods
print.structField <- function(x, ...) {
cat("StructField(name = \"", x$name(),
"\", type = \"", x$dataType.toString(),
"\", nullable = ", x$nullable(),
")",
sep = "")
}
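
For completeness, a hedged usage sketch of the new constructors and printers; the IntegerType/StringType names assume the usual Scala-side mapping, and the output shown is inferred from print.structType above rather than captured from a live session:

field_a <- structField("a", "integer")    # dispatches to structField.character
field_b <- structField("b", "string", FALSE)
schema  <- structType(field_a, field_b)   # dispatches to structType.structField
print(schema)
# StructType
# |-name = "a", type = "IntegerType", nullable = TRUE
# |-name = "b", type = "StringType", nullable = FALSE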
27 changes: 18 additions & 9 deletions pkg/inst/tests/test_sparkSQL.R
@@ -27,16 +27,27 @@ test_that("infer types", {
expect_equal(infer_type(list(1L, 2L)),
list(type = 'array', elementType = "integer", containsNull = TRUE))
expect_equal(infer_type(list(a = 1L, b = "2")),
list(type = "struct",
fields = list(list(name = "a", type = "integer", nullable = TRUE),
list(name = "b", type = "string", nullable = TRUE))))
structType(structField(x = "a", type = "integer", nullable = TRUE),
structField(x = "b", type = "string", nullable = TRUE)))
e <- new.env()
assign("a", 1L, envir = e)
expect_equal(infer_type(e),
list(type = "map", keyType = "string", valueType = "integer",
valueContainsNull = TRUE))
})

test_that("structType and structField", {
testField <- structField("a", "string")
expect_true(inherits(testField, "structField"))
expect_true(testField$name() == "a")
expect_true(testField$nullable())

testSchema <- structType(testField, structField("b", "integer"))
expect_true(inherits(testSchema, "structType"))
expect_true(inherits(testSchema$fields()[[2]], "structField"))
expect_true(testSchema$fields()[[1]]$dataType.toString() == "StringType")
})

test_that("create DataFrame from RDD", {
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
df <- createDataFrame(sqlCtx, rdd, list("a", "b"))
@@ -49,9 +60,8 @@ test_that("create DataFrame from RDD", {
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("_1", "_2"))

fields <- list(list(name = "a", type = "integer", nullable = TRUE),
list(name = "b", type = "string", nullable = TRUE))
schema <- list(type = "struct", fields = fields)
schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
structField(x = "b", type = "string", nullable = TRUE))
df <- createDataFrame(sqlCtx, rdd, schema)
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("a", "b"))
@@ -77,9 +87,8 @@ test_that("toDF", {
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("_1", "_2"))

fields <- list(list(name = "a", type = "integer", nullable = TRUE),
list(name = "b", type = "string", nullable = TRUE))
schema <- list(type = "struct", fields = fields)
schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
structField(x = "b", type = "string", nullable = TRUE))
df <- toDF(rdd, schema)
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("a", "b"))