Skip to content

Commit 343ca77

Browse files
committed
Added setJobGroup, cancelJobGroup and clearJobGroup to SparkR
1 parent ddc5baf commit 343ca77

File tree

2 files changed

+40
-0
lines changed

2 files changed

+40
-0
lines changed

R/pkg/NAMESPACE

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ export("sparkR.init")
1010
export("sparkR.stop")
1111
export("print.jobj")
1212

13+
# Job group lifecycle management methods
14+
export("setJobGroup",
15+
"clearJobGroup",
16+
"cancelJobGroup")
17+
1318
exportClasses("DataFrame")
1419

1520
exportMethods("arrange",

R/pkg/R/sparkR.R

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,3 +278,38 @@ sparkRHive.init <- function(jsc = NULL) {
278278
assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
279279
hiveCtx
280280
}
281+
282+
#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
283+
#' different value or cleared.
284+
#'
285+
#' @param sc The existing
286+
#' @param groupid the ID to be assigned to job groups
287+
#' @param description description for the the job group ID
288+
#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
289+
290+
setJobGroup <- function(groupId, description, interruptOnCancel) {
291+
if (exists(".sparkRjsc", envir = env)) {
292+
sc <- get(".sparkRjsc", envir = env)
293+
callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
294+
}
295+
}
296+
297+
#' Clear current job group ID and its description
298+
299+
clearJobGroup <- function() {
300+
if (exists(".sparkRjsc", envir = env)) {
301+
sc <- get(".sparkRjsc", envir = env)
302+
callJMethod(sc, "clearJobGroup")
303+
}
304+
}
305+
306+
#' Cancel active jobs for the specified group
307+
#'
308+
#' @param groupId the ID of job group to be cancelled
309+
310+
cancelJobGroup <- function(groupId) {
311+
if (exists(".sparkRjsc", envir = env)) {
312+
sc <- get(".sparkRjsc", envir = env)
313+
callJMethod(sc, "cancelJobGroup", groupId)
314+
}
315+
}

0 commit comments

Comments
 (0)