Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/llmobs/llmobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,9 @@ func (l *LLMObs) batchSend(params batchSendParams) {
}
}
if err := l.Transport.PushSpanEvents(ctx, events); err != nil {
log.Error("llmobs: PushSpanEvents failed: %v", err.Error())
log.Error("llmobs: failed to push span events: %v", err.Error())
} else {
log.Debug("llmobs: PushSpanEvents success")
log.Debug("llmobs: push span events success")
}
}()
}
Expand All @@ -402,9 +402,9 @@ func (l *LLMObs) batchSend(params batchSendParams) {
}
}
if err := l.Transport.PushEvalMetrics(ctx, metrics); err != nil {
log.Error("llmobs: PushEvalMetrics failed: %v", err.Error())
log.Error("llmobs: failed to push eval metrics: %v", err.Error())
} else {
log.Debug("llmobs: PushEvalMetrics success")
log.Debug("llmobs: push eval metrics success")
}
}()
}
Expand Down
103 changes: 62 additions & 41 deletions internal/llmobs/transport/dne.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,17 @@ func (c *Transport) GetDatasetByName(ctx context.Context, name, projectID string
datasetPath := fmt.Sprintf("%s/%s/datasets?%s", endpointPrefixDNE, url.PathEscape(projectID), q.Encode())
method := http.MethodGet

status, b, err := c.jsonRequest(ctx, method, datasetPath, subdomainDNE, nil, defaultTimeout)
if err != nil || status != http.StatusOK {
return nil, fmt.Errorf("get dataset by name %q failed: %v", name, err)
result, err := c.jsonRequest(ctx, method, datasetPath, subdomainDNE, nil, defaultTimeout)
if err != nil {
return nil, err
}
if result.statusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}

var datasetResp GetDatasetResponse
if err := json.Unmarshal(b, &datasetResp); err != nil {
return nil, fmt.Errorf("decode datasets list: %w", err)
if err := json.Unmarshal(result.body, &datasetResp); err != nil {
return nil, fmt.Errorf("failed to decode json response: %w", err)
}
if len(datasetResp.Data) == 0 {
return nil, ErrDatasetNotFound
Expand Down Expand Up @@ -238,15 +241,15 @@ func (c *Transport) CreateDataset(ctx context.Context, name, description, projec
},
},
}
status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil {
return nil, fmt.Errorf("create dataset %q failed: %v", name, err)
return nil, err
}
log.Debug("llmobs/internal/transport.DatasetGetOrCreate: create dataset success (status code: %d)", status)
log.Debug("llmobs: create dataset success (status code: %d)", result.statusCode)

var resp CreateDatasetResponse
if err := json.Unmarshal(b, &resp); err != nil {
return nil, fmt.Errorf("decode create dataset response: %w", err)
if err := json.Unmarshal(result.body, &resp); err != nil {
return nil, fmt.Errorf("failed to decode json response: %w", err)
}
id := resp.Data.ID
dataset := resp.Data.Attributes
Expand All @@ -266,9 +269,12 @@ func (c *Transport) DeleteDataset(ctx context.Context, datasetIDs ...string) err
},
}

status, _, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil || status != http.StatusOK {
return fmt.Errorf("delete dataset %v failed: %v", datasetIDs, err)
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil {
return err
}
if result.statusCode != http.StatusOK {
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}
return nil
}
Expand All @@ -294,14 +300,17 @@ func (c *Transport) BatchUpdateDataset(
},
}

status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil || status != http.StatusOK {
return -1, nil, fmt.Errorf("batch_update for dataset %q failed: %v", datasetID, err)
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil {
return -1, nil, err
}
if result.statusCode != http.StatusOK {
return -1, nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}

var resp BatchUpdateDatasetResponse
if err := json.Unmarshal(b, &resp); err != nil {
return -1, nil, fmt.Errorf("decode batch_update response: %w", err)
if err := json.Unmarshal(result.body, &resp); err != nil {
return -1, nil, fmt.Errorf("failed to decode json response: %w", err)
}

// FIXME: we don't get version numbers in responses to deletion requests
Expand Down Expand Up @@ -336,14 +345,17 @@ func (c *Transport) GetDatasetRecordsPage(ctx context.Context, datasetID, cursor
recordsPath = fmt.Sprintf("%s?page[cursor]=%s", recordsPath, url.QueryEscape(cursor))
}

status, b, err := c.jsonRequest(ctx, method, recordsPath, subdomainDNE, nil, getDatasetRecordsTimeout)
if err != nil || status != http.StatusOK {
return nil, "", fmt.Errorf("get dataset records page failed: %v (datasetID=%q, status=%d)", err, datasetID, status)
result, err := c.jsonRequest(ctx, method, recordsPath, subdomainDNE, nil, getDatasetRecordsTimeout)
if err != nil {
return nil, "", err
}
if result.statusCode != http.StatusOK {
return nil, "", fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}

var recordsResp GetDatasetRecordsResponse
if err := json.Unmarshal(b, &recordsResp); err != nil {
return nil, "", fmt.Errorf("decode dataset records: %w", err)
if err := json.Unmarshal(result.body, &recordsResp); err != nil {
return nil, "", fmt.Errorf("failed to decode json response: %w", err)
}

records := make([]DatasetRecordView, 0, len(recordsResp.Data))
Expand Down Expand Up @@ -404,14 +416,17 @@ func (c *Transport) GetOrCreateProject(ctx context.Context, name string) (*Proje
},
},
}
status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil || status != http.StatusOK {
return nil, fmt.Errorf("create project %q failed: %v", name, err)
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil {
return nil, err
}
if result.statusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}

var resp CreateProjectResponse
if err := json.Unmarshal(b, &resp); err != nil {
return nil, fmt.Errorf("decode project response: %w", err)
if err := json.Unmarshal(result.body, &resp); err != nil {
return nil, fmt.Errorf("failed to decode json response: %w", err)
}

project := resp.Data.Attributes
Expand Down Expand Up @@ -450,14 +465,17 @@ func (c *Transport) CreateExperiment(
},
}

status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil || status != http.StatusOK {
return nil, fmt.Errorf("create experiment %q failed: %v", name, err)
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil {
return nil, err
}
if result.statusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}

var resp CreateExperimentResponse
if err := json.Unmarshal(b, &resp); err != nil {
return nil, fmt.Errorf("decode experiment response: %w", err)
if err := json.Unmarshal(result.body, &resp); err != nil {
return nil, fmt.Errorf("failed to decode json response: %w", err)
}
exp := resp.Data.Attributes
exp.ID = resp.Data.ID
Expand Down Expand Up @@ -485,12 +503,12 @@ func (c *Transport) PushExperimentEvents(
},
}

status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout)
if err != nil {
return fmt.Errorf("post experiment eval metrics failed: %v", err)
return err
}
if status != http.StatusOK && status != http.StatusAccepted {
return fmt.Errorf("unexpected status %d: %s", status, string(b))
if result.statusCode != http.StatusOK && result.statusCode != http.StatusAccepted {
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}
return nil
}
Expand Down Expand Up @@ -552,11 +570,14 @@ func (c *Transport) BulkUploadDataset(ctx context.Context, datasetID string, rec
path := fmt.Sprintf("%s/datasets/%s/records/upload", endpointPrefixDNE, url.PathEscape(datasetID))
contentType := fmt.Sprintf("multipart/form-data; boundary=%s", boundary)

status, respBody, err := c.request(ctx, http.MethodPost, path, subdomainDNE, bytes.NewReader(body.Bytes()), contentType, bulkUploadTimeout)
if err != nil || status != http.StatusOK {
return fmt.Errorf("bulk upload failed: %w", err)
result, err := c.request(ctx, http.MethodPost, path, subdomainDNE, bytes.NewReader(body.Bytes()), contentType, bulkUploadTimeout)
if err != nil {
return err
}
if result.statusCode != http.StatusOK {
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}

log.Debug("llmobs/transport: successfully bulk uploaded %d records to dataset %q: %s", len(records), datasetID, string(respBody))
log.Debug("llmobs/transport: successfully bulk uploaded %d records to dataset %q: %s", len(records), datasetID, string(result.body))
return nil
}
8 changes: 4 additions & 4 deletions internal/llmobs/transport/eval_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ func (c *Transport) PushEvalMetrics(
},
}

status, b, err := c.jsonRequest(ctx, method, path, subdomainEvalMetric, body, defaultTimeout)
result, err := c.jsonRequest(ctx, method, path, subdomainEvalMetric, body, defaultTimeout)
if err != nil {
return fmt.Errorf("post llmobs eval metrics failed: %v", err)
return err
}
if status != http.StatusOK && status != http.StatusAccepted {
return fmt.Errorf("unexpected status %d: %s", status, string(b))
if result.statusCode != http.StatusOK && result.statusCode != http.StatusAccepted {
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}
return nil
}
8 changes: 4 additions & 4 deletions internal/llmobs/transport/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ func (c *Transport) PushSpanEvents(
body = append(body, req)
}

status, b, err := c.jsonRequest(ctx, method, path, subdomainLLMSpan, body, defaultTimeout)
result, err := c.jsonRequest(ctx, method, path, subdomainLLMSpan, body, defaultTimeout)
if err != nil {
return fmt.Errorf("post llmobs spans failed: %w", err)
return err
}
if status != http.StatusOK && status != http.StatusAccepted {
return fmt.Errorf("unexpected status %d: %s", status, string(b))
if result.statusCode != http.StatusOK && result.statusCode != http.StatusAccepted {
return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body))
}
return nil
}
Loading
Loading