From 2631c3147804ccf334a241f8104d0b84089aeef8 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Mon, 2 Aug 2021 12:20:30 +0200 Subject: [PATCH 1/7] write failing test --- .../tests/FileStream/WriteAsync.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs index 99f51624cc1aa7..5092676f406a44 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs @@ -100,11 +100,15 @@ public async Task SimpleWriteAsync() } } - [Fact] - public async Task WriteAsyncCancelledFile() + [Theory] + [InlineData(0, true)] // 0 == no buffering + [InlineData(4096, true)] // 4096 == default buffer size + [InlineData(0, false)] + [InlineData(4096, false)] + public async Task WriteAsyncCancelledFile(int bufferSize, bool isAsync) { const int writeSize = 1024 * 1024; - using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) + using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create, FileAccess.Write, FileShare.None, bufferSize, isAsync)) { byte[] buffer = new byte[writeSize]; CancellationTokenSource cts = new CancellationTokenSource(); @@ -119,6 +123,8 @@ public async Task WriteAsyncCancelledFile() // Ideally we'd be doing an Assert.Throws // but since cancellation is a race condition we accept either outcome Assert.Equal(cts.Token, oce.CancellationToken); + + Assert.Equal(0, fs.Position); // if write was cancelled, the Position should remain unchanged } } } From 6cc6b338a16f7d2deda9d43ac1cd9f5eb538b06f Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Mon, 2 Aug 2021 12:32:58 +0200 Subject: [PATCH 2/7] handle incomplete (eg. cancelled) async writes --- ...andle.OverlappedValueTaskSource.Windows.cs | 4 ++-- .../src/System/IO/RandomAccess.Windows.cs | 19 ++++++++++++++----- .../AsyncWindowsFileStreamStrategy.cs | 17 +++-------------- .../IO/Strategies/OSFileStreamStrategy.cs | 4 ++-- 4 files changed, 21 insertions(+), 23 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs index 26e17f1d161d82..b0a39138f71b07 100644 --- a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs @@ -198,9 +198,9 @@ internal void Complete(uint errorCode, uint numBytes) AsyncWindowsFileStreamStrategy? strategy = _strategy; ReleaseResources(); - if (strategy is not null && _bufferSize != numBytes) // true only for incomplete reads + if (strategy is not null && _bufferSize != numBytes) // true only for incomplete operations { - strategy.OnIncompleteRead(_bufferSize, (int)numBytes); + strategy.OnIncompleteOperation(_bufferSize, (int)numBytes); } switch (errorCode) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs index 09f7a69552ef20..c87f81fc88b23e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs @@ -248,7 +248,7 @@ internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int error handle.EnsureThreadPoolBindingInitialized(); SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource(); - int errorCode = 0; + int errorCode = Interop.Errors.ERROR_SUCCESS; try { NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy); @@ -292,7 +292,7 @@ internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int error { if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS) { - strategy?.OnIncompleteRead(buffer.Length, 0); + strategy?.OnIncompleteOperation(buffer.Length, 0); } } @@ -323,21 +323,23 @@ internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemo return ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken); } - internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncWriteFile(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken) + internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncWriteFile(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, + CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null) { handle.EnsureThreadPoolBindingInitialized(); SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource(); + int errorCode = Interop.Errors.ERROR_SUCCESS; try { - NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset); + NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy); Debug.Assert(vts._memoryHandle.Pointer != null); // Queue an async WriteFile operation. if (Interop.Kernel32.WriteFile(handle, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, nativeOverlapped) == 0) { // The operation failed, or it's pending. - int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle); + errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle); switch (errorCode) { case Interop.Errors.ERROR_IO_PENDING: @@ -360,6 +362,13 @@ internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int error vts.Dispose(); throw; } + finally + { + if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS) + { + strategy?.OnIncompleteOperation(buffer.Length, 0); + } + } // Completion handled by callback. vts.FinishedScheduling(); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs index 12af993b617f65..084d5053df554e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs @@ -43,28 +43,17 @@ public override ValueTask ReadAsync(Memory destination, CancellationT (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, readOffset, cancellationToken, this); return vts != null ? new ValueTask(vts, vts.Version) - : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException(HandleIOError(readOffset, errorCode)); + : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException(SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path)); } public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, buffer.Length) - buffer.Length : -1; - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, buffer, writeOffset, cancellationToken); + (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, buffer, writeOffset, cancellationToken, this); return vts != null ? new ValueTask(vts, vts.Version) - : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode)); - } - - private Exception HandleIOError(long positionBefore, int errorCode) - { - if (_fileHandle.CanSeek) - { - // Update Position... it could be anywhere. - Interlocked.Exchange(ref _filePosition, positionBefore); - } - - return SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path); + : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path)); } public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs index 260624bf93416c..1e768b276174a0 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs @@ -102,7 +102,7 @@ public unsafe sealed override long Length // in case of concurrent incomplete reads, there can be multiple threads trying to update the position // at the same time. That is why we are using Interlocked here. - internal void OnIncompleteRead(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead); + internal void OnIncompleteOperation(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead); protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached; @@ -382,7 +382,7 @@ private void Read() // if the read was incomplete, we need to update the file position: if (result != _destination.Length) { - _stream.OnIncompleteRead(_destination.Length, result); + _stream.OnIncompleteOperation(_destination.Length, result); } _destination = default; From 9f446594323a2f051f412c1e8bb55eaf47724a0c Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Mon, 2 Aug 2021 13:45:48 +0200 Subject: [PATCH 3/7] handle failed async writes for !IsAsync --- .../SafeFileHandle.ThreadPoolValueTaskSource.cs | 13 ++++++++++++- .../src/System/IO/RandomAccess.Unix.cs | 4 ++-- .../src/System/IO/RandomAccess.Windows.cs | 7 +++++-- .../src/System/IO/RandomAccess.cs | 5 +++-- .../System/IO/Strategies/OSFileStreamStrategy.cs | 5 +++-- 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs index abb6238b521395..6d340742c04313 100644 --- a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs +++ b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.IO.Strategies; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -32,6 +33,7 @@ internal sealed class ThreadPoolValueTaskSource : IThreadPoolWorkItem, IValueTas private ManualResetValueTaskSourceCore _source; private Operation _operation = Operation.None; private ExecutionContext? _context; + private OSFileStreamStrategy? _strategy; // These fields store the parameters for the operation. // The first two are common for all kinds of operations. @@ -116,8 +118,16 @@ private void ExecuteInternal() } finally { + if (_strategy is not null && exception is not null) + { + Debug.Assert(_operation == Operation.Write); + // WriteAtOffset returns void, so we need to fix position only in case of an exception + _strategy.OnIncompleteOperation(_singleSegment.Length, 0); + } + _operation = Operation.None; _context = null; + _strategy = null; _cancellationToken = default; _singleSegment = default; _readScatterBuffers = null; @@ -165,7 +175,7 @@ public ValueTask QueueRead(Memory buffer, long fileOffset, Cancellati return new ValueTask(this, _source.Version); } - public ValueTask QueueWrite(ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken) + public ValueTask QueueWrite(ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { ValidateInvariants(); @@ -173,6 +183,7 @@ public ValueTask QueueWrite(ReadOnlyMemory buffer, long fileOffset, Cancel _singleSegment = buffer; _fileOffset = fileOffset; _cancellationToken = cancellationToken; + _strategy = strategy; QueueToThreadPool(); return new ValueTask(this, _source.Version); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs index f98e9874bd416d..b4fcab363a4818 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs @@ -202,8 +202,8 @@ internal static unsafe void WriteGatherAtOffset(SafeFileHandle handle, IReadOnly } } - internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken) - => ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken); + internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null) + => ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); private static ValueTask WriteGatherAtOffsetAsync(SafeFileHandle handle, IReadOnlyList> buffers, long fileOffset, CancellationToken cancellationToken) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs index c87f81fc88b23e..2aa2bee4aa4abe 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs @@ -301,10 +301,13 @@ internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int error return (vts, -1); } - internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken) + internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, + CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null) { if (handle.IsAsync) { + Debug.Assert(strategy is null || strategy is not AsyncWindowsFileStreamStrategy); // AsyncWindowsFileStreamStrategy should not use this code path + (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncWriteFile(handle, buffer, fileOffset, cancellationToken); if (vts is not null) @@ -320,7 +323,7 @@ internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemo return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode)); } - return ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken); + return ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); } internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncWriteFile(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs index ad190fe2f75e6d..61d33f8a9ee8b2 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Generic; +using System.IO.Strategies; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -276,9 +277,9 @@ private static ValueTask ScheduleSyncReadScatterAtOffsetAsync(SafeFileHand } private static ValueTask ScheduleSyncWriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, - long fileOffset, CancellationToken cancellationToken) + long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { - return handle.GetThreadPoolValueTaskSource().QueueWrite(buffer, fileOffset, cancellationToken); + return handle.GetThreadPoolValueTaskSource().QueueWrite(buffer, fileOffset, cancellationToken, strategy); } private static ValueTask ScheduleSyncWriteGatherAtOffsetAsync(SafeFileHandle handle, IReadOnlyList> buffers, diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs index 1e768b276174a0..f76618d14b52ed 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs @@ -102,7 +102,8 @@ public unsafe sealed override long Length // in case of concurrent incomplete reads, there can be multiple threads trying to update the position // at the same time. That is why we are using Interlocked here. - internal void OnIncompleteOperation(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead); + internal void OnIncompleteOperation(int expectedBytesTransferred, int actualBytesTransferred) + => Interlocked.Add(ref _filePosition, actualBytesTransferred - expectedBytesTransferred); protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached; @@ -295,7 +296,7 @@ public sealed override Task WriteAsync(byte[] buffer, int offset, int count, Can public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken) { long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1; - return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken); + return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken, this); } public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => From 853ce96b324a755af3d50dcd7616770410833b2e Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Mon, 2 Aug 2021 13:50:33 +0200 Subject: [PATCH 4/7] ensure the tests validate the position after cancelled async read as well --- .../tests/FileStream/ReadAsync.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs index e5b8ecdfab992a..9985119b35d0d0 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs @@ -65,8 +65,12 @@ public async Task ReadAsyncBufferedCompletesSynchronously() } } - [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] - public async Task ReadAsyncCanceledFile() + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(0, true)] // 0 == no buffering + [InlineData(4096, true)] // 4096 == default buffer size + [InlineData(0, false)] + [InlineData(4096, false)] + public async Task ReadAsyncCanceledFile(int bufferSize, bool isAsync) { string fileName = GetTestFilePath(); using (FileStream fs = new FileStream(fileName, FileMode.Create)) @@ -75,7 +79,7 @@ public async Task ReadAsyncCanceledFile() fs.Write(TestBuffer, 0, TestBuffer.Length); } - using (FileStream fs = new FileStream(fileName, FileMode.Open)) + using (FileStream fs = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.None, bufferSize, isAsync)) { byte[] buffer = new byte[fs.Length]; CancellationTokenSource cts = new CancellationTokenSource(); @@ -91,6 +95,8 @@ public async Task ReadAsyncCanceledFile() // Ideally we'd be doing an Assert.Throws // but since cancellation is a race condition we accept either outcome Assert.Equal(cts.Token, oce.CancellationToken); + + Assert.Equal(0, fs.Position); // if read was cancelled, the Position should remain unchanged } } } From bcf486f078c13dcd0e52adbd3f42292988edf0e1 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Mon, 2 Aug 2021 19:42:20 +0200 Subject: [PATCH 5/7] address code review feedback --- ...afeFileHandle.ThreadPoolValueTaskSource.cs | 15 ++- .../src/System/IO/RandomAccess.Unix.cs | 4 +- .../src/System/IO/RandomAccess.Windows.cs | 5 +- .../src/System/IO/RandomAccess.cs | 4 +- .../IO/Strategies/OSFileStreamStrategy.cs | 113 +----------------- 5 files changed, 20 insertions(+), 121 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs index 6d340742c04313..9fff83001662bd 100644 --- a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs +++ b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs @@ -118,11 +118,17 @@ private void ExecuteInternal() } finally { - if (_strategy is not null && exception is not null) + if (_strategy is not null) { - Debug.Assert(_operation == Operation.Write); // WriteAtOffset returns void, so we need to fix position only in case of an exception - _strategy.OnIncompleteOperation(_singleSegment.Length, 0); + if (exception is not null) + { + _strategy.OnIncompleteOperation(_singleSegment.Length, 0); + } + else if (_operation == Operation.Read && result != _singleSegment.Length) + { + _strategy.OnIncompleteOperation(_singleSegment.Length, (int)result); + } } _operation = Operation.None; @@ -162,7 +168,7 @@ private void QueueToThreadPool() ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true); } - public ValueTask QueueRead(Memory buffer, long fileOffset, CancellationToken cancellationToken) + public ValueTask QueueRead(Memory buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { ValidateInvariants(); @@ -170,6 +176,7 @@ public ValueTask QueueRead(Memory buffer, long fileOffset, Cancellati _singleSegment = buffer; _fileOffset = fileOffset; _cancellationToken = cancellationToken; + _strategy = strategy; QueueToThreadPool(); return new ValueTask(this, _source.Version); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs index b4fcab363a4818..992dcc54f5be58 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs @@ -74,8 +74,8 @@ internal static unsafe long ReadScatterAtOffset(SafeFileHandle handle, IReadOnly return FileStreamHelpers.CheckFileCall(result, handle.Path); } - internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, long fileOffset, CancellationToken cancellationToken) - => ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken); + internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null) + => ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); private static ValueTask ReadScatterAtOffsetAsync(SafeFileHandle handle, IReadOnlyList> buffers, long fileOffset, CancellationToken cancellationToken) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs index 2aa2bee4aa4abe..c88dbb893b474e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs @@ -220,7 +220,8 @@ private static unsafe void WriteSyncUsingAsyncHandle(SafeFileHandle handle, Read } } - internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, long fileOffset, CancellationToken cancellationToken) + internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, long fileOffset, + CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null) { if (handle.IsAsync) { @@ -239,7 +240,7 @@ internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory(Win32Marshal.GetExceptionForWin32Error(errorCode)); } - return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken); + return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); } internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory buffer, long fileOffset, diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs index 61d33f8a9ee8b2..04b59ae472a769 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs @@ -265,9 +265,9 @@ private static void ValidateBuffers(IReadOnlyList buffers) } private static ValueTask ScheduleSyncReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, - long fileOffset, CancellationToken cancellationToken) + long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { - return handle.GetThreadPoolValueTaskSource().QueueRead(buffer, fileOffset, cancellationToken); + return handle.GetThreadPoolValueTaskSource().QueueRead(buffer, fileOffset, cancellationToken, strategy); } private static ValueTask ScheduleSyncReadScatterAtOffsetAsync(SafeFileHandle handle, IReadOnlyList> buffers, diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs index f76618d14b52ed..e62e9b3c16f72f 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs @@ -14,7 +14,6 @@ internal abstract class OSFileStreamStrategy : FileStreamStrategy { protected readonly SafeFileHandle _fileHandle; // only ever null if ctor throws private readonly FileAccess _access; // What file was opened for. - private ReadAsyncTaskSource? _readAsyncTaskSource; // Cached IValueTaskSource used for async-over-sync reads protected long _filePosition; protected long _length = -1; // negative means that hasn't been fetched. @@ -310,118 +309,10 @@ public sealed override Task ReadAsync(byte[] buffer, int offset, int count, public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken) { - if (!CanSeek) - { - return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken); - } - // This implementation updates the file position before the operation starts and updates it after incomplete read. // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations. - long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length; - ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this); - return rats.QueueRead(destination, readOffset, cancellationToken); - } - - /// Provides a reusable ValueTask-backing object for implementing ReadAsync. - private sealed class ReadAsyncTaskSource : IValueTaskSource, IThreadPoolWorkItem - { - private readonly OSFileStreamStrategy _stream; - private ManualResetValueTaskSourceCore _source; - - private Memory _destination; - private long _readOffset; - private ExecutionContext? _context; - private CancellationToken _cancellationToken; - - public ReadAsyncTaskSource(OSFileStreamStrategy stream) => _stream = stream; - - public ValueTask QueueRead(Memory destination, long readOffset, CancellationToken cancellationToken) - { - _destination = destination; - _readOffset = readOffset; - _cancellationToken = cancellationToken; - _context = ExecutionContext.Capture(); - - ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true); - return new ValueTask(this, _source.Version); - } - - void IThreadPoolWorkItem.Execute() - { - if (_context is null || _context.IsDefault) - { - Read(); - } - else - { - ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.Read(), this); - } - } - - private void Read() - { - Exception? error = null; - int result = 0; - - try - { - if (_cancellationToken.IsCancellationRequested) - { - error = new OperationCanceledException(_cancellationToken); - } - else - { - result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset); - } - } - catch (Exception e) - { - error = e; - } - finally - { - // if the read was incomplete, we need to update the file position: - if (result != _destination.Length) - { - _stream.OnIncompleteOperation(_destination.Length, result); - } - - _destination = default; - _readOffset = -1; - _cancellationToken = default; - _context = null; - } - - if (error is not null) - { - _source.SetException(error); - } - else - { - _source.SetResult(result); - } - } - - int IValueTaskSource.GetResult(short token) - { - try - { - return _source.GetResult(token); - } - finally - { - _source.Reset(); -#pragma warning disable CS0197 - Volatile.Write(ref _stream._readAsyncTaskSource, this); -#pragma warning restore CS0197 - } - } - - ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => - _source.GetStatus(token); - - void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => - _source.OnCompleted(continuation, state, token, flags); + long readOffset = CanSeek ? Interlocked.Add(ref _filePosition, destination.Length) - destination.Length : -1; + return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, readOffset, cancellationToken, this); } } } From 457f6f2b8655455ede66bcffbf0b937ffaba4f43 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Mon, 2 Aug 2021 19:52:29 +0200 Subject: [PATCH 6/7] remove even more code ;) --- ...andle.OverlappedValueTaskSource.Windows.cs | 8 +++-- .../src/System/IO/RandomAccess.Windows.cs | 18 +++++----- .../AsyncWindowsFileStreamStrategy.cs | 35 ------------------- .../IO/Strategies/OSFileStreamStrategy.cs | 25 +++++++++---- 4 files changed, 32 insertions(+), 54 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs index b0a39138f71b07..1ba3516c42a9f1 100644 --- a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs @@ -46,7 +46,7 @@ internal sealed unsafe class OverlappedValueTaskSource : IValueTaskSource, internal readonly PreAllocatedOverlapped _preallocatedOverlapped; internal readonly SafeFileHandle _fileHandle; - private AsyncWindowsFileStreamStrategy? _strategy; + private OSFileStreamStrategy? _strategy; internal MemoryHandle _memoryHandle; private int _bufferSize; internal ManualResetValueTaskSourceCore _source; // mutable struct; do not make this readonly @@ -77,8 +77,10 @@ internal static Exception GetIOError(int errorCode, string? path) ? ThrowHelper.CreateEndOfFileException() : Win32Marshal.GetExceptionForWin32Error(errorCode, path); - internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory memory, long fileOffset, AsyncWindowsFileStreamStrategy? strategy = null) + internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory memory, long fileOffset, OSFileStreamStrategy? strategy = null) { + Debug.Assert(strategy is null || strategy is AsyncWindowsFileStreamStrategy, $"Strategy was expected to be null or async, got {strategy}."); + _result = 0; _strategy = strategy; _bufferSize = memory.Length; @@ -195,7 +197,7 @@ internal void Complete(uint errorCode, uint numBytes) { Debug.Assert(errorCode == Interop.Errors.ERROR_SUCCESS || numBytes == 0, $"Callback returned {errorCode} error and {numBytes} bytes"); - AsyncWindowsFileStreamStrategy? strategy = _strategy; + OSFileStreamStrategy? strategy = _strategy; ReleaseResources(); if (strategy is not null && _bufferSize != numBytes) // true only for incomplete operations diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs index c88dbb893b474e..d367abb8fb9736 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs @@ -225,7 +225,7 @@ internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory ReadAtOffsetAsync(SafeFileHandle handle, Memory(Win32Marshal.GetExceptionForWin32Error(errorCode)); + return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode, handle.Path)); } return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); } - internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory buffer, long fileOffset, - CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null) + private static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory buffer, long fileOffset, + CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { handle.EnsureThreadPoolBindingInitialized(); @@ -307,9 +307,7 @@ internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemo { if (handle.IsAsync) { - Debug.Assert(strategy is null || strategy is not AsyncWindowsFileStreamStrategy); // AsyncWindowsFileStreamStrategy should not use this code path - - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncWriteFile(handle, buffer, fileOffset, cancellationToken); + (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncWriteFile(handle, buffer, fileOffset, cancellationToken, strategy); if (vts is not null) { @@ -321,14 +319,14 @@ internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemo return ValueTask.CompletedTask; } - return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode)); + return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode, handle.Path)); } return ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); } - internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncWriteFile(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, - CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null) + private static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncWriteFile(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, + CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { handle.EnsureThreadPoolBindingInitialized(); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs index 084d5053df554e..82ed7659b83639 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs @@ -21,41 +21,6 @@ internal AsyncWindowsFileStreamStrategy(string path, FileMode mode, FileAccess a internal override bool IsAsync => true; - public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) - { - if (!CanSeek) - { - return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken); - } - - if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length) - { - // We know for sure that the file length can be safely cached and it has already been obtained. - // If we have reached EOF we just return here and avoid a sys-call. - return ValueTask.FromResult(0); - } - - // This implementation updates the file position before the operation starts and updates it after incomplete read. - // This is done to keep backward compatibility for concurrent reads. - // It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time. - long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length; - - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, readOffset, cancellationToken, this); - return vts != null - ? new ValueTask(vts, vts.Version) - : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException(SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path)); - } - - public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) - { - long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, buffer.Length) - buffer.Length : -1; - - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, buffer, writeOffset, cancellationToken, this); - return vts != null - ? new ValueTask(vts, vts.Version) - : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path)); - } - public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) { // Fail if the file was closed diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs index e62e9b3c16f72f..0386dfa78d5631 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs @@ -16,7 +16,7 @@ internal abstract class OSFileStreamStrategy : FileStreamStrategy private readonly FileAccess _access; // What file was opened for. protected long _filePosition; - protected long _length = -1; // negative means that hasn't been fetched. + private long _length = -1; // negative means that hasn't been fetched. private long _appendStart; // When appending, prevent overwriting file. private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed, file has been opened for reading and not shared for writing. @@ -104,7 +104,7 @@ public unsafe sealed override long Length internal void OnIncompleteOperation(int expectedBytesTransferred, int actualBytesTransferred) => Interlocked.Add(ref _filePosition, actualBytesTransferred - expectedBytesTransferred); - protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached; + private bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached; /// Gets or sets the position within the current stream public sealed override long Position @@ -292,7 +292,7 @@ public sealed override void EndWrite(IAsyncResult asyncResult) => public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); - public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken) + public sealed override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken) { long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1; return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken, this); @@ -307,11 +307,24 @@ public sealed override int EndRead(IAsyncResult asyncResult) => public sealed override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => ReadAsync(new Memory(buffer, offset, count), cancellationToken).AsTask(); - public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken) + public sealed override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken) { + if (!CanSeek) + { + return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken); + } + + if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length) + { + // We know for sure that the file length can be safely cached and it has already been obtained. + // If we have reached EOF we just return here and avoid a sys-call. + return ValueTask.FromResult(0); + } + // This implementation updates the file position before the operation starts and updates it after incomplete read. - // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations. - long readOffset = CanSeek ? Interlocked.Add(ref _filePosition, destination.Length) - destination.Length : -1; + // This is done to keep backward compatibility for concurrent reads. + // It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time. + long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length; return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, readOffset, cancellationToken, this); } } From 3096c1a88f626f2e6abf6ba8a22defb92e1d480d Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Tue, 3 Aug 2021 12:46:28 +0200 Subject: [PATCH 7/7] add one more assert --- .../System.IO.FileSystem/tests/FileStream/WriteAsync.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs index 5092676f406a44..06a9422589ee84 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs @@ -108,7 +108,7 @@ public async Task SimpleWriteAsync() public async Task WriteAsyncCancelledFile(int bufferSize, bool isAsync) { const int writeSize = 1024 * 1024; - using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create, FileAccess.Write, FileShare.None, bufferSize, isAsync)) + using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.CreateNew, FileAccess.Write, FileShare.None, bufferSize, isAsync)) { byte[] buffer = new byte[writeSize]; CancellationTokenSource cts = new CancellationTokenSource(); @@ -124,6 +124,7 @@ public async Task WriteAsyncCancelledFile(int bufferSize, bool isAsync) // but since cancellation is a race condition we accept either outcome Assert.Equal(cts.Token, oce.CancellationToken); + Assert.Equal(0, fs.Length); // if write was cancelled, the file should be empty Assert.Equal(0, fs.Position); // if write was cancelled, the Position should remain unchanged } }