
Commit c8cfaa2: "merge with master" (2 parents: 46df1bb + f6471dc)

File tree: 49 files changed, +745 −211 lines


core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala

Lines changed: 10 additions & 2 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 import java.util.Properties
 
 import scala.collection.JavaConverters._
@@ -86,7 +87,10 @@ private[spark] object TaskDescription {
     dataOut.writeInt(taskDescription.properties.size())
     taskDescription.properties.asScala.foreach { case (key, value) =>
       dataOut.writeUTF(key)
-      dataOut.writeUTF(value)
+      // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
+      val bytes = value.getBytes(StandardCharsets.UTF_8)
+      dataOut.writeInt(bytes.length)
+      dataOut.write(bytes)
     }
 
     // Write the task. The task is already serialized, so write it directly to the byte buffer.
@@ -124,7 +128,11 @@ private[spark] object TaskDescription {
     val properties = new Properties()
     val numProperties = dataIn.readInt()
     for (i <- 0 until numProperties) {
-      properties.setProperty(dataIn.readUTF(), dataIn.readUTF())
+      val key = dataIn.readUTF()
+      val valueLength = dataIn.readInt()
+      val valueBytes = new Array[Byte](valueLength)
+      dataIn.readFully(valueBytes)
+      properties.setProperty(key, new String(valueBytes, StandardCharsets.UTF_8))
     }
 
     // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later).

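Note on the change above: DataOutputStream.writeUTF prefixes the payload with an unsigned 16-bit length, so any string whose UTF-8 encoding exceeds 65535 bytes throws UTFDataFormatException. The following standalone sketch (not part of the commit; the object name and test value are made up) round-trips an oversized value with the same length-prefix pattern the new code uses.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

object LongPropertyRoundTrip {
  def main(args: Array[String]): Unit = {
    val value = "x" * 100000  // UTF-8 size is well over writeUTF's 65535-byte ceiling
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    // out.writeUTF(value) would throw UTFDataFormatException here
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    out.writeInt(bytes.length)  // 4-byte length prefix instead of writeUTF's 2-byte one
    out.write(bytes)
    out.close()

    val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val read = new Array[Byte](in.readInt())
    in.readFully(read)
    assert(new String(read, StandardCharsets.UTF_8) == value)
  }
}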
core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala

Lines changed: 16 additions & 0 deletions

@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import java.io.{ByteArrayOutputStream, DataOutputStream, UTFDataFormatException}
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -36,6 +37,21 @@ class TaskDescriptionSuite extends SparkFunSuite {
     val originalProperties = new Properties()
     originalProperties.put("property1", "18")
     originalProperties.put("property2", "test value")
+    // SPARK-19796 -- large property values (like a large job description for a long sql query)
+    // can cause problems for DataOutputStream, make sure we handle correctly
+    val sb = new StringBuilder()
+    (0 to 10000).foreach(_ => sb.append("1234567890"))
+    val largeString = sb.toString()
+    originalProperties.put("property3", largeString)
+    // make sure we've got a good test case
+    intercept[UTFDataFormatException] {
+      val out = new DataOutputStream(new ByteArrayOutputStream())
+      try {
+        out.writeUTF(largeString)
+      } finally {
+        out.close()
+      }
+    }
 
     // Create a dummy byte buffer for the task.
     val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))

external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala

Lines changed: 19 additions & 6 deletions

@@ -36,7 +36,11 @@ import org.apache.spark.util.NextIterator
 
 /** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */
 private[kinesis]
 case class SequenceNumberRange(
-    streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String)
+    streamName: String,
+    shardId: String,
+    fromSeqNumber: String,
+    toSeqNumber: String,
+    recordCount: Int)
 
 /** Class representing an array of Kinesis sequence number ranges */
 private[kinesis]
@@ -136,6 +140,8 @@ class KinesisSequenceRangeIterator(
   private val client = new AmazonKinesisClient(credentials)
   private val streamName = range.streamName
   private val shardId = range.shardId
+  // AWS limits to maximum of 10k records per get call
+  private val maxGetRecordsLimit = 10000
 
   private var toSeqNumberReceived = false
   private var lastSeqNumber: String = null
@@ -153,12 +159,14 @@ class KinesisSequenceRangeIterator(
 
       // If the internal iterator has not been initialized,
       // then fetch records from starting sequence number
-      internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber)
+      internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber,
+        range.recordCount)
     } else if (!internalIterator.hasNext) {
 
       // If the internal iterator does not have any more records,
      // then fetch more records after the last consumed sequence number
-      internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber)
+      internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber,
+        range.recordCount)
     }
 
     if (!internalIterator.hasNext) {
@@ -191,9 +199,12 @@ class KinesisSequenceRangeIterator(
   /**
    * Get records starting from or after the given sequence number.
    */
-  private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = {
+  private def getRecords(
+      iteratorType: ShardIteratorType,
+      seqNum: String,
+      recordCount: Int): Iterator[Record] = {
     val shardIterator = getKinesisIterator(iteratorType, seqNum)
-    val result = getRecordsAndNextKinesisIterator(shardIterator)
+    val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount)
     result._1
   }
 
@@ -202,10 +213,12 @@ class KinesisSequenceRangeIterator(
    * to get records from Kinesis), and get the next shard iterator for next consumption.
    */
   private def getRecordsAndNextKinesisIterator(
-      shardIterator: String): (Iterator[Record], String) = {
+      shardIterator: String,
+      recordCount: Int): (Iterator[Record], String) = {
     val getRecordsRequest = new GetRecordsRequest
     getRecordsRequest.setRequestCredentials(credentials)
     getRecordsRequest.setShardIterator(shardIterator)
+    getRecordsRequest.setLimit(Math.min(recordCount, this.maxGetRecordsLimit))
     val getRecordsResult = retryOrTimeout[GetRecordsResult](
       s"getting records using shard iterator") {
       client.getRecords(getRecordsRequest)

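Note on the setLimit change above: Kinesis caps each GetRecords call at 10,000 records, and a range now carries how many records it actually holds, so the request never asks for more than either bound. A minimal sketch of that clamping follows, using the same AWS SDK v1 calls the iterator uses; the helper name and parameters are illustrative, not from the commit.

import scala.collection.JavaConverters._
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, Record}

object GetRecordsSketch {
  // Kinesis rejects GetRecords limits above 10,000
  private val maxGetRecordsLimit = 10000

  // Fetch at most `recordCount` records for one shard iterator, clamped to the AWS ceiling.
  def fetch(
      client: AmazonKinesisClient,
      shardIterator: String,
      recordCount: Int): (Seq[Record], String) = {
    val request = new GetRecordsRequest
    request.setShardIterator(shardIterator)
    request.setLimit(math.min(recordCount, maxGetRecordsLimit))
    val result = client.getRecords(request)
    (result.getRecords.asScala, result.getNextShardIterator)
  }
}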
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

Lines changed: 2 additions & 1 deletion

@@ -210,7 +210,8 @@ private[kinesis] class KinesisReceiver[T](
     if (records.size > 0) {
       val dataIterator = records.iterator().asScala.map(messageHandler)
       val metadata = SequenceNumberRange(streamName, shardId,
-        records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
+        records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber(),
+        records.size())
       blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
     }
   }

external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -51,7 +51,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
     shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
     shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
       val seqNumRange = SequenceNumberRange(
-        testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
+        testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size)
       (shardId, seqNumRange)
     }
     allRanges = shardIdToRange.values.toSeq
@@ -181,7 +181,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
 
     // Create the necessary ranges to use in the RDD
     val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
-      SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")))
+      SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 1)))
     val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
       val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis)))
       SequenceNumberRanges(Array(range))

external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -119,13 +119,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     // Generate block info data for testing
     val seqNumRanges1 = SequenceNumberRanges(
-      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
+      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
     val blockId1 = StreamBlockId(kinesisStream.id, 123)
     val blockInfo1 = ReceivedBlockInfo(
       0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
 
     val seqNumRanges2 = SequenceNumberRanges(
-      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
+      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
     val blockId2 = StreamBlockId(kinesisStream.id, 345)
     val blockInfo2 = ReceivedBlockInfo(
       0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))

mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala

Lines changed: 22 additions & 2 deletions

@@ -24,12 +24,13 @@ import breeze.linalg.{DenseVector => BDV}
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.classification.LinearSVCSuite._
 import org.apache.spark.ml.feature.{Instance, LabeledPoint}
-import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.functions.udf
 
 
 class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
@@ -41,6 +42,9 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
   @transient var smallValidationDataset: Dataset[_] = _
   @transient var binaryDataset: Dataset[_] = _
 
+  @transient var smallSparseBinaryDataset: Dataset[_] = _
+  @transient var smallSparseValidationDataset: Dataset[_] = _
+
   override def beforeAll(): Unit = {
     super.beforeAll()
 
@@ -51,6 +55,13 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
     smallBinaryDataset = generateSVMInput(A, Array[Double](B, C), nPoints, 42).toDF()
     smallValidationDataset = generateSVMInput(A, Array[Double](B, C), nPoints, 17).toDF()
     binaryDataset = generateSVMInput(1.0, Array[Double](1.0, 2.0, 3.0, 4.0), 10000, 42).toDF()
+
+    // Dataset for testing SparseVector
+    val toSparse: Vector => SparseVector = _.asInstanceOf[DenseVector].toSparse
+    val sparse = udf(toSparse)
+    smallSparseBinaryDataset = smallBinaryDataset.withColumn("features", sparse('features))
+    smallSparseValidationDataset = smallValidationDataset.withColumn("features", sparse('features))
+
   }
 
   /**
@@ -68,13 +79,17 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
     val model = svm.fit(smallBinaryDataset)
     assert(model.transform(smallValidationDataset)
       .where("prediction=label").count() > nPoints * 0.8)
+    val sparseModel = svm.fit(smallSparseBinaryDataset)
+    checkModels(model, sparseModel)
   }
 
   test("Linear SVC binary classification with regularization") {
     val svm = new LinearSVC()
     val model = svm.setRegParam(0.1).fit(smallBinaryDataset)
     assert(model.transform(smallValidationDataset)
       .where("prediction=label").count() > nPoints * 0.8)
+    val sparseModel = svm.fit(smallSparseBinaryDataset)
+    checkModels(model, sparseModel)
   }
 
   test("params") {
@@ -235,7 +250,7 @@ object LinearSVCSuite {
     "aggregationDepth" -> 3
   )
 
-  // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise)
+  // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise)
   def generateSVMInput(
       intercept: Double,
       weights: Array[Double],
@@ -252,5 +267,10 @@ object LinearSVCSuite {
     y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
   }
 
+  def checkModels(model1: LinearSVCModel, model2: LinearSVCModel): Unit = {
+    assert(model1.intercept == model2.intercept)
+    assert(model1.coefficients.equals(model2.coefficients))
+  }
+
 }

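For reference on the sparse-path coverage added above, this is what the toSparse conversion inside the new udf does to a feature vector; a tiny standalone sketch with made-up values, not part of the commit.

import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vectors}

object ToSparseSketch {
  def main(args: Array[String]): Unit = {
    val dense = Vectors.dense(0.0, 3.0, 0.0, 4.0).asInstanceOf[DenseVector]
    val sparse: SparseVector = dense.toSparse
    // Only the non-zero entries survive: size 4, indices [1, 3], values [3.0, 4.0]
    println(sparse)  // (4,[1,3],[3.0,4.0])
  }
}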
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 26 additions & 1 deletion

@@ -117,6 +117,8 @@ class Analyzer(
     Batch("Hints", fixedPoint,
       new ResolveHints.ResolveBroadcastHints(conf),
       ResolveHints.RemoveAllHints),
+    Batch("Simple Sanity Check", Once,
+      LookupFunctions),
     Batch("Substitution", fixedPoint,
       CTESubstitution,
       WindowsSubstitution,
@@ -604,7 +606,11 @@ class Analyzer(
 
    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
-        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+        lookupTableFromCatalog(u).canonicalized match {
+          case v: View =>
+            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
+          case other => i.copy(table = other)
+        }
      case u: UnresolvedRelation => resolveRelation(u)
    }
 
@@ -1038,6 +1044,25 @@ class Analyzer(
     }
   }
 
+  /**
+   * Checks whether a function identifier referenced by an [[UnresolvedFunction]] is defined in the
+   * function registry. Note that this rule doesn't try to resolve the [[UnresolvedFunction]]. It
+   * only performs simple existence check according to the function identifier to quickly identify
+   * undefined functions without triggering relation resolution, which may incur potentially
+   * expensive partition/schema discovery process in some cases.
+   *
+   * @see [[ResolveFunctions]]
+   * @see https://issues.apache.org/jira/browse/SPARK-19737
+   */
+  object LookupFunctions extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
+      case f: UnresolvedFunction if !catalog.functionExists(f.name) =>
+        withPosition(f) {
+          throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName)
+        }
+    }
+  }
+
   /**
    * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
   */

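A hedged illustration of the new LookupFunctions batch (session setup, table, and function names below are placeholders, not from the commit): a query that references an undefined function now fails during analysis with NoSuchFunctionException, before relation resolution work such as partition or schema discovery is attempted.

import org.apache.spark.sql.SparkSession

object LookupFunctionsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("lookup-functions-demo").getOrCreate()
    spark.range(10).createOrReplaceTempView("t")
    try {
      // Rejected in the "Simple Sanity Check" batch: no_such_fn is not in the function registry.
      spark.sql("SELECT no_such_fn(id) FROM t").collect()
    } catch {
      case e: org.apache.spark.sql.AnalysisException =>
        println(s"Rejected at analysis time: ${e.getMessage}")
    } finally {
      spark.stop()
    }
  }
}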
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala

Lines changed: 26 additions & 0 deletions

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
@@ -162,6 +164,30 @@ object CatalogUtils {
     BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
   }
 
+  /**
+   * Convert URI to String.
+   * Since URI.toString does not decode the uri, e.g. change '%25' to '%'.
+   * Here we create a hadoop Path with the given URI, and rely on Path.toString
+   * to decode the uri
+   * @param uri the URI of the path
+   * @return the String of the path
+   */
+  def URIToString(uri: URI): String = {
+    new Path(uri).toString
+  }
+
+  /**
+   * Convert String to URI.
+   * Since new URI(string) does not encode string, e.g. change '%' to '%25'.
+   * Here we create a hadoop Path with the given String, and rely on Path.toUri
+   * to encode the string
+   * @param str the String of the path
+   * @return the URI of the path
+   */
+  def stringToURI(str: String): URI = {
+    new Path(str).toUri
+  }
+
   private def normalizeColumnName(
       tableName: String,
       tableCols: Seq[String],

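A small usage sketch for the two helpers added above (the warehouse paths are illustrative): routing through hadoop Path handles percent signs in both directions, whereas java.net.URI would either reject the raw string or keep the escape when converting back.

import java.net.URI
import org.apache.hadoop.fs.Path

object CatalogUriSketch {
  def main(args: Array[String]): Unit = {
    // new URI("/warehouse/a%b") throws URISyntaxException; Path percent-encodes for us.
    val uri: URI = new Path("/warehouse/a%b").toUri          // what stringToURI does
    assert(uri.toString == "/warehouse/a%25b")

    // URI.toString keeps the escape; Path.toString decodes it back to the raw path.
    assert(new Path(uri).toString == "/warehouse/a%b")       // what URIToString does
  }
}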
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 6 additions & 6 deletions

@@ -202,7 +202,7 @@ class InMemoryCatalog(
       tableDefinition.storage.locationUri.isEmpty
 
     val tableWithLocation = if (needDefaultTableLocation) {
-      val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
+      val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
       try {
         val fs = defaultTableLocation.getFileSystem(hadoopConfig)
         fs.mkdirs(defaultTableLocation)
@@ -211,7 +211,7 @@ class InMemoryCatalog(
           throw new SparkException(s"Unable to create table $table as failed " +
             s"to create its directory $defaultTableLocation", e)
       }
-      tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
+      tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
     } else {
       tableDefinition
     }
@@ -274,7 +274,7 @@ class InMemoryCatalog(
         "Managed table should always have table location, as we will assign a default location " +
         "to it if it doesn't have one.")
       val oldDir = new Path(oldDesc.table.location)
-      val newDir = new Path(catalog(db).db.locationUri, newName)
+      val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
         fs.rename(oldDir, newDir)
@@ -283,7 +283,7 @@ class InMemoryCatalog(
           throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
             s"to rename its directory $oldDir", e)
       }
-      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
+      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
     }
 
     catalog(db).tables.put(newName, oldDesc)
@@ -389,7 +389,7 @@ class InMemoryCatalog(
 
       existingParts.put(
        p.spec,
-        p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
+        p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri))))
     }
   }
 
@@ -462,7 +462,7 @@ class InMemoryCatalog(
     }
     oldPartition.copy(
       spec = newSpec,
-      storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+      storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri)))
    } else {
      oldPartition.copy(spec = newSpec)
    }
