Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.flume

import java.net.InetSocketAddress
Expand Down Expand Up @@ -213,7 +212,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
assert(counter === totalEventsPerChannel * channels.size)
}

def assertChannelIsEmpty(channel: MemoryChannel) = {
def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
queueRemaining.setAccessible(true)
val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
}

/** Class to create socket channel with compression */
private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
private class CompressionChannelFactory(compressionLevel: Int)
extends NioClientSocketChannelFactory {

override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
val encoder = new ZlibEncoder(compressionLevel)
pipeline.addFirst("deflater", encoder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
msgTopic.publish(message)
} catch {
case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
// wait for Spark streaming to consume something from the message queue
Thread.sleep(50)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ class BasicOperationsSuite extends TestSuiteBase {
test("flatMapValues") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
(s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10)),
(s: DStream[String]) => {
s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10))
},
Seq( Seq(("a", 2), ("a", 12), ("b", 1), ("b", 11)), Seq(("", 2), ("", 12)), Seq() ),
true
)
Expand Down Expand Up @@ -474,7 +476,7 @@ class BasicOperationsSuite extends TestSuiteBase {
stream.foreachRDD(_ => {}) // Dummy output stream
ssc.start()
Thread.sleep(2000)
def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
def getInputFromSlice(fromMillis: Long, toMillis: Long): Set[Int] = {
stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CheckpointSuite extends TestSuiteBase {

var ssc: StreamingContext = null

override def batchDuration = Milliseconds(500)
override def batchDuration: Duration = Milliseconds(500)

override def beforeFunction() {
super.beforeFunction()
Expand Down Expand Up @@ -72,7 +72,7 @@ class CheckpointSuite extends TestSuiteBase {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some((values.sum + state.getOrElse(0)))
Some(values.sum + state.getOrElse(0))
}
st.map(x => (x, 1))
.updateStateByKey(updateFunc)
Expand Down Expand Up @@ -199,7 +199,12 @@ class CheckpointSuite extends TestSuiteBase {
testCheckpointedOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ),
(s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() ),
Seq(
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq(),
Seq(("a", 2), ("b", 1)),
Seq(("", 2)), Seq() ),
3
)
}
Expand All @@ -212,7 +217,8 @@ class CheckpointSuite extends TestSuiteBase {
val n = 10
val w = 4
val input = (1 to n).map(_ => Seq("a")).toSeq
val output = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
val output = Seq(
Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
val operation = (st: DStream[String]) => {
st.map(x => (x, 1))
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
Expand All @@ -236,7 +242,13 @@ class CheckpointSuite extends TestSuiteBase {
classOf[TextOutputFormat[Text, IntWritable]])
output
},
Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
Seq(
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq(),
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq()),
3
)
} finally {
Expand All @@ -259,7 +271,13 @@ class CheckpointSuite extends TestSuiteBase {
classOf[NewTextOutputFormat[Text, IntWritable]])
output
},
Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
Seq(
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq(),
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq()),
3
)
} finally {
Expand Down Expand Up @@ -298,7 +316,13 @@ class CheckpointSuite extends TestSuiteBase {
output
}
},
Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
Seq(
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq(),
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq()),
3
)
} finally {
Expand Down Expand Up @@ -533,7 +557,8 @@ class CheckpointSuite extends TestSuiteBase {
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] =
{
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
for (i <- 1 to numBatches.toInt) {
Expand All @@ -543,7 +568,7 @@ class CheckpointSuite extends TestSuiteBase {
logInfo("Manual clock after advancing = " + clock.getTimeMillis())
Thread.sleep(batchDuration.milliseconds)

val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
}.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
outputStream.output.map(_.flatten)
Expand All @@ -552,4 +577,4 @@ class CheckpointSuite extends TestSuiteBase {

private object CheckpointSuite extends Serializable {
var batchThreeShouldBlockIndefinitely: Boolean = true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class FailureSuite extends TestSuiteBase with Logging {
val directory = Utils.createTempDir()
val numBatches = 30

override def batchDuration = Milliseconds(1000)
override def batchDuration: Duration = Milliseconds(1000)

override def useManualClock = false
override def useManualClock: Boolean = false

override def afterFunction() {
Utils.deleteRecursively(directory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
"localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
def output: ArrayBuffer[String] = outputBuffer.flatMap(x => x)
outputStream.register()
ssc.start()

Expand Down Expand Up @@ -164,7 +164,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
val outputStream = new TestOutputStream(countStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
outputStream.register()
ssc.start()

Expand Down Expand Up @@ -196,15 +196,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val queueStream = ssc.queueStream(queue, oneAtATime = true)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(queueStream, outputBuffer)
def output = outputBuffer.filter(_.size > 0)
def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
outputStream.register()
ssc.start()

// Setup data queued into the stream
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
//Thread.sleep(1000)

val inputIterator = input.toIterator
for (i <- 0 until input.size) {
// Enqueue more than 1 item per tick but they should dequeue one at a time
Expand Down Expand Up @@ -239,7 +239,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(queueStream, outputBuffer)
def output = outputBuffer.filter(_.size > 0)
def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
outputStream.register()
ssc.start()

Expand Down Expand Up @@ -352,7 +352,8 @@ class TestServer(portToBind: Int = 0) extends Logging {
logInfo("New connection")
try {
clientSocket.setTcpNoDelay(true)
val outputStream = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream))
val outputStream = new BufferedWriter(
new OutputStreamWriter(clientSocket.getOutputStream))

while(clientSocket.isConnected) {
val msg = queue.poll(100, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -384,7 +385,7 @@ class TestServer(portToBind: Int = 0) extends Logging {

def stop() { servingThread.interrupt() }

def port = serverSocket.getLocalPort
def port: Int = serverSocket.getLocalPort
}

/** This is a receiver to test multiple threads inserting data using block generator */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
testBlockStoring(handler) { case (data, blockIds, storeResults) =>
// Verify the data in block manager is correct
val storedData = blockIds.flatMap { blockId =>
blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
}.toList
storedData shouldEqual data

Expand All @@ -120,7 +120,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
testBlockStoring(handler) { case (data, blockIds, storeResults) =>
// Verify the data in block manager is correct
val storedData = blockIds.flatMap { blockId =>
blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
}.toList
storedData shouldEqual data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ class ReceivedBlockTrackerSuite
* Get all the data written in the given write ahead log files. By default, it will read all
* files in the test log directory.
*/
def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles): Seq[ReceivedBlockTrackerLogEvent] = {
def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles)
: Seq[ReceivedBlockTrackerLogEvent] = {
logFiles.flatMap {
file => new WriteAheadLogReader(file, hadoopConf).toSeq
}.map { byteBuffer =>
Expand All @@ -244,7 +245,8 @@ class ReceivedBlockTrackerSuite
}

/** Create batch allocation object from the given info */
def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo]): BatchAllocationEvent = {
def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo])
: BatchAllocationEvent = {
BatchAllocationEvent(time, AllocatedBlocks(Map((streamId -> blockInfos))))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
val errors = new ArrayBuffer[Throwable]

/** Check if all data structures are clean */
def isAllEmpty = {
def isAllEmpty: Boolean = {
singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
arrayBuffers.isEmpty && errors.isEmpty
}
Expand All @@ -320,24 +320,21 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
def pushBytes(
bytes: ByteBuffer,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
optionalBlockId: Option[StreamBlockId]) {
byteBuffers += bytes
}

def pushIterator(
iterator: Iterator[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
optionalBlockId: Option[StreamBlockId]) {
iterators += iterator
}

def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
optionalBlockId: Option[StreamBlockId]) {
arrayBuffers += arrayBuffer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
SlowTestReceiver.receivedAllRecords = false
//Create test receiver that sleeps in onStop()
// Create test receiver that sleeps in onStop()
val totalNumRecords = 15
val recordsPerSecond = 1
val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond))
Expand Down Expand Up @@ -370,7 +370,8 @@ object TestReceiver {
}

/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {

var receivingThreadOption: Option[Thread] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {

// To make sure that the processing start and end times in collected
// information are different for successive batches
override def batchDuration = Milliseconds(100)
override def actuallyWait = true
override def batchDuration: Duration = Milliseconds(100)
override def actuallyWait: Boolean = true

test("batch info reporting") {
val ssc = setupStreams(input, operation)
Expand Down
Loading