
Commit 1ddb82e

Merge branch 'master' into sampling
2 parents: c4008cd + 990af63

289 files changed: +10607 additions, -3885 deletions


R/pkg/DESCRIPTION

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ Collate:
     'serialize.R'
     'sparkR.R'
     'stats.R'
+    'streaming.R'
     'types.R'
     'utils.R'
     'window.R'

R/pkg/NAMESPACE

Lines changed: 13 additions & 0 deletions
@@ -121,6 +121,7 @@ exportMethods("arrange",
               "insertInto",
               "intersect",
               "isLocal",
+              "isStreaming",
               "join",
               "limit",
               "merge",
@@ -169,6 +170,7 @@ exportMethods("arrange",
               "write.json",
               "write.orc",
               "write.parquet",
+              "write.stream",
               "write.text",
               "write.ml")
 
@@ -365,6 +367,7 @@ export("as.DataFrame",
        "read.json",
        "read.orc",
        "read.parquet",
+       "read.stream",
        "read.text",
        "spark.lapply",
        "spark.addFile",
@@ -402,6 +405,16 @@ export("partitionBy",
 export("windowPartitionBy",
        "windowOrderBy")
 
+exportClasses("StreamingQuery")
+
+export("awaitTermination",
+       "isActive",
+       "lastProgress",
+       "queryName",
+       "status",
+       "stopQuery")
+
+
 S3method(print, jobj)
 S3method(print, structField)
 S3method(print, structType)

R/pkg/R/DataFrame.R

Lines changed: 101 additions & 3 deletions
@@ -133,9 +133,6 @@ setMethod("schema",
 #'
 #' Print the logical and physical Catalyst plans to the console for debugging.
 #'
-#' @param x a SparkDataFrame.
-#' @param extended Logical. If extended is FALSE, explain() only prints the physical plan.
-#' @param ... further arguments to be passed to or from other methods.
 #' @family SparkDataFrame functions
 #' @aliases explain,SparkDataFrame-method
 #' @rdname explain
@@ -3515,3 +3512,104 @@
           function(x) {
             callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
           })
+
+#' isStreaming
+#'
+#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
+#' as it arrives.
+#'
+#' @param x A SparkDataFrame.
+#' @return TRUE if this SparkDataFrame is from a streaming source.
+#' @family SparkDataFrame functions
+#' @aliases isStreaming,SparkDataFrame-method
+#' @rdname isStreaming
+#' @name isStreaming
+#' @seealso \link{read.stream} \link{write.stream}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' }
+#' @note isStreaming since 2.2.0
+#' @note experimental
+setMethod("isStreaming",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(x@sdf, "isStreaming")
+          })
+
+#' Write the streaming SparkDataFrame to a data source.
+#'
+#' The data source is specified by the \code{source} and a set of options (...).
+#' If \code{source} is not specified, the default data source configured by
+#' spark.sql.sources.default will be used.
+#'
+#' Additionally, \code{outputMode} specifies how data of a streaming SparkDataFrame is written to
+#' an output data source. There are three modes:
+#' \itemize{
+#'   \item append: Only the new rows in the streaming SparkDataFrame will be written out. This
+#'                 output mode can only be used in queries that do not contain any aggregation.
+#'   \item complete: All the rows in the streaming SparkDataFrame will be written out every time
+#'                   there are some updates. This output mode can only be used in queries that
+#'                   contain aggregations.
+#'   \item update: Only the rows that were updated in the streaming SparkDataFrame will be written
+#'                 out every time there are some updates. If the query doesn't contain
+#'                 aggregations, it will be equivalent to \code{append} mode.
+#' }
+#'
+#' @param df a streaming SparkDataFrame.
+#' @param source a name for the external data source.
+#' @param outputMode one of 'append', 'complete', 'update'.
+#' @param ... additional argument(s) passed to the method.
+#'
+#' @family SparkDataFrame functions
+#' @seealso \link{read.stream}
+#' @aliases write.stream,SparkDataFrame-method
+#' @rdname write.stream
+#' @name write.stream
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' wordCounts <- count(group_by(df, "value"))
+#'
+#' # console
+#' q <- write.stream(wordCounts, "console", outputMode = "complete")
+#' # text stream
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' # memory stream
+#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
+#' head(sql("SELECT * from outs"))
+#' queryName(q)
+#'
+#' stopQuery(q)
+#' }
+#' @note write.stream since 2.2.0
+#' @note experimental
+setMethod("write.stream",
+          signature(df = "SparkDataFrame"),
+          function(df, source = NULL, outputMode = NULL, ...) {
+            if (!is.null(source) && !is.character(source)) {
+              stop("source should be character, NULL or omitted. It is the data source specified ",
+                   "in 'spark.sql.sources.default' configuration by default.")
+            }
+            if (!is.null(outputMode) && !is.character(outputMode)) {
+              stop("outputMode should be character or omitted.")
+            }
+            if (is.null(source)) {
+              source <- getDefaultSqlSource()
+            }
+            options <- varargsToStrEnv(...)
+            write <- handledCallJMethod(df@sdf, "writeStream")
+            write <- callJMethod(write, "format", source)
+            if (!is.null(outputMode)) {
+              write <- callJMethod(write, "outputMode", outputMode)
+            }
+            write <- callJMethod(write, "options", options)
+            ssq <- handledCallJMethod(write, "start")
+            streamingQuery(ssq)
+          })
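
Taken together, these additions give SparkR a complete streaming write path. Below is a minimal usage sketch, assuming a Spark 2.2 SparkR session and a socket source fed by something like nc -lk 9999; the host, port, and /tmp paths are placeholders, not part of the commit:

    library(SparkR)
    sparkR.session()

    # Unbounded input: every line arriving on the socket becomes a row.
    lines <- read.stream("socket", host = "localhost", port = 9999)
    isStreaming(lines)  # TRUE

    # Aggregating query: "complete" mode re-emits the full result table on
    # every trigger, so it pairs with the console or memory sinks.
    wordCounts <- count(group_by(lines, "value"))
    q <- write.stream(wordCounts, "console", outputMode = "complete")

    # Non-aggregating query: "append" mode writes only new rows; file sinks
    # additionally need a checkpointLocation to record progress.
    # q <- write.stream(lines, "text", path = "/tmp/out",
    #                   checkpointLocation = "/tmp/cp")

    stopQuery(q)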

R/pkg/R/SQLContext.R

Lines changed: 50 additions & 0 deletions
@@ -937,3 +937,53 @@ read.jdbc <- function(url, tableName,
   }
   dataFrame(sdf)
 }
+
+#' Load a streaming SparkDataFrame
+#'
+#' Returns the dataset in a data source as a SparkDataFrame.
+#'
+#' The data source is specified by the \code{source} and a set of options (...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param source The name of the external data source.
+#' @param schema The data schema defined in structType; this is required for file-based streaming
+#'               data sources.
+#' @param ... additional external data source specific named options, for instance \code{path} for
+#'        file-based streaming data sources.
+#' @return SparkDataFrame
+#' @rdname read.stream
+#' @name read.stream
+#' @seealso \link{write.stream}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#'
+#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+#' }
+#' @note read.stream since 2.2.0
+#' @note experimental
+read.stream <- function(source = NULL, schema = NULL, ...) {
+  sparkSession <- getSparkSession()
+  if (!is.null(source) && !is.character(source)) {
+    stop("source should be character, NULL or omitted. It is the data source specified ",
+         "in 'spark.sql.sources.default' configuration by default.")
+  }
+  if (is.null(source)) {
+    source <- getDefaultSqlSource()
+  }
+  options <- varargsToStrEnv(...)
+  read <- callJMethod(sparkSession, "readStream")
+  read <- callJMethod(read, "format", source)
+  if (!is.null(schema)) {
+    stopifnot(class(schema) == "structType")
+    read <- callJMethod(read, "schema", schema$jobj)
+  }
+  read <- callJMethod(read, "options", options)
+  sdf <- handledCallJMethod(read, "load")
+  dataFrame(callJMethod(sdf, "toDF"))
+}
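
Unlike the batch read functions, read.stream cannot sample a source up front to infer types, which is why the schema argument is required for file-based sources. A short sketch, assuming an active SparkR session and a hypothetical /tmp/in directory receiving JSON files:

    # structType/structField are SparkR's existing schema builders.
    jsonSchema <- structType(structField("name", "string"),
                             structField("age", "integer"))

    # Extra named arguments such as maxFilesPerTrigger pass through ... as
    # string source options via varargsToStrEnv.
    df <- read.stream("json", path = "/tmp/in", schema = jsonSchema,
                      maxFilesPerTrigger = 1)
    isStreaming(df)  # TRUE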

R/pkg/R/functions.R

Lines changed: 10 additions & 2 deletions
@@ -2437,6 +2437,7 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #'
 #' @param x Column containing the JSON string.
 #' @param schema a structType object to use as the schema when parsing the JSON string.
+#' @param asJsonArray indicating whether the input string is a JSON array of objects or a single object.
 #' @param ... additional named properties to control how the json is parsed, accepts the same
 #'            options as the JSON data source.
 #'
@@ -2452,11 +2453,18 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #'}
 #' @note from_json since 2.2.0
 setMethod("from_json", signature(x = "Column", schema = "structType"),
-          function(x, schema, ...) {
+          function(x, schema, asJsonArray = FALSE, ...) {
+            if (asJsonArray) {
+              jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
+                                     "createArrayType",
+                                     schema$jobj)
+            } else {
+              jschema <- schema$jobj
+            }
             options <- varargsToStrEnv(...)
             jc <- callJStatic("org.apache.spark.sql.functions",
                               "from_json",
-                              x@jc, schema$jobj, options)
+                              x@jc, jschema, options)
             column(jc)
           })
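
The asJsonArray flag simply wraps the supplied element schema in an array type on the JVM side (DataTypes.createArrayType), so a column holding a JSON array parses into an array of structs. A sketch, assuming an active SparkR session; the column and field names are illustrative:

    df <- createDataFrame(
      data.frame(raw = '[{"a": 1}, {"a": 2}]', stringsAsFactors = FALSE))
    elementSchema <- structType(structField("a", "integer"))

    # With asJsonArray = TRUE each object in the array becomes one struct
    # element; without it, the struct schema would not match the array input.
    parsed <- select(df, from_json(df$raw, elementSchema, asJsonArray = TRUE))
    head(parsed)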

R/pkg/R/generics.R

Lines changed: 40 additions & 1 deletion
@@ -539,6 +539,9 @@ setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
 
 #' @rdname explain
 #' @export
+#' @param x a SparkDataFrame or a StreamingQuery.
+#' @param extended Logical. If extended is FALSE, prints only the physical plan.
+#' @param ... further arguments to be passed to or from other methods.
 setGeneric("explain", function(x, ...) { standardGeneric("explain") })
 
 #' @rdname except
@@ -577,6 +580,10 @@ setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
 #' @export
 setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
 
+#' @rdname isStreaming
+#' @export
+setGeneric("isStreaming", function(x) { standardGeneric("isStreaming") })
+
 #' @rdname limit
 #' @export
 setGeneric("limit", function(x, num) { standardGeneric("limit") })
@@ -682,6 +689,12 @@ setGeneric("write.parquet", function(x, path, ...) {
 #' @export
 setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
 
+#' @rdname write.stream
+#' @export
+setGeneric("write.stream", function(df, source = NULL, outputMode = NULL, ...) {
+  standardGeneric("write.stream")
+})
+
 #' @rdname write.text
 #' @export
 setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })
@@ -1428,10 +1441,36 @@ setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark
 #' @export
 setGeneric("spark.perplexity", function(object, data) { standardGeneric("spark.perplexity") })
 
-
 #' @param object a fitted ML model object.
 #' @param path the directory where the model is saved.
 #' @param ... additional argument(s) passed to the method.
 #' @rdname write.ml
 #' @export
 setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") })
+
+
+###################### Streaming Methods ##########################
+
+#' @rdname awaitTermination
+#' @export
+setGeneric("awaitTermination", function(x, timeout) { standardGeneric("awaitTermination") })
+
+#' @rdname isActive
+#' @export
+setGeneric("isActive", function(x) { standardGeneric("isActive") })
+
+#' @rdname lastProgress
+#' @export
+setGeneric("lastProgress", function(x) { standardGeneric("lastProgress") })
+
+#' @rdname queryName
+#' @export
+setGeneric("queryName", function(x) { standardGeneric("queryName") })
+
+#' @rdname status
+#' @export
+setGeneric("status", function(x) { standardGeneric("status") })
+
+#' @rdname stopQuery
+#' @export
+setGeneric("stopQuery", function(x) { standardGeneric("stopQuery") })
