Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal class DataStreamsMessagePackFormatter
private readonly byte[] _serviceBytes = StringEncoding.UTF8.GetBytes("Service");
private readonly long _productMask;
private readonly bool _isInDefaultState;
private readonly bool _writeProcessTags;

private readonly byte[] _serviceValueBytes;

Expand Down Expand Up @@ -50,6 +51,7 @@ internal class DataStreamsMessagePackFormatter
private readonly byte[] _backlogTagsBytes = StringEncoding.UTF8.GetBytes("Tags");
private readonly byte[] _backlogValueBytes = StringEncoding.UTF8.GetBytes("Value");
private readonly byte[] _productMaskBytes = StringEncoding.UTF8.GetBytes("ProductMask");
private readonly byte[] _processTagsBytes = StringEncoding.UTF8.GetBytes("ProcessTags");
private readonly byte[] _isInDefaultStateBytes = StringEncoding.UTF8.GetBytes("IsInDefaultState");

public DataStreamsMessagePackFormatter(TracerSettings tracerSettings, ProfilerSettings profilerSettings, string defaultServiceName)
Expand All @@ -63,6 +65,7 @@ public DataStreamsMessagePackFormatter(TracerSettings tracerSettings, ProfilerSe
_serviceValueBytes = StringEncoding.UTF8.GetBytes(defaultServiceName);
_productMask = GetProductsMask(tracerSettings, profilerSettings);
_isInDefaultState = tracerSettings.IsDataStreamsMonitoringInDefaultState;
_writeProcessTags = tracerSettings.PropagateProcessTags;
}

// should be the same across all languages
Expand Down Expand Up @@ -94,13 +97,14 @@ private static long GetProductsMask(TracerSettings tracerSettings, ProfilerSetti

public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStatsBucket> statsBuckets, List<SerializableBacklogBucket> backlogsBuckets)
{
var withProcessTags = _writeProcessTags && !string.IsNullOrEmpty(ProcessTags.SerializedTags);
var bytesWritten = 0;

// Should be in sync with Java
// https://github.com/DataDog/dd-trace-java/blob/a4b7a7b177709e6bdfd9261904cb9a777e4febbe/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java#L35
// https://github.com/DataDog/dd-trace-java/blob/master/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: don't use master for github links, because they break 😅

Suggested change
// https://github.com/DataDog/dd-trace-java/blob/master/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java
// https://github.com/DataDog/dd-trace-java/blob/8db72c0988adb1010c1f2d6b028b93b10cd10ff2/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, hmm, well — yes, they break if the file moves, but a link pinned to a fixed SHA also loses its relevance as time passes (for instance, here it was pointing to code that had since changed a lot). That could be especially problematic for a comment that says "keep in sync with ...".

// -1 because we don't have a primary tag
// -1 because service name override is not supported
bytesWritten += MessagePackBinary.WriteMapHeader(stream, 7);
bytesWritten += MessagePackBinary.WriteMapHeader(stream, 7 + (withProcessTags ? 1 : 0));

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _environmentBytes);
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _environmentValueBytes);
Expand Down Expand Up @@ -194,6 +198,12 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _productMaskBytes);
bytesWritten += MessagePackBinary.WriteInt64(stream, _productMask);

if (withProcessTags)
{
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _processTagsBytes);
bytesWritten += MessagePackBinary.WriteString(stream, ProcessTags.SerializedTags);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not a fan of the fact that these are static globals, global state is the enemy of testing. Could/should we just pass it in as a parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm yes we can, but at some point we'd query them from the static global anyway. I don't know what would be the alternative for something that's supposed to be a singleton.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably not a big deal here, but there's a "correct" answer:

I don't know what would be the alternative for something that's supposed to be a singleton.

The answer is to only have a thin global static at the "outside" of the app. Everything else uses dependency inversion to inject the values in. Makes everything inherently testable, and removes the implicit dependencies between components you get otherwise.

}

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _isInDefaultStateBytes);
bytesWritten += MessagePackBinary.WriteBoolean(stream, _isInDefaultState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ public DataStreamsManager(
string? env,
string defaultServiceName,
IDataStreamsWriter? writer,
bool isInDefaultState)
bool isInDefaultState,
string? processTags)
{
// We don't yet support primary tag in .NET yet
_nodeHashBase = HashHelper.CalculateNodeHashBase(defaultServiceName, env, primaryTag: null);
// We don't support primary tag in .NET yet
_nodeHashBase = HashHelper.CalculateNodeHashBase(defaultServiceName, env, primaryTag: null, processTags);
_isEnabled = writer is not null;
_writer = writer;
_isInDefaultState = isInDefaultState;
Expand All @@ -59,7 +60,7 @@ public static DataStreamsManager Create(
? DataStreamsWriter.Create(settings, profilerSettings, discoveryService, defaultServiceName)
: null;

return new DataStreamsManager(settings.Environment, defaultServiceName, writer, settings.IsDataStreamsMonitoringInDefaultState);
return new DataStreamsManager(settings.Environment, defaultServiceName, writer, settings.IsDataStreamsMonitoringInDefaultState, settings.PropagateProcessTags ? ProcessTags.SerializedTags : null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This calls ProcessTags.SerializedTags, which realizes the tags. Given that the DataStreamsManager is created in the app initialization (before a request has arrived), I think this completely breaks the "lazy" aspect of the serialized tags? 🤔

}

public async Task DisposeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@ internal static class HashHelper
/// Calculates the base NodeHash for a service.
/// This can be used to create a <see cref="NodeHash"/> by calling <see cref="CalculateNodeHash"/>
/// </summary>
public static NodeHashBase CalculateNodeHashBase(string service, string? env, string? primaryTag)
public static NodeHashBase CalculateNodeHashBase(string service, string? env, string? primaryTag, string? processTags)
{
var hash = FnvHash64.GenerateHash(service, HashVersion);
if (!string.IsNullOrEmpty(env))
if (!StringUtil.IsNullOrEmpty(env))
{
hash = FnvHash64.GenerateHash(env!, HashVersion, hash);
hash = FnvHash64.GenerateHash(env, HashVersion, hash);
}

if (!string.IsNullOrEmpty(primaryTag))
if (!StringUtil.IsNullOrEmpty(primaryTag))
{
hash = FnvHash64.GenerateHash(primaryTag!, HashVersion, hash);
hash = FnvHash64.GenerateHash(primaryTag, HashVersion, hash);
}

if (!StringUtil.IsNullOrEmpty(processTags))
{
hash = FnvHash64.GenerateHash(processTags, HashVersion, hash);
}

return new NodeHashBase(hash);
Expand Down
14 changes: 7 additions & 7 deletions tracer/src/Datadog.Trace/ProcessTags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ internal static class ProcessTags
public const string EntrypointBasedir = "entrypoint.basedir";
public const string EntrypointWorkdir = "entrypoint.workdir";

private static readonly Lazy<string> LazySerializedTags = new(GetSerializedTags);

public static string SerializedTags
{
get => LazySerializedTags.Value;
}
public static readonly string SerializedTags = GetSerializedTags();

/// <summary>
/// From the full path of a directory, get the name of the leaf directory.
Expand All @@ -43,7 +38,7 @@ private static string GetSerializedTags()
List<KeyValuePair<string, string?>> tags =
[
new(EntrypointBasedir, GetLastPathSegment(AppContext.BaseDirectory)),
new(EntrypointName, EntryAssemblyLocator.GetEntryAssembly()?.EntryPoint?.DeclaringType?.FullName),
new(EntrypointName, GetEntryPointName()),
// workdir can be changed by the code, but we consider that capturing the value when this is called is good enough
new(EntrypointWorkdir, GetLastPathSegment(Environment.CurrentDirectory))
];
Expand All @@ -62,6 +57,11 @@ private static string GetSerializedTags()
return StringBuilderCache.GetStringAndRelease(serializedTags);
}

/// <summary>
/// Gets the full name of the type declaring the entry point of the entry assembly.
/// </summary>
/// <returns>
/// The declaring type's full name, or <c>null</c> when the entry assembly, its entry point,
/// or the declaring type is not available.
/// </returns>
private static string? GetEntryPointName()
{
    var entryAssembly = EntryAssemblyLocator.GetEntryAssembly();
    var declaringType = entryAssembly?.EntryPoint?.DeclaringType;

    return declaringType?.FullName;
}

private static string NormalizeTagValue(string tagValue)
{
// TraceUtil.NormalizeTag does almost exactly what we want, except it allows ':',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public class MockDataStreamsPayload
[Key(nameof(ProductMask))]
public long ProductMask { get; set; }

// Process tags carried by the payload; (de)serialized under the "ProcessTags" key,
// which is taken from this property's name via nameof.
[Key(nameof(ProcessTags))]
public string ProcessTags { get; set; }

public static MockDataStreamsPayload Normalize(IImmutableList<MockDataStreamsPayload> dataStreams)
{
// This is nasty and hacky, but it's the only way I could get any semblance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void WhenEnabled_AndNoContext_HashShouldUseParentHashOfZero()
var context = dsm.SetCheckpoint(parentPathway: null, CheckpointKind.Consume, edgeTags, 100, 100);
context.Should().NotBeNull();

var baseHash = HashHelper.CalculateNodeHashBase(service, env, primaryTag: null);
var baseHash = HashHelper.CalculateNodeHashBase(service, env, primaryTag: null, processTags: null);
var nodeHash = HashHelper.CalculateNodeHash(baseHash, edgeTags);
var hash = HashHelper.CalculatePathwayHash(nodeHash, parentHash: new PathwayHash(0));

Expand All @@ -182,13 +182,25 @@ public void WhenEnabled_AndHashContext_HashShouldUseParentHash()
var context = dsm.SetCheckpoint(parent, CheckpointKind.Consume, edgeTags, 100, 100);
context.Should().NotBeNull();

var baseHash = HashHelper.CalculateNodeHashBase(service, env, primaryTag: null);
var baseHash = HashHelper.CalculateNodeHashBase(service, env, primaryTag: null, processTags: null);
var nodeHash = HashHelper.CalculateNodeHash(baseHash, edgeTags);
var hash = HashHelper.CalculatePathwayHash(nodeHash, parentHash: parent.Hash);

context.Value.Hash.Value.Should().Be(hash.Value);
}

[Fact]
public void ProcessTagsUsedInBaseHash()
{
    // Same service and env, differing only in whether process tags are supplied:
    // the resulting base hashes must not be equal.
    const string service = "bar";
    const string env = "foo";

    var withTags = HashHelper.CalculateNodeHashBase(service, env, primaryTag: null, processTags: "hello:world");
    var withoutTags = HashHelper.CalculateNodeHashBase(service, env, primaryTag: null, processTags: null);

    withTags.Value.Should().NotBe(withoutTags.Value);
}

[Fact]
public void WhenDisabled_SetCheckpoint_ReturnsNull()
{
Expand Down Expand Up @@ -304,7 +316,8 @@ private static DataStreamsManager GetDataStreamManager(bool enabled, out DataStr
env: "foo",
defaultServiceName: "bar",
writer,
isInDefaultState: false);
isInDefaultState: false,
processTags: null);
}

internal class DataStreamsWriterMock : IDataStreamsWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,22 @@ public void CanRoundTripMessagePackFormat()
result.Should().BeEquivalentTo(expected);
}

[Fact]
public void ProcessTagsGetWritten()
{
    // Verifies that, with process-tag propagation enabled in settings, the serialized
    // payload carries a ProcessTags entry that round-trips through MessagePack.
    var bucketDuration = 10_000_000_000;
    var settings = TracerSettings.Create(new Dictionary<string, object> { { ConfigurationKeys.Environment, "my-env" }, { ConfigurationKeys.PropagateProcessTags, "true" } });
    var formatter = new DataStreamsMessagePackFormatter(settings, new ProfilerSettings(ProfilerState.Disabled), "service=name");

    using var ms = new MemoryStream();
    formatter.Serialize(ms, bucketDuration, [], []);

    // GetBuffer() returns the whole backing array, which may include unused capacity
    // past Length, so restrict the segment to the bytes that were actually written.
    var writtenBytes = new ArraySegment<byte>(ms.GetBuffer(), 0, (int)ms.Length);
    var result = MessagePackSerializer.Deserialize<MockDataStreamsPayload>(writtenBytes);

    // content varies depending on how the tests are run, so we cannot really assert on the content;
    // only check that a non-null, non-empty value containing a ':' separator came through.
    result.ProcessTags.Should().NotBeNullOrEmpty();
    result.ProcessTags.Should().Contain(":");
}

private static DDSketch CreateSketch(params int[] values)
{
// don't actually need to pool them for these tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ namespace Datadog.Trace.Tests.DataStreamsMonitoring;
public class HashHelperTests
{
[Theory]
[InlineData("service-1", "env-1", "d:1")]
[InlineData("service-1", "env-1", null)]
[InlineData("service-1", "env-1", "d:1", "edge-1")]
[InlineData("service-1", "env-1", "d:1", "edge-1", "edge-2")]
public void NodeHashSanityCheck(string service, string env, string primaryTag, params string[] edgeArgs)
[InlineData("service-1", "env-1", null, null)]
[InlineData("service-1", "env-1", "d:1", null)]
[InlineData("service-1", "env-1", "d:1", "entrypoint.name:hello")]
[InlineData("service-1", "env-1", "d:1", "entrypoint.name:hello", "edge-1")]
[InlineData("service-1", "env-1", "d:1", "entrypoint.name:hello", "edge-1", "edge-2")]
public void NodeHashSanityCheck(string service, string env, string primaryTag, string processTags, params string[] edgeArgs)
{
// naive implementation (similar to e.g. go/java)
var sb = new StringBuilder()
Expand All @@ -31,6 +32,11 @@ public void NodeHashSanityCheck(string service, string env, string primaryTag, p
sb.Append(primaryTag);
}

if (!string.IsNullOrEmpty(processTags))
{
sb.Append(processTags);
}

var sortedArgs = new List<string>(edgeArgs);
sortedArgs.Sort(StringComparer.Ordinal);

Expand All @@ -40,7 +46,7 @@ public void NodeHashSanityCheck(string service, string env, string primaryTag, p
}

var expectedHash = FnvHash64.GenerateHash(sb.ToString(), FnvHash64.Version.V1);
var baseHash = HashHelper.CalculateNodeHashBase(service, env, primaryTag);
var baseHash = HashHelper.CalculateNodeHashBase(service, env, primaryTag, processTags);
var actual = HashHelper.CalculateNodeHash(baseHash, sortedArgs);

actual.Value.Should().Be(expectedHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ private static DataStreamsManager GetEnabledDataStreamManager()
env: "env",
defaultServiceName: "service",
new Mock<IDataStreamsWriter>().Object,
isInDefaultState: false);
isInDefaultState: false,
processTags: null);
return dsm;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private static TracerManager CreateTracerManager(TracerSettings settings)
BuildLogSubmissionManager(),
Mock.Of<ITelemetryController>(),
Mock.Of<IDiscoveryService>(),
new DataStreamsManager("env", "service", Mock.Of<IDataStreamsWriter>(), isInDefaultState: false),
new DataStreamsManager("env", "service", Mock.Of<IDataStreamsWriter>(), isInDefaultState: false, processTags: null),
remoteConfigurationManager: null,
dynamicConfigurationManager: null,
tracerFlareManager: null,
Expand Down
Loading