Skip to content

Commit d596d9f

Browse files
committed
Adding "deep cancellation" API surface predicated by a NO_DEEP_CANCELLATION symbol.
1 parent 568e78f commit d596d9f

40 files changed

+5178
-14
lines changed

Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.Opt.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// See the LICENSE file in the project root for more information.
44

55
using System.Collections.Generic;
6+
using System.Threading;
67
using System.Threading.Tasks;
78

89
namespace System.Linq
@@ -19,6 +20,13 @@ public virtual IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, ValueTask
1920
return new AsyncEnumerable.SelectEnumerableAsyncIteratorWithTask<TSource, TResult>(this, selector);
2021
}
2122

23+
#if !NO_DEEP_CANCELLATION
24+
public virtual IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, CancellationToken, ValueTask<TResult>> selector)
25+
{
26+
return new AsyncEnumerable.SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult>(this, selector);
27+
}
28+
#endif
29+
2230
public virtual IAsyncEnumerable<TSource> Where(Func<TSource, bool> predicate)
2331
{
2432
return new AsyncEnumerable.WhereEnumerableAsyncIterator<TSource>(this, predicate);
@@ -28,5 +36,12 @@ public virtual IAsyncEnumerable<TSource> Where(Func<TSource, ValueTask<bool>> pr
2836
{
2937
return new AsyncEnumerable.WhereEnumerableAsyncIteratorWithTask<TSource>(this, predicate);
3038
}
39+
40+
#if !NO_DEEP_CANCELLATION
41+
public virtual IAsyncEnumerable<TSource> Where(Func<TSource, CancellationToken, ValueTask<bool>> predicate)
42+
{
43+
return new AsyncEnumerable.WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource>(this, predicate);
44+
}
45+
#endif
3146
}
3247
}

Ix.NET/Source/System.Linq.Async/System/Linq/IOrderedAsyncEnumerable.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// See the LICENSE file in the project root for more information.
44

55
using System.Collections.Generic;
6+
using System.Threading;
67
using System.Threading.Tasks;
78

89
namespace System.Linq
@@ -11,5 +12,9 @@ public interface IOrderedAsyncEnumerable<out TElement> : IAsyncEnumerable<TEleme
1112
{
1213
IOrderedAsyncEnumerable<TElement> CreateOrderedEnumerable<TKey>(Func<TElement, TKey> keySelector, IComparer<TKey> comparer, bool descending);
1314
IOrderedAsyncEnumerable<TElement> CreateOrderedEnumerable<TKey>(Func<TElement, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending);
15+
16+
#if !NO_DEEP_CANCELLATION
17+
IOrderedAsyncEnumerable<TElement> CreateOrderedEnumerable<TKey>(Func<TElement, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending);
18+
#endif
1419
}
1520
}

Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Aggregate.cs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,18 @@ public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSourc
5050
return AggregateCore(source, accumulator, cancellationToken);
5151
}
5252

53+
#if !NO_DEEP_CANCELLATION
54+
public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
55+
{
56+
if (source == null)
57+
throw Error.ArgumentNull(nameof(source));
58+
if (accumulator == null)
59+
throw Error.ArgumentNull(nameof(accumulator));
60+
61+
return AggregateCore(source, accumulator, cancellationToken);
62+
}
63+
#endif
64+
5365
public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
5466
{
5567
if (source == null)
@@ -90,6 +102,18 @@ public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsync
90102
return AggregateCore(source, seed, accumulator, cancellationToken);
91103
}
92104

105+
#if !NO_DEEP_CANCELLATION
106+
public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
107+
{
108+
if (source == null)
109+
throw Error.ArgumentNull(nameof(source));
110+
if (accumulator == null)
111+
throw Error.ArgumentNull(nameof(accumulator));
112+
113+
return AggregateCore(source, seed, accumulator, cancellationToken);
114+
}
115+
#endif
116+
93117
public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
94118
{
95119
if (source == null)
@@ -138,6 +162,20 @@ public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this I
138162
return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
139163
}
140164

165+
#if !NO_DEEP_CANCELLATION
166+
public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
167+
{
168+
if (source == null)
169+
throw Error.ArgumentNull(nameof(source));
170+
if (accumulator == null)
171+
throw Error.ArgumentNull(nameof(accumulator));
172+
if (resultSelector == null)
173+
throw Error.ArgumentNull(nameof(resultSelector));
174+
175+
return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
176+
}
177+
#endif
178+
141179
private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
142180
{
143181
var acc = seed;
@@ -206,6 +244,29 @@ private static async Task<TResult> AggregateCore<TSource, TResult>(IAsyncEnumera
206244
return acc;
207245
}
208246

247+
#if !NO_DEEP_CANCELLATION
248+
private static async Task<TResult> AggregateCore<TSource, TResult>(IAsyncEnumerable<TSource> source, TResult seed, Func<TResult, TSource, CancellationToken, ValueTask<TResult>> accumulator, CancellationToken cancellationToken)
249+
{
250+
var acc = seed;
251+
252+
var e = source.GetAsyncEnumerator(cancellationToken);
253+
254+
try
255+
{
256+
while (await e.MoveNextAsync().ConfigureAwait(false))
257+
{
258+
acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
259+
}
260+
}
261+
finally
262+
{
263+
await e.DisposeAsync().ConfigureAwait(false);
264+
}
265+
266+
return acc;
267+
}
268+
#endif
269+
209270
private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
210271
{
211272
var acc = seed;
@@ -227,6 +288,29 @@ private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(
227288
return await resultSelector(acc).ConfigureAwait(false);
228289
}
229290

291+
#if !NO_DEEP_CANCELLATION
292+
private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
293+
{
294+
var acc = seed;
295+
296+
var e = source.GetAsyncEnumerator(cancellationToken);
297+
298+
try
299+
{
300+
while (await e.MoveNextAsync().ConfigureAwait(false))
301+
{
302+
acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
303+
}
304+
}
305+
finally
306+
{
307+
await e.DisposeAsync().ConfigureAwait(false);
308+
}
309+
310+
return await resultSelector(acc, cancellationToken).ConfigureAwait(false);
311+
}
312+
#endif
313+
230314
private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
231315
{
232316
var e = source.GetAsyncEnumerator(cancellationToken);
@@ -252,5 +336,33 @@ private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSour
252336
await e.DisposeAsync().ConfigureAwait(false);
253337
}
254338
}
339+
340+
#if !NO_DEEP_CANCELLATION
341+
private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
342+
{
343+
var e = source.GetAsyncEnumerator(cancellationToken);
344+
345+
try
346+
{
347+
if (!await e.MoveNextAsync().ConfigureAwait(false))
348+
{
349+
throw Error.NoElements();
350+
}
351+
352+
var acc = e.Current;
353+
354+
while (await e.MoveNextAsync().ConfigureAwait(false))
355+
{
356+
acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
357+
}
358+
359+
return acc;
360+
}
361+
finally
362+
{
363+
await e.DisposeAsync().ConfigureAwait(false);
364+
}
365+
}
366+
#endif
255367
}
256368
}

Ix.NET/Source/System.Linq.Async/System/Linq/Operators/All.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,18 @@ public static Task<bool> AllAsync<TSource>(this IAsyncEnumerable<TSource> source
5050
return AllCore(source, predicate, cancellationToken);
5151
}
5252

53+
#if !NO_DEEP_CANCELLATION
54+
public static Task<bool> AllAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate, CancellationToken cancellationToken)
55+
{
56+
if (source == null)
57+
throw Error.ArgumentNull(nameof(source));
58+
if (predicate == null)
59+
throw Error.ArgumentNull(nameof(predicate));
60+
61+
return AllCore(source, predicate, cancellationToken);
62+
}
63+
#endif
64+
5365
private static async Task<bool> AllCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
5466
{
5567
var e = source.GetAsyncEnumerator(cancellationToken);
@@ -89,5 +101,27 @@ private static async Task<bool> AllCore<TSource>(IAsyncEnumerable<TSource> sourc
89101

90102
return true;
91103
}
104+
105+
#if !NO_DEEP_CANCELLATION
106+
private static async Task<bool> AllCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate, CancellationToken cancellationToken)
107+
{
108+
var e = source.GetAsyncEnumerator(cancellationToken);
109+
110+
try
111+
{
112+
while (await e.MoveNextAsync().ConfigureAwait(false))
113+
{
114+
if (!await predicate(e.Current, cancellationToken).ConfigureAwait(false))
115+
return false;
116+
}
117+
}
118+
finally
119+
{
120+
await e.DisposeAsync().ConfigureAwait(false);
121+
}
122+
123+
return true;
124+
}
125+
#endif
92126
}
93127
}

Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Any.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,18 @@ public static Task<bool> AnyAsync<TSource>(this IAsyncEnumerable<TSource> source
6666
return AnyCore(source, predicate, cancellationToken);
6767
}
6868

69+
#if !NO_DEEP_CANCELLATION
70+
public static Task<bool> AnyAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate, CancellationToken cancellationToken)
71+
{
72+
if (source == null)
73+
throw Error.ArgumentNull(nameof(source));
74+
if (predicate == null)
75+
throw Error.ArgumentNull(nameof(predicate));
76+
77+
return AnyCore(source, predicate, cancellationToken);
78+
}
79+
#endif
80+
6981
private static async Task<bool> AnyCore<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
7082
{
7183
var e = source.GetAsyncEnumerator(cancellationToken);
@@ -119,5 +131,27 @@ private static async Task<bool> AnyCore<TSource>(IAsyncEnumerable<TSource> sourc
119131

120132
return false;
121133
}
134+
135+
#if !NO_DEEP_CANCELLATION
136+
private static async Task<bool> AnyCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate, CancellationToken cancellationToken)
137+
{
138+
var e = source.GetAsyncEnumerator(cancellationToken);
139+
140+
try
141+
{
142+
while (await e.MoveNextAsync().ConfigureAwait(false))
143+
{
144+
if (await predicate(e.Current, cancellationToken).ConfigureAwait(false))
145+
return true;
146+
}
147+
}
148+
finally
149+
{
150+
await e.DisposeAsync().ConfigureAwait(false);
151+
}
152+
153+
return false;
154+
}
155+
#endif
122156
}
123157
}

0 commit comments

Comments
 (0)