
Commit cc4e104

Author: Debasish Das
Merge branch 'similarity' of https://github.com/debasish83/spark into similarity
2 parents 71f24a4 + 49549d5

File tree: 17 files changed (+330, -66 lines)


core/src/main/scala/org/apache/spark/Aggregator.scala

Lines changed: 1 addition & 4 deletions
@@ -88,10 +88,7 @@ case class Aggregator[K, V, C] (
       combiners.iterator
     } else {
       val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
-      while (iter.hasNext) {
-        val pair = iter.next()
-        combiners.insert(pair._1, pair._2)
-      }
+      combiners.insertAll(iter)
       // Update task metrics if context is not null
       // TODO: Make context non-optional in a future release
       Option(context).foreach { c =>

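The removed while-loop and the new call do the same thing for this code path: insertAll feeds every (key, value) pair from the iterator into the map in a single call. A minimal sketch of that equivalence (the helper below is illustrative, not Spark code; `combiners` and `iter` are the names from the hunk):

    // What the old loop did by hand: push each pair from the iterator into the map.
    // ExternalAppendOnlyMap.insertAll now performs this in one call.
    def insertAllEquivalent[K, C](insert: (K, C) => Unit, iter: Iterator[(K, C)]): Unit =
      iter.foreach { case (k, c) => insert(k, c) }
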
core/src/main/scala/org/apache/spark/util/SizeEstimator.scala

Lines changed: 51 additions & 10 deletions
@@ -47,6 +47,11 @@ private[spark] object SizeEstimator extends Logging {
   private val FLOAT_SIZE = 4
   private val DOUBLE_SIZE = 8
 
+  // Fields can be primitive types, whose sizes are 1, 2, 4 or 8 bytes, or pointers. The size of
+  // a pointer is 4 or 8 depending on the JVM (32-bit or 64-bit) and the UseCompressedOops flag.
+  // The sizes should be in descending order, as we will use that information for field placement.
+  private val fieldSizes = List(8, 4, 2, 1)
+
   // Alignment boundary for objects
   // TODO: Is this arch dependent ?
   private val ALIGN_SIZE = 8
@@ -171,7 +176,7 @@ private[spark] object SizeEstimator extends Logging {
       // general all ClassLoaders and Classes will be shared between objects anyway.
     } else {
       val classInfo = getClassInfo(cls)
-      state.size += classInfo.shellSize
+      state.size += alignSize(classInfo.shellSize)
       for (field <- classInfo.pointerFields) {
         state.enqueue(field.get(obj))
       }
@@ -237,8 +242,8 @@ private[spark] object SizeEstimator extends Logging {
     }
     size
   }
-
-  private def primitiveSize(cls: Class[_]): Long = {
+
+  private def primitiveSize(cls: Class[_]): Int = {
     if (cls == classOf[Byte]) {
       BYTE_SIZE
     } else if (cls == classOf[Boolean]) {
@@ -274,30 +279,66 @@ private[spark] object SizeEstimator extends Logging {
     val parent = getClassInfo(cls.getSuperclass)
     var shellSize = parent.shellSize
     var pointerFields = parent.pointerFields
+    val sizeCount = Array.fill(fieldSizes.max + 1)(0)
 
+    // Iterate through the fields of this class and gather information.
     for (field <- cls.getDeclaredFields) {
       if (!Modifier.isStatic(field.getModifiers)) {
         val fieldClass = field.getType
         if (fieldClass.isPrimitive) {
-          shellSize += primitiveSize(fieldClass)
+          sizeCount(primitiveSize(fieldClass)) += 1
         } else {
           field.setAccessible(true) // Enable future get()'s on this field
-          shellSize += pointerSize
+          sizeCount(pointerSize) += 1
           pointerFields = field :: pointerFields
         }
       }
     }
 
-    shellSize = alignSize(shellSize)
+    // Based on the simulated field layout code in Aleksey Shipilev's report:
+    // http://cr.openjdk.java.net/~shade/papers/2013-shipilev-fieldlayout-latest.pdf
+    // The code is in Figure 9. The simplified idea of field layout consists of 4 parts
+    // (see the report for more details):
+    //
+    // 1. Field alignment: HotSpot lays out the fields aligned by their size.
+    // 2. Object alignment: HotSpot rounds instance size up to 8 bytes.
+    // 3. Consistent field layout throughout the hierarchy: we lay out the superclass first and
+    //    use the superclass's shellSize as the starting point for the fields of this class.
+    // 4. Class alignment: HotSpot rounds field blocks up to HeapOopSize, not 4 bytes; confirmed
+    //    with Aleksey, see https://bugs.openjdk.java.net/browse/CODETOOLS-7901322
+    //
+    // The real-world field layout is much more complicated: there are three kinds of field
+    // ordering in Java 8, and we don't consider the @Contended annotation introduced by Java 8.
+    // See the layout_fields method in the HotSpot class-file parser for more details:
+    // hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/classfile/classFileParser.cpp
+    var alignedSize = shellSize
+    for (size <- fieldSizes if sizeCount(size) > 0) {
+      val count = sizeCount(size)
+      // If there are internal gaps, smaller fields can fit in.
+      alignedSize = math.max(alignedSize, alignSizeUp(shellSize, size) + size * count)
+      shellSize += size * count
+    }
+
+    // Choose the larger value as the new shellSize (clearly alignedSize >= shellSize), and
+    // round the instance field block up to pointer size.
+    shellSize = alignSizeUp(alignedSize, pointerSize)
 
     // Create and cache a new ClassInfo
     val newInfo = new ClassInfo(shellSize, pointerFields)
     classInfos.put(cls, newInfo)
     newInfo
   }
 
-  private def alignSize(size: Long): Long = {
-    val rem = size % ALIGN_SIZE
-    if (rem == 0) size else (size + ALIGN_SIZE - rem)
-  }
+  private def alignSize(size: Long): Long = alignSizeUp(size, ALIGN_SIZE)
+
+  /**
+   * Compute the aligned size. alignSize must be 2^n, otherwise the result will be wrong.
+   * When alignSize = 2^n, alignSize - 1 = 2^n - 1, whose binary representation has exactly
+   * n trailing 1s (0b00...011..1), so ~(alignSize - 1) is 0b11..100..0. Hence,
+   * (size + alignSize - 1) & ~(alignSize - 1) clears the last n bits, which yields a
+   * multiple of alignSize.
+   */
+  private def alignSizeUp(size: Long, alignSize: Int): Long =
+    (size + alignSize - 1) & ~(alignSize - 1)
 }

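The bit trick in alignSizeUp can be checked in isolation. A small self-contained sketch, Scala REPL style (the sample sizes are illustrative):

    // Same expression as the new helper; works whenever alignSize is a power of two.
    def alignSizeUp(size: Long, alignSize: Int): Long =
      (size + alignSize - 1) & ~(alignSize - 1)

    alignSizeUp(12, 8)  // 16: rounded up to the 8-byte object-alignment boundary
    alignSizeUp(16, 8)  // 16: already aligned, unchanged
    alignSizeUp(13, 4)  // 16: a 13-byte field block rounded up to HeapOopSize = 4
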
core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala

Lines changed: 44 additions & 3 deletions
@@ -36,6 +36,15 @@ class DummyClass4(val d: DummyClass3) {
   val x: Int = 0
 }
 
+// dummy class to show class field blocks alignment.
+class DummyClass5 extends DummyClass1 {
+  val x: Boolean = true
+}
+
+class DummyClass6 extends DummyClass5 {
+  val y: Boolean = true
+}
+
 object DummyString {
   def apply(str: String) : DummyString = new DummyString(str.toArray)
 }
@@ -50,6 +59,7 @@ class SizeEstimatorSuite
 
   override def beforeEach() {
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    super.beforeEach()
     System.setProperty("os.arch", "amd64")
     System.setProperty("spark.test.useCompressedOops", "true")
   }
@@ -62,6 +72,22 @@ class SizeEstimatorSuite
     assertResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
   }
 
+  test("primitive wrapper objects") {
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Boolean(true)))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Byte("1")))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Character('1')))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Short("1")))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Integer(1)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Long(1)))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Float(1.0)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Double(1.0d)))
+  }
+
+  test("class field blocks rounding") {
+    assertResult(16)(SizeEstimator.estimate(new DummyClass5))
+    assertResult(24)(SizeEstimator.estimate(new DummyClass6))
+  }
+
   // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
   // (Sun vs IBM). Use a DummyString class to make tests deterministic.
   test("strings") {
@@ -102,18 +128,18 @@ class SizeEstimatorSuite
     val arr = new Array[Char](100000)
     assertResult(200016)(SizeEstimator.estimate(arr))
     assertResult(480032)(SizeEstimator.estimate(Array.fill(10000)(new DummyString(arr))))
-
+
     val buf = new ArrayBuffer[DummyString]()
     for (i <- 0 until 5000) {
       buf.append(new DummyString(new Array[Char](10)))
     }
     assertResult(340016)(SizeEstimator.estimate(buf.toArray))
-
+
     for (i <- 0 until 5000) {
       buf.append(new DummyString(arr))
    }
     assertResult(683912)(SizeEstimator.estimate(buf.toArray))
-
+
     // If an array contains the *same* element many times, we should only count it once.
     val d1 = new DummyClass1
     // 10 pointers plus 8-byte object
@@ -155,5 +181,20 @@ class SizeEstimatorSuite
     assertResult(64)(SizeEstimator.estimate(DummyString("a")))
     assertResult(64)(SizeEstimator.estimate(DummyString("ab")))
     assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
+
+    // primitive wrapper classes
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Boolean(true)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Byte("1")))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Character('1')))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Short("1")))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Integer(1)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Long(1)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Float(1.0)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Double(1.0d)))
+  }
+
+  test("class field blocks rounding on 64-bit VM without useCompressedOops") {
+    assertResult(24)(SizeEstimator.estimate(new DummyClass5))
+    assertResult(32)(SizeEstimator.estimate(new DummyClass6))
   }
 }

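Assuming the usual compressed-oops constants on a 64-bit HotSpot VM (12-byte object header, 4-byte HeapOopSize, 8-byte object alignment), the expected values for the two new dummy classes can be reproduced by hand. This is a hedged walkthrough in REPL style, not part of the test code:

    def alignUp(size: Long, align: Int): Long = (size + align - 1) & ~(align - 1)

    // DummyClass1: header only -> 12-byte field block, object rounded up to 16
    alignUp(12, 8)                              // 16
    // DummyClass5: DummyClass1 block (12) + one Boolean -> 13, rounded to HeapOopSize, then to 8
    alignUp(alignUp(12 + 1, 4), 8)              // 16
    // DummyClass6: DummyClass5 block (16) + one Boolean -> 17 -> 20, rounded to 8
    alignUp(alignUp(16 + 1, 4), 8)              // 24
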
core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala

Lines changed: 5 additions & 5 deletions
@@ -377,17 +377,17 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     val sorter = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
     assertDidNotBypassMergeSort(sorter)
-    sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
+    sorter.insertAll((0 until 120000).iterator.map(i => (i, i)))
     assert(diskBlockManager.getAllFiles().length > 0)
     sorter.stop()
     assert(diskBlockManager.getAllBlocks().length === 0)
 
     val sorter2 = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
     assertDidNotBypassMergeSort(sorter2)
-    sorter2.insertAll((0 until 100000).iterator.map(i => (i, i)))
+    sorter2.insertAll((0 until 120000).iterator.map(i => (i, i)))
     assert(diskBlockManager.getAllFiles().length > 0)
-    assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet)
+    assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet)
     sorter2.stop()
     assert(diskBlockManager.getAllBlocks().length === 0)
   }
@@ -428,8 +428,8 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
       None, Some(new HashPartitioner(3)), Some(ord), None)
     assertDidNotBypassMergeSort(sorter)
     intercept[SparkException] {
-      sorter.insertAll((0 until 100000).iterator.map(i => {
-        if (i == 99990) {
+      sorter.insertAll((0 until 120000).iterator.map(i => {
+        if (i == 119990) {
           throw new SparkException("Intentional failure")
         }
         (i, i)

docs/configuration.md

Lines changed: 10 additions & 0 deletions
@@ -1464,6 +1464,16 @@ Apart from these, the following properties are also available, and may be useful
   for more details.
   </td>
 </tr>
+<tr>
+  <td><code>spark.streaming.kafka.maxRetries</code></td>
+  <td>1</td>
+  <td>
+    Maximum number of consecutive retries the driver will make in order to find
+    the latest offsets on the leader of each partition (a default value of 1
+    means that the driver will make a maximum of 2 attempts). Only applies to
+    the new Kafka direct stream API.
+  </td>
+</tr>
 </table>
 
 #### Cluster Managers

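The new property can be set like any other Spark configuration entry. A hedged usage sketch (the retry count of 3 and the application name are illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-direct-example")               // illustrative
      .set("spark.streaming.kafka.maxRetries", "3")     // 3 retries = at most 4 attempts
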
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java

Lines changed: 18 additions & 1 deletion
@@ -190,6 +190,10 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env) throws IOE
       firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props) : null;
 
     List<String> cmd = buildJavaCommand(extraClassPath);
+    // Take Thrift Server as daemon
+    if (isThriftServer(mainClass)) {
+      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
+    }
     addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
     addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
 
@@ -201,7 +205,11 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env) throws IOE
     // - SPARK_DRIVER_MEMORY env variable
     // - SPARK_MEM env variable
     // - default value (512m)
-    String memory = firstNonEmpty(firstNonEmptyValue(SparkLauncher.DRIVER_MEMORY, conf, props),
+    // Take Thrift Server as daemon
+    String tsMemory =
+      isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
+    String memory = firstNonEmpty(tsMemory,
+      firstNonEmptyValue(SparkLauncher.DRIVER_MEMORY, conf, props),
       System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
     cmd.add("-Xms" + memory);
     cmd.add("-Xmx" + memory);
@@ -292,6 +300,15 @@ private boolean isClientMode(Properties userProps) {
       (!userMaster.equals("yarn-cluster") && deployMode == null);
   }
 
+  /**
+   * Return whether the given main class represents a thrift server.
+   */
+  private boolean isThriftServer(String mainClass) {
+    return (mainClass != null &&
+      mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"));
+  }
+
+
   private class OptionParser extends SparkSubmitOptionParser {
 
     @Override

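In Scala terms, the driver-memory resolution order introduced above looks roughly like the sketch below. This is illustrative only: the real logic is the Java code in SparkSubmitCommandBuilder, and `confOrProps` stands in for the SparkLauncher.DRIVER_MEMORY lookup from conf/props:

    // First non-empty value wins; SPARK_DAEMON_MEMORY is consulted only for the Thrift Server.
    def driverMemory(isThriftServer: Boolean, confOrProps: Option[String]): String =
      Seq(
        if (isThriftServer) Option(System.getenv("SPARK_DAEMON_MEMORY")) else None,
        confOrProps,
        Option(System.getenv("SPARK_DRIVER_MEMORY")),
        Option(System.getenv("SPARK_MEM")))
        .flatten
        .find(_.nonEmpty)
        .getOrElse("512m")                               // DEFAULT_MEM
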
python/pyspark/sql/dataframe.py

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
-# contir[butor license agreements. See the NOTICE file distributed with
+# contributor license agreements. See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala

Lines changed: 5 additions & 6 deletions
@@ -25,10 +25,6 @@ import scala.util.parsing.input.CharArrayReader.EofCh
 
 import org.apache.spark.sql.catalyst.plans.logical._
 
-private[sql] object KeywordNormalizer {
-  def apply(str: String): String = str.toLowerCase()
-}
-
 private[sql] abstract class AbstractSparkSQLParser
   extends StandardTokenParsers with PackratParsers {
 
@@ -42,7 +38,7 @@ private[sql] abstract class AbstractSparkSQLParser
   }
 
   protected case class Keyword(str: String) {
-    def normalize: String = KeywordNormalizer(str)
+    def normalize: String = lexical.normalizeKeyword(str)
     def parser: Parser[String] = normalize
   }
 
@@ -90,13 +86,16 @@ class SqlLexical extends StdLexical {
     reserved ++= keywords
   }
 
+  /* Normalize the keyword string. */
+  def normalizeKeyword(str: String): String = str.toLowerCase
+
   delimiters += (
     "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
     ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
   )
 
   protected override def processIdent(name: String) = {
-    val token = KeywordNormalizer(name)
+    val token = normalizeKeyword(name)
     if (reserved contains token) Keyword(token) else Identifier(name)
   }
 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * Root class of SQL parser dialects. We don't guarantee binary
+ * compatibility for future releases, so keep it as an internal
+ * interface for advanced users.
+ *
+ */
+@DeveloperApi
+abstract class Dialect {
+  // The main function, to be implemented by the SQL parser.
+  def parse(sqlText: String): LogicalPlan
+}

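A hedged sketch of what a user-defined dialect could look like against this interface. `MyDialect` and the `delegate` parser it wraps are hypothetical; the DialectException used to surface parse failures is the one added to catalyst's errors package in the hunk below:

    import org.apache.spark.sql.catalyst.Dialect
    import org.apache.spark.sql.catalyst.errors.DialectException
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Hypothetical dialect that delegates to whatever parser the implementor supplies
    // and wraps failures in the DialectException introduced by this commit.
    class MyDialect(delegate: String => LogicalPlan) extends Dialect {
      def parse(sqlText: String): LogicalPlan =
        try delegate(sqlText)
        catch {
          case e: Exception => throw new DialectException(s"Cannot parse: $sqlText", e)
        }
    }
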
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala

Lines changed: 2 additions & 0 deletions
@@ -38,6 +38,8 @@ package object errors {
     }
   }
 
+  class DialectException(msg: String, cause: Throwable) extends Exception(msg, cause)
+
   /**
    * Wraps any exceptions that are thrown while executing `f` in a
    * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.
