From f7e88ba22ca6547806b0cbfd49eb70813095dd9e Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Sun, 29 Mar 2015 19:50:21 -0500
Subject: [PATCH 01/14] Define functions for schema and fields

Instead of using a list[list[list[]]], use specific constructors for schema
and field objects.
---
 pkg/NAMESPACE                  |  6 ++-
 pkg/R/SQLContext.R             | 74 ++++++++++++++++++++++++----------
 pkg/inst/tests/test_sparkSQL.R | 27 ++++++++++++++++-----------
 3 files changed, 76 insertions(+), 31 deletions(-)

diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE
index 8ce4c7bbc..3b052a015 100644
--- a/pkg/NAMESPACE
+++ b/pkg/NAMESPACE
@@ -167,6 +167,8 @@ export("cacheTable",
        "jsonRDD",
        "loadDF",
        "parquetFile",
+       "buildSchema",
+       "field",
        "sql",
        "table",
        "tableNames",
@@ -177,4 +179,6 @@ export("cacheTable",
 export("sparkRSQL.init",
        "sparkRHive.init")
 export("print.structType",
-       "print.structField")
+       "print.structField",
+       "print.struct",
+       "print.field")
diff --git a/pkg/R/SQLContext.R b/pkg/R/SQLContext.R
index b40ab2083..9e7840f3b 100644
--- a/pkg/R/SQLContext.R
+++ b/pkg/R/SQLContext.R
@@ -37,9 +37,9 @@ infer_type <- function(x) {
       # StructType
       types <- lapply(x, infer_type)
       fields <- lapply(1:length(x), function(i) {
-        list(name = names[[i]], type = types[[i]], nullable = TRUE)
+        field(names[[i]], types[[i]], TRUE)
       })
-      list(type = "struct", fields = fields)
+      do.call(buildSchema, fields)
     }
   } else if (length(x) > 1) {
     list(type = "array", elementType = type, containsNull = TRUE)
@@ -50,19 +50,19 @@ infer_type <- function(x) {
 
 #' dump the schema into JSON string
 tojson <- function(x) {
-  if (is.list(x)) {
+  if (inherits(x, "struct")) {
+    # schema object
+    l <- paste(lapply(x, tojson), collapse = ", ")
+    paste('{\"type\":\"struct\", \"fields\":','[', l, ']}', sep = '')
+  } else if (inherits(x, "field")) {
+    # field object
     names <- names(x)
-    if (!is.null(names)) {
-      items <- lapply(names, function(n) {
-        safe_n <- gsub('"', '\\"', n)
-        paste(tojson(safe_n), ':', tojson(x[[n]]), sep = '')
-      })
-      d <- paste(items, collapse = ', ')
-      paste('{', d, '}', sep = '')
-    } else {
-      l <- paste(lapply(x, tojson), collapse = ', ')
-      paste('[', l, ']', sep = '')
-    }
+    items <- lapply(names, function(n) {
+      safe_n <- gsub('"', '\\"', n)
+      paste(tojson(safe_n), ':', tojson(x[[n]]), sep = '')
+    })
+    d <- paste(items, collapse = ", ")
+    paste('{', d, '}', sep = '')
   } else if (is.character(x)) {
     paste('"', x, '"', sep = '')
   } else if (is.logical(x)) {
@@ -117,7 +117,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
     stop(paste("unexpected type:", class(data)))
   }
 
-  if (is.null(schema) || is.null(names(schema))) {
+  if (is.null(schema) || (!inherits(schema, "struct") && is.null(names(schema)))) {
     row <- first(rdd)
     names <- if (is.null(schema)) {
       names(row)
@@ -126,7 +126,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
     }
     if (is.null(names)) {
       names <- lapply(1:length(row), function(x) {
-        paste("_", as.character(x), sep = "")
+        paste("_", as.character(x), sep = "")
       })
     }
 
@@ -142,14 +142,12 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
 
     types <- lapply(row, infer_type)
     fields <- lapply(1:length(row), function(i) {
-      list(name = names[[i]], type = types[[i]], nullable = TRUE)
+      field(names[[i]], types[[i]], TRUE)
     })
-    schema <- list(type = "struct", fields = fields)
+    schema <- do.call(buildSchema, fields)
   }
 
-  stopifnot(class(schema) == "list")
-  stopifnot(schema$type == "struct")
-  stopifnot(class(schema$fields) == "list")
+  stopifnot(class(schema) == "struct")
 
   schemaString <- tojson(schema)
   jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
@@ -497,3 +495,37 @@ createExternalTable <- function(sqlCtx, tableName, path = NULL, source = NULL, .
   sdf <- callJMethod(sqlCtx, "createExternalTable", tableName, source, options)
   dataFrame(sdf)
 }
+
+buildSchema <- function(field, ...) {
+  fields <- list(field, ...)
+  if (!all(sapply(fields, inherits, "field"))) {
+    stop("All arguments must be Field objects.")
+  }
+
+  structure(fields, class = "struct")
+}
+
+print.struct <- function(x, ...) {
+  cat(sapply(x, function(field) { paste("|-", "name = \"", field$name,
+                                        "\", type = \"", field$type,
+                                        "\", nullable = ", field$nullable, "\n",
+                                        sep = "") })
+      , sep = "")
+}
+
+field <- function(name, type, nullable = TRUE) {
+  if (class(name) != "character") {
+    stop("Field name must be a string.")
+  }
+  if (class(type) != "character") {
+    stop("Field type must be a string.")
+  }
+  if (class(nullable) != "logical") {
+    stop("nullable must be either TRUE or FALSE")
+  }
+  structure(list("name" = name, "type" = type, "nullable" = nullable), class = "field")
+}
+
+print.field <- function(x, ...) {
+  cat("name = \"", x$name, "\", type = \"", x$type, "\", nullable = ", x$nullable, sep = "")
+}
diff --git a/pkg/inst/tests/test_sparkSQL.R b/pkg/inst/tests/test_sparkSQL.R
index 66e986998..d34c0f551 100644
--- a/pkg/inst/tests/test_sparkSQL.R
+++ b/pkg/inst/tests/test_sparkSQL.R
@@ -27,9 +27,8 @@ test_that("infer types", {
   expect_equal(infer_type(list(1L, 2L)),
                list(type = 'array', elementType = "integer", containsNull = TRUE))
   expect_equal(infer_type(list(a = 1L, b = "2")),
-               list(type = "struct",
-                    fields = list(list(name = "a", type = "integer", nullable = TRUE),
-                                  list(name = "b", type = "string", nullable = TRUE))))
+               buildSchema(field(name = "a", type = "integer", nullable = TRUE),
+                           field(name = "b", type = "string", nullable = TRUE)))
   e <- new.env()
   assign("a", 1L, envir = e)
   expect_equal(infer_type(e),
@@ -37,6 +36,18 @@ test_that("infer types", {
                     valueContainsNull = TRUE))
 })
 
+test_that("buildSchema and field", {
+  testField <- field("a", "string")
+  expect_true(inherits(testField, "field"))
+  expect_true(testField$name == "a")
+  expect_true(testField$nullable)
+
+  testSchema <- buildSchema(testField, field("b", "integer"))
+  expect_true(inherits(testSchema, "struct"))
+  expect_true(inherits(testSchema[[2]], "field"))
+  expect_true(testSchema[[1]]$type == "string")
+})
+
 test_that("create DataFrame from RDD", {
   rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
   df <- createDataFrame(sqlCtx, rdd, list("a", "b"))
@@ -49,9 +60,8 @@ test_that("create DataFrame from RDD", {
   expect_true(inherits(df, "DataFrame"))
   expect_equal(columns(df), c("_1", "_2"))
 
-  fields <- list(list(name = "a", type = "integer", nullable = TRUE),
-                 list(name = "b", type = "string", nullable = TRUE))
-  schema <- list(type = "struct", fields = fields)
+  schema <- buildSchema(field(name = "a", type = "integer", nullable = TRUE),
+                        field(name = "b", type = "string", nullable = TRUE))
   df <- createDataFrame(sqlCtx, rdd, schema)
   expect_true(inherits(df, "DataFrame"))
   expect_equal(columns(df), c("a", "b"))
@@ -77,9 +87,8 @@ test_that("toDF", {
   expect_true(inherits(df, "DataFrame"))
   expect_equal(columns(df), c("_1", "_2"))
 
-  fields <- list(list(name = "a", type = "integer", nullable = TRUE),
-                 list(name = "b", type = "string", nullable = TRUE))
-  schema <- list(type = "struct", fields = fields)
+  schema <- buildSchema(field(name = "a", type = "integer", nullable = TRUE),
+                        field(name = "b", type = "string", nullable = TRUE))
   df <- toDF(rdd, schema)
   expect_true(inherits(df, "DataFrame"))
   expect_equal(columns(df), c("a", "b"))

From 162e76f0879a3a15d27505e5720c346f206159a1 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Mon, 30 Mar 2015 10:04:21 -0500
Subject: [PATCH 02/14] Documentation

---
 pkg/R/SQLContext.R | 38 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/pkg/R/SQLContext.R b/pkg/R/SQLContext.R
index 9e7840f3b..2510798c7 100644
--- a/pkg/R/SQLContext.R
+++ b/pkg/R/SQLContext.R
@@ -496,6 +496,23 @@ createExternalTable <- function(sqlCtx, tableName, path = NULL, source = NULL, .
   dataFrame(sdf)
 }
 
+#' Create a Schema object
+#'
+#' Create an object of type "struct" that contains the metadata for a DataFrame. Intended for
+#' use with createDataFrame and toDF.
+#'
+#' @param field a Field object (created with the field() function)
+#' @param ... additional Field objects
+#' @return a Schema object
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
+#' schema <- buildSchema(field("a", "integer"), field("b", "string"))
+#' df <- createDataFrame(sqlCtx, rdd, schema)
+#' }
 buildSchema <- function(field, ...) {
   fields <- list(field, ...)
   if (!all(sapply(fields, inherits, "field"))) {
@@ -505,6 +522,7 @@ buildSchema <- function(field, ...) {
   structure(fields, class = "struct")
 }
 
+# print method for "struct" object
 print.struct <- function(x, ...) {
   cat(sapply(x, function(field) { paste("|-", "name = \"", field$name,
                                         "\", type = \"", field$type,
@@ -513,6 +531,25 @@ print.struct <- function(x, ...) {
                                         "\", nullable = ", field$nullable, "\n",
                                         sep = "") })
       , sep = "")
 }
 
+#' Create a Field object
+#'
+#' Create a Field object that contains the metadata for a single field in a schema.
+#'
+#' @param name The name of the field
+#' @param type The data type of the field
+#' @param nullable A logical vector indicating whether or not the field is nullable
+#' @return a Field object
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
+#' field1 <- field("a", "integer", TRUE)
+#' field2 <- field("b", "string", TRUE)
+#' schema <- buildSchema(field1, field2)
+#' df <- createDataFrame(sqlCtx, rdd, schema)
+#' }
 field <- function(name, type, nullable = TRUE) {
   if (class(name) != "character") {
     stop("Field name must be a string.")
@@ -526,6 +563,7 @@ field <- function(name, type, nullable = TRUE) {
   structure(list("name" = name, "type" = type, "nullable" = nullable), class = "field")
 }
 
+# print method for Field objects
 print.field <- function(x, ...) {
   cat("name = \"", x$name, "\", type = \"", x$type, "\", nullable = ", x$nullable, sep = "")
 }

From b50000d4d6cbe95fd81a1835c39956aa56b8e3dd Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Mon, 30 Mar 2015 13:34:55 -0500
Subject: [PATCH 03/14] new line at EOF

---
 pkg/R/SQLContext.R | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pkg/R/SQLContext.R b/pkg/R/SQLContext.R
index 2510798c7..86ab698d8 100644
--- a/pkg/R/SQLContext.R
+++ b/pkg/R/SQLContext.R
@@ -567,3 +567,4 @@ field <- function(name, type, nullable = TRUE) {
 print.field <- function(x, ...) {
   cat("name = \"", x$name, "\", type = \"", x$type, "\", nullable = ", x$nullable, sep = "")
 }
+

From 483506a90129d992694588d6df8e38d4839dce88 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Wed, 8 Apr 2015 14:09:53 -0500
Subject: [PATCH 04/14] refactor schema functions

Refactored `structType` and `structField` so that they can be used to create
schemas from R for use with `createDataFrame`.

Moved everything to `schema.R`.

Added new methods to `SQLUtils.scala` for handling `StructType` and
`StructField` on the JVM side.
---
 pkg/R/SQLContext.R                            |  97 ----------
 pkg/R/schema.R                                | 169 ++++++++++++++++++
 .../berkeley/cs/amplab/sparkr/SQLUtils.scala  |  31 +++-
 3 files changed, 197 insertions(+), 100 deletions(-)
 create mode 100644 pkg/R/schema.R

diff --git a/pkg/R/SQLContext.R b/pkg/R/SQLContext.R
index 86ab698d8..b97a54c1c 100644
--- a/pkg/R/SQLContext.R
+++ b/pkg/R/SQLContext.R
@@ -48,30 +48,6 @@ infer_type <- function(x) {
   }
 }
 
-#' dump the schema into JSON string
-tojson <- function(x) {
-  if (inherits(x, "struct")) {
-    # schema object
-    l <- paste(lapply(x, tojson), collapse = ", ")
-    paste('{\"type\":\"struct\", \"fields\":','[', l, ']}', sep = '')
-  } else if (inherits(x, "field")) {
-    # field object
-    names <- names(x)
-    items <- lapply(names, function(n) {
-      safe_n <- gsub('"', '\\"', n)
-      paste(tojson(safe_n), ':', tojson(x[[n]]), sep = '')
-    })
-    d <- paste(items, collapse = ", ")
-    paste('{', d, '}', sep = '')
-  } else if (is.character(x)) {
-    paste('"', x, '"', sep = '')
-  } else if (is.logical(x)) {
-    if (x) "true" else "false"
-  } else {
-    stop(paste("unexpected type:", class(x)))
-  }
-}
-
 #' Create a DataFrame from an RDD
 #'
 #' Converts an RDD to a DataFrame by infer the types.
@@ -495,76 +471,3 @@ createExternalTable <- function(sqlCtx, tableName, path = NULL, source = NULL, .
   sdf <- callJMethod(sqlCtx, "createExternalTable", tableName, source, options)
   dataFrame(sdf)
 }
-
-#' Create a Schema object
-#'
-#' Create an object of type "struct" that contains the metadata for a DataFrame. Intended for
-#' use with createDataFrame and toDF.
-#'
-#' @param field a Field object (created with the field() function)
-#' @param ... additional Field objects
-#' @return a Schema object
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
-#' schema <- buildSchema(field("a", "integer"), field("b", "string"))
-#' df <- createDataFrame(sqlCtx, rdd, schema)
-#' }
-buildSchema <- function(field, ...) {
-  fields <- list(field, ...)
-  if (!all(sapply(fields, inherits, "field"))) {
-    stop("All arguments must be Field objects.")
-  }
-
-  structure(fields, class = "struct")
-}
-
-# print method for "struct" object
-print.struct <- function(x, ...) {
-  cat(sapply(x, function(field) { paste("|-", "name = \"", field$name,
-                                        "\", type = \"", field$type,
-                                        "\", nullable = ", field$nullable, "\n",
-                                        sep = "") })
-      , sep = "")
-}
-
-#' Create a Field object
-#'
-#' Create a Field object that contains the metadata for a single field in a schema.
-#'
-#' @param name The name of the field
-#' @param type The data type of the field
-#' @param nullable A logical vector indicating whether or not the field is nullable
-#' @return a Field object
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
-#' field1 <- field("a", "integer", TRUE)
-#' field2 <- field("b", "string", TRUE)
-#' schema <- buildSchema(field1, field2)
-#' df <- createDataFrame(sqlCtx, rdd, schema)
-#' }
-field <- function(name, type, nullable = TRUE) {
-  if (class(name) != "character") {
-    stop("Field name must be a string.")
-  }
-  if (class(type) != "character") {
-    stop("Field type must be a string.")
-  }
-  if (class(nullable) != "logical") {
-    stop("nullable must be either TRUE or FALSE")
-  }
-  structure(list("name" = name, "type" = type, "nullable" = nullable), class = "field")
-}
-
-# print method for Field objects
-print.field <- function(x, ...) {
-  cat("name = \"", x$name, "\", type = \"", x$type, "\", nullable = ", x$nullable, sep = "")
-}
-
diff --git a/pkg/R/schema.R b/pkg/R/schema.R
new file mode 100644
index 000000000..2f6221d9a
--- /dev/null
+++ b/pkg/R/schema.R
@@ -0,0 +1,169 @@
+#' structType
+#'
+#' Create a structType object that contains the metadata for a DataFrame. Intended for
+#' use with createDataFrame and toDF.
+#'
+#' @param x a Field object (created with the field() function)
+#' @param ... additional Field objects
+#' @return a structType object
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
+#' schema <- buildSchema(field("a", "integer"), field("b", "string"))
+#' df <- createDataFrame(sqlCtx, rdd, schema)
+#' }
+structType <- function(x, ...) {
+  UseMethod("structType", x)
+}
+
+structType.jobj <- function(x) {
+  obj <- structure(list(), class = "structType")
+  obj$jobj <- x
+  obj$fields <- function() { lapply(callJMethod(x, "fields"), structField) }
+  obj
+}
+
+structType.structField <- function(x, ...) {
+  fields <- list(x, ...)
+  if (!all(sapply(fields, inherits, "structField"))) {
+    stop("All arguments must be structField objects.")
+  }
+  sfObjList <- lapply(fields, function(field) {
+    field$jobj
+  })
+  stObj <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils",
+                       "createStructType",
+                       listToSeq(sfObjList))
+  structType(stObj)
+}
+
+#' Print a Spark StructType.
+#'
+#' This function prints the contents of a StructType returned from the
+#' SparkR JVM backend.
+#'
+#' @param x A StructType object
+#' @param ... further arguments passed to or from other methods
+print.structType <- function(x, ...) {
+  cat("StructType\n",
+      sapply(x$fields(), function(field) { paste("|-", "name = \"", field$name(),
+                                                 "\", type = \"", field$dataType.toString(),
+                                                 "\", nullable = ", field$nullable(), "\n",
+                                                 sep = "") })
+      , sep = "")
+}
+
+#' structField
+#'
+#' Create a structField object that contains the metadata for a single field in a schema.
+#'
+#' @param x The name of the field
+#' @param type The data type of the field
+#' @param nullable A logical vector indicating whether or not the field is nullable
+#' @return a Field object
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
+#' field1 <- field("a", "integer", TRUE)
+#' field2 <- field("b", "string", TRUE)
+#' schema <- buildSchema(field1, field2)
+#' df <- createDataFrame(sqlCtx, rdd, schema)
+#' }
+
+structField <- function(x, ...) {
+  UseMethod("structField", x)
+}
+
+structField.jobj <- function(x) {
+  obj <- structure(list(), class = "structField")
+  obj$jobj <- x
+  obj$name <- function() { callJMethod(x, "name") }
+  obj$dataType <- function() { callJMethod(x, "dataType") }
+  obj$dataType.toString <- function() { callJMethod(obj$dataType(), "toString") }
+  obj$dataType.simpleString <- function() { callJMethod(obj$dataType(), "simpleString") }
+  obj$nullable <- function() { callJMethod(x, "nullable") }
+  obj
+}
+
+structField.character <- function(x, type, nullable = TRUE) {
+  if (class(x) != "character") {
+    stop("Field name must be a string.")
+  }
+  if (class(type) != "character") {
+    stop("Field type must be a string.")
+  }
+  if (class(nullable) != "logical") {
+    stop("nullable must be either TRUE or FALSE")
+  }
+  options <- c("byte",
+               "integer",
+               "double",
+               "numeric",
+               "character",
+               "string",
+               "binary",
+               "raw",
+               "logical",
+               "boolean",
+               "timestamp",
+               "date")
+  dataType <- if (type %in% options) {
+    type
+  } else {
+    stop(paste("Unsupported type for Dataframe:", type))
+  }
+  sfObj <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils",
+                       "createStructField",
+                       x,
+                       dataType,
+                       nullable)
+  structField(sfObj)
+}
+
+#' Print a Spark StructField.
+#'
+#' This function prints the contents of a StructField returned from the
+#' SparkR JVM backend.
+#'
+#' @param x A StructField object
+#' @param ... further arguments passed to or from other methods
+print.structField <- function(x, ...) {
+  cat("StructField(name = \"", x$name(),
+      "\", type = \"", x$dataType.toString(),
+      "\", nullable = ", x$nullable(),
+      ")",
+      sep = "")
+}
+
+# cfreeman: Don't think we need this function since we can create
+# structType in R and pass to createDataFrame
+#
+# #' dump the schema into JSON string
+# tojson <- function(x) {
+#   if (inherits(x, "struct")) {
+#     # schema object
+#     l <- paste(lapply(x, tojson), collapse = ", ")
+#     paste('{\"type\":\"struct\", \"fields\":','[', l, ']}', sep = '')
+#   } else if (inherits(x, "field")) {
+#     # field object
+#     names <- names(x)
+#     items <- lapply(names, function(n) {
+#       safe_n <- gsub('"', '\\"', n)
+#       paste(tojson(safe_n), ':', tojson(x[[n]]), sep = '')
+#     })
+#     d <- paste(items, collapse = ", ")
+#     paste('{', d, '}', sep = '')
+#   } else if (is.character(x)) {
+#     paste('"', x, '"', sep = '')
+#   } else if (is.logical(x)) {
#     if (x) "true" else "false"
+#   } else {
+#     stop(paste("unexpected type:", class(x)))
+#   }
+# }
diff --git a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
index 8df90c160..cbd65ba76 100644
--- a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
+++ b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
@@ -5,7 +5,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, DataFrame, GroupedData, Row, SQLContext, SaveMode}
 
 import edu.berkeley.cs.amplab.sparkr.SerDe._
@@ -23,8 +23,33 @@ object SQLUtils {
     arr.toSeq
   }
 
-  def createDF(rdd: RDD[Array[Byte]], schemaString: String, sqlContext: SQLContext): DataFrame = {
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+  def createStructType(fields : Seq[StructField]) : StructType = {
+    StructType(fields)
+  }
+
+  def DataTypeObject(dataType: String): DataType = {
+    dataType match {
+      case "byte" => org.apache.spark.sql.types.ByteType
+      case "integer" => org.apache.spark.sql.types.IntegerType
+      case "double" => org.apache.spark.sql.types.DoubleType
+      case "numeric" => org.apache.spark.sql.types.DoubleType
+      case "character" => org.apache.spark.sql.types.StringType
+      case "string" => org.apache.spark.sql.types.StringType
+      case "binary" => org.apache.spark.sql.types.BinaryType
+      case "raw" => org.apache.spark.sql.types.BinaryType
+      case "logical" => org.apache.spark.sql.types.BooleanType
+      case "boolean" => org.apache.spark.sql.types.BooleanType
+      case "timestamp" => org.apache.spark.sql.types.TimestampType
+      case "date" => org.apache.spark.sql.types.DateType
+      case _ => throw new IllegalArgumentException(s"Invalid type $dataType")
+    }
+  }
+
+  def createStructField(name: String, dataType: String, nullable: Boolean): StructField = {
+    val dtObj = DataTypeObject(dataType)
+    StructField(name, dtObj, nullable)
+  }
+
     val num = schema.fields.size
     val rowRDD = rdd.map(bytesToRow)
     sqlContext.createDataFrame(rowRDD, schema)

From eb728b104ac6ce84af420f50abff2d80a9aecbf4 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Wed, 8 Apr 2015 14:11:59 -0500
Subject: [PATCH 05/14] Update `createDataFrame` and `toDF`

Refactored to use the new `structType` and `structField` functions.
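A schema can now be built up entirely in R and handed straight to the JVM
backend. A minimal sketch of the intended usage (taken from the roxygen
examples in `schema.R`; assumes a SparkContext `sc` and SQLContext `sqlCtx`
have already been initialized):

    rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
    schema <- structType(structField("a", "integer"), structField("b", "string"))
    df <- createDataFrame(sqlCtx, rdd, schema)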
---
 pkg/R/SQLContext.R | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/pkg/R/SQLContext.R b/pkg/R/SQLContext.R
index b97a54c1c..d1b3cefd4 100644
--- a/pkg/R/SQLContext.R
+++ b/pkg/R/SQLContext.R
@@ -37,9 +37,9 @@ infer_type <- function(x) {
       # StructType
       types <- lapply(x, infer_type)
       fields <- lapply(1:length(x), function(i) {
-        field(names[[i]], types[[i]], TRUE)
+        structField(names[[i]], types[[i]], TRUE)
       })
-      do.call(buildSchema, fields)
+      do.call(structType, fields)
     }
   } else if (length(x) > 1) {
     list(type = "array", elementType = type, containsNull = TRUE)
@@ -93,7 +93,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
     stop(paste("unexpected type:", class(data)))
   }
 
-  if (is.null(schema) || (!inherits(schema, "struct") && is.null(names(schema)))) {
+  if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
     row <- first(rdd)
     names <- if (is.null(schema)) {
       names(row)
@@ -118,18 +118,18 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
 
     types <- lapply(row, infer_type)
     fields <- lapply(1:length(row), function(i) {
-      field(names[[i]], types[[i]], TRUE)
+      structField(names[[i]], types[[i]], TRUE)
     })
-    schema <- do.call(buildSchema, fields)
+    schema <- do.call(structType, fields)
   }
 
-  stopifnot(class(schema) == "struct")
+  stopifnot(class(schema) == "structType")
 
-  schemaString <- tojson(schema)
+  # schemaString <- tojson(schema)
   jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
   srdd <- callJMethod(jrdd, "rdd")
   sdf <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils",
                      "createDF",
-                     srdd, schemaString, sqlCtx)
+                     srdd, schema$jobj, sqlCtx)
   dataFrame(sdf)
 }

From 79d4876c562e706083ba14b61fcec59cd1651079 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Wed, 8 Apr 2015 14:12:50 -0500
Subject: [PATCH 06/14] new version of `CreateDF`

New version takes a `StructType` from R and creates a DataFrame.

Commented out the `tojson` version since we don't currently use it.
---
 .../scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
index cbd65ba76..ca61e249f 100644
--- a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
+++ b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
@@ -50,6 +50,15 @@ object SQLUtils {
     StructField(name, dtObj, nullable)
   }
 
+  // cfreeman: I don't think we need this anymore since we can pass structType from R
+  // def createDF(rdd: RDD[Array[Byte]], schemaString: String, sqlContext: SQLContext): DataFrame = {
+  //   val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+  //   val num = schema.fields.size
+  //   val rowRDD = rdd.map(bytesToRow)
+  //   sqlContext.createDataFrame(rowRDD, schema)
+  // }
+
+  def createDF(rdd: RDD[Array[Byte]], schema: StructType, sqlContext: SQLContext): DataFrame = {
     val num = schema.fields.size
     val rowRDD = rdd.map(bytesToRow)
     sqlContext.createDataFrame(rowRDD, schema)

From 6b404df9225c4c4c34093f9e4b123acf2120cb44 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Wed, 8 Apr 2015 14:14:36 -0500
Subject: [PATCH 07/14] Update `NAMESPACE` and tests

Updated `NAMESPACE`, `DESCRIPTION`, and unit tests for new schema functions.

Deleted `SQLTypes.R` since everything has been moved to `schema.R`.
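The updated tests exercise the accessor closures on the new jobj-backed S3
objects instead of plain list fields. A short sketch of the new style
(mirrors `test_sparkSQL.R`; assumes a running SparkR backend):

    testField <- structField("a", "string")
    testField$name()      # "a"
    testField$nullable()  # TRUE

    testSchema <- structType(testField, structField("b", "integer"))
    testSchema$fields()[[1]]$dataType.toString()  # "StringType"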
---
 pkg/DESCRIPTION                |  2 +-
 pkg/NAMESPACE                  | 13 ++++++----
 pkg/R/SQLTypes.R               | 47 ----------------------------------
 pkg/inst/tests/test_sparkSQL.R | 30 +++++++++++-----------
 4 files changed, 24 insertions(+), 68 deletions(-)
 delete mode 100644 pkg/R/SQLTypes.R

diff --git a/pkg/DESCRIPTION b/pkg/DESCRIPTION
index 674e279c0..1e1549818 100644
--- a/pkg/DESCRIPTION
+++ b/pkg/DESCRIPTION
@@ -17,11 +17,11 @@ License: Apache License (== 2.0)
 Collate:
   'generics.R'
   'jobj.R'
-  'SQLTypes.R'
   'RDD.R'
   'pairRDD.R'
   'column.R'
   'group.R'
+  'schema.R'
   'DataFrame.R'
   'SQLContext.R'
   'broadcast.R'
diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE
index 3b052a015..3b923cce8 100644
--- a/pkg/NAMESPACE
+++ b/pkg/NAMESPACE
@@ -167,8 +167,6 @@ export("cacheTable",
        "jsonRDD",
        "loadDF",
        "parquetFile",
-       "buildSchema",
-       "field",
        "sql",
        "table",
        "tableNames",
@@ -178,7 +176,12 @@ export("cacheTable",
 
 export("sparkRSQL.init",
        "sparkRHive.init")
-export("print.structType",
+
+export("structField",
+       "structField.jobj",
+       "structField.character",
        "print.structField",
-       "print.struct",
-       "print.field")
+       "structType",
+       "structType.jobj",
+       "structType.structField",
+       "print.structField")
diff --git a/pkg/R/SQLTypes.R b/pkg/R/SQLTypes.R
deleted file mode 100644
index a51da78a7..000000000
--- a/pkg/R/SQLTypes.R
+++ /dev/null
@@ -1,47 +0,0 @@
-# Utility functions for handling SparkSQL DataTypes.
-
-# Handler for StructType
-structType <- function(st) {
-  obj <- structure(new.env(parent = emptyenv()), class = "structType")
-  obj$jobj <- st
-  obj$fields <- function() { lapply(callJMethod(st, "fields"), structField) }
-  obj
-}
-
-#' Print a Spark StructType.
-#'
-#' This function prints the contents of a StructType returned from the
-#' SparkR JVM backend.
-#'
-#' @param x A StructType object
-#' @param ... further arguments passed to or from other methods
-print.structType <- function(x, ...) {
-  fieldsList <- lapply(x$fields(), function(i) { i$print() })
-  print(fieldsList)
-}
-
-# Handler for StructField
-structField <- function(sf) {
-  obj <- structure(new.env(parent = emptyenv()), class = "structField")
-  obj$jobj <- sf
-  obj$name <- function() { callJMethod(sf, "name") }
-  obj$dataType <- function() { callJMethod(sf, "dataType") }
-  obj$dataType.toString <- function() { callJMethod(obj$dataType(), "toString") }
-  obj$dataType.simpleString <- function() { callJMethod(obj$dataType(), "simpleString") }
-  obj$nullable <- function() { callJMethod(sf, "nullable") }
-  obj$print <- function() { paste("StructField(",
-                                  paste(obj$name(), obj$dataType.toString(), obj$nullable(), sep = ", "),
-                                  ")", sep = "") }
-  obj
-}
-
-#' Print a Spark StructField.
-#'
-#' This function prints the contents of a StructField returned from the
-#' SparkR JVM backend.
-#'
-#' @param x A StructField object
-#' @param ... further arguments passed to or from other methods
-print.structField <- function(x, ...) {
-  cat(x$print())
-}
diff --git a/pkg/inst/tests/test_sparkSQL.R b/pkg/inst/tests/test_sparkSQL.R
index d34c0f551..d604a6dc1 100644
--- a/pkg/inst/tests/test_sparkSQL.R
+++ b/pkg/inst/tests/test_sparkSQL.R
@@ -27,8 +27,8 @@ test_that("infer types", {
   expect_equal(infer_type(list(1L, 2L)),
                list(type = 'array', elementType = "integer", containsNull = TRUE))
   expect_equal(infer_type(list(a = 1L, b = "2")),
-               buildSchema(field(name = "a", type = "integer", nullable = TRUE),
-                           field(name = "b", type = "string", nullable = TRUE)))
+               structType(structField(x = "a", type = "integer", nullable = TRUE),
+                          structField(x = "b", type = "string", nullable = TRUE)))
   e <- new.env()
   assign("a", 1L, envir = e)
   expect_equal(infer_type(e),
@@ -36,18 +36,18 @@ test_that("infer types", {
                valueContainsNull = TRUE))
 })
 
-test_that("buildSchema and field", {
-  testField <- field("a", "string")
-  expect_true(inherits(testField, "field"))
-  expect_true(testField$name == "a")
-  expect_true(testField$nullable)
+test_that("structType and structField", {
+  testField <- structField("a", "string")
+  expect_true(inherits(testField, "structField"))
+  expect_true(testField$name() == "a")
+  expect_true(testField$nullable())
 
-  testSchema <- buildSchema(testField, field("b", "integer"))
-  expect_true(inherits(testSchema, "struct"))
-  expect_true(inherits(testSchema[[2]], "field"))
-  expect_true(testSchema[[1]]$type == "string")
+  testSchema <- structType(testField, structField("b", "integer"))
+  expect_true(inherits(testSchema, "structType"))
+  expect_true(inherits(testSchema$fields()[[2]], "structField"))
+  expect_true(testSchema$fields()[[1]]$dataType.toString() == "StringType")
 })
 
 test_that("create DataFrame from RDD", {
   rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
   df <- createDataFrame(sqlCtx, rdd, list("a", "b"))
@@ -60,8 +60,8 @@ test_that("create DataFrame from RDD", {
   expect_true(inherits(df, "DataFrame"))
   expect_equal(columns(df), c("_1", "_2"))
 
-  schema <- buildSchema(field(name = "a", type = "integer", nullable = TRUE),
-                        field(name = "b", type = "string", nullable = TRUE))
+  schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
+                       structField(x = "b", type = "string", nullable = TRUE))
   df <- createDataFrame(sqlCtx, rdd, schema)
   expect_true(inherits(df, "DataFrame"))
   expect_equal(columns(df), c("a", "b"))
@@ -90,8 +90,8 @@ test_that("toDF", {
   expect_true(inherits(df, "DataFrame"))
   expect_equal(columns(df), c("_1", "_2"))
 
-  schema <- buildSchema(field(name = "a", type = "integer", nullable = TRUE),
-                        field(name = "b", type = "string", nullable = TRUE))
+  schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
+                       structField(x = "b", type = "string", nullable = TRUE))
   df <- toDF(rdd, schema)
   expect_true(inherits(df, "DataFrame"))
   expect_equal(columns(df), c("a", "b"))

From 0ab9862a121d933d25da55ce61d2a2fbc5101d01 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Wed, 8 Apr 2015 14:21:17 -0500
Subject: [PATCH 08/14] Fixed duplicate export

---
 pkg/NAMESPACE | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE
index 3b923cce8..004a07cad 100644
--- a/pkg/NAMESPACE
+++ b/pkg/NAMESPACE
@@ -184,4 +184,4 @@ export("structField",
        "structType",
        "structType.jobj",
        "structType.structField",
-       "print.structField")
+       "print.structType")

From af21482b014be03a7b8550b4c8771a3ce9294c75 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Wed, 8 Apr 2015 15:18:57 -0500
Subject: [PATCH 09/14] Update `subtract` to work with `generics.R`

---
 pkg/NAMESPACE     | 1 -
 pkg/R/DataFrame.R | 7 +++----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE
index a8ff52728..4b5f021ae 100644
--- a/pkg/NAMESPACE
+++ b/pkg/NAMESPACE
@@ -122,7 +122,6 @@ exportMethods("columns",
               "show",
               "showDF",
               "sortDF",
-              "subtract",
               "toJSON",
               "toRDD",
               "unionAll",
diff --git a/pkg/R/DataFrame.R b/pkg/R/DataFrame.R
index 8c9256bdb..cb27decf2 100644
--- a/pkg/R/DataFrame.R
+++ b/pkg/R/DataFrame.R
@@ -1212,14 +1212,13 @@ setMethod("intersect",
 #' df2 <- jsonFile(sqlCtx, path2)
 #' subtractDF <- subtract(df, df2)
 #' }
-setGeneric("subtract", function(x, y) { standardGeneric("subtract") })
 
 #' @rdname subtract
 #' @export
 setMethod("subtract",
-          signature(x = "DataFrame", y = "DataFrame"),
-          function(x, y) {
-            subtracted <- callJMethod(x@sdf, "except", y@sdf)
+          signature(x = "DataFrame", other = "DataFrame"),
+          function(x, other) {
+            subtracted <- callJMethod(x@sdf, "except", other@sdf)
             dataFrame(subtracted)
           })
 

From 0c241bfa77668ecfa2a9fba52c6a43be2ea2c4d0 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Fri, 10 Apr 2015 08:45:10 -0500
Subject: [PATCH 10/14] Rename the SQL DataType function

---
 .../main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
index ca61e249f..80bc6463f 100644
--- a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
+++ b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
@@ -27,7 +27,7 @@ object SQLUtils {
     StructType(fields)
   }
 
-  def DataTypeObject(dataType: String): DataType = {
+  def getSQLDataType(dataType: String): DataType = {
     dataType match {
       case "byte" => org.apache.spark.sql.types.ByteType
       case "integer" => org.apache.spark.sql.types.IntegerType
@@ -46,7 +46,7 @@ object SQLUtils {
   }
 
   def createStructField(name: String, dataType: String, nullable: Boolean): StructField = {
-    val dtObj = DataTypeObject(dataType)
+    val dtObj = getSQLDataType(dataType)
     StructField(name, dtObj, nullable)
   }
 

From 243df0ded417d42a698e3872f27f993c22ceb3d3 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Fri, 10 Apr 2015 08:45:20 -0500
Subject: [PATCH 11/14] Fix spacing

---
 .../src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
index 80bc6463f..2ec1f4eb4 100644
--- a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
+++ b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
@@ -23,7 +23,7 @@ object SQLUtils {
     arr.toSeq
   }
 
-  def createStructType(fields : Seq[StructField]) : StructType = {
+  def createStructType(fields : Seq[StructField]): StructType = {
    StructType(fields)
   }
 

From 921d64f6d4728c16e1d91adba8d3e54db9869231 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Fri, 10 Apr 2015 08:45:41 -0500
Subject: [PATCH 12/14] Remove `tojson` functions

---
 pkg/R/schema.R                                | 27 ------------------
 .../berkeley/cs/amplab/sparkr/SQLUtils.scala  |  8 ------
 2 files changed, 35 deletions(-)

diff --git a/pkg/R/schema.R b/pkg/R/schema.R
index 2f6221d9a..7aad74a06 100644
--- a/pkg/R/schema.R
+++ b/pkg/R/schema.R
@@ -140,30 +140,3 @@ print.structField <- function(x, ...) {
       ")",
       sep = "")
 }
-
-# cfreeman: Don't think we need this function since we can create
-# structType in R and pass to createDataFrame
-#
-# #' dump the schema into JSON string
-# tojson <- function(x) {
-#   if (inherits(x, "struct")) {
-#     # schema object
-#     l <- paste(lapply(x, tojson), collapse = ", ")
-#     paste('{\"type\":\"struct\", \"fields\":','[', l, ']}', sep = '')
-#   } else if (inherits(x, "field")) {
-#     # field object
-#     names <- names(x)
-#     items <- lapply(names, function(n) {
-#       safe_n <- gsub('"', '\\"', n)
-#       paste(tojson(safe_n), ':', tojson(x[[n]]), sep = '')
-#     })
-#     d <- paste(items, collapse = ", ")
-#     paste('{', d, '}', sep = '')
-#   } else if (is.character(x)) {
-#     paste('"', x, '"', sep = '')
-#   } else if (is.logical(x)) {
-#     if (x) "true" else "false"
-#   } else {
-#     stop(paste("unexpected type:", class(x)))
-#   }
-# }
diff --git a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
index 2ec1f4eb4..e07af21d9 100644
--- a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
+++ b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SQLUtils.scala
@@ -50,14 +50,6 @@ object SQLUtils {
     StructField(name, dtObj, nullable)
   }
 
-  // cfreeman: I don't think we need this anymore since we can pass structType from R
-  // def createDF(rdd: RDD[Array[Byte]], schemaString: String, sqlContext: SQLContext): DataFrame = {
-  //   val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
-  //   val num = schema.fields.size
-  //   val rowRDD = rdd.map(bytesToRow)
-  //   sqlContext.createDataFrame(rowRDD, schema)
-  // }
-
   def createDF(rdd: RDD[Array[Byte]], schema: StructType, sqlContext: SQLContext): DataFrame = {
     val num = schema.fields.size
     val rowRDD = rdd.map(bytesToRow)

From afb38cda54049421970cbd2732e86e0a292da3ca Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Fri, 10 Apr 2015 08:46:19 -0500
Subject: [PATCH 13/14] Update docs and examples

---
 pkg/R/schema.R | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/pkg/R/schema.R b/pkg/R/schema.R
index 7aad74a06..02adf0e8a 100644
--- a/pkg/R/schema.R
+++ b/pkg/R/schema.R
@@ -1,10 +1,13 @@
+# A set of S3 classes and methods that support the SparkSQL `StructType` and `StructField
+# datatypes. These are used to create and interact with DataFrame schemas.
+
 #' structType
 #'
 #' Create a structType object that contains the metadata for a DataFrame. Intended for
 #' use with createDataFrame and toDF.
 #'
-#' @param x a Field object (created with the field() function)
-#' @param ... additional Field objects
+#' @param x a structField object (created with the field() function)
+#' @param ... additional structField objects
 #' @return a structType object
 #' @export
 #' @examples
@@ -12,7 +15,7 @@
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
 #' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
-#' schema <- buildSchema(field("a", "integer"), field("b", "string"))
+#' schema <- structType(structField("a", "integer"), structField("b", "string"))
 #' df <- createDataFrame(sqlCtx, rdd, schema)
 #' }
 structType <- function(x, ...) {
@@ -63,16 +66,16 @@ print.structType <- function(x, ...) {
 #'
 #' @param x The name of the field
 #' @param type The data type of the field
 #' @param nullable A logical vector indicating whether or not the field is nullable
-#' @return a Field object
+#' @return a structField object
 #' @export
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
 #' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
-#' field1 <- field("a", "integer", TRUE)
-#' field2 <- field("b", "string", TRUE)
-#' schema <- buildSchema(field1, field2)
+#' field1 <- structField("a", "integer", TRUE)
+#' field2 <- structField("b", "string", TRUE)
+#' schema <- structType(field1, field2)
 #' df <- createDataFrame(sqlCtx, rdd, schema)
 #' }
 

From 50f4c90ab46ada9a835111f19232b3618f8aff54 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Fri, 10 Apr 2015 08:46:36 -0500
Subject: [PATCH 14/14] Use object attribute instead of argument

---
 pkg/R/schema.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/R/schema.R b/pkg/R/schema.R
index 02adf0e8a..0a758f0cc 100644
--- a/pkg/R/schema.R
+++ b/pkg/R/schema.R
@@ -25,7 +25,7 @@ structType <- function(x, ...) {
 structType.jobj <- function(x) {
   obj <- structure(list(), class = "structType")
   obj$jobj <- x
-  obj$fields <- function() { lapply(callJMethod(x, "fields"), structField) }
+  obj$fields <- function() { lapply(callJMethod(obj$jobj, "fields"), structField) }
   obj
 }