Skip to content

Commit 508e560

Browse files
authored
Remove FileStream long pinning, and simplify synchronization (#51462)
* Remove pinning in BufferedFileStreamStrategy * Simplify code/synchronization in ValueTaskSource
1 parent 291e7c0 commit 508e560

File tree

5 files changed

+225
-305
lines changed

5 files changed

+225
-305
lines changed

src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.ValueTaskSource.cs

Lines changed: 89 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -3,47 +3,49 @@
33

44
using System.Buffers;
55
using System.Diagnostics;
6-
using System.Runtime.InteropServices;
76
using System.Threading;
87
using System.Threading.Tasks.Sources;
9-
using TaskSourceCodes = System.IO.Strategies.FileStreamHelpers.TaskSourceCodes;
108

119
namespace System.IO.Strategies
1210
{
1311
internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy
1412
{
15-
/// <summary>
16-
/// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync.
17-
/// </summary>
13+
/// <summary>Reusable IValueTaskSource for FileStream ValueTask-returning async operations.</summary>
1814
private sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
1915
{
2016
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;
17+
2118
internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
2219
private readonly AsyncWindowsFileStreamStrategy _strategy;
23-
private MemoryHandle _handle;
20+
internal MemoryHandle _memoryHandle;
2421
internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
2522
private NativeOverlapped* _overlapped;
2623
private CancellationTokenRegistration _cancellationRegistration;
27-
private long _result; // Using long since this needs to be used in Interlocked APIs
28-
#if DEBUG
29-
private bool _cancellationHasBeenRegistered;
30-
#endif
24+
/// <summary>
25+
/// 0 when the operation hasn't been scheduled, non-zero when either the operation has completed,
26+
/// in which case its value is a packed combination of the error code and number of bytes, or when
27+
/// the read/write call has finished scheduling the async operation.
28+
/// </summary>
29+
internal ulong _result;
3130

3231
internal ValueTaskSource(AsyncWindowsFileStreamStrategy strategy)
3332
{
3433
_strategy = strategy;
35-
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);
36-
3734
_source.RunContinuationsAsynchronously = true;
35+
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);
3836
}
3937

40-
internal NativeOverlapped* Configure(ReadOnlyMemory<byte> memory)
38+
internal void Dispose()
4139
{
42-
_result = TaskSourceCodes.NoResult;
40+
ReleaseResources();
41+
_preallocatedOverlapped.Dispose();
42+
}
4343

44-
_handle = memory.Pin();
44+
internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory)
45+
{
46+
_result = 0;
47+
_memoryHandle = memory.Pin();
4548
_overlapped = _strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);
46-
4749
return _overlapped;
4850
}
4951

@@ -69,140 +71,110 @@ private int GetResultAndRelease(short token)
6971

7072
internal void RegisterForCancellation(CancellationToken cancellationToken)
7173
{
72-
#if DEBUG
73-
Debug.Assert(cancellationToken.CanBeCanceled);
74-
Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
75-
_cancellationHasBeenRegistered = true;
76-
#endif
77-
78-
// Quick check to make sure the IO hasn't completed
79-
if (_overlapped != null)
74+
Debug.Assert(_overlapped != null);
75+
if (cancellationToken.CanBeCanceled)
8076
{
81-
// Register the cancellation only if the IO hasn't completed
82-
long packedResult = Interlocked.CompareExchange(ref _result, TaskSourceCodes.RegisteringCancellation, TaskSourceCodes.NoResult);
83-
if (packedResult == TaskSourceCodes.NoResult)
77+
try
8478
{
85-
_cancellationRegistration = cancellationToken.UnsafeRegister((s, token) => Cancel(token), this);
86-
87-
// Switch the result, just in case IO completed while we were setting the registration
88-
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
89-
}
90-
else if (packedResult != TaskSourceCodes.CompletedCallback)
91-
{
92-
// Failed to set the result, IO is in the process of completing
93-
// Attempt to take the packed result
94-
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
79+
_cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) =>
80+
{
81+
ValueTaskSource vts = (ValueTaskSource)s!;
82+
if (!vts._strategy._fileHandle.IsInvalid)
83+
{
84+
try
85+
{
86+
Interop.Kernel32.CancelIoEx(vts._strategy._fileHandle, vts._overlapped);
87+
// Ignore all failures: no matter whether it succeeds or fails, completion is handled via the IOCallback.
88+
}
89+
catch (ObjectDisposedException) { } // in case the SafeHandle is (erroneously) closed concurrently
90+
}
91+
}, this);
9592
}
96-
97-
// If we have a callback that needs to be completed
98-
if ((packedResult != TaskSourceCodes.NoResult) && (packedResult != TaskSourceCodes.CompletedCallback) && (packedResult != TaskSourceCodes.RegisteringCancellation))
93+
catch (OutOfMemoryException)
9994
{
100-
CompleteCallback((ulong)packedResult);
95+
// Just in case trying to register OOMs, we ignore it in order to
96+
// protect the higher-level calling code that would proceed to unpin
97+
// memory that might be actively used by an in-flight async operation.
10198
}
10299
}
103100
}
104101

105-
internal void ReleaseNativeResource()
102+
internal void ReleaseResources()
106103
{
107-
_handle.Dispose();
104+
// Unpin any pinned buffer.
105+
_memoryHandle.Dispose();
108106

109-
// Ensure that cancellation has been completed and cleaned up.
107+
// Ensure that any cancellation callback has either completed or will never run,
108+
// so that we don't try to access an overlapped for this operation after it's already
109+
// been freed.
110110
_cancellationRegistration.Dispose();
111111

112112
// Free the overlapped.
113-
// NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory
114-
// (this is why we disposed the registration above).
115113
if (_overlapped != null)
116114
{
117115
_strategy._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(_overlapped);
118116
_overlapped = null;
119117
}
120118
}
121119

122-
private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
123-
{
124-
ValueTaskSource valueTaskSource = (ValueTaskSource)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped)!;
125-
Debug.Assert(valueTaskSource._overlapped == pOverlapped, "Overlaps don't match");
126-
127-
// Handle reading from & writing to closed pipes. While I'm not sure
128-
// this is entirely necessary anymore, maybe it's possible for
129-
// an async read on a pipe to be issued and then the pipe is closed,
130-
// returning this error. This may very well be necessary.
131-
ulong packedResult;
132-
if (errorCode != 0 && errorCode != Interop.Errors.ERROR_BROKEN_PIPE && errorCode != Interop.Errors.ERROR_NO_DATA)
133-
{
134-
packedResult = ((ulong)TaskSourceCodes.ResultError | errorCode);
135-
}
136-
else
137-
{
138-
packedResult = ((ulong)TaskSourceCodes.ResultSuccess | numBytes);
139-
}
120+
// After calling Read/WriteFile to start the asynchronous operation, the caller may configure cancellation,
121+
// and only after that should we allow for completing the operation, as completion needs to factor in work
122+
// done by that cancellation registration, e.g. unregistering. As such, we use _result to both track who's
123+
// responsible for calling Complete and for passing the necessary data between parties.
140124

141-
// Stow the result so that other threads can observe it
142-
// And, if no other thread is registering cancellation, continue
143-
if (Interlocked.Exchange(ref valueTaskSource._result, (long)packedResult) == TaskSourceCodes.NoResult)
125+
/// <summary>Invoked when AsyncWindowsFileStreamStrategy has finished scheduling the async operation.</summary>
126+
internal void FinishedScheduling()
127+
{
128+
// Set the value to 1. If it was already non-0, then the asynchronous operation already completed but
129+
// didn't call Complete, so we call Complete here. The read result value is the data (packed) necessary
130+
// to make the call.
131+
ulong result = Interlocked.Exchange(ref _result, 1);
132+
if (result != 0)
144133
{
145-
// Successfully set the state, attempt to take back the callback
146-
if (Interlocked.Exchange(ref valueTaskSource._result, TaskSourceCodes.CompletedCallback) != TaskSourceCodes.NoResult)
147-
{
148-
// Successfully got the callback, finish the callback
149-
valueTaskSource.CompleteCallback(packedResult);
150-
}
151-
// else: Some other thread stole the result, so now it is responsible to finish the callback
134+
Complete(errorCode: (uint)result, numBytes: (uint)(result >> 32) & 0x7FFFFFFF);
152135
}
153-
// else: Some other thread is registering a cancellation, so it *must* finish the callback
154136
}
155137

156-
private void CompleteCallback(ulong packedResult)
138+
/// <summary>Invoked when the asynchronous operation has completed asynchronously.</summary>
139+
private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
157140
{
158-
CancellationToken cancellationToken = _cancellationRegistration.Token;
159-
160-
ReleaseNativeResource();
161-
162-
// Unpack the result and send it to the user
163-
long result = (long)(packedResult & TaskSourceCodes.ResultMask);
164-
if (result == TaskSourceCodes.ResultError)
141+
ValueTaskSource? vts = (ValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
142+
Debug.Assert(vts is not null);
143+
Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match");
144+
145+
// Set the value to a packed combination of the error code and number of bytes (plus a high-bit 1
146+
// to ensure the value we're setting is non-zero). If it was already non-0 (the common case), then
147+
// the call site already finished scheduling the async operation, in which case we're ready to complete.
148+
Debug.Assert(numBytes < int.MaxValue);
149+
if (Interlocked.Exchange(ref vts._result, (1ul << 63) | ((ulong)numBytes << 32) | errorCode) != 0)
165150
{
166-
int errorCode = unchecked((int)(packedResult & uint.MaxValue));
167-
Exception e;
168-
if (errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
169-
{
170-
CancellationToken ct = cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(canceled: true);
171-
e = new OperationCanceledException(ct);
172-
}
173-
else
174-
{
175-
e = Win32Marshal.GetExceptionForWin32Error(errorCode);
176-
}
177-
e.SetCurrentStackTrace();
178-
_source.SetException(e);
179-
}
180-
else
181-
{
182-
Debug.Assert(result == TaskSourceCodes.ResultSuccess, "Unknown result");
183-
_source.SetResult((int)(packedResult & uint.MaxValue));
151+
vts.Complete(errorCode, numBytes);
184152
}
185153
}
186154

187-
private void Cancel(CancellationToken token)
155+
internal void Complete(uint errorCode, uint numBytes)
188156
{
189-
// WARNING: This may potentially be called under a lock (during cancellation registration)
190-
Debug.Assert(_overlapped != null && GetStatus(Version) != ValueTaskSourceStatus.Succeeded, "IO should not have completed yet");
157+
ReleaseResources();
191158

192-
// If the handle is still valid, attempt to cancel the IO
193-
if (!_strategy._fileHandle.IsInvalid &&
194-
!Interop.Kernel32.CancelIoEx(_strategy._fileHandle, _overlapped))
159+
switch (errorCode)
195160
{
196-
int errorCode = Marshal.GetLastWin32Error();
197-
198-
// ERROR_NOT_FOUND is returned if CancelIoEx cannot find the request to cancel.
199-
// This probably means that the IO operation has completed.
200-
if (errorCode != Interop.Errors.ERROR_NOT_FOUND)
201-
{
202-
Exception e = new OperationCanceledException(SR.OperationCanceled, Win32Marshal.GetExceptionForWin32Error(errorCode), token);
203-
e.SetCurrentStackTrace();
204-
_source.SetException(e);
205-
}
161+
case 0:
162+
case Interop.Errors.ERROR_BROKEN_PIPE:
163+
case Interop.Errors.ERROR_NO_DATA:
164+
// Success
165+
_source.SetResult((int)numBytes);
166+
break;
167+
168+
case Interop.Errors.ERROR_OPERATION_ABORTED:
169+
// Cancellation
170+
CancellationToken ct = _cancellationRegistration.Token;
171+
_source.SetException(ct.IsCancellationRequested ? new OperationCanceledException(ct) : new OperationCanceledException());
172+
break;
173+
174+
default:
175+
// Failure
176+
_source.SetException(Win32Marshal.GetExceptionForWin32Error((int)errorCode));
177+
break;
206178
}
207179
}
208180
}

0 commit comments

Comments
 (0)