Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 23de625

Browse files
authored
Merge pull request #34 from InternityFoundation/issue-33
Issue 33: Added additional constructor for passing an instance of StreamsBuilder
2 parents fba1053 + 4735353 commit 23de625

File tree

4 files changed

+69
-82
lines changed

4 files changed

+69
-82
lines changed
Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,74 @@
11
/**
2-
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3-
*/
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
44

55
package com.lightbend.kafka.scala.streams
66

77
import java.util.regex.Pattern
88

9-
import ImplicitConversions._
10-
import org.apache.kafka.streams.kstream.{ GlobalKTable, Materialized }
9+
import com.lightbend.kafka.scala.streams.ImplicitConversions._
10+
import org.apache.kafka.common.utils.Bytes
11+
import org.apache.kafka.streams.kstream.{GlobalKTable, Materialized}
1112
import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
12-
import org.apache.kafka.streams.state.{ StoreBuilder, KeyValueStore }
13+
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder}
1314
import org.apache.kafka.streams.{Consumed, StreamsBuilder, Topology}
14-
import org.apache.kafka.common.utils.Bytes
1515

1616
import scala.collection.JavaConverters._
1717

1818
/**
19-
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
20-
*/
21-
class StreamsBuilderS {
22-
23-
val inner = new StreamsBuilder
19+
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
20+
*/
21+
class StreamsBuilderS(inner: StreamsBuilder = new StreamsBuilder) {
2422

25-
def stream[K, V](topic: String) : KStreamS[K, V] =
26-
inner.stream[K, V](topic)
23+
def stream[K, V](topic: String): KStreamS[K, V] =
24+
inner.stream[K, V](topic)
2725

28-
def stream[K, V](topic: String, consumed: Consumed[K, V]) : KStreamS[K, V] =
29-
inner.stream[K, V](topic, consumed)
26+
def stream[K, V](topic: String, consumed: Consumed[K, V]): KStreamS[K, V] =
27+
inner.stream[K, V](topic, consumed)
3028

31-
def stream[K, V](topics: List[String]): KStreamS[K, V] =
32-
inner.stream[K, V](topics.asJava)
29+
def stream[K, V](topics: List[String]): KStreamS[K, V] =
30+
inner.stream[K, V](topics.asJava)
3331

3432
def stream[K, V](topics: List[String], consumed: Consumed[K, V]): KStreamS[K, V] =
35-
inner.stream[K, V](topics.asJava, consumed)
33+
inner.stream[K, V](topics.asJava, consumed)
3634

37-
def stream[K, V](topicPattern: Pattern) : KStreamS[K, V] =
35+
def stream[K, V](topicPattern: Pattern): KStreamS[K, V] =
3836
inner.stream[K, V](topicPattern)
3937

40-
def stream[K, V](topicPattern: Pattern, consumed: Consumed[K, V]) : KStreamS[K, V] =
38+
def stream[K, V](topicPattern: Pattern, consumed: Consumed[K, V]): KStreamS[K, V] =
4139
inner.stream[K, V](topicPattern, consumed)
4240

43-
def table[K, V](topic: String) : KTableS[K, V] = inner.table[K, V](topic)
41+
def table[K, V](topic: String): KTableS[K, V] = inner.table[K, V](topic)
4442

45-
def table[K, V](topic: String, consumed: Consumed[K, V]) : KTableS[K, V] =
43+
def table[K, V](topic: String, consumed: Consumed[K, V]): KTableS[K, V] =
4644
inner.table[K, V](topic, consumed)
4745

4846
def table[K, V](topic: String, consumed: Consumed[K, V],
49-
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
50-
inner.table[K, V](topic, consumed, materialized)
47+
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
48+
inner.table[K, V](topic, consumed, materialized)
5149

52-
def table[K, V](topic: String,
53-
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
50+
def table[K, V](topic: String,
51+
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
5452
inner.table[K, V](topic, materialized)
5553

5654
def globalTable[K, V](topic: String): GlobalKTable[K, V] =
5755
inner.globalTable(topic)
5856

59-
def globalTable[K, V](topic: String, consumed: Consumed[K, V]) : GlobalKTable[K, V] =
60-
inner.globalTable(topic, consumed)
57+
def globalTable[K, V](topic: String, consumed: Consumed[K, V]): GlobalKTable[K, V] =
58+
inner.globalTable(topic, consumed)
6159

6260
def globalTable[K, V](topic: String, consumed: Consumed[K, V],
63-
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
64-
inner.globalTable(topic, consumed, materialized)
61+
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
62+
inner.globalTable(topic, consumed, materialized)
6563

66-
def globalTable[K, V](topic: String,
67-
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
68-
inner.globalTable(topic, materialized)
64+
def globalTable[K, V](topic: String,
65+
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
66+
inner.globalTable(topic, materialized)
6967

7068
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilder = inner.addStateStore(builder)
7169

7270
def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, sourceName: String, consumed: Consumed[_, _], processorName: String, stateUpdateSupplier: ProcessorSupplier[_, _]): StreamsBuilder =
73-
inner.addGlobalStore(storeBuilder,topic,sourceName,consumed,processorName,stateUpdateSupplier)
74-
75-
def build() : Topology = inner.build()
76-
}
77-
71+
inner.addGlobalStore(storeBuilder, topic, sourceName, consumed, processorName, stateUpdateSupplier)
7872

73+
def build(): Topology = inner.build()
74+
}

src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,18 @@
11
/**
2-
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3-
*/
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
44

55
package com.lightbend.kafka.scala.streams
66

7-
import minitest.TestSuite
8-
import com.lightbend.kafka.scala.server.{ KafkaLocalServer, MessageSender, MessageListener, RecordProcessorTrait }
9-
10-
import java.util.{ Properties, Locale }
7+
import java.util.Properties
118
import java.util.regex.Pattern
129

13-
import org.apache.kafka.streams.{ KeyValue, StreamsConfig, KafkaStreams, Consumed }
14-
import org.apache.kafka.streams.kstream.{ Materialized, Produced, KeyValueMapper, Printed }
15-
import org.apache.kafka.common.serialization.{ Serdes, StringSerializer, StringDeserializer, Serde, LongDeserializer }
10+
import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, MessageSender, RecordProcessorTrait}
11+
import minitest.TestSuite
1612
import org.apache.kafka.clients.consumer.ConsumerRecord
17-
18-
import scala.concurrent.duration._
19-
20-
import ImplicitConversions._
13+
import org.apache.kafka.common.serialization._
14+
import org.apache.kafka.streams.kstream.Produced
15+
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
2116

2217
object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData {
2318

@@ -45,19 +40,19 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestDa
4540
val streamsConfiguration = new Properties()
4641
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, s"wordcount-${scala.util.Random.nextInt(100)}")
4742
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcountgroup")
48-
43+
4944
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
5045
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
5146
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
5247
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)
5348

54-
val builder = new StreamsBuilderS
49+
val builder = new StreamsBuilderS()
5550

5651
val textLines = builder.stream[String, String](inputTopic)
5752

5853
val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
5954

60-
val wordCounts: KTableS[String, Long] =
55+
val wordCounts: KTableS[String, Long] =
6156
textLines.flatMapValues(v => pattern.split(v.toLowerCase))
6257
.groupBy((k, v) => v)
6358
.count()
@@ -70,15 +65,15 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestDa
7065
//
7166
// Step 2: Produce some input data to the input topic.
7267
//
73-
val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
68+
val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
7469
val mvals = sender.batchWriteValue(inputTopic, inputValues)
7570

7671
//
7772
// Step 3: Verify the application's output data.
7873
//
79-
val listener = MessageListener(brokers, outputTopic, "wordcountgroup",
80-
classOf[StringDeserializer].getName,
81-
classOf[LongDeserializer].getName,
74+
val listener = MessageListener(brokers, outputTopic, "wordcountgroup",
75+
classOf[StringDeserializer].getName,
76+
classOf[LongDeserializer].getName,
8277
new RecordProcessor
8378
)
8479

@@ -90,10 +85,11 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestDa
9085
}
9186

9287
class RecordProcessor extends RecordProcessorTrait[String, Long] {
93-
override def processRecord(record: ConsumerRecord[String, Long]): Unit = {
88+
override def processRecord(record: ConsumerRecord[String, Long]): Unit = {
9489
// println(s"Get Message $record")
9590
}
9691
}
92+
9793
}
9894

9995
trait WordCountTestData {

src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/**
2-
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3-
* Adapted from Confluent Inc. whose copyright is reproduced below.
4-
*/
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
* Adapted from Confluent Inc. whose copyright is reproduced below.
4+
*/
55

66
/*
77
* Copyright Confluent Inc.
@@ -22,21 +22,15 @@ package com.lightbend.kafka.scala.streams
2222

2323
import java.util.Properties
2424

25-
import minitest.TestSuite
26-
import com.lightbend.kafka.scala.server.{ KafkaLocalServer, MessageSender, MessageListener, RecordProcessorTrait, Utils }
27-
25+
import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, MessageSender, RecordProcessorTrait}
26+
import com.lightbend.kafka.scala.streams.ImplicitConversions._
2827
import com.lightbend.kafka.scala.streams.algebird.{CMSStore, CMSStoreBuilder}
29-
30-
import org.apache.kafka.clients.consumer.ConsumerConfig
31-
import org.apache.kafka.clients.producer.ProducerConfig
28+
import minitest.TestSuite
29+
import org.apache.kafka.clients.consumer.ConsumerRecord
3230
import org.apache.kafka.common.serialization._
33-
import org.apache.kafka.streams.kstream.{KStream, Produced, Transformer, TransformerSupplier}
31+
import org.apache.kafka.streams.kstream.{Produced, Transformer}
3432
import org.apache.kafka.streams.processor.ProcessorContext
35-
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig}
36-
import org.apache.kafka.clients.consumer.ConsumerRecord
37-
38-
import collection.JavaConverters._
39-
import ImplicitConversions._
33+
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
4034

4135
/**
4236
* End-to-end integration test that demonstrates how to probabilistically count items in an input stream.
@@ -71,7 +65,7 @@ trait ProbabilisticCountingScalaIntegrationTestData {
7165
)
7266
}
7367

74-
object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalServer]
68+
object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalServer]
7569
with ProbabilisticCountingScalaIntegrationTestData {
7670

7771
override def setup(): KafkaLocalServer = {
@@ -103,7 +97,7 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalSer
10397
p
10498
}
10599

106-
val builder = new StreamsBuilderS
100+
val builder = new StreamsBuilderS()
107101

108102
val cmsStoreName = "cms-store"
109103
val cmsStoreBuilder = {
@@ -159,15 +153,15 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalSer
159153
//
160154
// Step 2: Publish some input text lines.
161155
//
162-
val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
156+
val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
163157
val mvals = sender.batchWriteValue(inputTopic, inputTextLines)
164158

165159
//
166160
// Step 3: Verify the application's output data.
167161
//
168-
val listener = MessageListener(brokers, outputTopic, "probwordcountgroup",
169-
classOf[StringDeserializer].getName,
170-
classOf[LongDeserializer].getName,
162+
val listener = MessageListener(brokers, outputTopic, "probwordcountgroup",
163+
classOf[StringDeserializer].getName,
164+
classOf[LongDeserializer].getName,
171165
new RecordProcessor
172166
)
173167

@@ -178,8 +172,9 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalSer
178172
}
179173

180174
class RecordProcessor extends RecordProcessorTrait[String, Long] {
181-
override def processRecord(record: ConsumerRecord[String, Long]): Unit = {
175+
override def processRecord(record: ConsumerRecord[String, Long]): Unit = {
182176
// println(s"Get Message $record")
183177
}
184178
}
179+
185180
}

src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ object StreamToTableJoinScalaIntegrationTest extends TestSuite[KafkaLocalServer]
8181
p
8282
}
8383

84-
val builder = new StreamsBuilderS
84+
val builder = new StreamsBuilderS()
8585

8686
val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic, Consumed.`with`(stringSerde, longSerde))
8787

0 commit comments

Comments (0)