
Commit 6c5c594

Felix Cheung authored and committed
[SPARK-20015][SPARKR][SS][DOC][EXAMPLE] Document R Structured Streaming (experimental) in R vignettes and R & SS programming guide, R example
Add
- R vignettes
- R programming guide
- SS programming guide
- R example

Also disable spark.als in vignettes for now since it's failing (SPARK-20402).

Tested manually.

Author: Felix Cheung <[email protected]>

Closes #17814 from felixcheung/rdocss.

(cherry picked from commit b8302cc)
Signed-off-by: Felix Cheung <[email protected]>
1 parent 5fe9313 commit 6c5c594

File tree: 4 files changed, +380 -43 lines changed


R/pkg/vignettes/sparkr-vignettes.Rmd

Lines changed: 72 additions & 5 deletions
@@ -182,7 +182,7 @@ head(df)
 ```

 ### Data Sources
-SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. You can check the Spark SQL programming guide for more [specific options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.
+SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. You can check the Spark SQL Programming Guide for more [specific options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.

 The general method for creating a `SparkDataFrame` from data sources is `read.df`. This method takes in the path of the file to load and the type of data source, and the currently active SparkSession will be used automatically. SparkR supports reading CSV, JSON and Parquet files natively, and through Spark Packages you can find data source connectors for popular file formats like Avro. These packages can be added with the `sparkPackages` parameter when initializing a SparkSession with `sparkR.session`.
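
For illustration, a minimal sketch of `read.df` together with the `sparkPackages` parameter; the JSON path and the Avro package coordinates below are placeholders:

```{r, eval=FALSE}
# Start (or reuse) a session, optionally pulling in an extra data source package
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.2.0")

# Load a JSON file into a SparkDataFrame; the active SparkSession is used automatically
people <- read.df("path/to/people.json", "json")
head(people)
```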

@@ -232,7 +232,7 @@ write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite"
 ```

 ### Hive Tables
-You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with Hive support and more details can be found in the [SQL programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`).
+You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with Hive support and more details can be found in the [SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`).

 ```{r, eval=FALSE}
 sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
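
To tie the Hive paragraph to code, a minimal sketch of starting a session with Hive support and querying the table created above (assuming Spark was built with Hive):

```{r, eval=FALSE}
# Hive support is enabled by default in SparkR; shown explicitly here
sparkR.session(enableHiveSupport = TRUE)

# Queries are expressed in HiveQL against tables in the Hive MetaStore
results <- sql("FROM src SELECT key, value")
head(results)
```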
@@ -657,6 +657,7 @@ head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction
 Survival analysis studies the expected duration of time until an event happens, and often the relationship with risk factors or treatment taken on the subject. In contrast to standard regression analysis, survival modeling has to deal with special characteristics in the data, including non-negative survival time and censoring.

 The Accelerated Failure Time (AFT) model is a parametric survival model for censored data that assumes the effect of a covariate is to accelerate or decelerate the life course of an event by some constant. For more information, refer to the Wikipedia page [AFT Model](https://en.wikipedia.org/wiki/Accelerated_failure_time_model) and the references there. Different from a [Proportional Hazards Model](https://en.wikipedia.org/wiki/Proportional_hazards_model) designed for the same purpose, the AFT model is easier to parallelize because each instance contributes to the objective function independently.
+
 ```{r, warning=FALSE}
 library(survival)
 ovarianDF <- createDataFrame(ovarian)
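
To connect the AFT description to code, a minimal sketch of fitting the model with `spark.survreg` on the `ovarian` data prepared above (column names follow the `survival` package, with dots replaced by underscores by `createDataFrame`):

```{r, eval=FALSE}
# Fit an accelerated failure time (AFT) survival regression model
aftModel <- spark.survreg(ovarianDF, Surv(futime, fustat) ~ ecog_ps + rx)

# Inspect the fitted coefficients
summary(aftModel)

# Predicted values on the training data
head(predict(aftModel, ovarianDF))
```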
@@ -887,15 +888,15 @@ perplexity

 There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, `nonnegative`. For a complete list, refer to the help file.

-```{r}
+```{r, eval=FALSE}
 ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
                 list(2, 1, 1.0), list(2, 2, 5.0))
 df <- createDataFrame(ratings, c("user", "item", "rating"))
 model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegative = TRUE)
 ```

 Extract latent factors.
-```{r}
+```{r, eval=FALSE}
 stats <- summary(model)
 userFactors <- stats$userFactors
 itemFactors <- stats$itemFactors
@@ -905,7 +906,7 @@ head(itemFactors)

 Make predictions.

-```{r}
+```{r, eval=FALSE}
 predicted <- predict(model, df)
 head(predicted)
 ```
@@ -987,6 +988,72 @@ unlink(modelPath)
 ```


+
+## Structured Streaming
+
+SparkR supports the Structured Streaming API (experimental).
+
+You can check the Structured Streaming Programming Guide for [an introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model) to its programming model and basic concepts.
+
+### Simple Source and Sink
+
+Spark has a few built-in input sources. As an example, to test with a socket source reading text into words and displaying the computed word counts:
+
+```{r, eval=FALSE}
+# Create DataFrame representing the stream of input lines from connection
+lines <- read.stream("socket", host = hostname, port = port)
+
+# Split the lines into words
+words <- selectExpr(lines, "explode(split(value, ' ')) as word")
+
+# Generate running word count
+wordCounts <- count(groupBy(words, "word"))
+
+# Start running the query that prints the running counts to the console
+query <- write.stream(wordCounts, "console", outputMode = "complete")
+```
+
+### Kafka Source
+
+It is simple to read data from Kafka. For more information, see [Input Sources](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources) supported by Structured Streaming.
+
+```{r, eval=FALSE}
+topic <- read.stream("kafka",
+                     kafka.bootstrap.servers = "host1:port1,host2:port2",
+                     subscribe = "topic1")
+keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")
+```
+
+### Operations and Sinks
+
+Most of the common operations on `SparkDataFrame` are supported for streaming, including selection, projection, and aggregation. Once you have defined the final result, you start the streaming computation by calling the `write.stream` method, setting a sink and `outputMode`.
+
+A streaming `SparkDataFrame` can be written for debugging to the console, to a temporary in-memory table, or for further processing in a fault-tolerant manner to a File Sink in different formats.
+
+```{r, eval=FALSE}
+noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")
+
+# Print new data to console
+write.stream(noAggDF, "console")
+
+# Write new data to Parquet files
+write.stream(noAggDF,
+             "parquet",
+             path = "path/to/destination/dir",
+             checkpointLocation = "path/to/checkpoint/dir")
+
+# Aggregate
+aggDF <- count(groupBy(noAggDF, "device"))
+
+# Print updated aggregations to console
+write.stream(aggDF, "console", outputMode = "complete")
+
+# Have all the aggregates in an in-memory table. The query name will be the table name
+write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
+
+head(sql("select * from aggregates"))
+```
+
+
 ## Advanced Topics

 ### SparkR Object Classes
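
The streaming snippets above start queries with `write.stream` and leave them running; as a hedged sketch, the returned handle can be managed with SparkR's StreamingQuery helpers (`awaitTermination`, `lastProgress`, `stopQuery`), reusing the `query` object from the socket example:

```{r, eval=FALSE}
# Wait up to 10 seconds (timeout in milliseconds) for the query to stop
awaitTermination(query, 10000)

# Inspect the most recent progress update of the streaming query
lastProgress(query)

# Stop the query once it is no longer needed
stopQuery(query)
```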

docs/sparkr.md

Lines changed: 4 additions & 0 deletions
@@ -559,6 +559,10 @@ The following example shows how to save/load a MLlib model by SparkR.
 </tr>
 </table>

+# Structured Streaming
+
+SparkR supports the Structured Streaming API (experimental). Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information see the R API in the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html).
+
 # R Function Name Conflicts

 When loading and attaching a new package in R, it is possible to have a name [conflict](https://stat.ethz.ch/R-manual/R-devel/library/base/html/library.html), where a
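
To make the pointer to the Structured Streaming guide concrete, a minimal end-to-end sketch of a streaming word count in SparkR (the hostname and port are placeholders):

```{r, eval=FALSE}
# Read a stream of text lines from a socket source
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split lines into words and keep a running count per word
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
wordCounts <- count(groupBy(words, "word"))

# Print the complete counts to the console and wait for the query to terminate
query <- write.stream(wordCounts, "console", outputMode = "complete")
awaitTermination(query)
```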
