-
Notifications
You must be signed in to change notification settings - Fork 315
Add RUM SDK injection for servlet based web servers #9110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 32 commits
257f167
5e734e3
dde7164
bb22e1a
1bf99a1
8814d86
a1f0a77
f941b4a
fe2006c
2ac167a
c389b77
01f6aae
486ee57
2a55b69
ce2c141
9d43968
a809044
a0bae01
42e3e6b
4f0b646
1c49a79
24c817e
4beb3a8
23f65fb
dc3beab
78ce365
58c8ef3
71a0923
af937dc
79db3ce
6672db9
fc23ee8
b1ea903
6fb58b4
1e6faf9
a4d960d
d4e5f6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| package datadog.trace.bootstrap.instrumentation.buffer; | ||
|
|
||
| import static java.util.concurrent.TimeUnit.MICROSECONDS; | ||
| import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
|
||
| import java.io.ByteArrayOutputStream; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.PrintWriter; | ||
| import java.net.URL; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.List; | ||
| import org.apache.commons.io.IOUtils; | ||
| import org.openjdk.jmh.annotations.Benchmark; | ||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||
| import org.openjdk.jmh.annotations.Fork; | ||
| import org.openjdk.jmh.annotations.Measurement; | ||
| import org.openjdk.jmh.annotations.Mode; | ||
| import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
| import org.openjdk.jmh.annotations.Scope; | ||
| import org.openjdk.jmh.annotations.State; | ||
| import org.openjdk.jmh.annotations.Warmup; | ||
|
|
||
| /* | ||
| * Benchmark Mode Cnt Score Error Units | ||
| * InjectingPipeOutputStreamBenchmark.withPipe avgt 2 16.802 us/op | ||
| * InjectingPipeOutputStreamBenchmark.withoutPipe avgt 2 13.001 us/op | ||
| */ | ||
| @State(Scope.Benchmark) | ||
| @Warmup(iterations = 1, time = 30, timeUnit = SECONDS) | ||
| @Measurement(iterations = 2, time = 30, timeUnit = SECONDS) | ||
| @BenchmarkMode(Mode.AverageTime) | ||
| @OutputTimeUnit(MICROSECONDS) | ||
| @Fork(value = 1) | ||
| public class InjectingPipeOutputStreamBenchmark { | ||
| private static final List<String> htmlContent; | ||
| private static final byte[] marker; | ||
| private static final byte[] content; | ||
|
|
||
| static { | ||
| try (InputStream is = new URL("https://www.google.com").openStream()) { | ||
| htmlContent = IOUtils.readLines(is, StandardCharsets.UTF_8); | ||
| } catch (IOException ioe) { | ||
| throw new RuntimeException(ioe); | ||
| } | ||
| marker = "</head>".getBytes(StandardCharsets.UTF_8); | ||
| content = "<script/>".getBytes(StandardCharsets.UTF_8); | ||
| } | ||
|
|
||
| @Benchmark | ||
| public void withPipe() throws Exception { | ||
| try (final PrintWriter out = | ||
| new PrintWriter( | ||
| new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content, null))) { | ||
| htmlContent.forEach(out::println); | ||
| } | ||
| } | ||
|
|
||
| @Benchmark | ||
| public void withoutPipe() throws Exception { | ||
| try (final PrintWriter out = new PrintWriter(new ByteArrayOutputStream())) { | ||
| htmlContent.forEach(out::println); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,151 @@ | ||
| package datadog.trace.bootstrap.instrumentation.buffer; | ||
|
|
||
| import java.io.FilterOutputStream; | ||
| import java.io.IOException; | ||
| import java.io.OutputStream; | ||
|
|
||
| /** | ||
| * A circular buffer with a lookbehind buffer of n bytes. The first time that the latest n bytes | ||
| * matches the marker, a content is injected before. | ||
| */ | ||
| public class InjectingPipeOutputStream extends FilterOutputStream { | ||
| private final byte[] lookbehind; | ||
| private int pos; | ||
| private boolean bufferFilled; | ||
| private final byte[] marker; | ||
| private final byte[] contentToInject; | ||
| private boolean found = false; | ||
| private int matchingPos = 0; | ||
| private final Runnable onContentInjected; | ||
|
|
||
| /** | ||
| * @param downstream the delegate output stream | ||
| * @param marker the marker to find in the stream | ||
| * @param contentToInject the content to inject once before the marker if found. | ||
| * @param onContentInjected callback called when and if the content is injected. | ||
| */ | ||
| public InjectingPipeOutputStream( | ||
| final OutputStream downstream, | ||
| final byte[] marker, | ||
| final byte[] contentToInject, | ||
| final Runnable onContentInjected) { | ||
| super(downstream); | ||
| this.marker = marker; | ||
| this.lookbehind = new byte[marker.length + 1]; | ||
| this.pos = 0; | ||
| this.contentToInject = contentToInject; | ||
| this.onContentInjected = onContentInjected; | ||
| } | ||
|
|
||
| @Override | ||
| public void write(int b) throws IOException { | ||
| if (found) { | ||
| out.write(b); | ||
| return; | ||
| } | ||
|
|
||
| if (bufferFilled) { | ||
| out.write(lookbehind[pos]); | ||
| } | ||
|
|
||
| lookbehind[pos] = (byte) b; | ||
| pos = (pos + 1) % lookbehind.length; | ||
|
|
||
| if (!bufferFilled) { | ||
| bufferFilled = pos == 0; | ||
| } | ||
|
|
||
| if (marker[matchingPos++] == b) { | ||
| if (matchingPos == marker.length) { | ||
| found = true; | ||
| out.write(lookbehind[pos]); | ||
| pos = (pos + 1) % lookbehind.length; | ||
| out.write(contentToInject); | ||
| if (onContentInjected != null) { | ||
| onContentInjected.run(); | ||
| } | ||
| drain(lookbehind.length - 1); | ||
| } | ||
| } else { | ||
| matchingPos = 0; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void write(byte[] b, int off, int len) throws IOException { | ||
| if (found) { | ||
| out.write(b, off, len); | ||
| return; | ||
| } | ||
| if (len > marker.length * 2) { | ||
| int idx = arrayContains(b, marker); | ||
| if (idx >= 0) { | ||
| // we have a full match. just write everything | ||
| found = true; | ||
| drain(lookbehind.length); | ||
| out.write(b, off, idx); | ||
| out.write(contentToInject); | ||
| if (onContentInjected != null) { | ||
| onContentInjected.run(); | ||
| } | ||
| out.write(b, off + idx, len - idx); | ||
| } else { | ||
| // we don't have a full match. write everything in a bulk except the lookbehind buffer | ||
| // sequentially | ||
| for (int i = off; i < off + marker.length; i++) { | ||
| write(b[i]); | ||
| } | ||
| drain(lookbehind.length); | ||
| out.write(b, off + marker.length, len - marker.length * 2); | ||
| for (int i = len - marker.length; i < len; i++) { | ||
| write(b[i]); | ||
| } | ||
| drain(lookbehind.length); | ||
| } | ||
| } else { | ||
| // use slow path because the length to write is small and within the lookbehind buffer size | ||
| super.write(b, off, len); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This assumes that If it instead calls a shared internal method used by both EDIT: to be sure we probably want to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assumed that because of |
||
| } | ||
| } | ||
|
|
||
| private int arrayContains(byte[] array, byte[] search) { | ||
amarziali marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| for (int i = 0; i < array.length - search.length; i++) { | ||
| if (array[i] == search[0]) { | ||
| boolean found = true; | ||
| int k = i; | ||
| for (int j = 1; j < search.length; j++) { | ||
| k++; | ||
| if (array[k] != search[j]) { | ||
| found = false; | ||
| break; | ||
| } | ||
| } | ||
| if (found) { | ||
| return i; | ||
| } | ||
| } | ||
| } | ||
| return -1; | ||
| } | ||
|
|
||
| private void drain(int len) throws IOException { | ||
| if (bufferFilled) { | ||
| for (int i = 0; i < len; i++) { | ||
| out.write(lookbehind[(pos + i) % lookbehind.length]); | ||
| } | ||
| } else { | ||
| out.write(this.lookbehind, 0, pos); | ||
| } | ||
| pos = 0; | ||
| matchingPos = 0; | ||
| bufferFilled = false; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| if (!found) { | ||
| drain(lookbehind.length); | ||
| } | ||
| super.close(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| package datadog.trace.bootstrap.instrumentation.buffer | ||
|
|
||
| import spock.lang.Specification | ||
|
|
||
| class InjectingPipeOutputStreamTest extends Specification { | ||
| def 'should filter a buffer and inject if found #found'() { | ||
| setup: | ||
| def downstream = new ByteArrayOutputStream() | ||
| def piped = new OutputStreamWriter(new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null), | ||
| "UTF-8") | ||
| when: | ||
| try (def closeme = piped) { | ||
| piped.write(body) | ||
| } | ||
| then: | ||
| assert downstream.toByteArray() == expected.getBytes("UTF-8") | ||
| where: | ||
| body | marker | contentToInject | found | expected | ||
| "<html><head><foo/></head><body/></html>" | "</head>" | "<script>true</script>" | true | "<html><head><foo/><script>true</script></head><body/></html>" | ||
| "<html><body/></html>" | "</head>" | "<something/>" | false | "<html><body/></html>" | ||
| "<foo/>" | "<longerThanFoo>" | "<nothing>" | false | "<foo/>" | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How resilient is this code to a misbehaving
outstream?In other words if
out.writesuddenly threw anIOExceptionin the middle of one of our buffering methods, would the state kept here remain consistent - so that if the application code attempted to write again, everything would resume as normal...One way to confirm this would be to run tests where the
outstream being wrapped threw anIOExceptionon every other call. Ideally you should end up with the same behaviour from the application's perspective with or without this wrapper.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not today. I need kind of checkpoint the action on the buffer in order to make it resilient