|  | 
| 33 | 33 | import java.util.concurrent.TimeoutException; | 
| 34 | 34 | import java.util.concurrent.atomic.AtomicReference; | 
| 35 | 35 | import java.util.stream.Collectors; | 
| 36 |  | -import java.util.stream.Stream; | 
| 37 | 36 | 
 | 
| 38 | 37 | import static io.confluent.csid.utils.LatchTestUtils.awaitLatch; | 
| 39 | 38 | import static io.confluent.csid.utils.StringUtils.msg; | 
| @@ -345,7 +344,7 @@ public List<Integer> getCommitHistoryFlattened() { | 
| 345 | 344 |     public List<OffsetAndMetadata> getCommitHistoryFlattenedMeta() { | 
| 346 | 345 |         return (isUsingTransactionalProducer()) | 
| 347 | 346 |                 ? ktu.getProducerCommitsMeta(producerSpy) | 
| 348 |  | -                : extractAllPartitionsOffsetsSequentiallyMeta(); | 
|  | 347 | +                : extractAllPartitionsOffsetsSequentiallyMeta(true); | 
| 349 | 348 |     } | 
| 350 | 349 | 
 | 
| 351 | 350 |     public void assertCommits(List<Integer> offsets, String description) { | 
| @@ -381,26 +380,26 @@ public void assertCommits(List<Integer> offsets, Optional<String> description) { | 
| 381 | 380 |      * Flattens the offsets of all partitions into a single sequential list | 
| 382 | 381 |      */ | 
| 383 | 382 |     protected List<Integer> extractAllPartitionsOffsetsSequentially(boolean trimGenesis) { | 
| 384 |  | -        return extractAllPartitionsOffsetsSequentiallyMeta().stream(). | 
|  | 383 | +        return extractAllPartitionsOffsetsSequentiallyMeta(trimGenesis).stream(). | 
| 385 | 384 |                 map(x -> (int) x.offset()) // int cast a luxury in test context - no big offsets | 
| 386 | 385 |                 .collect(Collectors.toList()); | 
| 387 | 386 |     } | 
| 388 | 387 | 
 | 
| 389 | 388 |     /** | 
| 390 | 389 |      * Flattens the offsets of all partitions into a single sequential list | 
| 391 | 390 |      */ | 
| 392 |  | -    protected List<OffsetAndMetadata> extractAllPartitionsOffsetsSequentiallyMeta() { | 
|  | 391 | +    protected List<OffsetAndMetadata> extractAllPartitionsOffsetsSequentiallyMeta(boolean trimGenesis) { | 
| 393 | 392 |         // copy the list for safe concurrent access | 
| 394 | 393 |         List<Map<TopicPartition, OffsetAndMetadata>> history = new ArrayList<>(consumerSpy.getCommitHistoryInt()); | 
| 395 | 394 |         return history.stream() | 
| 396 | 395 |                 .flatMap(commits -> | 
| 397 | 396 |                         { | 
| 398 |  | -                            Collection<OffsetAndMetadata> values = new ArrayList<>(commits.values()); // 4 debugging | 
| 399 |  | -                            Stream<Integer> rawOffsets = values.stream().map(meta -> (int) meta.offset()); | 
|  | 397 | +                            var rawValues = new ArrayList<>(commits.values()).stream(); // 4 debugging | 
|  | 398 | +//                            Stream<Integer> rawOffsets = values.stream().map(meta -> (int) meta.offset()); | 
| 400 | 399 |                             if (trimGenesis) | 
| 401 |  | -                                return rawOffsets.filter(x -> x != 0); | 
|  | 400 | +                                return rawValues.filter(x -> x.offset() != 0); | 
| 402 | 401 |                             else | 
| 403 |  | -                                return rawOffsets; // int cast a luxury in test context - no big offsets | 
|  | 402 | +                                return rawValues; // int cast a luxury in test context - no big offsets | 
| 404 | 403 |                         } | 
| 405 | 404 |                 ).collect(Collectors.toList()); | 
| 406 | 405 |     } | 
|  | 
0 commit comments