Commit d6e8cbe (merge commit; parents e699cb6 and db420f7)

Fix a conflict.

104 files changed: +1701 -540 lines


.github/workflows/build_and_test.yml

Lines changed: 6 additions & 10 deletions
@@ -168,12 +168,10 @@ jobs:
         python3.8 -m pip list
     # SparkR
     - name: Install R 4.0
+      uses: r-lib/actions/setup-r@v1
       if: contains(matrix.modules, 'sparkr')
-      run: |
-        sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list"
-        curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xE298A3A825C0D65DFD57CBB651716619E084DAB9" | sudo apt-key add
-        sudo apt-get update
-        sudo apt-get install -y r-base r-base-dev libcurl4-openssl-dev
+      with:
+        r-version: 4.0
     - name: Install R packages
       if: contains(matrix.modules, 'sparkr')
       run: |
@@ -232,11 +230,9 @@ jobs:
         # See also https://github.com/sphinx-doc/sphinx/issues/7551.
         pip3 install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme ipython nbsphinx
     - name: Install R 4.0
-      run: |
-        sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list"
-        curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xE298A3A825C0D65DFD57CBB651716619E084DAB9" | sudo apt-key add
-        sudo apt-get update
-        sudo apt-get install -y r-base r-base-dev libcurl4-openssl-dev
+      uses: r-lib/actions/setup-r@v1
+      with:
+        r-version: 4.0
     - name: Install R linter dependencies and SparkR
       run: |
         sudo apt-get install -y libcurl4-openssl-dev

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -348,6 +348,7 @@ exportMethods("%<=>%",
               "negate",
               "next_day",
               "not",
+              "nth_value",
               "ntile",
               "otherwise",
               "over",

R/pkg/R/functions.R

Lines changed: 40 additions & 8 deletions
@@ -338,7 +338,8 @@ NULL
 #' tmp <- mutate(df, dist = over(cume_dist(), ws), dense_rank = over(dense_rank(), ws),
 #'               lag = over(lag(df$mpg), ws), lead = over(lead(df$mpg, 1), ws),
 #'               percent_rank = over(percent_rank(), ws),
-#'               rank = over(rank(), ws), row_number = over(row_number(), ws))
+#'               rank = over(rank(), ws), row_number = over(row_number(), ws),
+#'               nth_value = over(nth_value(df$mpg, 3), ws))
 #' # Get ntile group id (1-4) for hp
 #' tmp <- mutate(tmp, ntile = over(ntile(4), ws))
 #' head(tmp)}
@@ -3298,6 +3299,37 @@ setMethod("lead",
             column(jc)
           })
 
+#' @details
+#' \code{nth_value}: Window function: returns the value that is the \code{offset}th
+#' row of the window frame (counting from 1), and \code{null} if the size of the
+#' window frame is less than \code{offset} rows.
+#'
+#' @param offset a numeric indicating the number of the row to use as the value
+#' @param na.rm a logical which indicates that the Nth value should skip nulls in the
+#'        determination of which row to use
+#'
+#' @rdname column_window_functions
+#' @aliases nth_value nth_value,characterOrColumn-method
+#' @note nth_value since 3.1.0
+setMethod("nth_value",
+          signature(x = "characterOrColumn", offset = "numeric"),
+          function(x, offset, na.rm = FALSE) {
+            x <- if (is.character(x)) {
+              column(x)
+            } else {
+              x
+            }
+            offset <- as.integer(offset)
+            jc <- callJStatic(
+              "org.apache.spark.sql.functions",
+              "nth_value",
+              x@jc,
+              offset,
+              na.rm
+            )
+            column(jc)
+          })
+
 #' @details
 #' \code{ntile}: Returns the ntile group id (from 1 to n inclusive) in an ordered window
 #' partition. For example, if n is 4, the first quarter of the rows will get value 1, the second
@@ -4419,10 +4451,10 @@ setMethod("current_timestamp",
 #' @aliases timestamp_seconds timestamp_seconds,Column-method
 #' @note timestamp_seconds since 3.1.0
 setMethod("timestamp_seconds",
-  signature(x = "Column"),
-  function(x) {
-    jc <- callJStatic(
-      "org.apache.spark.sql.functions", "timestamp_seconds", x@jc
-    )
-    column(jc)
-  })
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic(
+              "org.apache.spark.sql.functions", "timestamp_seconds", x@jc
+            )
+            column(jc)
+          })
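
A note on the nth_value method added above: the R wrapper is a thin veneer that delegates to the JVM via callJStatic, so the same window function is available directly on the JVM side. A minimal Java sketch for comparison — the column names (cyl, mpg) are illustrative, echoing the mtcars-style roxygen example, and the two-argument overload is assumed alongside the three-argument one that the wrapper invokes:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.nth_value;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class NthValueSketch {
  public static void main(String[] args) {
    // Window per cyl group, rows ordered by mpg.
    WindowSpec ws = Window.partitionBy("cyl").orderBy("mpg");

    // mpg value of the third row of the frame; null while the frame holds
    // fewer than three rows.
    Column third = nth_value(col("mpg"), 3).over(ws);

    // The boolean argument corresponds to na.rm in the R wrapper: nulls are
    // skipped when counting rows.
    Column thirdIgnoringNulls = nth_value(col("mpg"), 3, true).over(ws);
  }
}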

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -1164,6 +1164,10 @@ setGeneric("months_between", function(y, x, ...) { standardGeneric("months_between") })
 #' @rdname count
 setGeneric("n", function(x) { standardGeneric("n") })
 
+#' @rdname column_window_functions
+#' @name NULL
+setGeneric("nth_value", function(x, offset, ...) { standardGeneric("nth_value") })
+
 #' @rdname column_nonaggregate_functions
 #' @name NULL
 setGeneric("nanvl", function(y, x) { standardGeneric("nanvl") })

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1425,6 +1425,8 @@ test_that("column functions", {
14251425
c25 <- overlay(c1, c2, c3, c3) + overlay(c1, c2, c3) + overlay(c1, c2, 1) +
14261426
overlay(c1, c2, 3, 4)
14271427
c26 <- timestamp_seconds(c1)
1428+
c27 <- nth_value("x", 1L) + nth_value("y", 2, TRUE) +
1429+
nth_value(column("v"), 3) + nth_value(column("z"), 4L, FALSE)
14281430

14291431
# Test if base::is.nan() is exposed
14301432
expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 0 additions & 8 deletions
@@ -92,10 +92,6 @@ public class ExternalShuffleBlockResolver {
   @VisibleForTesting
   final DB db;
 
-  private final List<String> knownManagers = Arrays.asList(
-    "org.apache.spark.shuffle.sort.SortShuffleManager",
-    "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
-
   public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
       throws IOException {
     this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
@@ -148,10 +144,6 @@ public void registerExecutor(
       ExecutorShuffleInfo executorInfo) {
     AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
-    if (!knownManagers.contains(executorInfo.shuffleManager)) {
-      throw new UnsupportedOperationException(
-        "Unsupported shuffle manager of executor: " + executorInfo);
-    }
     try {
       if (db != null) {
         byte[] key = dbAppExecKey(fullId);
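
Removing knownManagers means the shuffle service no longer rejects executors whose shuffle manager is not one of the two hard-coded class names. A hedged sketch of what this permits — the manager class name and directory layout below are invented for illustration; only the ExecutorShuffleInfo constructor comes from the real protocol classes:

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

public class CustomManagerRegistration {
  public static void main(String[] args) {
    // An executor advertising a third-party shuffle manager implementation.
    ExecutorShuffleInfo info = new ExecutorShuffleInfo(
        new String[] {"/tmp/spark-local"},           // dirs holding shuffle files
        64,                                          // sub-dirs per local dir
        "com.example.shuffle.PluginShuffleManager"); // previously rejected name

    // Before this commit, registerExecutor("app0", "exec0", info) would throw
    // UnsupportedOperationException for such a name; with the check gone,
    // registration proceeds as it does for the built-in sort managers.
    System.out.println(info);
  }
}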

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 0 additions & 9 deletions
@@ -71,15 +71,6 @@ public void testBadRequests() throws IOException {
       assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
     }
 
-    // Invalid shuffle manager
-    try {
-      resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
-      resolver.getBlockData("app0", "exec2", 1, 1, 0);
-      fail("Should have failed");
-    } catch (UnsupportedOperationException e) {
-      // pass
-    }
-
     // Nonexistent shuffle block
     resolver.registerExecutor("app0", "exec3",
       dataContext.createExecutorInfo(SORT_MANAGER));

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

Lines changed: 3 additions & 3 deletions
@@ -233,9 +233,9 @@ public void testFetchThreeSort() throws Exception {
     exec0Fetch.releaseBuffers();
   }
 
-  @Test (expected = RuntimeException.class)
-  public void testRegisterInvalidExecutor() throws Exception {
-    registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
+  @Test
+  public void testRegisterWithCustomShuffleManager() throws Exception {
+    registerExecutor("exec-1", dataContext0.createExecutorInfo("custom shuffle manager"));
   }
 
   @Test

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 71 additions & 25 deletions
@@ -203,6 +203,10 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     }
 
     if (inMemSorter == null || inMemSorter.numRecords() <= 0) {
+      // There could still be some memory allocated when there are no records in the in-memory
+      // sorter. We will not spill it however, to ensure that we can always process at least one
+      // record before spilling. See the comments in `allocateMemoryForRecordIfNecessary` for why
+      // this is necessary.
       return 0L;
     }
 
@@ -224,7 +228,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     // Note that this is more-or-less going to be a multiple of the page size, so wasted space in
     // pages will currently be counted as memory spilled even though that space isn't actually
     // written to disk. This also counts the space needed to store the sorter's pointer array.
-    inMemSorter.reset();
+    inMemSorter.freeMemory();
     // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
     // records. Otherwise, if the task is over allocated memory, then without freeing the memory
     // pages, we might not be able to get memory for the pointer array.
@@ -325,7 +329,7 @@ public void cleanupResources() {
       deleteSpillFiles();
      freeMemory();
      if (inMemSorter != null) {
-        inMemSorter.free();
+        inMemSorter.freeMemory();
        inMemSorter = null;
      }
    }
@@ -339,40 +343,53 @@ public void cleanupResources() {
   private void growPointerArrayIfNecessary() throws IOException {
     assert(inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
+      if (inMemSorter.numRecords() <= 0) {
+        // Spilling was triggered just before this method was called. The pointer array was freed
+        // during the spill, so a new pointer array needs to be allocated here.
+        LongArray array = allocateArray(inMemSorter.getInitialSize());
+        inMemSorter.expandPointerArray(array);
+        return;
+      }
+
       long used = inMemSorter.getMemoryUsage();
-      LongArray array;
+      LongArray array = null;
       try {
         // could trigger spilling
         array = allocateArray(used / 8 * 2);
       } catch (TooLargePageException e) {
         // The pointer array is too big to fit in a single page, spill.
         spill();
-        return;
       } catch (SparkOutOfMemoryError e) {
-        // should have trigger spilling
-        if (!inMemSorter.hasSpaceForAnotherRecord()) {
+        if (inMemSorter.numRecords() > 0) {
           logger.error("Unable to grow the pointer array");
           throw e;
         }
-        return;
+        // The new array could not be allocated, but that is not an issue as it is no longer
+        // needed, as all records were spilled.
       }
-      // check if spilling is triggered or not
-      if (inMemSorter.hasSpaceForAnotherRecord()) {
-        freeArray(array);
-      } else {
-        inMemSorter.expandPointerArray(array);
+
+      if (inMemSorter.numRecords() <= 0) {
+        // Spilling was triggered while trying to allocate the new array.
+        if (array != null) {
+          // We succeeded in allocating the new array, but, since all records were spilled, a
+          // smaller array would also suffice.
+          freeArray(array);
+        }
+        // The pointer array was freed during the spill, so a new pointer array needs to be
+        // allocated here.
+        array = allocateArray(inMemSorter.getInitialSize());
       }
+      inMemSorter.expandPointerArray(array);
     }
   }
 
   /**
-   * Allocates more memory in order to insert an additional record. This will request additional
-   * memory from the memory manager and spill if the requested memory can not be obtained.
+   * Allocates an additional page in order to insert an additional record. This will request
+   * additional memory from the memory manager and spill if the requested memory can not be
+   * obtained.
    *
    * @param required the required space in the data page, in bytes, including space for storing
-   *                 the record size. This must be less than or equal to the page size (records
-   *                 that exceed the page size are handled via a different code path which uses
-   *                 special overflow pages).
+   *                 the record size.
    */
   private void acquireNewPageIfNecessary(int required) {
     if (currentPage == null ||
@@ -384,6 +401,37 @@ private void acquireNewPageIfNecessary(int required) {
     }
   }
 
+  /**
+   * Allocates more memory in order to insert an additional record. This will request additional
+   * memory from the memory manager and spill if the requested memory can not be obtained.
+   *
+   * @param required the required space in the data page, in bytes, including space for storing
+   *                 the record size.
+   */
+  private void allocateMemoryForRecordIfNecessary(int required) throws IOException {
+    // Step 1:
+    // Ensure that the pointer array has space for another record. This may cause a spill.
+    growPointerArrayIfNecessary();
+    // Step 2:
+    // Ensure that the last page has space for another record. This may cause a spill.
+    acquireNewPageIfNecessary(required);
+    // Step 3:
+    // The allocation in step 2 could have caused a spill, which would have freed the pointer
+    // array allocated in step 1. Therefore we need to check again whether we have to allocate
+    // a new pointer array.
+    //
+    // If the allocation in this step causes a spill event then it will not cause the page
+    // allocated in the previous step to be freed. The function `spill` only frees memory if at
+    // least one record has been inserted in the in-memory sorter. This will not be the case if
+    // we have spilled in the previous step.
+    //
+    // If we did not spill in the previous step then `growPointerArrayIfNecessary` will be a
+    // no-op that does not allocate any memory, and therefore can't cause a spill event.
+    //
+    // Thus there is no need to call `acquireNewPageIfNecessary` again after this step.
+    growPointerArrayIfNecessary();
+  }
+
   /**
    * Write a record to the sorter.
    */
@@ -398,11 +446,10 @@ public void insertRecord(
       spill();
     }
 
-    growPointerArrayIfNecessary();
-    int uaoSize = UnsafeAlignedOffset.getUaoSize();
+    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
     // Need 4 or 8 bytes to store the record length.
     final int required = length + uaoSize;
-    acquireNewPageIfNecessary(required);
+    allocateMemoryForRecordIfNecessary(required);
 
     final Object base = currentPage.getBaseObject();
     final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
@@ -425,10 +472,9 @@ public void insertKVRecord(Object keyBase, long keyOffset, int keyLen,
       Object valueBase, long valueOffset, int valueLen, long prefix, boolean prefixIsNull)
       throws IOException {
 
-    growPointerArrayIfNecessary();
-    int uaoSize = UnsafeAlignedOffset.getUaoSize();
+    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
     final int required = keyLen + valueLen + (2 * uaoSize);
-    acquireNewPageIfNecessary(required);
+    allocateMemoryForRecordIfNecessary(required);
 
     final Object base = currentPage.getBaseObject();
     final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
@@ -572,7 +618,7 @@ public long spill() throws IOException {
       assert(inMemSorter != null);
       released += inMemSorter.getMemoryUsage();
       totalSortTimeNanos += inMemSorter.getSortTimeNanos();
-      inMemSorter.free();
+      inMemSorter.freeMemory();
       inMemSorter = null;
       taskContext.taskMetrics().incMemoryBytesSpilled(released);
       taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
@@ -669,7 +715,7 @@ public UnsafeSorterIterator getIterator(int startIndex) throws IOException {
         }
         i += spillWriter.recordsSpilled();
       }
-      if (inMemSorter != null) {
+      if (inMemSorter != null && inMemSorter.numRecords() > 0) {
        UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
        moveOver(iter, startIndex - i);
        queue.add(iter);
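
The three-step sequence in allocateMemoryForRecordIfNecessary is the heart of this change, so a distilled model may help. The toy Java sketch below is not Spark code — every name in it is invented — and it reproduces only the hazard described in the comments above: the page allocation in step 2 can trigger a spill that frees the pointer array step 1 just ensured, so the array check must run once more afterwards.

public class SpillRecheckSketch {
  private boolean pointerArrayPresent = false;
  private int records = 5;    // pretend some records are already buffered
  private long freePages = 1; // only one allocation fits before a spill

  // Models spill(): drops buffered records, frees the data pages and the
  // pointer array. (The real spill() additionally refuses to run when the
  // sorter is empty, which is what keeps step 3 from freeing step 2's page.)
  private void spill() {
    records = 0;
    pointerArrayPresent = false;
    freePages = 2;
  }

  // Models allocateArray()/acquireNewPageIfNecessary(): under memory
  // pressure, any allocation may first force a spill.
  private void allocatePage() {
    if (freePages == 0) {
      spill();
    }
    freePages--;
  }

  private void growPointerArrayIfNecessary() {
    if (!pointerArrayPresent) {
      allocatePage();
      pointerArrayPresent = true;
    }
  }

  // Mirrors the three steps documented in the method above.
  public void allocateMemoryForRecord() {
    growPointerArrayIfNecessary(); // step 1: ensure the array (may spill)
    allocatePage();                // step 2: new page; here the spill frees step 1's array
    growPointerArrayIfNecessary(); // step 3: restore the array if step 2 spilled
  }

  public static void main(String[] args) {
    SpillRecheckSketch sorter = new SpillRecheckSketch();
    sorter.allocateMemoryForRecord();
    // Without step 3 this would print false, and the next record would be
    // inserted with no pointer array to track it.
    System.out.println("pointer array present: " + sorter.pointerArrayPresent);
  }
}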
