
Commit 17b4be4

Merge remote-tracking branch 'upstream/master' into cleanup
2 parents: 32edccc + 5e00a5d

115 files changed: +1773 additions, -586 deletions


common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java

Lines changed: 15 additions & 0 deletions

@@ -18,9 +18,11 @@
 package org.apache.spark.network.util;
 
 import java.io.Closeable;
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
@@ -344,4 +346,17 @@ public static byte[] bufferToArray(ByteBuffer buffer) {
     }
   }
 
+  /**
+   * Fills a buffer with data read from the channel.
+   */
+  public static void readFully(ReadableByteChannel channel, ByteBuffer dst) throws IOException {
+    int expected = dst.remaining();
+    while (dst.hasRemaining()) {
+      if (channel.read(dst) < 0) {
+        throw new EOFException(String.format("Not enough bytes in channel (expected %d).",
+          expected));
+      }
+    }
+  }
 }
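
The while loop in the new helper matters: ReadableByteChannel.read may transfer fewer bytes than requested on any single call and returns -1 only at end-of-stream, so one read cannot guarantee a full buffer. A minimal caller sketch, assuming a hypothetical data.bin file with a 16-byte header (FileChannel implements ReadableByteChannel, so it works with readFully):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import org.apache.spark.network.util.JavaUtils;

public class ReadFullyExample {
  public static void main(String[] args) throws IOException {
    try (FileChannel channel =
        FileChannel.open(Paths.get("data.bin"), StandardOpenOption.READ)) {
      ByteBuffer header = ByteBuffer.allocate(16);
      // Throws EOFException if the file holds fewer than 16 bytes.
      JavaUtils.readFully(channel, header);
      header.flip(); // switch the buffer from filling to reading
    }
  }
}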

common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java

Lines changed: 16 additions & 12 deletions

@@ -46,18 +46,22 @@ public final class Platform {
   private static final boolean unaligned;
   static {
     boolean _unaligned;
-    // use reflection to access unaligned field
-    try {
-      Class<?> bitsClass =
-        Class.forName("java.nio.Bits", false, ClassLoader.getSystemClassLoader());
-      Method unalignedMethod = bitsClass.getDeclaredMethod("unaligned");
-      unalignedMethod.setAccessible(true);
-      _unaligned = Boolean.TRUE.equals(unalignedMethod.invoke(null));
-    } catch (Throwable t) {
-      // We at least know x86 and x64 support unaligned access.
-      String arch = System.getProperty("os.arch", "");
-      //noinspection DynamicRegexReplaceableByCompiledPattern
-      _unaligned = arch.matches("^(i[3-6]86|x86(_64)?|x64|amd64|aarch64)$");
+    String arch = System.getProperty("os.arch", "");
+    if (arch.equals("ppc64le") || arch.equals("ppc64")) {
+      // java.nio.Bits.unaligned() doesn't return true on ppc (see JDK-8165231),
+      // but ppc64 and ppc64le do support unaligned access.
+      _unaligned = true;
+    } else {
+      try {
+        Class<?> bitsClass =
+          Class.forName("java.nio.Bits", false, ClassLoader.getSystemClassLoader());
+        Method unalignedMethod = bitsClass.getDeclaredMethod("unaligned");
+        unalignedMethod.setAccessible(true);
+        _unaligned = Boolean.TRUE.equals(unalignedMethod.invoke(null));
+      } catch (Throwable t) {
+        // We at least know x86 and x64 support unaligned access.
+        //noinspection DynamicRegexReplaceableByCompiledPattern
+        _unaligned = arch.matches("^(i[3-6]86|x86(_64)?|x64|amd64|aarch64)$");
+      }
     }
     unaligned = _unaligned;
   }
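
A hypothetical consumer of this flag, for illustration only: take a single wide load when the platform tolerates misaligned access, and fall back to byte-by-byte assembly otherwise. This sketch assumes Platform exposes the field via an unaligned() accessor and provides the getLong/BYTE_ARRAY_OFFSET helpers, and the fallback assumes little-endian layout to match the fast path on x86; none of this code is part of the commit.

import org.apache.spark.unsafe.Platform;

public class AlignmentAwareReader {
  // Read a long from an arbitrary (possibly misaligned) offset in a byte array.
  static long readLong(byte[] bytes, int offset) {
    if (Platform.unaligned()) {
      // Single 8-byte load; safe because the hardware handles misalignment.
      return Platform.getLong(bytes, Platform.BYTE_ARRAY_OFFSET + offset);
    }
    // Portable fallback: assemble the value one byte at a time (little-endian).
    long v = 0;
    for (int i = 7; i >= 0; i--) {
      v = (v << 8) | (bytes[offset + i] & 0xFFL);
    }
    return v;
  }
}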

core/src/main/java/org/apache/spark/memory/MemoryConsumer.java

Lines changed: 0 additions & 2 deletions

@@ -60,8 +60,6 @@ protected long getUsed() {
 
   /**
    * Force spill during building.
-   *
-   * For testing.
    */
   public void spill() throws IOException {
     spill(Long.MAX_VALUE, this);

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 1 addition & 5 deletions

@@ -155,11 +155,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       for (MemoryConsumer c: consumers) {
         if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
           long key = c.getUsed();
-          List<MemoryConsumer> list = sortedConsumers.get(key);
-          if (list == null) {
-            list = new ArrayList<>(1);
-            sortedConsumers.put(key, list);
-          }
+          List<MemoryConsumer> list = sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
           list.add(c);
         }
       }
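
The replacement collapses the get/null-check/put idiom into a single Map.computeIfAbsent call (Java 8+), which creates the list lazily on first use and returns the existing list afterwards. A self-contained sketch of the same grouping idiom, with placeholder keys and values:

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class GroupByKeyExample {
  public static void main(String[] args) {
    // Group string values into lists keyed by a long, as the diff does
    // for memory consumers keyed by their memory usage.
    TreeMap<Long, List<String>> byUsage = new TreeMap<>();
    byUsage.computeIfAbsent(42L, k -> new ArrayList<>(1)).add("consumer-1");
    // A second call with the same key reuses the existing list:
    byUsage.computeIfAbsent(42L, k -> new ArrayList<>(1)).add("consumer-2");
    System.out.println(byUsage); // {42=[consumer-1, consumer-2]}
  }
}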

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 2 additions & 3 deletions

@@ -52,16 +52,15 @@
  * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
  * writes incoming records to separate files, one file per reduce partition, then concatenates these
  * per-partition files to form a single output file, regions of which are served to reducers.
- * Records are not buffered in memory. This is essentially identical to
- * {@link org.apache.spark.shuffle.hash.HashShuffleWriter}, except that it writes output in a format
+ * Records are not buffered in memory. It writes output in a format
  * that can be served / consumed via {@link org.apache.spark.shuffle.IndexShuffleBlockResolver}.
  * <p>
  * This write path is inefficient for shuffles with large numbers of reduce partitions because it
  * simultaneously opens separate serializers and file streams for all partitions. As a result,
  * {@link SortShuffleManager} only selects this write path when
  * <ul>
  *   <li>no Ordering is specified,</li>
- *   <li>no Aggregator is specific, and</li>
+ *   <li>no Aggregator is specified, and</li>
  *   <li>the number of partitions is less than
  *     <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
  * </ul>

core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java

Lines changed: 2 additions & 3 deletions

@@ -19,6 +19,7 @@
 
 import org.apache.spark.util.EnumUtil;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -30,9 +31,7 @@ public enum TaskSorting {
   private final Set<String> alternateNames;
   TaskSorting(String... names) {
     alternateNames = new HashSet<>();
-    for (String n: names) {
-      alternateNames.add(n);
-    }
+    Collections.addAll(alternateNames, names);
   }
 
   public static TaskSorting fromString(String str) {
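
Collections.addAll is the idiomatic replacement for the removed loop: it copies a varargs array into any collection in one call. A standalone sketch (the name strings are placeholders, not TaskSorting's real alternate names):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class AddAllExample {
  public static void main(String[] args) {
    Set<String> alternateNames = new HashSet<>();
    // Equivalent to looping over the varargs array and calling add()
    // per element, as the replaced constructor body did.
    Collections.addAll(alternateNames, "runtime", "run_time");
    System.out.println(alternateNames.contains("runtime")); // true
  }
}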

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 2 additions & 3 deletions

@@ -71,13 +71,12 @@ private[spark] trait ExecutorAllocationClient {
 
   /**
    * Request that the cluster manager kill every executor on the specified host.
-   * Results in a call to killExecutors for each executor on the host, with the replace
-   * and force arguments set to true.
+   *
    * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutorsOnHost(host: String): Boolean
 
-   /**
+  /**
    * Request that the cluster manager kill the specified executor.
    * @return whether the request is acknowledged by the cluster manager.
    */

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 19 additions & 16 deletions

@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
+import org.apache.spark.storage._
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
 
@@ -141,10 +141,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   }
 
   /** Fetch torrent blocks from the driver and/or other executors. */
-  private def readBlocks(): Array[ChunkedByteBuffer] = {
+  private def readBlocks(): Array[BlockData] = {
     // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
     // to the driver, so other executors can pull these chunks from this executor as well.
-    val blocks = new Array[ChunkedByteBuffer](numBlocks)
+    val blocks = new Array[BlockData](numBlocks)
     val bm = SparkEnv.get.blockManager
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
@@ -173,7 +173,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
             throw new SparkException(
               s"Failed to store $pieceId of $broadcastId in local BlockManager")
           }
-          blocks(pid) = b
+          blocks(pid) = new ByteBufferBlockData(b, true)
         case None =>
           throw new SparkException(s"Failed to get $pieceId of $broadcastId")
       }
@@ -219,18 +219,22 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       case None =>
         logInfo("Started reading broadcast variable " + id)
         val startTimeMs = System.currentTimeMillis()
-        val blocks = readBlocks().flatMap(_.getChunks())
+        val blocks = readBlocks()
        logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
 
-        val obj = TorrentBroadcast.unBlockifyObject[T](
-          blocks, SparkEnv.get.serializer, compressionCodec)
-        // Store the merged copy in BlockManager so other tasks on this executor don't
-        // need to re-fetch it.
-        val storageLevel = StorageLevel.MEMORY_AND_DISK
-        if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
-          throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+        try {
+          val obj = TorrentBroadcast.unBlockifyObject[T](
+            blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
+          // Store the merged copy in BlockManager so other tasks on this executor don't
+          // need to re-fetch it.
+          val storageLevel = StorageLevel.MEMORY_AND_DISK
+          if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
+            throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+          }
+          obj
+        } finally {
+          blocks.foreach(_.dispose())
         }
-        obj
     }
   }
 }
@@ -277,12 +281,11 @@ private object TorrentBroadcast extends Logging {
   }
 
   def unBlockifyObject[T: ClassTag](
-      blocks: Array[ByteBuffer],
+      blocks: Array[InputStream],
       serializer: Serializer,
       compressionCodec: Option[CompressionCodec]): T = {
     require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
-    val is = new SequenceInputStream(
-      blocks.iterator.map(new ByteBufferInputStream(_)).asJavaEnumeration)
+    val is = new SequenceInputStream(blocks.iterator.asJavaEnumeration)
     val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
     val ser = serializer.newInstance()
     val serIn = ser.deserializeStream(in)
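
The refactored unBlockifyObject now concatenates per-block input streams with SequenceInputStream instead of materializing ByteBuffers, which lets each block be disposed after use. A self-contained Java sketch of that concatenation pattern (the block contents are placeholders):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;

public class ConcatBlocksExample {
  public static void main(String[] args) throws IOException {
    // Expose several independent blocks as one logical stream without
    // copying them into a single buffer.
    InputStream[] blocks = {
      new ByteArrayInputStream("hello ".getBytes(StandardCharsets.UTF_8)),
      new ByteArrayInputStream("world".getBytes(StandardCharsets.UTF_8))
    };
    try (InputStream is =
        new SequenceInputStream(Collections.enumeration(Arrays.asList(blocks)))) {
      int b;
      while ((b = is.read()) != -1) {
        System.out.print((char) b); // prints: hello world
      }
    }
  }
}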

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 6 additions & 1 deletion

@@ -485,12 +485,17 @@ object SparkSubmit extends CommandLineUtils {
 
     // In client mode, launch the application main class directly
     // In addition, add the main application jar and any added jars (if any) to the classpath
-    if (deployMode == CLIENT) {
+    // Also add the main application jar and any added jars to classpath in case YARN client
+    // requires these jars.
+    if (deployMode == CLIENT || isYarnCluster) {
       childMainClass = args.mainClass
       if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
       if (args.jars != null) { childClasspath ++= args.jars.split(",") }
+    }
+
+    if (deployMode == CLIENT) {
       if (args.childArgs != null) { childArgs ++= args.childArgs }
     }
 

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 1 addition & 0 deletions

@@ -190,6 +190,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .orNull
     numExecutors = Option(numExecutors)
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
+    queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
     keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
     principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
 
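
The added line follows the file's existing fallback idiom: an explicit --queue flag wins, otherwise spark.yarn.queue from the loaded properties, otherwise null. A hedged Java sketch of the same resolve-with-fallback chain (method and variable names here are illustrative, not Spark's):

import java.util.Map;
import java.util.Optional;

public class QueueFallbackExample {
  // Explicit CLI value wins; else the spark.yarn.queue property; else null.
  static String resolveQueue(String cliValue, Map<String, String> sparkProperties) {
    return Optional.ofNullable(cliValue)
        .orElseGet(() -> sparkProperties.get("spark.yarn.queue"));
  }

  public static void main(String[] args) {
    Map<String, String> props = Map.of("spark.yarn.queue", "analytics");
    System.out.println(resolveQueue(null, props));    // analytics
    System.out.println(resolveQueue("adhoc", props)); // adhoc
  }
}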
