Skip to content

Conversation

@sobychacko
Copy link
Contributor

Fixes: #3786
Issue link: #3786

When tracing is enabled, the KafkaRecordSenderContext was adding a new traceparent header without removing existing ones, resulting in multiple traceparent headers in the same record. This commit fixes the issue by Updating KafkaRecordSenderContext to remove existing traceparent headers before adding new ones.

Auto-cherry-pick to 3.3.x & 3.2.x

…header

Fixes: spring-projects#3786
Issue link: spring-projects#3786

When tracing is enabled, the KafkaRecordSenderContext was adding a new
traceparent header without removing existing ones, resulting in multiple
traceparent headers in the same record. This commit fixes the issue by
Updating KafkaRecordSenderContext to remove existing traceparent headers
before adding new ones.

**Auto-cherry-pick to `3.3.x` & `3.2.x`**

Signed-off-by: Soby Chacko <[email protected]>
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some review.
Thanks

super((carrier, key, value) -> {
Headers headers = record.headers();
// For traceparent context headers, ensure there's only one
if ("traceparent".equals(key)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct.
Different vendors provides different header name.
I have so far know here: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis/src/main/java/org/springframework/cloud/stream/binder/kinesis/KinesisMessageChannelBinder.java#L575-L577.

But that does not mean that anything else custom could be provided from the context supplier.

I think it is totally safe to replace whatever was asked from the supplier.
The point is to not mislead with extra entry from what might came from the previous observation.
We have to provide here what can be set from keys supplier.

Iterable<Header> traceparentHeaders = record.headers().headers("traceparent");
List<String> headerValues = StreamSupport.stream(traceparentHeaders.spliterator(), false)
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
.collect(Collectors.toList());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason in this. Just toList() is enough.

}

// https://github.com/spring-cloud/spring-cloud-stream/issues/3095#issuecomment-2707075861
// https://github.com/spring-projects/spring-kafka/issues/3786
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a reason in this comments.
We always can find out that from the commit history.

super((carrier, key, value) -> {
Headers headers = record.headers();
Iterable<Header> existingHeaders = headers.headers(key);
if (existingHeaders.iterator().hasNext()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to do this check?
Why just plain remove() not enough?

@artembilan artembilan merged commit 3a4e45b into spring-projects:main Mar 13, 2025
2 of 3 checks passed
@artembilan
Copy link
Member

Auto-cherry-pick has failed.
Please, take look.
Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Kafka ProducerRecord may end up with duplicated trace headers

2 participants