Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 70cbcdb

Browse files
committed
Reverted back the tests and made changes according to the comments
1 parent e951acb commit 70cbcdb

File tree

5 files changed

+34
-40
lines changed

5 files changed

+34
-40
lines changed

project/build.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sbt.version=1.0.3
Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,74 @@
11
/**
2-
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3-
*/
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
44

55
package com.lightbend.kafka.scala.streams
66

77
import java.util.regex.Pattern
88

9-
import ImplicitConversions._
10-
import org.apache.kafka.streams.kstream.{ GlobalKTable, Materialized }
9+
import com.lightbend.kafka.scala.streams.ImplicitConversions._
10+
import org.apache.kafka.common.utils.Bytes
11+
import org.apache.kafka.streams.kstream.{GlobalKTable, Materialized}
1112
import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
12-
import org.apache.kafka.streams.state.{ StoreBuilder, KeyValueStore }
13+
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder}
1314
import org.apache.kafka.streams.{Consumed, StreamsBuilder, Topology}
14-
import org.apache.kafka.common.utils.Bytes
1515

1616
import scala.collection.JavaConverters._
1717

1818
/**
19-
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
20-
*/
21-
class StreamsBuilderS(inner:StreamsBuilder) {
19+
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
20+
*/
21+
class StreamsBuilderS(inner: StreamsBuilder = new StreamsBuilder) {
2222

23-
def stream[K, V](topic: String) : KStreamS[K, V] =
24-
inner.stream[K, V](topic)
23+
def stream[K, V](topic: String): KStreamS[K, V] =
24+
inner.stream[K, V](topic)
2525

26-
def stream[K, V](topic: String, consumed: Consumed[K, V]) : KStreamS[K, V] =
27-
inner.stream[K, V](topic, consumed)
26+
def stream[K, V](topic: String, consumed: Consumed[K, V]): KStreamS[K, V] =
27+
inner.stream[K, V](topic, consumed)
2828

2929
def stream[K, V](topics: List[String]): KStreamS[K, V] =
30-
inner.stream[K, V](topics.asJava)
30+
inner.stream[K, V](topics.asJava)
3131

3232
def stream[K, V](topics: List[String], consumed: Consumed[K, V]): KStreamS[K, V] =
33-
inner.stream[K, V](topics.asJava, consumed)
33+
inner.stream[K, V](topics.asJava, consumed)
3434

35-
def stream[K, V](topicPattern: Pattern) : KStreamS[K, V] =
35+
def stream[K, V](topicPattern: Pattern): KStreamS[K, V] =
3636
inner.stream[K, V](topicPattern)
3737

38-
def stream[K, V](topicPattern: Pattern, consumed: Consumed[K, V]) : KStreamS[K, V] =
38+
def stream[K, V](topicPattern: Pattern, consumed: Consumed[K, V]): KStreamS[K, V] =
3939
inner.stream[K, V](topicPattern, consumed)
4040

41-
def table[K, V](topic: String) : KTableS[K, V] = inner.table[K, V](topic)
41+
def table[K, V](topic: String): KTableS[K, V] = inner.table[K, V](topic)
4242

43-
def table[K, V](topic: String, consumed: Consumed[K, V]) : KTableS[K, V] =
43+
def table[K, V](topic: String, consumed: Consumed[K, V]): KTableS[K, V] =
4444
inner.table[K, V](topic, consumed)
4545

4646
def table[K, V](topic: String, consumed: Consumed[K, V],
47-
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
48-
inner.table[K, V](topic, consumed, materialized)
47+
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
48+
inner.table[K, V](topic, consumed, materialized)
4949

5050
def table[K, V](topic: String,
51-
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
51+
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
5252
inner.table[K, V](topic, materialized)
5353

5454
def globalTable[K, V](topic: String): GlobalKTable[K, V] =
5555
inner.globalTable(topic)
5656

57-
def globalTable[K, V](topic: String, consumed: Consumed[K, V]) : GlobalKTable[K, V] =
58-
inner.globalTable(topic, consumed)
57+
def globalTable[K, V](topic: String, consumed: Consumed[K, V]): GlobalKTable[K, V] =
58+
inner.globalTable(topic, consumed)
5959

6060
def globalTable[K, V](topic: String, consumed: Consumed[K, V],
61-
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
62-
inner.globalTable(topic, consumed, materialized)
61+
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
62+
inner.globalTable(topic, consumed, materialized)
6363

6464
def globalTable[K, V](topic: String,
65-
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
66-
inner.globalTable(topic, materialized)
65+
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
66+
inner.globalTable(topic, materialized)
6767

6868
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilder = inner.addStateStore(builder)
6969

7070
def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, sourceName: String, consumed: Consumed[_, _], processorName: String, stateUpdateSupplier: ProcessorSupplier[_, _]): StreamsBuilder =
71-
inner.addGlobalStore(storeBuilder,topic,sourceName,consumed,processorName,stateUpdateSupplier)
72-
73-
def build() : Topology = inner.build()
74-
}
75-
76-
object StreamsBuilderS {
77-
78-
def apply() = new StreamsBuilderS(new StreamsBuilder)
71+
inner.addGlobalStore(storeBuilder, topic, sourceName, consumed, processorName, stateUpdateSupplier)
7972

80-
def apply(streamsBuilder: StreamsBuilder)= new StreamsBuilderS(streamsBuilder)
73+
def build(): Topology = inner.build()
8174
}

src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestDa
4646
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
4747
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)
4848

49-
val builder = StreamsBuilderS()
49+
val builder = new StreamsBuilderS()
5050

5151
val textLines = builder.stream[String, String](inputTopic)
5252

src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalSer
9797
p
9898
}
9999

100-
val builder = StreamsBuilderS()
100+
val builder = new StreamsBuilderS()
101101

102102
val cmsStoreName = "cms-store"
103103
val cmsStoreBuilder = {

src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ object StreamToTableJoinScalaIntegrationTest extends TestSuite[KafkaLocalServer]
8181
p
8282
}
8383

84-
val builder = StreamsBuilderS()
84+
val builder = new StreamsBuilderS()
8585

8686
val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic, Consumed.`with`(stringSerde, longSerde))
8787

0 commit comments

Comments
 (0)