Skip to content

Commit 817a2fb

Browse files
authored
QUIC: fix unobserved exception from _connectionCloseTcs (#104894)
1 parent bc9b3b6 commit 817a2fb

File tree

7 files changed

+160
-66
lines changed

7 files changed

+160
-66
lines changed

src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Diagnostics;
45
using System.Diagnostics.CodeAnalysis;
56
using System.Net.Security;
67
using System.Net.Sockets;
78
using System.Runtime.CompilerServices;
89
using System.Security.Authentication;
910
using System.Threading;
11+
using System.Threading.Tasks;
1012
using static Microsoft.Quic.MsQuic;
1113

1214
namespace System.Net.Quic;
@@ -28,27 +30,14 @@ internal static QuicException GetOperationAbortedException(string? message = nul
2830
return new QuicException(QuicError.OperationAborted, null, message ?? SR.net_quic_operationaborted);
2931
}
3032

31-
internal static bool TryGetStreamExceptionForMsQuicStatus(int status, [NotNullWhen(true)] out Exception? exception, bool streamWasSuccessfullyStarted = true, string? message = null)
33+
internal static bool TryGetStreamExceptionForMsQuicStatus(int status, [NotNullWhen(true)] out Exception? exception)
3234
{
3335
if (status == QUIC_STATUS_ABORTED)
3436
{
35-
// Connection has been closed by the peer (either at transport or application level),
36-
if (streamWasSuccessfullyStarted)
37-
{
38-
// we will receive an event later, which will complete the stream with concrete
39-
// information why the connection was aborted.
40-
exception = null;
41-
return false;
42-
}
43-
else
44-
{
45-
// we won't be receiving any event callback for shutdown on this stream, so we don't
46-
// necessarily know which error to report. So we throw an exception which we can distinguish
47-
// at the caller (ConnectionAborted normally has App error code) and throw the correct
48-
// exception from there.
49-
exception = new QuicException(QuicError.ConnectionAborted, null, "");
50-
return true;
51-
}
37+
// If status == QUIC_STATUS_ABORTED, the connection was closed by transport or the peer.
38+
// We will receive an event later with details for ConnectionAborted exception to complete the task source with.
39+
exception = null;
40+
return false;
5241
}
5342
else if (status == QUIC_STATUS_INVALID_STATE)
5443
{
@@ -58,16 +47,13 @@ internal static bool TryGetStreamExceptionForMsQuicStatus(int status, [NotNullWh
5847
}
5948
else if (StatusFailed(status))
6049
{
61-
exception = GetExceptionForMsQuicStatus(status, message: message);
50+
exception = GetExceptionForMsQuicStatus(status);
6251
return true;
6352
}
6453
exception = null;
6554
return false;
6655
}
6756

68-
// see TryGetStreamExceptionForMsQuicStatus for explanation
69-
internal static bool IsConnectionAbortedWhenStartingStreamException(Exception ex) => ex is QuicException qe && qe.QuicError == QuicError.ConnectionAborted && qe.ApplicationErrorCode is null;
70-
7157
internal static Exception GetExceptionForMsQuicStatus(int status, long? errorCode = default, string? message = null)
7258
{
7359
Exception ex = GetExceptionInternal(status, errorCode, message);
@@ -229,4 +215,26 @@ public static void ValidateNotNull(string argumentName, string resourceName, obj
229215
throw new ArgumentNullException(argumentName, SR.Format(resourceName, propertyName));
230216
}
231217
}
218+
219+
public static void ObserveException(this Task task)
220+
{
221+
if (task.IsCompleted)
222+
{
223+
ObserveExceptionCore(task);
224+
}
225+
else
226+
{
227+
task.ContinueWith(static (t) => ObserveExceptionCore(t), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
228+
}
229+
230+
static void ObserveExceptionCore(Task task)
231+
{
232+
Debug.Assert(task.IsCompleted);
233+
if (task.IsFaulted)
234+
{
235+
// Access Exception to avoid TaskScheduler.UnobservedTaskException firing.
236+
Exception? e = task.Exception!.InnerException;
237+
}
238+
}
239+
}
232240
}

src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ static async ValueTask<QuicConnection> StartConnectAsync(QuicClientConnectionOpt
8787
{
8888
await connection.DisposeAsync().ConfigureAwait(false);
8989

90-
// throw OCE with correct token if cancellation requested by user
90+
// Throw OCE with correct token if cancellation requested by user.
9191
cancellationToken.ThrowIfCancellationRequested();
9292

93-
// cancellation by the linkedCts.CancelAfter. Convert to Timeout
93+
// Cancellation by the linkedCts.CancelAfter, convert to timeout.
9494
throw new QuicException(QuicError.ConnectionTimeout, null, SR.Format(SR.net_quic_handshake_timeout, options.HandshakeTimeout));
9595
}
9696
catch
@@ -113,11 +113,6 @@ static async ValueTask<QuicConnection> StartConnectAsync(QuicClientConnectionOpt
113113
/// </summary>
114114
private int _disposed;
115115

116-
/// <summary>
117-
/// Completed when connection shutdown is initiated.
118-
/// </summary>
119-
private TaskCompletionSource _connectionCloseTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
120-
121116
private readonly ValueTaskSource _connectedTcs = new ValueTaskSource();
122117
private readonly ResettableValueTaskSource _shutdownTcs = new ResettableValueTaskSource()
123118
{
@@ -140,6 +135,11 @@ static async ValueTask<QuicConnection> StartConnectAsync(QuicClientConnectionOpt
140135
}
141136
};
142137

138+
/// <summary>
139+
/// Completed when connection shutdown is initiated.
140+
/// </summary>
141+
private readonly TaskCompletionSource _connectionCloseTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
142+
143143
private readonly CancellationTokenSource _shutdownTokenSource = new CancellationTokenSource();
144144

145145
// Token that fires when the connection is closed.
@@ -369,7 +369,7 @@ private async ValueTask FinishConnectAsync(QuicClientConnectionOptions options,
369369
{
370370
Debug.Assert(host is not null);
371371

372-
// Given just a ServerName to connect to, msquic would also use the first address after the resolution
372+
// Given just a ServerName to connect to, MsQuic would also use the first address after the resolution
373373
// (https://github.com/microsoft/msquic/issues/1181) and it would not return a well-known error code
374374
// for resolution failures we could rely on. By doing the resolution in managed code, we can guarantee
375375
// that a SocketException will surface to the user if the name resolution fails.
@@ -526,13 +526,9 @@ public async ValueTask<QuicStream> OpenOutboundStreamAsync(QuicStreamType type,
526526
// Propagate ODE if disposed in the meantime.
527527
ObjectDisposedException.ThrowIf(_disposed == 1, this);
528528

529-
// In case of an incoming race when the connection is closed by the peer just before we open the stream,
530-
// we receive QUIC_STATUS_ABORTED from MsQuic, but we don't know how the connection was closed. We throw
531-
// special exception and handle it here where we can determine the shutdown reason.
532-
bool connectionAbortedByPeer = ThrowHelper.IsConnectionAbortedWhenStartingStreamException(ex);
533-
534-
// Propagate connection error if present.
535-
if (_connectionCloseTcs.Task.IsFaulted || connectionAbortedByPeer)
529+
// Propagate connection error when the connection was closed (remotely = ABORTED / locally = INVALID_STATE).
530+
if (ex is QuicException qex && qex.QuicError == QuicError.InternalError &&
531+
(qex.HResult == QUIC_STATUS_ABORTED || qex.HResult == QUIC_STATUS_INVALID_STATE))
536532
{
537533
await _connectionCloseTcs.Task.ConfigureAwait(false);
538534
}
@@ -822,8 +818,10 @@ public async ValueTask DisposeAsync()
822818
// Wait for SHUTDOWN_COMPLETE, the last event, so that all resources can be safely released.
823819
await _shutdownTcs.GetFinalTask(this).ConfigureAwait(false);
824820
Debug.Assert(_connectedTcs.IsCompleted);
821+
Debug.Assert(_connectionCloseTcs.Task.IsCompleted);
825822
_handle.Dispose();
826823
_shutdownTokenSource.Dispose();
824+
_connectionCloseTcs.Task.ObserveException();
827825
_configuration?.Dispose();
828826

829827
// Dispose remote certificate only if it hasn't been accessed via getter, in which case the accessing code becomes the owner of the certificate lifetime.

src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -169,20 +169,17 @@ internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QuicStreamT
169169
try
170170
{
171171
QUIC_HANDLE* handle;
172-
int status = MsQuicApi.Api.StreamOpen(
172+
ThrowHelper.ThrowIfMsQuicError(MsQuicApi.Api.StreamOpen(
173173
connectionHandle,
174174
type == QuicStreamType.Unidirectional ? QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL : QUIC_STREAM_OPEN_FLAGS.NONE,
175175
&NativeCallback,
176176
(void*)GCHandle.ToIntPtr(context),
177-
&handle);
178-
179-
if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? ex, streamWasSuccessfullyStarted: false, message: "StreamOpen failed"))
177+
&handle),
178+
"StreamOpen failed");
179+
_handle = new MsQuicContextSafeHandle(handle, context, SafeHandleType.Stream, connectionHandle)
180180
{
181-
throw ex;
182-
}
183-
184-
_handle = new MsQuicContextSafeHandle(handle, context, SafeHandleType.Stream, connectionHandle);
185-
_handle.Disposable = _sendBuffers;
181+
Disposable = _sendBuffers
182+
};
186183
}
187184
catch
188185
{
@@ -213,8 +210,10 @@ internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QUIC_HANDLE
213210
GCHandle context = GCHandle.Alloc(this, GCHandleType.Weak);
214211
try
215212
{
216-
_handle = new MsQuicContextSafeHandle(handle, context, SafeHandleType.Stream, connectionHandle);
217-
_handle.Disposable = _sendBuffers;
213+
_handle = new MsQuicContextSafeHandle(handle, context, SafeHandleType.Stream, connectionHandle)
214+
{
215+
Disposable = _sendBuffers
216+
};
218217
delegate* unmanaged[Cdecl]<QUIC_HANDLE*, void*, QUIC_STREAM_EVENT*, int> nativeCallback = &NativeCallback;
219218
MsQuicApi.Api.SetCallbackHandler(
220219
_handle,
@@ -261,14 +260,12 @@ internal ValueTask StartAsync(Action<QuicStreamType> decrementStreamCapacity, Ca
261260
int status = MsQuicApi.Api.StreamStart(
262261
_handle,
263262
QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT);
264-
265-
if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception, streamWasSuccessfullyStarted: false))
263+
if (StatusFailed(status))
266264
{
267265
_decrementStreamCapacity = null;
268-
_startedTcs.TrySetException(exception);
266+
_startedTcs.TrySetException(ThrowHelper.GetExceptionForMsQuicStatus(status));
269267
}
270268
}
271-
272269
return valueTask;
273270
}
274271

@@ -637,7 +634,7 @@ private unsafe int HandleEventShutdownComplete(ref SHUTDOWN_COMPLETE_DATA data)
637634
// It's local shutdown by app, this side called QuicConnection.CloseAsync, throw QuicError.OperationAborted.
638635
(shutdownByApp: true, closedRemotely: false) => ThrowHelper.GetOperationAbortedException(),
639636
// It's remote shutdown by transport, we received a CONNECTION_CLOSE frame with a QUIC transport error code, throw error based on the status.
640-
(shutdownByApp: false, closedRemotely: true) => ThrowHelper.GetExceptionForMsQuicStatus(data.ConnectionCloseStatus, (long)data.ConnectionErrorCode, $"Shutdown by transport {data.ConnectionErrorCode}"),
637+
(shutdownByApp: false, closedRemotely: true) => ThrowHelper.GetExceptionForMsQuicStatus(data.ConnectionCloseStatus, (long)data.ConnectionErrorCode),
641638
// It's local shutdown by transport, most likely due to a timeout, throw error based on the status.
642639
(shutdownByApp: false, closedRemotely: false) => ThrowHelper.GetExceptionForMsQuicStatus(data.ConnectionCloseStatus, (long)data.ConnectionErrorCode),
643640
};

src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,7 @@ public async Task ConnectWithCertificate_MissingTargetHost_Succeeds()
580580
return true;
581581
};
582582

583-
await CreateQuicConnection(clientOptions);
583+
await using QuicConnection connection = await CreateQuicConnection(clientOptions);
584584
}
585585
finally
586586
{

src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,67 @@ await RunClientServer(
117117
});
118118
}
119119

120+
[Theory]
121+
[InlineData(true)]
122+
[InlineData(false)]
123+
[InlineData(null)]
124+
public async Task CloseAsync_PendingOpenStream_Throws(bool? localClose)
125+
{
126+
byte[] data = new byte[10];
127+
128+
await using QuicListener listener = await CreateQuicListener(changeServerOptions: localClose is null ? options => options.IdleTimeout = TimeSpan.FromSeconds(10) : null);
129+
130+
// Allow client to accept a stream, one will be accepted and another will be pending while we close the server connection.
131+
QuicClientConnectionOptions clientOptions = CreateQuicClientOptions(listener.LocalEndPoint);
132+
clientOptions.MaxInboundBidirectionalStreams = 1;
133+
await using QuicConnection clientConnection = await CreateQuicConnection(clientOptions);
134+
135+
await using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
136+
137+
// Put one stream into server stream queue.
138+
QuicStream queuedStream = await clientConnection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional);
139+
await queuedStream.WriteAsync(data.AsMemory(), completeWrites: true);
140+
141+
// Open one stream to the client that is allowed.
142+
QuicStream firstStream = await serverConnection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional);
143+
await firstStream.WriteAsync(data.AsMemory(), completeWrites: true);
144+
145+
// Try to open another stream which should wait on capacity.
146+
ValueTask<QuicStream> secondStreamTask = serverConnection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional);
147+
Assert.False(secondStreamTask.IsCompleted);
148+
149+
// Close the connection, second stream task should complete with appropriate error.
150+
if (localClose is true)
151+
{
152+
await serverConnection.CloseAsync(123);
153+
await AssertThrowsQuicExceptionAsync(QuicError.OperationAborted, async () => await secondStreamTask);
154+
155+
// Try to open yet another stream which should fail because of already closed connection.
156+
await AssertThrowsQuicExceptionAsync(QuicError.OperationAborted, async () => await serverConnection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional));
157+
}
158+
else if (localClose is false)
159+
{
160+
await clientConnection.CloseAsync(456);
161+
QuicException ex1 = await AssertThrowsQuicExceptionAsync(QuicError.ConnectionAborted, async () => await secondStreamTask);
162+
Assert.Equal(456, ex1.ApplicationErrorCode);
163+
164+
// Try to open yet another stream which should fail because of already closed connection.
165+
QuicException ex2 = await AssertThrowsQuicExceptionAsync(QuicError.ConnectionAborted, async () => await serverConnection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional));
166+
Assert.Equal(456, ex2.ApplicationErrorCode);
167+
}
168+
else
169+
{
170+
await Task.Delay(TimeSpan.FromSeconds(15));
171+
172+
QuicException ex1 = await AssertThrowsQuicExceptionAsync(QuicError.ConnectionIdle, async () => await secondStreamTask);
173+
Assert.Equal(1, ex1.TransportErrorCode);
174+
175+
// Try to open yet another stream which should fail because of already closed connection.
176+
QuicException ex2 = await AssertThrowsQuicExceptionAsync(QuicError.ConnectionIdle, async () => await serverConnection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional));
177+
Assert.Equal(1, ex2.TransportErrorCode);
178+
}
179+
}
180+
120181
[Fact]
121182
public async Task Dispose_WithPendingAcceptAndConnect_PendingAndSubsequentThrowOperationAbortedException()
122183
{
@@ -228,6 +289,9 @@ public async Task GetStreamCapacity_OpenCloseStream_CountsCorrectly()
228289
await streamsAvailableFired.WaitAsync();
229290
Assert.Equal(0, bidiIncrement);
230291
Assert.Equal(1, unidiIncrement);
292+
293+
await clientConnection.DisposeAsync();
294+
await serverConnection.DisposeAsync();
231295
}
232296

233297
[Theory]
@@ -298,6 +362,9 @@ public async Task GetStreamCapacity_OpenCloseStreamIntoNegative_CountsCorrectly(
298362
Assert.False(streamsAvailableFired.CurrentCount > 0);
299363
Assert.Equal(unidirectional ? QuicDefaults.DefaultServerMaxInboundBidirectionalStreams : QuicDefaults.DefaultServerMaxInboundBidirectionalStreams * 3, bidiTotal);
300364
Assert.Equal(unidirectional ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams * 3 : QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams, unidiTotal);
365+
366+
await clientConnection.DisposeAsync();
367+
await serverConnection.DisposeAsync();
301368
}
302369

303370
[Theory]
@@ -368,6 +435,9 @@ public async Task GetStreamCapacity_OpenCloseStreamCanceledIntoNegative_CountsCo
368435
Assert.False(streamsAvailableFired.CurrentCount > 0);
369436
Assert.Equal(unidirectional ? QuicDefaults.DefaultServerMaxInboundBidirectionalStreams : QuicDefaults.DefaultServerMaxInboundBidirectionalStreams * 3, bidiTotal);
370437
Assert.Equal(unidirectional ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams * 3 : QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams, unidiTotal);
438+
439+
await clientConnection.DisposeAsync();
440+
await serverConnection.DisposeAsync();
371441
}
372442

373443
[Fact]
@@ -434,6 +504,9 @@ public async Task GetStreamCapacity_SumInvariant()
434504

435505
// by now, we opened and closed 2 * Limit, and expect a budget of 'Limit' more
436506
Assert.Equal(3 * Limit, maxStreamIndex);
507+
508+
await clientConnection.DisposeAsync();
509+
await serverConnection.DisposeAsync();
437510
}
438511

439512
[Fact]
@@ -634,6 +707,8 @@ public async Task AcceptStreamAsync_ConnectionDisposed_Throws()
634707

635708
var accept1Exception = await Assert.ThrowsAsync<ObjectDisposedException>(async () => await acceptTask1);
636709
var accept2Exception = await Assert.ThrowsAsync<ObjectDisposedException>(async () => await acceptTask2);
710+
711+
await clientConnection.DisposeAsync();
637712
}
638713

639714
[Theory]

0 commit comments

Comments
 (0)