Skip to content

Commit 3b824da

Browse files
authored
fix(llmobs): do not cancel the context before reading response bodies (#4075)
Co-authored-by: rodrigo.arguello <[email protected]>
1 parent 6854674 commit 3b824da

File tree

7 files changed

+157
-83
lines changed

7 files changed

+157
-83
lines changed

internal/llmobs/llmobs.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -382,9 +382,9 @@ func (l *LLMObs) batchSend(params batchSendParams) {
382382
}
383383
}
384384
if err := l.Transport.PushSpanEvents(ctx, events); err != nil {
385-
log.Error("llmobs: PushSpanEvents failed: %v", err.Error())
385+
log.Error("llmobs: failed to push span events: %v", err.Error())
386386
} else {
387-
log.Debug("llmobs: PushSpanEvents success")
387+
log.Debug("llmobs: push span events success")
388388
}
389389
}()
390390
}
@@ -402,9 +402,9 @@ func (l *LLMObs) batchSend(params batchSendParams) {
402402
}
403403
}
404404
if err := l.Transport.PushEvalMetrics(ctx, metrics); err != nil {
405-
log.Error("llmobs: PushEvalMetrics failed: %v", err.Error())
405+
log.Error("llmobs: failed to push eval metrics: %v", err.Error())
406406
} else {
407-
log.Debug("llmobs: PushEvalMetrics success")
407+
log.Debug("llmobs: push eval metrics success")
408408
}
409409
}()
410410
}

internal/llmobs/transport/dne.go

Lines changed: 62 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,17 @@ func (c *Transport) GetDatasetByName(ctx context.Context, name, projectID string
201201
datasetPath := fmt.Sprintf("%s/%s/datasets?%s", endpointPrefixDNE, url.PathEscape(projectID), q.Encode())
202202
method := http.MethodGet
203203

204-
status, b, err := c.jsonRequest(ctx, method, datasetPath, subdomainDNE, nil, defaultTimeout)
205-
if err != nil || status != http.StatusOK {
206-
return nil, fmt.Errorf("get dataset by name %q failed: %v", name, err)
204+
result, err := c.jsonRequest(ctx, method, datasetPath, subdomainDNE, nil, defaultTimeout)
205+
if err != nil {
206+
return nil, err
207+
}
208+
if result.statusCode != http.StatusOK {
209+
return nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
207210
}
208211

209212
var datasetResp GetDatasetResponse
210-
if err := json.Unmarshal(b, &datasetResp); err != nil {
211-
return nil, fmt.Errorf("decode datasets list: %w", err)
213+
if err := json.Unmarshal(result.body, &datasetResp); err != nil {
214+
return nil, fmt.Errorf("failed to decode json response: %w", err)
212215
}
213216
if len(datasetResp.Data) == 0 {
214217
return nil, ErrDatasetNotFound
@@ -238,15 +241,15 @@ func (c *Transport) CreateDataset(ctx context.Context, name, description, projec
238241
},
239242
},
240243
}
241-
status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
244+
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
242245
if err != nil {
243-
return nil, fmt.Errorf("create dataset %q failed: %v", name, err)
246+
return nil, err
244247
}
245-
log.Debug("llmobs/internal/transport.DatasetGetOrCreate: create dataset success (status code: %d)", status)
248+
log.Debug("llmobs: create dataset success (status code: %d)", result.statusCode)
246249

247250
var resp CreateDatasetResponse
248-
if err := json.Unmarshal(b, &resp); err != nil {
249-
return nil, fmt.Errorf("decode create dataset response: %w", err)
251+
if err := json.Unmarshal(result.body, &resp); err != nil {
252+
return nil, fmt.Errorf("failed to decode json response: %w", err)
250253
}
251254
id := resp.Data.ID
252255
dataset := resp.Data.Attributes
@@ -266,9 +269,12 @@ func (c *Transport) DeleteDataset(ctx context.Context, datasetIDs ...string) err
266269
},
267270
}
268271

269-
status, _, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
270-
if err != nil || status != http.StatusOK {
271-
return fmt.Errorf("delete dataset %v failed: %v", datasetIDs, err)
272+
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
273+
if err != nil {
274+
return err
275+
}
276+
if result.statusCode != http.StatusOK {
277+
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
272278
}
273279
return nil
274280
}
@@ -294,14 +300,17 @@ func (c *Transport) BatchUpdateDataset(
294300
},
295301
}
296302

297-
status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
298-
if err != nil || status != http.StatusOK {
299-
return -1, nil, fmt.Errorf("batch_update for dataset %q failed: %v", datasetID, err)
303+
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
304+
if err != nil {
305+
return -1, nil, err
306+
}
307+
if result.statusCode != http.StatusOK {
308+
return -1, nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
300309
}
301310

302311
var resp BatchUpdateDatasetResponse
303-
if err := json.Unmarshal(b, &resp); err != nil {
304-
return -1, nil, fmt.Errorf("decode batch_update response: %w", err)
312+
if err := json.Unmarshal(result.body, &resp); err != nil {
313+
return -1, nil, fmt.Errorf("failed to decode json response: %w", err)
305314
}
306315

307316
// FIXME: we don't get version numbers in responses to deletion requests
@@ -336,14 +345,17 @@ func (c *Transport) GetDatasetRecordsPage(ctx context.Context, datasetID, cursor
336345
recordsPath = fmt.Sprintf("%s?page[cursor]=%s", recordsPath, url.QueryEscape(cursor))
337346
}
338347

339-
status, b, err := c.jsonRequest(ctx, method, recordsPath, subdomainDNE, nil, getDatasetRecordsTimeout)
340-
if err != nil || status != http.StatusOK {
341-
return nil, "", fmt.Errorf("get dataset records page failed: %v (datasetID=%q, status=%d)", err, datasetID, status)
348+
result, err := c.jsonRequest(ctx, method, recordsPath, subdomainDNE, nil, getDatasetRecordsTimeout)
349+
if err != nil {
350+
return nil, "", err
351+
}
352+
if result.statusCode != http.StatusOK {
353+
return nil, "", fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
342354
}
343355

344356
var recordsResp GetDatasetRecordsResponse
345-
if err := json.Unmarshal(b, &recordsResp); err != nil {
346-
return nil, "", fmt.Errorf("decode dataset records: %w", err)
357+
if err := json.Unmarshal(result.body, &recordsResp); err != nil {
358+
return nil, "", fmt.Errorf("failed to decode json response: %w", err)
347359
}
348360

349361
records := make([]DatasetRecordView, 0, len(recordsResp.Data))
@@ -404,14 +416,17 @@ func (c *Transport) GetOrCreateProject(ctx context.Context, name string) (*Proje
404416
},
405417
},
406418
}
407-
status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
408-
if err != nil || status != http.StatusOK {
409-
return nil, fmt.Errorf("create project %q failed: %v", name, err)
419+
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
420+
if err != nil {
421+
return nil, err
422+
}
423+
if result.statusCode != http.StatusOK {
424+
return nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
410425
}
411426

412427
var resp CreateProjectResponse
413-
if err := json.Unmarshal(b, &resp); err != nil {
414-
return nil, fmt.Errorf("decode project response: %w", err)
428+
if err := json.Unmarshal(result.body, &resp); err != nil {
429+
return nil, fmt.Errorf("failed to decode json response: %w", err)
415430
}
416431

417432
project := resp.Data.Attributes
@@ -450,14 +465,17 @@ func (c *Transport) CreateExperiment(
450465
},
451466
}
452467

453-
status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
454-
if err != nil || status != http.StatusOK {
455-
return nil, fmt.Errorf("create experiment %q failed: %v", name, err)
468+
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
469+
if err != nil {
470+
return nil, err
471+
}
472+
if result.statusCode != http.StatusOK {
473+
return nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
456474
}
457475

458476
var resp CreateExperimentResponse
459-
if err := json.Unmarshal(b, &resp); err != nil {
460-
return nil, fmt.Errorf("decode experiment response: %w", err)
477+
if err := json.Unmarshal(result.body, &resp); err != nil {
478+
return nil, fmt.Errorf("failed to decode json response: %w", err)
461479
}
462480
exp := resp.Data.Attributes
463481
exp.ID = resp.Data.ID
@@ -485,12 +503,12 @@ func (c *Transport) PushExperimentEvents(
485503
},
486504
}
487505

488-
status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
506+
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
489507
if err != nil {
490-
return fmt.Errorf("post experiment eval metrics failed: %v", err)
508+
return err
491509
}
492-
if status != http.StatusOK && status != http.StatusAccepted {
493-
return fmt.Errorf("unexpected status %d: %s", status, string(b))
510+
if result.statusCode != http.StatusOK && result.statusCode != http.StatusAccepted {
511+
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
494512
}
495513
return nil
496514
}
@@ -552,11 +570,14 @@ func (c *Transport) BulkUploadDataset(ctx context.Context, datasetID string, rec
552570
path := fmt.Sprintf("%s/datasets/%s/records/upload", endpointPrefixDNE, url.PathEscape(datasetID))
553571
contentType := fmt.Sprintf("multipart/form-data; boundary=%s", boundary)
554572

555-
status, respBody, err := c.request(ctx, http.MethodPost, path, subdomainDNE, bytes.NewReader(body.Bytes()), contentType, bulkUploadTimeout)
556-
if err != nil || status != http.StatusOK {
557-
return fmt.Errorf("bulk upload failed: %w", err)
573+
result, err := c.request(ctx, http.MethodPost, path, subdomainDNE, bytes.NewReader(body.Bytes()), contentType, bulkUploadTimeout)
574+
if err != nil {
575+
return err
576+
}
577+
if result.statusCode != http.StatusOK {
578+
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
558579
}
559580

560-
log.Debug("llmobs/transport: successfully bulk uploaded %d records to dataset %q: %s", len(records), datasetID, string(respBody))
581+
log.Debug("llmobs/transport: successfully bulk uploaded %d records to dataset %q: %s", len(records), datasetID, string(result.body))
561582
return nil
562583
}

internal/llmobs/transport/eval_metric.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,12 @@ func (c *Transport) PushEvalMetrics(
8080
},
8181
}
8282

83-
status, b, err := c.jsonRequest(ctx, method, path, subdomainEvalMetric, body, defaultTimeout)
83+
result, err := c.jsonRequest(ctx, method, path, subdomainEvalMetric, body, defaultTimeout)
8484
if err != nil {
85-
return fmt.Errorf("post llmobs eval metrics failed: %v", err)
85+
return err
8686
}
87-
if status != http.StatusOK && status != http.StatusAccepted {
88-
return fmt.Errorf("unexpected status %d: %s", status, string(b))
87+
if result.statusCode != http.StatusOK && result.statusCode != http.StatusAccepted {
88+
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
8989
}
9090
return nil
9191
}

internal/llmobs/transport/span.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,12 @@ func (c *Transport) PushSpanEvents(
7171
body = append(body, req)
7272
}
7373

74-
status, b, err := c.jsonRequest(ctx, method, path, subdomainLLMSpan, body, defaultTimeout)
74+
result, err := c.jsonRequest(ctx, method, path, subdomainLLMSpan, body, defaultTimeout)
7575
if err != nil {
76-
return fmt.Errorf("post llmobs spans failed: %w", err)
76+
return err
7777
}
78-
if status != http.StatusOK && status != http.StatusAccepted {
79-
return fmt.Errorf("unexpected status %d: %s", status, string(b))
78+
if result.statusCode != http.StatusOK && result.statusCode != http.StatusAccepted {
79+
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
8080
}
8181
return nil
8282
}

0 commit comments

Comments
 (0)