diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 23fe2bf2b..f11bd69c5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: GOFLAGS: ${{ matrix.flags }} LOG_LEVEL: error run: | - go test -coverprofile=profile${{ matrix.flags }}.out -covermode=atomic -v -coverpkg=./... ./... -json > test-report${{ matrix.flags }}.json + go test -coverprofile=profile${{ matrix.flags }}.out -covermode=atomic -v -coverpkg=./... ./... -json | tee test-report${{ matrix.flags }}.json - name: Coverage report run: | @@ -101,7 +101,7 @@ jobs: GOFLAGS: ${{ matrix.flags }} LOG_LEVEL: error run: | - go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json > test-report-e2e${{ matrix.flags }}.json + go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json | tee test-report-e2e${{ matrix.flags }}.json - name: Coverage report run: | diff --git a/_sidebar.idoc.md b/_sidebar.idoc.md index 0573472e5..a70f5603b 100644 --- a/_sidebar.idoc.md +++ b/_sidebar.idoc.md @@ -16,7 +16,7 @@ @global-contents-table-plugin-input|links-list - Action @global-contents-table-plugin-action|links-list - - Output + - [Output](/plugin/output/README.md) @global-contents-table-plugin-output|links-list - **Pipeline** diff --git a/_sidebar.md b/_sidebar.md index 637498584..cc4491022 100644 --- a/_sidebar.md +++ b/_sidebar.md @@ -50,7 +50,7 @@ - [split](plugin/action/split/README.md) - [throttle](plugin/action/throttle/README.md) - - Output + - [Output](/plugin/output/README.md) - [clickhouse](plugin/output/clickhouse/README.md) - [devnull](plugin/output/devnull/README.md) - [elasticsearch](plugin/output/elasticsearch/README.md) diff --git a/e2e/file_clickhouse/config.yml b/e2e/file_clickhouse/config.yml index 63de453ee..7097ea0e2 100644 --- a/e2e/file_clickhouse/config.yml +++ b/e2e/file_clickhouse/config.yml @@ -21,11 +21,59 @@ pipelines: override: true - type: debug output: + deadqueue: + type: clickhouse + addresses: + - 127.0.0.1:9001 + table: test_table_insert + insert_timeout: 1m + columns: + - name: c1 + type: String + - name: c2 + type: Int8 + - name: c3 + type: Int16 + - name: c4 + type: Nullable(Int16) + - name: c5 + type: Nullable(String) + - name: level + type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4) + - name: ipv4 + type: Nullable(IPv4) + - name: ipv6 + type: Nullable(IPv6) + - name: ts + type: DateTime + - name: ts_with_tz + type: DateTime('Europe/Moscow') + - name: ts64 + type: DateTime64(3, 'UTC') + - name: ts64_auto + type: DateTime64(9, 'UTC') + - name: ts_rfc3339nano + type: DateTime64(9) + - name: f32 + type: Float32 + - name: f64 + type: Float64 + - name: lc_str + type: LowCardinality(String) + - name: str_arr + type: Array(String) + - name: map_str_str + type: Map(String,String) + - name: uuid + type: UUID + - name: uuid_nullable + type: Nullable(UUID) type: clickhouse addresses: - 127.0.0.1:9001 - table: test_table_insert + table: test_table_insert_not_exists insert_timeout: 1m + retry: 0 columns: - name: c1 type: String diff --git a/e2e/kafka_auth/docker-compose.yml b/e2e/kafka_auth/docker-compose.yml index 90edf224c..3e10b9acd 100644 --- a/e2e/kafka_auth/docker-compose.yml +++ b/e2e/kafka_auth/docker-compose.yml @@ -2,13 +2,16 @@ version: "2.1" services: zookeeper: - image: docker.io/bitnami/zookeeper:3.9 + image: zookeeper:3.9 ports: - "2182:2181" volumes: - - "zookeeper_data:/bitnami" + - "zookeeper_data:/data" 
environment: - - ALLOW_ANONYMOUS_LOGIN=yes + ZOO_MY_ID: 1 + ZOO_PORT: 2181 + ZOO_4LW_COMMANDS_WHITELIST: "*" + ZOO_SERVERS: server.1=zookeeper:2888:3888;2181 init-certs: image: docker.io/bitnami/kafka:3.6 command: /tmp/generate.sh diff --git a/e2e/kafka_file/docker-compose.yml b/e2e/kafka_file/docker-compose.yml index 924743dfe..927ad4bb7 100644 --- a/e2e/kafka_file/docker-compose.yml +++ b/e2e/kafka_file/docker-compose.yml @@ -2,13 +2,16 @@ version: "2" services: zookeeper: - image: docker.io/bitnami/zookeeper:3.8 + image: zookeeper:3.9 ports: - "2181:2181" volumes: - - "zookeeper_data:/bitnami" + - "zookeeper_data:/data" environment: - - ALLOW_ANONYMOUS_LOGIN=yes + ZOO_MY_ID: 1 + ZOO_PORT: 2181 + ZOO_4LW_COMMANDS_WHITELIST: "*" + ZOO_SERVERS: server.1=zookeeper:2888:3888;2181 kafka: image: docker.io/bitnami/kafka:3.1 ports: diff --git a/fd/file.d.go b/fd/file.d.go index 055083883..11e1eb0eb 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -222,6 +222,13 @@ func (f *FileD) setupOutput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCo PluginRuntimeInfo: f.instantiatePlugin(info), }) + if info.DeadQueueInfo != nil { + p.SetDeadQueueOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: info.DeadQueueInfo, + PluginRuntimeInfo: f.instantiatePlugin(info.DeadQueueInfo), + }) + } + return nil } @@ -249,6 +256,39 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip if err != nil { return nil, err } + + deadqueue := configJSON.Get("deadqueue") + var deadqueueInfo *pipeline.PluginStaticInfo + if deadqueueMap := deadqueue.MustMap(); deadqueueMap != nil { + if len(deadqueueMap) > 0 { + deadqueueType := deadqueue.Get("type").MustString() + if deadqueueType == "" { + return nil, fmt.Errorf("deadqueue of %s doesn't have type", pluginKind) + } + deadqueueInfo, err = f.plugins.Get(pluginKind, deadqueueType) + if err != nil { + return nil, err + } + + deadqueue.Del("type") + + deadqueueConfigJson, err := deadqueue.Encode() + if err != nil { + logger.Panicf("can't create config json for %s deadqueue", deadqueueType) + } + + config, err := pipeline.GetConfig(deadqueueInfo, deadqueueConfigJson, values) + if err != nil { + logger.Fatalf("error on creating deadqueue of %s with type %q: %s", deadqueueType, pluginKind, err.Error()) + } + deadqueueInfo.Config = config + + // TODO: recursive deadqueue config + // deadqueueForDeadqueue := deadqueue.Get("deadqueue").MustMap() + } + configJSON.Del("deadqueue") + } + configJson, err := configJSON.Encode() if err != nil { logger.Panicf("can't create config json for %s", t) @@ -260,6 +300,9 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip infoCopy := *info infoCopy.Config = config + if deadqueueInfo != nil { + infoCopy.DeadQueueInfo = deadqueueInfo + } return &infoCopy, nil } diff --git a/pipeline/backoff.go b/pipeline/backoff.go index 06ca8db89..735c42eae 100644 --- a/pipeline/backoff.go +++ b/pipeline/backoff.go @@ -8,25 +8,28 @@ import ( ) type RetriableBatcher struct { - outFn RetriableBatcherOutFn - batcher *Batcher - backoffOpts BackoffOpts - onRetryError func(err error) + outFn RetriableBatcherOutFn + batcher *Batcher + backoffOpts BackoffOpts + isDeadQueueAvailable bool + onRetryError func(err error, events []*Event) } type RetriableBatcherOutFn func(*WorkerData, *Batch) error type BackoffOpts struct { - MinRetention time.Duration - Multiplier float64 - AttemptNum int + MinRetention time.Duration + Multiplier float64 + AttemptNum int + IsDeadQueueAvailable bool } -func 
NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher { +func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error, events []*Event)) *RetriableBatcher { batcherBackoff := &RetriableBatcher{ - outFn: batcherOutFn, - backoffOpts: opts, - onRetryError: onError, + outFn: batcherOutFn, + backoffOpts: opts, + onRetryError: onError, + isDeadQueueAvailable: opts.IsDeadQueueAvailable, } batcherBackoff.setBatcher(batcherOpts) return batcherBackoff @@ -58,7 +61,15 @@ func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) { } next := exponentionalBackoff.NextBackOff() if next == backoff.Stop || (b.backoffOpts.AttemptNum >= 0 && numTries > b.backoffOpts.AttemptNum) { - b.onRetryError(err) + var events []*Event + if batch != nil { + events = batch.events + } + b.onRetryError(err, events) + if batch != nil && b.isDeadQueueAvailable { + batch.reset() + batch.status = BatchStatusInDeadQueue + } return } numTries++ diff --git a/pipeline/backoff_test.go b/pipeline/backoff_test.go index 1d73a2bf1..fd5c01ea3 100644 --- a/pipeline/backoff_test.go +++ b/pipeline/backoff_test.go @@ -3,6 +3,7 @@ package pipeline import ( "errors" "testing" + "time" "github.com/ozontech/file.d/metric" "github.com/prometheus/client_golang/prometheus" @@ -17,7 +18,7 @@ func TestBackoff(t *testing.T) { eventCount := &atomic.Int32{} eventCountBefore := eventCount.Load() - errorFn := func(err error) { + errorFn := func(err error, events []*Event) { errorCount.Inc() } @@ -29,12 +30,17 @@ func TestBackoff(t *testing.T) { eventCount.Inc() return nil }, - BackoffOpts{AttemptNum: 3}, + BackoffOpts{ + AttemptNum: 3, + IsDeadQueueAvailable: false, + }, errorFn, ) - batcherBackoff.Out(nil, nil) - + data := WorkerData(nil) + batch := newBatch(1, 1, 1*time.Second) + batch.append(&Event{}) + batcherBackoff.Out(&data, batch) assert.Equal(t, errorCountBefore, errorCount.Load(), "wrong error count") assert.Equal(t, eventCountBefore+1, eventCount.Load(), "wrong event count") } @@ -42,7 +48,37 @@ func TestBackoff(t *testing.T) { func TestBackoffWithError(t *testing.T) { errorCount := &atomic.Int32{} prevValue := errorCount.Load() - errorFn := func(err error) { + errorFn := func(err error, events []*Event) { + errorCount.Inc() + } + + batcherBackoff := NewRetriableBatcher( + &BatcherOptions{ + MetricCtl: metric.NewCtl("", prometheus.NewRegistry()), + }, + func(workerData *WorkerData, batch *Batch) error { + return errors.New("some error") + }, + BackoffOpts{ + AttemptNum: 3, + IsDeadQueueAvailable: false, + }, + errorFn, + ) + + data := WorkerData(nil) + batch := newBatch(1, 1, 1*time.Second) + batch.append(&Event{}) + batcherBackoff.Out(&data, batch) + assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count") + assert.Equal(t, 1, len(batch.events), "wrong number of events in batch") + assert.Equal(t, BatchStatusNotReady, batch.status, "wrong batch status") +} + +func TestBackoffWithErrorWithDeadQueue(t *testing.T) { + errorCount := &atomic.Int32{} + prevValue := errorCount.Load() + errorFn := func(err error, events []*Event) { errorCount.Inc() } @@ -53,10 +89,18 @@ func TestBackoffWithError(t *testing.T) { func(workerData *WorkerData, batch *Batch) error { return errors.New("some error") }, - BackoffOpts{AttemptNum: 3}, + BackoffOpts{ + AttemptNum: 3, + IsDeadQueueAvailable: true, + }, errorFn, ) - batcherBackoff.Out(nil, nil) + data := WorkerData(nil) 
+ batch := newBatch(1, 1, 1*time.Second) + batch.append(&Event{}) + batcherBackoff.Out(&data, batch) assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count") + assert.Equal(t, 0, len(batch.events), "wrong number of events in batch") + assert.Equal(t, BatchStatusInDeadQueue, batch.status, "wrong batch status") } diff --git a/pipeline/batch.go b/pipeline/batch.go index 22ee3bcfa..03af1fe01 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -16,6 +16,7 @@ const ( BatchStatusNotReady BatchStatus = iota BatchStatusMaxSizeExceeded BatchStatusTimeoutExceeded + BatchStatusInDeadQueue ) type Batch struct { @@ -125,11 +126,12 @@ type Batcher struct { outSeq int64 commitSeq int64 - batchOutFnSeconds prometheus.Observer - commitWaitingSeconds prometheus.Observer - workersInProgress prometheus.Gauge - batchesDoneByMaxSize prometheus.Counter - batchesDoneByTimeout prometheus.Counter + batchOutFnSeconds prometheus.Observer + commitWaitingSeconds prometheus.Observer + workersInProgress prometheus.Gauge + batchesDoneByMaxSize prometheus.Counter + batchesDoneByTimeout prometheus.Counter + batchesRoutedToDeadQueue prometheus.Counter } type ( @@ -163,16 +165,17 @@ func NewBatcher(opts BatcherOptions) *Batcher { // nolint: gocritic // hugeParam seqMu := &sync.Mutex{} return &Batcher{ - seqMu: seqMu, - cond: sync.NewCond(seqMu), - freeBatches: freeBatches, - fullBatches: fullBatches, - opts: opts, - batchOutFnSeconds: ctl.RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong), - commitWaitingSeconds: ctl.RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed), - workersInProgress: ctl.RegisterGauge("batcher_workers_in_progress", ""), - batchesDoneByMaxSize: jobsDone.WithLabelValues("max_size_exceeded"), - batchesDoneByTimeout: jobsDone.WithLabelValues("timeout_exceeded"), + seqMu: seqMu, + cond: sync.NewCond(seqMu), + freeBatches: freeBatches, + fullBatches: fullBatches, + opts: opts, + batchOutFnSeconds: ctl.RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong), + commitWaitingSeconds: ctl.RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed), + workersInProgress: ctl.RegisterGauge("batcher_workers_in_progress", ""), + batchesDoneByMaxSize: jobsDone.WithLabelValues("max_size_exceeded"), + batchesDoneByTimeout: jobsDone.WithLabelValues("timeout_exceeded"), + batchesRoutedToDeadQueue: jobsDone.WithLabelValues("routed_in_deadqueue"), } } @@ -220,6 +223,8 @@ func (b *Batcher) work() { b.batchesDoneByMaxSize.Inc() case BatchStatusTimeoutExceeded: b.batchesDoneByTimeout.Inc() + case BatchStatusInDeadQueue: + b.batchesRoutedToDeadQueue.Inc() default: logger.Panic("unreachable") } diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 50d896f4f..261e5bc57 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -113,8 +113,7 @@ type Pipeline struct { procCount *atomic.Int32 activeProcs *atomic.Int32 - output OutputPlugin - outputInfo *OutputPluginInfo + router *Router metricHolder *metric.Holder @@ -199,6 +198,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry, lg *zap PipelineSettings: settings, MetricCtl: metricCtl, }, + router: NewRouter(), actionMetrics: actionMetrics{ m: make(map[string]*actionMetric), mu: new(sync.RWMutex), @@ -301,7 +301,7 @@ func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux) { if p.input == nil { p.logger.Panic("input isn't set") } - if p.output == nil { + if p.router.output == nil { p.logger.Panic("output isn't set") } @@ 
-321,7 +321,7 @@ func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux) { } } - for hName, handler := range p.outputInfo.PluginStaticInfo.Endpoints { + for hName, handler := range p.router.outputInfo.PluginStaticInfo.Endpoints { mux.HandleFunc(fmt.Sprintf("%s/%d/%s", prefix, len(p.actionInfos)+1, hName), handler) } } @@ -330,7 +330,7 @@ func (p *Pipeline) Start() { if p.input == nil { p.logger.Panic("input isn't set") } - if p.output == nil { + if p.router.output == nil { p.logger.Panic("output isn't set") } @@ -339,11 +339,11 @@ func (p *Pipeline) Start() { outputParams := &OutputPluginParams{ PluginDefaultParams: p.actionParams, Controller: p, - Logger: p.logger.Sugar().Named("output").Named(p.outputInfo.Type), + Logger: p.logger.Sugar().Named("output").Named(p.router.outputInfo.Type), } - p.logger.Info("starting output plugin", zap.String("name", p.outputInfo.Type)) + p.logger.Info("starting output plugin", zap.String("name", p.router.outputInfo.Type)) - p.output.Start(p.outputInfo.Config, outputParams) + p.router.Start(outputParams) p.logger.Info("stating processors", zap.Int("count", len(p.Procs))) for _, processor := range p.Procs { @@ -382,7 +382,7 @@ func (p *Pipeline) Stop() { p.input.Stop() p.logger.Info("stopping output") - p.output.Stop() + p.router.Stop() p.shouldStop.Store(true) @@ -399,12 +399,15 @@ func (p *Pipeline) GetInput() InputPlugin { } func (p *Pipeline) SetOutput(info *OutputPluginInfo) { - p.outputInfo = info - p.output = info.Plugin.(OutputPlugin) + p.router.SetOutput(info) +} + +func (p *Pipeline) SetDeadQueueOutput(info *OutputPluginInfo) { + p.router.SetDeadQueueOutput(info) } func (p *Pipeline) GetOutput() OutputPlugin { - return p.output + return p.router.output } // In decodes message and passes it to event stream. @@ -624,6 +627,7 @@ func (p *Pipeline) streamEvent(event *Event) uint64 { } func (p *Pipeline) Commit(event *Event) { + p.router.Ack(event) p.finalize(event, true, true) } @@ -758,7 +762,7 @@ func (p *Pipeline) newProc(id int) *processor { id, &p.actionMetrics, p.activeProcs, - p.output, + p.router, p.streamer, p.finalize, p.IncMaxEventSizeExceeded, diff --git a/pipeline/plugin.go b/pipeline/plugin.go index 3e0a7c749..95da53d8e 100644 --- a/pipeline/plugin.go +++ b/pipeline/plugin.go @@ -59,6 +59,7 @@ type ActionPluginParams struct { type OutputPluginParams struct { PluginDefaultParams Controller OutputPluginController + Router *Router Logger *zap.SugaredLogger } @@ -77,6 +78,8 @@ type PluginStaticInfo struct { // Every plugin can provide their own API through Endpoints. Endpoints map[string]func(http.ResponseWriter, *http.Request) AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config + // TODO: maybe to OutputPluginStaticInfo cause uses by output and action plugins? 
+ DeadQueueInfo *PluginStaticInfo } type PluginRuntimeInfo struct { diff --git a/pipeline/processor.go b/pipeline/processor.go index cf02c51c8..f0efbbe33 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -55,7 +55,7 @@ func allEventStatuses() []eventStatus { type processor struct { id int streamer *streamer - output OutputPlugin + router *Router finalize finalizeFn activeCounter *atomic.Int32 @@ -78,7 +78,7 @@ func newProcessor( id int, actionMetrics *actionMetrics, activeCounter *atomic.Int32, - output OutputPlugin, + router *Router, streamer *streamer, finalizeFn finalizeFn, incMaxEventSizeExceededFn func(lvs ...string), @@ -88,7 +88,7 @@ func newProcessor( id: id, streamer: streamer, actionMetrics: actionMetrics, - output: output, + router: router, finalize: finalizeFn, activeCounter: activeCounter, @@ -153,7 +153,7 @@ func (p *processor) processSequence(event *Event) bool { } event.stage = eventStageOutput - p.output.Out(event) + p.router.Out(event) } return true @@ -447,7 +447,7 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { ok, _ := p.doActions(child) if ok { child.stage = eventStageOutput - p.output.Out(child) + p.router.Out(child) } } diff --git a/pipeline/router.go b/pipeline/router.go new file mode 100644 index 000000000..31f8c587d --- /dev/null +++ b/pipeline/router.go @@ -0,0 +1,56 @@ +package pipeline + +type Router struct { + output OutputPlugin + outputInfo *OutputPluginInfo + + deadQueue OutputPlugin + deadQueueInfo *OutputPluginInfo +} + +func NewRouter() *Router { + return &Router{} +} + +func (r *Router) SetOutput(info *OutputPluginInfo) { + r.outputInfo = info + r.output = info.Plugin.(OutputPlugin) +} + +func (r *Router) SetDeadQueueOutput(info *OutputPluginInfo) { + r.deadQueueInfo = info + r.deadQueue = info.Plugin.(OutputPlugin) +} + +func (r *Router) Ack(event *Event) { + // TODO: send commit to input after receiving all acks from outputs +} + +func (r *Router) Fail(event *Event) { + if r.IsDeadQueueAvailable() { + r.deadQueue.Out(event) + } +} + +func (r *Router) Out(event *Event) { + r.output.Out(event) +} + +func (r *Router) Stop() { + r.output.Stop() + if r.IsDeadQueueAvailable() { + r.deadQueue.Stop() + } +} + +func (r *Router) IsDeadQueueAvailable() bool { + return r.deadQueue != nil +} + +func (r *Router) Start(params *OutputPluginParams) { + params.Router = r + r.output.Start(r.outputInfo.Config, params) + if r.IsDeadQueueAvailable() { + r.deadQueue.Start(r.deadQueueInfo.Config, params) + } +} diff --git a/pipeline/router_test.go b/pipeline/router_test.go new file mode 100644 index 000000000..f6518b681 --- /dev/null +++ b/pipeline/router_test.go @@ -0,0 +1,160 @@ +package pipeline_test + +import ( + "sync" + "testing" + "time" + + "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/plugin/output/devnull" + "github.com/ozontech/file.d/test" + insaneJSON "github.com/ozontech/insane-json" + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +type fakeOutputPluginController struct { + mu sync.Mutex + commits []*pipeline.Event + errors []string +} + +func (f *fakeOutputPluginController) Commit(event *pipeline.Event) { + f.mu.Lock() + defer f.mu.Unlock() + f.commits = append(f.commits, event) +} +func (f *fakeOutputPluginController) Error(err string) { + f.mu.Lock() + defer f.mu.Unlock() + f.errors = append(f.errors, err) +} + +func (f *fakeOutputPluginController) getCommits() []*pipeline.Event { + f.mu.Lock() + defer f.mu.Unlock() + return 
f.commits +} + +func (f *fakeOutputPluginController) getErrors() []string { + f.mu.Lock() + defer f.mu.Unlock() + return f.errors +} + +func TestRouterNormalProcessing(t *testing.T) { + t.Parallel() + + r := pipeline.NewRouter() + controller := &fakeOutputPluginController{} + + // Setup main output that succeeds + var outputCount atomic.Int32 + outputPlugin, outputConfig := createDevNullPlugin(func(event *pipeline.Event) { + outputCount.Add(1) + }) + + r.SetOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: &pipeline.PluginStaticInfo{Config: outputConfig}, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: outputPlugin}, + }) + + // Setup dead queue that shouldn't be used + var deadQueueCount atomic.Int32 + deadQueuePlugin, deadQueueConfig := createDevNullPlugin(func(event *pipeline.Event) { + deadQueueCount.Add(1) + }) + + r.SetDeadQueueOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: &pipeline.PluginStaticInfo{Config: deadQueueConfig}, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: deadQueuePlugin}, + }) + + params := test.NewEmptyOutputPluginParams() + params.PipelineName = "test_pipeline" + params.Router = r + params.Controller = controller + r.Start(params) + defer r.Stop() + + // Send test event + event := newEvent(t) + r.Out(event) + + // Wait for processing + assert.Eventually(t, func() bool { + return outputCount.Load() == 1 + }, 100*time.Millisecond, 10*time.Millisecond, "event should be processed") + + assert.Equal(t, int32(0), deadQueueCount.Load(), "dead queue should not be used") + assert.Len(t, controller.getCommits(), 1, "should commit successful event") + assert.Empty(t, controller.getErrors(), "should not produce errors") +} + +func TestRouterDeadQueueProcessing(t *testing.T) { + t.Parallel() + + r := pipeline.NewRouter() + controller := &fakeOutputPluginController{} + + // Setup main output that fails + var outputCount atomic.Int32 + outputPlugin, outputConfig := createDevNullPlugin(func(event *pipeline.Event) { + outputCount.Add(1) + r.Fail(event) + }) + + r.SetOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: &pipeline.PluginStaticInfo{Config: outputConfig}, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: outputPlugin}, + }) + + // Setup dead queue + var deadQueueCount atomic.Int32 + deadQueuePlugin, deadQueueConfig := createDevNullPlugin(func(event *pipeline.Event) { + deadQueueCount.Add(1) + }) + + r.SetDeadQueueOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: &pipeline.PluginStaticInfo{Config: deadQueueConfig}, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: deadQueuePlugin}, + }) + + params := test.NewEmptyOutputPluginParams() + params.PipelineName = "test_pipeline" + params.Router = r + params.Controller = controller + r.Start(params) + defer r.Stop() + + // Send test event + event := newEvent(t) + r.Out(event) + + // Wait for processing + assert.Eventually(t, func() bool { + return deadQueueCount.Load() == 1 + }, 100*time.Millisecond, 10*time.Millisecond, "event should go to dead queue") + + assert.Equal(t, int32(1), outputCount.Load(), "main output should try to process") + assert.Len(t, controller.getCommits(), 2, "should commit successful event") + assert.Empty(t, controller.getErrors(), "should not produce errors") +} + +func createDevNullPlugin(outFn func(event *pipeline.Event)) (*devnull.Plugin, pipeline.AnyConfig) { + plugin, config := devnull.Factory() + p := plugin.(*devnull.Plugin) + p.SetOutFn(outFn) + return p, config +} + +func newEvent(t *testing.T) *pipeline.Event { + root, err := 
insaneJSON.DecodeString(`{}`) + if err != nil { + t.Skip() // ignore invalid input + } + return &pipeline.Event{ + Root: root, + Buf: make([]byte, 0, 1024), + } +} diff --git a/plugin/README.md b/plugin/README.md index 1e6c8f9e4..ac1f3fe5b 100755 --- a/plugin/README.md +++ b/plugin/README.md @@ -764,6 +764,8 @@ It sends the event batches to Clickhouse database using File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/clickhouse/README.md) ## devnull It provides an API to test pipelines and other plugins. @@ -773,6 +775,8 @@ It provides an API to test pipelines and other plugins. It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/elasticsearch/README.md) ## file It sends event batches into files. @@ -796,18 +800,26 @@ GELF messages are separated by null byte. Each message is a JSON with the follow Every field with an underscore prefix `_` will be treated as an extra field. Allowed characters in field names are letters, numbers, underscores, dashes, and dots. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/gelf/README.md) ## kafka It sends the event batches to kafka brokers using `franz-go` lib. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/kafka/README.md) ## loki It sends the logs batches to Loki using HTTP API. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/loki/README.md) ## postgres It sends the event batches to postgres db using pgx. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/postgres/README.md) ## s3 Sends events to s3 output of one or multiple buckets. @@ -936,6 +948,8 @@ Out: } ``` +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/splunk/README.md) ## stdout It writes events to stdout(also known as console). diff --git a/plugin/output/README.idoc.md b/plugin/output/README.idoc.md index 70d8c3b7f..3e5b3a9c3 100644 --- a/plugin/output/README.idoc.md +++ b/plugin/output/README.idoc.md @@ -1,3 +1,74 @@ # Output plugins -@global-contents-table-plugin-output|contents-table \ No newline at end of file +@global-contents-table-plugin-output|contents-table + +## dead queue + +Failed events from the main pipeline are redirected to a dead-letter queue (DLQ) to prevent data loss and enable recovery. + +### Examples + +#### Dead queue to the reserve elasticsearch + +Consumes logs from a Kafka topic. Sends them to Elasticsearch (primary cluster). Fails over to a reserve ("dead-letter") Elasticsearch if the primary is unavailable. + +```yaml +main_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs + output: + type: elasticsearch + workers_count: 32 + endpoints: + - http://elasticsearch-primary:9200 + # route to reserve elasticsearch + deadqueue: + endpoints: + - http://elasticsearch-reserve:9200 + type: elasticsearch +``` + +#### Dead queue with second kafka topic and low priority consumer + +Main Pipeline: Processes logs from Kafka → Elasticsearch. Failed events go to a dead-letter Kafka topic. 
+ +Dead-Queue Pipeline: Re-processes failed events from the DLQ topic with lower priority. + +```yaml +main_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs + output: + type: elasticsearch + workers_count: 32 + endpoints: + - http://elasticsearch:9200 + # route to deadqueue pipeline + deadqueue: + brokers: + - kafka:9092 + default_topic: logs-deadqueue + type: kafka + +deadqueue_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs-deadqueue + output: + type: elasticsearch + workers_count: 1 # low priority + fatal_on_failed_insert: false + endpoints: + - http://elasticsearch:9200 +``` \ No newline at end of file diff --git a/plugin/output/README.md b/plugin/output/README.md index f968a6b45..0a584b075 100755 --- a/plugin/output/README.md +++ b/plugin/output/README.md @@ -7,6 +7,8 @@ It sends the event batches to Clickhouse database using File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/clickhouse/README.md) ## devnull It provides an API to test pipelines and other plugins. @@ -16,6 +18,8 @@ It provides an API to test pipelines and other plugins. It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/elasticsearch/README.md) ## file It sends event batches into files. @@ -39,18 +43,26 @@ GELF messages are separated by null byte. Each message is a JSON with the follow Every field with an underscore prefix `_` will be treated as an extra field. Allowed characters in field names are letters, numbers, underscores, dashes, and dots. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/gelf/README.md) ## kafka It sends the event batches to kafka brokers using `franz-go` lib. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/kafka/README.md) ## loki It sends the logs batches to Loki using HTTP API. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/loki/README.md) ## postgres It sends the event batches to postgres db using pgx. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/postgres/README.md) ## s3 Sends events to s3 output of one or multiple buckets. @@ -179,9 +191,82 @@ Out: } ``` +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/splunk/README.md) ## stdout It writes events to stdout(also known as console). [More details...](plugin/output/stdout/README.md) + +## dead queue + +Failed events from the main pipeline are redirected to a dead-letter queue (DLQ) to prevent data loss and enable recovery. + +### Examples + +#### Dead queue to the reserve elasticsearch + +Consumes logs from a Kafka topic. Sends them to Elasticsearch (primary cluster). Fails over to a reserve ("dead-letter") Elasticsearch if the primary is unavailable. 
+ +```yaml +main_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs + output: + type: elasticsearch + workers_count: 32 + endpoints: + - http://elasticsearch-primary:9200 + # route to reserve elasticsearch + deadqueue: + endpoints: + - http://elasticsearch-reserve:9200 + type: elasticsearch +``` + +#### Dead queue with second kafka topic and low priority consumer + +Main Pipeline: Processes logs from Kafka → Elasticsearch. Failed events go to a dead-letter Kafka topic. + +Dead-Queue Pipeline: Re-processes failed events from the DLQ topic with lower priority. + +```yaml +main_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs + output: + type: elasticsearch + workers_count: 32 + endpoints: + - http://elasticsearch:9200 + # route to deadqueue pipeline + deadqueue: + brokers: + - kafka:9092 + default_topic: logs-deadqueue + type: kafka + +deadqueue_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs-deadqueue + output: + type: elasticsearch + workers_count: 1 # low priority + fatal_on_failed_insert: false + endpoints: + - http://elasticsearch:9200 +```
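
For reference, a minimal sketch of the general `deadqueue` shape, inferred from the config parsing added in `fd/file.d.go` (values are illustrative): the block is a nested output-plugin config placed under `output`; it must carry a `type` field, and every other key is passed to that plugin as its regular config.

```yaml
output:
  type: elasticsearch                # primary output
  endpoints:
    - http://elasticsearch:9200
  retry: 0                           # with no retries, a failed batch is routed to the dead queue right away
  deadqueue:
    type: kafka                      # any registered output plugin can act as the dead queue
    brokers:
      - kafka:9092
    default_topic: logs-deadqueue
```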
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md index a59df51b0..08b1b374e 100644 --- a/plugin/output/clickhouse/README.md +++ b/plugin/output/clickhouse/README.md @@ -5,6 +5,8 @@ It sends the event batches to Clickhouse database using File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`addresses`** *`[]Address`* *`required`* @@ -128,8 +130,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
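
The e2e case added in `e2e/file_clickhouse/config.yml` exercises this for ClickHouse. Condensed (column lists trimmed, addresses and table names as in that config), it looks roughly like:

```yaml
output:
  type: clickhouse
  addresses:
    - 127.0.0.1:9001
  table: test_table_insert_not_exists    # inserts fail: the table intentionally does not exist
  insert_timeout: 1m
  retry: 0                               # failed batches are routed to the dead queue without retrying
  columns:                               # full column list as in the e2e config, trimmed here
    - name: c1
      type: String
  deadqueue:
    type: clickhouse
    addresses:
      - 127.0.0.1:9001
    table: test_table_insert             # existing table that receives the failed events
    insert_timeout: 1m
    columns:                             # same column list, trimmed here
      - name: c1
        type: String
```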
diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 04e69ef32..ca3338647 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -30,6 +30,8 @@ It sends the event batches to Clickhouse database using [Native protocol](https://clickhouse.com/docs/en/interfaces/tcp/). File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -58,6 +60,8 @@ type Plugin struct { // plugin metrics insertErrorsMetric prometheus.Counter queriesCountMetric prometheus.Counter + + router *pipeline.Router } type Setting struct { @@ -245,8 +249,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 @@ -437,15 +440,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } - onError := func(err error) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -454,6 +459,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Log(level, "can't insert to the table", zap.Error(err), zap.Int("retries", p.config.Retry), zap.String("table", p.config.Table)) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/devnull/devnull.go b/plugin/output/devnull/devnull.go index 09522be46..4c2668ff6 100644 --- a/plugin/output/devnull/devnull.go +++ b/plugin/output/devnull/devnull.go @@ -12,6 +12,7 @@ It provides an API to test pipelines and other plugins. type Plugin struct { controller pipeline.OutputPluginController + router *pipeline.Router outFn func(event *pipeline.Event) total *atomic.Int64 } @@ -31,6 +32,7 @@ func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { func (p *Plugin) Start(_ pipeline.AnyConfig, params *pipeline.OutputPluginParams) { p.controller = params.Controller + p.router = params.Router p.total = &atomic.Int64{} } diff --git a/plugin/output/elasticsearch/README.md b/plugin/output/elasticsearch/README.md index 409c8ecee..21114e946 100755 --- a/plugin/output/elasticsearch/README.md +++ b/plugin/output/elasticsearch/README.md @@ -2,6 +2,8 @@ It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint. +Supports [dead queue](/plugin/output/README.md#dead-queue). 
+ ### Config params **`endpoints`** *`[]string`* *`required`* @@ -128,8 +130,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
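
As implemented in the plugin changes below, a configured dead queue takes precedence over `fatal_on_failed_insert`: when the retry budget is exhausted, the error is logged at error level rather than fatal, and the batch events are handed to `Router.Fail`, which forwards them to the dead-queue output. A sketch with illustrative endpoints:

```yaml
output:
  type: elasticsearch
  endpoints:
    - http://elasticsearch-primary:9200
  retry: 3
  fatal_on_failed_insert: true      # effectively ignored once a deadqueue is configured
  deadqueue:
    type: elasticsearch
    endpoints:
      - http://elasticsearch-reserve:9200
```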
diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index c96fea372..27d82e9ca 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -23,6 +23,8 @@ import ( /*{ introduction It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -50,6 +52,8 @@ type Plugin struct { // plugin metrics sendErrorMetric *prometheus.CounterVec indexingErrorsMetric prometheus.Counter + + router *pipeline.Router } // ! config-params @@ -167,8 +171,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 @@ -256,15 +259,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } - onError := func(err error) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -273,6 +278,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Log(level, "can't send to the elastic", zap.Error(err), zap.Int("retries", p.config.Retry), ) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/gelf/README.md b/plugin/output/gelf/README.md index 3e5738d3c..5666789d7 100755 --- a/plugin/output/gelf/README.md +++ b/plugin/output/gelf/README.md @@ -16,6 +16,8 @@ GELF messages are separated by null byte. Each message is a JSON with the follow Every field with an underscore prefix `_` will be treated as an extra field. Allowed characters in field names are letters, numbers, underscores, dashes, and dots. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`endpoint`** *`string`* *`required`* @@ -121,8 +123,7 @@ After this timeout the batch will be sent even if batch isn't completed. **`retry`** *`int`* *`default=0`* -Retries of insertion. If File.d cannot insert for this number of attempts, -File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go index 1b01226c7..8b83f5714 100644 --- a/plugin/output/gelf/gelf.go +++ b/plugin/output/gelf/gelf.go @@ -33,6 +33,8 @@ GELF messages are separated by null byte. Each message is a JSON with the follow Every field with an underscore prefix `_` will be treated as an extra field. Allowed characters in field names are letters, numbers, underscores, dashes, and dots. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -48,6 +50,8 @@ type Plugin struct { // plugin metrics sendErrorMetric prometheus.Counter + + router *pipeline.Router } // ! config-params @@ -149,8 +153,7 @@ type Config struct { // > @3@4@5@6 // > - // > Retries of insertion. If File.d cannot insert for this number of attempts, - // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. Retry int `json:"retry" default:"0"` // * // > @3@4@5@6 @@ -230,15 +233,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } - onError := func(err error) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -247,6 +252,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Desugar().Log(level, "can't send to gelf", zap.Error(err), zap.Int("retries", p.config.Retry), ) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/kafka/README.md b/plugin/output/kafka/README.md index eda1a50ec..d1ef51b20 100755 --- a/plugin/output/kafka/README.md +++ b/plugin/output/kafka/README.md @@ -1,6 +1,8 @@ # Kafka output It sends the event batches to kafka brokers using `franz-go` lib. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`brokers`** *`[]string`* *`required`* @@ -91,8 +93,7 @@ the client.ForceMetadataRefresh() function is used for some ProduceSync errors: **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index e82b0a76e..213ff57ec 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -18,6 +18,8 @@ import ( /*{ introduction It sends the event batches to kafka brokers using `franz-go` lib. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -40,6 +42,7 @@ type Plugin struct { // plugin metrics sendErrorMetric prometheus.Counter + router *pipeline.Router } // ! config-params @@ -126,8 +129,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 @@ -257,15 +259,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } - onError := func(err error) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -274,6 +278,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Desugar().Log(level, "can't write batch", zap.Int("retries", p.config.Retry), ) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/loki/README.md b/plugin/output/loki/README.md index 051449dfd..1ce47cee1 100644 --- a/plugin/output/loki/README.md +++ b/plugin/output/loki/README.md @@ -1,6 +1,8 @@ # Loki output It sends the logs batches to Loki using HTTP API. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`address`** *`string`* *`required`* @@ -137,8 +139,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/loki/loki.go b/plugin/output/loki/loki.go index 8353ee7eb..7837ad8ba 100644 --- a/plugin/output/loki/loki.go +++ b/plugin/output/loki/loki.go @@ -24,6 +24,8 @@ import ( /*{ introduction It sends the logs batches to Loki using HTTP API. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ var errUnixNanoFormat = errors.New("please send time in UnixNano format or add a convert_date action") @@ -170,8 +172,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 @@ -235,6 +236,8 @@ type Plugin struct { sendErrorMetric *prometheus.CounterVec labels map[string]string + + router *pipeline.Router } func init() { @@ -270,15 +273,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } - onError := func(err error) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -286,6 +291,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Log(level, "can't send data to loki", zap.Error(err), zap.Int("retries", p.config.Retry)) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/postgres/README.md b/plugin/output/postgres/README.md index 9bfd08881..64a46d73b 100755 --- a/plugin/output/postgres/README.md +++ b/plugin/output/postgres/README.md @@ -1,6 +1,8 @@ # Postgres output It sends the event batches to postgres db using pgx. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`strict`** *`bool`* *`default=false`* @@ -47,8 +49,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 0126fc43f..e47b0498e 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -22,6 +22,8 @@ import ( /*{ introduction It sends the event batches to postgres db using pgx. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ /*{ example @@ -111,6 +113,8 @@ type Plugin struct { duplicatedEventMetric prometheus.Counter writtenEventMetric prometheus.Counter insertErrorsMetric prometheus.Counter + + router *pipeline.Router } type ConfigColumn struct { @@ -162,8 +166,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 @@ -286,15 +289,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } - onError := func(err error) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -303,6 +308,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Desugar().Log(level, "can't insert to the table", zap.Error(err), zap.Int("retries", p.config.Retry), zap.String("table", p.config.Table)) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/s3/README.md b/plugin/output/s3/README.md index 5b914e248..8564e3d1b 100755 --- a/plugin/output/s3/README.md +++ b/plugin/output/s3/README.md @@ -154,7 +154,6 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* After an insert error, fall with a non-zero exit code or not -**Experimental feature**
diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go index e846b605d..1e9ca0f67 100644 --- a/plugin/output/s3/s3.go +++ b/plugin/output/s3/s3.go @@ -244,7 +244,6 @@ type Config struct { // > @3@4@5@6 // > // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 diff --git a/plugin/output/splunk/README.md b/plugin/output/splunk/README.md index 78cc23bec..b9df5013f 100755 --- a/plugin/output/splunk/README.md +++ b/plugin/output/splunk/README.md @@ -47,6 +47,8 @@ Out: } ``` +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`endpoint`** *`string`* *`required`* @@ -125,8 +127,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index 84d1f005b..57df2a09a 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -68,6 +68,8 @@ Out: } } ``` + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -96,6 +98,8 @@ type Plugin struct { // plugin metrics sendErrorMetric *prometheus.CounterVec + + router *pipeline.Router } type CopyField struct { @@ -177,8 +181,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 @@ -263,15 +266,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } - onError := func(err error) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -279,6 +284,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Desugar().Log(level, "can't send data to splunk", zap.Error(err), zap.Int("retries", p.config.Retry)) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/stdout/stdout.go b/plugin/output/stdout/stdout.go index 98fa2b60b..b581d0456 100644 --- a/plugin/output/stdout/stdout.go +++ b/plugin/output/stdout/stdout.go @@ -13,6 +13,7 @@ It writes events to stdout(also known as console). type Plugin struct { controller pipeline.OutputPluginController + router *pipeline.Router } type Config struct{} @@ -30,6 +31,7 @@ func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { func (p *Plugin) Start(_ pipeline.AnyConfig, params *pipeline.OutputPluginParams) { p.controller = params.Controller + p.router = params.Router } func (_ *Plugin) Stop() {} diff --git a/test/test.go b/test/test.go index 37e73e12e..6efd3a307 100644 --- a/test/test.go +++ b/test/test.go @@ -210,6 +210,7 @@ func NewEmptyOutputPluginParams() *pipeline.OutputPluginParams { return &pipeline.OutputPluginParams{ PluginDefaultParams: newDefaultParams(), Logger: newLogger().Named("output"), + Router: pipeline.NewRouter(), } }