diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 88f137197..b890e4603 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -193,16 +193,23 @@ where let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); + let mut buffers = Vec::new(); + move |input, output| { - // Stash capabilities and associated data (ordered by time). + // Read input out without doing work, to avoid trapping overloaded workers. input.for_each(|cap, data| { capabilities.insert(cap.retain()); data.swap(&mut buffer); - for (key, val, time) in buffer.drain(..) { + buffers.push(std::mem::take(&mut buffer)); + }); + + // Stash capabilities and associated data (ordered by time). + for buffer in buffers.drain(..) { + for (key, val, time) in buffer { priority_queue.push(std::cmp::Reverse((time, key, val))) } - }); + } // Test to see if strict progress has occurred, which happens whenever any element of // the old frontier is not greater or equal to the new frontier. It is only in this