diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 23fe2bf2b..f11bd69c5 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -35,7 +35,7 @@ jobs:
GOFLAGS: ${{ matrix.flags }}
LOG_LEVEL: error
run: |
- go test -coverprofile=profile${{ matrix.flags }}.out -covermode=atomic -v -coverpkg=./... ./... -json > test-report${{ matrix.flags }}.json
+ go test -coverprofile=profile${{ matrix.flags }}.out -covermode=atomic -v -coverpkg=./... ./... -json | tee test-report${{ matrix.flags }}.json
- name: Coverage report
run: |
@@ -101,7 +101,7 @@ jobs:
GOFLAGS: ${{ matrix.flags }}
LOG_LEVEL: error
run: |
- go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json > test-report-e2e${{ matrix.flags }}.json
+ go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json | tee test-report-e2e${{ matrix.flags }}.json
- name: Coverage report
run: |
diff --git a/_sidebar.idoc.md b/_sidebar.idoc.md
index 0573472e5..a70f5603b 100644
--- a/_sidebar.idoc.md
+++ b/_sidebar.idoc.md
@@ -16,7 +16,7 @@
@global-contents-table-plugin-input|links-list
- Action
@global-contents-table-plugin-action|links-list
- - Output
+ - [Output](/plugin/output/README.md)
@global-contents-table-plugin-output|links-list
- **Pipeline**
diff --git a/_sidebar.md b/_sidebar.md
index 637498584..cc4491022 100644
--- a/_sidebar.md
+++ b/_sidebar.md
@@ -50,7 +50,7 @@
- [split](plugin/action/split/README.md)
- [throttle](plugin/action/throttle/README.md)
- - Output
+ - [Output](/plugin/output/README.md)
- [clickhouse](plugin/output/clickhouse/README.md)
- [devnull](plugin/output/devnull/README.md)
- [elasticsearch](plugin/output/elasticsearch/README.md)
diff --git a/e2e/file_clickhouse/config.yml b/e2e/file_clickhouse/config.yml
index 63de453ee..7097ea0e2 100644
--- a/e2e/file_clickhouse/config.yml
+++ b/e2e/file_clickhouse/config.yml
@@ -21,11 +21,59 @@ pipelines:
override: true
- type: debug
output:
+ deadqueue:
+ type: clickhouse
+ addresses:
+ - 127.0.0.1:9001
+ table: test_table_insert
+ insert_timeout: 1m
+ columns:
+ - name: c1
+ type: String
+ - name: c2
+ type: Int8
+ - name: c3
+ type: Int16
+ - name: c4
+ type: Nullable(Int16)
+ - name: c5
+ type: Nullable(String)
+ - name: level
+ type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4)
+ - name: ipv4
+ type: Nullable(IPv4)
+ - name: ipv6
+ type: Nullable(IPv6)
+ - name: ts
+ type: DateTime
+ - name: ts_with_tz
+ type: DateTime('Europe/Moscow')
+ - name: ts64
+ type: DateTime64(3, 'UTC')
+ - name: ts64_auto
+ type: DateTime64(9, 'UTC')
+ - name: ts_rfc3339nano
+ type: DateTime64(9)
+ - name: f32
+ type: Float32
+ - name: f64
+ type: Float64
+ - name: lc_str
+ type: LowCardinality(String)
+ - name: str_arr
+ type: Array(String)
+ - name: map_str_str
+ type: Map(String,String)
+ - name: uuid
+ type: UUID
+ - name: uuid_nullable
+ type: Nullable(UUID)
type: clickhouse
addresses:
- 127.0.0.1:9001
- table: test_table_insert
+ table: test_table_insert_not_exists
insert_timeout: 1m
+ retry: 0
columns:
- name: c1
type: String
diff --git a/e2e/kafka_auth/docker-compose.yml b/e2e/kafka_auth/docker-compose.yml
index 90edf224c..3e10b9acd 100644
--- a/e2e/kafka_auth/docker-compose.yml
+++ b/e2e/kafka_auth/docker-compose.yml
@@ -2,13 +2,16 @@ version: "2.1"
services:
zookeeper:
- image: docker.io/bitnami/zookeeper:3.9
+ image: zookeeper:3.9
ports:
- "2182:2181"
volumes:
- - "zookeeper_data:/bitnami"
+ - "zookeeper_data:/data"
environment:
- - ALLOW_ANONYMOUS_LOGIN=yes
+ ZOO_MY_ID: 1
+ ZOO_PORT: 2181
+ ZOO_4LW_COMMANDS_WHITELIST: "*"
+ ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
init-certs:
image: docker.io/bitnami/kafka:3.6
command: /tmp/generate.sh
diff --git a/e2e/kafka_file/docker-compose.yml b/e2e/kafka_file/docker-compose.yml
index 924743dfe..927ad4bb7 100644
--- a/e2e/kafka_file/docker-compose.yml
+++ b/e2e/kafka_file/docker-compose.yml
@@ -2,13 +2,16 @@ version: "2"
services:
zookeeper:
- image: docker.io/bitnami/zookeeper:3.8
+ image: zookeeper:3.9
ports:
- "2181:2181"
volumes:
- - "zookeeper_data:/bitnami"
+ - "zookeeper_data:/data"
environment:
- - ALLOW_ANONYMOUS_LOGIN=yes
+ ZOO_MY_ID: 1
+ ZOO_PORT: 2181
+ ZOO_4LW_COMMANDS_WHITELIST: "*"
+ ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
kafka:
image: docker.io/bitnami/kafka:3.1
ports:
diff --git a/fd/file.d.go b/fd/file.d.go
index 055083883..11e1eb0eb 100644
--- a/fd/file.d.go
+++ b/fd/file.d.go
@@ -222,6 +222,13 @@ func (f *FileD) setupOutput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCo
PluginRuntimeInfo: f.instantiatePlugin(info),
})
+ if info.DeadQueueInfo != nil {
+ p.SetDeadQueueOutput(&pipeline.OutputPluginInfo{
+ PluginStaticInfo: info.DeadQueueInfo,
+ PluginRuntimeInfo: f.instantiatePlugin(info.DeadQueueInfo),
+ })
+ }
+
return nil
}
@@ -249,6 +256,39 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip
if err != nil {
return nil, err
}
+
+ deadqueue := configJSON.Get("deadqueue")
+ var deadqueueInfo *pipeline.PluginStaticInfo
+ if deadqueueMap := deadqueue.MustMap(); deadqueueMap != nil {
+ if len(deadqueueMap) > 0 {
+ deadqueueType := deadqueue.Get("type").MustString()
+ if deadqueueType == "" {
+ return nil, fmt.Errorf("deadqueue of %s doesn't have type", pluginKind)
+ }
+ deadqueueInfo, err = f.plugins.Get(pluginKind, deadqueueType)
+ if err != nil {
+ return nil, err
+ }
+
+ deadqueue.Del("type")
+
+ deadqueueConfigJson, err := deadqueue.Encode()
+ if err != nil {
+ logger.Panicf("can't create config json for %s deadqueue", deadqueueType)
+ }
+
+ config, err := pipeline.GetConfig(deadqueueInfo, deadqueueConfigJson, values)
+ if err != nil {
+ logger.Fatalf("error on creating deadqueue of %s with type %q: %s", deadqueueType, pluginKind, err.Error())
+ }
+ deadqueueInfoCopy := *deadqueueInfo
+ deadqueueInfoCopy.Config = config
+ deadqueueInfo = &deadqueueInfoCopy
+
+ // TODO: recursive deadqueue config
+ // deadqueueForDeadqueue := deadqueue.Get("deadqueue").MustMap()
+ }
+ configJSON.Del("deadqueue")
+ }
+
configJson, err := configJSON.Encode()
if err != nil {
logger.Panicf("can't create config json for %s", t)
@@ -260,6 +300,9 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip
infoCopy := *info
infoCopy.Config = config
+ if deadqueueInfo != nil {
+ infoCopy.DeadQueueInfo = deadqueueInfo
+ }
return &infoCopy, nil
}
diff --git a/pipeline/backoff.go b/pipeline/backoff.go
index 06ca8db89..735c42eae 100644
--- a/pipeline/backoff.go
+++ b/pipeline/backoff.go
@@ -8,25 +8,28 @@ import (
)
type RetriableBatcher struct {
- outFn RetriableBatcherOutFn
- batcher *Batcher
- backoffOpts BackoffOpts
- onRetryError func(err error)
+ outFn RetriableBatcherOutFn
+ batcher *Batcher
+ backoffOpts BackoffOpts
+ isDeadQueueAvailable bool
+ onRetryError func(err error, events []*Event)
}
type RetriableBatcherOutFn func(*WorkerData, *Batch) error
type BackoffOpts struct {
- MinRetention time.Duration
- Multiplier float64
- AttemptNum int
+ MinRetention time.Duration
+ Multiplier float64
+ AttemptNum int
+ IsDeadQueueAvailable bool
}
-func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher {
+func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error, events []*Event)) *RetriableBatcher {
batcherBackoff := &RetriableBatcher{
- outFn: batcherOutFn,
- backoffOpts: opts,
- onRetryError: onError,
+ outFn: batcherOutFn,
+ backoffOpts: opts,
+ onRetryError: onError,
+ isDeadQueueAvailable: opts.IsDeadQueueAvailable,
}
batcherBackoff.setBatcher(batcherOpts)
return batcherBackoff
@@ -58,7 +61,15 @@ func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) {
}
next := exponentionalBackoff.NextBackOff()
if next == backoff.Stop || (b.backoffOpts.AttemptNum >= 0 && numTries > b.backoffOpts.AttemptNum) {
- b.onRetryError(err)
+ var events []*Event
+ if batch != nil {
+ events = batch.events
+ }
+ b.onRetryError(err, events)
+ if batch != nil && b.isDeadQueueAvailable {
+ batch.reset()
+ batch.status = BatchStatusInDeadQueue
+ }
return
}
numTries++
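For reference, a minimal sketch of how a caller wires the widened `onError` callback together with the router's dead queue. The `pipeline` types and signatures are taken from this patch; the package name, helper name, log line, and backoff values are purely illustrative, not part of the change:

```go
package example

import (
	"log"
	"time"

	"github.com/ozontech/file.d/pipeline"
)

// newBatcher builds a RetriableBatcher whose error callback receives the events
// of the batch that exhausted its retries and reroutes them through the router.
// Router.Fail is a no-op when no dead queue output is configured.
func newBatcher(
	opts *pipeline.BatcherOptions,
	outFn pipeline.RetriableBatcherOutFn,
	router *pipeline.Router,
) *pipeline.RetriableBatcher {
	onError := func(err error, events []*pipeline.Event) {
		log.Printf("giving up after retries: %v (%d events)", err, len(events))
		for _, e := range events {
			router.Fail(e)
		}
	}

	return pipeline.NewRetriableBatcher(opts, outFn, pipeline.BackoffOpts{
		MinRetention:         time.Second, // illustrative values
		Multiplier:           1.5,
		AttemptNum:           3,
		IsDeadQueueAvailable: router.IsDeadQueueAvailable(),
	}, onError)
}
```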
diff --git a/pipeline/backoff_test.go b/pipeline/backoff_test.go
index 1d73a2bf1..fd5c01ea3 100644
--- a/pipeline/backoff_test.go
+++ b/pipeline/backoff_test.go
@@ -3,6 +3,7 @@ package pipeline
import (
"errors"
"testing"
+ "time"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
@@ -17,7 +18,7 @@ func TestBackoff(t *testing.T) {
eventCount := &atomic.Int32{}
eventCountBefore := eventCount.Load()
- errorFn := func(err error) {
+ errorFn := func(err error, events []*Event) {
errorCount.Inc()
}
@@ -29,12 +30,17 @@ func TestBackoff(t *testing.T) {
eventCount.Inc()
return nil
},
- BackoffOpts{AttemptNum: 3},
+ BackoffOpts{
+ AttemptNum: 3,
+ IsDeadQueueAvailable: false,
+ },
errorFn,
)
- batcherBackoff.Out(nil, nil)
-
+ data := WorkerData(nil)
+ batch := newBatch(1, 1, 1*time.Second)
+ batch.append(&Event{})
+ batcherBackoff.Out(&data, batch)
assert.Equal(t, errorCountBefore, errorCount.Load(), "wrong error count")
assert.Equal(t, eventCountBefore+1, eventCount.Load(), "wrong event count")
}
@@ -42,7 +48,37 @@ func TestBackoff(t *testing.T) {
func TestBackoffWithError(t *testing.T) {
errorCount := &atomic.Int32{}
prevValue := errorCount.Load()
- errorFn := func(err error) {
+ errorFn := func(err error, events []*Event) {
+ errorCount.Inc()
+ }
+
+ batcherBackoff := NewRetriableBatcher(
+ &BatcherOptions{
+ MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
+ },
+ func(workerData *WorkerData, batch *Batch) error {
+ return errors.New("some error")
+ },
+ BackoffOpts{
+ AttemptNum: 3,
+ IsDeadQueueAvailable: false,
+ },
+ errorFn,
+ )
+
+ data := WorkerData(nil)
+ batch := newBatch(1, 1, 1*time.Second)
+ batch.append(&Event{})
+ batcherBackoff.Out(&data, batch)
+ assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count")
+ assert.Equal(t, 1, len(batch.events), "wrong number of events in batch")
+ assert.Equal(t, BatchStatusNotReady, batch.status, "wrong batch status")
+}
+
+func TestBackoffWithErrorWithDeadQueue(t *testing.T) {
+ errorCount := &atomic.Int32{}
+ prevValue := errorCount.Load()
+ errorFn := func(err error, events []*Event) {
errorCount.Inc()
}
@@ -53,10 +89,18 @@ func TestBackoffWithError(t *testing.T) {
func(workerData *WorkerData, batch *Batch) error {
return errors.New("some error")
},
- BackoffOpts{AttemptNum: 3},
+ BackoffOpts{
+ AttemptNum: 3,
+ IsDeadQueueAvailable: true,
+ },
errorFn,
)
- batcherBackoff.Out(nil, nil)
+ data := WorkerData(nil)
+ batch := newBatch(1, 1, 1*time.Second)
+ batch.append(&Event{})
+ batcherBackoff.Out(&data, batch)
assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count")
+ assert.Equal(t, 0, len(batch.events), "wrong number of events in batch")
+ assert.Equal(t, BatchStatusInDeadQueue, batch.status, "wrong batch status")
}
diff --git a/pipeline/batch.go b/pipeline/batch.go
index 22ee3bcfa..03af1fe01 100644
--- a/pipeline/batch.go
+++ b/pipeline/batch.go
@@ -16,6 +16,7 @@ const (
BatchStatusNotReady BatchStatus = iota
BatchStatusMaxSizeExceeded
BatchStatusTimeoutExceeded
+ BatchStatusInDeadQueue
)
type Batch struct {
@@ -125,11 +126,12 @@ type Batcher struct {
outSeq int64
commitSeq int64
- batchOutFnSeconds prometheus.Observer
- commitWaitingSeconds prometheus.Observer
- workersInProgress prometheus.Gauge
- batchesDoneByMaxSize prometheus.Counter
- batchesDoneByTimeout prometheus.Counter
+ batchOutFnSeconds prometheus.Observer
+ commitWaitingSeconds prometheus.Observer
+ workersInProgress prometheus.Gauge
+ batchesDoneByMaxSize prometheus.Counter
+ batchesDoneByTimeout prometheus.Counter
+ batchesRoutedToDeadQueue prometheus.Counter
}
type (
@@ -163,16 +165,17 @@ func NewBatcher(opts BatcherOptions) *Batcher { // nolint: gocritic // hugeParam
seqMu := &sync.Mutex{}
return &Batcher{
- seqMu: seqMu,
- cond: sync.NewCond(seqMu),
- freeBatches: freeBatches,
- fullBatches: fullBatches,
- opts: opts,
- batchOutFnSeconds: ctl.RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong),
- commitWaitingSeconds: ctl.RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed),
- workersInProgress: ctl.RegisterGauge("batcher_workers_in_progress", ""),
- batchesDoneByMaxSize: jobsDone.WithLabelValues("max_size_exceeded"),
- batchesDoneByTimeout: jobsDone.WithLabelValues("timeout_exceeded"),
+ seqMu: seqMu,
+ cond: sync.NewCond(seqMu),
+ freeBatches: freeBatches,
+ fullBatches: fullBatches,
+ opts: opts,
+ batchOutFnSeconds: ctl.RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong),
+ commitWaitingSeconds: ctl.RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed),
+ workersInProgress: ctl.RegisterGauge("batcher_workers_in_progress", ""),
+ batchesDoneByMaxSize: jobsDone.WithLabelValues("max_size_exceeded"),
+ batchesDoneByTimeout: jobsDone.WithLabelValues("timeout_exceeded"),
+ batchesRoutedToDeadQueue: jobsDone.WithLabelValues("routed_in_deadqueue"),
}
}
@@ -220,6 +223,8 @@ func (b *Batcher) work() {
b.batchesDoneByMaxSize.Inc()
case BatchStatusTimeoutExceeded:
b.batchesDoneByTimeout.Inc()
+ case BatchStatusInDeadQueue:
+ b.batchesRoutedToDeadQueue.Inc()
default:
logger.Panic("unreachable")
}
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index 50d896f4f..261e5bc57 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -113,8 +113,7 @@ type Pipeline struct {
procCount *atomic.Int32
activeProcs *atomic.Int32
- output OutputPlugin
- outputInfo *OutputPluginInfo
+ router *Router
metricHolder *metric.Holder
@@ -199,6 +198,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry, lg *zap
PipelineSettings: settings,
MetricCtl: metricCtl,
},
+ router: NewRouter(),
actionMetrics: actionMetrics{
m: make(map[string]*actionMetric),
mu: new(sync.RWMutex),
@@ -301,7 +301,7 @@ func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux) {
if p.input == nil {
p.logger.Panic("input isn't set")
}
- if p.output == nil {
+ if p.router.output == nil {
p.logger.Panic("output isn't set")
}
@@ -321,7 +321,7 @@ func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux) {
}
}
- for hName, handler := range p.outputInfo.PluginStaticInfo.Endpoints {
+ for hName, handler := range p.router.outputInfo.PluginStaticInfo.Endpoints {
mux.HandleFunc(fmt.Sprintf("%s/%d/%s", prefix, len(p.actionInfos)+1, hName), handler)
}
}
@@ -330,7 +330,7 @@ func (p *Pipeline) Start() {
if p.input == nil {
p.logger.Panic("input isn't set")
}
- if p.output == nil {
+ if p.router.output == nil {
p.logger.Panic("output isn't set")
}
@@ -339,11 +339,11 @@ func (p *Pipeline) Start() {
outputParams := &OutputPluginParams{
PluginDefaultParams: p.actionParams,
Controller: p,
- Logger: p.logger.Sugar().Named("output").Named(p.outputInfo.Type),
+ Logger: p.logger.Sugar().Named("output").Named(p.router.outputInfo.Type),
}
- p.logger.Info("starting output plugin", zap.String("name", p.outputInfo.Type))
+ p.logger.Info("starting output plugin", zap.String("name", p.router.outputInfo.Type))
- p.output.Start(p.outputInfo.Config, outputParams)
+ p.router.Start(outputParams)
p.logger.Info("stating processors", zap.Int("count", len(p.Procs)))
for _, processor := range p.Procs {
@@ -382,7 +382,7 @@ func (p *Pipeline) Stop() {
p.input.Stop()
p.logger.Info("stopping output")
- p.output.Stop()
+ p.router.Stop()
p.shouldStop.Store(true)
@@ -399,12 +399,15 @@ func (p *Pipeline) GetInput() InputPlugin {
}
func (p *Pipeline) SetOutput(info *OutputPluginInfo) {
- p.outputInfo = info
- p.output = info.Plugin.(OutputPlugin)
+ p.router.SetOutput(info)
+}
+
+func (p *Pipeline) SetDeadQueueOutput(info *OutputPluginInfo) {
+ p.router.SetDeadQueueOutput(info)
}
func (p *Pipeline) GetOutput() OutputPlugin {
- return p.output
+ return p.router.output
}
// In decodes message and passes it to event stream.
@@ -624,6 +627,7 @@ func (p *Pipeline) streamEvent(event *Event) uint64 {
}
func (p *Pipeline) Commit(event *Event) {
+ p.router.Ack(event)
p.finalize(event, true, true)
}
@@ -758,7 +762,7 @@ func (p *Pipeline) newProc(id int) *processor {
id,
&p.actionMetrics,
p.activeProcs,
- p.output,
+ p.router,
p.streamer,
p.finalize,
p.IncMaxEventSizeExceeded,
diff --git a/pipeline/plugin.go b/pipeline/plugin.go
index 3e0a7c749..95da53d8e 100644
--- a/pipeline/plugin.go
+++ b/pipeline/plugin.go
@@ -59,6 +59,7 @@ type ActionPluginParams struct {
type OutputPluginParams struct {
PluginDefaultParams
Controller OutputPluginController
+ Router *Router
Logger *zap.SugaredLogger
}
@@ -77,6 +78,8 @@ type PluginStaticInfo struct {
// Every plugin can provide their own API through Endpoints.
Endpoints map[string]func(http.ResponseWriter, *http.Request)
AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config
+ // TODO: maybe move to OutputPluginStaticInfo, since it is only used by output and action plugins?
+ DeadQueueInfo *PluginStaticInfo
}
type PluginRuntimeInfo struct {
diff --git a/pipeline/processor.go b/pipeline/processor.go
index cf02c51c8..f0efbbe33 100644
--- a/pipeline/processor.go
+++ b/pipeline/processor.go
@@ -55,7 +55,7 @@ func allEventStatuses() []eventStatus {
type processor struct {
id int
streamer *streamer
- output OutputPlugin
+ router *Router
finalize finalizeFn
activeCounter *atomic.Int32
@@ -78,7 +78,7 @@ func newProcessor(
id int,
actionMetrics *actionMetrics,
activeCounter *atomic.Int32,
- output OutputPlugin,
+ router *Router,
streamer *streamer,
finalizeFn finalizeFn,
incMaxEventSizeExceededFn func(lvs ...string),
@@ -88,7 +88,7 @@ func newProcessor(
id: id,
streamer: streamer,
actionMetrics: actionMetrics,
- output: output,
+ router: router,
finalize: finalizeFn,
activeCounter: activeCounter,
@@ -153,7 +153,7 @@ func (p *processor) processSequence(event *Event) bool {
}
event.stage = eventStageOutput
- p.output.Out(event)
+ p.router.Out(event)
}
return true
@@ -447,7 +447,7 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
ok, _ := p.doActions(child)
if ok {
child.stage = eventStageOutput
- p.output.Out(child)
+ p.router.Out(child)
}
}
diff --git a/pipeline/router.go b/pipeline/router.go
new file mode 100644
index 000000000..31f8c587d
--- /dev/null
+++ b/pipeline/router.go
@@ -0,0 +1,56 @@
+package pipeline
+
+type Router struct {
+ output OutputPlugin
+ outputInfo *OutputPluginInfo
+
+ deadQueue OutputPlugin
+ deadQueueInfo *OutputPluginInfo
+}
+
+func NewRouter() *Router {
+ return &Router{}
+}
+
+func (r *Router) SetOutput(info *OutputPluginInfo) {
+ r.outputInfo = info
+ r.output = info.Plugin.(OutputPlugin)
+}
+
+func (r *Router) SetDeadQueueOutput(info *OutputPluginInfo) {
+ r.deadQueueInfo = info
+ r.deadQueue = info.Plugin.(OutputPlugin)
+}
+
+func (r *Router) Ack(event *Event) {
+ // TODO: send commit to input after receiving all acks from outputs
+}
+
+func (r *Router) Fail(event *Event) {
+ if r.IsDeadQueueAvailable() {
+ r.deadQueue.Out(event)
+ }
+}
+
+func (r *Router) Out(event *Event) {
+ r.output.Out(event)
+}
+
+func (r *Router) Stop() {
+ r.output.Stop()
+ if r.IsDeadQueueAvailable() {
+ r.deadQueue.Stop()
+ }
+}
+
+func (r *Router) IsDeadQueueAvailable() bool {
+ return r.deadQueue != nil
+}
+
+func (r *Router) Start(params *OutputPluginParams) {
+ params.Router = r
+ r.output.Start(r.outputInfo.Config, params)
+ if r.IsDeadQueueAvailable() {
+ r.deadQueue.Start(r.deadQueueInfo.Config, params)
+ }
+}
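The routing contract in isolation, as a hedged example-test sketch: it assumes the `OutputPlugin` interface (`Start`/`Out`/`Stop`), the `OutputPluginInfo` embedding, and the `Event` type used elsewhere in this patch; `captureOutput` is a throwaway stand-in, not a real plugin.

```go
package pipeline

import "fmt"

// captureOutput is a trivial OutputPlugin stand-in that records events.
type captureOutput struct{ events []*Event }

func (c *captureOutput) Start(_ AnyConfig, _ *OutputPluginParams) {}
func (c *captureOutput) Stop()                                    {}
func (c *captureOutput) Out(event *Event)                         { c.events = append(c.events, event) }

// ExampleRouter_fail shows that Out always goes to the main output, while Fail
// reroutes the event to the dead queue (and is a no-op when none is configured).
func ExampleRouter_fail() {
	main, dlq := &captureOutput{}, &captureOutput{}

	r := NewRouter()
	r.SetOutput(&OutputPluginInfo{
		PluginStaticInfo:  &PluginStaticInfo{},
		PluginRuntimeInfo: &PluginRuntimeInfo{Plugin: main},
	})
	r.SetDeadQueueOutput(&OutputPluginInfo{
		PluginStaticInfo:  &PluginStaticInfo{},
		PluginRuntimeInfo: &PluginRuntimeInfo{Plugin: dlq},
	})

	event := &Event{}
	r.Out(event)  // delivered to the main output
	r.Fail(event) // rerouted to the dead queue

	fmt.Println(len(main.events), len(dlq.events))
	// Output: 1 1
}
```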
diff --git a/pipeline/router_test.go b/pipeline/router_test.go
new file mode 100644
index 000000000..f6518b681
--- /dev/null
+++ b/pipeline/router_test.go
@@ -0,0 +1,160 @@
+package pipeline_test
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/plugin/output/devnull"
+ "github.com/ozontech/file.d/test"
+ insaneJSON "github.com/ozontech/insane-json"
+ "github.com/stretchr/testify/assert"
+ "go.uber.org/atomic"
+)
+
+type fakeOutputPluginController struct {
+ mu sync.Mutex
+ commits []*pipeline.Event
+ errors []string
+}
+
+func (f *fakeOutputPluginController) Commit(event *pipeline.Event) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.commits = append(f.commits, event)
+}
+func (f *fakeOutputPluginController) Error(err string) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.errors = append(f.errors, err)
+}
+
+func (f *fakeOutputPluginController) getCommits() []*pipeline.Event {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ return f.commits
+}
+
+func (f *fakeOutputPluginController) getErrors() []string {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ return f.errors
+}
+
+func TestRouterNormalProcessing(t *testing.T) {
+ t.Parallel()
+
+ r := pipeline.NewRouter()
+ controller := &fakeOutputPluginController{}
+
+ // Setup main output that succeeds
+ var outputCount atomic.Int32
+ outputPlugin, outputConfig := createDevNullPlugin(func(event *pipeline.Event) {
+ outputCount.Add(1)
+ })
+
+ r.SetOutput(&pipeline.OutputPluginInfo{
+ PluginStaticInfo: &pipeline.PluginStaticInfo{Config: outputConfig},
+ PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: outputPlugin},
+ })
+
+ // Setup dead queue that shouldn't be used
+ var deadQueueCount atomic.Int32
+ deadQueuePlugin, deadQueueConfig := createDevNullPlugin(func(event *pipeline.Event) {
+ deadQueueCount.Add(1)
+ })
+
+ r.SetDeadQueueOutput(&pipeline.OutputPluginInfo{
+ PluginStaticInfo: &pipeline.PluginStaticInfo{Config: deadQueueConfig},
+ PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: deadQueuePlugin},
+ })
+
+ params := test.NewEmptyOutputPluginParams()
+ params.PipelineName = "test_pipeline"
+ params.Router = r
+ params.Controller = controller
+ r.Start(params)
+ defer r.Stop()
+
+ // Send test event
+ event := newEvent(t)
+ r.Out(event)
+
+ // Wait for processing
+ assert.Eventually(t, func() bool {
+ return outputCount.Load() == 1
+ }, 100*time.Millisecond, 10*time.Millisecond, "event should be processed")
+
+ assert.Equal(t, int32(0), deadQueueCount.Load(), "dead queue should not be used")
+ assert.Len(t, controller.getCommits(), 1, "should commit successful event")
+ assert.Empty(t, controller.getErrors(), "should not produce errors")
+}
+
+func TestRouterDeadQueueProcessing(t *testing.T) {
+ t.Parallel()
+
+ r := pipeline.NewRouter()
+ controller := &fakeOutputPluginController{}
+
+ // Setup main output that fails
+ var outputCount atomic.Int32
+ outputPlugin, outputConfig := createDevNullPlugin(func(event *pipeline.Event) {
+ outputCount.Add(1)
+ r.Fail(event)
+ })
+
+ r.SetOutput(&pipeline.OutputPluginInfo{
+ PluginStaticInfo: &pipeline.PluginStaticInfo{Config: outputConfig},
+ PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: outputPlugin},
+ })
+
+ // Setup dead queue
+ var deadQueueCount atomic.Int32
+ deadQueuePlugin, deadQueueConfig := createDevNullPlugin(func(event *pipeline.Event) {
+ deadQueueCount.Add(1)
+ })
+
+ r.SetDeadQueueOutput(&pipeline.OutputPluginInfo{
+ PluginStaticInfo: &pipeline.PluginStaticInfo{Config: deadQueueConfig},
+ PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: deadQueuePlugin},
+ })
+
+ params := test.NewEmptyOutputPluginParams()
+ params.PipelineName = "test_pipeline"
+ params.Router = r
+ params.Controller = controller
+ r.Start(params)
+ defer r.Stop()
+
+ // Send test event
+ event := newEvent(t)
+ r.Out(event)
+
+ // Wait for processing
+ assert.Eventually(t, func() bool {
+ return deadQueueCount.Load() == 1
+ }, 100*time.Millisecond, 10*time.Millisecond, "event should go to dead queue")
+
+ assert.Equal(t, int32(1), outputCount.Load(), "main output should try to process")
+ assert.Len(t, controller.getCommits(), 2, "main output and dead queue should both commit the event")
+ assert.Empty(t, controller.getErrors(), "should not produce errors")
+}
+
+func createDevNullPlugin(outFn func(event *pipeline.Event)) (*devnull.Plugin, pipeline.AnyConfig) {
+ plugin, config := devnull.Factory()
+ p := plugin.(*devnull.Plugin)
+ p.SetOutFn(outFn)
+ return p, config
+}
+
+func newEvent(t *testing.T) *pipeline.Event {
+ root, err := insaneJSON.DecodeString(`{}`)
+ if err != nil {
+ t.Fatalf("can't decode test event: %s", err)
+ }
+ return &pipeline.Event{
+ Root: root,
+ Buf: make([]byte, 0, 1024),
+ }
+}
diff --git a/plugin/README.md b/plugin/README.md
index 1e6c8f9e4..ac1f3fe5b 100755
--- a/plugin/README.md
+++ b/plugin/README.md
@@ -764,6 +764,8 @@ It sends the event batches to Clickhouse database using
File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/clickhouse/README.md)
## devnull
It provides an API to test pipelines and other plugins.
@@ -773,6 +775,8 @@ It provides an API to test pipelines and other plugins.
It sends events into Elasticsearch. It uses `_bulk` API to send events in batches.
If a network error occurs, the batch will infinitely try to be delivered to the random endpoint.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/elasticsearch/README.md)
## file
It sends event batches into files.
@@ -796,18 +800,26 @@ GELF messages are separated by null byte. Each message is a JSON with the follow
Every field with an underscore prefix `_` will be treated as an extra field.
Allowed characters in field names are letters, numbers, underscores, dashes, and dots.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/gelf/README.md)
## kafka
It sends the event batches to kafka brokers using `franz-go` lib.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/kafka/README.md)
## loki
It sends the logs batches to Loki using HTTP API.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/loki/README.md)
## postgres
It sends the event batches to postgres db using pgx.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/postgres/README.md)
## s3
Sends events to s3 output of one or multiple buckets.
@@ -936,6 +948,8 @@ Out:
}
```
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/splunk/README.md)
## stdout
It writes events to stdout(also known as console).
diff --git a/plugin/output/README.idoc.md b/plugin/output/README.idoc.md
index 70d8c3b7f..3e5b3a9c3 100644
--- a/plugin/output/README.idoc.md
+++ b/plugin/output/README.idoc.md
@@ -1,3 +1,74 @@
# Output plugins
-@global-contents-table-plugin-output|contents-table
\ No newline at end of file
+@global-contents-table-plugin-output|contents-table
+
+## dead queue
+
+Events that the main output fails to deliver after all retries are exhausted are redirected to a dead-letter queue (DLQ) output to prevent data loss and enable recovery.
+
+### Examples
+
+#### Dead queue to a reserve Elasticsearch cluster
+
+Consumes logs from a Kafka topic and sends them to the primary Elasticsearch cluster, failing over to a reserve ("dead-letter") Elasticsearch cluster if the primary is unavailable.
+
+```yaml
+main_pipeline:
+ input:
+ type: kafka
+ brokers:
+ - kafka:9092
+ topics:
+ - logs
+ output:
+ type: elasticsearch
+ workers_count: 32
+ endpoints:
+ - http://elasticsearch-primary:9200
+ # route to reserve elasticsearch
+ deadqueue:
+ endpoints:
+ - http://elasticsearch-reserve:9200
+ type: elasticsearch
+```
+
+#### Dead queue with a second Kafka topic and a low-priority consumer
+
+Main Pipeline: Processes logs from Kafka → Elasticsearch. Failed events go to a dead-letter Kafka topic.
+
+Dead-Queue Pipeline: Re-processes failed events from the DLQ topic with lower priority.
+
+```yaml
+main_pipeline:
+ input:
+ type: kafka
+ brokers:
+ - kafka:9092
+ topics:
+ - logs
+ output:
+ type: elasticsearch
+ workers_count: 32
+ endpoints:
+ - http://elasticsearch:9200
+ # route to deadqueue pipeline
+ deadqueue:
+ brokers:
+ - kafka:9092
+ default_topic: logs-deadqueue
+ type: kafka
+
+deadqueue_pipeline:
+ input:
+ type: kafka
+ brokers:
+ - kafka:9092
+ topics:
+ - logs-deadqueue
+ output:
+ type: elasticsearch
+ workers_count: 1 # low priority
+ fatal_on_failed_insert: false
+ endpoints:
+ - http://elasticsearch:9200
+```
\ No newline at end of file
diff --git a/plugin/output/README.md b/plugin/output/README.md
index f968a6b45..0a584b075 100755
--- a/plugin/output/README.md
+++ b/plugin/output/README.md
@@ -7,6 +7,8 @@ It sends the event batches to Clickhouse database using
File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/clickhouse/README.md)
## devnull
It provides an API to test pipelines and other plugins.
@@ -16,6 +18,8 @@ It provides an API to test pipelines and other plugins.
It sends events into Elasticsearch. It uses `_bulk` API to send events in batches.
If a network error occurs, the batch will infinitely try to be delivered to the random endpoint.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/elasticsearch/README.md)
## file
It sends event batches into files.
@@ -39,18 +43,26 @@ GELF messages are separated by null byte. Each message is a JSON with the follow
Every field with an underscore prefix `_` will be treated as an extra field.
Allowed characters in field names are letters, numbers, underscores, dashes, and dots.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/gelf/README.md)
## kafka
It sends the event batches to kafka brokers using `franz-go` lib.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/kafka/README.md)
## loki
It sends the logs batches to Loki using HTTP API.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/loki/README.md)
## postgres
It sends the event batches to postgres db using pgx.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/postgres/README.md)
## s3
Sends events to s3 output of one or multiple buckets.
@@ -179,9 +191,82 @@ Out:
}
```
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
[More details...](plugin/output/splunk/README.md)
## stdout
It writes events to stdout(also known as console).
[More details...](plugin/output/stdout/README.md)
+
+## dead queue
+
+Events that the main output fails to deliver after all retries are exhausted are redirected to a dead-letter queue (DLQ) output to prevent data loss and enable recovery.
+
+### Examples
+
+#### Dead queue to a reserve Elasticsearch cluster
+
+Consumes logs from a Kafka topic and sends them to the primary Elasticsearch cluster, failing over to a reserve ("dead-letter") Elasticsearch cluster if the primary is unavailable.
+
+```yaml
+main_pipeline:
+ input:
+ type: kafka
+ brokers:
+ - kafka:9092
+ topics:
+ - logs
+ output:
+ type: elasticsearch
+ workers_count: 32
+ endpoints:
+ - http://elasticsearch-primary:9200
+ # route to reserve elasticsearch
+ deadqueue:
+ endpoints:
+ - http://elasticsearch-reserve:9200
+ type: elasticsearch
+```
+
+#### Dead queue with a second Kafka topic and a low-priority consumer
+
+Main Pipeline: Processes logs from Kafka → Elasticsearch. Failed events go to a dead-letter Kafka topic.
+
+Dead-Queue Pipeline: Re-processes failed events from the DLQ topic with lower priority.
+
+```yaml
+main_pipeline:
+ input:
+ type: kafka
+ brokers:
+ - kafka:9092
+ topics:
+ - logs
+ output:
+ type: elasticsearch
+ workers_count: 32
+ endpoints:
+ - http://elasticsearch:9200
+ # route to deadqueue pipeline
+ deadqueue:
+ brokers:
+ - kafka:9092
+ default_topic: logs-deadqueue
+ type: kafka
+
+deadqueue_pipeline:
+ input:
+ type: kafka
+ brokers:
+ - kafka:9092
+ topics:
+ - logs-deadqueue
+ output:
+ type: elasticsearch
+ workers_count: 1 # low priority
+ fatal_on_failed_insert: false
+ endpoints:
+ - http://elasticsearch:9200
+```
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md
index a59df51b0..08b1b374e 100644
--- a/plugin/output/clickhouse/README.md
+++ b/plugin/output/clickhouse/README.md
@@ -5,6 +5,8 @@ It sends the event batches to Clickhouse database using
File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
### Config params
**`addresses`** *`[]Address`* *`required`*
@@ -128,8 +130,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in
**`fatal_on_failed_insert`** *`bool`* *`default=false`*
-After an insert error, fall with a non-zero exit code or not
-**Experimental feature**
+After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go
index 04e69ef32..ca3338647 100644
--- a/plugin/output/clickhouse/clickhouse.go
+++ b/plugin/output/clickhouse/clickhouse.go
@@ -30,6 +30,8 @@ It sends the event batches to Clickhouse database using
[Native protocol](https://clickhouse.com/docs/en/interfaces/tcp/).
File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features.
+
+Supports [dead queue](/plugin/output/README.md#dead-queue).
}*/
const (
@@ -58,6 +60,8 @@ type Plugin struct {
// plugin metrics
insertErrorsMetric prometheus.Counter
queriesCountMetric prometheus.Counter
+
+ router *pipeline.Router
}
type Setting struct {
@@ -245,8 +249,7 @@ type Config struct {
// > @3@4@5@6
// >
- // > After an insert error, fall with a non-zero exit code or not
- // > **Experimental feature**
+ // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
// > @3@4@5@6
@@ -437,15 +440,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
MetricCtl: params.MetricCtl,
}
+ p.router = params.Router
backoffOpts := pipeline.BackoffOpts{
- MinRetention: p.config.Retention_,
- Multiplier: float64(p.config.RetentionExponentMultiplier),
- AttemptNum: p.config.Retry,
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: p.config.Retry,
+ IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
}
- onError := func(err error) {
+ onError := func(err error, events []*pipeline.Event) {
var level zapcore.Level
- if p.config.FatalOnFailedInsert {
+ if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
@@ -454,6 +459,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.logger.Log(level, "can't insert to the table", zap.Error(err),
zap.Int("retries", p.config.Retry),
zap.String("table", p.config.Table))
+
+ for i := range events {
+ p.router.Fail(events[i])
+ }
}
p.batcher = pipeline.NewRetriableBatcher(
diff --git a/plugin/output/devnull/devnull.go b/plugin/output/devnull/devnull.go
index 09522be46..4c2668ff6 100644
--- a/plugin/output/devnull/devnull.go
+++ b/plugin/output/devnull/devnull.go
@@ -12,6 +12,7 @@ It provides an API to test pipelines and other plugins.
type Plugin struct {
controller pipeline.OutputPluginController
+ router *pipeline.Router
outFn func(event *pipeline.Event)
total *atomic.Int64
}
@@ -31,6 +32,7 @@ func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
func (p *Plugin) Start(_ pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
p.controller = params.Controller
+ p.router = params.Router
p.total = &atomic.Int64{}
}
diff --git a/plugin/output/elasticsearch/README.md b/plugin/output/elasticsearch/README.md
index 409c8ecee..21114e946 100755
--- a/plugin/output/elasticsearch/README.md
+++ b/plugin/output/elasticsearch/README.md
@@ -2,6 +2,8 @@
It sends events into Elasticsearch. It uses `_bulk` API to send events in batches.
If a network error occurs, the batch will infinitely try to be delivered to the random endpoint.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
### Config params
**`endpoints`** *`[]string`* *`required`*
@@ -128,8 +130,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in
**`fatal_on_failed_insert`** *`bool`* *`default=false`*
-After an insert error, fall with a non-zero exit code or not
-**Experimental feature**
+After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go
index c96fea372..27d82e9ca 100644
--- a/plugin/output/elasticsearch/elasticsearch.go
+++ b/plugin/output/elasticsearch/elasticsearch.go
@@ -23,6 +23,8 @@ import (
/*{ introduction
It sends events into Elasticsearch. It uses `_bulk` API to send events in batches.
If a network error occurs, the batch will infinitely try to be delivered to the random endpoint.
+
+Supports [dead queue](/plugin/output/README.md#dead-queue).
}*/
const (
@@ -50,6 +52,8 @@ type Plugin struct {
// plugin metrics
sendErrorMetric *prometheus.CounterVec
indexingErrorsMetric prometheus.Counter
+
+ router *pipeline.Router
}
// ! config-params
@@ -167,8 +171,7 @@ type Config struct {
// > @3@4@5@6
// >
- // > After an insert error, fall with a non-zero exit code or not
- // > **Experimental feature**
+ // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
// > @3@4@5@6
@@ -256,15 +259,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
MetricCtl: params.MetricCtl,
}
+ p.router = params.Router
backoffOpts := pipeline.BackoffOpts{
- MinRetention: p.config.Retention_,
- Multiplier: float64(p.config.RetentionExponentMultiplier),
- AttemptNum: p.config.Retry,
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: p.config.Retry,
+ IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
}
- onError := func(err error) {
+ onError := func(err error, events []*pipeline.Event) {
var level zapcore.Level
- if p.config.FatalOnFailedInsert {
+ if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
@@ -273,6 +278,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.logger.Log(level, "can't send to the elastic", zap.Error(err),
zap.Int("retries", p.config.Retry),
)
+
+ for i := range events {
+ p.router.Fail(events[i])
+ }
}
p.batcher = pipeline.NewRetriableBatcher(
diff --git a/plugin/output/gelf/README.md b/plugin/output/gelf/README.md
index 3e5738d3c..5666789d7 100755
--- a/plugin/output/gelf/README.md
+++ b/plugin/output/gelf/README.md
@@ -16,6 +16,8 @@ GELF messages are separated by null byte. Each message is a JSON with the follow
Every field with an underscore prefix `_` will be treated as an extra field.
Allowed characters in field names are letters, numbers, underscores, dashes, and dots.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
### Config params
**`endpoint`** *`string`* *`required`*
@@ -121,8 +123,7 @@ After this timeout the batch will be sent even if batch isn't completed.
**`retry`** *`int`* *`default=0`*
-Retries of insertion. If File.d cannot insert for this number of attempts,
-File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). A configured deadqueue disables fatal exits.
diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go
index 1b01226c7..8b83f5714 100644
--- a/plugin/output/gelf/gelf.go
+++ b/plugin/output/gelf/gelf.go
@@ -33,6 +33,8 @@ GELF messages are separated by null byte. Each message is a JSON with the follow
Every field with an underscore prefix `_` will be treated as an extra field.
Allowed characters in field names are letters, numbers, underscores, dashes, and dots.
+
+Supports [dead queue](/plugin/output/README.md#dead-queue).
}*/
const (
@@ -48,6 +50,8 @@ type Plugin struct {
// plugin metrics
sendErrorMetric prometheus.Counter
+
+ router *pipeline.Router
}
// ! config-params
@@ -149,8 +153,7 @@ type Config struct {
// > @3@4@5@6
// >
- // > Retries of insertion. If File.d cannot insert for this number of attempts,
- // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).
+ // > Retries of insertion. If File.d cannot insert for this number of attempts,
+ // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). A configured deadqueue disables fatal exits.
Retry int `json:"retry" default:"0"` // *
// > @3@4@5@6
@@ -230,15 +233,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
MetricCtl: params.MetricCtl,
}
+ p.router = params.Router
backoffOpts := pipeline.BackoffOpts{
- MinRetention: p.config.Retention_,
- Multiplier: float64(p.config.RetentionExponentMultiplier),
- AttemptNum: p.config.Retry,
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: p.config.Retry,
+ IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
}
- onError := func(err error) {
+ onError := func(err error, events []*pipeline.Event) {
var level zapcore.Level
- if p.config.FatalOnFailedInsert {
+ if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
@@ -247,6 +252,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.logger.Desugar().Log(level, "can't send to gelf", zap.Error(err),
zap.Int("retries", p.config.Retry),
)
+
+ for i := range events {
+ p.router.Fail(events[i])
+ }
}
p.batcher = pipeline.NewRetriableBatcher(
diff --git a/plugin/output/kafka/README.md b/plugin/output/kafka/README.md
index eda1a50ec..d1ef51b20 100755
--- a/plugin/output/kafka/README.md
+++ b/plugin/output/kafka/README.md
@@ -1,6 +1,8 @@
# Kafka output
It sends the event batches to kafka brokers using `franz-go` lib.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
### Config params
**`brokers`** *`[]string`* *`required`*
@@ -91,8 +93,7 @@ the client.ForceMetadataRefresh() function is used for some ProduceSync errors:
**`fatal_on_failed_insert`** *`bool`* *`default=false`*
-After an insert error, fall with a non-zero exit code or not
-**Experimental feature**
+After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go
index e82b0a76e..213ff57ec 100644
--- a/plugin/output/kafka/kafka.go
+++ b/plugin/output/kafka/kafka.go
@@ -18,6 +18,8 @@ import (
/*{ introduction
It sends the event batches to kafka brokers using `franz-go` lib.
+
+Supports [dead queue](/plugin/output/README.md#dead-queue).
}*/
const (
@@ -40,6 +42,7 @@ type Plugin struct {
// plugin metrics
sendErrorMetric prometheus.Counter
+ router *pipeline.Router
}
// ! config-params
@@ -126,8 +129,7 @@ type Config struct {
// > @3@4@5@6
// >
- // > After an insert error, fall with a non-zero exit code or not
- // > **Experimental feature**
+ // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
// > @3@4@5@6
@@ -257,15 +259,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
MetricCtl: params.MetricCtl,
}
+ p.router = params.Router
backoffOpts := pipeline.BackoffOpts{
- MinRetention: p.config.Retention_,
- Multiplier: float64(p.config.RetentionExponentMultiplier),
- AttemptNum: p.config.Retry,
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: p.config.Retry,
+ IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
}
- onError := func(err error) {
+ onError := func(err error, events []*pipeline.Event) {
var level zapcore.Level
- if p.config.FatalOnFailedInsert {
+ if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
@@ -274,6 +278,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.logger.Desugar().Log(level, "can't write batch",
zap.Int("retries", p.config.Retry),
)
+
+ for i := range events {
+ p.router.Fail(events[i])
+ }
}
p.batcher = pipeline.NewRetriableBatcher(
diff --git a/plugin/output/loki/README.md b/plugin/output/loki/README.md
index 051449dfd..1ce47cee1 100644
--- a/plugin/output/loki/README.md
+++ b/plugin/output/loki/README.md
@@ -1,6 +1,8 @@
# Loki output
It sends the logs batches to Loki using HTTP API.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
### Config params
**`address`** *`string`* *`required`*
@@ -137,8 +139,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in
**`fatal_on_failed_insert`** *`bool`* *`default=false`*
-After an insert error, fall with a non-zero exit code or not
-**Experimental feature**
+After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/loki/loki.go b/plugin/output/loki/loki.go
index 8353ee7eb..7837ad8ba 100644
--- a/plugin/output/loki/loki.go
+++ b/plugin/output/loki/loki.go
@@ -24,6 +24,8 @@ import (
/*{ introduction
It sends the logs batches to Loki using HTTP API.
+
+Supports [dead queue](/plugin/output/README.md#dead-queue).
}*/
var errUnixNanoFormat = errors.New("please send time in UnixNano format or add a convert_date action")
@@ -170,8 +172,7 @@ type Config struct {
// > @3@4@5@6
// >
- // > After an insert error, fall with a non-zero exit code or not
- // > **Experimental feature**
+ // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
// > @3@4@5@6
@@ -235,6 +236,8 @@ type Plugin struct {
sendErrorMetric *prometheus.CounterVec
labels map[string]string
+
+ router *pipeline.Router
}
func init() {
@@ -270,15 +273,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
MetricCtl: params.MetricCtl,
}
+ p.router = params.Router
backoffOpts := pipeline.BackoffOpts{
- MinRetention: p.config.Retention_,
- Multiplier: float64(p.config.RetentionExponentMultiplier),
- AttemptNum: p.config.Retry,
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: p.config.Retry,
+ IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
}
- onError := func(err error) {
+ onError := func(err error, events []*pipeline.Event) {
var level zapcore.Level
- if p.config.FatalOnFailedInsert {
+ if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
@@ -286,6 +291,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.logger.Log(level, "can't send data to loki", zap.Error(err),
zap.Int("retries", p.config.Retry))
+
+ for i := range events {
+ p.router.Fail(events[i])
+ }
}
p.batcher = pipeline.NewRetriableBatcher(
diff --git a/plugin/output/postgres/README.md b/plugin/output/postgres/README.md
index 9bfd08881..64a46d73b 100755
--- a/plugin/output/postgres/README.md
+++ b/plugin/output/postgres/README.md
@@ -1,6 +1,8 @@
# Postgres output
It sends the event batches to postgres db using pgx.
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
### Config params
**`strict`** *`bool`* *`default=false`*
@@ -47,8 +49,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in
**`fatal_on_failed_insert`** *`bool`* *`default=false`*
-After an insert error, fall with a non-zero exit code or not
-**Experimental feature**
+After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go
index 0126fc43f..e47b0498e 100644
--- a/plugin/output/postgres/postgres.go
+++ b/plugin/output/postgres/postgres.go
@@ -22,6 +22,8 @@ import (
/*{ introduction
It sends the event batches to postgres db using pgx.
+
+Supports [dead queue](/plugin/output/README.md#dead-queue).
}*/
/*{ example
@@ -111,6 +113,8 @@ type Plugin struct {
duplicatedEventMetric prometheus.Counter
writtenEventMetric prometheus.Counter
insertErrorsMetric prometheus.Counter
+
+ router *pipeline.Router
}
type ConfigColumn struct {
@@ -162,8 +166,7 @@ type Config struct {
// > @3@4@5@6
// >
- // > After an insert error, fall with a non-zero exit code or not
- // > **Experimental feature**
+ // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
// > @3@4@5@6
@@ -286,15 +289,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
MetricCtl: params.MetricCtl,
}
+ p.router = params.Router
backoffOpts := pipeline.BackoffOpts{
- MinRetention: p.config.Retention_,
- Multiplier: float64(p.config.RetentionExponentMultiplier),
- AttemptNum: p.config.Retry,
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: p.config.Retry,
+ IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
}
- onError := func(err error) {
+ onError := func(err error, events []*pipeline.Event) {
var level zapcore.Level
- if p.config.FatalOnFailedInsert {
+ if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
@@ -303,6 +308,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.logger.Desugar().Log(level, "can't insert to the table", zap.Error(err),
zap.Int("retries", p.config.Retry),
zap.String("table", p.config.Table))
+
+ for i := range events {
+ p.router.Fail(events[i])
+ }
}
p.batcher = pipeline.NewRetriableBatcher(
diff --git a/plugin/output/s3/README.md b/plugin/output/s3/README.md
index 5b914e248..8564e3d1b 100755
--- a/plugin/output/s3/README.md
+++ b/plugin/output/s3/README.md
@@ -154,7 +154,6 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in
**`fatal_on_failed_insert`** *`bool`* *`default=false`*
After an insert error, fall with a non-zero exit code or not
-**Experimental feature**
diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go
index e846b605d..1e9ca0f67 100644
--- a/plugin/output/s3/s3.go
+++ b/plugin/output/s3/s3.go
@@ -244,7 +244,6 @@ type Config struct {
// > @3@4@5@6
// >
// > After an insert error, fall with a non-zero exit code or not
- // > **Experimental feature**
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
// > @3@4@5@6
diff --git a/plugin/output/splunk/README.md b/plugin/output/splunk/README.md
index 78cc23bec..b9df5013f 100755
--- a/plugin/output/splunk/README.md
+++ b/plugin/output/splunk/README.md
@@ -47,6 +47,8 @@ Out:
}
```
+Supports [dead queue](/plugin/output/README.md#dead-queue).
+
### Config params
**`endpoint`** *`string`* *`required`*
@@ -125,8 +127,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in
**`fatal_on_failed_insert`** *`bool`* *`default=false`*
-After an insert error, fall with a non-zero exit code or not
-**Experimental feature**
+After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go
index 84d1f005b..57df2a09a 100644
--- a/plugin/output/splunk/splunk.go
+++ b/plugin/output/splunk/splunk.go
@@ -68,6 +68,8 @@ Out:
}
}
```
+
+Supports [dead queue](/plugin/output/README.md#dead-queue).
}*/
const (
@@ -96,6 +98,8 @@ type Plugin struct {
// plugin metrics
sendErrorMetric *prometheus.CounterVec
+
+ router *pipeline.Router
}
type CopyField struct {
@@ -177,8 +181,7 @@ type Config struct {
// > @3@4@5@6
// >
- // > After an insert error, fall with a non-zero exit code or not
- // > **Experimental feature**
+ // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
// > @3@4@5@6
@@ -263,15 +266,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
MetricCtl: params.MetricCtl,
}
+ p.router = params.Router
backoffOpts := pipeline.BackoffOpts{
- MinRetention: p.config.Retention_,
- Multiplier: float64(p.config.RetentionExponentMultiplier),
- AttemptNum: p.config.Retry,
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: p.config.Retry,
+ IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
}
- onError := func(err error) {
+ onError := func(err error, events []*pipeline.Event) {
var level zapcore.Level
- if p.config.FatalOnFailedInsert {
+ if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
@@ -279,6 +284,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.logger.Desugar().Log(level, "can't send data to splunk", zap.Error(err),
zap.Int("retries", p.config.Retry))
+
+ for i := range events {
+ p.router.Fail(events[i])
+ }
}
p.batcher = pipeline.NewRetriableBatcher(
diff --git a/plugin/output/stdout/stdout.go b/plugin/output/stdout/stdout.go
index 98fa2b60b..b581d0456 100644
--- a/plugin/output/stdout/stdout.go
+++ b/plugin/output/stdout/stdout.go
@@ -13,6 +13,7 @@ It writes events to stdout(also known as console).
type Plugin struct {
controller pipeline.OutputPluginController
+ router *pipeline.Router
}
type Config struct{}
@@ -30,6 +31,7 @@ func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
func (p *Plugin) Start(_ pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
p.controller = params.Controller
+ p.router = params.Router
}
func (_ *Plugin) Stop() {}
diff --git a/test/test.go b/test/test.go
index 37e73e12e..6efd3a307 100644
--- a/test/test.go
+++ b/test/test.go
@@ -210,6 +210,7 @@ func NewEmptyOutputPluginParams() *pipeline.OutputPluginParams {
return &pipeline.OutputPluginParams{
PluginDefaultParams: newDefaultParams(),
Logger: newLogger().Named("output"),
+ Router: pipeline.NewRouter(),
}
}