Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.zookeeper.client.ZKClientConfig

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection._

Expand Down Expand Up @@ -297,6 +298,7 @@ object ConfigCommand extends Config {
}
}

@nowarn("cat=deprecation")
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ class Log(@volatile private var _dir: File,

def newLeaderEpochFileCache(): LeaderEpochFileCache = {
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
new LeaderEpochFileCache(topicPartition, () => logEndOffset, checkpointFile)
}

if (recordVersion.precedes(RecordVersion.V2)) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}

import scala.annotation.nowarn
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -106,7 +107,7 @@ object RequestChannel extends Logging {

def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"

def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
def body[T <: AbstractRequest](implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
bodyAndSize.request match {
case r: T => r
case r =>
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
* Wakeup the thread for selection.
*/
@Override
def wakeup = nioSelector.wakeup()
def wakeup(): Unit = nioSelector.wakeup()

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.zookeeper.client.ZKClientConfig

import scala.collection.{mutable, Seq}
import scala.annotation.nowarn
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Random, Success, Try}

Expand Down Expand Up @@ -249,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging {
}
} catch {
case e: Exception =>
resourceBindingsBeingDeleted.foreach { case (binding, index) =>
resourceBindingsBeingDeleted.keys.foreach { binding =>
deleteExceptions.getOrElseUpdate(binding, apiException(e))
}
}
Expand All @@ -263,6 +264,7 @@ class AclAuthorizer extends Authorizer with Logging {
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
}

@nowarn("cat=optimizer")
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
val aclBindings = new util.ArrayList[AclBinding]()
aclCache.foreach { case (resource, versionedAcls) =>
Expand Down Expand Up @@ -342,6 +344,7 @@ class AclAuthorizer extends Authorizer with Logging {
} else false
}

@nowarn("cat=deprecation")
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
// save aclCache reference to a local val to get a consistent view of the cache during acl updates.
val aclCacheSnapshot = aclCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}

import scala.annotation.nowarn


object AuthorizerUtils {

@nowarn("cat=deprecation")
def createAuthorizer(className: String): Authorizer = {
Utils.newInstance(className, classOf[Object]) match {
case auth: Authorizer => auth
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger

import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.ElectLeadersRequestOps
import kafka.api.LeaderAndIsr
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
import org.apache.kafka.common.utils.Utils

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -575,6 +576,7 @@ class ChecksumMessageFormatter extends MessageFormatter {
topicStr = ""
}

@nowarn("cat=deprecation")
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
output.println(topicStr + "checksum:" + consumerRecord.checksum)
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/tools/MirrorMaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
Expand Down Expand Up @@ -190,6 +191,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {

setName(threadName)

@nowarn("cat=deprecation")
private def toBaseConsumerRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord =
BaseConsumerRecord(record.topic,
record.partition,
Expand Down Expand Up @@ -412,10 +414,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
* If message.handler.args is specified. A constructor that takes in a String as argument must exist.
*/
trait MirrorMakerMessageHandler {
@nowarn("cat=deprecation")
def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
}

private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
@nowarn("cat=deprecation")
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, record.headers))
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/utils/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {

def hasNext: Boolean = iter.hasNext

def next: (K, V) = {
def next(): (K, V) = {
val n = iter.next
(n.getKey, n.getValue)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class ZooKeeperClient(connectString: String,
case GetDataRequest(path, ctx) =>
zooKeeper.getData(path, shouldWatch(request), new DataCallback {
def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs))),
callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs)))
}, ctx.orNull)
case GetChildrenRequest(path, _, ctx) =>
zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.junit.{After, Before, Rule, Test}
import org.junit.rules.Timeout
import org.scalatest.Assertions.intercept

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -92,6 +93,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers, client)
}

@nowarn("cat=deprecation")
@Test
def testInvalidAlterConfigsDueToPolicy(): Unit = {
client = Admin.create(createConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.Assertions.intercept

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
Expand Down Expand Up @@ -949,6 +950,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumeRecords(consumer)
}

@nowarn("cat=deprecation")
@Test
def testPatternSubscriptionWithNoTopicAccess(): Unit = {
createTopic(topic)
Expand Down Expand Up @@ -985,6 +987,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
}

@nowarn("cat=deprecation")
@Test
def testPatternSubscriptionWithTopicAndGroupRead(): Unit = {
createTopic(topic)
Expand Down Expand Up @@ -1016,6 +1019,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertTrue(consumer.assignment().isEmpty)
}

@nowarn("cat=deprecation")
@Test
def testPatternSubscriptionMatchingInternalTopic(): Unit = {
createTopic(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.Assertions.fail

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer
import scala.concurrent.ExecutionException
Expand Down Expand Up @@ -102,6 +103,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
* 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
* 2. Last message of the non-blocking send should return the correct offset metadata
*/
@nowarn("cat=deprecation")
@Test
def testSendOffset(): Unit = {
val producer = createProducer(brokerList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinator
import org.junit.Assert._
import org.junit.{After, Ignore, Test}

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, mutable}

Expand Down Expand Up @@ -83,6 +84,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* 1. Produce a bunch of messages
* 2. Then consume the messages while killing and restarting brokers at random
*/
@nowarn("cat=deprecation")
def consumeWithBrokerFailures(numIters: Int): Unit = {
val numRecords = 1000
val producer = createProducer()
Expand Down Expand Up @@ -379,6 +381,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
checkCloseDuringRebalance("group1", topic, executor, true)
}

@nowarn("cat=deprecation")
private def checkCloseDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean): Unit = {

def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import org.scalatest.Assertions.intercept

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.concurrent.duration.Duration
Expand Down Expand Up @@ -2176,6 +2177,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
/**
* The AlterConfigs API is deprecated and should not support altering log levels
*/
@nowarn("cat=deprecation")
@Test
@Ignore // To be re-enabled once KAFKA-8779 is resolved
def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
Expand Down Expand Up @@ -2227,6 +2229,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

object PlaintextAdminIntegrationTest {

@nowarn("cat=deprecation")
def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
// Alter topics
var topicConfigEntries1 = Seq(
Expand Down Expand Up @@ -2289,6 +2292,7 @@ object PlaintextAdminIntegrationTest {
assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
}

@nowarn("cat=deprecation")
def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: Admin): Unit = {
// Create topics
val topic1 = "invalid-alter-configs-topic-1"
Expand Down Expand Up @@ -2356,12 +2360,12 @@ object PlaintextAdminIntegrationTest {

assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals(Defaults.CompressionType.toString,
assertEquals(Defaults.CompressionType,
configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)

assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)

assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
assertEquals(Defaults.CompressionType, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{After, Assert, Before, Test}

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
Expand All @@ -46,6 +47,7 @@ abstract class AuthorizationAdmin {
// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
@nowarn("cat=deprecation")
val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import org.scalatest.Assertions.intercept

import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -1324,6 +1325,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}.mkString(",")
}

@nowarn("cat=deprecation")
private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
val configs = servers.map { server =>
val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
Expand All @@ -1350,6 +1352,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered)
}

@nowarn("cat=deprecation")
private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
Expand All @@ -1358,6 +1361,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v) }
}

@nowarn("cat=deprecation")
private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
perBrokerConfig: Boolean): AlterConfigsResult = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
.map(KafkaConfig.fromProps(_, overridingProps))

@Before
override def setUp: Unit = {
override def setUp(): Unit = {
// Do some Metrics Registry cleanup by removing the metrics that this test checks.
// This is a test workaround to the issue that prior harness runs may have left a populated registry.
// see https://issues.apache.org/jira/browse/KAFKA-4605
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsRes
import org.junit.Assert._
import org.scalatest.Assertions.intercept

import scala.annotation.nowarn

class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
Expand Down Expand Up @@ -347,6 +349,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
}

@nowarn("cat=deprecation")
private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = {
val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
Expand Down
5 changes: 4 additions & 1 deletion core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import scala.util.Random
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.InvalidOffsetException

import scala.annotation.nowarn

class OffsetIndexTest {

var idx: OffsetIndex = null
Expand All @@ -47,7 +49,8 @@ class OffsetIndexTest {
if(this.idx != null)
this.idx.file.delete()
}


@nowarn("cat=deprecation")
@Test
def randomLookupTest(): Unit = {
assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L))
Expand Down
Loading