@@ -147,262 +147,12 @@ final class MergeStorage<
147147 // sequences. We must store it to cancel it at the right times.
148148 let task = Task {
149149 await withThrowingTaskGroup ( of: Void . self) { group in
150- // For each upstream sequence we are adding a child task that
151- // is consuming the upstream sequence
152- group. addTask {
153- var iterator1 = base1. makeAsyncIterator ( )
154-
155- // This is our upstream consumption loop
156- loop: while true {
157- // We are creating a continuation before requesting the next
158- // element from upstream. This continuation is only resumed
159- // if the downstream consumer called `next` to signal his demand.
160- try await withUnsafeThrowingContinuation { continuation in
161- let action = self . lock. withLock {
162- self . stateMachine. childTaskSuspended ( continuation)
163- }
164-
165- switch action {
166- case let . resumeContinuation( continuation) :
167- // This happens if there is outstanding demand
168- // and we need to demand from upstream right away
169- continuation. resume ( returning: ( ) )
170-
171- case let . resumeContinuationWithError( continuation, error) :
172- // This happens if another upstream already failed or if
173- // the task got cancelled.
174- continuation. resume ( throwing: error)
175-
176- case . none:
177- break
178- }
179- }
180-
181- // We got signalled from the downstream that we have demand so let's
182- // request a new element from the upstream
183- if let element1 = try await iterator1. next ( ) {
184- let action = self . lock. withLock {
185- self . stateMachine. elementProduced ( element1)
186- }
187-
188- switch action {
189- case let . resumeContinuation( continuation, element) :
190- // We had an outstanding demand and where the first
191- // upstream to produce an element so we can forward it to
192- // the downstream
193- continuation. resume ( returning: element)
194-
195- case . none:
196- break
197- }
198-
199- } else {
200- // The upstream returned `nil` which indicates that it finished
201- let action = self . lock. withLock {
202- self . stateMachine. upstreamFinished ( )
203- }
204-
205- // All of this is mostly cleanup around the Task and the outstanding
206- // continuations used for signalling.
207- switch action {
208- case let . resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
209- downstreamContinuation,
210- task,
211- upstreamContinuations
212- ) :
213- upstreamContinuations. forEach { $0. resume ( throwing: CancellationError ( ) ) }
214- task. cancel ( )
215-
216- downstreamContinuation. resume ( returning: nil )
217-
218- break loop
219-
220- case let . cancelTaskAndUpstreamContinuations(
221- task,
222- upstreamContinuations
223- ) :
224- upstreamContinuations. forEach { $0. resume ( throwing: CancellationError ( ) ) }
225- task. cancel ( )
226-
227- break loop
228- case . none:
229-
230- break loop
231- }
232- }
233- }
234- }
235-
236- // Copy from the above just using the base2 sequence
237- group. addTask {
238- var iterator2 = base2. makeAsyncIterator ( )
239-
240- // This is our upstream consumption loop
241- loop: while true {
242- // We are creating a continuation before requesting the next
243- // element from upstream. This continuation is only resumed
244- // if the downstream consumer called `next` to signal his demand.
245- try await withUnsafeThrowingContinuation { continuation in
246- let action = self . lock. withLock {
247- self . stateMachine. childTaskSuspended ( continuation)
248- }
249-
250- switch action {
251- case let . resumeContinuation( continuation) :
252- // This happens if there is outstanding demand
253- // and we need to demand from upstream right away
254- continuation. resume ( returning: ( ) )
255-
256- case let . resumeContinuationWithError( continuation, error) :
257- // This happens if another upstream already failed or if
258- // the task got cancelled.
259- continuation. resume ( throwing: error)
260-
261- case . none:
262- break
263- }
264- }
265-
266- // We got signalled from the downstream that we have demand so let's
267- // request a new element from the upstream
268- if let element2 = try await iterator2. next ( ) {
269- let action = self . lock. withLock {
270- self . stateMachine. elementProduced ( element2)
271- }
272-
273- switch action {
274- case let . resumeContinuation( continuation, element) :
275- // We had an outstanding demand and where the first
276- // upstream to produce an element so we can forward it to
277- // the downstream
278- continuation. resume ( returning: element)
279-
280- case . none:
281- break
282- }
283-
284- } else {
285- // The upstream returned `nil` which indicates that it finished
286- let action = self . lock. withLock {
287- self . stateMachine. upstreamFinished ( )
288- }
289-
290- // All of this is mostly cleanup around the Task and the outstanding
291- // continuations used for signalling.
292- switch action {
293- case let . resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
294- downstreamContinuation,
295- task,
296- upstreamContinuations
297- ) :
298- upstreamContinuations. forEach { $0. resume ( throwing: CancellationError ( ) ) }
299- task. cancel ( )
300-
301- downstreamContinuation. resume ( returning: nil )
302-
303- break loop
304-
305- case let . cancelTaskAndUpstreamContinuations(
306- task,
307- upstreamContinuations
308- ) :
309- upstreamContinuations. forEach { $0. resume ( throwing: CancellationError ( ) ) }
310- task. cancel ( )
311-
312- break loop
313- case . none:
314-
315- break loop
316- }
317- }
318- }
319- }
150+ self . iterateAsyncSequence ( base1, in: & group)
151+ self . iterateAsyncSequence ( base2, in: & group)
320152
321153 // Copy from the above just using the base3 sequence
322154 if let base3 = base3 {
323- group. addTask {
324- var iterator3 = base3. makeAsyncIterator ( )
325-
326- // This is our upstream consumption loop
327- loop: while true {
328- // We are creating a continuation before requesting the next
329- // element from upstream. This continuation is only resumed
330- // if the downstream consumer called `next` to signal his demand.
331- try await withUnsafeThrowingContinuation { continuation in
332- let action = self . lock. withLock {
333- self . stateMachine. childTaskSuspended ( continuation)
334- }
335-
336- switch action {
337- case let . resumeContinuation( continuation) :
338- // This happens if there is outstanding demand
339- // and we need to demand from upstream right away
340- continuation. resume ( returning: ( ) )
341-
342- case let . resumeContinuationWithError( continuation, error) :
343- // This happens if another upstream already failed or if
344- // the task got cancelled.
345- continuation. resume ( throwing: error)
346-
347- case . none:
348- break
349- }
350- }
351-
352- // We got signalled from the downstream that we have demand so let's
353- // request a new element from the upstream
354- if let element3 = try await iterator3. next ( ) {
355- let action = self . lock. withLock {
356- self . stateMachine. elementProduced ( element3)
357- }
358-
359- switch action {
360- case let . resumeContinuation( continuation, element) :
361- // We had an outstanding demand and where the first
362- // upstream to produce an element so we can forward it to
363- // the downstream
364- continuation. resume ( returning: element)
365-
366- case . none:
367- break
368- }
369-
370- } else {
371- // The upstream returned `nil` which indicates that it finished
372- let action = self . lock. withLock {
373- self . stateMachine. upstreamFinished ( )
374- }
375-
376- // All of this is mostly cleanup around the Task and the outstanding
377- // continuations used for signalling.
378- switch action {
379- case let . resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
380- downstreamContinuation,
381- task,
382- upstreamContinuations
383- ) :
384- upstreamContinuations. forEach { $0. resume ( throwing: CancellationError ( ) ) }
385- task. cancel ( )
386-
387- downstreamContinuation. resume ( returning: nil )
388-
389- break loop
390-
391- case let . cancelTaskAndUpstreamContinuations(
392- task,
393- upstreamContinuations
394- ) :
395- upstreamContinuations. forEach { $0. resume ( throwing: CancellationError ( ) ) }
396- task. cancel ( )
397-
398- break loop
399- case . none:
400-
401- break loop
402- }
403- }
404- }
405- }
155+ self . iterateAsyncSequence ( base3, in: & group)
406156 }
407157
408158 while !group. isEmpty {
@@ -444,5 +194,95 @@ final class MergeStorage<
444194 // We need to inform our state machine that we started the Task
445195 stateMachine. taskStarted ( task)
446196 }
447- }
448197
198+ private func iterateAsyncSequence< AsyncSequence: _Concurrency . AsyncSequence > (
199+ _ base: AsyncSequence ,
200+ in taskGroup: inout ThrowingTaskGroup < Void , Error >
201+ ) where AsyncSequence. Element == Base1 . Element {
202+ // For each upstream sequence we are adding a child task that
203+ // is consuming the upstream sequence
204+ taskGroup. addTask {
205+ var iterator = base. makeAsyncIterator ( )
206+
207+ // This is our upstream consumption loop
208+ loop: while true {
209+ // We are creating a continuation before requesting the next
210+ // element from upstream. This continuation is only resumed
211+ // if the downstream consumer called `next` to signal his demand.
212+ try await withUnsafeThrowingContinuation { continuation in
213+ let action = self . lock. withLock {
214+ self . stateMachine. childTaskSuspended ( continuation)
215+ }
216+
217+ switch action {
218+ case let . resumeContinuation( continuation) :
219+ // This happens if there is outstanding demand
220+ // and we need to demand from upstream right away
221+ continuation. resume ( returning: ( ) )
222+
223+ case let . resumeContinuationWithError( continuation, error) :
224+ // This happens if another upstream already failed or if
225+ // the task got cancelled.
226+ continuation. resume ( throwing: error)
227+
228+ case . none:
229+ break
230+ }
231+ }
232+
233+ // We got signalled from the downstream that we have demand so let's
234+ // request a new element from the upstream
235+ if let element1 = try await iterator. next ( ) {
236+ let action = self . lock. withLock {
237+ self . stateMachine. elementProduced ( element1)
238+ }
239+
240+ switch action {
241+ case let . resumeContinuation( continuation, element) :
242+ // We had an outstanding demand and where the first
243+ // upstream to produce an element so we can forward it to
244+ // the downstream
245+ continuation. resume ( returning: element)
246+
247+ case . none:
248+ break
249+ }
250+
251+ } else {
252+ // The upstream returned `nil` which indicates that it finished
253+ let action = self . lock. withLock {
254+ self . stateMachine. upstreamFinished ( )
255+ }
256+
257+ // All of this is mostly cleanup around the Task and the outstanding
258+ // continuations used for signalling.
259+ switch action {
260+ case let . resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
261+ downstreamContinuation,
262+ task,
263+ upstreamContinuations
264+ ) :
265+ upstreamContinuations. forEach { $0. resume ( throwing: CancellationError ( ) ) }
266+ task. cancel ( )
267+
268+ downstreamContinuation. resume ( returning: nil )
269+
270+ break loop
271+
272+ case let . cancelTaskAndUpstreamContinuations(
273+ task,
274+ upstreamContinuations
275+ ) :
276+ upstreamContinuations. forEach { $0. resume ( throwing: CancellationError ( ) ) }
277+ task. cancel ( )
278+
279+ break loop
280+ case . none:
281+
282+ break loop
283+ }
284+ }
285+ }
286+ }
287+ }
288+ }
0 commit comments