Skip to content

Commit f4df972

Browse files
authored
fix(core): Prevent instrumentAnthropicAiClient breaking MessageStream api (#17754)
Previously, we completely walked over Anthropic's SDK and replaced `message.stream` with our own method that returns an async generator. This breaks the SDK, as `MessageStream` has a further user-callable API, such as adding event handlers. This fix proxies `message.stream` instead of replacing it with our own method. Instead of returning an async generator, we now hook into various events to do our instrumentation. Streams requested via `stream: true` are expected to return async generators, so the current approach still holds; the only change is that we proxy instead of overwriting. Fixes: #17734
1 parent 450d9f5 commit f4df972

File tree

4 files changed

+337
-74
lines changed

4 files changed

+337
-74
lines changed

dev-packages/node-integration-tests/suites/tracing/anthropic/scenario.mjs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,55 @@ function startMockAnthropicServer() {
2929
return;
3030
}
3131

32+
// Check if streaming is requested
33+
if (req.body.stream === true) {
34+
res.writeHead(200, {
35+
'Content-Type': 'text/event-stream',
36+
'Cache-Control': 'no-cache',
37+
Connection: 'keep-alive',
38+
});
39+
40+
// Send streaming events
41+
const events = [
42+
{
43+
type: 'message_start',
44+
message: {
45+
id: 'msg_stream123',
46+
type: 'message',
47+
role: 'assistant',
48+
model,
49+
content: [],
50+
usage: { input_tokens: 10 },
51+
},
52+
},
53+
{ type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } },
54+
{ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hello ' } },
55+
{ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'from ' } },
56+
{ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'stream!' } },
57+
{ type: 'content_block_stop', index: 0 },
58+
{
59+
type: 'message_delta',
60+
delta: { stop_reason: 'end_turn', stop_sequence: null },
61+
usage: { output_tokens: 15 },
62+
},
63+
{ type: 'message_stop' },
64+
];
65+
66+
events.forEach((event, index) => {
67+
setTimeout(() => {
68+
res.write(`event: ${event.type}\n`);
69+
res.write(`data: ${JSON.stringify(event)}\n\n`);
70+
71+
if (index === events.length - 1) {
72+
res.end();
73+
}
74+
}, index * 10); // Small delay between events
75+
});
76+
77+
return;
78+
}
79+
80+
// Non-streaming response
3281
res.send({
3382
id: 'msg_mock123',
3483
type: 'message',
@@ -92,8 +141,32 @@ async function run() {
92141

93142
// Fourth test: models.retrieve
94143
await client.models.retrieve('claude-3-haiku-20240307');
144+
145+
// Fifth test: streaming via messages.create
146+
const stream = await client.messages.create({
147+
model: 'claude-3-haiku-20240307',
148+
messages: [{ role: 'user', content: 'What is the capital of France?' }],
149+
stream: true,
150+
});
151+
152+
for await (const _ of stream) {
153+
void _;
154+
}
155+
156+
// Sixth test: streaming via messages.stream
157+
await client.messages
158+
.stream({
159+
model: 'claude-3-haiku-20240307',
160+
messages: [{ role: 'user', content: 'What is the capital of France?' }],
161+
})
162+
.on('streamEvent', () => {
163+
Sentry.captureMessage('stream event from user-added event listener captured');
164+
});
95165
});
96166

167+
// Wait for the stream event handler to finish
168+
await Sentry.flush(2000);
169+
97170
server.close();
98171
}
99172

dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,30 @@ describe('Anthropic integration', () => {
152152
origin: 'auto.ai.anthropic',
153153
status: 'ok',
154154
}),
155+
// Fifth span - messages.create with stream: true
156+
expect.objectContaining({
157+
data: expect.objectContaining({
158+
'gen_ai.operation.name': 'messages',
159+
'gen_ai.request.model': 'claude-3-haiku-20240307',
160+
'gen_ai.request.stream': true,
161+
}),
162+
description: 'messages claude-3-haiku-20240307 stream-response',
163+
op: 'gen_ai.messages',
164+
origin: 'auto.ai.anthropic',
165+
status: 'ok',
166+
}),
167+
// Sixth span - messages.stream
168+
expect.objectContaining({
169+
data: expect.objectContaining({
170+
'gen_ai.operation.name': 'messages',
171+
'gen_ai.request.model': 'claude-3-haiku-20240307',
172+
'gen_ai.request.stream': true,
173+
}),
174+
description: 'messages claude-3-haiku-20240307 stream-response',
175+
op: 'gen_ai.messages',
176+
origin: 'auto.ai.anthropic',
177+
status: 'ok',
178+
}),
155179
]),
156180
};
157181

@@ -189,6 +213,21 @@ describe('Anthropic integration', () => {
189213
]),
190214
};
191215

216+
const EXPECTED_MODEL_ERROR = {
217+
exception: {
218+
values: [
219+
{
220+
type: 'Error',
221+
value: '404 Model not found',
222+
},
223+
],
224+
},
225+
};
226+
227+
const EXPECTED_STREAM_EVENT_HANDLER_MESSAGE = {
228+
message: 'stream event from user-added event listener captured',
229+
};
230+
192231
createEsmAndCjsTests(__dirname, 'scenario-manual-client.mjs', 'instrument.mjs', (createRunner, test) => {
193232
test('creates anthropic related spans when manually instrumenting client', async () => {
194233
await createRunner()
@@ -202,8 +241,9 @@ describe('Anthropic integration', () => {
202241
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createRunner, test) => {
203242
test('creates anthropic related spans with sendDefaultPii: false', async () => {
204243
await createRunner()
205-
.ignore('event')
244+
.expect({ event: EXPECTED_MODEL_ERROR })
206245
.expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_FALSE })
246+
.expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE })
207247
.start()
208248
.completed();
209249
});
@@ -212,8 +252,9 @@ describe('Anthropic integration', () => {
212252
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-pii.mjs', (createRunner, test) => {
213253
test('creates anthropic related spans with sendDefaultPii: true', async () => {
214254
await createRunner()
215-
.ignore('event')
255+
.expect({ event: EXPECTED_MODEL_ERROR })
216256
.expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE })
257+
.expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE })
217258
.start()
218259
.completed();
219260
});
@@ -222,8 +263,9 @@ describe('Anthropic integration', () => {
222263
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-options.mjs', (createRunner, test) => {
223264
test('creates anthropic related spans with custom options', async () => {
224265
await createRunner()
225-
.ignore('event')
266+
.expect({ event: EXPECTED_MODEL_ERROR })
226267
.expect({ transaction: EXPECTED_TRANSACTION_WITH_OPTIONS })
268+
.expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE })
227269
.start()
228270
.completed();
229271
});

packages/core/src/utils/anthropic-ai/index.ts

Lines changed: 116 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import {
2525
} from '../ai/gen-ai-attributes';
2626
import { buildMethodPath, getFinalOperationName, getSpanOperation, setTokenUsageAttributes } from '../ai/utils';
2727
import { handleCallbackErrors } from '../handleCallbackErrors';
28-
import { instrumentStream } from './streaming';
28+
import { instrumentAsyncIterableStream, instrumentMessageStream } from './streaming';
2929
import type {
3030
AnthropicAiInstrumentedMethod,
3131
AnthropicAiOptions,
@@ -194,6 +194,74 @@ function addResponseAttributes(span: Span, response: AnthropicAiResponse, record
194194
addMetadataAttributes(span, response);
195195
}
196196

197+
/**
198+
* Handle common error catching and reporting for streaming requests
199+
*/
200+
function handleStreamingError(error: unknown, span: Span, methodPath: string): never {
201+
captureException(error, {
202+
mechanism: { handled: false, type: 'auto.ai.anthropic', data: { function: methodPath } },
203+
});
204+
205+
if (span.isRecording()) {
206+
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
207+
span.end();
208+
}
209+
throw error;
210+
}
211+
212+
/**
213+
* Handle streaming cases with common logic
214+
*/
215+
function handleStreamingRequest<T extends unknown[], R>(
216+
originalMethod: (...args: T) => Promise<R>,
217+
target: (...args: T) => Promise<R>,
218+
context: unknown,
219+
args: T,
220+
requestAttributes: Record<string, unknown>,
221+
operationName: string,
222+
methodPath: string,
223+
params: Record<string, unknown> | undefined,
224+
options: AnthropicAiOptions,
225+
isStreamRequested: boolean,
226+
): Promise<R> {
227+
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
228+
const spanConfig = {
229+
name: `${operationName} ${model} stream-response`,
230+
op: getSpanOperation(methodPath),
231+
attributes: requestAttributes as Record<string, SpanAttributeValue>,
232+
};
233+
234+
if (isStreamRequested) {
235+
return startSpanManual(spanConfig, async span => {
236+
try {
237+
if (options.recordInputs && params) {
238+
addPrivateRequestAttributes(span, params);
239+
}
240+
const result = await originalMethod.apply(context, args);
241+
return instrumentAsyncIterableStream(
242+
result as AsyncIterable<AnthropicAiStreamingEvent>,
243+
span,
244+
options.recordOutputs ?? false,
245+
) as unknown as R;
246+
} catch (error) {
247+
return handleStreamingError(error, span, methodPath);
248+
}
249+
});
250+
} else {
251+
return startSpanManual(spanConfig, span => {
252+
try {
253+
if (options.recordInputs && params) {
254+
addPrivateRequestAttributes(span, params);
255+
}
256+
const messageStream = target.apply(context, args);
257+
return instrumentMessageStream(messageStream, span, options.recordOutputs ?? false);
258+
} catch (error) {
259+
return handleStreamingError(error, span, methodPath);
260+
}
261+
});
262+
}
263+
}
264+
197265
/**
198266
* Instrument a method with Sentry spans
199267
* Following Sentry AI Agents Manual Instrumentation conventions
@@ -205,82 +273,62 @@ function instrumentMethod<T extends unknown[], R>(
205273
context: unknown,
206274
options: AnthropicAiOptions,
207275
): (...args: T) => Promise<R> {
208-
return async function instrumentedMethod(...args: T): Promise<R> {
209-
const requestAttributes = extractRequestAttributes(args, methodPath);
210-
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
211-
const operationName = getFinalOperationName(methodPath);
276+
return new Proxy(originalMethod, {
277+
apply(target, thisArg, args: T): Promise<R> {
278+
const requestAttributes = extractRequestAttributes(args, methodPath);
279+
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
280+
const operationName = getFinalOperationName(methodPath);
212281

213-
const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined;
214-
const isStreamRequested = Boolean(params?.stream);
215-
const isStreamingMethod = methodPath === 'messages.stream';
282+
const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined;
283+
const isStreamRequested = Boolean(params?.stream);
284+
const isStreamingMethod = methodPath === 'messages.stream';
216285

217-
if (isStreamRequested || isStreamingMethod) {
218-
return startSpanManual(
286+
if (isStreamRequested || isStreamingMethod) {
287+
return handleStreamingRequest(
288+
originalMethod,
289+
target,
290+
context,
291+
args,
292+
requestAttributes,
293+
operationName,
294+
methodPath,
295+
params,
296+
options,
297+
isStreamRequested,
298+
);
299+
}
300+
301+
return startSpan(
219302
{
220-
name: `${operationName} ${model} stream-response`,
303+
name: `${operationName} ${model}`,
221304
op: getSpanOperation(methodPath),
222305
attributes: requestAttributes as Record<string, SpanAttributeValue>,
223306
},
224-
async span => {
225-
try {
226-
if (options.recordInputs && params) {
227-
addPrivateRequestAttributes(span, params);
228-
}
229-
230-
const result = await originalMethod.apply(context, args);
231-
return instrumentStream(
232-
result as AsyncIterable<AnthropicAiStreamingEvent>,
233-
span,
234-
options.recordOutputs ?? false,
235-
) as unknown as R;
236-
} catch (error) {
237-
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
238-
captureException(error, {
239-
mechanism: {
240-
handled: false,
241-
type: 'auto.ai.anthropic',
242-
data: {
243-
function: methodPath,
244-
},
245-
},
246-
});
247-
span.end();
248-
throw error;
307+
span => {
308+
if (options.recordInputs && params) {
309+
addPrivateRequestAttributes(span, params);
249310
}
250-
},
251-
);
252-
}
253311

254-
return startSpan(
255-
{
256-
name: `${operationName} ${model}`,
257-
op: getSpanOperation(methodPath),
258-
attributes: requestAttributes as Record<string, SpanAttributeValue>,
259-
},
260-
span => {
261-
if (options.recordInputs && params) {
262-
addPrivateRequestAttributes(span, params);
263-
}
264-
265-
return handleCallbackErrors(
266-
() => originalMethod.apply(context, args),
267-
error => {
268-
captureException(error, {
269-
mechanism: {
270-
handled: false,
271-
type: 'auto.ai.anthropic',
272-
data: {
273-
function: methodPath,
312+
return handleCallbackErrors(
313+
() => target.apply(context, args),
314+
error => {
315+
captureException(error, {
316+
mechanism: {
317+
handled: false,
318+
type: 'auto.ai.anthropic',
319+
data: {
320+
function: methodPath,
321+
},
274322
},
275-
},
276-
});
277-
},
278-
() => {},
279-
result => addResponseAttributes(span, result as AnthropicAiResponse, options.recordOutputs),
280-
);
281-
},
282-
);
283-
};
323+
});
324+
},
325+
() => {},
326+
result => addResponseAttributes(span, result as AnthropicAiResponse, options.recordOutputs),
327+
);
328+
},
329+
);
330+
},
331+
}) as (...args: T) => Promise<R>;
284332
}
285333

286334
/**

0 commit comments

Comments (0)