Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import typing
from re import compile as re_compile

Expand All @@ -30,6 +31,62 @@
from opentelemetry.trace import format_span_id, format_trace_id


class PropagatorSamplingArbiterInterface(abc.ABC):
"""Interface for classes that can arbitrate the default sampling decision whenever
explicit sampling information is absent from the carrier.

NOTE: This is consistent with B3 spec, which calls to "defer the decision
to the receiver" whenever explicit sampling is absent from the carrier
(https://github.com/openzipkin/b3-propagation)

NOTE: This is distinct from the OpenTelemetry `Sampler` interface, which
makes sampling decisions when starting new spans. This interface is
specifically for making sampling decisions when `B3MultiFormat` determines
that sampled state is missing from carrier. And this interface accepts a
different set of arguments than the `Sampler` interface.
"""

@abc.abstractmethod
def should_sample(
self,
carrier: CarrierT,
context: typing.Optional[Context],
getter: Getter,
) -> bool:
"""Determine whether the incoming trace should be sampled. This is invoked only when the
carrier does not contain explicit sampling information.

Same parameters as were passed to `B3MultiFormat.extract`
`
:return: The sampling decision: True to sample, False to not sample.
"""
pass


class StaticPropagatorSamplingArbiter(PropagatorSamplingArbiterInterface):
"""A simple stateless PropagatorSamplingArbiterInterface implementation that always returns
the same static sampling decision.
"""

def __init__(self, should_sample: bool):
self._should_sample = should_sample

def should_sample(
self,
carrier: CarrierT,
context: typing.Optional[Context],
getter: Getter,
) -> bool:
return self._should_sample


# A stateless arbiter that always defaults missing sample state to sampling Deny
default_to_deny_arbiter = StaticPropagatorSamplingArbiter(False)

# A stateless arbiter that always defaults missing sample state to sampling Accept
default_to_accept_arbiter = StaticPropagatorSamplingArbiter(True)


class B3MultiFormat(TextMapPropagator):
"""Propagator for the B3 HTTP multi-header format.

Expand All @@ -46,27 +103,50 @@ class B3MultiFormat(TextMapPropagator):
_trace_id_regex = re_compile(r"[\da-fA-F]{16}|[\da-fA-F]{32}")
_span_id_regex = re_compile(r"[\da-fA-F]{16}")

def __init__(
self, *,
sampling_arbiter: typing.Optional[PropagatorSamplingArbiterInterface] = None):
"""
NOTE: The B3 spec calls for "defer the decision to the receiver" whenever
explicit sampling information is absent from the carrier; in this scenario
we will invoke the given Propagator Sampling Arbiter to make the sampling
decision. (https://github.com/openzipkin/b3-propagation)

:param sampling_arbiter: A user-supplied arbiter instance to decide the sampled state
for our `extract()` method whenever sampling information is absent from carrier.
If None, falls back to the legacy behavior for backward compatibility: defaults
decision to Accept for single-header and to Deny for multi-header (in the
event explicit sampling information is absent from the carrier).
"""
super().__init__()
if sampling_arbiter is not None:
self._single_sampling_arbiter = self._multi_sampling_arbiter = sampling_arbiter
else:
# This reflects the original hard-coded defaults for backward compatibility
self._single_sampling_arbiter = default_to_accept_arbiter
self._multi_sampling_arbiter = default_to_deny_arbiter

def extract(
self,
carrier: CarrierT,
context: typing.Optional[Context] = None,
getter: Getter = default_getter,
) -> Context:
""" Extracts SpanContext from the carrier."""
original_context = context
if context is None:
context = Context()
trace_id = trace.INVALID_TRACE_ID
span_id = trace.INVALID_SPAN_ID
sampled = "0"
sampled = None
flags = None

single_header = _extract_first_element(
getter.get(carrier, self.SINGLE_HEADER_KEY)
)
if single_header:
# The b3 spec calls for the sampling state to be
# "deferred", which is unspecified. This concept does not
# translate to SpanContext, so we set it as recorded.
Copy link
Author

@vitaly-krugl vitaly-krugl Oct 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The b3 spec calls for the sampling state to be deferred "to the receiver" whenever remote doesn't supply it. This pull request facilitates allowing the receiver to make the decision in this scenario.

See https://github.com/openzipkin/b3-propagation

sampled = "1"
sampling_arbiter = self._single_sampling_arbiter

fields = single_header.split("-", 4)

if len(fields) == 1:
Expand All @@ -78,6 +158,8 @@ def extract(
elif len(fields) == 4:
trace_id, span_id, sampled, _ = fields
else:
sampling_arbiter = self._multi_sampling_arbiter

trace_id = (
_extract_first_element(getter.get(carrier, self.TRACE_ID_KEY))
or trace_id
Expand Down Expand Up @@ -112,11 +194,16 @@ def extract(
# header is set to allow.
if sampled in self._SAMPLE_PROPAGATE_VALUES or flags == "1":
options |= trace.TraceFlags.SAMPLED
elif sampled is None and sampling_arbiter.should_sample(
carrier=carrier,
context=original_context,
getter=getter):
options |= trace.TraceFlags.SAMPLED

return trace.set_span_in_context(
trace.NonRecordingSpan(
trace.SpanContext(
# trace an span ids are encoded in hex, so must be converted
# trace and span ids are encoded in hex, so must be converted
trace_id=trace_id,
span_id=span_id,
is_remote=True,
Expand Down