Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
GOFLAGS: ${{ matrix.flags }}
LOG_LEVEL: error
run: |
go test -coverprofile=profile${{ matrix.flags }}.out -covermode=atomic -v -coverpkg=./... ./... -json > test-report${{ matrix.flags }}.json
go test -coverprofile=profile${{ matrix.flags }}.out -covermode=atomic -v -coverpkg=./... ./... -json | tee test-report${{ matrix.flags }}.json

- name: Coverage report
run: |
Expand Down Expand Up @@ -101,7 +101,7 @@ jobs:
GOFLAGS: ${{ matrix.flags }}
LOG_LEVEL: error
run: |
go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json > test-report-e2e${{ matrix.flags }}.json
go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json | tee test-report-e2e${{ matrix.flags }}.json

- name: Coverage report
run: |
Expand Down
2 changes: 1 addition & 1 deletion _sidebar.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@global-contents-table-plugin-input|links-list
- Action
@global-contents-table-plugin-action|links-list
- Output
- [Output](/plugin/output/README.md)
@global-contents-table-plugin-output|links-list

- **Pipeline**
Expand Down
2 changes: 1 addition & 1 deletion _sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
- [split](plugin/action/split/README.md)
- [throttle](plugin/action/throttle/README.md)

- Output
- [Output](/plugin/output/README.md)
- [clickhouse](plugin/output/clickhouse/README.md)
- [devnull](plugin/output/devnull/README.md)
- [elasticsearch](plugin/output/elasticsearch/README.md)
Expand Down
50 changes: 49 additions & 1 deletion e2e/file_clickhouse/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,59 @@ pipelines:
override: true
- type: debug
output:
deadqueue:
type: clickhouse
addresses:
- 127.0.0.1:9001
table: test_table_insert
insert_timeout: 1m
columns:
- name: c1
type: String
- name: c2
type: Int8
- name: c3
type: Int16
- name: c4
type: Nullable(Int16)
- name: c5
type: Nullable(String)
- name: level
type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4)
- name: ipv4
type: Nullable(IPv4)
- name: ipv6
type: Nullable(IPv6)
- name: ts
type: DateTime
- name: ts_with_tz
type: DateTime('Europe/Moscow')
- name: ts64
type: DateTime64(3, 'UTC')
- name: ts64_auto
type: DateTime64(9, 'UTC')
- name: ts_rfc3339nano
type: DateTime64(9)
- name: f32
type: Float32
- name: f64
type: Float64
- name: lc_str
type: LowCardinality(String)
- name: str_arr
type: Array(String)
- name: map_str_str
type: Map(String,String)
- name: uuid
type: UUID
- name: uuid_nullable
type: Nullable(UUID)
type: clickhouse
addresses:
- 127.0.0.1:9001
table: test_table_insert
table: test_table_insert_not_exists
insert_timeout: 1m
retry: 0
columns:
- name: c1
type: String
Expand Down
9 changes: 6 additions & 3 deletions e2e/kafka_auth/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ version: "2.1"

services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.9
image: zookeeper:3.9
ports:
- "2182:2181"
volumes:
- "zookeeper_data:/bitnami"
- "zookeeper_data:/data"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
init-certs:
image: docker.io/bitnami/kafka:3.6
command: /tmp/generate.sh
Expand Down
9 changes: 6 additions & 3 deletions e2e/kafka_file/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ version: "2"

services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.8
image: zookeeper:3.9
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
- "zookeeper_data:/data"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
kafka:
image: docker.io/bitnami/kafka:3.1
ports:
Expand Down
43 changes: 43 additions & 0 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ func (f *FileD) setupOutput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCo
PluginRuntimeInfo: f.instantiatePlugin(info),
})

if info.DeadQueueInfo != nil {
p.SetDeadQueueOutput(&pipeline.OutputPluginInfo{
PluginStaticInfo: info.DeadQueueInfo,
PluginRuntimeInfo: f.instantiatePlugin(info.DeadQueueInfo),
})
}

return nil
}

Expand Down Expand Up @@ -249,6 +256,39 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip
if err != nil {
return nil, err
}

deadqueue := configJSON.Get("deadqueue")
var deadqueueInfo *pipeline.PluginStaticInfo
if deadqueueMap := deadqueue.MustMap(); deadqueueMap != nil {
if len(deadqueueMap) > 0 {
deadqueueType := deadqueue.Get("type").MustString()
if deadqueueType == "" {
return nil, fmt.Errorf("deadqueue of %s doesn't have type", pluginKind)
}
deadqueueInfo, err = f.plugins.Get(pluginKind, deadqueueType)
if err != nil {
return nil, err
}

deadqueue.Del("type")

deadqueueConfigJson, err := deadqueue.Encode()
if err != nil {
logger.Panicf("can't create config json for %s deadqueue", deadqueueType)
}

config, err := pipeline.GetConfig(deadqueueInfo, deadqueueConfigJson, values)
if err != nil {
logger.Fatalf("error on creating deadqueue of %s with type %q: %s", deadqueueType, pluginKind, err.Error())
}
deadqueueInfo.Config = config

// TODO: recursive deadqueue config
// deadqueueForDeadqueue := deadqueue.Get("deadqueue").MustMap()
}
configJSON.Del("deadqueue")
}

configJson, err := configJSON.Encode()
if err != nil {
logger.Panicf("can't create config json for %s", t)
Expand All @@ -260,6 +300,9 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip

infoCopy := *info
infoCopy.Config = config
if deadqueueInfo != nil {
infoCopy.DeadQueueInfo = deadqueueInfo
}

return &infoCopy, nil
}
Expand Down
35 changes: 23 additions & 12 deletions pipeline/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,28 @@ import (
)

type RetriableBatcher struct {
outFn RetriableBatcherOutFn
batcher *Batcher
backoffOpts BackoffOpts
onRetryError func(err error)
outFn RetriableBatcherOutFn
batcher *Batcher
backoffOpts BackoffOpts
isDeadQueueAvailable bool
onRetryError func(err error, events []*Event)
}

type RetriableBatcherOutFn func(*WorkerData, *Batch) error

type BackoffOpts struct {
MinRetention time.Duration
Multiplier float64
AttemptNum int
MinRetention time.Duration
Multiplier float64
AttemptNum int
IsDeadQueueAvailable bool
}

func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher {
func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error, events []*Event)) *RetriableBatcher {
batcherBackoff := &RetriableBatcher{
outFn: batcherOutFn,
backoffOpts: opts,
onRetryError: onError,
outFn: batcherOutFn,
backoffOpts: opts,
onRetryError: onError,
isDeadQueueAvailable: opts.IsDeadQueueAvailable,
}
batcherBackoff.setBatcher(batcherOpts)
return batcherBackoff
Expand Down Expand Up @@ -58,7 +61,15 @@ func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) {
}
next := exponentionalBackoff.NextBackOff()
if next == backoff.Stop || (b.backoffOpts.AttemptNum >= 0 && numTries > b.backoffOpts.AttemptNum) {
b.onRetryError(err)
var events []*Event
if batch != nil {
events = batch.events
}
b.onRetryError(err, events)
if batch != nil && b.isDeadQueueAvailable {
batch.reset()
batch.status = BatchStatusInDeadQueue
}
return
}
numTries++
Expand Down
58 changes: 51 additions & 7 deletions pipeline/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pipeline
import (
"errors"
"testing"
"time"

"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -17,7 +18,7 @@ func TestBackoff(t *testing.T) {
eventCount := &atomic.Int32{}
eventCountBefore := eventCount.Load()

errorFn := func(err error) {
errorFn := func(err error, events []*Event) {
errorCount.Inc()
}

Expand All @@ -29,20 +30,55 @@ func TestBackoff(t *testing.T) {
eventCount.Inc()
return nil
},
BackoffOpts{AttemptNum: 3},
BackoffOpts{
AttemptNum: 3,
IsDeadQueueAvailable: false,
},
errorFn,
)

batcherBackoff.Out(nil, nil)

data := WorkerData(nil)
batch := newBatch(1, 1, 1*time.Second)
batch.append(&Event{})
batcherBackoff.Out(&data, batch)
assert.Equal(t, errorCountBefore, errorCount.Load(), "wrong error count")
assert.Equal(t, eventCountBefore+1, eventCount.Load(), "wrong event count")
}

func TestBackoffWithError(t *testing.T) {
errorCount := &atomic.Int32{}
prevValue := errorCount.Load()
errorFn := func(err error) {
errorFn := func(err error, events []*Event) {
errorCount.Inc()
}

batcherBackoff := NewRetriableBatcher(
&BatcherOptions{
MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
},
func(workerData *WorkerData, batch *Batch) error {
return errors.New("some error")
},
BackoffOpts{
AttemptNum: 3,
IsDeadQueueAvailable: false,
},
errorFn,
)

data := WorkerData(nil)
batch := newBatch(1, 1, 1*time.Second)
batch.append(&Event{})
batcherBackoff.Out(&data, batch)
assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count")
assert.Equal(t, 1, len(batch.events), "wrong number of events in batch")
assert.Equal(t, BatchStatusNotReady, batch.status, "wrong batch status")
}

func TestBackoffWithErrorWithDeadQueue(t *testing.T) {
errorCount := &atomic.Int32{}
prevValue := errorCount.Load()
errorFn := func(err error, events []*Event) {
errorCount.Inc()
}

Expand All @@ -53,10 +89,18 @@ func TestBackoffWithError(t *testing.T) {
func(workerData *WorkerData, batch *Batch) error {
return errors.New("some error")
},
BackoffOpts{AttemptNum: 3},
BackoffOpts{
AttemptNum: 3,
IsDeadQueueAvailable: true,
},
errorFn,
)

batcherBackoff.Out(nil, nil)
data := WorkerData(nil)
batch := newBatch(1, 1, 1*time.Second)
batch.append(&Event{})
batcherBackoff.Out(&data, batch)
assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count")
assert.Equal(t, 0, len(batch.events), "wrong number of events in batch")
assert.Equal(t, BatchStatusInDeadQueue, batch.status, "wrong batch status")
}
Loading
Loading