Closed

Changes from all commits (49 commits)
900bc1f
[SPARK-24371][SQL] Added isInCollection in DataFrame API for Scala an…
dbtsai May 29, 2018
f489388
[SPARK-24365][SQL] Add Data Source write benchmark
gengliangwang May 30, 2018
a4be981
[SPARK-24331][SPARKR][SQL] Adding arrays_overlap, array_repeat, map_e…
mn-mikke May 30, 2018
0ebb0c0
[SPARK-23754][PYTHON] Re-raising StopIteration in client code
e-dorigatti May 30, 2018
9e7bad0
[SPARK-24419][BUILD] Upgrade SBT to 0.13.17 with Scala 2.10.7 for JDK9+
dbtsai May 30, 2018
1e46f92
[SPARK-24369][SQL] Correct handling for multiple distinct aggregation…
maropu May 30, 2018
b142157
[SPARK-24384][PYTHON][SPARK SUBMIT] Add .py files correctly into Pyth…
HyukjinKwon May 30, 2018
ec6f971
[SPARK-23161][PYSPARK][ML] Add missing APIs to Python GBTClassifier
huaxingao May 30, 2018
1b36f14
[SPARK-23901][SQL] Add masking functions
mgaido91 May 30, 2018
24ef7fb
[SPARK-24276][SQL] Order of literals in IN should not affect semantic…
mgaido91 May 30, 2018
0053e15
[SPARK-24337][CORE] Improve error messages for Spark conf values
PenguinToast May 31, 2018
90ae98d
[SPARK-24146][PYSPARK][ML] spark.ml parity for sequential pattern min…
WeichenXu123 May 31, 2018
698b9a0
[WEBUI] Avoid possibility of script in query param keys
srowen May 31, 2018
7a82e93
[SPARK-24414][UI] Calculate the correct number of tasks for a stage.
May 31, 2018
223df5d
[SPARK-24397][PYSPARK] Added TaskContext.getLocalProperty(key) in Python
tdas May 31, 2018
cc976f6
[SPARK-23900][SQL] format_number support user specifed format as argu…
wangyum May 31, 2018
21e1fc7
[SPARK-24232][K8S] Add support for secret env vars
May 31, 2018
2c9c862
[MINOR][YARN] Add YARN-specific credential providers in debug logging…
HyukjinKwon Jun 1, 2018
cbaa729
[SPARK-24330][SQL] Refactor ExecuteWriteTask and Use `while` in writi…
gengliangwang Jun 1, 2018
b2d0226
[SPARK-24444][DOCS][PYTHON] Improve Pandas UDF docs to explain column…
BryanCutler Jun 1, 2018
22df953
[SPARK-24326][MESOS] add support for local:// scheme for the app jar
Jun 1, 2018
98909c3
[SPARK-23920][SQL] add array_remove to remove all elements that equal…
huaxingao Jun 1, 2018
6039b13
[SPARK-24351][SS] offsetLog/commitLog purge thresholdBatchId should b…
ivoson Jun 1, 2018
d2c3de7
Revert "[SPARK-24369][SQL] Correct handling for multiple distinct agg…
gatorsmile Jun 1, 2018
09e78c1
[INFRA] Close stale PRs.
Jun 1, 2018
8ef167a
[SPARK-24340][CORE] Clean up non-shuffle disk block manager files fol…
jiangxb1987 Jun 1, 2018
a36c1a6
[SPARK-23668][K8S] Added missing config property in running-on-kubern…
liyinan926 Jun 2, 2018
de4feae
[SPARK-24356][CORE] Duplicate strings in File.path managed by FileSeg…
misha-cloudera Jun 3, 2018
a2166ec
[SPARK-24455][CORE] fix typo in TaskSchedulerImpl comment
Jun 4, 2018
416cd1f
[SPARK-24369][SQL] Correct handling for multiple distinct aggregation…
cloud-fan Jun 4, 2018
1d9338b
[SPARK-23786][SQL] Checking column names of csv headers
MaxGekk Jun 4, 2018
0be5aa2
[SPARK-23903][SQL] Add support for date extract
wangyum Jun 4, 2018
7297ae0
[SPARK-21896][SQL] Fix StackOverflow caused by window functions insid…
Jun 4, 2018
b24d3db
[SPARK-24290][ML] add support for Array input for instrumentation.log…
lu-wang-dl Jun 4, 2018
ff0501b
[SPARK-24300][ML] change the way to set seed in ml.cluster.LDASuite.g…
lu-wang-dl Jun 4, 2018
dbb4d83
[SPARK-24215][PYSPARK] Implement _repr_html_ for dataframes in PySpark
xuanyuanking Jun 5, 2018
b3417b7
[SPARK-16451][REPL] Fail shell if SparkSession fails to start.
Jun 5, 2018
e8c1a0c
[SPARK-15784] Add Power Iteration Clustering to spark.ml
WeichenXu123 Jun 5, 2018
2c2a86b
[SPARK-24453][SS] Fix error recovering from the failure in a no-data …
tdas Jun 5, 2018
93df3cd
[SPARK-22384][SQL] Refine partition pruning when attribute is wrapped…
Jun 5, 2018
e9efb62
[SPARK-24187][R][SQL] Add array_join function to SparkR
huaxingao Jun 6, 2018
1706fde
Initial version
attilapiros May 29, 2018
d2753a6
introduce factory
attilapiros May 29, 2018
616d601
Extend ProtocolSuite
attilapiros May 30, 2018
acc1e20
add test for fetch to disk
attilapiros May 31, 2018
797f558
tiny fix
attilapiros Jun 1, 2018
76f23cb
Add SASL support for FrameDecoder
attilapiros Jun 5, 2018
365e673
fix
attilapiros Jun 5, 2018
5899663
fix
attilapiros Jun 6, 2018
4 changes: 4 additions & 0 deletions R/pkg/NAMESPACE
@@ -201,10 +201,13 @@ exportMethods("%<=>%",
"approxCountDistinct",
"approxQuantile",
"array_contains",
"array_join",
"array_max",
"array_min",
"array_position",
"array_repeat",
"array_sort",
"arrays_overlap",
"asc",
"ascii",
"asin",
@@ -302,6 +305,7 @@ exportMethods("%<=>%",
"lower",
"lpad",
"ltrim",
"map_entries",
"map_keys",
"map_values",
"max",
2 changes: 2 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -2297,6 +2297,8 @@ setMethod("rename",

setClassUnion("characterOrColumn", c("character", "Column"))

setClassUnion("numericOrColumn", c("numeric", "Column"))

#' Arrange Rows by Variables
#'
#' Sort a SparkDataFrame by the specified column(s).
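Not part of the patch: a minimal SparkR sketch of what the new numericOrColumn class union enables. array_repeat (added in functions.R below) is declared against this union, so the repeat count can be given either as a plain number or as a Column; the data frame here is illustrative.

df <- createDataFrame(list(list("a", 3L), list("b", 2L)))
head(select(df, array_repeat(df[[1]], 2L)))       # count given as a plain numeric
head(select(df, array_repeat(df[[1]], df[[2]])))  # count taken from a Column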
87 changes: 79 additions & 8 deletions R/pkg/R/functions.R
@@ -189,6 +189,7 @@ NULL
#' the map or array of maps.
#' \item \code{from_json}: it is the column containing the JSON string.
#' }
#' @param y Column to compute on.
#' @param value A value to compute on.
#' \itemize{
#' \item \code{array_contains}: a value to be checked if contained in the column.
@@ -207,7 +208,7 @@ NULL
#' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
#' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1)))
#' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1)))
#' head(select(tmp, array_position(tmp$v1, 21), array_sort(tmp$v1)))
#' head(select(tmp, array_position(tmp$v1, 21), array_repeat(df$mpg, 3), array_sort(tmp$v1)))
#' head(select(tmp, flatten(tmp$v1), reverse(tmp$v1)))
#' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
#' head(tmp2)
@@ -216,12 +217,13 @@ NULL
#' head(select(tmp, sort_array(tmp$v1)))
#' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))
#' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl))
#' head(select(tmp3, map_keys(tmp3$v3)))
#' head(select(tmp3, map_values(tmp3$v3)))
#' head(select(tmp3, map_entries(tmp3$v3), map_keys(tmp3$v3), map_values(tmp3$v3)))
#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))
#' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$hp))
#' head(select(tmp4, concat(tmp4$v4, tmp4$v5)))
#' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))}
#' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$cyl, df$hp))
#' head(select(tmp4, concat(tmp4$v4, tmp4$v5), arrays_overlap(tmp4$v4, tmp4$v5)))
#' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))
#' tmp5 <- mutate(df, v6 = create_array(df$model, df$model))
#' head(select(tmp5, array_join(tmp5$v6, "#"), array_join(tmp5$v6, "#", "NULL")))}
NULL

#' Window functions for Column operations
@@ -3006,6 +3008,27 @@ setMethod("array_contains",
column(jc)
})

#' @details
#' \code{array_join}: Concatenates the elements of column using the delimiter.
#' Null values are replaced with nullReplacement if set, otherwise they are ignored.
#'
#' @param delimiter a character string that is used to concatenate the elements of column.
#' @param nullReplacement an optional character string that is used to replace the Null values.
#' @rdname column_collection_functions
#' @aliases array_join array_join,Column-method
#' @note array_join since 2.4.0
setMethod("array_join",
signature(x = "Column", delimiter = "character"),
function(x, delimiter, nullReplacement = NULL) {
jc <- if (is.null(nullReplacement)) {
callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter)
} else {
callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter,
as.character(nullReplacement))
}
column(jc)
})

#' @details
#' \code{array_max}: Returns the maximum value of the array.
#'
@@ -3048,6 +3071,26 @@ setMethod("array_position",
column(jc)
})

#' @details
#' \code{array_repeat}: Creates an array containing \code{x} repeated the number of times
#' given by \code{count}.
#'
#' @param count a Column or constant determining the number of repetitions.
#' @rdname column_collection_functions
#' @aliases array_repeat array_repeat,Column,numericOrColumn-method
#' @note array_repeat since 2.4.0
setMethod("array_repeat",
signature(x = "Column", count = "numericOrColumn"),
function(x, count) {
if (class(count) == "Column") {
count <- count@jc
} else {
count <- as.integer(count)
}
jc <- callJStatic("org.apache.spark.sql.functions", "array_repeat", x@jc, count)
column(jc)
})

#' @details
#' \code{array_sort}: Sorts the input array in ascending order. The elements of the input array
#' must be orderable. NA elements will be placed at the end of the returned array.
@@ -3062,6 +3105,21 @@ setMethod("array_sort",
column(jc)
})

#' @details
#' \code{arrays_overlap}: Returns true if the input arrays have at least one non-null element in
#' common. If not and both arrays are non-empty and any of them contains a null, it returns null.
#' It returns false otherwise.
#'
#' @rdname column_collection_functions
#' @aliases arrays_overlap arrays_overlap,Column-method
#' @note arrays_overlap since 2.4.0
setMethod("arrays_overlap",
signature(x = "Column", y = "Column"),
function(x, y) {
jc <- callJStatic("org.apache.spark.sql.functions", "arrays_overlap", x@jc, y@jc)
column(jc)
})

#' @details
#' \code{flatten}: Creates a single array from an array of arrays.
#' If a structure of nested arrays is deeper than two levels, only one level of nesting is removed.
@@ -3076,6 +3134,19 @@ setMethod("flatten",
column(jc)
})

#' @details
#' \code{map_entries}: Returns an unordered array of all entries in the given map.
#'
#' @rdname column_collection_functions
#' @aliases map_entries map_entries,Column-method
#' @note map_entries since 2.4.0
setMethod("map_entries",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "map_entries", x@jc)
column(jc)
})

#' @details
#' \code{map_keys}: Returns an unordered array containing the keys of the map.
#'
@@ -3149,8 +3220,8 @@ setMethod("size",
#' (or starting from the end if start is negative) with the specified length.
#'
#' @rdname column_collection_functions
#' @param start an index indicating the first element occuring in the result.
#' @param length a number of consecutive elements choosen to the result.
#' @param start an index indicating the first element occurring in the result.
#' @param length a number of consecutive elements chosen to the result.
#' @aliases slice slice,Column-method
#' @note slice since 2.4.0
setMethod("slice",
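Not part of the patch: a short SparkR usage sketch that pulls together the collection functions added to functions.R above. It assumes an active SparkR session; the data and column names are illustrative and follow the roxygen examples.

df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
tmp <- mutate(df,
              v1 = create_array(df$model, df$model),
              v2 = create_array(df$mpg, df$cyl),
              v3 = create_array(df$cyl, df$hp),
              m  = create_map(df$model, df$cyl))
# array_join: concatenate array elements with a delimiter, optionally replacing nulls
head(select(tmp, array_join(tmp$v1, "#"), array_join(tmp$v1, "#", "NULL")))
# array_repeat: build an array by repeating a value a given number of times
head(select(tmp, array_repeat(df$model, 3)))
# arrays_overlap: TRUE when the two arrays share at least one non-null element
head(select(tmp, arrays_overlap(tmp$v2, tmp$v3)))
# map_entries: unordered array of (key, value) structs from a map column
head(select(tmp, map_entries(tmp$m)))

The test additions in test_sparkSQL.R below exercise the same calls with explicit expected values.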
16 changes: 16 additions & 0 deletions R/pkg/R/generics.R
@@ -757,6 +757,10 @@ setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCoun
#' @name NULL
setGeneric("array_contains", function(x, value) { standardGeneric("array_contains") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_join", function(x, delimiter, ...) { standardGeneric("array_join") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_max", function(x) { standardGeneric("array_max") })
@@ -769,10 +773,18 @@ setGeneric("array_min", function(x) { standardGeneric("array_min") })
#' @name NULL
setGeneric("array_position", function(x, value) { standardGeneric("array_position") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_repeat", function(x, count) { standardGeneric("array_repeat") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_sort", function(x) { standardGeneric("array_sort") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("arrays_overlap", function(x, y) { standardGeneric("arrays_overlap") })

#' @rdname column_string_functions
#' @name NULL
setGeneric("ascii", function(x) { standardGeneric("ascii") })
@@ -1034,6 +1046,10 @@ setGeneric("lpad", function(x, len, pad) { standardGeneric("lpad") })
#' @name NULL
setGeneric("ltrim", function(x, trimString) { standardGeneric("ltrim") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("map_entries", function(x) { standardGeneric("map_entries") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("map_keys", function(x) { standardGeneric("map_keys") })
37 changes: 36 additions & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1503,6 +1503,36 @@ test_that("column functions", {
result <- collect(select(df2, reverse(df2[[1]])))[[1]]
expect_equal(result, "cba")

# Test array_repeat()
df <- createDataFrame(list(list("a", 3L), list("b", 2L)))
result <- collect(select(df, array_repeat(df[[1]], df[[2]])))[[1]]
expect_equal(result, list(list("a", "a", "a"), list("b", "b")))

result <- collect(select(df, array_repeat(df[[1]], 2L)))[[1]]
expect_equal(result, list(list("a", "a"), list("b", "b")))

# Test arrays_overlap()
df <- createDataFrame(list(list(list(1L, 2L), list(3L, 1L)),
list(list(1L, 2L), list(3L, 4L)),
list(list(1L, NA), list(3L, 4L))))
result <- collect(select(df, arrays_overlap(df[[1]], df[[2]])))[[1]]
expect_equal(result, c(TRUE, FALSE, NA))

# Test array_join()
df <- createDataFrame(list(list(list("Hello", "World!"))))
result <- collect(select(df, array_join(df[[1]], "#")))[[1]]
expect_equal(result, "Hello#World!")
df2 <- createDataFrame(list(list(list("Hello", NA, "World!"))))
result <- collect(select(df2, array_join(df2[[1]], "#", "Beautiful")))[[1]]
expect_equal(result, "Hello#Beautiful#World!")
result <- collect(select(df2, array_join(df2[[1]], "#")))[[1]]
expect_equal(result, "Hello#World!")
df3 <- createDataFrame(list(list(list("Hello", NULL, "World!"))))
result <- collect(select(df3, array_join(df3[[1]], "#", "Beautiful")))[[1]]
expect_equal(result, "Hello#Beautiful#World!")
result <- collect(select(df3, array_join(df3[[1]], "#")))[[1]]
expect_equal(result, "Hello#World!")

# Test array_sort() and sort_array()
df <- createDataFrame(list(list(list(2L, 1L, 3L, NA)), list(list(NA, 6L, 5L, NA, 4L))))

@@ -1531,8 +1561,13 @@ test_that("column functions", {
result <- collect(select(df, flatten(df[[1]])))[[1]]
expect_equal(result, list(list(1L, 2L, 3L, 4L), list(5L, 6L, 7L, 8L)))

# Test map_keys(), map_values() and element_at()
# Test map_entries(), map_keys(), map_values() and element_at()
df <- createDataFrame(list(list(map = as.environment(list(x = 1, y = 2)))))
result <- collect(select(df, map_entries(df$map)))[[1]]
expected_entries <- list(listToStruct(list(key = "x", value = 1)),
listToStruct(list(key = "y", value = 2)))
expect_equal(result, list(expected_entries))

result <- collect(select(df, map_keys(df$map)))[[1]]
expect_equal(result, list(list("x", "y")))

common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -146,7 +146,8 @@ public TransportChannelHandler initializePipeline(
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast(TransportFrameDecoder.HANDLER_NAME,
NettyUtils.createFrameDecoder(conf.maxRemoteBlockSizeFetchToMem(), false))
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
common/network-common/src/main/java/org/apache/spark/network/client/StreamCallback.java
@@ -17,6 +17,8 @@

package org.apache.spark.network.client;

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.nio.ByteBuffer;

Expand All @@ -28,13 +30,13 @@
* The network library guarantees that a single thread will call these methods at a time, but
* different call may be made by different threads.
*/
public interface StreamCallback {
public interface StreamCallback<T> {
/** Called upon receipt of stream data. */
void onData(String streamId, ByteBuffer buf) throws IOException;
void onData(T streamId, ByteBuffer buf) throws IOException;

/** Called when all data from the stream has been received. */
void onComplete(String streamId) throws IOException;
void onComplete(T streamId) throws IOException;

/** Called if there's an error reading data from the stream. */
void onFailure(String streamId, Throwable cause) throws IOException;
void onFailure(T streamId, Throwable cause) throws IOException;
}
common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
@@ -28,17 +28,17 @@
* An interceptor that is registered with the frame decoder to feed stream data to a
* callback.
*/
class StreamInterceptor implements TransportFrameDecoder.Interceptor {
class StreamInterceptor<T> implements TransportFrameDecoder.Interceptor {
Review comment:
there is a pretty simple mapping between StreamChunkId used by ChunkFetchSuccess and the String id used by StreamResponse -- I think if you just used that you could eliminate a lot of these changes

https://github.com/apache/spark/blob/master/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L129

private final TransportResponseHandler handler;
private final String streamId;
private final T streamId;
private final long byteCount;
private final StreamCallback callback;
private final StreamCallback<T> callback;
private long bytesRead;

StreamInterceptor(
TransportResponseHandler handler,
String streamId,
T streamId,
long byteCount,
StreamCallback callback) {
this.handler = handler;
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -24,6 +24,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -132,14 +133,15 @@ public void setClientId(String id) {
public void fetchChunk(
long streamId,
int chunkIndex,
ChunkReceivedCallback callback) {
ChunkReceivedCallback callback,
Supplier<StreamCallback<StreamChunkId>> streamCallbackFactory) {
long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
}

StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);
handler.addFetchRequest(streamChunkId, callback, streamCallbackFactory);

channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
if (future.isSuccess()) {
Expand Down Expand Up @@ -169,7 +171,7 @@ public void fetchChunk(
* @param streamId The stream to fetch.
* @param callback Object to call with the stream data.
*/
public void stream(String streamId, StreamCallback callback) {
public void stream(String streamId, StreamCallback<String> callback) {
long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));