diff --git a/internal/llmobs/llmobs.go b/internal/llmobs/llmobs.go index 9fd47bbc3f..0c15452b50 100644 --- a/internal/llmobs/llmobs.go +++ b/internal/llmobs/llmobs.go @@ -382,9 +382,9 @@ func (l *LLMObs) batchSend(params batchSendParams) { } } if err := l.Transport.PushSpanEvents(ctx, events); err != nil { - log.Error("llmobs: PushSpanEvents failed: %v", err.Error()) + log.Error("llmobs: failed to push span events: %v", err.Error()) } else { - log.Debug("llmobs: PushSpanEvents success") + log.Debug("llmobs: push span events success") } }() } @@ -402,9 +402,9 @@ func (l *LLMObs) batchSend(params batchSendParams) { } } if err := l.Transport.PushEvalMetrics(ctx, metrics); err != nil { - log.Error("llmobs: PushEvalMetrics failed: %v", err.Error()) + log.Error("llmobs: failed to push eval metrics: %v", err.Error()) } else { - log.Debug("llmobs: PushEvalMetrics success") + log.Debug("llmobs: push eval metrics success") } }() } diff --git a/internal/llmobs/transport/dne.go b/internal/llmobs/transport/dne.go index b545ee773e..5436a18364 100644 --- a/internal/llmobs/transport/dne.go +++ b/internal/llmobs/transport/dne.go @@ -201,14 +201,17 @@ func (c *Transport) GetDatasetByName(ctx context.Context, name, projectID string datasetPath := fmt.Sprintf("%s/%s/datasets?%s", endpointPrefixDNE, url.PathEscape(projectID), q.Encode()) method := http.MethodGet - status, b, err := c.jsonRequest(ctx, method, datasetPath, subdomainDNE, nil, defaultTimeout) - if err != nil || status != http.StatusOK { - return nil, fmt.Errorf("get dataset by name %q failed: %v", name, err) + result, err := c.jsonRequest(ctx, method, datasetPath, subdomainDNE, nil, defaultTimeout) + if err != nil { + return nil, err + } + if result.statusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } var datasetResp GetDatasetResponse - if err := json.Unmarshal(b, &datasetResp); err != nil { - return nil, fmt.Errorf("decode datasets list: %w", err) + if err := json.Unmarshal(result.body, &datasetResp); err != nil { + return nil, fmt.Errorf("failed to decode json response: %w", err) } if len(datasetResp.Data) == 0 { return nil, ErrDatasetNotFound @@ -238,15 +241,15 @@ func (c *Transport) CreateDataset(ctx context.Context, name, description, projec }, }, } - status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) + result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) if err != nil { - return nil, fmt.Errorf("create dataset %q failed: %v", name, err) + return nil, err } - log.Debug("llmobs/internal/transport.DatasetGetOrCreate: create dataset success (status code: %d)", status) + log.Debug("llmobs: create dataset success (status code: %d)", result.statusCode) var resp CreateDatasetResponse - if err := json.Unmarshal(b, &resp); err != nil { - return nil, fmt.Errorf("decode create dataset response: %w", err) + if err := json.Unmarshal(result.body, &resp); err != nil { + return nil, fmt.Errorf("failed to decode json response: %w", err) } id := resp.Data.ID dataset := resp.Data.Attributes @@ -266,9 +269,12 @@ func (c *Transport) DeleteDataset(ctx context.Context, datasetIDs ...string) err }, } - status, _, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) - if err != nil || status != http.StatusOK { - return fmt.Errorf("delete dataset %v failed: %v", datasetIDs, err) + result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) + if err != nil { + return err + } + if result.statusCode != http.StatusOK { + return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } return nil } @@ -294,14 +300,17 @@ func (c *Transport) BatchUpdateDataset( }, } - status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) - if err != nil || status != http.StatusOK { - return -1, nil, fmt.Errorf("batch_update for dataset %q failed: %v", datasetID, err) + result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) + if err != nil { + return -1, nil, err + } + if result.statusCode != http.StatusOK { + return -1, nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } var resp BatchUpdateDatasetResponse - if err := json.Unmarshal(b, &resp); err != nil { - return -1, nil, fmt.Errorf("decode batch_update response: %w", err) + if err := json.Unmarshal(result.body, &resp); err != nil { + return -1, nil, fmt.Errorf("failed to decode json response: %w", err) } // FIXME: we don't get version numbers in responses to deletion requests @@ -336,14 +345,17 @@ func (c *Transport) GetDatasetRecordsPage(ctx context.Context, datasetID, cursor recordsPath = fmt.Sprintf("%s?page[cursor]=%s", recordsPath, url.QueryEscape(cursor)) } - status, b, err := c.jsonRequest(ctx, method, recordsPath, subdomainDNE, nil, getDatasetRecordsTimeout) - if err != nil || status != http.StatusOK { - return nil, "", fmt.Errorf("get dataset records page failed: %v (datasetID=%q, status=%d)", err, datasetID, status) + result, err := c.jsonRequest(ctx, method, recordsPath, subdomainDNE, nil, getDatasetRecordsTimeout) + if err != nil { + return nil, "", err + } + if result.statusCode != http.StatusOK { + return nil, "", fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } var recordsResp GetDatasetRecordsResponse - if err := json.Unmarshal(b, &recordsResp); err != nil { - return nil, "", fmt.Errorf("decode dataset records: %w", err) + if err := json.Unmarshal(result.body, &recordsResp); err != nil { + return nil, "", fmt.Errorf("failed to decode json response: %w", err) } records := make([]DatasetRecordView, 0, len(recordsResp.Data)) @@ -404,14 +416,17 @@ func (c *Transport) GetOrCreateProject(ctx context.Context, name string) (*Proje }, }, } - status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) - if err != nil || status != http.StatusOK { - return nil, fmt.Errorf("create project %q failed: %v", name, err) + result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) + if err != nil { + return nil, err + } + if result.statusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } var resp CreateProjectResponse - if err := json.Unmarshal(b, &resp); err != nil { - return nil, fmt.Errorf("decode project response: %w", err) + if err := json.Unmarshal(result.body, &resp); err != nil { + return nil, fmt.Errorf("failed to decode json response: %w", err) } project := resp.Data.Attributes @@ -450,14 +465,17 @@ func (c *Transport) CreateExperiment( }, } - status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) - if err != nil || status != http.StatusOK { - return nil, fmt.Errorf("create experiment %q failed: %v", name, err) + result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) + if err != nil { + return nil, err + } + if result.statusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } var resp CreateExperimentResponse - if err := json.Unmarshal(b, &resp); err != nil { - return nil, fmt.Errorf("decode experiment response: %w", err) + if err := json.Unmarshal(result.body, &resp); err != nil { + return nil, fmt.Errorf("failed to decode json response: %w", err) } exp := resp.Data.Attributes exp.ID = resp.Data.ID @@ -485,12 +503,12 @@ func (c *Transport) PushExperimentEvents( }, } - status, b, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) + result, err := c.jsonRequest(ctx, method, path, subdomainDNE, body, defaultTimeout) if err != nil { - return fmt.Errorf("post experiment eval metrics failed: %v", err) + return err } - if status != http.StatusOK && status != http.StatusAccepted { - return fmt.Errorf("unexpected status %d: %s", status, string(b)) + if result.statusCode != http.StatusOK && result.statusCode != http.StatusAccepted { + return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } return nil } @@ -552,11 +570,14 @@ func (c *Transport) BulkUploadDataset(ctx context.Context, datasetID string, rec path := fmt.Sprintf("%s/datasets/%s/records/upload", endpointPrefixDNE, url.PathEscape(datasetID)) contentType := fmt.Sprintf("multipart/form-data; boundary=%s", boundary) - status, respBody, err := c.request(ctx, http.MethodPost, path, subdomainDNE, bytes.NewReader(body.Bytes()), contentType, bulkUploadTimeout) - if err != nil || status != http.StatusOK { - return fmt.Errorf("bulk upload failed: %w", err) + result, err := c.request(ctx, http.MethodPost, path, subdomainDNE, bytes.NewReader(body.Bytes()), contentType, bulkUploadTimeout) + if err != nil { + return err + } + if result.statusCode != http.StatusOK { + return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } - log.Debug("llmobs/transport: successfully bulk uploaded %d records to dataset %q: %s", len(records), datasetID, string(respBody)) + log.Debug("llmobs/transport: successfully bulk uploaded %d records to dataset %q: %s", len(records), datasetID, string(result.body)) return nil } diff --git a/internal/llmobs/transport/eval_metric.go b/internal/llmobs/transport/eval_metric.go index 6e1bda4620..937e632524 100644 --- a/internal/llmobs/transport/eval_metric.go +++ b/internal/llmobs/transport/eval_metric.go @@ -80,12 +80,12 @@ func (c *Transport) PushEvalMetrics( }, } - status, b, err := c.jsonRequest(ctx, method, path, subdomainEvalMetric, body, defaultTimeout) + result, err := c.jsonRequest(ctx, method, path, subdomainEvalMetric, body, defaultTimeout) if err != nil { - return fmt.Errorf("post llmobs eval metrics failed: %v", err) + return err } - if status != http.StatusOK && status != http.StatusAccepted { - return fmt.Errorf("unexpected status %d: %s", status, string(b)) + if result.statusCode != http.StatusOK && result.statusCode != http.StatusAccepted { + return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } return nil } diff --git a/internal/llmobs/transport/span.go b/internal/llmobs/transport/span.go index f30c319d12..50521db0fd 100644 --- a/internal/llmobs/transport/span.go +++ b/internal/llmobs/transport/span.go @@ -71,12 +71,12 @@ func (c *Transport) PushSpanEvents( body = append(body, req) } - status, b, err := c.jsonRequest(ctx, method, path, subdomainLLMSpan, body, defaultTimeout) + result, err := c.jsonRequest(ctx, method, path, subdomainLLMSpan, body, defaultTimeout) if err != nil { - return fmt.Errorf("post llmobs spans failed: %w", err) + return err } - if status != http.StatusOK && status != http.StatusAccepted { - return fmt.Errorf("unexpected status %d: %s", status, string(b)) + if result.statusCode != http.StatusOK && result.statusCode != http.StatusAccepted { + return fmt.Errorf("unexpected status %d: %s", result.statusCode, string(result.body)) } return nil } diff --git a/internal/llmobs/transport/transport.go b/internal/llmobs/transport/transport.go index 39f9a84022..4ddd3f91c9 100644 --- a/internal/llmobs/transport/transport.go +++ b/internal/llmobs/transport/transport.go @@ -48,9 +48,9 @@ const ( defaultSite = "datadoghq.com" defaultMaxRetries uint = 3 - defaultTimeout time.Duration = 5 * time.Second - bulkUploadTimeout time.Duration = 60 * time.Second - getDatasetRecordsTimeout time.Duration = 20 * time.Second + defaultTimeout = 5 * time.Second + bulkUploadTimeout = 60 * time.Second + getDatasetRecordsTimeout = 20 * time.Second ) var ( @@ -156,28 +156,33 @@ func (c *Transport) baseURL(subdomain string) string { return u } -func (c *Transport) jsonRequest(ctx context.Context, method, path, subdomain string, body any, timeout time.Duration) (int, []byte, error) { +func (c *Transport) jsonRequest(ctx context.Context, method, path, subdomain string, body any, timeout time.Duration) (requestResult, error) { var jsonBody io.Reader if body != nil { var buf bytes.Buffer enc := json.NewEncoder(&buf) enc.SetEscapeHTML(false) if err := enc.Encode(body); err != nil { - return 0, nil, fmt.Errorf("encode body: %w", err) + return requestResult{}, fmt.Errorf("failed to json encode body: %w", err) } jsonBody = bytes.NewReader(buf.Bytes()) } return c.request(ctx, method, path, subdomain, jsonBody, "application/json", timeout) } -func (c *Transport) request(ctx context.Context, method, path, subdomain string, body io.Reader, contentType string, timeout time.Duration) (int, []byte, error) { +type requestResult struct { + statusCode int + body []byte +} + +func (c *Transport) request(ctx context.Context, method, path, subdomain string, body io.Reader, contentType string, timeout time.Duration) (requestResult, error) { if timeout == 0 { timeout = defaultTimeout } urlStr := c.baseURL(subdomain) + path backoffStrat := defaultBackoffStrategy() - doRequest := func() (resp *http.Response, err error) { + doRequest := func() (result requestResult, err error) { log.Debug("llmobs: sending request (method: %s | url: %s)", method, urlStr) defer func() { if err != nil { @@ -189,14 +194,14 @@ func (c *Transport) request(ctx context.Context, method, path, subdomain string, if body != nil { if seeker, ok := body.(io.Seeker); ok { if _, err := seeker.Seek(0, io.SeekStart); err != nil { - return nil, fmt.Errorf("failed to reset body reader: %w", err) + return requestResult{}, fmt.Errorf("failed to reset body reader: %w", err) } } } req, err := http.NewRequestWithContext(ctx, method, urlStr, body) if err != nil { - return nil, err + return requestResult{}, err } req.Header.Set("Content-Type", contentType) @@ -217,50 +222,65 @@ func (c *Transport) request(ctx context.Context, method, path, subdomain string, req.Header.Set("X-Datadog-NeedsAppKey", "true") } } + + // Set per-endpoint timeout timeoutCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() req = req.WithContext(timeoutCtx) - resp, err = c.httpClient.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { - return nil, err + return requestResult{}, err } - defer func() { - if err != nil && resp != nil { - _ = resp.Body.Close() - } - }() + defer resp.Body.Close() code := resp.StatusCode if code >= 200 && code <= 299 { - return resp, nil + b, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return requestResult{}, fmt.Errorf("failed to read response body: %w", readErr) + } + log.Debug("llmobs: got success response: %s", string(b)) + return requestResult{statusCode: code, body: b}, nil } if isRetriableStatus(code) { - return nil, fmt.Errorf("request failed with transient http status code: %d", code) + errMsg := fmt.Sprintf("request failed with transient http status code: %d", code) + if body := readErrorBody(resp); body != "" { + errMsg = fmt.Sprintf("%s: %s", errMsg, body) + } + return requestResult{}, fmt.Errorf("%s", errMsg) } if code == http.StatusTooManyRequests { wait := parseRetryAfter(resp.Header) log.Debug("llmobs: status code 429, waiting %s before retry...", wait.String()) drainAndClose(resp.Body) - return nil, backoff.RetryAfter(int(wait.Seconds())) + return requestResult{}, backoff.RetryAfter(int(wait.Seconds())) + } + errMsg := fmt.Sprintf("request failed with http status code: %d", resp.StatusCode) + if body := readErrorBody(resp); body != "" { + errMsg = fmt.Sprintf("%s: %s", errMsg, body) } drainAndClose(resp.Body) - return nil, backoff.Permanent(fmt.Errorf("request failed with http status code: %d", resp.StatusCode)) + return requestResult{}, backoff.Permanent(fmt.Errorf("%s", errMsg)) } - resp, err := backoff.Retry(ctx, doRequest, backoff.WithBackOff(backoffStrat), backoff.WithMaxTries(defaultMaxRetries)) - if err != nil { - return 0, nil, err - } - defer resp.Body.Close() + return backoff.Retry(ctx, doRequest, backoff.WithBackOff(backoffStrat), backoff.WithMaxTries(defaultMaxRetries)) +} - b, err := io.ReadAll(resp.Body) +func readErrorBody(resp *http.Response) string { + if resp == nil || resp.Body == nil { + return "" + } + // Only read the body if it's JSON + contentType := resp.Header.Get("Content-Type") + if !strings.Contains(contentType, "application/json") { + return "" + } + body, err := io.ReadAll(resp.Body) if err != nil { - return resp.StatusCode, nil, err + return "" } - log.Debug("llmobs: got success response: %s", string(b)) - - return resp.StatusCode, b, nil + return strings.TrimSpace(string(body)) } func drainAndClose(b io.ReadCloser) { diff --git a/llmobs/dataset/dataset.go b/llmobs/dataset/dataset.go index 21da9e41a0..4ce27c3bd2 100644 --- a/llmobs/dataset/dataset.go +++ b/llmobs/dataset/dataset.go @@ -304,6 +304,11 @@ func Pull(ctx context.Context, name string, opts ...PullOption) (*Dataset, error opt(cfg) } + // Validate required fields + if ll.Config.ResolvedAgentlessEnabled && ll.Config.TracerConfig.APPKey == "" { + return nil, errRequiresAppKey + } + // Determine project name: option takes precedence over global config projectName := cfg.projectName if projectName == "" { diff --git a/llmobs/dataset/dataset_test.go b/llmobs/dataset/dataset_test.go index 2d1d9d2026..365643ecf5 100644 --- a/llmobs/dataset/dataset_test.go +++ b/llmobs/dataset/dataset_test.go @@ -549,6 +549,34 @@ func TestDatasetPull(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "project name must be provided") }) + t.Run("pull-missing-dd-app-key-agentless", func(t *testing.T) { + t.Setenv("DD_API_KEY", testAPIKey) + t.Setenv("DD_APP_KEY", "") + + // Use agentless mode to trigger app key requirement + tt := testTracer(t, testtracer.WithTracerStartOpts(tracer.WithLLMObsAgentlessEnabled(true))) + defer tt.Stop() + + _, err := Pull(context.Background(), "existing-dataset") + + // Should fail - datasets require DD_APP_KEY in agentless mode + require.Error(t, err) + assert.Contains(t, err.Error(), "an app key must be provided") + }) + t.Run("pull-missing-dd-app-key-agent-mode", func(t *testing.T) { + t.Setenv("DD_APP_KEY", "") + + // Use agent mode - app key should not be required + tt := testTracer(t, testtracer.WithTracerStartOpts(tracer.WithLLMObsAgentlessEnabled(false))) + defer tt.Stop() + + ds, err := Pull(context.Background(), "existing-dataset") + + // Should succeed - app key not required in agent mode + require.NoError(t, err) + assert.NotNil(t, ds) + assert.Equal(t, "existing-dataset", ds.Name()) + }) t.Run("pull-dataset-with-non-map-input", func(t *testing.T) { h := func(r *http.Request) *http.Response { path := strings.TrimPrefix(r.URL.Path, "/evp_proxy/v2")