Skip to content

Commit c9d5ba9

Browse files
committed
Address
1 parent 20b01d4 commit c9d5ba9

File tree

3 files changed

+7
-7
lines changed

3 files changed

+7
-7
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
102102
sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
103103

104104
private val offsetFetchAttemptIntervalMs =
105-
sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "100").toLong
105+
sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
106106

107107
private val maxOffsetsPerTrigger =
108108
sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -814,8 +814,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
814814

815815
override def createSparkSession(): TestSparkSession = {
816816
// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
817-
new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context",
818-
sparkConf.set("spark.sql.testkey", "true")))
817+
new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
819818
}
820819

821820
override def beforeAll(): Unit = {

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,8 @@ class KafkaTestUtils extends Logging {
286286
props
287287
}
288288

289-
/** Assert topic is deleted in all places, e.g, brokers, zookeeper. */
290-
private def assertTopicDeleted(
289+
/** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
290+
private def verifyTopicDeletion(
291291
topic: String,
292292
numPartitions: Int,
293293
servers: Seq[KafkaServer]): Unit = {
@@ -320,14 +320,15 @@ class KafkaTestUtils extends Logging {
320320
s"topic $topic still exists on zookeeper")
321321
}
322322

323-
private def verifyTopicDeletion(
323+
/** Verify topic is deleted. Retry to delete the topic if not. */
324+
private def verifyTopicDeletionWithRetries(
324325
zkUtils: ZkUtils,
325326
topic: String,
326327
numPartitions: Int,
327328
servers: Seq[KafkaServer]) {
328329
eventually(timeout(60.seconds), interval(200.millis)) {
329330
try {
330-
assertTopicDeleted(topic, numPartitions, servers)
331+
verifyTopicDeletion(topic, numPartitions, servers)
331332
} catch {
332333
case e: Throwable =>
333334
// As pushing messages into Kafka updates Zookeeper asynchronously, there is a small

0 commit comments

Comments
 (0)