diff --git a/backend/modules/observability/infra/repo/ck/annotation.go b/backend/modules/observability/infra/repo/ck/annotation.go index 38365907d..cb282330b 100644 --- a/backend/modules/observability/infra/repo/ck/annotation.go +++ b/backend/modules/observability/infra/repo/ck/annotation.go @@ -5,9 +5,16 @@ package ck import ( "context" + "fmt" + "strings" "github.com/coze-dev/coze-loop/backend/infra/ck" "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/gorm_gen/model" + obErrorx "github.com/coze-dev/coze-loop/backend/modules/observability/pkg/errno" + "github.com/coze-dev/coze-loop/backend/pkg/errorx" + "github.com/coze-dev/coze-loop/backend/pkg/logs" + "gorm.io/gorm" + "gorm.io/gorm/clause" ) type InsertAnnotationParam struct { @@ -50,13 +57,149 @@ type AnnotationCkDaoImpl struct { } func (a *AnnotationCkDaoImpl) Insert(ctx context.Context, params *InsertAnnotationParam) error { - return nil + if params == nil || len(params.Annotations) == 0 { + return errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode) + } + db := a.db.NewSession(ctx) + retryTimes := 3 + var lastErr error + for i := 0; i < retryTimes; i++ { + if err := db.Table(params.Table).Create(params.Annotations).Error; err != nil { + lastErr = err + } else { + logs.CtxInfo(ctx, "insert annotations successfully") + return nil + } + } + logs.CtxError(ctx, "fail to insert annotations: %v", lastErr) + return errorx.WrapByCode(lastErr, obErrorx.CommercialCommonInternalErrorCodeCode) } func (a *AnnotationCkDaoImpl) Get(ctx context.Context, params *GetAnnotationParam) (*model.ObservabilityAnnotation, error) { - return nil, nil + if params == nil || params.ID == "" { + return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode) + } + db, err := a.buildSql(ctx, &annoSqlParam{ + Tables: params.Tables, + StartTime: params.StartTime, + EndTime: params.EndTime, + ID: params.ID, + Limit: 1, + }) + if err != nil { + return nil, err + } + logs.CtxInfo(ctx, "Get Annotation SQL: %s", db.ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + })) + var annotations []*model.ObservabilityAnnotation + if err := db.Find(&annotations).Error; err != nil { + return nil, err + } + if len(annotations) == 0 { + return nil, nil + } else if len(annotations) > 1 { + logs.CtxWarn(ctx, "multiple annotations found") + } + return annotations[0], nil } func (a *AnnotationCkDaoImpl) List(ctx context.Context, params *ListAnnotationsParam) ([]*model.ObservabilityAnnotation, error) { - return nil, nil + if params == nil || len(params.SpanIDs) == 0 { + return nil, nil + } + db, err := a.buildSql(ctx, &annoSqlParam{ + Tables: params.Tables, + StartTime: params.StartTime, + EndTime: params.EndTime, + SpanIDs: params.SpanIDs, + DescByUpdatedAt: params.DescByUpdatedAt, + Limit: params.Limit, + }) + if err != nil { + return nil, err + } + logs.CtxInfo(ctx, "List Annotations SQL: %s", db.ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + })) + var annotations []*model.ObservabilityAnnotation + if err := db.Find(&annotations).Error; err != nil { + return nil, err + } + return annotations, nil +} + +type annoSqlParam struct { + Tables []string + StartTime int64 + EndTime int64 + ID string + SpanIDs []string + DescByUpdatedAt bool + Limit int32 +} + +func (a *AnnotationCkDaoImpl) buildSql(ctx context.Context, param *annoSqlParam) (*gorm.DB, error) { + db := a.db.NewSession(ctx) + var tableQueries []*gorm.DB + for _, table := range param.Tables { + query, err := a.buildSingleSql(ctx, db, table, param) + if err != nil { + return nil, err + } + tableQueries = append(tableQueries, query) + } + if len(tableQueries) == 0 { + return nil, fmt.Errorf("no table configured") + } else if len(tableQueries) == 1 { + query := tableQueries[0].ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + }) + query += " SETTINGS final = 1" + return db.Raw(query), nil + } else { + queries := make([]string, 0) + for i := 0; i < len(tableQueries); i++ { + query := tableQueries[i].ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + }) + queries = append(queries, "("+query+")") + } + sql := fmt.Sprintf("SELECT * FROM (%s)", strings.Join(queries, " UNION ALL ")) + if param.DescByUpdatedAt { + sql += " ORDER BY updated_at DESC" + } else { + sql += " ORDER BY created_at ASC" + } + sql += fmt.Sprintf(" LIMIT %d SETTINGS final = 1", param.Limit) + return db.Raw(sql), nil + } +} + +// buildSingleSql 构建单表查询SQL +func (a *AnnotationCkDaoImpl) buildSingleSql(ctx context.Context, db *gorm.DB, tableName string, param *annoSqlParam) (*gorm.DB, error) { + sqlQuery := db. + Table(tableName). + Where("deleted_at = 0") + + if param.ID != "" { + sqlQuery = sqlQuery.Where("id = ?", param.ID) + } + if len(param.SpanIDs) > 0 { + sqlQuery = sqlQuery.Where("span_id IN (?)", param.SpanIDs) + } + sqlQuery = sqlQuery. + Where("start_time >= ?", param.StartTime). + Where("start_time <= ?", param.EndTime) + if param.DescByUpdatedAt { + sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{ + {Column: clause.Column{Name: "updated_at"}, Desc: true}, + }}) + } else { + sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{ + {Column: clause.Column{Name: "created_at"}, Desc: false}, + }}) + } + sqlQuery = sqlQuery.Limit(int(param.Limit)) + return sqlQuery, nil } diff --git a/backend/modules/observability/infra/repo/ck/annotation_test.go b/backend/modules/observability/infra/repo/ck/annotation_test.go new file mode 100755 index 000000000..69816b314 --- /dev/null +++ b/backend/modules/observability/infra/repo/ck/annotation_test.go @@ -0,0 +1,374 @@ +// Copyright (c) 2025 coze-dev Authors +// SPDX-License-Identifier: Apache-2.0 + +package ck + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/gorm_gen/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gorm.io/driver/clickhouse" + "gorm.io/gorm" +) + +type mockCkProvider struct { + db *gorm.DB +} + +func (m *mockCkProvider) NewSession(ctx context.Context) *gorm.DB { + if m.db == nil { + return nil + } + return m.db.WithContext(ctx) +} + +func newInsertAnnotationDao(t *testing.T, failUntil int) (*AnnotationCkDaoImpl, func(), *int) { + t.Helper() + sqlDB, _, err := sqlmock.New() + require.NoError(t, err) + + db, err := gorm.Open(clickhouse.New(clickhouse.Config{ + Conn: sqlDB, + SkipInitializeWithVersion: true, + }), &gorm.Config{SkipDefaultTransaction: true}) + require.NoError(t, err) + + count := 0 + _ = db.Callback().Create().Replace("gorm:create", func(tx *gorm.DB) { + count++ + if count <= failUntil { + tx.Error = errors.New("insert error") + return + } + }) + + provider := &mockCkProvider{db: db.Session(&gorm.Session{DryRun: true})} + return &AnnotationCkDaoImpl{db: provider}, func() { + _ = sqlDB.Close() + }, &count +} + +func newAnnotationDao(t *testing.T) (*AnnotationCkDaoImpl, sqlmock.Sqlmock, *gorm.DB, func()) { + t.Helper() + sqlDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + require.NoError(t, err) + + db, err := gorm.Open(clickhouse.New(clickhouse.Config{ + Conn: sqlDB, + SkipInitializeWithVersion: true, + }), &gorm.Config{SkipDefaultTransaction: true}) + require.NoError(t, err) + + provider := &mockCkProvider{db: db} + return &AnnotationCkDaoImpl{db: provider}, mock, db, func() { + _ = sqlDB.Close() + } +} + +func TestAnnotationCkDaoImpl_Insert(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("nil params", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + err := dao.Insert(ctx, nil) + assert.Error(t, err) + }) + + t.Run("empty annotations", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + err := dao.Insert(ctx, &InsertAnnotationParam{}) + assert.Error(t, err) + }) + + t.Run("success", func(t *testing.T) { + dao, cleanup, calls := newInsertAnnotationDao(t, 0) + defer cleanup() + annotation := &model.ObservabilityAnnotation{ID: "anno-1"} + + err := dao.Insert(ctx, &InsertAnnotationParam{ + Table: "observability_annotations", + Annotations: []*model.ObservabilityAnnotation{annotation}, + }) + assert.NoError(t, err) + assert.Equal(t, 1, *calls) + }) + + t.Run("retry failed", func(t *testing.T) { + dao, cleanup, calls := newInsertAnnotationDao(t, 3) + defer cleanup() + annotation := &model.ObservabilityAnnotation{ID: "anno-2"} + + err := dao.Insert(ctx, &InsertAnnotationParam{ + Table: "observability_annotations", + Annotations: []*model.ObservabilityAnnotation{annotation}, + }) + assert.Error(t, err) + assert.Equal(t, 3, *calls) + }) +} + +func TestAnnotationCkDaoImpl_Get(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("invalid params", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + _, err := dao.Get(ctx, &GetAnnotationParam{}) + assert.Error(t, err) + }) + + t.Run("build sql error", func(t *testing.T) { + dao, _, _, cleanup := newAnnotationDao(t) + defer cleanup() + + _, err := dao.Get(ctx, &GetAnnotationParam{ + ID: "anno-1", + StartTime: 1, + EndTime: 2, + }) + assert.Error(t, err) + }) + + t.Run("success", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + rows := sqlmock.NewRows([]string{"id", "span_id"}).AddRow("anno-1", "span-1") + mock.ExpectQuery("SELECT").WillReturnRows(rows) + + anno, err := dao.Get(ctx, &GetAnnotationParam{ + ID: "anno-1", + Tables: []string{"observability_annotations"}, + StartTime: 1, + EndTime: 2, + }) + assert.NoError(t, err) + assert.NotNil(t, anno) + assert.Equal(t, "anno-1", anno.ID) + assert.Equal(t, "span-1", anno.SpanID) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("multiple results returns first", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + rows := sqlmock.NewRows([]string{"id", "span_id"}). + AddRow("anno-1", "span-1"). + AddRow("anno-2", "span-2") + mock.ExpectQuery("SELECT").WillReturnRows(rows) + + anno, err := dao.Get(ctx, &GetAnnotationParam{ + ID: "anno-1", + Tables: []string{"observability_annotations"}, + StartTime: 1, + EndTime: 2, + }) + assert.NoError(t, err) + assert.Equal(t, "anno-1", anno.ID) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("database error", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + mock.ExpectQuery("SELECT").WillReturnError(fmt.Errorf("db error")) + + _, err := dao.Get(ctx, &GetAnnotationParam{ + ID: "anno-1", + Tables: []string{"observability_annotations"}, + StartTime: 1, + EndTime: 2, + }) + assert.Error(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) + }) +} + +func TestAnnotationCkDaoImpl_List(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("nil params", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + annos, err := dao.List(ctx, nil) + assert.NoError(t, err) + assert.Nil(t, annos) + }) + + t.Run("empty span ids", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + annos, err := dao.List(ctx, &ListAnnotationsParam{}) + assert.NoError(t, err) + assert.Nil(t, annos) + }) + + t.Run("success", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + rows := sqlmock.NewRows([]string{"id", "span_id"}).AddRow("anno-1", "span-1") + mock.ExpectQuery("SELECT").WillReturnRows(rows) + + annos, err := dao.List(ctx, &ListAnnotationsParam{ + Tables: []string{"observability_annotations"}, + SpanIDs: []string{"span-1"}, + StartTime: 1, + EndTime: 2, + Limit: 10, + }) + assert.NoError(t, err) + require.Len(t, annos, 1) + assert.Equal(t, "span-1", annos[0].SpanID) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("database error", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + mock.ExpectQuery("SELECT").WillReturnError(fmt.Errorf("db error")) + + _, err := dao.List(ctx, &ListAnnotationsParam{ + Tables: []string{"observability_annotations"}, + SpanIDs: []string{"span-1"}, + StartTime: 1, + EndTime: 2, + Limit: 10, + }) + assert.Error(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("build sql error", func(t *testing.T) { + dao, _, _, cleanup := newAnnotationDao(t) + defer cleanup() + + _, err := dao.List(ctx, &ListAnnotationsParam{ + SpanIDs: []string{"span-1"}, + StartTime: 1, + EndTime: 2, + }) + assert.Error(t, err) + }) +} + +func TestAnnotationCkDaoImpl_buildSingleSql(t *testing.T) { + t.Parallel() + + dao, _, db, cleanup := newAnnotationDao(t) + defer cleanup() + + ctx := context.Background() + baseSession := dao.db.NewSession(ctx) + require.NotNil(t, baseSession) + + testCases := []struct { + name string + param *annoSqlParam + assert func(t *testing.T, sql string) + }{ + { + name: "with id filter", + param: &annoSqlParam{ + Tables: []string{"observability_annotations"}, + ID: "anno-1", + StartTime: 1, + EndTime: 2, + Limit: 10, + }, + assert: func(t *testing.T, sql string) { + assert.Contains(t, sql, "FROM `observability_annotations`") + assert.Contains(t, sql, "id = 'anno-1'") + assert.Contains(t, sql, "ORDER BY `created_at`") + assert.Contains(t, sql, "LIMIT 10") + }, + }, + { + name: "with span ids and desc", + param: &annoSqlParam{ + Tables: []string{"observability_annotations"}, + SpanIDs: []string{"span-1"}, + StartTime: 10, + EndTime: 20, + Limit: 5, + DescByUpdatedAt: true, + }, + assert: func(t *testing.T, sql string) { + assert.Contains(t, sql, "span_id IN ('span-1')") + assert.Contains(t, sql, "ORDER BY `updated_at` DESC") + assert.Contains(t, sql, "LIMIT 5") + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + session := baseSession.Session(&gorm.Session{DryRun: true}) + query, err := dao.buildSingleSql(ctx, session, tc.param.Tables[0], tc.param) + require.NoError(t, err) + sql := query.ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find([]*model.ObservabilityAnnotation{}) + }) + tc.assert(t, sql) + }) + } + + _ = db // silence unused (db kept for completeness) +} + +func TestAnnotationCkDaoImpl_buildSql(t *testing.T) { + t.Parallel() + + dao, _, _, cleanup := newAnnotationDao(t) + defer cleanup() + + ctx := context.Background() + + t.Run("no tables", func(t *testing.T) { + _, err := dao.buildSql(ctx, &annoSqlParam{}) + assert.Error(t, err) + }) + + t.Run("single table", func(t *testing.T) { + result, err := dao.buildSql(ctx, &annoSqlParam{ + Tables: []string{"observability_annotations"}, + StartTime: 1, + EndTime: 2, + Limit: 3, + }) + assert.NoError(t, err) + sql := result.Statement.SQL.String() + assert.Contains(t, sql, "FROM `observability_annotations`") + assert.Contains(t, sql, "LIMIT 3") + assert.Contains(t, sql, "SETTINGS final = 1") + }) + + t.Run("multiple tables", func(t *testing.T) { + result, err := dao.buildSql(ctx, &annoSqlParam{ + Tables: []string{"observability_annotations", "observability_annotations_v2"}, + StartTime: 1, + EndTime: 2, + Limit: 5, + DescByUpdatedAt: true, + }) + assert.NoError(t, err) + sql := result.Statement.SQL.String() + assert.Contains(t, sql, "UNION ALL") + assert.Contains(t, sql, "ORDER BY updated_at DESC") + assert.Contains(t, sql, "LIMIT 5") + assert.Contains(t, sql, "SETTINGS final = 1") + }) +} diff --git a/backend/modules/observability/infra/repo/ck/gorm_gen/model/observability_annotation.gen.go b/backend/modules/observability/infra/repo/ck/gorm_gen/model/observability_annotation.gen.go index ca7fd9a34..3c4d914e3 100644 --- a/backend/modules/observability/infra/repo/ck/gorm_gen/model/observability_annotation.gen.go +++ b/backend/modules/observability/infra/repo/ck/gorm_gen/model/observability_annotation.gen.go @@ -4,7 +4,7 @@ package model -const TableNameAnnotation = "observability_annotation" +const TableNameAnnotation = "observability_annotations" // ObservabilityAnnotation mapped from table type ObservabilityAnnotation struct { diff --git a/backend/modules/observability/infra/repo/ck/spans.go b/backend/modules/observability/infra/repo/ck/spans.go index 6953d04f4..95b1569ce 100644 --- a/backend/modules/observability/infra/repo/ck/spans.go +++ b/backend/modules/observability/infra/repo/ck/spans.go @@ -26,6 +26,12 @@ import ( const ( QueryTypeGetTrace = "get_trace" QueryTypeListSpans = "list_spans" + + // 人工标注标签 + AnnotationManualFeedbackFieldPrefix = "manual_feedback_" + + // 人工标注标签类型 + AnnotationManualFeedbackType = "manual_feedback" ) type QueryParam struct { @@ -108,6 +114,12 @@ func (s *SpansCkDaoImpl) Get(ctx context.Context, param *QueryParam) ([]*model.O if err := sql.Find(&spans).Error; err != nil { return nil, errorx.WrapByCode(err, obErrorx.CommercialCommonRPCErrorCodeCode) } + for _, span := range spans { + if span.SystemTagsString == nil { + span.SystemTagsString = make(map[string]string) + } + span.SystemTagsString[loop_span.SpanFieldTenant] = "cozeloop" // tenant + } return spans, nil } @@ -204,11 +216,27 @@ func (s *SpansCkDaoImpl) formatAggregationExpression(ctx context.Context, dimens return fmt.Sprintf(dimension.Expression.Expression, replacements...), nil } +type buildSqlParam struct { + spanTable string + annoTable string + queryParam *QueryParam + db *gorm.DB + selectColumns []string + omitColumns []string +} + func (s *SpansCkDaoImpl) buildSql(ctx context.Context, param *QueryParam) (*gorm.DB, error) { db := s.newSession(ctx) var tableQueries []*gorm.DB for _, table := range param.Tables { - query, err := s.buildSingleSql(ctx, db, table, param) + query, err := s.buildSingleSql(ctx, &buildSqlParam{ + spanTable: table, + annoTable: param.AnnoTableMap[table], + queryParam: param, + db: db, + selectColumns: param.SelectColumns, + omitColumns: param.OmitColumns, + }) if err != nil { return nil, err } @@ -237,43 +265,43 @@ func (s *SpansCkDaoImpl) buildSql(ctx context.Context, param *QueryParam) (*gorm } } -func (s *SpansCkDaoImpl) buildSingleSql(ctx context.Context, db *gorm.DB, tableName string, param *QueryParam) (*gorm.DB, error) { - sqlQuery, err := s.buildSqlForFilterFields(ctx, db, param.Filters) +func (s *SpansCkDaoImpl) buildSingleSql(ctx context.Context, param *buildSqlParam) (*gorm.DB, error) { + sqlQuery, err := s.buildSqlForFilterFields(ctx, param, param.queryParam.Filters) if err != nil { return nil, err } queryColumns := lo.Ternary( - len(param.SelectColumns) == 0, - getColumnStr(spanColumns, param.OmitColumns), - getColumnStr(param.SelectColumns, param.OmitColumns), + len(param.selectColumns) == 0, + getColumnStr(spanColumns, param.omitColumns), + getColumnStr(param.selectColumns, param.omitColumns), ) - sqlQuery = db. - Table(tableName).Select(queryColumns). + sqlQuery = param.db. + Table(param.spanTable).Select(queryColumns). Where(sqlQuery). - Where("start_time >= ?", param.StartTime). - Where("start_time <= ?", param.EndTime) - if param.OrderByStartTime { + Where("start_time >= ?", param.queryParam.StartTime). + Where("start_time <= ?", param.queryParam.EndTime) + if param.queryParam.OrderByStartTime { sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{ {Column: clause.Column{Name: "start_time"}, Desc: true}, {Column: clause.Column{Name: "span_id"}, Desc: true}, }}) } - sqlQuery = sqlQuery.Limit(int(param.Limit)) + sqlQuery = sqlQuery.Limit(int(param.queryParam.Limit)) return sqlQuery, nil } // chain -func (s *SpansCkDaoImpl) buildSqlForFilterFields(ctx context.Context, db *gorm.DB, filter *loop_span.FilterFields) (*gorm.DB, error) { +func (s *SpansCkDaoImpl) buildSqlForFilterFields(ctx context.Context, param *buildSqlParam, filter *loop_span.FilterFields) (*gorm.DB, error) { if filter == nil { - return db, nil + return param.db, nil } - queryChain := db + queryChain := param.db if filter.QueryAndOr != nil && *filter.QueryAndOr == loop_span.QueryAndOrEnumOr { for _, subFilter := range filter.FilterFields { if subFilter == nil { continue } - subSql, err := s.buildSqlForFilterField(ctx, db, subFilter) + subSql, err := s.buildSqlForFilterField(ctx, param, subFilter) if err != nil { return nil, err } @@ -284,7 +312,7 @@ func (s *SpansCkDaoImpl) buildSqlForFilterFields(ctx context.Context, db *gorm.D if subFilter == nil { continue } - subSql, err := s.buildSqlForFilterField(ctx, db, subFilter) + subSql, err := s.buildSqlForFilterField(ctx, param, subFilter) if err != nil { return nil, err } @@ -294,89 +322,32 @@ func (s *SpansCkDaoImpl) buildSqlForFilterFields(ctx context.Context, db *gorm.D return queryChain, nil } -func (s *SpansCkDaoImpl) buildSqlForFilterField(ctx context.Context, db *gorm.DB, filter *loop_span.FilterField) (*gorm.DB, error) { - queryChain := db - if filter.FieldName != "" { - if filter.QueryType == nil { - return nil, fmt.Errorf("query type is required, not supposed to be here") +func (s *SpansCkDaoImpl) buildSqlForFilterField(ctx context.Context, param *buildSqlParam, filter *loop_span.FilterField) (*gorm.DB, error) { + queryChain := param.db + if s.isAnnotationFilter(filter.FieldName) { + annoSql, err := s.buildAnnotationSql(ctx, param, filter) + if err != nil { + return nil, fmt.Errorf("failed to build annotation sql: %v", err) } + queryChain = queryChain.Where(annoSql) + } else if filter.FieldName != "" { fieldName, err := s.convertFieldName(ctx, filter) if err != nil { return nil, err } - fieldValues, err := convertFieldValue(filter) + sql, err := s.buildFieldCondition(ctx, param.db, &loop_span.FilterField{ + FieldName: fieldName, + FieldType: filter.FieldType, + Values: filter.Values, + QueryType: filter.QueryType, + }) if err != nil { - return nil, err - } - switch *filter.QueryType { - case loop_span.QueryTypeEnumMatch: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s like ?", fieldName), fmt.Sprintf("%%%v%%", fieldValues[0])) - case loop_span.QueryTypeEnumNotMatch: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s NOT like ?", fieldName), fmt.Sprintf("%%%v%%", fieldValues[0])) - case loop_span.QueryTypeEnumEq: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s = ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumNotEq: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s != ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumLte: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s <= ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumGte: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s >= ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumLt: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s < ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumGt: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s > ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumExist: - defaultVal := getFieldDefaultValue(filter) - queryChain = queryChain. - Where(fmt.Sprintf("%s IS NOT NULL", fieldName)). - Where(fmt.Sprintf("%s != ?", fieldName), defaultVal) - case loop_span.QueryTypeEnumNotExist: - defaultVal := getFieldDefaultValue(filter) - queryChain = queryChain. - Where(fmt.Sprintf("%s IS NULL", fieldName)). - Or(fmt.Sprintf("%s = ?", fieldName), defaultVal) - case loop_span.QueryTypeEnumIn: - if len(fieldValues) < 1 { - return nil, fmt.Errorf("filter field %s should have at least one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s IN (?)", fieldName), fieldValues) - case loop_span.QueryTypeEnumNotIn: - if len(fieldValues) < 1 { - return nil, fmt.Errorf("filter field %s should have at least one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s NOT IN (?)", fieldName), fieldValues) - case loop_span.QueryTypeEnumAlwaysTrue: - queryChain = queryChain.Where("1 = 1") - default: - return nil, fmt.Errorf("filter field type %s not supported", filter.FieldType) + return nil, fmt.Errorf("failed to build field condition: %v", err) } + queryChain = queryChain.Where(sql) } if filter.SubFilter != nil { - subQuery, err := s.buildSqlForFilterFields(ctx, db, filter.SubFilter) + subQuery, err := s.buildSqlForFilterFields(ctx, param, filter.SubFilter) if err != nil { return nil, err } @@ -389,6 +360,154 @@ func (s *SpansCkDaoImpl) buildSqlForFilterField(ctx context.Context, db *gorm.DB return queryChain, nil } +func (s *SpansCkDaoImpl) buildFieldCondition(ctx context.Context, db *gorm.DB, filter *loop_span.FilterField) (*gorm.DB, error) { + queryChain := db + if filter.QueryType == nil { + return nil, fmt.Errorf("query type is required, not supposed to be here") + } + fieldValues, err := convertFieldValue(filter) + if err != nil { + return nil, err + } + switch *filter.QueryType { + case loop_span.QueryTypeEnumMatch: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s like ?", filter.FieldName), fmt.Sprintf("%%%v%%", fieldValues[0])) + case loop_span.QueryTypeEnumNotMatch: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s NOT like ?", filter.FieldName), fmt.Sprintf("%%%v%%", fieldValues[0])) + case loop_span.QueryTypeEnumEq: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s = ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumNotEq: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s != ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumLte: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s <= ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumGte: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s >= ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumLt: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s < ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumGt: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s > ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumExist: + defaultVal := getFieldDefaultValue(filter) + queryChain = queryChain. + Where(fmt.Sprintf("%s IS NOT NULL", filter.FieldName)). + Where(fmt.Sprintf("%s != ?", filter.FieldName), defaultVal) + case loop_span.QueryTypeEnumNotExist: + defaultVal := getFieldDefaultValue(filter) + queryChain = queryChain. + Where(fmt.Sprintf("%s IS NULL", filter.FieldName)). + Or(fmt.Sprintf("%s = ?", filter.FieldName), defaultVal) + case loop_span.QueryTypeEnumIn: + if len(fieldValues) < 1 { + return nil, fmt.Errorf("filter field %s should have at least one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s IN (?)", filter.FieldName), fieldValues) + case loop_span.QueryTypeEnumNotIn: + if len(fieldValues) < 1 { + return nil, fmt.Errorf("filter field %s should have at least one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s NOT IN (?)", filter.FieldName), fieldValues) + case loop_span.QueryTypeEnumAlwaysTrue: + queryChain = queryChain.Where("1 = 1") + default: + return nil, fmt.Errorf("filter field type %s not supported", filter.FieldType) + } + return queryChain, nil +} + +func (s *SpansCkDaoImpl) isAnnotationFilter(fieldName string) bool { + if strings.HasPrefix(fieldName, AnnotationManualFeedbackFieldPrefix) { + return true + } else { + return false + } +} + +func (s *SpansCkDaoImpl) buildAnnotationSql(ctx context.Context, param *buildSqlParam, filter *loop_span.FilterField) (*gorm.DB, error) { + queryChain := param.db + fieldName := filter.FieldName + if strings.HasPrefix(fieldName, AnnotationManualFeedbackFieldPrefix) { + // manual_feedback_{tag_key_id} + tagKeyId := fieldName[len(AnnotationManualFeedbackFieldPrefix):] + if tagKeyId == "" { + return nil, fmt.Errorf("invalid manual feedback field name %s", fieldName) + } + queryChain = queryChain. + Where("annotation_type = ?", AnnotationManualFeedbackType). + Where("key = ?", tagKeyId) + } else { + return nil, fmt.Errorf("field name %s not supported for annotation, not supposed to be here", fieldName) + } + if filter.QueryType != nil && *filter.QueryType != loop_span.QueryTypeEnumExist { + condition := &loop_span.FilterField{ + FieldType: filter.FieldType, + Values: filter.Values, + QueryType: filter.QueryType, + } + switch filter.FieldType { + case loop_span.FieldTypeString: + condition.FieldName = "value_string" + case loop_span.FieldTypeLong: + condition.FieldName = "value_long" + case loop_span.FieldTypeDouble: + condition.FieldName = "value_float" + case loop_span.FieldTypeBool: + condition.FieldName = "value_bool" + default: + return nil, fmt.Errorf("field type %s not supported", filter.FieldType) + } + fieldSql, err := s.buildFieldCondition(ctx, param.db, condition) + if err != nil { + return nil, err + } + queryChain = queryChain.Where(fieldSql) + } + _ = param.queryParam.Filters.Traverse(func(f *loop_span.FilterField) error { + if f.FieldName == loop_span.SpanFieldSpaceId { + commonSql, err := s.buildFieldCondition(ctx, param.db, f) + if err != nil { + return err + } + queryChain = queryChain.Where(commonSql) + } + return nil + }) + subsql := param.db. + Table(param.annoTable). + Select("span_id"). + Where(queryChain). + Where("deleted_at = 0"). + Where("start_time >= ?", param.queryParam.StartTime). + Where("start_time <= ?", param.queryParam.EndTime) + query := subsql.ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + }) + return param.db.Where("span_id in (?)", param.db.Raw(query+" SETTINGS final = 1")), nil +} + func (s *SpansCkDaoImpl) getSuperFieldsMap(ctx context.Context) map[string]bool { return defSuperFieldsMap } @@ -430,11 +549,23 @@ func (s *SpansCkDaoImpl) convertFieldName(ctx context.Context, filter *loop_span } switch filter.FieldType { case loop_span.FieldTypeString: - return fmt.Sprintf("tags_string['%s']", filter.FieldName), nil + if filter.IsSystem { + return fmt.Sprintf("system_tags_string['%s']", filter.FieldName), nil + } else { + return fmt.Sprintf("tags_string['%s']", filter.FieldName), nil + } case loop_span.FieldTypeLong: - return fmt.Sprintf("tags_long['%s']", filter.FieldName), nil + if filter.IsSystem { + return fmt.Sprintf("system_tags_long['%s']", filter.FieldName), nil + } else { + return fmt.Sprintf("tags_long['%s']", filter.FieldName), nil + } case loop_span.FieldTypeDouble: - return fmt.Sprintf("tags_float['%s']", filter.FieldName), nil + if filter.IsSystem { + return fmt.Sprintf("system_tags_double['%s']", filter.FieldName), nil + } else { + return fmt.Sprintf("tags_float['%s']", filter.FieldName), nil + } case loop_span.FieldTypeBool: return fmt.Sprintf("tags_bool['%s']", filter.FieldName), nil default: // not expected to be here @@ -550,6 +681,12 @@ var spanColumns = []string{ "logic_delete_date", } +var validColumnRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_.]*$`) + +func isSafeColumnName(name string) bool { + return validColumnRegex.MatchString(name) +} + var defSuperFieldsMap = map[string]bool{ loop_span.SpanFieldStartTime: true, loop_span.SpanFieldSpanId: true, @@ -570,12 +707,6 @@ var defSuperFieldsMap = map[string]bool{ loop_span.SpanFieldLogicDeleteDate: true, } -var validColumnRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_.]*$`) - -func isSafeColumnName(name string) bool { - return validColumnRegex.MatchString(name) -} - func getTimeInterval(granularity metrics_entity.MetricGranularity) string { switch granularity { case metrics_entity.MetricGranularity1Min: diff --git a/backend/modules/observability/infra/repo/ck/spans_test.go b/backend/modules/observability/infra/repo/ck/spans_test.go index 382be3833..c7b898954 100644 --- a/backend/modules/observability/infra/repo/ck/spans_test.go +++ b/backend/modules/observability/infra/repo/ck/spans_test.go @@ -5,9 +5,11 @@ package ck import ( "context" + "fmt" "testing" "github.com/DATA-DOG/go-sqlmock" + metrics_entity "github.com/coze-dev/coze-loop/backend/modules/observability/domain/metric/entity" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span" "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/gorm_gen/model" "github.com/coze-dev/coze-loop/backend/pkg/lang/ptr" @@ -298,9 +300,15 @@ func TestBuildSql(t *testing.T) { Values: []string{}, QueryType: ptr.Of(loop_span.QueryTypeEnumNotExist), }, + { + FieldName: "custom_tag_long2", + FieldType: loop_span.FieldTypeLong, + Values: []string{}, + QueryType: ptr.Of(loop_span.QueryTypeEnumExist), + }, }, }, - expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE ((tags_string['custom_tag_string'] IS NULL OR tags_string['custom_tag_string'] = '') AND (tags_bool['custom_tag_bool'] IS NULL OR tags_bool['custom_tag_bool'] = 0) AND (tags_float['custom_tag_double'] IS NULL OR tags_float['custom_tag_double'] = 0) AND (tags_long['custom_tag_long'] IS NULL OR tags_long['custom_tag_long'] = 0)) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE ((tags_string['custom_tag_string'] IS NULL OR tags_string['custom_tag_string'] = '') AND (tags_bool['custom_tag_bool'] IS NULL OR tags_bool['custom_tag_bool'] = 0) AND (tags_float['custom_tag_double'] IS NULL OR tags_float['custom_tag_double'] = 0) AND (tags_long['custom_tag_long'] IS NULL OR tags_long['custom_tag_long'] = 0) AND (tags_long['custom_tag_long2'] IS NOT NULL AND tags_long['custom_tag_long2'] != 0)) AND start_time >= 1 AND start_time <= 2 LIMIT 100", }, { filter: &loop_span.FilterFields{ @@ -341,6 +349,56 @@ func TestBuildSql(t *testing.T) { }, expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `duration` >= 121 AND start_time >= 1 AND start_time <= 2 LIMIT 100", }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: loop_span.SpanFieldDuration, + FieldType: loop_span.FieldTypeLong, + Values: []string{"121"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumGt), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `duration` > 121 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: loop_span.SpanFieldDuration, + FieldType: loop_span.FieldTypeLong, + Values: []string{"121"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumLte), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `duration` <= 121 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: loop_span.SpanFieldDuration, + FieldType: loop_span.FieldTypeLong, + Values: []string{"121"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumLt), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `duration` < 121 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "a", + QueryType: ptr.Of(loop_span.QueryTypeEnumAlwaysTrue), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE 1 = 1 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, { filter: &loop_span.FilterFields{ FilterFields: []*loop_span.FilterField{ @@ -354,6 +412,19 @@ func TestBuildSql(t *testing.T) { }, expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE tags_bool['custom_tag_bool'] = 1 AND start_time >= 1 AND start_time <= 2 LIMIT 100", }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "custom_tag_bool", + FieldType: loop_span.FieldTypeBool, + Values: []string{"true"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumNotEq), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE tags_bool['custom_tag_bool'] != 1 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, { filter: &loop_span.FilterFields{ FilterFields: []*loop_span.FilterField{ @@ -380,13 +451,89 @@ func TestBuildSql(t *testing.T) { }, expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `input` NOT like '%123%' AND start_time >= 1 AND start_time <= 2 LIMIT 100", }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeString, + Values: []string{"123"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_string IN ('123')) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeLong, + Values: []string{"123"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_long IN (123)) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeDouble, + Values: []string{"123.1"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_float IN (123.1)) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeBool, + Values: []string{"true"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_bool IN (1)) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeBool, + Values: []string{"true"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + { + FieldName: loop_span.SpanFieldSpaceId, + FieldType: loop_span.FieldTypeString, + Values: []string{"123"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE (span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_bool IN (1) AND space_id IN ('123')) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND `space_id` IN ('123')) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, } for _, tc := range testCases { - qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), db, "observability_spans", &QueryParam{ - StartTime: 1, - EndTime: 2, - Filters: tc.filter, - Limit: 100, + qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), &buildSqlParam{ + spanTable: "observability_spans", + annoTable: "observability_annotations", + queryParam: &QueryParam{ + StartTime: 1, + EndTime: 2, + Filters: tc.filter, + Limit: 100, + }, + db: db, }) assert.Nil(t, err) sql := qDb.ToSQL(func(tx *gorm.DB) *gorm.DB { @@ -566,11 +713,15 @@ func TestQueryTypeEnumNotMatchSqlExceptionCases(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), db, "observability_spans", &QueryParam{ - StartTime: 1, - EndTime: 2, - Filters: tc.filter, - Limit: 100, + qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), &buildSqlParam{ + spanTable: "observability_spans", + queryParam: &QueryParam{ + StartTime: 1, + EndTime: 2, + Filters: tc.filter, + Limit: 100, + }, + db: db, }) if tc.shouldError { @@ -713,11 +864,15 @@ func TestQueryTypeEnumNotMatchComplexScenarios(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), db, "observability_spans", &QueryParam{ - StartTime: 1, - EndTime: 2, - Filters: tc.filter, - Limit: 100, + qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), &buildSqlParam{ + spanTable: "observability_spans", + queryParam: &QueryParam{ + StartTime: 1, + EndTime: 2, + Filters: tc.filter, + Limit: 100, + }, + db: db, }) assert.NoError(t, err, "Unexpected error for test case: %s", tc.name) sql := qDb.ToSQL(func(tx *gorm.DB) *gorm.DB { @@ -728,3 +883,161 @@ func TestQueryTypeEnumNotMatchComplexScenarios(t *testing.T) { }) } } + +func TestNewSpansCkDaoImpl(t *testing.T) { + t.Parallel() + + provider := &recordingProvider{} + dao, err := NewSpansCkDaoImpl(provider) + assert.NoError(t, err) + + impl, ok := dao.(*SpansCkDaoImpl) + assert.True(t, ok) + assert.Equal(t, provider, impl.db) +} + +func TestSpansCkDaoImpl_buildMetricsSql(t *testing.T) { + t.Parallel() + + baseDB, cleanup := newTestGormDB(t) + defer cleanup() + + dao := &testSpansDao{ + SpansCkDaoImpl: SpansCkDaoImpl{db: &stubCKProvider{db: baseDB}}, + } + + dao.buildSqlFunc = func(ctx context.Context, param *QueryParam) (*gorm.DB, error) { + return baseDB.Session(&gorm.Session{DryRun: true}).Raw("SELECT * FROM base"), nil + } + dao.convertFieldNameFunc = func(ctx context.Context, filter *loop_span.FilterField) (string, error) { + return "converted_" + filter.FieldName, nil + } + + durationField := &loop_span.FilterField{FieldName: "duration", FieldType: loop_span.FieldTypeLong} + spanTypeField := &loop_span.FilterField{FieldName: "span_type", FieldType: loop_span.FieldTypeString} + + sql, err := dao.buildMetricsSql(context.Background(), &GetMetricsParam{ + Tables: []string{"observability_spans"}, + Aggregations: []*metrics_entity.Dimension{ + { + Alias: "avg_duration", + Expression: &metrics_entity.Expression{ + Expression: "avg(%s)", + Fields: []*loop_span.FilterField{durationField}, + }, + }, + }, + GroupBys: []*metrics_entity.Dimension{ + { + Alias: "type", + Field: spanTypeField, + }, + }, + Filters: &loop_span.FilterFields{}, + StartAt: 1, + EndAt: 2, + Granularity: metrics_entity.MetricGranularity1Min, + }) + assert.NoError(t, err) + + expected := "SELECT toUnixTimestamp(toStartOfInterval(fromUnixTimestamp64Micro(start_time), INTERVAL 1 MINUTE)) * 1000 AS time_bucket, avg(`duration`) AS avg_duration, `span_type` AS type FROM (SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE start_time >= 1 AND start_time <= 2 ) GROUP BY time_bucket, `span_type` ORDER BY time_bucket" + assert.Equal(t, expected, sql) +} + +func TestSpansCkDaoImpl_buildSql_MultiTables(t *testing.T) { + t.Parallel() + + baseDB, cleanup := newTestGormDB(t) + defer cleanup() + + dao := &testSpansDao{ + SpansCkDaoImpl: SpansCkDaoImpl{db: &stubCKProvider{db: baseDB}}, + } + + dryRun := baseDB.Session(&gorm.Session{DryRun: true}) + dao.buildSingleSqlFunc = func(ctx context.Context, param *buildSqlParam) (*gorm.DB, error) { + switch param.spanTable { + case "t1": + return dryRun.Raw("SELECT * FROM t1"), nil + case "t2": + return dryRun.Raw("SELECT * FROM t2"), nil + default: + return nil, fmt.Errorf("unexpected table %s", param.spanTable) + } + } + + q, err := dao.buildSql(context.Background(), &QueryParam{ + Tables: []string{"t1", "t2"}, + StartTime: 0, + EndTime: 1, + Limit: 5, + OrderByStartTime: true, + }) + assert.NoError(t, err) + + sql := q.ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + }) + assert.Equal(t, "SELECT * FROM ((SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `t1` WHERE start_time >= 0 AND start_time <= 1 ORDER BY `start_time` DESC,`span_id` DESC LIMIT 5) UNION ALL (SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `t2` WHERE start_time >= 0 AND start_time <= 1 ORDER BY `start_time` DESC,`span_id` DESC LIMIT 5)) ORDER BY start_time DESC, span_id DESC LIMIT 5", sql) +} + +func TestGetTimeInterval(t *testing.T) { + t.Parallel() + + assert.Equal(t, "INTERVAL 1 MINUTE", getTimeInterval(metrics_entity.MetricGranularity1Min)) + assert.Equal(t, "INTERVAL 1 HOUR", getTimeInterval(metrics_entity.MetricGranularity1Hour)) + assert.Equal(t, "INTERVAL 1 DAY", getTimeInterval(metrics_entity.MetricGranularity1Day)) + assert.Equal(t, "INTERVAL 1 DAY", getTimeInterval(metrics_entity.MetricGranularity1Week)) + assert.Equal(t, "INTERVAL 1 DAY", getTimeInterval(metrics_entity.MetricGranularity("unsupported"))) +} + +type stubCKProvider struct { + db *gorm.DB +} + +func (s *stubCKProvider) NewSession(ctx context.Context) *gorm.DB { + return s.db.Session(&gorm.Session{ + DryRun: true, + Context: ctx, + }) +} + +type recordingProvider struct { + lastCtx context.Context + returnDB *gorm.DB +} + +func (p *recordingProvider) NewSession(ctx context.Context) *gorm.DB { + p.lastCtx = ctx + return p.returnDB +} + +type testSpansDao struct { + SpansCkDaoImpl + buildSqlFunc func(context.Context, *QueryParam) (*gorm.DB, error) + buildSingleSqlFunc func(context.Context, *buildSqlParam) (*gorm.DB, error) + convertFieldNameFunc func(context.Context, *loop_span.FilterField) (string, error) +} + +func (s *testSpansDao) buildSql(ctx context.Context, param *QueryParam) (*gorm.DB, error) { + if s.buildSqlFunc != nil { + return s.buildSqlFunc(ctx, param) + } + return s.SpansCkDaoImpl.buildSql(ctx, param) +} + +func newTestGormDB(t *testing.T) (*gorm.DB, func()) { + sqlDB, _, err := sqlmock.New() + if err != nil { + t.Fatalf("failed to create sqlmock: %v", err) + } + gormDB, err := gorm.Open(clickhouse.New(clickhouse.Config{Conn: sqlDB, SkipInitializeWithVersion: true}), &gorm.Config{}) + if err != nil { + _ = sqlDB.Close() + t.Fatalf("failed to open gorm: %v", err) + } + cleanup := func() { + _ = sqlDB.Close() + } + return gormDB, cleanup +} diff --git a/release/deployment/docker-compose/bootstrap/clickhouse-init/init-sql/observability_annotations.sql b/release/deployment/docker-compose/bootstrap/clickhouse-init/init-sql/observability_annotations.sql new file mode 100755 index 000000000..03645b6ff --- /dev/null +++ b/release/deployment/docker-compose/bootstrap/clickhouse-init/init-sql/observability_annotations.sql @@ -0,0 +1,32 @@ +CREATE TABLE IF NOT EXISTS `observability_annotations` ( + `id` String, + `span_id` String, + `trace_id` String, + `start_time` Int64, + `space_id` String, + `annotation_type` String, + `annotation_index` Array(String), + `key` String, + `value_type` String, + `value_string` String, + `value_long` Int64, + `value_float` Float64, + `value_bool` Bool, + `reasoning` String, + `correction` String, + `metadata` String, + `status` String, + `created_by` String, + `created_at` UInt64, + `updated_by` String, + `updated_at` UInt64, + `deleted_at` UInt64, + `start_date` Date, + INDEX idx_id id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_span_id span_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_trace_id trace_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_space_id space_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_annotation_type annotation_type TYPE bloom_filter() GRANULARITY 1 +) ENGINE = ReplacingMergeTree(updated_at) PARTITION BY toDate(start_time / 1000000) +PRIMARY KEY (start_time) +ORDER BY (start_time, id); \ No newline at end of file diff --git a/release/deployment/docker-compose/conf/observability.yaml b/release/deployment/docker-compose/conf/observability.yaml index 51bfd44a0..9c4c40d63 100644 --- a/release/deployment/docker-compose/conf/observability.yaml +++ b/release/deployment/docker-compose/conf/observability.yaml @@ -61,9 +61,10 @@ trace_tenant_cfg: cozeloop: 365d: span_table: "observability_spans" + anno_table: "observability_annotations" default_ingest_tenant: "cozeloop" tenants_support_annotation: - cozeloop: false + cozeloop: true trace_field_meta_info: available_fields: @@ -177,6 +178,8 @@ trace_field_meta_info: - "exist" - "not_exist" support_custom: true + feedback_manual: + support_custom: true field_metas: default: root_span: @@ -381,3 +384,10 @@ task_mq_consumer_config: topic: "trace_to_task" consumer_group: "trace_to_task_cg" worker_num: 4 + + +annotation_source_cfg: + source_cfg: + default: + tenant: ["cozeloop"] + "annotation_type": "openapi_feedback" diff --git a/release/deployment/helm-chart/charts/app/bootstrap/init/clickhouse/init-sql/observability_annotations.sql b/release/deployment/helm-chart/charts/app/bootstrap/init/clickhouse/init-sql/observability_annotations.sql new file mode 100755 index 000000000..03645b6ff --- /dev/null +++ b/release/deployment/helm-chart/charts/app/bootstrap/init/clickhouse/init-sql/observability_annotations.sql @@ -0,0 +1,32 @@ +CREATE TABLE IF NOT EXISTS `observability_annotations` ( + `id` String, + `span_id` String, + `trace_id` String, + `start_time` Int64, + `space_id` String, + `annotation_type` String, + `annotation_index` Array(String), + `key` String, + `value_type` String, + `value_string` String, + `value_long` Int64, + `value_float` Float64, + `value_bool` Bool, + `reasoning` String, + `correction` String, + `metadata` String, + `status` String, + `created_by` String, + `created_at` UInt64, + `updated_by` String, + `updated_at` UInt64, + `deleted_at` UInt64, + `start_date` Date, + INDEX idx_id id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_span_id span_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_trace_id trace_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_space_id space_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_annotation_type annotation_type TYPE bloom_filter() GRANULARITY 1 +) ENGINE = ReplacingMergeTree(updated_at) PARTITION BY toDate(start_time / 1000000) +PRIMARY KEY (start_time) +ORDER BY (start_time, id); \ No newline at end of file diff --git a/release/deployment/helm-chart/umbrella/conf/observability.yaml b/release/deployment/helm-chart/umbrella/conf/observability.yaml index 4cb2dbd43..9c4c40d63 100644 --- a/release/deployment/helm-chart/umbrella/conf/observability.yaml +++ b/release/deployment/helm-chart/umbrella/conf/observability.yaml @@ -61,9 +61,10 @@ trace_tenant_cfg: cozeloop: 365d: span_table: "observability_spans" + anno_table: "observability_annotations" default_ingest_tenant: "cozeloop" tenants_support_annotation: - cozeloop: false + cozeloop: true trace_field_meta_info: available_fields: @@ -177,6 +178,8 @@ trace_field_meta_info: - "exist" - "not_exist" support_custom: true + feedback_manual: + support_custom: true field_metas: default: root_span: @@ -330,17 +333,6 @@ query_trace_rate_limit_config: space_max_qps: 123456: 100 -key_columns: - - "start_time" - - "span_id" - - "parent_id" - - "duration" - - "span_type" - - "span_name" - - "status_code" - - "tags_long" - - "logic_delete_date" - key_span_type: default: - "model" @@ -392,3 +384,10 @@ task_mq_consumer_config: topic: "trace_to_task" consumer_group: "trace_to_task_cg" worker_num: 4 + + +annotation_source_cfg: + source_cfg: + default: + tenant: ["cozeloop"] + "annotation_type": "openapi_feedback"