Skip to content

Commit 19ed3ce

Browse files
committed
Fix cancellation unregistration in DataflowBlock.OutputAvailableAsync
OutputAvailableAsync is not unregistering from the supplied CancellationToken. If a cancelable token is supplied and is long lived, each call with that token to OutputAvailableAsync will add another callback into that token, and that will continue to grow until either the token is dropped or has been cancellation requested. For a long-lived cancellation token, this is akin to a leak.
1 parent f36aa7e commit 19ed3ce

File tree

1 file changed

+12
-12
lines changed

1 file changed

+12
-12
lines changed

src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,6 +1476,14 @@ public static Task<bool> OutputAvailableAsync<TOutput>(
14761476
static state => OutputAvailableAsyncTarget<TOutput>.CancelAndUnlink(state, default),
14771477
#endif
14781478
target);
1479+
1480+
if (target.Task.IsCompleted)
1481+
{
1482+
// If the source calls to the target to complete it before we register for cancellation,
1483+
// we need to ensure that we unregister from the cancellation token. If the task hasn't
1484+
// yet completed, then its completion will handle the unregistration.
1485+
target._ctr.Dispose();
1486+
}
14791487
}
14801488

14811489
return target.Task;
@@ -1504,18 +1512,6 @@ public OutputAvailableAsyncTarget() :
15041512
{
15051513
}
15061514

1507-
/// <summary>
1508-
/// Cached continuation delegate that unregisters from cancellation and
1509-
/// marshals the antecedent's result to the return value.
1510-
/// </summary>
1511-
internal static readonly Func<Task<bool>, object?, bool> s_handleCompletion = (antecedent, state) =>
1512-
{
1513-
var target = state as OutputAvailableAsyncTarget<T>;
1514-
Debug.Assert(target != null, "Expected non-null target");
1515-
target._ctr.Dispose();
1516-
return antecedent.GetAwaiter().GetResult();
1517-
};
1518-
15191515
/// <summary>Cancels the target and unlinks the target from the source.</summary>
15201516
/// <param name="state">An OutputAvailableAsyncTarget.</param>
15211517
/// <param name="cancellationToken">The token that triggered cancellation</param>
@@ -1551,13 +1547,16 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
15511547
if (source == null) throw new ArgumentNullException(nameof(source));
15521548

15531549
TrySetResult(true);
1550+
_ctr.Dispose();
1551+
15541552
return DataflowMessageStatus.DecliningPermanently;
15551553
}
15561554

15571555
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
15581556
void IDataflowBlock.Complete()
15591557
{
15601558
TrySetResult(false);
1559+
_ctr.Dispose();
15611560
}
15621561

15631562
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
@@ -1569,6 +1568,7 @@ void IDataflowBlock.Fault(Exception exception)
15691568
}
15701569

15711570
TrySetResult(false);
1571+
_ctr.Dispose();
15721572
}
15731573

15741574
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />

0 commit comments

Comments
 (0)