Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
95c6f1c
Globmatcher done without cache matching, Datadogsampler scaffolding
ZStriker19 Apr 10, 2025
dcbb668
add globmatcher cache
ZStriker19 Apr 10, 2025
30f633c
remove original pattern from glob_matcher struct
ZStriker19 Apr 10, 2025
cd2f80e
add constants file and update samplers to use
ZStriker19 Apr 14, 2025
1666972
determine sampling mechanism, add constants for determining _sampling…
ZStriker19 Apr 15, 2025
98da39a
assign sampling_priority_v1 and _dd.p.dm to attributes returned
ZStriker19 Apr 15, 2025
2178084
refactor and make code simpler
ZStriker19 Apr 15, 2025
0a8cff3
implement logic and adding of _dd.rule.psr and _dd.agent.psr
ZStriker19 Apr 16, 2025
fe55dd4
add configuration of DatadogSampler and rules from json
ZStriker19 Apr 18, 2025
fdbbe9a
remove unneeded rate_limit_always on config
ZStriker19 Apr 18, 2025
413f0c1
add ratelimiter implementation
ZStriker19 Apr 18, 2025
869ee8f
refactor and add utils
ZStriker19 Apr 18, 2025
c6e2196
Add ratelimiter to DatadogSampler with proper logic and tags
ZStriker19 Apr 18, 2025
fe2cb30
Merge branch 'main' into zachg/datadog_sampler
ZStriker19 Apr 21, 2025
84a4f93
initialize DatadogSampler and pass to tracer_provider_builder
ZStriker19 Apr 21, 2025
8d04c65
add tests, refactor
ZStriker19 Apr 21, 2025
75733b0
linting
ZStriker19 Apr 21, 2025
2f1b51b
fmting
ZStriker19 Apr 21, 2025
a2ab01a
fix compile error
ZStriker19 Apr 22, 2025
7b61b31
fmt
ZStriker19 Apr 22, 2025
b6e5a2e
constant value update
ZStriker19 Apr 22, 2025
13868f2
Merge branch 'main' into zachg/datadog_sampler
ZStriker19 Apr 22, 2025
822cc69
attempt adding configuration
ZStriker19 Apr 24, 2025
9d0bad7
improve attribute checking performance for matches method
ZStriker19 Apr 25, 2025
8fef32d
fix copyright date
ZStriker19 Apr 25, 2025
5b5281b
short circuits for matching
ZStriker19 Apr 25, 2025
59f20e5
handle floats in attributes properly
ZStriker19 Apr 25, 2025
c25ed06
mimize logic needed in lock
ZStriker19 Apr 25, 2025
fb2f03d
comments
ZStriker19 Apr 28, 2025
79e3efa
Merge branch 'main' into zachg/datadog_sampler
ZStriker19 Apr 28, 2025
64a1874
add in attribute conversions
ZStriker19 Apr 29, 2025
7ac748d
short circuit attribute mapping
ZStriker19 Apr 30, 2025
8680d03
add otel dd conversion for name value
ZStriker19 Apr 30, 2025
a742f0d
clippy
ZStriker19 Apr 30, 2025
b054dbf
rework getting resource and move getting operation name to as late as…
ZStriker19 May 1, 2025
f62a5f6
stopping point
ZStriker19 May 1, 2025
99bd86b
implement resource conversion
ZStriker19 May 2, 2025
670d83d
remove unused methods from otel_utils and fix test
ZStriker19 May 2, 2025
4f9b30d
add back in resource to otel attribute conversion methods, add resour…
ZStriker19 May 5, 2025
582598f
Merge branch 'main' into zachg/datadog_sampler
ZStriker19 May 5, 2025
f0657ce
attempt at using an arc to make a shared resource, named resource bet…
ZStriker19 May 5, 2025
72fdb51
Update datadog-opentelemetry/src/sampler.rs
ZStriker19 May 6, 2025
13d2e13
res arc sharing, service conversion tests finished
ZStriker19 May 7, 2025
bc96561
remove sorting rules by provenance
ZStriker19 May 7, 2025
345d296
enable adding resource on DatadogSampler new config
ZStriker19 May 7, 2025
a6942f3
add special casing for http status code matching
ZStriker19 May 7, 2025
b3abc7c
otel_utils tests
ZStriker19 May 8, 2025
ba4da3e
switch to using integers and check bucket before replenish
ZStriker19 May 8, 2025
3194a6c
change ratesampler to only take trace_id
ZStriker19 May 8, 2025
361bb78
fmt and clippy
ZStriker19 May 8, 2025
f9f3896
small refactor and prep for config change and move
ZStriker19 May 9, 2025
0dcccee
move sampler configuration
ZStriker19 May 9, 2025
8131963
comment wrapping
ZStriker19 May 10, 2025
7efc92a
nit: adress my own comments
paullegranddc Jun 2, 2025
224a144
fix : fmt
paullegranddc Jun 2, 2025
0f181fd
Merge branch 'main' into zachg/datadog_sampler
paullegranddc Jun 2, 2025
aa02a76
fix: use RecordOnly instead of drop
paullegranddc Jun 2, 2025
0b0c62f
nit: remove public exports
paullegranddc Jun 2, 2025
e0eaa4f
nit: remove unused fields and methods
paullegranddc Jun 2, 2025
3007af4
feat(sampler): integrate sampler with the text map propgator
paullegranddc Jun 5, 2025
ac34fec
feat: define the default value for the rate sampler in the configurat…
paullegranddc Jun 5, 2025
35c0515
fix: remove test using set_env
paullegranddc Jun 5, 2025
6088d83
fix: lint
paullegranddc Jun 5, 2025
3edf239
fix: update LICENSE-3rdparty
paullegranddc Jun 5, 2025
c6f822e
Merge branch 'main' into zachg/datadog_sampler
paullegranddc Jun 5, 2025
56b8954
feat: propgate tracestate and reuse sampling decision if previously r…
paullegranddc Jun 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 167 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

982 changes: 922 additions & 60 deletions LICENSE-3rdparty.yml

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions datadog-opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ description.workspace = true
[dependencies]
# Workspace dependencies
dd-trace = { path = "../dd-trace" }
dd-trace-sampling = { path = "../dd-trace-sampling" }
dd-trace-propagation = { path = "../dd-trace-propagation", features = ["opentelemetry"] }

# External dependencies
Expand All @@ -31,6 +32,7 @@ assert_unordered = "0.3"

# Libdatadog dependencies - change to a stable version once we release
datadog-trace-utils = { workspace = true, features = ["test-utils"] }
dd-trace-sampling = { path = "../dd-trace-sampling" }

# Depend on ourselves to have APIs exposed only during tests
datadog-opentelemetry = { path = ".", features = ["test-utils"] }
Expand Down
64 changes: 37 additions & 27 deletions datadog-opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
// SPDX-License-Identifier: Apache-2.0

mod ddtrace_transform;
mod sampler;
mod span_exporter;
mod span_processor;
mod text_map_propagator;
mod trace_id;
mod transform;

use std::sync::Arc;
use std::sync::{Arc, RwLock};

use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::{trace::SdkTracerProvider, Resource};
use sampler::Sampler;
use span_processor::{DatadogSpanProcessor, TraceRegistry};
use text_map_propagator::DatadogPropagator;

Expand Down Expand Up @@ -41,41 +43,49 @@ pub fn init_datadog(
// all parameters and has an install method?
tracer_provider_builder: opentelemetry_sdk::trace::TracerProviderBuilder,
) -> SdkTracerProvider {
let (tracer_provider, propagator) = make_tracer(config, tracer_provider_builder, None);

opentelemetry::global::set_text_map_propagator(propagator);
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
tracer_provider
}

/// Create an instance of the tracer provider
fn make_tracer(
config: dd_trace::Config,
mut tracer_provider_builder: opentelemetry_sdk::trace::TracerProviderBuilder,
resource: Option<Resource>,
) -> (SdkTracerProvider, DatadogPropagator) {
let registry = Arc::new(TraceRegistry::new());
let resource_slot = Arc::new(RwLock::new(Resource::builder_empty().build()));
let sampler = Sampler::new(&config, resource_slot.clone(), registry.clone());

if let Some(resource) = resource {
tracer_provider_builder = tracer_provider_builder.with_resource(resource)
}

let propagator = DatadogPropagator::new(&config, registry.clone());
opentelemetry::global::set_text_map_propagator(propagator);

let span_processor = DatadogSpanProcessor::new(config, registry.clone(), resource_slot.clone());
let tracer_provider = tracer_provider_builder
.with_span_processor(DatadogSpanProcessor::new(config, registry))
.with_span_processor(span_processor)
.with_sampler(sampler) // Use the sampler created above
.with_id_generator(trace_id::TraceidGenerator)
// TODO: hookup additional components
// .with_sampler(sampler)
.build();
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
tracer_provider

(tracer_provider, propagator)
}

#[cfg(feature = "test-utils")]
/// Create a local instance of the tracer provider
pub fn make_tracer(
pub fn make_test_tracer(
config: dd_trace::Config,
tracer_provider_builder: opentelemetry_sdk::trace::TracerProviderBuilder,
) -> SdkTracerProvider {
use opentelemetry::KeyValue;
use opentelemetry_sdk::Resource;

let registry = Arc::new(TraceRegistry::new());

tracer_provider_builder
.with_resource(
Resource::builder()
.with_attribute(KeyValue::new("service.name", config.service().to_string()))
.build(),
)
.with_span_processor(DatadogSpanProcessor::new(config, registry))
.with_id_generator(trace_id::TraceidGenerator)
// TODO: hookup additional components
// .with_sampler(sampler)
.build()
) -> (SdkTracerProvider, DatadogPropagator) {
let resource = Resource::builder()
.with_attribute(opentelemetry::KeyValue::new(
"service.name",
config.service().to_string(),
))
.build();
make_tracer(config, tracer_provider_builder, Some(resource))
}
229 changes: 229 additions & 0 deletions datadog-opentelemetry/src/sampler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use dd_trace::Config;
use dd_trace_sampling::DatadogSampler;
use opentelemetry::trace::TraceContextExt;
use opentelemetry_sdk::{trace::ShouldSample, Resource};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

use crate::{
span_processor::{RegisterTracePropagationResult, SamplingDecision},
TraceRegistry,
};

/// Sampler handed to the OpenTelemetry tracer provider.
///
/// It wraps a [`DatadogSampler`] and keeps a shared handle to the
/// [`TraceRegistry`], in which the sampling decision taken for a trace is
/// registered.
#[derive(Debug, Clone)]
pub struct Sampler {
    /// Datadog sampler that is asked for the sampling result of each span.
    sampler: DatadogSampler,
    /// Registry shared with other components; sampling decisions are registered in it.
    trace_registry: Arc<TraceRegistry>,
}

impl Sampler {
pub fn new(
cfg: &Config,
resource: Arc<RwLock<Resource>>,
trace_registry: Arc<TraceRegistry>,
) -> Self {
let rules = cfg
.trace_sampling_rules()
.iter()
.map(|r| {
dd_trace_sampling::SamplingRule::new(
r.sample_rate,
r.service.clone(),
r.name.clone(),
r.resource.clone(),
Some(r.tags.clone()),
Some(r.provenance.clone()),
)
})
.collect::<Vec<_>>();
let sampler =
dd_trace_sampling::DatadogSampler::new(rules, cfg.trace_rate_limit(), resource);
Self {
sampler,
trace_registry,
}
}
}

impl ShouldSample for Sampler {
    /// Computes the sampling result for a span that is about to start.
    ///
    /// The wrapped Datadog sampler is queried first. When its result carries
    /// trace-root information, the corresponding sampling decision is registered
    /// in the trace registry for `trace_id`:
    ///
    /// * if the registry reports an already existing decision, that decision wins:
    ///   a positive value yields `RecordAndSample`, anything else `RecordOnly`, and
    ///   no attributes are attached;
    /// * otherwise the freshly computed result is returned with its Datadog
    ///   sampling tags as attributes.
    ///
    /// In every case the trace state of the parent context (or the default trace
    /// state when there is no parent) is carried over unchanged.
    fn should_sample(
        &self,
        parent_context: Option<&opentelemetry::Context>,
        trace_id: opentelemetry::trace::TraceId,
        name: &str,
        span_kind: &opentelemetry::trace::SpanKind,
        attributes: &[opentelemetry::KeyValue],
        _links: &[opentelemetry::trace::Link],
    ) -> opentelemetry::trace::SamplingResult {
        // Trace state inherited from the parent; evaluated once, on whichever
        // return path is taken.
        let inherited_trace_state = || {
            parent_context
                .map(|ctx| ctx.span().span_context().trace_state().clone())
                .unwrap_or_default()
        };

        // Only a context holding an active span contributes a parent decision.
        let parent_sampled = parent_context
            .filter(|ctx| ctx.has_active_span())
            .map(|ctx| ctx.span().span_context().is_sampled());

        let result = self
            .sampler
            .sample(parent_sampled, trace_id, name, span_kind, attributes);

        if let Some(trace_root_info) = &result.trace_root_info {
            let mechanism = trace_root_info.mechanism.value();

            let decision = SamplingDecision {
                decision: trace_root_info.sampling_priority(result.is_sampled).value(),
                // TODO: unify these types with decision maker with the one in the span
                // processor
                decision_maker: mechanism as i8,
            };

            // TODO(paullgdc): This is here so the injector adds the t.dm tag to
            // tracecontext. The injector should probably inject it from
            // the trace propagation data instead of tags.
            let propagation_tags = HashMap::from_iter([(
                "_dd.p.dm".to_string(),
                (-(mechanism as i32)).to_string(),
            )]);

            let registration = self.trace_registry.register_trace_propagation_data(
                trace_id.to_bytes(),
                decision,
                None,
                Some(propagation_tags),
            );

            match registration {
                RegisterTracePropagationResult::New => {}
                RegisterTracePropagationResult::Existing(previous) => {
                    // A decision was already recorded for this trace: reuse it
                    // instead of the one computed above.
                    let decision = if previous.decision > 0 {
                        opentelemetry::trace::SamplingDecision::RecordAndSample
                    } else {
                        opentelemetry::trace::SamplingDecision::RecordOnly
                    };
                    return opentelemetry::trace::SamplingResult {
                        decision,
                        attributes: Vec::new(),
                        trace_state: inherited_trace_state(),
                    };
                }
            }
        }

        opentelemetry::trace::SamplingResult {
            decision: result.to_otel_decision(),
            attributes: result.to_dd_sampling_tags(),
            trace_state: inherited_trace_state(),
        }
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for the OpenTelemetry-facing [`Sampler`] wrapper.
    //!
    //! All configuration is built programmatically; the tests never read or
    //! mutate process environment variables, so they are safe to run in parallel.

    use super::*;
    use dd_trace::configuration::SamplingRuleConfig;
    use opentelemetry::{
        trace::{SamplingDecision, SpanContext, SpanKind, TraceId, TraceState},
        Context, SpanId, TraceFlags,
    };
    use opentelemetry_sdk::trace::ShouldSample;

    /// A sampler built from an explicit sampling rule attaches attributes to the
    /// sampling result of a root span.
    #[test]
    fn test_create_sampler_with_sampling_rules() {
        // Build a config with a single sampling rule set through the builder.
        let mut config = Config::builder();
        config.set_trace_sampling_rules(vec![SamplingRuleConfig {
            sample_rate: 0.5,
            service: Some("test-service".to_string()),
            name: None,
            resource: None,
            tags: HashMap::new(),
            provenance: "customer".to_string(),
        }]);
        let config = config.build();

        let test_resource = Arc::new(RwLock::new(Resource::builder().build()));
        let sampler = Sampler::new(&config, test_resource, Arc::new(TraceRegistry::new()));

        let trace_id_bytes = [1; 16];
        let trace_id = TraceId::from_bytes(trace_id_bytes);

        // Basic assertion: Check if the attributes added by the sampler are not empty,
        // implying some sampling logic (like adding priority tags) ran.
        assert!(
            !sampler
                .should_sample(None, trace_id, "test", &SpanKind::Client, &[], &[])
                .attributes
                .is_empty(),
            "Sampler should add attributes even if decision is complex"
        );
    }

    /// With a default configuration a root span is recorded and sampled.
    #[test]
    fn test_create_default_sampler() {
        // Create a default config (no rules, no specific rate limit)
        let config = Config::builder().build();

        let test_resource = Arc::new(RwLock::new(Resource::builder_empty().build()));
        let sampler = Sampler::new(&config, test_resource, Arc::new(TraceRegistry::new()));

        let trace_id_bytes = [2; 16];
        let trace_id = TraceId::from_bytes(trace_id_bytes);

        // Verify the default sampler behavior
        let result = sampler.should_sample(None, trace_id, "test", &SpanKind::Client, &[], &[]);
        assert_eq!(
            result.decision,
            SamplingDecision::RecordAndSample,
            "Default sampler should record and sample by default"
        );
    }

    /// The parent's sampled flag drives the decision and the parent's trace state
    /// is carried over to the sampling result.
    #[test]
    fn test_trace_state_propagation() {
        let config = Config::builder().build();

        let test_resource = Arc::new(RwLock::new(Resource::builder_empty().build()));
        let sampler = Sampler::new(&config, test_resource, Arc::new(TraceRegistry::new()));

        let trace_id = TraceId::from_bytes([2; 16]);
        let span_id = SpanId::from_bytes([3; 8]);

        for is_sampled in [true, false] {
            let trace_state = TraceState::from_key_value([("test_key", "test_value")]).unwrap();
            let trace_flags = if is_sampled {
                TraceFlags::SAMPLED
            } else {
                TraceFlags::default()
            };
            let span_context = SpanContext::new(
                trace_id,
                span_id,
                trace_flags,
                true,
                trace_state.clone(),
            );

            // Verify the sampler with a parent context
            let result = sampler.should_sample(
                Some(&Context::new().with_remote_span_context(span_context)),
                trace_id,
                "test",
                &SpanKind::Client,
                &[],
                &[],
            );
            assert_eq!(
                result.decision,
                if is_sampled {
                    SamplingDecision::RecordAndSample
                } else {
                    SamplingDecision::RecordOnly
                },
                "Sampler should respect parent context sampling decision"
            );
            assert_eq!(
                result.trace_state.header(),
                "test_key=test_value",
                "Sampler should propagate trace state from parent context"
            );
        }
    }
}
Loading
Loading