
Commit 1fa29c2

falaki authored and shivaram committed
[SPARK-8452] [SPARKR] expose jobGroup API in SparkR
This pull request adds the following methods to SparkR:

```R
setJobGroup()
cancelJobGroup()
clearJobGroup()
```

For each method, the Spark context is passed as the first argument. There does not seem to be a good way to test these in R.

cc shivaram and davies

Author: Hossein <[email protected]>

Closes apache#6889 from falaki/SPARK-8452 and squashes the following commits:

9ce9f1e [Hossein] Added basic tests to verify methods can be called and won't throw errors
c706af9 [Hossein] Added examples
a2c19af [Hossein] taking spark context as first argument
343ca77 [Hossein] Added setJobGroup, cancelJobGroup and clearJobGroup to SparkR
1 parent 54976e5 commit 1fa29c2
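All three wrappers follow the same pattern described in the PR: the Spark context is the first argument, and the call is forwarded by method name over SparkR's R-to-JVM bridge (`callJMethod`). The forward-by-name dispatch can be sketched in Python; `FakeContext` and `call_j_method` are invented names for illustration, not real Spark APIs:

```python
class FakeContext:
    """Hypothetical stand-in for the JVM-backed SparkContext object."""
    def __init__(self):
        self.calls = []  # record of (method, args...) tuples

    def setJobGroup(self, group_id, description, interrupt_on_cancel):
        self.calls.append(("setJobGroup", group_id, description, interrupt_on_cancel))

    def cancelJobGroup(self, group_id):
        self.calls.append(("cancelJobGroup", group_id))

    def clearJobGroup(self):
        self.calls.append(("clearJobGroup",))


def call_j_method(obj, name, *args):
    # Mirrors the shape of SparkR's callJMethod(sc, "name", ...):
    # look up the method by name on the target object and invoke it
    # with the remaining arguments.
    return getattr(obj, name)(*args)


sc = FakeContext()
call_j_method(sc, "setJobGroup", "myJobGroup", "My job group description", True)
call_j_method(sc, "cancelJobGroup", "myJobGroup")
call_j_method(sc, "clearJobGroup")
print(len(sc.calls))  # → 3
```

Keeping every wrapper a one-line forward means the R side stays thin and all job-group semantics live in the JVM SparkContext.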

File tree

3 files changed: +56 lines, -0 lines

R/pkg/NAMESPACE

Lines changed: 5 additions & 0 deletions
```diff
@@ -10,6 +10,11 @@ export("sparkR.init")
 export("sparkR.stop")
 export("print.jobj")
 
+# Job group lifecycle management methods
+export("setJobGroup",
+       "clearJobGroup",
+       "cancelJobGroup")
+
 exportClasses("DataFrame")
 
 exportMethods("arrange",
```

R/pkg/R/sparkR.R

Lines changed: 44 additions & 0 deletions
```diff
@@ -278,3 +278,47 @@ sparkRHive.init <- function(jsc = NULL) {
   assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
   hiveCtx
 }
+
+#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+#' different value or cleared.
+#'
+#' @param sc existing spark context
+#' @param groupId the ID to be assigned to job groups
+#' @param description description for the job group ID
+#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
+#'}
+
+setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+  callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
+}
+
+#' Clear current job group ID and its description
+#'
+#' @param sc existing spark context
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' clearJobGroup(sc)
+#'}
+
+clearJobGroup <- function(sc) {
+  callJMethod(sc, "clearJobGroup")
+}
+
+#' Cancel active jobs for the specified group
+#'
+#' @param sc existing spark context
+#' @param groupId the ID of job group to be cancelled
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' cancelJobGroup(sc, "myJobGroup")
+#'}
+
+cancelJobGroup <- function(sc, groupId) {
+  callJMethod(sc, "cancelJobGroup", groupId)
+}
```
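The semantics documented above for `setJobGroup` — a group ID attached to the calling thread that applies to every job it starts until the ID is changed or cleared, with `cancelJobGroup` hitting all jobs submitted under that ID — can be sketched as a toy Python model. This is not Spark code; every name here (`set_job_group`, `submit_job`, etc.) is invented for illustration:

```python
import threading

# Toy model of per-thread job-group assignment: a group set on a thread
# tags every job that thread submits until the group is changed or cleared.
_local = threading.local()
jobs = []  # jobs recorded at submission time, with their group snapshot


def set_job_group(group_id, description, interrupt_on_cancel=False):
    _local.group = (group_id, description, interrupt_on_cancel)


def clear_job_group():
    _local.group = None


def submit_job(name):
    # The job captures whatever group the submitting thread has *now*.
    group = getattr(_local, "group", None)
    jobs.append({"name": name, "group": group, "cancelled": False})


def cancel_job_group(group_id):
    # Cancels every job whose captured group matches the given ID.
    for job in jobs:
        if job["group"] is not None and job["group"][0] == group_id:
            job["cancelled"] = True


set_job_group("myJobGroup", "My job group description", True)
submit_job("job1")
submit_job("job2")
clear_job_group()
submit_job("job3")  # submitted outside any group, so unaffected by cancel
cancel_job_group("myJobGroup")
print([j["cancelled"] for j in jobs])  # → [True, True, False]
```

The `threading.local` mirrors why the real API is documented per-thread: two threads sharing one context can run under different job groups without interfering.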

R/pkg/inst/tests/test_context.R

Lines changed: 7 additions & 0 deletions
```diff
@@ -48,3 +48,10 @@ test_that("rdd GC across sparkR.stop", {
   count(rdd3)
   count(rdd4)
 })
+
+test_that("job group functions can be called", {
+  sc <- sparkR.init()
+  setJobGroup(sc, "groupId", "job description", TRUE)
+  cancelJobGroup(sc, "groupId")
+  clearJobGroup(sc)
+})
```
