Commit b186dcd

Merge remote-tracking branch 'upstream/master' into refactorDDLSuite

2 parents: b905191 + fbc4058

1,253 files changed: 40,725 additions, 17,215 deletions


.gitignore

Lines changed: 1 addition & 0 deletions

@@ -42,6 +42,7 @@ dependency-reduced-pom.xml
 derby.log
 dev/create-release/*final
 dev/create-release/*txt
+dev/pr-deps/
 dist/
 docs/_site
 docs/api

.travis.yml

Lines changed: 1 addition & 2 deletions

@@ -28,7 +28,6 @@ dist: trusty
 # 2. Choose language and target JDKs for parallel builds.
 language: java
 jdk:
-  - oraclejdk7
   - oraclejdk8

 # 3. Setup cache directory for SBT and Maven.
@@ -44,7 +43,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

 # 6. Run lint-java.
 script:

R/WINDOWS.md

Lines changed: 1 addition & 1 deletion

@@ -38,6 +38,6 @@ To run the SparkR unit tests on Windows, the following steps are required —ass

 ```
 R -e "install.packages('testthat', repos='http://cran.us.r-project.org')"
-.\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
+.\bin\spark-submit2.cmd --conf spark.hadoop.fs.defaultFS="file:///" R\pkg\tests\run-all.R
 ```

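The replaced key matters beyond this script: fs.default.name is the long-deprecated Hadoop name for the default filesystem, and fs.defaultFS is its current equivalent. As a hedged sketch (not part of this commit), the same setting can also be passed when starting a SparkR session from R instead of on the spark-submit command line; the appName and config list below are illustrative assumptions, not something the diff prescribes:

```
# Sketch only: set the default filesystem from inside R rather than via
# spark-submit flags. Assumes SparkR is installed and on the library path;
# the property name mirrors the one used in the diff above.
library(SparkR)
sparkR.session(
  appName = "run-sparkr-tests",
  sparkConfig = list(spark.hadoop.fs.defaultFS = "file:///")
)
```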

R/pkg/NAMESPACE

Lines changed: 23 additions & 1 deletion

@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 # Imports from base R
 # Do not include stats:: "rpois", "runif" - causes error at runtime
 importFrom("methods", "setGeneric", "setMethod", "setOldClass")
@@ -47,7 +64,9 @@ exportMethods("glm",
               "spark.kstest",
               "spark.logit",
               "spark.randomForest",
-              "spark.gbt")
+              "spark.gbt",
+              "spark.bisectingKmeans",
+              "spark.svmLinear")

 # Job group lifecycle management methods
 export("setJobGroup",
@@ -63,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "coalesce",
               "collect",
               "colnames",
               "colnames<-",
@@ -94,6 +114,7 @@ exportMethods("arrange",
               "freqItems",
               "gapply",
               "gapplyCollect",
+              "getNumPartitions",
               "group_by",
               "groupBy",
               "head",
@@ -306,6 +327,7 @@ exportMethods("%in%",
              "toDegrees",
              "toRadians",
              "to_date",
+             "to_timestamp",
              "to_utc_timestamp",
              "translate",
              "trim",

R/pkg/R/DataFrame.R

Lines changed: 127 additions & 24 deletions

@@ -280,7 +280,7 @@ setMethod("dtypes",

 #' Column Names of SparkDataFrame
 #'
-#' Return all column names as a list.
+#' Return a vector of column names.
 #'
 #' @param x a SparkDataFrame.
 #'
@@ -323,10 +323,8 @@ setMethod("names",
 setMethod("names<-",
           signature(x = "SparkDataFrame"),
           function(x, value) {
-            if (!is.null(value)) {
-              sdf <- callJMethod(x@sdf, "toDF", as.list(value))
-              dataFrame(sdf)
-            }
+            colnames(x) <- value
+            x
           })

 #' @rdname columns
@@ -340,7 +338,7 @@ setMethod("colnames",
           })

 #' @param value a character vector. Must have the same length as the number
-#'              of columns in the SparkDataFrame.
+#'              of columns to be renamed.
 #' @rdname columns
 #' @aliases colnames<-,SparkDataFrame-method
 #' @name colnames<-
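With this change, names<- simply delegates to colnames<-, so both renaming spellings go through the same path. A small usage sketch (the data and column names are illustrative, not from this diff):

```
# Sketch only: both renaming forms now route through colnames<-.
library(SparkR)
sparkR.session()

df <- createDataFrame(data.frame(a = 1:3, b = letters[1:3]))
colnames(df) <- c("id", "label")   # rename via colnames<-
names(df) <- c("key", "value")     # names<- now delegates to colnames<-
columns(df)                        # "key" "value"
```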
@@ -417,7 +415,7 @@ setMethod("coltypes",
                   type <- PRIMITIVE_TYPES[[specialtype]]
                 }
               }
-              type
+              type[[1]]
             })

             # Find which types don't have mapping to R
@@ -680,14 +678,53 @@ setMethod("storageLevel",
             storageLevelToString(callJMethod(x@sdf, "storageLevel"))
           })

+#' Coalesce
+#'
+#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
+#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
+#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
+#' the current partitions. If a larger number of partitions is requested, it will stay at the
+#' current number of partitions.
+#'
+#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
+#' this may result in your computation taking place on fewer nodes than
+#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+#' call \code{repartition}. This will add a shuffle step, but means the
+#' current upstream partitions will be executed in parallel (per whatever
+#' the current partitioning is).
+#'
+#' @param numPartitions the number of partitions to use.
+#'
+#' @family SparkDataFrame functions
+#' @rdname coalesce
+#' @name coalesce
+#' @aliases coalesce,SparkDataFrame-method
+#' @seealso \link{repartition}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- coalesce(df, 1L)
+#'}
+#' @note coalesce(SparkDataFrame) since 2.1.1
+setMethod("coalesce",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions) {
+            stopifnot(is.numeric(numPartitions))
+            sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
+            dataFrame(sdf)
+          })
+
 #' Repartition
 #'
 #' The following options for repartition are possible:
 #' \itemize{
-#'  \item{1.} {Return a new SparkDataFrame partitioned by
+#'  \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#'  \item{2.} {Return a new SparkDataFrame hash partitioned by
 #'                      the given columns into \code{numPartitions}.}
-#'  \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#'  \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#'  \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
 #'                      using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
 #' @param x a SparkDataFrame.
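Taken together, the new coalesce method and the reworded repartition docs draw the shuffle versus no-shuffle distinction. A hedged sketch of the tradeoff in SparkR terms (the input data is illustrative; it assumes an active sparkR.session()):

```
# Sketch only: coalesce narrows partitions without a shuffle; repartition
# always shuffles and can also increase the partition count.
library(SparkR)
sparkR.session()
df <- createDataFrame(data.frame(x = 1:1000), numPartitions = 10L)

narrow <- coalesce(df, 2L)      # narrow dependency, no shuffle
wide   <- repartition(df, 20L)  # full shuffle, can grow the count
same   <- coalesce(df, 50L)     # stays at 10: coalesce never increases it
```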
@@ -699,6 +736,7 @@ setMethod("storageLevel",
 #' @rdname repartition
 #' @name repartition
 #' @aliases repartition,SparkDataFrame-method
+#' @seealso \link{coalesce}
 #' @export
 #' @examples
 #'\dontrun{
@@ -1138,6 +1176,7 @@ setMethod("collect",
           if (!is.null(PRIMITIVE_TYPES[[colType]]) && colType != "binary") {
             vec <- do.call(c, col)
             stopifnot(class(vec) != "list")
+            class(vec) <- PRIMITIVE_TYPES[[colType]]
             df[[colIndex]] <- vec
           } else {
             df[[colIndex]] <- col
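The added class(vec) <- PRIMITIVE_TYPES[[colType]] forces each collected column back to the R class mapped from its Spark SQL type. A plausible motivating case, shown below as a plain-R sketch (an assumption about intent, not code from this diff), is a date column whose leading NA makes c() drop the Date class:

```
# Sketch only (plain R, no Spark needed): when the first element is NA,
# c() dispatches on a logical and the Date class is lost, leaving raw
# day offsets since 1970-01-01.
col <- list(NA, as.Date("2017-01-01"), as.Date("2017-01-03"))
vec <- do.call(c, col)
class(vec)            # "numeric" -- the Date class was dropped

class(vec) <- "Date"  # what the collect() fix does via PRIMITIVE_TYPES
vec                   # NA "2017-01-01" "2017-01-03"
```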
@@ -1717,6 +1756,23 @@ getColumn <- function(x, c) {
   column(callJMethod(x@sdf, "col", c))
 }

+setColumn <- function(x, c, value) {
+  if (class(value) != "Column" && !is.null(value)) {
+    if (isAtomicLengthOne(value)) {
+      value <- lit(value)
+    } else {
+      stop("value must be a Column, literal value as atomic in length of 1, or NULL")
+    }
+  }
+
+  if (is.null(value)) {
+    nx <- drop(x, c)
+  } else {
+    nx <- withColumn(x, c, value)
+  }
+  nx
+}
+
 #' @param name name of a Column (without being wrapped by \code{""}).
 #' @rdname select
 #' @name $
@@ -1735,19 +1791,7 @@ setMethod("$", signature(x = "SparkDataFrame"),
 #' @note $<- since 1.4.0
 setMethod("$<-", signature(x = "SparkDataFrame"),
           function(x, name, value) {
-            if (class(value) != "Column" && !is.null(value)) {
-              if (isAtomicLengthOne(value)) {
-                value <- lit(value)
-              } else {
-                stop("value must be a Column, literal value as atomic in length of 1, or NULL")
-              }
-            }
-
-            if (is.null(value)) {
-              nx <- drop(x, name)
-            } else {
-              nx <- withColumn(x, name, value)
-            }
+            nx <- setColumn(x, name, value)
             x@sdf <- nx@sdf
             x
           })
@@ -1760,13 +1804,36 @@ setClassUnion("numericOrcharacter", c("numeric", "character"))
 #' @note [[ since 1.4.0
 setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
           function(x, i) {
+            if (length(i) > 1) {
+              warning("Subset index has length > 1. Only the first index is used.")
+              i <- i[1]
+            }
             if (is.numeric(i)) {
               cols <- columns(x)
               i <- cols[[i]]
             }
             getColumn(x, i)
           })

+#' @rdname subset
+#' @name [[<-
+#' @aliases [[<-,SparkDataFrame,numericOrcharacter-method
+#' @note [[<- since 2.1.1
+setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
+          function(x, i, value) {
+            if (length(i) > 1) {
+              warning("Subset index has length > 1. Only the first index is used.")
+              i <- i[1]
+            }
+            if (is.numeric(i)) {
+              cols <- columns(x)
+              i <- cols[[i]]
+            }
+            nx <- setColumn(x, i, value)
+            x@sdf <- nx@sdf
+            x
+          })
+
 #' @rdname subset
 #' @name [
 #' @aliases [,SparkDataFrame-method
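Both $<- and the new [[<- now funnel through the setColumn helper above, so they accept the same kinds of values. A hedged usage sketch (the data and column names are illustrative; the error text is the one raised by setColumn):

```
# Sketch only: the setters accept a Column, a length-one atomic literal, or NULL.
library(SparkR)
sparkR.session()
df <- createDataFrame(data.frame(name = c("a", "b"), age = c(19, 30)))

df[["age2"]] <- df$age * 2   # Column value
df[["flag"]] <- TRUE         # length-one literal, wrapped with lit()
df[["name"]] <- NULL         # NULL drops the column
# df[["bad"]] <- c(1, 2)     # errors: "value must be a Column, literal value
#                            #          as atomic in length of 1, or NULL"
```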
@@ -1811,14 +1878,19 @@ setMethod("[", signature(x = "SparkDataFrame"),
 #' Return subsets of SparkDataFrame according to given conditions
 #' @param x a SparkDataFrame.
 #' @param i,subset (Optional) a logical expression to filter on rows.
+#'                 For extract operator [[ and replacement operator [[<-, the indexing parameter for
+#'                 a single Column.
 #' @param j,select expression for the single Column or a list of columns to select from the SparkDataFrame.
 #' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
 #'             Otherwise, a SparkDataFrame will always be returned.
+#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
+#'              If \code{NULL}, the specified Column is dropped.
 #' @param ... currently not used.
 #' @return A new SparkDataFrame containing only the rows that meet the condition with selected columns.
 #' @export
 #' @family SparkDataFrame functions
 #' @aliases subset,SparkDataFrame-method
+#' @seealso \link{withColumn}
 #' @rdname subset
 #' @name subset
 #' @family subsetting functions
@@ -1836,6 +1908,10 @@ setMethod("[", signature(x = "SparkDataFrame"),
 #' subset(df, df$age %in% c(19, 30), 1:2)
 #' subset(df, df$age %in% c(19), select = c(1,2))
 #' subset(df, select = c(1,2))
+#' # Columns can be selected and set
+#' df[["age"]] <- 23
+#' df[[1]] <- df$age
+#' df[[2]] <- NULL # drop column
 #' }
 #' @note subset since 1.5.0
 setMethod("subset", signature(x = "SparkDataFrame"),
@@ -1960,7 +2036,7 @@ setMethod("selectExpr",
 #' @aliases withColumn,SparkDataFrame,character-method
 #' @rdname withColumn
 #' @name withColumn
-#' @seealso \link{rename} \link{mutate}
+#' @seealso \link{rename} \link{mutate} \link{subset}
 #' @export
 #' @examples
 #'\dontrun{
@@ -1971,6 +2047,10 @@ setMethod("selectExpr",
 #' # Replace an existing column
 #' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
 #' newDF3 <- withColumn(newDF, "newCol", 42)
+#' # Use extract operator to set an existing or new column
+#' df[["age"]] <- 23
+#' df[[2]] <- df$col1
+#' df[[2]] <- NULL # drop column
 #' }
 #' @note withColumn since 1.4.0
 setMethod("withColumn",
@@ -3406,3 +3486,26 @@ setMethod("randomSplit",
   }
   sapply(sdfs, dataFrame)
 })
+
+#' getNumPartitions
+#'
+#' Return the number of partitions
+#'
+#' @param x A SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases getNumPartitions,SparkDataFrame-method
+#' @rdname getNumPartitions
+#' @name getNumPartitions
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(cars, numPartitions = 2)
+#' getNumPartitions(df)
+#' }
+#' @note getNumPartitions since 2.1.1
+setMethod("getNumPartitions",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
+          })
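getNumPartitions makes the partition count observable from R, which pairs naturally with the coalesce method added earlier in this file. A hedged sketch (the expected counts in the comments are assumptions that hold for a small local session with this data):

```
# Sketch only: observe partition counts before and after coalesce/repartition.
library(SparkR)
sparkR.session()
df <- createDataFrame(cars, numPartitions = 4L)

getNumPartitions(df)                   # 4
getNumPartitions(coalesce(df, 2L))     # 2  (narrow, no shuffle)
getNumPartitions(coalesce(df, 8L))     # stays at 4: coalesce cannot grow
getNumPartitions(repartition(df, 8L))  # 8  (shuffle)
```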
