diff --git a/gateway/package.json b/gateway/package.json
index 81bac5a..43291c6 100644
--- a/gateway/package.json
+++ b/gateway/package.json
@@ -5,7 +5,7 @@
   "main": "src/index.ts",
   "scripts": {
     "typecheck": "tsgo --noEmit && cd test && tsgo --noEmit",
-    "test": "vitest"
+    "test": "vitest --reporter=verbose"
   },
   "dependencies": {
     "@opentelemetry/api": "^1.9.0",
diff --git a/gateway/src/gateway.ts b/gateway/src/gateway.ts
index df85ea7..7d25dc9 100644
--- a/gateway/src/gateway.ts
+++ b/gateway/src/gateway.ts
@@ -64,32 +64,31 @@ export async function gateway(
   const result = await proxy.dispatch()
 
   // This doesn't work on streaming because the `result` object is returned as soon as we create the streaming response.
-  if (!('responseStream' in result) && !('httpStatusCode' in result)) {
+  if (!('responseStream' in result) && !('response' in result)) {
     const [spanName, attributes, level] = genAiOtelAttributes(result, proxy)
     dispatchSpan.end(spanName, attributes, { level })
   }
 
   let response: Response
-  if ('httpStatusCode' in result) {
-    const { httpStatusCode, responseHeaders, responseBody } = result
-    response = new Response(responseBody, { status: httpStatusCode, headers: responseHeaders })
+  if ('response' in result) {
+    response = result.response
   } else if ('responseStream' in result) {
-    const { successStatus: status, responseHeaders: headers, responseStream, disableKey, waitCompletion } = result
+    const { successStatus: status, responseHeaders: headers, responseStream, disableKey, onStreamComplete } = result
     runAfter(
       ctx,
       'recordSpend',
       (async () => {
-        await waitCompletion
-        const cost = proxy.cost
-        if (cost) {
-          await recordSpend(apiKeyInfo, cost, options)
-        } else {
+        const complete = await onStreamComplete
+        if ('cost' in complete && complete.cost) {
+          await recordSpend(apiKeyInfo, complete.cost, options)
+        } else if ('error' in complete) {
           const { key: _key, ...context } = apiKeyInfo
           if (disableKey) {
-            logfire.error('api key blocked', { context, error: 'Unable to calculate cost' })
-            await blockApiKey(apiKeyInfo, options, 'Unable to calculate cost')
+            logfire.reportError('api key blocked', complete.error, { context })
+            await blockApiKey(apiKeyInfo, options, complete.error.message)
+          } else {
+            logfire.reportError('Unable to calculate cost', complete.error, { context })
           }
-          logfire.error('Unable to calculate cost', { context, error: 'Unable to calculate cost' })
         }
         await otel.send()
       })(),
diff --git a/gateway/src/otel/attributes.ts b/gateway/src/otel/attributes.ts
index 401a9fb..129b1bb 100644
--- a/gateway/src/otel/attributes.ts
+++ b/gateway/src/otel/attributes.ts
@@ -58,6 +58,25 @@ export function genAiOtelAttributes(
   return [spanName, attributes, level]
 }
 
+export function attributesFromRequest(request: Request): Attributes {
+  return {
+    'http.request.method': request.method,
+    'url.full': request.url,
+    ...Object.fromEntries(
+      Array.from(request.headers.entries()).map(([name, value]) => [`http.request.header.${name}`, value]),
+    ),
+  }
+}
+
+export function attributesFromResponse(response: Response): Attributes {
+  return {
+    'http.response.status_code': response.status,
+    ...Object.fromEntries(
+      Array.from(response.headers.entries()).map(([name, value]) => [`http.response.header.${name}`, value]),
+    ),
+  }
+}
+
 /** Semantic conventions for Generative AI
  * @see https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/
  */
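For context on the two helpers added in `attributes.ts` above: they flatten a fetch-API `Request`/`Response` pair into OTel attribute records following the `http.*` semantic conventions. A minimal sketch of how a caller can combine them (the `annotate` function and the bare `fetch` call are illustrative, not gateway code):

```ts
import type { Attributes } from '@opentelemetry/api'
import { attributesFromRequest, attributesFromResponse } from './otel/attributes'

// Hypothetical caller: fold a request/response pair into one flat attribute record.
// The key namespaces are disjoint (http.request.* vs http.response.*), so the
// spreads cannot clobber each other.
async function annotate(request: Request): Promise<Attributes> {
  const response = await fetch(request)
  return {
    ...attributesFromRequest(request), // http.request.method, url.full, http.request.header.*
    ...attributesFromResponse(response), // http.response.status_code, http.response.header.*
  }
}
```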
diff --git a/gateway/src/providers/default.ts b/gateway/src/providers/default.ts
index 2032648..4a0cacc 100644
--- a/gateway/src/providers/default.ts
+++ b/gateway/src/providers/default.ts
@@ -12,7 +12,7 @@
 import type { GatewayOptions } from '..'
 import type { ModelAPI } from '../api'
 import type { BaseAPI } from '../api/base'
 import type { OtelSpan } from '../otel'
-import type { GenAIAttributes } from '../otel/attributes'
+import { attributesFromRequest, attributesFromResponse, type GenAIAttributes } from '../otel/attributes'
 import type { ApiKeyInfo, ProviderProxy } from '../types'
 import { runAfter } from '../utils'
@@ -29,10 +29,7 @@ export interface ProxySuccess {
 }
 
 export interface ProxyWhitelistedEndpoint {
-  requestBody: string
-  httpStatusCode: number
-  responseHeaders: Headers
-  responseBody: string
+  response: Response
 }
 
 export interface ProxyStreamingSuccess {
@@ -42,7 +39,7 @@
   responseHeaders: Headers
   responseStream: ReadableStream
   otelAttributes?: GenAIAttributes
-  waitCompletion: Promise<void>
+  onStreamComplete: Promise<{ cost?: number } | { error: Error }>
   // In case we get to the end of the response, and we are unable to calculate the cost, we need to know if we can disable the key.
   disableKey?: boolean
 }
@@ -297,27 +294,16 @@
     const response = await this.fetch(url, { method, headers: requestHeaders, body: requestBodyText })
 
     if (this.isWhitelistedEndpoint()) {
-      // TODO(Marcelo): We can't read the body if it's a streaming response.
-      const responseBody = await response.text()
-      const { headers, status } = response
       this.otelSpan.end(
         `${this.request.method} ${this.restOfPath}`,
         {
-          'http.method': this.request.method,
-          'http.url': this.restOfPath,
-          'http.response.status_code': status,
+          ...attributesFromRequest(this.request),
+          ...attributesFromResponse(response),
           'http.request.body.text': requestBodyText,
-          'http.response.body.text': responseBody,
-          ...Object.fromEntries(
-            Array.from(requestHeaders.entries()).map(([name, value]) => [`http.request.header.${name}`, value]),
-          ),
-          ...Object.fromEntries(
-            Array.from(headers.entries()).map(([name, value]) => [`http.response.header.${name}`, value]),
-          ),
         },
         { level: 'info' },
       )
-      return { requestBody: requestBodyText, httpStatusCode: status, responseHeaders: headers, responseBody }
+      return { response }
     }
 
     // Each provider should be able to modify the response headers, e.g. remove openai org
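A note on the `return { response }` change in the hunk above: the old `ProxyWhitelistedEndpoint` shape forced an eager `response.text()`, which drains the body and therefore cannot work for streaming responses (the removed TODO says as much). A standalone sketch of the difference, with hypothetical function names:

```ts
// Old shape (removed): buffers the whole body up front, so a streaming
// response is consumed here and can never reach the client incrementally.
async function oldShape(response: Response) {
  const responseBody = await response.text() // body is fully drained here
  return { httpStatusCode: response.status, responseHeaders: response.headers, responseBody }
}

// New shape: hand the Response through untouched; whoever returns it to the
// client drains the body, streaming or not.
function newShape(response: Response): { response: Response } {
  return { response }
}
```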
@@ -402,7 +388,23 @@
 
     // Track completion but don't wait for it before returning
     this.runAfter('extract-stream', extractionPromise)
-    const waitCompletion = extractionPromise.catch(() => {}) // Swallow errors, already logged
+
+    const onStreamComplete = extractionPromise
+      .then((result) => {
+        // TODO(Marcelo): I think we actually need to emit 2 spans: one for HTTP, and another for the LLM.
+        this.otelSpan.end(
+          `chat ${modelAPI.extractedRequest?.requestModel ?? 'streaming'}`,
+          {
+            ...modelAPI.toGenAiOtelAttributes(),
+            ...attributesFromRequest(this.request),
+            ...attributesFromResponse(response),
+          },
+          { level: 'info' },
+        )
+
+        return result
+      })
+      .catch((error: Error) => ({ error })) // Rejections were already logged; surface them as the error variant
 
     return {
       requestModel,
@@ -410,36 +412,31 @@
       successStatus: response.status,
       responseHeaders,
       responseStream,
-      waitCompletion,
+      onStreamComplete,
     }
   }
 
-  private async processChunks(modelAPI: BaseAPI, events: AsyncIterable): Promise<void> {
+  private async processChunks(
+    modelAPI: BaseAPI,
+    events: AsyncIterable,
+  ): Promise<{ cost?: number } | { error: Error }> {
     for await (const chunk of events) {
       modelAPI.processChunk(chunk)
     }
 
-    this.otelSpan.end(
-      `chat ${modelAPI.extractedRequest?.requestModel ?? 'streaming'}`,
-      // TODO(Marcelo): Missing the HTTP attributes - Should we pass them around or store in ModelAPI?
-      { ...modelAPI.toGenAiOtelAttributes() },
-      { level: 'info' },
-    )
-
     const provider = this.usageProvider()
     const usage = modelAPI.extractedResponse.usage
     const responseModel = modelAPI.extractedResponse.responseModel
     if (!provider || !usage || !responseModel) {
-      logfire.warning('Unable to calculate cost', { provider, usage, responseModel })
+      return { error: new Error(`Unable to calculate cost for model ${responseModel}`) }
+    }
+
+    const price = calcPrice(usage, responseModel, { provider })
+    if (price) {
+      return { cost: price.total_price }
     } else {
-      const price = calcPrice(usage, responseModel, { providerId: this.providerId() })
-      if (price) {
-        this.cost = price.total_price
-        logfire.info('cost {cost}', { cost: this.cost, usage, responseModel })
-      } else {
-        logfire.warning('Unable to calculate cost', { provider, usage, responseModel })
-      }
+      return { error: new Error(`Unable to calculate cost for model ${responseModel} and provider ${provider.name}`) }
     }
   }
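The `onStreamComplete` promise introduced above replaces the old fire-and-forget `waitCompletion: Promise<void>`: it resolves to a `{ cost?: number } | { error: Error }` union that callers narrow structurally, which is exactly what the `recordSpend` callback in `gateway.ts` does. A self-contained sketch of that narrowing, with logging standing in for `recordSpend`/`blockApiKey`:

```ts
type StreamOutcome = { cost?: number } | { error: Error }

// Hypothetical consumer mirroring gateway.ts: the `in` checks narrow the
// union, so each branch only sees the fields of its own variant.
async function settle(onStreamComplete: Promise<StreamOutcome>): Promise<void> {
  const complete = await onStreamComplete
  if ('cost' in complete && complete.cost) {
    console.log(`recording spend: ${complete.cost}`)
  } else if ('error' in complete) {
    console.error(`cost unavailable: ${complete.error.message}`)
  }
}
```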
"{"type":"object","properties":{"gen_ai.system":{"type":"string"},"gen_ai.operation.name":{"type":"string"},"gen_ai.request.model":{"type":"string"},"gen_ai.request.max_tokens":{"type":"number"},"gen_ai.response.model":{"type":"string"},"gen_ai.response.id":{"type":"string"},"gen_ai.usage.input_tokens":{"type":"number"},"gen_ai.usage.cache_read_tokens":{"type":"number"},"gen_ai.usage.cache_write_tokens":{"type":"number"},"gen_ai.usage.output_tokens":{"type":"number"},"http.request.method":{"type":"string"},"url.full":{"type":"string"},"http.request.header.accept":{"type":"string"},"http.request.header.anthropic-version":{"type":"string"},"http.request.header.authorization":{"type":"string"},"http.request.header.content-type":{"type":"string"},"http.request.header.user-agent":{"type":"string"},"http.request.header.x-stainless-arch":{"type":"string"},"http.request.header.x-stainless-lang":{"type":"string"},"http.request.header.x-stainless-os":{"type":"string"},"http.request.header.x-stainless-package-version":{"type":"string"},"http.request.header.x-stainless-retry-count":{"type":"string"},"http.request.header.x-stainless-runtime":{"type":"string"},"http.request.header.x-stainless-runtime-version":{"type":"string"},"http.request.header.x-stainless-timeout":{"type":"string"},"http.response.status_code":{"type":"number"},"http.response.header.server":{"type":"string"},"http.response.header.transfer-encoding":{"type":"string"}}}", }, }, { @@ -1199,5 +1199,113 @@ exports[`anthropic > should call anthropic via gateway with stream > span 1`] = "intValue": 10, }, }, + { + "key": "http.request.method", + "value": { + "stringValue": "POST", + }, + }, + { + "key": "url.full", + "value": { + "stringValue": "https://example.com/anthropic/v1/messages?beta=true", + }, + }, + { + "key": "http.request.header.accept", + "value": { + "stringValue": "application/json", + }, + }, + { + "key": "http.request.header.anthropic-version", + "value": { + "stringValue": "2023-06-01", + }, + }, + { + "key": "http.request.header.authorization", + "value": { + "stringValue": "Bearer healthy", + }, + }, + { + "key": "http.request.header.content-type", + "value": { + "stringValue": "application/json", + }, + }, + { + "key": "http.request.header.user-agent", + "value": { + "stringValue": "Anthropic/JS 0.62.0", + }, + }, + { + "key": "http.request.header.x-stainless-arch", + "value": { + "stringValue": "unknown", + }, + }, + { + "key": "http.request.header.x-stainless-lang", + "value": { + "stringValue": "js", + }, + }, + { + "key": "http.request.header.x-stainless-os", + "value": { + "stringValue": "Unknown", + }, + }, + { + "key": "http.request.header.x-stainless-package-version", + "value": { + "stringValue": "0.62.0", + }, + }, + { + "key": "http.request.header.x-stainless-retry-count", + "value": { + "stringValue": "0", + }, + }, + { + "key": "http.request.header.x-stainless-runtime", + "value": { + "stringValue": "unknown", + }, + }, + { + "key": "http.request.header.x-stainless-runtime-version", + "value": { + "stringValue": "unknown", + }, + }, + { + "key": "http.request.header.x-stainless-timeout", + "value": { + "stringValue": "600", + }, + }, + { + "key": "http.response.status_code", + "value": { + "intValue": 200, + }, + }, + { + "key": "http.response.header.server", + "value": { + "stringValue": "uvicorn", + }, + }, + { + "key": "http.response.header.transfer-encoding", + "value": { + "stringValue": "chunked", + }, + }, ] `; diff --git a/gateway/test/providers/openai.spec.ts.snap 
diff --git a/gateway/test/providers/openai.spec.ts.snap b/gateway/test/providers/openai.spec.ts.snap
index c6c2b95..05f39f6 100644
--- a/gateway/test/providers/openai.spec.ts.snap
+++ b/gateway/test/providers/openai.spec.ts.snap
@@ -463,7 +463,7 @@ exports[`openai > openai chat stream > span 1`] = `
   {
     "key": "logfire.json_schema",
     "value": {
-      "stringValue": "{\"type\":\"object\",\"properties\":{\"gen_ai.system\":{\"type\":\"string\"},\"gen_ai.operation.name\":{\"type\":\"string\"},\"gen_ai.request.model\":{\"type\":\"string\"},\"gen_ai.request.max_tokens\":{\"type\":\"number\"},\"gen_ai.response.model\":{\"type\":\"string\"},\"gen_ai.response.id\":{\"type\":\"string\"},\"gen_ai.usage.input_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_read_tokens\":{\"type\":\"number\"},\"gen_ai.usage.output_tokens\":{\"type\":\"number\"},\"gen_ai.usage.input_audio_tokens\":{\"type\":\"number\"},\"gen_ai.usage.output_audio_tokens\":{\"type\":\"number\"}}}",
+      "stringValue": "{\"type\":\"object\",\"properties\":{\"gen_ai.system\":{\"type\":\"string\"},\"gen_ai.operation.name\":{\"type\":\"string\"},\"gen_ai.request.model\":{\"type\":\"string\"},\"gen_ai.request.max_tokens\":{\"type\":\"number\"},\"gen_ai.response.model\":{\"type\":\"string\"},\"gen_ai.response.id\":{\"type\":\"string\"},\"gen_ai.usage.input_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_read_tokens\":{\"type\":\"number\"},\"gen_ai.usage.output_tokens\":{\"type\":\"number\"},\"gen_ai.usage.input_audio_tokens\":{\"type\":\"number\"},\"gen_ai.usage.output_audio_tokens\":{\"type\":\"number\"},\"http.request.method\":{\"type\":\"string\"},\"url.full\":{\"type\":\"string\"},\"http.request.header.accept\":{\"type\":\"string\"},\"http.request.header.authorization\":{\"type\":\"string\"},\"http.request.header.content-length\":{\"type\":\"string\"},\"http.request.header.content-type\":{\"type\":\"string\"},\"http.request.header.user-agent\":{\"type\":\"string\"},\"http.request.header.x-stainless-arch\":{\"type\":\"string\"},\"http.request.header.x-stainless-lang\":{\"type\":\"string\"},\"http.request.header.x-stainless-os\":{\"type\":\"string\"},\"http.request.header.x-stainless-package-version\":{\"type\":\"string\"},\"http.request.header.x-stainless-retry-count\":{\"type\":\"string\"},\"http.request.header.x-stainless-runtime\":{\"type\":\"string\"},\"http.request.header.x-stainless-runtime-version\":{\"type\":\"string\"},\"http.request.header.x-stainless-timeout\":{\"type\":\"string\"},\"http.response.status_code\":{\"type\":\"number\"},\"http.response.header.server\":{\"type\":\"string\"},\"http.response.header.transfer-encoding\":{\"type\":\"string\"}}}",
     },
   },
   {
@@ -538,6 +538,114 @@ exports[`openai > openai chat stream > span 1`] = `
       "intValue": 0,
     },
   },
+  {
+    "key": "http.request.method",
+    "value": {
+      "stringValue": "POST",
+    },
+  },
+  {
+    "key": "url.full",
+    "value": {
+      "stringValue": "https://example.com/openai/chat/completions",
+    },
+  },
+  {
+    "key": "http.request.header.accept",
+    "value": {
+      "stringValue": "application/json",
+    },
+  },
+  {
+    "key": "http.request.header.authorization",
+    "value": {
+      "stringValue": "Bearer healthy",
+    },
+  },
+  {
+    "key": "http.request.header.content-length",
+    "value": {
+      "stringValue": "319",
+    },
+  },
+  {
+    "key": "http.request.header.content-type",
+    "value": {
+      "stringValue": "application/json",
+    },
+  },
+  {
+    "key": "http.request.header.user-agent",
+    "value": {
+      "stringValue": "OpenAI/JS 4.104.0",
+    },
+  },
+  {
+    "key": "http.request.header.x-stainless-arch",
+    "value": {
+      "stringValue": "unknown",
+    },
+  },
+  {
+    "key": "http.request.header.x-stainless-lang",
+    "value": {
+      "stringValue": "js",
+    },
+  },
+  {
+    "key": "http.request.header.x-stainless-os",
+    "value": {
+      "stringValue": "Unknown",
+    },
+  },
"value": { + "stringValue": "4.104.0", + }, + }, + { + "key": "http.request.header.x-stainless-retry-count", + "value": { + "stringValue": "0", + }, + }, + { + "key": "http.request.header.x-stainless-runtime", + "value": { + "stringValue": "unknown", + }, + }, + { + "key": "http.request.header.x-stainless-runtime-version", + "value": { + "stringValue": "unknown", + }, + }, + { + "key": "http.request.header.x-stainless-timeout", + "value": { + "stringValue": "600", + }, + }, + { + "key": "http.response.status_code", + "value": { + "intValue": 200, + }, + }, + { + "key": "http.response.header.server", + "value": { + "stringValue": "uvicorn", + }, + }, + { + "key": "http.response.header.transfer-encoding", + "value": { + "stringValue": "chunked", + }, + }, ] `; diff --git a/proxy-vcr/proxy_vcr/main.py b/proxy-vcr/proxy_vcr/main.py index 0c03d2f..4cf10a0 100644 --- a/proxy-vcr/proxy_vcr/main.py +++ b/proxy-vcr/proxy_vcr/main.py @@ -110,7 +110,14 @@ async def health_check(_: Request) -> Response: if __name__ == '__main__': this_dir = pathlib.Path(__file__).parent - uvicorn.run('proxy_vcr.main:app', host='0.0.0.0', port=8005, reload=True, reload_dirs=[str(this_dir)]) + uvicorn.run( + 'proxy_vcr.main:app', + host='0.0.0.0', + port=8005, + reload=True, + reload_dirs=[str(this_dir)], + date_header=False, + ) def cassette_name(provider: str, hash: str) -> str: