From 9f57de9a8270e951dc7b7ca3dec36133bf6fddf8 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 17 Sep 2024 11:54:00 +0700 Subject: [PATCH 1/4] clear decoder pool after spawn --- pipeline/processor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pipeline/processor.go b/pipeline/processor.go index de4cd0eaa..0f04ac350 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -414,6 +414,7 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { child.stage = eventStageOutput p.output.Out(child) } + child.Root.ReleaseBufMem() } if p.busyActionsTotal == 0 { From 49d809a27a74f1202a1741c3aa64e2f24633478d Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 17 Sep 2024 13:18:05 +0700 Subject: [PATCH 2/4] parallel process events in spawn --- pipeline/processor.go | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/pipeline/processor.go b/pipeline/processor.go index 0f04ac350..d2118ff54 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -1,6 +1,8 @@ package pipeline import ( + "sync" + "github.com/ozontech/file.d/logger" insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/atomic" @@ -397,6 +399,9 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { parent.SetChildParentKind() nextActionIdx := parent.action + 1 + wg := &sync.WaitGroup{} + results := make(chan *Event) + for _, node := range nodes { // we can't reuse parent event (using insaneJSON.Root{Node: child} // because of nil decoder @@ -409,12 +414,25 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { child.SetChildKind() child.action = nextActionIdx - ok, _ := p.doActions(child) - if ok { - child.stage = eventStageOutput - p.output.Out(child) - } - child.Root.ReleaseBufMem() + wg.Add(1) + go func(child *Event) { + defer wg.Done() + ok, _ := p.doActions(child) + if ok { + results <- child + } + }(child) + } + + go func() { + wg.Wait() + close(results) + }() + + for child := range results { + child.stage = eventStageOutput + p.output.Out(child) + child.Root.ReleaseMem() } if p.busyActionsTotal == 0 { From e10c4bb34e9d1dd8e26303f1a4c5ecd3ce1fb56b Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 18 Sep 2024 17:12:51 +0700 Subject: [PATCH 3/4] keep order in processor spawn --- pipeline/processor.go | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/pipeline/processor.go b/pipeline/processor.go index d2118ff54..287578974 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -400,9 +400,13 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { nextActionIdx := parent.action + 1 wg := &sync.WaitGroup{} - results := make(chan *Event) + results := make([]*Event, len(nodes)) + resultsChan := make(chan struct { + index int + child *Event + }, len(nodes)) - for _, node := range nodes { + for i, node := range nodes { // we can't reuse parent event (using insaneJSON.Root{Node: child} // because of nil decoder child := &Event{ @@ -415,24 +419,33 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { child.action = nextActionIdx wg.Add(1) - go func(child *Event) { + go func(i int, child *Event) { defer wg.Done() ok, _ := p.doActions(child) if ok { - results <- child + resultsChan <- struct { + index int + child *Event + }{index: i, child: child} } - }(child) + }(i, child) } go func() { wg.Wait() - close(results) + close(resultsChan) }() - for child := range results { - child.stage = eventStageOutput - p.output.Out(child) - child.Root.ReleaseMem() + for result := range resultsChan { + results[result.index] = result.child + } + + for _, child := range results { + if child != nil { + child.stage = eventStageOutput + p.output.Out(child) + child.Root.ReleaseMem() + } } if p.busyActionsTotal == 0 { From 9df7a5edd5bf789ed3d35b7ff832e226fb322228 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 19 Sep 2024 19:11:25 +0300 Subject: [PATCH 4/4] use ReleasePoolMem --- pipeline/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipeline/processor.go b/pipeline/processor.go index 287578974..66b5beadd 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -444,7 +444,7 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { if child != nil { child.stage = eventStageOutput p.output.Out(child) - child.Root.ReleaseMem() + child.Root.ReleasePoolMem() } }