2626import java .util .Collection ;
2727import java .util .Collections ;
2828import java .util .HashMap ;
29+ import java .util .HashSet ;
2930import java .util .List ;
3031import java .util .Map ;
3132import java .util .Properties ;
33+ import java .util .Set ;
3234import java .util .concurrent .CountDownLatch ;
3335import java .util .concurrent .TimeUnit ;
3436import java .util .stream .Collectors ;
7678 * @author Gary Russell
7779 * @author Kamill Sokol
7880 * @author Elliot Kennedy
81+ * @author Nakul Mishra
7982 */
8083public class KafkaEmbedded extends ExternalResource implements KafkaRule , InitializingBean , DisposableBean {
8184
@@ -118,7 +121,7 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule, Initia
118121
119122 private final boolean controlledShutdown ;
120123
121- private final String [] topics ;
124+ private final Set < String > topics ;
122125
123126 private final int partitionsPerTopic ;
124127
@@ -160,10 +163,10 @@ public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, Stri
160163 this .kafkaPorts = new int [this .count ]; // random ports by default.
161164 this .controlledShutdown = controlledShutdown ;
162165 if (topics != null ) {
163- this .topics = topics ;
166+ this .topics = new HashSet <>( Arrays . asList ( topics )) ;
164167 }
165168 else {
166- this .topics = new String [ 0 ] ;
169+ this .topics = new HashSet <>() ;
167170 }
168171 this .partitionsPerTopic = partitions ;
169172 }
@@ -235,19 +238,18 @@ public void before() throws Exception { //NOSONAR
235238 this .kafkaPorts [i ] = TestUtils .boundPort (server , SecurityProtocol .PLAINTEXT );
236239 }
237240 }
238- addTopics (this .topics );
241+ createKafkaTopics (this .topics );
239242 System .setProperty (SPRING_EMBEDDED_KAFKA_BROKERS , getBrokersAsString ());
240243 System .setProperty (SPRING_EMBEDDED_ZOOKEEPER_CONNECT , getZookeeperConnectionString ());
241244 }
242245
243246 /**
244- * Add topics to the existing broker(s) using the configured number of partitions.
247+ * Create topics in the existing broker(s) using the configured number of partitions.
245248 * @param topics the topics.
246- * @since 2.1
247249 */
248- public void addTopics ( String ... topics ) {
250+ private void createKafkaTopics ( Set < String > topics ) {
249251 doWithAdmin (admin -> {
250- List <NewTopic > newTopics = Arrays .stream (topics )
252+ List <NewTopic > newTopics = topics .stream ()
251253 .map (t -> new NewTopic (t , this .partitionsPerTopic , (short ) this .count ))
252254 .collect (Collectors .toList ());
253255 CreateTopicsResult createTopics = admin .createTopics (newTopics );
@@ -260,6 +262,18 @@ public void addTopics(String... topics) {
260262 });
261263 }
262264
265+
266+ /**
267+ * Add topics to the existing broker(s) using the configured number of partitions.
268+ * @param topics the topics.
269+ * @since 2.1
270+ */
271+ public void addTopics (String ... topics ) {
272+ HashSet <String > set = new HashSet <>(Arrays .asList (topics ));
273+ createKafkaTopics (set );
274+ this .topics .addAll (set );
275+ }
276+
263277 /**
264278 * Create an {@link AdminClient} invoke the callback and reliable close the
265279 * admin.
@@ -340,6 +354,10 @@ public void after() {
340354 }
341355 }
342356
357+ public Set <String > getTopics () {
358+ return new HashSet <>(this .topics );
359+ }
360+
343361 @ Override
344362 public List <KafkaServer > getKafkaServers () {
345363 return this .kafkaServers ;
@@ -457,7 +475,7 @@ public boolean isEmbedded() {
457475 * @throws Exception an exception.
458476 */
459477 public void consumeFromAllEmbeddedTopics (Consumer <?, ?> consumer ) throws Exception {
460- consumeFromEmbeddedTopics (consumer , this .topics );
478+ consumeFromEmbeddedTopics (consumer , this .topics . toArray ( new String [ 0 ]) );
461479 }
462480
463481 /**
@@ -477,9 +495,11 @@ public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) th
477495 * @throws Exception an exception.
478496 */
479497 public void consumeFromEmbeddedTopics (Consumer <?, ?> consumer , String ... topics ) throws Exception {
480- for (String topic : topics ) {
481- assertThat (this .topics ).as ("topic '" + topic + "' is not in embedded topic list" ).contains (topic );
482- }
498+ HashSet <String > diff = new HashSet <>(Arrays .asList (topics ));
499+ diff .removeAll (new HashSet <>(this .topics ));
500+ assertThat (this .topics )
501+ .as ("topic(s):'" + diff + "' are not in embedded topic list" )
502+ .containsAll (new HashSet <>(Arrays .asList (topics )));
483503 final CountDownLatch consumerLatch = new CountDownLatch (1 );
484504 consumer .subscribe (Arrays .asList (topics ), new ConsumerRebalanceListener () {
485505
0 commit comments