Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pipeline

import (
"sync"

"github.com/ozontech/file.d/logger"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/atomic"
Expand Down Expand Up @@ -397,7 +399,14 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
parent.SetChildParentKind()
nextActionIdx := parent.action + 1

for _, node := range nodes {
wg := &sync.WaitGroup{}
results := make([]*Event, len(nodes))
resultsChan := make(chan struct {
index int
child *Event
}, len(nodes))

for i, node := range nodes {
// we can't reuse parent event (using insaneJSON.Root{Node: child}
// because of nil decoder
child := &Event{
Expand All @@ -409,10 +418,33 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
child.SetChildKind()
child.action = nextActionIdx

ok, _ := p.doActions(child)
if ok {
wg.Add(1)
go func(i int, child *Event) {
defer wg.Done()
ok, _ := p.doActions(child)
if ok {
resultsChan <- struct {
index int
child *Event
}{index: i, child: child}
}
}(i, child)
}

go func() {
wg.Wait()
close(resultsChan)
}()

for result := range resultsChan {
results[result.index] = result.child
}

for _, child := range results {
if child != nil {
child.stage = eventStageOutput
p.output.Out(child)
child.Root.ReleasePoolMem()
}
}

Expand Down
Loading