Skip to content
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
257f167
feat(rum): Add initial config and API
PerfectSlayer Jul 3, 2025
5e734e3
feat(rum): Iterating on initial config and API
PerfectSlayer Jul 4, 2025
dde7164
fix(rum): Fix config print
PerfectSlayer Jul 4, 2025
bb22e1a
fix(rum): Remove content type support as only "text/html" is supported
PerfectSlayer Jul 4, 2025
1bf99a1
feat(rum): Add smoke tests
PerfectSlayer Jul 4, 2025
8814d86
feat(rum): Add smoke tests
PerfectSlayer Jul 4, 2025
a1f0a77
feat(rum): Add smoke tests
PerfectSlayer Jul 4, 2025
f941b4a
feat(rum): Add smoke tests
PerfectSlayer Jul 4, 2025
fe2006c
Add RUM injection for servlet 3
amarziali Jul 4, 2025
2ac167a
fix rum injection smoke test
amarziali Jul 4, 2025
c389b77
fix javadoc
amarziali Jul 4, 2025
01f6aae
Add benchmark
amarziali Jul 4, 2025
486ee57
avoid linkage issues with earlier servlet specs
amarziali Jul 4, 2025
2a55b69
feat(rum): Improve smoke test to add more cases
PerfectSlayer Jul 4, 2025
ce2c141
feat(rum): Add remote config and fix json encoding
PerfectSlayer Jul 4, 2025
9d43968
feat(rum): Add more smoke tests
PerfectSlayer Jul 4, 2025
a809044
fix(rum): Fix config
PerfectSlayer Jul 4, 2025
a0bae01
fix(rum): Fix smoke tests
PerfectSlayer Jul 4, 2025
42e3e6b
feat(rum): Add more smoke tests
PerfectSlayer Jul 4, 2025
4f0b646
feat(rum): Simplify config to remove dynamic init
PerfectSlayer Jul 7, 2025
1c49a79
Use runnable for callback
amarziali Jul 4, 2025
24c817e
improve pipe perfs
amarziali Jul 4, 2025
4beb3a8
feat(rum): Add injector and config unit tests
PerfectSlayer Jul 7, 2025
23f65fb
Add rum injection for jakarta servlet
amarziali Jul 7, 2025
dc3beab
codenarc
amarziali Jul 7, 2025
78ce365
Merge branch 'master' into project/rum-injection
PerfectSlayer Jul 7, 2025
58c8ef3
fix(rum): Fix smoke test merge
PerfectSlayer Jul 7, 2025
71a0923
exclude spring virtual filter chain
amarziali Jul 7, 2025
af937dc
fix(rum): Fix SDK snippet
PerfectSlayer Jul 7, 2025
79db3ce
final fixes
amarziali Jul 7, 2025
6672db9
fix more tests
amarziali Jul 7, 2025
fc23ee8
fix(rum): Fix privacy level encoding
PerfectSlayer Jul 7, 2025
b1ea903
Apply suggestions
amarziali Jul 8, 2025
6fb58b4
feat(rum): Improve config related to PR review feedback
PerfectSlayer Jul 8, 2025
1e6faf9
Improve and fix circular buffer
amarziali Jul 8, 2025
a4d960d
review
amarziali Jul 8, 2025
d4e5f6f
bad commit
amarziali Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package datadog.trace.bootstrap.instrumentation.buffer;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

/*
* Benchmark Mode Cnt Score Error Units
* InjectingPipeOutputStreamBenchmark.withPipe avgt 2 16.802 us/op
* InjectingPipeOutputStreamBenchmark.withoutPipe avgt 2 13.001 us/op
*/
@State(Scope.Benchmark)
@Warmup(iterations = 1, time = 30, timeUnit = SECONDS)
@Measurement(iterations = 2, time = 30, timeUnit = SECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
public class InjectingPipeOutputStreamBenchmark {
private static final List<String> htmlContent;
private static final byte[] marker;
private static final byte[] content;

static {
try (InputStream is = new URL("https://www.google.com").openStream()) {
htmlContent = IOUtils.readLines(is, StandardCharsets.UTF_8);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
marker = "</head>".getBytes(StandardCharsets.UTF_8);
content = "<script/>".getBytes(StandardCharsets.UTF_8);
}

@Benchmark
public void withPipe() throws Exception {
try (final PrintWriter out =
new PrintWriter(
new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content, null))) {
htmlContent.forEach(out::println);
}
}

@Benchmark
public void withoutPipe() throws Exception {
try (final PrintWriter out = new PrintWriter(new ByteArrayOutputStream())) {
htmlContent.forEach(out::println);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package datadog.trace.bootstrap.instrumentation.buffer;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
* A circular buffer with a lookbehind buffer of n bytes. The first time that the latest n bytes
* matches the marker, a content is injected before.
*/
public class InjectingPipeOutputStream extends FilterOutputStream {
private final byte[] lookbehind;
private int pos;
private boolean bufferFilled;
private final byte[] marker;
private final byte[] contentToInject;
private boolean found = false;
private int matchingPos = 0;
private final Runnable onContentInjected;

/**
* @param downstream the delegate output stream
* @param marker the marker to find in the stream
* @param contentToInject the content to inject once before the marker if found.
* @param onContentInjected callback called when and if the content is injected.
*/
public InjectingPipeOutputStream(
final OutputStream downstream,
final byte[] marker,
final byte[] contentToInject,
final Runnable onContentInjected) {
super(downstream);
this.marker = marker;
this.lookbehind = new byte[marker.length + 1];
this.pos = 0;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
}

@Override
public void write(int b) throws IOException {
if (found) {
out.write(b);
return;
}

if (bufferFilled) {
out.write(lookbehind[pos]);
}

lookbehind[pos] = (byte) b;
pos = (pos + 1) % lookbehind.length;

if (!bufferFilled) {
bufferFilled = pos == 0;
}

if (marker[matchingPos++] == b) {
if (matchingPos == marker.length) {
found = true;
out.write(lookbehind[pos]);
pos = (pos + 1) % lookbehind.length;
out.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
}
drain(lookbehind.length - 1);
}
} else {
matchingPos = 0;
}
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (found) {
out.write(b, off, len);
return;
}
if (len > marker.length * 2) {
int idx = arrayContains(b, marker);
if (idx >= 0) {
// we have a full match. just write everything
found = true;
drain(lookbehind.length);
out.write(b, off, idx);
out.write(contentToInject);
Copy link
Contributor

@mcculls mcculls Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How resilient is this code to a misbehaving out stream?

In other words if out.write suddenly threw an IOException in the middle of one of our buffering methods, would the state kept here remain consistent - so that if the application code attempted to write again, everything would resume as normal...

One way to confirm this would be to run tests where the out stream being wrapped threw an IOException on every other call. Ideally you should end up with the same behaviour from the application's perspective with or without this wrapper.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not today. I need kind of checkpoint the action on the buffer in order to make it resilient

if (onContentInjected != null) {
onContentInjected.run();
}
out.write(b, off + idx, len - idx);
} else {
// we don't have a full match. write everything in a bulk except the lookbehind buffer
// sequentially
for (int i = off; i < off + marker.length; i++) {
write(b[i]);
}
drain(lookbehind.length);
out.write(b, off + marker.length, len - marker.length * 2);
for (int i = len - marker.length; i < len; i++) {
write(b[i]);
}
drain(lookbehind.length);
}
} else {
// use slow path because the length to write is small and within the lookbehind buffer size
super.write(b, off, len);
Copy link
Contributor

@mcculls mcculls Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that super.write(b, off, len) will repeatedly call write(int b) on this instance.

If it instead calls a shared internal method used by both write methods then we wouldn't detect the marker.

EDIT: to be sure we probably want to call write(int b) ourselves in a loop...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assumed that because of FilterOutputStream does it but I'm not sure that the implementation will stay stable throughout the jdk versions

}
}

private int arrayContains(byte[] array, byte[] search) {
for (int i = 0; i < array.length - search.length; i++) {
if (array[i] == search[0]) {
boolean found = true;
int k = i;
for (int j = 1; j < search.length; j++) {
k++;
if (array[k] != search[j]) {
found = false;
break;
}
}
if (found) {
return i;
}
}
}
return -1;
}

private void drain(int len) throws IOException {
if (bufferFilled) {
for (int i = 0; i < len; i++) {
out.write(lookbehind[(pos + i) % lookbehind.length]);
}
} else {
out.write(this.lookbehind, 0, pos);
}
pos = 0;
matchingPos = 0;
bufferFilled = false;
}

@Override
public void close() throws IOException {
if (!found) {
drain(lookbehind.length);
}
super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public abstract class HttpServerDecorator<REQUEST, CONNECTION, RESPONSE, REQUEST

public static final String DD_SPAN_ATTRIBUTE = "datadog.span";
public static final String DD_DISPATCH_SPAN_ATTRIBUTE = "datadog.span.dispatch";
public static final String DD_RUM_INJECTED = "datadog.rum.injected";
public static final String DD_FIN_DISP_LIST_SPAN_ATTRIBUTE =
"datadog.span.finish_dispatch_listener";
public static final String DD_RESPONSE_ATTRIBUTE = "datadog.response";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package datadog.trace.bootstrap.instrumentation.buffer

import spock.lang.Specification

class InjectingPipeOutputStreamTest extends Specification {
def 'should filter a buffer and inject if found #found'() {
setup:
def downstream = new ByteArrayOutputStream()
def piped = new OutputStreamWriter(new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null),
"UTF-8")
when:
try (def closeme = piped) {
piped.write(body)
}
then:
assert downstream.toByteArray() == expected.getBytes("UTF-8")
where:
body | marker | contentToInject | found | expected
"<html><head><foo/></head><body/></html>" | "</head>" | "<script>true</script>" | true | "<html><head><foo/><script>true</script></head><body/></html>"
"<html><body/></html>" | "</head>" | "<something/>" | false | "<html><body/></html>"
"<foo/>" | "<longerThanFoo>" | "<nothing>" | false | "<foo/>"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@
0 org.springframework.web.context.support.AbstractRefreshableWebApplicationContext
0 org.springframework.web.context.support.GenericWebApplicationContext
0 org.springframework.web.context.support.XmlWebApplicationContext
1 org.springframework.web.filter.CompositeFilter$VirtualFilterChain
0 org.springframework.web.reactive.*
0 org.springframework.web.servlet.*
0 org.springframework.web.socket.*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datadog.trace.agent.test.base.HttpServer
import datadog.trace.agent.test.base.HttpServerTest
import datadog.trace.agent.test.naming.TestingGenericHttpNamingConventions
import datadog.trace.instrumentation.servlet5.HtmlRumServlet
import datadog.trace.instrumentation.servlet5.TestServlet5
import datadog.trace.instrumentation.servlet5.XmlRumServlet
import org.eclipse.jetty.server.Handler
import org.eclipse.jetty.server.Server

Expand Down Expand Up @@ -103,3 +105,18 @@ class Jetty11V1ForkedTest extends Jetty11Test implements TestingGenericHttpNamin
true
}
}

class JettyRumInjectionForkedTest extends Jetty11V0ForkedTest {
@Override
boolean testRumInjection() {
true
}

@Override
protected Handler handler() {
def handler = JettyServer.servletHandler(TestServlet5)
handler.addServlet(HtmlRumServlet, "/gimme-html")
handler.addServlet(XmlRumServlet, "/gimme-xml")
handler
}
}
2 changes: 2 additions & 0 deletions dd-java-agent/instrumentation/jetty-12/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ addTestSuiteForDir('ee9Test', 'test/ee9')
addTestSuiteExtendingForDir('ee9LatestDepTest', 'latestDepTest', 'test/ee9')
// ee10
addTestSuiteForDir('ee10Test', 'test/ee10')
addTestSuiteExtendingForDir('ee10ForkedTest', 'ee10Test', 'test/ee10')
addTestSuiteExtendingForDir('ee10LatestDepTest', 'latestDepTest', 'test/ee10')
addTestSuiteExtendingForDir('ee10LatestDepForkedTest', 'ee10LatestDepTest', 'test/ee10')

[compileMain_java17Java, compileTestJava].each {
it.configure {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import datadog.trace.agent.test.base.HttpServer
import datadog.trace.agent.test.base.HttpServerTest
import datadog.trace.agent.test.naming.TestingGenericHttpNamingConventions
import datadog.trace.instrumentation.servlet5.HtmlRumServlet
import datadog.trace.instrumentation.servlet5.TestServlet5
import datadog.trace.instrumentation.servlet5.XmlRumServlet
import org.eclipse.jetty.ee10.servlet.ServletContextHandler
import org.eclipse.jetty.server.Server

class Jetty12Test extends HttpServerTest<Server> implements TestingGenericHttpNamingConventions.ServerV0 {
Expand Down Expand Up @@ -61,3 +64,18 @@ class Jetty12PojoWebsocketTest extends Jetty12Test {
!isLatestDepTest
}
}

class Jetty12RumInjectionForkedTest extends Jetty12Test {
@Override
boolean testRumInjection() {
true
}

@Override
HttpServer server() {
ServletContextHandler handler = JettyServer.servletHandler(TestServlet5)
handler.addServlet(HtmlRumServlet, "/gimme-html")
handler.addServlet(XmlRumServlet, "/gimme-xml")
new JettyServer(handler, useWebsocketPojoEndpoint())
}
}
Loading