-
Notifications
You must be signed in to change notification settings - Fork 0
feat(sampling): datadog sampler #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
66 commits
Select commit
Hold shift + click to select a range
95c6f1c
Globmatcher done without cache matching, Datadogsampler scaffolding
ZStriker19 dcbb668
add globmatcher cache
ZStriker19 30f633c
remove original pattern from glob_matcher struct
ZStriker19 cd2f80e
add constants file and update samplers to use
ZStriker19 1666972
determine sampling mechanism, add constants for determining _sampling…
ZStriker19 98da39a
assign sampling_priority_v1 and _dd.p.dm to attributes returned
ZStriker19 2178084
refactor and make code simpler
ZStriker19 0a8cff3
implement logic and adding of _dd.rule.psr and _dd.agent.psr
ZStriker19 fe55dd4
add configuration of DatadogSampler and rules from json
ZStriker19 fdbbe9a
remove unneeded rate_limit_always on config
ZStriker19 413f0c1
add ratelimiter implementation
ZStriker19 869ee8f
refactor and add utils
ZStriker19 c6e2196
Add ratelimiter to DatadogSampler with proper logic and tags
ZStriker19 fe2cb30
Merge branch 'main' into zachg/datadog_sampler
ZStriker19 84a4f93
initialize DatadogSampler and pass to tracer_provider_builder
ZStriker19 8d04c65
add tests, refactor
ZStriker19 75733b0
linting
ZStriker19 2f1b51b
fmting
ZStriker19 a2ab01a
fix compile error
ZStriker19 7b61b31
fmt
ZStriker19 b6e5a2e
constant value update
ZStriker19 13868f2
Merge branch 'main' into zachg/datadog_sampler
ZStriker19 822cc69
attempt adding configuration
ZStriker19 9d0bad7
improve attribute checking performance for matches method
ZStriker19 8fef32d
fix copyright date
ZStriker19 5b5281b
short circuits for matching
ZStriker19 59f20e5
handle floats in attributes properly
ZStriker19 c25ed06
mimize logic needed in lock
ZStriker19 fb2f03d
comments
ZStriker19 79e3efa
Merge branch 'main' into zachg/datadog_sampler
ZStriker19 64a1874
add in attribute conversions
ZStriker19 7ac748d
short circuit attribute mapping
ZStriker19 8680d03
add otel dd conversion for name value
ZStriker19 a742f0d
clippy
ZStriker19 b054dbf
rework getting resource and move getting operation name to as late as…
ZStriker19 f62a5f6
stopping point
ZStriker19 99bd86b
implement resource conversion
ZStriker19 670d83d
remove unused methods from otel_utils and fix test
ZStriker19 4f9b30d
add back in resource to otel attribute conversion methods, add resour…
ZStriker19 582598f
Merge branch 'main' into zachg/datadog_sampler
ZStriker19 f0657ce
attempt at using an arc to make a shared resource, named resource bet…
ZStriker19 72fdb51
Update datadog-opentelemetry/src/sampler.rs
ZStriker19 13d2e13
res arc sharing, service conversion tests finished
ZStriker19 bc96561
remove sorting rules by provenance
ZStriker19 345d296
enable adding resource on DatadogSampler new config
ZStriker19 a6942f3
add special casing for http status code matching
ZStriker19 b3abc7c
otel_utils tests
ZStriker19 ba4da3e
switch to using integers and check bucket before replenish
ZStriker19 3194a6c
change ratesampler to only take trace_id
ZStriker19 361bb78
fmt and clippy
ZStriker19 f9f3896
small refactor and prep for config change and move
ZStriker19 0dcccee
move sampler configuration
ZStriker19 8131963
comment wrapping
ZStriker19 7efc92a
nit: adress my own comments
paullegranddc 224a144
fix : fmt
paullegranddc 0f181fd
Merge branch 'main' into zachg/datadog_sampler
paullegranddc aa02a76
fix: use RecordOnly instead of drop
paullegranddc 0b0c62f
nit: remove public exports
paullegranddc e0eaa4f
nit: remove unused fields and methods
paullegranddc 3007af4
feat(sampler): integrate sampler with the text map propgator
paullegranddc ac34fec
feat: define the default value for the rate sampler in the configurat…
paullegranddc 35c0515
fix: remove test using set_env
paullegranddc 6088d83
fix: lint
paullegranddc 3edf239
fix: update LICENSE-3rdparty
paullegranddc c6f822e
Merge branch 'main' into zachg/datadog_sampler
paullegranddc 56b8954
feat: propgate tracestate and reuse sampling decision if previously r…
paullegranddc File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,229 @@ | ||
| // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| use dd_trace::Config; | ||
| use dd_trace_sampling::DatadogSampler; | ||
| use opentelemetry::trace::TraceContextExt; | ||
| use opentelemetry_sdk::{trace::ShouldSample, Resource}; | ||
| use std::{ | ||
| collections::HashMap, | ||
| sync::{Arc, RwLock}, | ||
| }; | ||
|
|
||
| use crate::{ | ||
| span_processor::{RegisterTracePropagationResult, SamplingDecision}, | ||
| TraceRegistry, | ||
| }; | ||
|
|
||
| #[derive(Debug, Clone)] | ||
| pub struct Sampler { | ||
| sampler: DatadogSampler, | ||
| trace_registry: Arc<TraceRegistry>, | ||
| } | ||
|
|
||
| impl Sampler { | ||
| pub fn new( | ||
| cfg: &Config, | ||
| resource: Arc<RwLock<Resource>>, | ||
| trace_registry: Arc<TraceRegistry>, | ||
| ) -> Self { | ||
| let rules = cfg | ||
| .trace_sampling_rules() | ||
| .iter() | ||
| .map(|r| { | ||
| dd_trace_sampling::SamplingRule::new( | ||
| r.sample_rate, | ||
| r.service.clone(), | ||
| r.name.clone(), | ||
| r.resource.clone(), | ||
| Some(r.tags.clone()), | ||
| Some(r.provenance.clone()), | ||
| ) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| let sampler = | ||
| dd_trace_sampling::DatadogSampler::new(rules, cfg.trace_rate_limit(), resource); | ||
| Self { | ||
| sampler, | ||
| trace_registry, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl ShouldSample for Sampler { | ||
| fn should_sample( | ||
| &self, | ||
| parent_context: Option<&opentelemetry::Context>, | ||
| trace_id: opentelemetry::trace::TraceId, | ||
| name: &str, | ||
| span_kind: &opentelemetry::trace::SpanKind, | ||
| attributes: &[opentelemetry::KeyValue], | ||
| _links: &[opentelemetry::trace::Link], | ||
| ) -> opentelemetry::trace::SamplingResult { | ||
| let result = self.sampler.sample( | ||
| parent_context | ||
| .filter(|c| c.has_active_span()) | ||
| .map(|c| c.span().span_context().is_sampled()), | ||
| trace_id, | ||
| name, | ||
| span_kind, | ||
| attributes, | ||
| ); | ||
| if let Some(trace_root_info) = &result.trace_root_info { | ||
| match self.trace_registry.register_trace_propagation_data( | ||
| trace_id.to_bytes(), | ||
| SamplingDecision { | ||
| decision: trace_root_info.sampling_priority(result.is_sampled).value(), | ||
| // TODO: unify these types with decision maker with the one in the span | ||
| // processor | ||
| decision_maker: trace_root_info.mechanism.value() as i8, | ||
| }, | ||
| None, | ||
| // TODO(paullgdc): This is here so the injector adds the t.dm tag to | ||
| // tracecontext. The injector should probably inject it from | ||
| // the trace propagation data instead of tags. | ||
| Some(HashMap::from_iter([( | ||
| "_dd.p.dm".to_string(), | ||
| format!("{}", -(trace_root_info.mechanism.value() as i32)), | ||
| )])), | ||
| ) { | ||
| RegisterTracePropagationResult::Existing(sampling_decision) => { | ||
| return opentelemetry::trace::SamplingResult { | ||
| decision: if sampling_decision.decision > 0 { | ||
| opentelemetry::trace::SamplingDecision::RecordAndSample | ||
| } else { | ||
| opentelemetry::trace::SamplingDecision::RecordOnly | ||
| }, | ||
| attributes: Vec::new(), | ||
| trace_state: parent_context | ||
| .map(|c| c.span().span_context().trace_state().clone()) | ||
| .unwrap_or_default(), | ||
| } | ||
| } | ||
| RegisterTracePropagationResult::New => {} | ||
| } | ||
| } | ||
|
|
||
| opentelemetry::trace::SamplingResult { | ||
| decision: result.to_otel_decision(), | ||
| attributes: result.to_dd_sampling_tags(), | ||
| trace_state: parent_context | ||
| .map(|c| c.span().span_context().trace_state().clone()) | ||
| .unwrap_or_default(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use dd_trace::configuration::SamplingRuleConfig; | ||
| use opentelemetry::{ | ||
| trace::{SamplingDecision, SpanContext, SpanKind, TraceId, TraceState}, | ||
| Context, SpanId, TraceFlags, | ||
| }; | ||
| use opentelemetry_sdk::trace::ShouldSample; | ||
| use std::env; | ||
|
|
||
| #[test] | ||
| fn test_create_sampler_with_sampling_rules() { | ||
| // Build a fresh config to pick up the env var | ||
| let mut config = Config::builder(); | ||
| config.set_trace_sampling_rules(vec![SamplingRuleConfig { | ||
| sample_rate: 0.5, | ||
| service: Some("test-service".to_string()), | ||
| name: None, | ||
| resource: None, | ||
| tags: HashMap::new(), | ||
| provenance: "customer".to_string(), | ||
| }]); | ||
| let config = config.build(); | ||
|
|
||
| let test_resource = Arc::new(RwLock::new(Resource::builder().build())); | ||
| let sampler = Sampler::new(&config, test_resource, Arc::new(TraceRegistry::new())); | ||
|
|
||
| let trace_id_bytes = [1; 16]; | ||
| let trace_id = TraceId::from_bytes(trace_id_bytes); | ||
|
|
||
| // Basic assertion: Check if the attributes added by the sampler are not empty, | ||
| // implying some sampling logic (like adding priority tags) ran. | ||
| assert!( | ||
| !sampler | ||
| .should_sample(None, trace_id, "test", &SpanKind::Client, &[], &[]) | ||
| .attributes | ||
| .is_empty(), | ||
| "Sampler should add attributes even if decision is complex" | ||
| ); | ||
|
|
||
| // Clean up environment | ||
| env::remove_var("DD_TRACE_SAMPLING_RULES"); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_create_default_sampler() { | ||
| // Create a default config (no rules, no specific rate limit) | ||
| let config = Config::builder().build(); | ||
|
|
||
| let test_resource = Arc::new(RwLock::new(Resource::builder_empty().build())); | ||
| let sampler = Sampler::new(&config, test_resource, Arc::new(TraceRegistry::new())); | ||
|
|
||
| let trace_id_bytes = [2; 16]; | ||
| let trace_id = TraceId::from_bytes(trace_id_bytes); | ||
|
|
||
| // Verify the default sampler behavior | ||
| let result = sampler.should_sample(None, trace_id, "test", &SpanKind::Client, &[], &[]); | ||
| assert_eq!( | ||
| result.decision, | ||
| SamplingDecision::RecordAndSample, | ||
| "Default sampler should record and sample by default" | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_trace_state_propagation() { | ||
| let config = Config::builder().build(); | ||
|
|
||
| let test_resource = Arc::new(RwLock::new(Resource::builder_empty().build())); | ||
| let sampler = Sampler::new(&config, test_resource, Arc::new(TraceRegistry::new())); | ||
|
|
||
| let trace_id = TraceId::from_bytes([2; 16]); | ||
| let span_id = SpanId::from_bytes([3; 8]); | ||
|
|
||
| for is_sampled in [true, false] { | ||
| let trace_state = TraceState::from_key_value([("test_key", "test_value")]).unwrap(); | ||
| let span_context = SpanContext::new( | ||
| trace_id, | ||
| span_id, | ||
| is_sampled | ||
| .then_some(TraceFlags::SAMPLED) | ||
| .unwrap_or_default(), | ||
| true, | ||
| trace_state.clone(), | ||
| ); | ||
|
|
||
| // Verify the sampler with a parent context | ||
| let result = sampler.should_sample( | ||
| Some(&Context::new().with_remote_span_context(span_context)), | ||
| trace_id, | ||
| "test", | ||
| &SpanKind::Client, | ||
| &[], | ||
| &[], | ||
| ); | ||
| assert_eq!( | ||
| result.decision, | ||
| if is_sampled { | ||
| SamplingDecision::RecordAndSample | ||
| } else { | ||
| SamplingDecision::RecordOnly | ||
| }, | ||
| "Sampler should respect parent context sampling decision" | ||
| ); | ||
| assert_eq!( | ||
| result.trace_state.header(), | ||
| "test_key=test_value", | ||
| "Sampler should propagate trace state from parent context" | ||
| ); | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.