2 changes: 1 addition & 1 deletion gateway/package.json
@@ -5,7 +5,7 @@
"main": "src/index.ts",
"scripts": {
"typecheck": "tsgo --noEmit && cd test && tsgo --noEmit",
"test": "vitest"
"test": "vitest --reporter=verbose"
},
"dependencies": {
"@opentelemetry/api": "^1.9.0",
25 changes: 12 additions & 13 deletions gateway/src/gateway.ts
@@ -64,32 +64,31 @@ export async function gateway(
   const result = await proxy.dispatch()
 
   // This doesn't work on streaming because the `result` object is returned as soon as we create the streaming response.
-  if (!('responseStream' in result) && !('httpStatusCode' in result)) {
+  if (!('responseStream' in result) && !('response' in result)) {
     const [spanName, attributes, level] = genAiOtelAttributes(result, proxy)
     dispatchSpan.end(spanName, attributes, { level })
   }
 
   let response: Response
-  if ('httpStatusCode' in result) {
-    const { httpStatusCode, responseHeaders, responseBody } = result
-    response = new Response(responseBody, { status: httpStatusCode, headers: responseHeaders })
+  if ('response' in result) {
+    response = result.response
   } else if ('responseStream' in result) {
-    const { successStatus: status, responseHeaders: headers, responseStream, disableKey, waitCompletion } = result
+    const { successStatus: status, responseHeaders: headers, responseStream, disableKey, onStreamComplete } = result
     runAfter(
       ctx,
       'recordSpend',
       (async () => {
-        await waitCompletion
-        const cost = proxy.cost
-        if (cost) {
-          await recordSpend(apiKeyInfo, cost, options)
-        } else {
+        const complete = await onStreamComplete
+        if ('cost' in complete && complete.cost) {
+          await recordSpend(apiKeyInfo, complete.cost, options)
+        } else if ('error' in complete) {
           const { key: _key, ...context } = apiKeyInfo
           if (disableKey) {
-            logfire.error('api key blocked', { context, error: 'Unable to calculate cost' })
-            await blockApiKey(apiKeyInfo, options, 'Unable to calculate cost')
+            logfire.reportError('api key blocked', complete.error, { context })
+            await blockApiKey(apiKeyInfo, options, JSON.stringify(complete.error))
+          } else {
+            logfire.reportError('Unable to calculate cost', complete.error, { context })
           }
-          logfire.error('Unable to calculate cost', { context, error: 'Unable to calculate cost' })
         }
         await otel.send()
       })(),
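The net effect of this change is that spend recording now consumes a typed result union instead of awaiting an untyped `waitCompletion` and then reading `proxy.cost` as a side channel. A minimal sketch of the consumption pattern — the union shape comes from the diff; `settleSpend` and the simplified callback signatures are illustrative stand-ins, not the gateway's actual API:

```ts
// Sketch: the stream settles to either a calculated cost or an error.
type StreamOutcome = { cost?: number } | { error: Error }

async function settleSpend(
  onStreamComplete: Promise<StreamOutcome>,
  recordSpend: (cost: number) => Promise<void>, // stand-in for recordSpend(apiKeyInfo, cost, options)
  blockApiKey: (reason: string) => Promise<void>, // stand-in for blockApiKey(apiKeyInfo, options, reason)
  disableKey?: boolean,
): Promise<void> {
  const complete = await onStreamComplete
  if ('cost' in complete && complete.cost) {
    // Happy path: the stream finished and a cost was calculated.
    await recordSpend(complete.cost)
  } else if ('error' in complete) {
    // Cost calculation failed; optionally disable the key so it cannot keep spending.
    if (disableKey) {
      await blockApiKey(complete.error.message)
    }
  }
}
```

Because both branches narrow with the `in` operator, TypeScript verifies that `complete.cost` and `complete.error` are only touched on the matching side of the union.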
19 changes: 19 additions & 0 deletions gateway/src/otel/attributes.ts
@@ -58,6 +58,25 @@ export function genAiOtelAttributes(
   return [spanName, attributes, level]
 }
 
+export function attributesFromRequest(request: Request): Attributes {
+  return {
+    'http.request.method': request.method,
+    'url.full': request.url,
+    ...Object.fromEntries(
+      Array.from(request.headers.entries()).map(([name, value]) => [`http.request.header.${name}`, value]),
+    ),
+  }
+}
+
+export function attributesFromResponse(response: Response): Attributes {
+  return {
+    'http.response.status_code': response.status,
+    ...Object.fromEntries(
+      Array.from(response.headers.entries()).map(([name, value]) => [`http.response.header.${name}`, value]),
+    ),
+  }
+}
+
 /** Semantic conventions for Generative AI
  * @see https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/
  */
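A quick usage sketch for the two new helpers, using synthetic `Request`/`Response` values — the helper signatures and attribute keys are from the diff above; the URL, headers, and relative import path are made up for illustration:

```ts
import type { Attributes } from '@opentelemetry/api'
import { attributesFromRequest, attributesFromResponse } from './attributes'

const request = new Request('https://example.com/anthropic/v1/messages', {
  method: 'POST',
  headers: { 'content-type': 'application/json' },
})
const response = new Response('{}', { status: 200, headers: { server: 'uvicorn' } })

// Flattens method, URL, status, and every header into OTel semantic-convention keys.
const attributes: Attributes = {
  ...attributesFromRequest(request), // http.request.method, url.full, http.request.header.*
  ...attributesFromResponse(response), // http.response.status_code, http.response.header.*
}
// e.g. attributes['http.request.header.content-type'] === 'application/json'
```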
73 changes: 35 additions & 38 deletions gateway/src/providers/default.ts
@@ -12,7 +12,7 @@ import type { GatewayOptions } from '..'
 import type { ModelAPI } from '../api'
 import type { BaseAPI } from '../api/base'
 import type { OtelSpan } from '../otel'
-import type { GenAIAttributes } from '../otel/attributes'
+import { attributesFromRequest, attributesFromResponse, type GenAIAttributes } from '../otel/attributes'
 import type { ApiKeyInfo, ProviderProxy } from '../types'
 import { runAfter } from '../utils'
@@ -29,10 +29,7 @@ export interface ProxySuccess {
 }
 
 export interface ProxyWhitelistedEndpoint {
-  requestBody: string
-  httpStatusCode: number
-  responseHeaders: Headers
-  responseBody: string
+  response: Response
 }
 
 export interface ProxyStreamingSuccess {
@@ -42,7 +39,7 @@ export interface ProxyStreamingSuccess {
   responseHeaders: Headers
   responseStream: ReadableStream
   otelAttributes?: GenAIAttributes
-  waitCompletion: Promise<void>
+  onStreamComplete: Promise<{ cost?: number } | { error: Error }>
   // In case we get to the end of the response, and we are unable to calculate the cost, we need to know if we can disable the key.
   disableKey?: boolean
 }
@@ -297,27 +294,16 @@ export class DefaultProviderProxy {
     const response = await this.fetch(url, { method, headers: requestHeaders, body: requestBodyText })
 
     if (this.isWhitelistedEndpoint()) {
-      // TODO(Marcelo): We can't read the body if it's a streaming response.
-      const responseBody = await response.text()
-      const { headers, status } = response
       this.otelSpan.end(
         `${this.request.method} ${this.restOfPath}`,
         {
-          'http.method': this.request.method,
-          'http.url': this.restOfPath,
-          'http.response.status_code': status,
+          ...attributesFromRequest(this.request),
+          ...attributesFromResponse(response),
           'http.request.body.text': requestBodyText,
-          'http.response.body.text': responseBody,
-          ...Object.fromEntries(
-            Array.from(requestHeaders.entries()).map(([name, value]) => [`http.request.header.${name}`, value]),
-          ),
-          ...Object.fromEntries(
-            Array.from(headers.entries()).map(([name, value]) => [`http.response.header.${name}`, value]),
-          ),
         },
         { level: 'info' },
       )
-      return { requestBody: requestBodyText, httpStatusCode: status, responseHeaders: headers, responseBody }
+      return { response }
     }
 
     // Each provider should be able to modify the response headers, e.g. remove openai org
@@ -402,44 +388,55 @@

     // Track completion but don't wait for it before returning
     this.runAfter('extract-stream', extractionPromise)
-    const waitCompletion = extractionPromise.catch(() => {}) // Swallow errors, already logged
 
+    const onStreamComplete = extractionPromise
+      .then((result) => {
+        // TODO(Marcelo): I think we actually need to emit 2 spans: one for HTTP, and another for the LLM.
+        this.otelSpan.end(
+          `chat ${modelAPI.extractedRequest?.requestModel ?? 'streaming'}`,
+          {
+            ...modelAPI.toGenAiOtelAttributes(),
+            ...attributesFromRequest(this.request),
+            ...attributesFromResponse(response),
+          },
+          { level: 'info' },
+        )
+
+        return result
+      })
+      .catch() // Swallow errors, already logged
+
     return {
       requestModel,
       requestBody: requestBodyText,
       successStatus: response.status,
       responseHeaders,
       responseStream,
-      waitCompletion,
+      onStreamComplete,
     }
   }
 
-  private async processChunks<T>(modelAPI: BaseAPI<unknown, unknown, T>, events: AsyncIterable<T>): Promise<void> {
+  private async processChunks<T>(
+    modelAPI: BaseAPI<unknown, unknown, T>,
+    events: AsyncIterable<T>,
+  ): Promise<{ cost?: number } | { error: Error }> {
     for await (const chunk of events) {
       modelAPI.processChunk(chunk)
     }
 
-    this.otelSpan.end(
-      `chat ${modelAPI.extractedRequest?.requestModel ?? 'streaming'}`,
-      // TODO(Marcelo): Missing the HTTP attributes - Should we pass them around or store in ModelAPI?
-      { ...modelAPI.toGenAiOtelAttributes() },
-      { level: 'info' },
-    )
-
     const provider = this.usageProvider()
     const usage = modelAPI.extractedResponse.usage
     const responseModel = modelAPI.extractedResponse.responseModel
 
     if (!provider || !usage || !responseModel) {
-      logfire.warning('Unable to calculate cost', { provider, usage, responseModel })
+      return { error: new Error(`Unable to calculate cost for model ${responseModel}`) }
     }
 
-    const price = calcPrice(usage, responseModel, { providerId: this.providerId() })
+    const price = calcPrice(usage, responseModel, { provider })
     if (price) {
-      this.cost = price.total_price
-      logfire.info('cost {cost}', { cost: this.cost, usage, responseModel })
+      return { cost: price.total_price }
     } else {
-      logfire.warning('Unable to calculate cost', { provider, usage, responseModel })
+      return { error: new Error(`Unable to calculate cost for model ${responseModel} and provider ${provider.name}`) }
     }
   }
 
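One subtlety in the `onStreamComplete` chain above: the removed `waitCompletion` used `.catch(() => {})`, which installs a rejection handler, while the new chain ends in `.catch()` with no argument, which installs none — a rejection from `extractionPromise` therefore still propagates to whoever awaits the promise. A small self-contained sketch of the difference (plain promise semantics, nothing gateway-specific):

```ts
// .catch(() => {}) swallows the rejection: the chain resolves to undefined.
Promise.reject(new Error('boom'))
  .catch(() => {})
  .then((value) => console.log('swallowed, resolved with:', value))

// .catch() with no handler is equivalent to .then(undefined, undefined):
// the rejection flows through to the next handler untouched.
Promise.reject(new Error('boom'))
  .catch()
  .then(
    () => console.log('never reached'),
    (error: Error) => console.log('still rejected:', error.message),
  )
```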
110 changes: 109 additions & 1 deletion gateway/test/providers/anthropic.spec.ts.snap
@@ -1130,7 +1130,7 @@ exports[`anthropic > should call anthropic via gateway with stream > span 1`] =
 {
   "key": "logfire.json_schema",
   "value": {
-    "stringValue": "{\"type\":\"object\",\"properties\":{\"gen_ai.system\":{\"type\":\"string\"},\"gen_ai.operation.name\":{\"type\":\"string\"},\"gen_ai.request.model\":{\"type\":\"string\"},\"gen_ai.request.max_tokens\":{\"type\":\"number\"},\"gen_ai.response.model\":{\"type\":\"string\"},\"gen_ai.response.id\":{\"type\":\"string\"},\"gen_ai.usage.input_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_read_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_write_tokens\":{\"type\":\"number\"},\"gen_ai.usage.output_tokens\":{\"type\":\"number\"}}}",
+    "stringValue": "{\"type\":\"object\",\"properties\":{\"gen_ai.system\":{\"type\":\"string\"},\"gen_ai.operation.name\":{\"type\":\"string\"},\"gen_ai.request.model\":{\"type\":\"string\"},\"gen_ai.request.max_tokens\":{\"type\":\"number\"},\"gen_ai.response.model\":{\"type\":\"string\"},\"gen_ai.response.id\":{\"type\":\"string\"},\"gen_ai.usage.input_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_read_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_write_tokens\":{\"type\":\"number\"},\"gen_ai.usage.output_tokens\":{\"type\":\"number\"},\"http.request.method\":{\"type\":\"string\"},\"url.full\":{\"type\":\"string\"},\"http.request.header.accept\":{\"type\":\"string\"},\"http.request.header.anthropic-version\":{\"type\":\"string\"},\"http.request.header.authorization\":{\"type\":\"string\"},\"http.request.header.content-type\":{\"type\":\"string\"},\"http.request.header.user-agent\":{\"type\":\"string\"},\"http.request.header.x-stainless-arch\":{\"type\":\"string\"},\"http.request.header.x-stainless-lang\":{\"type\":\"string\"},\"http.request.header.x-stainless-os\":{\"type\":\"string\"},\"http.request.header.x-stainless-package-version\":{\"type\":\"string\"},\"http.request.header.x-stainless-retry-count\":{\"type\":\"string\"},\"http.request.header.x-stainless-runtime\":{\"type\":\"string\"},\"http.request.header.x-stainless-runtime-version\":{\"type\":\"string\"},\"http.request.header.x-stainless-timeout\":{\"type\":\"string\"},\"http.response.status_code\":{\"type\":\"number\"},\"http.response.header.server\":{\"type\":\"string\"},\"http.response.header.transfer-encoding\":{\"type\":\"string\"}}}",
   },
 },
 {
@@ -1199,5 +1199,113 @@ exports[`anthropic > should call anthropic via gateway with stream > span 1`] =
"intValue": 10,
},
},
{
"key": "http.request.method",
"value": {
"stringValue": "POST",
},
},
{
"key": "url.full",
"value": {
"stringValue": "https://example.com/anthropic/v1/messages?beta=true",
},
},
{
"key": "http.request.header.accept",
"value": {
"stringValue": "application/json",
},
},
{
"key": "http.request.header.anthropic-version",
"value": {
"stringValue": "2023-06-01",
},
},
{
"key": "http.request.header.authorization",
"value": {
"stringValue": "Bearer healthy",
},
},
{
"key": "http.request.header.content-type",
"value": {
"stringValue": "application/json",
},
},
{
"key": "http.request.header.user-agent",
"value": {
"stringValue": "Anthropic/JS 0.62.0",
},
},
{
"key": "http.request.header.x-stainless-arch",
"value": {
"stringValue": "unknown",
},
},
{
"key": "http.request.header.x-stainless-lang",
"value": {
"stringValue": "js",
},
},
{
"key": "http.request.header.x-stainless-os",
"value": {
"stringValue": "Unknown",
},
},
{
"key": "http.request.header.x-stainless-package-version",
"value": {
"stringValue": "0.62.0",
},
},
{
"key": "http.request.header.x-stainless-retry-count",
"value": {
"stringValue": "0",
},
},
{
"key": "http.request.header.x-stainless-runtime",
"value": {
"stringValue": "unknown",
},
},
{
"key": "http.request.header.x-stainless-runtime-version",
"value": {
"stringValue": "unknown",
},
},
{
"key": "http.request.header.x-stainless-timeout",
"value": {
"stringValue": "600",
},
},
{
"key": "http.response.status_code",
"value": {
"intValue": 200,
},
},
{
"key": "http.response.header.server",
"value": {
"stringValue": "uvicorn",
},
},
{
"key": "http.response.header.transfer-encoding",
"value": {
"stringValue": "chunked",
},
},
]
`;