Skip to content

Commit 36aabe6

Browse files
committed
keep order in processor spawn
1 parent 49d809a commit 36aabe6

File tree

1 file changed

+18
-7
lines changed

1 file changed

+18
-7
lines changed

pipeline/processor.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -400,9 +400,13 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
400400
nextActionIdx := parent.action + 1
401401

402402
wg := &sync.WaitGroup{}
403-
results := make(chan *Event)
403+
results := make([]*Event, len(nodes))
404+
resultsChan := make(chan struct {
405+
index int
406+
child *Event
407+
}, len(nodes))
404408

405-
for _, node := range nodes {
409+
for i, node := range nodes {
406410
// we can't reuse parent event (using insaneJSON.Root{Node: child}
407411
// because of nil decoder
408412
child := &Event{
@@ -415,21 +419,28 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
415419
child.action = nextActionIdx
416420

417421
wg.Add(1)
418-
go func(child *Event) {
422+
go func(i int, child *Event) {
419423
defer wg.Done()
420424
ok, _ := p.doActions(child)
421425
if ok {
422-
results <- child
426+
resultsChan <- struct {
427+
index int
428+
child *Event
429+
}{index: i, child: child}
423430
}
424-
}(child)
431+
}(i, child)
425432
}
426433

427434
go func() {
428435
wg.Wait()
429-
close(results)
436+
close(resultsChan)
430437
}()
431438

432-
for child := range results {
439+
for result := range resultsChan {
440+
results[result.index] = result.child
441+
}
442+
443+
for _, child := range results {
433444
child.stage = eventStageOutput
434445
p.output.Out(child)
435446
child.Root.ReleaseMem()

0 commit comments

Comments
 (0)